001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.config; 018 019import java.util.Collection; 020import java.util.Collections; 021import java.util.Map; 022import java.util.Set; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028 029import org.springframework.beans.factory.BeanInitializationException; 030import org.springframework.beans.factory.DisposableBean; 031import org.springframework.beans.factory.InitializingBean; 032import org.springframework.context.ApplicationContext; 033import org.springframework.context.ApplicationContextAware; 034import org.springframework.context.ApplicationListener; 035import org.springframework.context.SmartLifecycle; 036import org.springframework.context.event.ContextRefreshedEvent; 037import org.springframework.jms.listener.MessageListenerContainer; 038import org.springframework.lang.Nullable; 039import org.springframework.util.Assert; 040 041/** 042 * Creates the necessary {@link MessageListenerContainer} instances for the 043 * registered {@linkplain JmsListenerEndpoint endpoints}. Also manages the 044 * lifecycle of the listener containers, in particular within the lifecycle 045 * of the application context. 046 * 047 * <p>Contrary to {@link MessageListenerContainer MessageListenerContainers} 048 * created manually, listener containers managed by registry are not beans 049 * in the application context and are not candidates for autowiring. 050 * Use {@link #getListenerContainers()} if you need to access this registry's 051 * listener containers for management purposes. If you need to access to a 052 * specific message listener container, use {@link #getListenerContainer(String)} 053 * with the id of the endpoint. 054 * 055 * @author Stephane Nicoll 056 * @author Juergen Hoeller 057 * @since 4.1 058 * @see JmsListenerEndpoint 059 * @see MessageListenerContainer 060 * @see JmsListenerContainerFactory 061 */ 062public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecycle, 063 ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> { 064 065 protected final Log logger = LogFactory.getLog(getClass()); 066 067 private final Map<String, MessageListenerContainer> listenerContainers = 068 new ConcurrentHashMap<>(); 069 070 private int phase = DEFAULT_PHASE; 071 072 @Nullable 073 private ApplicationContext applicationContext; 074 075 private boolean contextRefreshed; 076 077 078 @Override 079 public void setApplicationContext(ApplicationContext applicationContext) { 080 this.applicationContext = applicationContext; 081 } 082 083 @Override 084 public void onApplicationEvent(ContextRefreshedEvent event) { 085 if (event.getApplicationContext() == this.applicationContext) { 086 this.contextRefreshed = true; 087 } 088 } 089 090 091 /** 092 * Return the {@link MessageListenerContainer} with the specified id or 093 * {@code null} if no such container exists. 094 * @param id the id of the container 095 * @return the container or {@code null} if no container with that id exists 096 * @see JmsListenerEndpoint#getId() 097 * @see #getListenerContainerIds() 098 */ 099 @Nullable 100 public MessageListenerContainer getListenerContainer(String id) { 101 Assert.notNull(id, "Container identifier must not be null"); 102 return this.listenerContainers.get(id); 103 } 104 105 /** 106 * Return the ids of the managed {@link MessageListenerContainer} instance(s). 107 * @since 4.2.3 108 * @see #getListenerContainer(String) 109 */ 110 public Set<String> getListenerContainerIds() { 111 return Collections.unmodifiableSet(this.listenerContainers.keySet()); 112 } 113 114 /** 115 * Return the managed {@link MessageListenerContainer} instance(s). 116 */ 117 public Collection<MessageListenerContainer> getListenerContainers() { 118 return Collections.unmodifiableCollection(this.listenerContainers.values()); 119 } 120 121 /** 122 * Create a message listener container for the given {@link JmsListenerEndpoint}. 123 * <p>This create the necessary infrastructure to honor that endpoint 124 * with regards to its configuration. 125 * <p>The {@code startImmediately} flag determines if the container should be 126 * started immediately. 127 * @param endpoint the endpoint to add 128 * @param factory the listener factory to use 129 * @param startImmediately start the container immediately if necessary 130 * @see #getListenerContainers() 131 * @see #getListenerContainer(String) 132 */ 133 public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory, 134 boolean startImmediately) { 135 136 Assert.notNull(endpoint, "Endpoint must not be null"); 137 Assert.notNull(factory, "Factory must not be null"); 138 String id = endpoint.getId(); 139 Assert.hasText(id, "Endpoint id must be set"); 140 141 synchronized (this.listenerContainers) { 142 if (this.listenerContainers.containsKey(id)) { 143 throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'"); 144 } 145 MessageListenerContainer container = createListenerContainer(endpoint, factory); 146 this.listenerContainers.put(id, container); 147 if (startImmediately) { 148 startIfNecessary(container); 149 } 150 } 151 } 152 153 /** 154 * Create a message listener container for the given {@link JmsListenerEndpoint}. 155 * <p>This create the necessary infrastructure to honor that endpoint 156 * with regards to its configuration. 157 * @param endpoint the endpoint to add 158 * @param factory the listener factory to use 159 * @see #registerListenerContainer(JmsListenerEndpoint, JmsListenerContainerFactory, boolean) 160 */ 161 public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) { 162 registerListenerContainer(endpoint, factory, false); 163 } 164 165 /** 166 * Create and start a new container using the specified factory. 167 */ 168 protected MessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint, 169 JmsListenerContainerFactory<?> factory) { 170 171 MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint); 172 173 if (listenerContainer instanceof InitializingBean) { 174 try { 175 ((InitializingBean) listenerContainer).afterPropertiesSet(); 176 } 177 catch (Exception ex) { 178 throw new BeanInitializationException("Failed to initialize message listener container", ex); 179 } 180 } 181 182 int containerPhase = listenerContainer.getPhase(); 183 if (containerPhase < Integer.MAX_VALUE) { // a custom phase value 184 if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) { 185 throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " + 186 this.phase + " vs " + containerPhase); 187 } 188 this.phase = listenerContainer.getPhase(); 189 } 190 191 return listenerContainer; 192 } 193 194 195 // Delegating implementation of SmartLifecycle 196 197 @Override 198 public int getPhase() { 199 return this.phase; 200 } 201 202 @Override 203 public void start() { 204 for (MessageListenerContainer listenerContainer : getListenerContainers()) { 205 startIfNecessary(listenerContainer); 206 } 207 } 208 209 @Override 210 public void stop() { 211 for (MessageListenerContainer listenerContainer : getListenerContainers()) { 212 listenerContainer.stop(); 213 } 214 } 215 216 @Override 217 public void stop(Runnable callback) { 218 Collection<MessageListenerContainer> listenerContainers = getListenerContainers(); 219 AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainers.size(), callback); 220 for (MessageListenerContainer listenerContainer : listenerContainers) { 221 listenerContainer.stop(aggregatingCallback); 222 } 223 } 224 225 @Override 226 public boolean isRunning() { 227 for (MessageListenerContainer listenerContainer : getListenerContainers()) { 228 if (listenerContainer.isRunning()) { 229 return true; 230 } 231 } 232 return false; 233 } 234 235 /** 236 * Start the specified {@link MessageListenerContainer} if it should be started 237 * on startup or when start is called explicitly after startup. 238 * @see MessageListenerContainer#isAutoStartup() 239 */ 240 private void startIfNecessary(MessageListenerContainer listenerContainer) { 241 if (this.contextRefreshed || listenerContainer.isAutoStartup()) { 242 listenerContainer.start(); 243 } 244 } 245 246 @Override 247 public void destroy() { 248 for (MessageListenerContainer listenerContainer : getListenerContainers()) { 249 if (listenerContainer instanceof DisposableBean) { 250 try { 251 ((DisposableBean) listenerContainer).destroy(); 252 } 253 catch (Throwable ex) { 254 logger.warn("Failed to destroy message listener container", ex); 255 } 256 } 257 } 258 } 259 260 261 private static class AggregatingCallback implements Runnable { 262 263 private final AtomicInteger count; 264 265 private final Runnable finishCallback; 266 267 public AggregatingCallback(int count, Runnable finishCallback) { 268 this.count = new AtomicInteger(count); 269 this.finishCallback = finishCallback; 270 } 271 272 @Override 273 public void run() { 274 if (this.count.decrementAndGet() == 0) { 275 this.finishCallback.run(); 276 } 277 } 278 } 279 280}