001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.listener;
018
019import java.util.Iterator;
020import java.util.LinkedList;
021import java.util.List;
022import javax.jms.Connection;
023import javax.jms.JMSException;
024
025import org.springframework.beans.factory.BeanNameAware;
026import org.springframework.beans.factory.DisposableBean;
027import org.springframework.context.SmartLifecycle;
028import org.springframework.jms.JmsException;
029import org.springframework.jms.connection.ConnectionFactoryUtils;
030import org.springframework.jms.support.JmsUtils;
031import org.springframework.jms.support.destination.JmsDestinationAccessor;
032import org.springframework.util.ClassUtils;
033
034/**
035 * Common base class for all containers which need to implement listening
036 * based on a JMS Connection (either shared or freshly obtained for each attempt).
037 * Inherits basic Connection and Session configuration handling from the
038 * {@link org.springframework.jms.support.JmsAccessor} base class.
039 *
040 * <p>This class provides basic lifecycle management, in particular management
041 * of a shared JMS Connection. Subclasses are supposed to plug into this
042 * lifecycle, implementing the {@link #sharedConnectionEnabled()} as well
043 * as the {@link #doInitialize()} and {@link #doShutdown()} template methods.
044 *
045 * <p>This base class does not assume any specific listener programming model
046 * or listener invoker mechanism. It just provides the general runtime
047 * lifecycle management needed for any kind of JMS-based listening mechanism
048 * that operates on a JMS Connection/Session.
049 *
050 * <p>For a concrete listener programming model, check out the
051 * {@link AbstractMessageListenerContainer} subclass. For a concrete listener
052 * invoker mechanism, check out the {@link DefaultMessageListenerContainer} class.
053 *
054 * @author Juergen Hoeller
055 * @since 2.0.3
056 * @see #sharedConnectionEnabled()
057 * @see #doInitialize()
058 * @see #doShutdown()
059 */
060public abstract class AbstractJmsListeningContainer extends JmsDestinationAccessor
061                implements BeanNameAware, DisposableBean, SmartLifecycle {
062
063        private String clientId;
064
065        private boolean autoStartup = true;
066
067        private int phase = Integer.MAX_VALUE;
068
069        private String beanName;
070
071        private Connection sharedConnection;
072
073        private boolean sharedConnectionStarted = false;
074
075        protected final Object sharedConnectionMonitor = new Object();
076
077        private boolean active = false;
078
079        private volatile boolean running = false;
080
081        private final List<Object> pausedTasks = new LinkedList<Object>();
082
083        protected final Object lifecycleMonitor = new Object();
084
085
086        /**
087         * Specify the JMS client ID for a shared Connection created and used
088         * by this container.
089         * <p>Note that client IDs need to be unique among all active Connections
090         * of the underlying JMS provider. Furthermore, a client ID can only be
091         * assigned if the original ConnectionFactory hasn't already assigned one.
092         * @see javax.jms.Connection#setClientID
093         * @see #setConnectionFactory
094         */
095        public void setClientId(String clientId) {
096                this.clientId = clientId;
097        }
098
099        /**
100         * Return the JMS client ID for the shared Connection created and used
101         * by this container, if any.
102         */
103        public String getClientId() {
104                return this.clientId;
105        }
106
107        /**
108         * Set whether to automatically start the container after initialization.
109         * <p>Default is "true"; set this to "false" to allow for manual startup
110         * through the {@link #start()} method.
111         */
112        public void setAutoStartup(boolean autoStartup) {
113                this.autoStartup = autoStartup;
114        }
115
116        @Override
117        public boolean isAutoStartup() {
118                return this.autoStartup;
119        }
120
121        /**
122         * Specify the phase in which this container should be started and
123         * stopped. The startup order proceeds from lowest to highest, and
124         * the shutdown order is the reverse of that. By default this value
125         * is Integer.MAX_VALUE meaning that this container starts as late
126         * as possible and stops as soon as possible.
127         */
128        public void setPhase(int phase) {
129                this.phase = phase;
130        }
131
132        /**
133         * Return the phase in which this container will be started and stopped.
134         */
135        @Override
136        public int getPhase() {
137                return this.phase;
138        }
139
140        @Override
141        public void setBeanName(String beanName) {
142                this.beanName = beanName;
143        }
144
145        /**
146         * Return the bean name that this listener container has been assigned
147         * in its containing bean factory, if any.
148         */
149        protected final String getBeanName() {
150                return this.beanName;
151        }
152
153
154        /**
155         * Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
156         */
157        @Override
158        public void afterPropertiesSet() {
159                super.afterPropertiesSet();
160                validateConfiguration();
161                initialize();
162        }
163
164        /**
165         * Validate the configuration of this container.
166         * <p>The default implementation is empty. To be overridden in subclasses.
167         */
168        protected void validateConfiguration() {
169        }
170
171        /**
172         * Calls {@link #shutdown()} when the BeanFactory destroys the container instance.
173         * @see #shutdown()
174         */
175        @Override
176        public void destroy() {
177                shutdown();
178        }
179
180
181        //-------------------------------------------------------------------------
182        // Lifecycle methods for starting and stopping the container
183        //-------------------------------------------------------------------------
184
185        /**
186         * Initialize this container.
187         * <p>Creates a JMS Connection, starts the {@link javax.jms.Connection}
188         * (if {@link #setAutoStartup(boolean) "autoStartup"} hasn't been turned off),
189         * and calls {@link #doInitialize()}.
190         * @throws org.springframework.jms.JmsException if startup failed
191         */
192        public void initialize() throws JmsException {
193                try {
194                        synchronized (this.lifecycleMonitor) {
195                                this.active = true;
196                                this.lifecycleMonitor.notifyAll();
197                        }
198                        doInitialize();
199                }
200                catch (JMSException ex) {
201                        synchronized (this.sharedConnectionMonitor) {
202                                ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup);
203                                this.sharedConnection = null;
204                        }
205                        throw convertJmsAccessException(ex);
206                }
207        }
208
209        /**
210         * Stop the shared Connection, call {@link #doShutdown()},
211         * and close this container.
212         * @throws JmsException if shutdown failed
213         */
214        public void shutdown() throws JmsException {
215                logger.debug("Shutting down JMS listener container");
216                boolean wasRunning;
217                synchronized (this.lifecycleMonitor) {
218                        wasRunning = this.running;
219                        this.running = false;
220                        this.active = false;
221                        this.pausedTasks.clear();
222                        this.lifecycleMonitor.notifyAll();
223                }
224
225                // Stop shared Connection early, if necessary.
226                if (wasRunning && sharedConnectionEnabled()) {
227                        try {
228                                stopSharedConnection();
229                        }
230                        catch (Throwable ex) {
231                                logger.debug("Could not stop JMS Connection on shutdown", ex);
232                        }
233                }
234
235                // Shut down the invokers.
236                try {
237                        doShutdown();
238                }
239                catch (JMSException ex) {
240                        throw convertJmsAccessException(ex);
241                }
242                finally {
243                        if (sharedConnectionEnabled()) {
244                                synchronized (this.sharedConnectionMonitor) {
245                                        ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), false);
246                                        this.sharedConnection = null;
247                                }
248                        }
249                }
250        }
251
252        /**
253         * Return whether this container is currently active,
254         * that is, whether it has been set up but not shut down yet.
255         */
256        public final boolean isActive() {
257                synchronized (this.lifecycleMonitor) {
258                        return this.active;
259                }
260        }
261
262        /**
263         * Start this container.
264         * @throws JmsException if starting failed
265         * @see #doStart
266         */
267        @Override
268        public void start() throws JmsException {
269                try {
270                        doStart();
271                }
272                catch (JMSException ex) {
273                        throw convertJmsAccessException(ex);
274                }
275        }
276
277        /**
278         * Start the shared Connection, if any, and notify all invoker tasks.
279         * @throws JMSException if thrown by JMS API methods
280         * @see #startSharedConnection
281         */
282        protected void doStart() throws JMSException {
283                // Lazily establish a shared Connection, if necessary.
284                if (sharedConnectionEnabled()) {
285                        establishSharedConnection();
286                }
287
288                // Reschedule paused tasks, if any.
289                synchronized (this.lifecycleMonitor) {
290                        this.running = true;
291                        this.lifecycleMonitor.notifyAll();
292                        resumePausedTasks();
293                }
294
295                // Start the shared Connection, if any.
296                if (sharedConnectionEnabled()) {
297                        startSharedConnection();
298                }
299        }
300
301        /**
302         * Stop this container.
303         * @throws JmsException if stopping failed
304         * @see #doStop
305         */
306        @Override
307        public void stop() throws JmsException {
308                try {
309                        doStop();
310                }
311                catch (JMSException ex) {
312                        throw convertJmsAccessException(ex);
313                }
314        }
315
316        @Override
317        public void stop(Runnable callback) {
318                stop();
319                callback.run();
320        }
321
322        /**
323         * Notify all invoker tasks and stop the shared Connection, if any.
324         * @throws JMSException if thrown by JMS API methods
325         * @see #stopSharedConnection
326         */
327        protected void doStop() throws JMSException {
328                synchronized (this.lifecycleMonitor) {
329                        this.running = false;
330                        this.lifecycleMonitor.notifyAll();
331                }
332
333                if (sharedConnectionEnabled()) {
334                        stopSharedConnection();
335                }
336        }
337
338        /**
339         * Determine whether this container is currently running,
340         * that is, whether it has been started and not stopped yet.
341         * @see #start()
342         * @see #stop()
343         * @see #runningAllowed()
344         */
345        @Override
346        public final boolean isRunning() {
347                return (this.running && runningAllowed());
348        }
349
350        /**
351         * Check whether this container's listeners are generally allowed to run.
352         * <p>This implementation always returns {@code true}; the default 'running'
353         * state is purely determined by {@link #start()} / {@link #stop()}.
354         * <p>Subclasses may override this method to check against temporary
355         * conditions that prevent listeners from actually running. In other words,
356         * they may apply further restrictions to the 'running' state, returning
357         * {@code false} if such a restriction prevents listeners from running.
358         */
359        protected boolean runningAllowed() {
360                return true;
361        }
362
363
364        //-------------------------------------------------------------------------
365        // Management of a shared JMS Connection
366        //-------------------------------------------------------------------------
367
368        /**
369         * Establish a shared Connection for this container.
370         * <p>The default implementation delegates to {@link #createSharedConnection()},
371         * which does one immediate attempt and throws an exception if it fails.
372         * Can be overridden to have a recovery process in place, retrying
373         * until a Connection can be successfully established.
374         * @throws JMSException if thrown by JMS API methods
375         */
376        protected void establishSharedConnection() throws JMSException {
377                synchronized (this.sharedConnectionMonitor) {
378                        if (this.sharedConnection == null) {
379                                this.sharedConnection = createSharedConnection();
380                                logger.debug("Established shared JMS Connection");
381                        }
382                }
383        }
384
385        /**
386         * Refresh the shared Connection that this container holds.
387         * <p>Called on startup and also after an infrastructure exception
388         * that occurred during invoker setup and/or execution.
389         * @throws JMSException if thrown by JMS API methods
390         */
391        protected final void refreshSharedConnection() throws JMSException {
392                synchronized (this.sharedConnectionMonitor) {
393                        ConnectionFactoryUtils.releaseConnection(
394                                        this.sharedConnection, getConnectionFactory(), this.sharedConnectionStarted);
395                        this.sharedConnection = null;
396                        this.sharedConnection = createSharedConnection();
397                        if (this.sharedConnectionStarted) {
398                                this.sharedConnection.start();
399                        }
400                }
401        }
402
403        /**
404         * Create a shared Connection for this container.
405         * <p>The default implementation creates a standard Connection
406         * and prepares it through {@link #prepareSharedConnection}.
407         * @return the prepared Connection
408         * @throws JMSException if the creation failed
409         */
410        protected Connection createSharedConnection() throws JMSException {
411                Connection con = createConnection();
412                try {
413                        prepareSharedConnection(con);
414                        return con;
415                }
416                catch (JMSException ex) {
417                        JmsUtils.closeConnection(con);
418                        throw ex;
419                }
420        }
421
422        /**
423         * Prepare the given Connection, which is about to be registered
424         * as shared Connection for this container.
425         * <p>The default implementation sets the specified client id, if any.
426         * Subclasses can override this to apply further settings.
427         * @param connection the Connection to prepare
428         * @throws JMSException if the preparation efforts failed
429         * @see #getClientId()
430         */
431        protected void prepareSharedConnection(Connection connection) throws JMSException {
432                String clientId = getClientId();
433                if (clientId != null) {
434                        connection.setClientID(clientId);
435                }
436        }
437
438        /**
439         * Start the shared Connection.
440         * @throws JMSException if thrown by JMS API methods
441         * @see javax.jms.Connection#start()
442         */
443        protected void startSharedConnection() throws JMSException {
444                synchronized (this.sharedConnectionMonitor) {
445                        this.sharedConnectionStarted = true;
446                        if (this.sharedConnection != null) {
447                                try {
448                                        this.sharedConnection.start();
449                                }
450                                catch (javax.jms.IllegalStateException ex) {
451                                        logger.debug("Ignoring Connection start exception - assuming already started: " + ex);
452                                }
453                        }
454                }
455        }
456
457        /**
458         * Stop the shared Connection.
459         * @throws JMSException if thrown by JMS API methods
460         * @see javax.jms.Connection#start()
461         */
462        protected void stopSharedConnection() throws JMSException {
463                synchronized (this.sharedConnectionMonitor) {
464                        this.sharedConnectionStarted = false;
465                        if (this.sharedConnection != null) {
466                                try {
467                                        this.sharedConnection.stop();
468                                }
469                                catch (javax.jms.IllegalStateException ex) {
470                                        logger.debug("Ignoring Connection stop exception - assuming already stopped: " + ex);
471                                }
472                        }
473                }
474        }
475
476        /**
477         * Return the shared JMS Connection maintained by this container.
478         * Available after initialization.
479         * @return the shared Connection (never {@code null})
480         * @throws IllegalStateException if this container does not maintain a
481         * shared Connection, or if the Connection hasn't been initialized yet
482         * @see #sharedConnectionEnabled()
483         */
484        protected final Connection getSharedConnection() {
485                if (!sharedConnectionEnabled()) {
486                        throw new IllegalStateException(
487                                        "This listener container does not maintain a shared Connection");
488                }
489                synchronized (this.sharedConnectionMonitor) {
490                        if (this.sharedConnection == null) {
491                                throw new SharedConnectionNotInitializedException(
492                                                "This listener container's shared Connection has not been initialized yet");
493                        }
494                        return this.sharedConnection;
495                }
496        }
497
498
499        //-------------------------------------------------------------------------
500        // Management of paused tasks
501        //-------------------------------------------------------------------------
502
503        /**
504         * Take the given task object and reschedule it, either immediately if
505         * this container is currently running, or later once this container
506         * has been restarted.
507         * <p>If this container has already been shut down, the task will not
508         * get rescheduled at all.
509         * @param task the task object to reschedule
510         * @return whether the task has been rescheduled
511         * (either immediately or for a restart of this container)
512         * @see #doRescheduleTask
513         */
514        protected final boolean rescheduleTaskIfNecessary(Object task) {
515                if (this.running) {
516                        try {
517                                doRescheduleTask(task);
518                        }
519                        catch (RuntimeException ex) {
520                                logRejectedTask(task, ex);
521                                this.pausedTasks.add(task);
522                        }
523                        return true;
524                }
525                else if (this.active) {
526                        this.pausedTasks.add(task);
527                        return true;
528                }
529                else {
530                        return false;
531                }
532        }
533
534        /**
535         * Try to resume all paused tasks.
536         * Tasks for which rescheduling failed simply remain in paused mode.
537         */
538        protected void resumePausedTasks() {
539                synchronized (this.lifecycleMonitor) {
540                        if (!this.pausedTasks.isEmpty()) {
541                                for (Iterator<?> it = this.pausedTasks.iterator(); it.hasNext();) {
542                                        Object task = it.next();
543                                        try {
544                                                doRescheduleTask(task);
545                                                it.remove();
546                                                if (logger.isDebugEnabled()) {
547                                                        logger.debug("Resumed paused task: " + task);
548                                                }
549                                        }
550                                        catch (RuntimeException ex) {
551                                                logRejectedTask(task, ex);
552                                                // Keep the task in paused mode...
553                                        }
554                                }
555                        }
556                }
557        }
558
559        /**
560         * Determine the number of currently paused tasks, if any.
561         */
562        public int getPausedTaskCount() {
563                synchronized (this.lifecycleMonitor) {
564                        return this.pausedTasks.size();
565                }
566        }
567
568        /**
569         * Reschedule the given task object immediately.
570         * <p>To be implemented by subclasses if they ever call
571         * {@code rescheduleTaskIfNecessary}.
572         * This implementation throws an UnsupportedOperationException.
573         * @param task the task object to reschedule
574         * @see #rescheduleTaskIfNecessary
575         */
576        protected void doRescheduleTask(Object task) {
577                throw new UnsupportedOperationException(
578                                ClassUtils.getShortName(getClass()) + " does not support rescheduling of tasks");
579        }
580
581        /**
582         * Log a task that has been rejected by {@link #doRescheduleTask}.
583         * <p>The default implementation simply logs a corresponding message
584         * at debug level.
585         * @param task the rejected task object
586         * @param ex the exception thrown from {@link #doRescheduleTask}
587         */
588        protected void logRejectedTask(Object task, RuntimeException ex) {
589                if (logger.isDebugEnabled()) {
590                        logger.debug("Listener container task [" + task + "] has been rejected and paused: " + ex);
591                }
592        }
593
594
595        //-------------------------------------------------------------------------
596        // Template methods to be implemented by subclasses
597        //-------------------------------------------------------------------------
598
599        /**
600         * Return whether a shared JMS Connection should be maintained
601         * by this container base class.
602         * @see #getSharedConnection()
603         */
604        protected abstract boolean sharedConnectionEnabled();
605
606        /**
607         * Register any invokers within this container.
608         * <p>Subclasses need to implement this method for their specific
609         * invoker management process.
610         * <p>A shared JMS Connection, if any, will already have been
611         * started at this point.
612         * @throws JMSException if registration failed
613         * @see #getSharedConnection()
614         */
615        protected abstract void doInitialize() throws JMSException;
616
617        /**
618         * Close the registered invokers.
619         * <p>Subclasses need to implement this method for their specific
620         * invoker management process.
621         * <p>A shared JMS Connection, if any, will automatically be closed
622         * <i>afterwards</i>.
623         * @throws JMSException if shutdown failed
624         * @see #shutdown()
625         */
626        protected abstract void doShutdown() throws JMSException;
627
628
629        /**
630         * Exception that indicates that the initial setup of this container's
631         * shared JMS Connection failed. This is indicating to invokers that they need
632         * to establish the shared Connection themselves on first access.
633         */
634        @SuppressWarnings("serial")
635        public static class SharedConnectionNotInitializedException extends RuntimeException {
636
637                /**
638                 * Create a new SharedConnectionNotInitializedException.
639                 * @param msg the detail message
640                 */
641                protected SharedConnectionNotInitializedException(String msg) {
642                        super(msg);
643                }
644        }
645
646}