001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.listener;
018
019import java.util.Iterator;
020import java.util.LinkedList;
021import java.util.List;
022
023import javax.jms.Connection;
024import javax.jms.JMSException;
025
026import org.springframework.beans.factory.BeanNameAware;
027import org.springframework.beans.factory.DisposableBean;
028import org.springframework.context.SmartLifecycle;
029import org.springframework.jms.JmsException;
030import org.springframework.jms.connection.ConnectionFactoryUtils;
031import org.springframework.jms.support.JmsUtils;
032import org.springframework.jms.support.destination.JmsDestinationAccessor;
033import org.springframework.lang.Nullable;
034import org.springframework.util.ClassUtils;
035
036/**
037 * Common base class for all containers which need to implement listening
038 * based on a JMS Connection (either shared or freshly obtained for each attempt).
039 * Inherits basic Connection and Session configuration handling from the
040 * {@link org.springframework.jms.support.JmsAccessor} base class.
041 *
042 * <p>This class provides basic lifecycle management, in particular management
043 * of a shared JMS Connection. Subclasses are supposed to plug into this
044 * lifecycle, implementing the {@link #sharedConnectionEnabled()} as well
045 * as the {@link #doInitialize()} and {@link #doShutdown()} template methods.
046 *
047 * <p>This base class does not assume any specific listener programming model
048 * or listener invoker mechanism. It just provides the general runtime
049 * lifecycle management needed for any kind of JMS-based listening mechanism
050 * that operates on a JMS Connection/Session.
051 *
052 * <p>For a concrete listener programming model, check out the
053 * {@link AbstractMessageListenerContainer} subclass. For a concrete listener
054 * invoker mechanism, check out the {@link DefaultMessageListenerContainer} class.
055 *
056 * @author Juergen Hoeller
057 * @since 2.0.3
058 * @see #sharedConnectionEnabled()
059 * @see #doInitialize()
060 * @see #doShutdown()
061 */
062public abstract class AbstractJmsListeningContainer extends JmsDestinationAccessor
063                implements BeanNameAware, DisposableBean, SmartLifecycle {
064
065        @Nullable
066        private String clientId;
067
068        private boolean autoStartup = true;
069
070        private int phase = DEFAULT_PHASE;
071
072        @Nullable
073        private String beanName;
074
075        @Nullable
076        private Connection sharedConnection;
077
078        private boolean sharedConnectionStarted = false;
079
080        protected final Object sharedConnectionMonitor = new Object();
081
082        private boolean active = false;
083
084        private volatile boolean running = false;
085
086        private final List<Object> pausedTasks = new LinkedList<>();
087
088        protected final Object lifecycleMonitor = new Object();
089
090
091        /**
092         * Specify the JMS client ID for a shared Connection created and used
093         * by this container.
094         * <p>Note that client IDs need to be unique among all active Connections
095         * of the underlying JMS provider. Furthermore, a client ID can only be
096         * assigned if the original ConnectionFactory hasn't already assigned one.
097         * @see javax.jms.Connection#setClientID
098         * @see #setConnectionFactory
099         */
100        public void setClientId(@Nullable String clientId) {
101                this.clientId = clientId;
102        }
103
104        /**
105         * Return the JMS client ID for the shared Connection created and used
106         * by this container, if any.
107         */
108        @Nullable
109        public String getClientId() {
110                return this.clientId;
111        }
112
113        /**
114         * Set whether to automatically start the container after initialization.
115         * <p>Default is "true"; set this to "false" to allow for manual startup
116         * through the {@link #start()} method.
117         */
118        public void setAutoStartup(boolean autoStartup) {
119                this.autoStartup = autoStartup;
120        }
121
122        @Override
123        public boolean isAutoStartup() {
124                return this.autoStartup;
125        }
126
127        /**
128         * Specify the phase in which this container should be started and
129         * stopped. The startup order proceeds from lowest to highest, and
130         * the shutdown order is the reverse of that. By default this value
131         * is Integer.MAX_VALUE meaning that this container starts as late
132         * as possible and stops as soon as possible.
133         */
134        public void setPhase(int phase) {
135                this.phase = phase;
136        }
137
138        /**
139         * Return the phase in which this container will be started and stopped.
140         */
141        @Override
142        public int getPhase() {
143                return this.phase;
144        }
145
146        @Override
147        public void setBeanName(@Nullable String beanName) {
148                this.beanName = beanName;
149        }
150
151        /**
152         * Return the bean name that this listener container has been assigned
153         * in its containing bean factory, if any.
154         */
155        @Nullable
156        protected final String getBeanName() {
157                return this.beanName;
158        }
159
160
161        /**
162         * Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
163         */
164        @Override
165        public void afterPropertiesSet() {
166                super.afterPropertiesSet();
167                validateConfiguration();
168                initialize();
169        }
170
171        /**
172         * Validate the configuration of this container.
173         * <p>The default implementation is empty. To be overridden in subclasses.
174         */
175        protected void validateConfiguration() {
176        }
177
178        /**
179         * Calls {@link #shutdown()} when the BeanFactory destroys the container instance.
180         * @see #shutdown()
181         */
182        @Override
183        public void destroy() {
184                shutdown();
185        }
186
187
188        //-------------------------------------------------------------------------
189        // Lifecycle methods for starting and stopping the container
190        //-------------------------------------------------------------------------
191
192        /**
193         * Initialize this container.
194         * <p>Creates a JMS Connection, starts the {@link javax.jms.Connection}
195         * (if {@link #setAutoStartup(boolean) "autoStartup"} hasn't been turned off),
196         * and calls {@link #doInitialize()}.
197         * @throws org.springframework.jms.JmsException if startup failed
198         */
199        public void initialize() throws JmsException {
200                try {
201                        synchronized (this.lifecycleMonitor) {
202                                this.active = true;
203                                this.lifecycleMonitor.notifyAll();
204                        }
205                        doInitialize();
206                }
207                catch (JMSException ex) {
208                        synchronized (this.sharedConnectionMonitor) {
209                                ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup);
210                                this.sharedConnection = null;
211                        }
212                        throw convertJmsAccessException(ex);
213                }
214        }
215
216        /**
217         * Stop the shared Connection, call {@link #doShutdown()},
218         * and close this container.
219         * @throws JmsException if shutdown failed
220         */
221        public void shutdown() throws JmsException {
222                logger.debug("Shutting down JMS listener container");
223                boolean wasRunning;
224                synchronized (this.lifecycleMonitor) {
225                        wasRunning = this.running;
226                        this.running = false;
227                        this.active = false;
228                        this.pausedTasks.clear();
229                        this.lifecycleMonitor.notifyAll();
230                }
231
232                // Stop shared Connection early, if necessary.
233                if (wasRunning && sharedConnectionEnabled()) {
234                        try {
235                                stopSharedConnection();
236                        }
237                        catch (Throwable ex) {
238                                logger.debug("Could not stop JMS Connection on shutdown", ex);
239                        }
240                }
241
242                // Shut down the invokers.
243                try {
244                        doShutdown();
245                }
246                catch (JMSException ex) {
247                        throw convertJmsAccessException(ex);
248                }
249                finally {
250                        if (sharedConnectionEnabled()) {
251                                synchronized (this.sharedConnectionMonitor) {
252                                        ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), false);
253                                        this.sharedConnection = null;
254                                }
255                        }
256                }
257        }
258
259        /**
260         * Return whether this container is currently active,
261         * that is, whether it has been set up but not shut down yet.
262         */
263        public final boolean isActive() {
264                synchronized (this.lifecycleMonitor) {
265                        return this.active;
266                }
267        }
268
269        /**
270         * Start this container.
271         * @throws JmsException if starting failed
272         * @see #doStart
273         */
274        @Override
275        public void start() throws JmsException {
276                try {
277                        doStart();
278                }
279                catch (JMSException ex) {
280                        throw convertJmsAccessException(ex);
281                }
282        }
283
284        /**
285         * Start the shared Connection, if any, and notify all invoker tasks.
286         * @throws JMSException if thrown by JMS API methods
287         * @see #startSharedConnection
288         */
289        protected void doStart() throws JMSException {
290                // Lazily establish a shared Connection, if necessary.
291                if (sharedConnectionEnabled()) {
292                        establishSharedConnection();
293                }
294
295                // Reschedule paused tasks, if any.
296                synchronized (this.lifecycleMonitor) {
297                        this.running = true;
298                        this.lifecycleMonitor.notifyAll();
299                        resumePausedTasks();
300                }
301
302                // Start the shared Connection, if any.
303                if (sharedConnectionEnabled()) {
304                        startSharedConnection();
305                }
306        }
307
308        /**
309         * Stop this container.
310         * @throws JmsException if stopping failed
311         * @see #doStop
312         */
313        @Override
314        public void stop() throws JmsException {
315                try {
316                        doStop();
317                }
318                catch (JMSException ex) {
319                        throw convertJmsAccessException(ex);
320                }
321        }
322
323        /**
324         * Notify all invoker tasks and stop the shared Connection, if any.
325         * @throws JMSException if thrown by JMS API methods
326         * @see #stopSharedConnection
327         */
328        protected void doStop() throws JMSException {
329                synchronized (this.lifecycleMonitor) {
330                        this.running = false;
331                        this.lifecycleMonitor.notifyAll();
332                }
333
334                if (sharedConnectionEnabled()) {
335                        stopSharedConnection();
336                }
337        }
338
339        /**
340         * Determine whether this container is currently running,
341         * that is, whether it has been started and not stopped yet.
342         * @see #start()
343         * @see #stop()
344         * @see #runningAllowed()
345         */
346        @Override
347        public final boolean isRunning() {
348                return (this.running && runningAllowed());
349        }
350
351        /**
352         * Check whether this container's listeners are generally allowed to run.
353         * <p>This implementation always returns {@code true}; the default 'running'
354         * state is purely determined by {@link #start()} / {@link #stop()}.
355         * <p>Subclasses may override this method to check against temporary
356         * conditions that prevent listeners from actually running. In other words,
357         * they may apply further restrictions to the 'running' state, returning
358         * {@code false} if such a restriction prevents listeners from running.
359         */
360        protected boolean runningAllowed() {
361                return true;
362        }
363
364
365        //-------------------------------------------------------------------------
366        // Management of a shared JMS Connection
367        //-------------------------------------------------------------------------
368
369        /**
370         * Establish a shared Connection for this container.
371         * <p>The default implementation delegates to {@link #createSharedConnection()},
372         * which does one immediate attempt and throws an exception if it fails.
373         * Can be overridden to have a recovery process in place, retrying
374         * until a Connection can be successfully established.
375         * @throws JMSException if thrown by JMS API methods
376         */
377        protected void establishSharedConnection() throws JMSException {
378                synchronized (this.sharedConnectionMonitor) {
379                        if (this.sharedConnection == null) {
380                                this.sharedConnection = createSharedConnection();
381                                logger.debug("Established shared JMS Connection");
382                        }
383                }
384        }
385
386        /**
387         * Refresh the shared Connection that this container holds.
388         * <p>Called on startup and also after an infrastructure exception
389         * that occurred during invoker setup and/or execution.
390         * @throws JMSException if thrown by JMS API methods
391         */
392        protected final void refreshSharedConnection() throws JMSException {
393                synchronized (this.sharedConnectionMonitor) {
394                        ConnectionFactoryUtils.releaseConnection(
395                                        this.sharedConnection, getConnectionFactory(), this.sharedConnectionStarted);
396                        this.sharedConnection = null;
397                        this.sharedConnection = createSharedConnection();
398                        if (this.sharedConnectionStarted) {
399                                this.sharedConnection.start();
400                        }
401                }
402        }
403
404        /**
405         * Create a shared Connection for this container.
406         * <p>The default implementation creates a standard Connection
407         * and prepares it through {@link #prepareSharedConnection}.
408         * @return the prepared Connection
409         * @throws JMSException if the creation failed
410         */
411        protected Connection createSharedConnection() throws JMSException {
412                Connection con = createConnection();
413                try {
414                        prepareSharedConnection(con);
415                        return con;
416                }
417                catch (JMSException ex) {
418                        JmsUtils.closeConnection(con);
419                        throw ex;
420                }
421        }
422
423        /**
424         * Prepare the given Connection, which is about to be registered
425         * as shared Connection for this container.
426         * <p>The default implementation sets the specified client id, if any.
427         * Subclasses can override this to apply further settings.
428         * @param connection the Connection to prepare
429         * @throws JMSException if the preparation efforts failed
430         * @see #getClientId()
431         */
432        protected void prepareSharedConnection(Connection connection) throws JMSException {
433                String clientId = getClientId();
434                if (clientId != null) {
435                        connection.setClientID(clientId);
436                }
437        }
438
439        /**
440         * Start the shared Connection.
441         * @throws JMSException if thrown by JMS API methods
442         * @see javax.jms.Connection#start()
443         */
444        protected void startSharedConnection() throws JMSException {
445                synchronized (this.sharedConnectionMonitor) {
446                        this.sharedConnectionStarted = true;
447                        if (this.sharedConnection != null) {
448                                try {
449                                        this.sharedConnection.start();
450                                }
451                                catch (javax.jms.IllegalStateException ex) {
452                                        logger.debug("Ignoring Connection start exception - assuming already started: " + ex);
453                                }
454                        }
455                }
456        }
457
458        /**
459         * Stop the shared Connection.
460         * @throws JMSException if thrown by JMS API methods
461         * @see javax.jms.Connection#start()
462         */
463        protected void stopSharedConnection() throws JMSException {
464                synchronized (this.sharedConnectionMonitor) {
465                        this.sharedConnectionStarted = false;
466                        if (this.sharedConnection != null) {
467                                try {
468                                        this.sharedConnection.stop();
469                                }
470                                catch (javax.jms.IllegalStateException ex) {
471                                        logger.debug("Ignoring Connection stop exception - assuming already stopped: " + ex);
472                                }
473                        }
474                }
475        }
476
477        /**
478         * Return the shared JMS Connection maintained by this container.
479         * Available after initialization.
480         * @return the shared Connection (never {@code null})
481         * @throws IllegalStateException if this container does not maintain a
482         * shared Connection, or if the Connection hasn't been initialized yet
483         * @see #sharedConnectionEnabled()
484         */
485        protected final Connection getSharedConnection() {
486                if (!sharedConnectionEnabled()) {
487                        throw new IllegalStateException(
488                                        "This listener container does not maintain a shared Connection");
489                }
490                synchronized (this.sharedConnectionMonitor) {
491                        if (this.sharedConnection == null) {
492                                throw new SharedConnectionNotInitializedException(
493                                                "This listener container's shared Connection has not been initialized yet");
494                        }
495                        return this.sharedConnection;
496                }
497        }
498
499
500        //-------------------------------------------------------------------------
501        // Management of paused tasks
502        //-------------------------------------------------------------------------
503
504        /**
505         * Take the given task object and reschedule it, either immediately if
506         * this container is currently running, or later once this container
507         * has been restarted.
508         * <p>If this container has already been shut down, the task will not
509         * get rescheduled at all.
510         * @param task the task object to reschedule
511         * @return whether the task has been rescheduled
512         * (either immediately or for a restart of this container)
513         * @see #doRescheduleTask
514         */
515        protected final boolean rescheduleTaskIfNecessary(Object task) {
516                if (this.running) {
517                        try {
518                                doRescheduleTask(task);
519                        }
520                        catch (RuntimeException ex) {
521                                logRejectedTask(task, ex);
522                                this.pausedTasks.add(task);
523                        }
524                        return true;
525                }
526                else if (this.active) {
527                        this.pausedTasks.add(task);
528                        return true;
529                }
530                else {
531                        return false;
532                }
533        }
534
535        /**
536         * Try to resume all paused tasks.
537         * Tasks for which rescheduling failed simply remain in paused mode.
538         */
539        protected void resumePausedTasks() {
540                synchronized (this.lifecycleMonitor) {
541                        if (!this.pausedTasks.isEmpty()) {
542                                for (Iterator<?> it = this.pausedTasks.iterator(); it.hasNext();) {
543                                        Object task = it.next();
544                                        try {
545                                                doRescheduleTask(task);
546                                                it.remove();
547                                                if (logger.isDebugEnabled()) {
548                                                        logger.debug("Resumed paused task: " + task);
549                                                }
550                                        }
551                                        catch (RuntimeException ex) {
552                                                logRejectedTask(task, ex);
553                                                // Keep the task in paused mode...
554                                        }
555                                }
556                        }
557                }
558        }
559
560        /**
561         * Determine the number of currently paused tasks, if any.
562         */
563        public int getPausedTaskCount() {
564                synchronized (this.lifecycleMonitor) {
565                        return this.pausedTasks.size();
566                }
567        }
568
569        /**
570         * Reschedule the given task object immediately.
571         * <p>To be implemented by subclasses if they ever call
572         * {@code rescheduleTaskIfNecessary}.
573         * This implementation throws an UnsupportedOperationException.
574         * @param task the task object to reschedule
575         * @see #rescheduleTaskIfNecessary
576         */
577        protected void doRescheduleTask(Object task) {
578                throw new UnsupportedOperationException(
579                                ClassUtils.getShortName(getClass()) + " does not support rescheduling of tasks");
580        }
581
582        /**
583         * Log a task that has been rejected by {@link #doRescheduleTask}.
584         * <p>The default implementation simply logs a corresponding message
585         * at debug level.
586         * @param task the rejected task object
587         * @param ex the exception thrown from {@link #doRescheduleTask}
588         */
589        protected void logRejectedTask(Object task, RuntimeException ex) {
590                if (logger.isDebugEnabled()) {
591                        logger.debug("Listener container task [" + task + "] has been rejected and paused: " + ex);
592                }
593        }
594
595
596        //-------------------------------------------------------------------------
597        // Template methods to be implemented by subclasses
598        //-------------------------------------------------------------------------
599
600        /**
601         * Return whether a shared JMS Connection should be maintained
602         * by this container base class.
603         * @see #getSharedConnection()
604         */
605        protected abstract boolean sharedConnectionEnabled();
606
607        /**
608         * Register any invokers within this container.
609         * <p>Subclasses need to implement this method for their specific
610         * invoker management process.
611         * <p>A shared JMS Connection, if any, will already have been
612         * started at this point.
613         * @throws JMSException if registration failed
614         * @see #getSharedConnection()
615         */
616        protected abstract void doInitialize() throws JMSException;
617
618        /**
619         * Close the registered invokers.
620         * <p>Subclasses need to implement this method for their specific
621         * invoker management process.
622         * <p>A shared JMS Connection, if any, will automatically be closed
623         * <i>afterwards</i>.
624         * @throws JMSException if shutdown failed
625         * @see #shutdown()
626         */
627        protected abstract void doShutdown() throws JMSException;
628
629
630        /**
631         * Exception that indicates that the initial setup of this container's
632         * shared JMS Connection failed. This is indicating to invokers that they need
633         * to establish the shared Connection themselves on first access.
634         */
635        @SuppressWarnings("serial")
636        public static class SharedConnectionNotInitializedException extends RuntimeException {
637
638                /**
639                 * Create a new SharedConnectionNotInitializedException.
640                 * @param msg the detail message
641                 */
642                protected SharedConnectionNotInitializedException(String msg) {
643                        super(msg);
644                }
645        }
646
647}