001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.config;
018
019import java.util.Collection;
020import java.util.Collections;
021import java.util.Map;
022import java.util.Set;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028
029import org.springframework.beans.factory.BeanInitializationException;
030import org.springframework.beans.factory.DisposableBean;
031import org.springframework.beans.factory.InitializingBean;
032import org.springframework.context.ApplicationContext;
033import org.springframework.context.ApplicationContextAware;
034import org.springframework.context.ApplicationListener;
035import org.springframework.context.SmartLifecycle;
036import org.springframework.context.event.ContextRefreshedEvent;
037import org.springframework.jms.listener.MessageListenerContainer;
038import org.springframework.lang.Nullable;
039import org.springframework.util.Assert;
040
041/**
042 * Creates the necessary {@link MessageListenerContainer} instances for the
043 * registered {@linkplain JmsListenerEndpoint endpoints}. Also manages the
044 * lifecycle of the listener containers, in particular within the lifecycle
045 * of the application context.
046 *
047 * <p>Contrary to {@link MessageListenerContainer MessageListenerContainers}
048 * created manually, listener containers managed by registry are not beans
049 * in the application context and are not candidates for autowiring.
050 * Use {@link #getListenerContainers()} if you need to access this registry's
051 * listener containers for management purposes. If you need to access to a
052 * specific message listener container, use {@link #getListenerContainer(String)}
053 * with the id of the endpoint.
054 *
055 * @author Stephane Nicoll
056 * @author Juergen Hoeller
057 * @since 4.1
058 * @see JmsListenerEndpoint
059 * @see MessageListenerContainer
060 * @see JmsListenerContainerFactory
061 */
062public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecycle,
063                ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
064
065        protected final Log logger = LogFactory.getLog(getClass());
066
067        private final Map<String, MessageListenerContainer> listenerContainers =
068                        new ConcurrentHashMap<>();
069
070        private int phase = DEFAULT_PHASE;
071
072        @Nullable
073        private ApplicationContext applicationContext;
074
075        private boolean contextRefreshed;
076
077
078        @Override
079        public void setApplicationContext(ApplicationContext applicationContext) {
080                this.applicationContext = applicationContext;
081        }
082
083        @Override
084        public void onApplicationEvent(ContextRefreshedEvent event) {
085                if (event.getApplicationContext() == this.applicationContext) {
086                        this.contextRefreshed = true;
087                }
088        }
089
090
091        /**
092         * Return the {@link MessageListenerContainer} with the specified id or
093         * {@code null} if no such container exists.
094         * @param id the id of the container
095         * @return the container or {@code null} if no container with that id exists
096         * @see JmsListenerEndpoint#getId()
097         * @see #getListenerContainerIds()
098         */
099        @Nullable
100        public MessageListenerContainer getListenerContainer(String id) {
101                Assert.notNull(id, "Container identifier must not be null");
102                return this.listenerContainers.get(id);
103        }
104
105        /**
106         * Return the ids of the managed {@link MessageListenerContainer} instance(s).
107         * @since 4.2.3
108         * @see #getListenerContainer(String)
109         */
110        public Set<String> getListenerContainerIds() {
111                return Collections.unmodifiableSet(this.listenerContainers.keySet());
112        }
113
114        /**
115         * Return the managed {@link MessageListenerContainer} instance(s).
116         */
117        public Collection<MessageListenerContainer> getListenerContainers() {
118                return Collections.unmodifiableCollection(this.listenerContainers.values());
119        }
120
121        /**
122         * Create a message listener container for the given {@link JmsListenerEndpoint}.
123         * <p>This create the necessary infrastructure to honor that endpoint
124         * with regards to its configuration.
125         * <p>The {@code startImmediately} flag determines if the container should be
126         * started immediately.
127         * @param endpoint the endpoint to add
128         * @param factory the listener factory to use
129         * @param startImmediately start the container immediately if necessary
130         * @see #getListenerContainers()
131         * @see #getListenerContainer(String)
132         */
133        public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,
134                        boolean startImmediately) {
135
136                Assert.notNull(endpoint, "Endpoint must not be null");
137                Assert.notNull(factory, "Factory must not be null");
138                String id = endpoint.getId();
139                Assert.hasText(id, "Endpoint id must be set");
140
141                synchronized (this.listenerContainers) {
142                        if (this.listenerContainers.containsKey(id)) {
143                                throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'");
144                        }
145                        MessageListenerContainer container = createListenerContainer(endpoint, factory);
146                        this.listenerContainers.put(id, container);
147                        if (startImmediately) {
148                                startIfNecessary(container);
149                        }
150                }
151        }
152
153        /**
154         * Create a message listener container for the given {@link JmsListenerEndpoint}.
155         * <p>This create the necessary infrastructure to honor that endpoint
156         * with regards to its configuration.
157         * @param endpoint the endpoint to add
158         * @param factory the listener factory to use
159         * @see #registerListenerContainer(JmsListenerEndpoint, JmsListenerContainerFactory, boolean)
160         */
161        public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
162                registerListenerContainer(endpoint, factory, false);
163        }
164
165        /**
166         * Create and start a new container using the specified factory.
167         */
168        protected MessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint,
169                        JmsListenerContainerFactory<?> factory) {
170
171                MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
172
173                if (listenerContainer instanceof InitializingBean) {
174                        try {
175                                ((InitializingBean) listenerContainer).afterPropertiesSet();
176                        }
177                        catch (Exception ex) {
178                                throw new BeanInitializationException("Failed to initialize message listener container", ex);
179                        }
180                }
181
182                int containerPhase = listenerContainer.getPhase();
183                if (containerPhase < Integer.MAX_VALUE) {  // a custom phase value
184                        if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
185                                throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
186                                                this.phase + " vs " + containerPhase);
187                        }
188                        this.phase = listenerContainer.getPhase();
189                }
190
191                return listenerContainer;
192        }
193
194
195        // Delegating implementation of SmartLifecycle
196
197        @Override
198        public int getPhase() {
199                return this.phase;
200        }
201
202        @Override
203        public void start() {
204                for (MessageListenerContainer listenerContainer : getListenerContainers()) {
205                        startIfNecessary(listenerContainer);
206                }
207        }
208
209        @Override
210        public void stop() {
211                for (MessageListenerContainer listenerContainer : getListenerContainers()) {
212                        listenerContainer.stop();
213                }
214        }
215
216        @Override
217        public void stop(Runnable callback) {
218                Collection<MessageListenerContainer> listenerContainers = getListenerContainers();
219                AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainers.size(), callback);
220                for (MessageListenerContainer listenerContainer : listenerContainers) {
221                        listenerContainer.stop(aggregatingCallback);
222                }
223        }
224
225        @Override
226        public boolean isRunning() {
227                for (MessageListenerContainer listenerContainer : getListenerContainers()) {
228                        if (listenerContainer.isRunning()) {
229                                return true;
230                        }
231                }
232                return false;
233        }
234
235        /**
236         * Start the specified {@link MessageListenerContainer} if it should be started
237         * on startup or when start is called explicitly after startup.
238         * @see MessageListenerContainer#isAutoStartup()
239         */
240        private void startIfNecessary(MessageListenerContainer listenerContainer) {
241                if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
242                        listenerContainer.start();
243                }
244        }
245
246        @Override
247        public void destroy() {
248                for (MessageListenerContainer listenerContainer : getListenerContainers()) {
249                        if (listenerContainer instanceof DisposableBean) {
250                                try {
251                                        ((DisposableBean) listenerContainer).destroy();
252                                }
253                                catch (Throwable ex) {
254                                        logger.warn("Failed to destroy message listener container", ex);
255                                }
256                        }
257                }
258        }
259
260
261        private static class AggregatingCallback implements Runnable {
262
263                private final AtomicInteger count;
264
265                private final Runnable finishCallback;
266
267                public AggregatingCallback(int count, Runnable finishCallback) {
268                        this.count = new AtomicInteger(count);
269                        this.finishCallback = finishCallback;
270                }
271
272                @Override
273                public void run() {
274                        if (this.count.decrementAndGet() == 0) {
275                                this.finishCallback.run();
276                        }
277                }
278        }
279
280}