001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.config; 018 019import java.util.Collection; 020import java.util.Collections; 021import java.util.Map; 022import java.util.Set; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028 029import org.springframework.beans.factory.BeanInitializationException; 030import org.springframework.beans.factory.DisposableBean; 031import org.springframework.beans.factory.InitializingBean; 032import org.springframework.context.ApplicationContext; 033import org.springframework.context.ApplicationContextAware; 034import org.springframework.context.ApplicationListener; 035import org.springframework.context.SmartLifecycle; 036import org.springframework.context.event.ContextRefreshedEvent; 037import org.springframework.jms.listener.MessageListenerContainer; 038import org.springframework.util.Assert; 039 040/** 041 * Creates the necessary {@link MessageListenerContainer} instances for the 042 * registered {@linkplain JmsListenerEndpoint endpoints}. Also manages the 043 * lifecycle of the listener containers, in particular within the lifecycle 044 * of the application context. 045 * 046 * <p>Contrary to {@link MessageListenerContainer}s created manually, listener 047 * containers managed by registry are not beans in the application context and 048 * are not candidates for autowiring. Use {@link #getListenerContainers()} if 049 * you need to access this registry's listener containers for management purposes. 050 * If you need to access to a specific message listener container, use 051 * {@link #getListenerContainer(String)} with the id of the endpoint. 052 * 053 * @author Stephane Nicoll 054 * @author Juergen Hoeller 055 * @since 4.1 056 * @see JmsListenerEndpoint 057 * @see MessageListenerContainer 058 * @see JmsListenerContainerFactory 059 */ 060public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecycle, 061 ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> { 062 063 protected final Log logger = LogFactory.getLog(getClass()); 064 065 private final Map<String, MessageListenerContainer> listenerContainers = 066 new ConcurrentHashMap<String, MessageListenerContainer>(); 067 068 private int phase = Integer.MAX_VALUE; 069 070 private ApplicationContext applicationContext; 071 072 private boolean contextRefreshed; 073 074 075 @Override 076 public void setApplicationContext(ApplicationContext applicationContext) { 077 this.applicationContext = applicationContext; 078 } 079 080 @Override 081 public void onApplicationEvent(ContextRefreshedEvent event) { 082 if (event.getApplicationContext() == this.applicationContext) { 083 this.contextRefreshed = true; 084 } 085 } 086 087 088 /** 089 * Return the {@link MessageListenerContainer} with the specified id or 090 * {@code null} if no such container exists. 091 * @param id the id of the container 092 * @return the container or {@code null} if no container with that id exists 093 * @see JmsListenerEndpoint#getId() 094 * @see #getListenerContainerIds() 095 */ 096 public MessageListenerContainer getListenerContainer(String id) { 097 Assert.notNull(id, "Container identifier must not be null"); 098 return this.listenerContainers.get(id); 099 } 100 101 /** 102 * Return the ids of the managed {@link MessageListenerContainer} instance(s). 103 * @since 4.2.3 104 * @see #getListenerContainer(String) 105 */ 106 public Set<String> getListenerContainerIds() { 107 return Collections.unmodifiableSet(this.listenerContainers.keySet()); 108 } 109 110 /** 111 * Return the managed {@link MessageListenerContainer} instance(s). 112 */ 113 public Collection<MessageListenerContainer> getListenerContainers() { 114 return Collections.unmodifiableCollection(this.listenerContainers.values()); 115 } 116 117 /** 118 * Create a message listener container for the given {@link JmsListenerEndpoint}. 119 * <p>This create the necessary infrastructure to honor that endpoint 120 * with regards to its configuration. 121 * <p>The {@code startImmediately} flag determines if the container should be 122 * started immediately. 123 * @param endpoint the endpoint to add 124 * @param factory the listener factory to use 125 * @param startImmediately start the container immediately if necessary 126 * @see #getListenerContainers() 127 * @see #getListenerContainer(String) 128 */ 129 public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory, 130 boolean startImmediately) { 131 132 Assert.notNull(endpoint, "Endpoint must not be null"); 133 Assert.notNull(factory, "Factory must not be null"); 134 String id = endpoint.getId(); 135 Assert.notNull(id, "Endpoint id must not be null"); 136 137 synchronized (this.listenerContainers) { 138 if (this.listenerContainers.containsKey(id)) { 139 throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'"); 140 } 141 MessageListenerContainer container = createListenerContainer(endpoint, factory); 142 this.listenerContainers.put(id, container); 143 if (startImmediately) { 144 startIfNecessary(container); 145 } 146 } 147 } 148 149 /** 150 * Create a message listener container for the given {@link JmsListenerEndpoint}. 151 * <p>This create the necessary infrastructure to honor that endpoint 152 * with regards to its configuration. 153 * @param endpoint the endpoint to add 154 * @param factory the listener factory to use 155 * @see #registerListenerContainer(JmsListenerEndpoint, JmsListenerContainerFactory, boolean) 156 */ 157 public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) { 158 registerListenerContainer(endpoint, factory, false); 159 } 160 161 /** 162 * Create and start a new container using the specified factory. 163 */ 164 protected MessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint, 165 JmsListenerContainerFactory<?> factory) { 166 167 MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint); 168 169 if (listenerContainer instanceof InitializingBean) { 170 try { 171 ((InitializingBean) listenerContainer).afterPropertiesSet(); 172 } 173 catch (Exception ex) { 174 throw new BeanInitializationException("Failed to initialize message listener container", ex); 175 } 176 } 177 178 int containerPhase = listenerContainer.getPhase(); 179 if (containerPhase < Integer.MAX_VALUE) { // a custom phase value 180 if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) { 181 throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " + 182 this.phase + " vs " + containerPhase); 183 } 184 this.phase = listenerContainer.getPhase(); 185 } 186 187 return listenerContainer; 188 } 189 190 191 // Delegating implementation of SmartLifecycle 192 193 @Override 194 public boolean isAutoStartup() { 195 return true; 196 } 197 198 @Override 199 public int getPhase() { 200 return this.phase; 201 } 202 203 @Override 204 public void start() { 205 for (MessageListenerContainer listenerContainer : getListenerContainers()) { 206 startIfNecessary(listenerContainer); 207 } 208 } 209 210 @Override 211 public void stop() { 212 for (MessageListenerContainer listenerContainer : getListenerContainers()) { 213 listenerContainer.stop(); 214 } 215 } 216 217 @Override 218 public void stop(Runnable callback) { 219 Collection<MessageListenerContainer> listenerContainers = getListenerContainers(); 220 AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainers.size(), callback); 221 for (MessageListenerContainer listenerContainer : listenerContainers) { 222 listenerContainer.stop(aggregatingCallback); 223 } 224 } 225 226 @Override 227 public boolean isRunning() { 228 for (MessageListenerContainer listenerContainer : getListenerContainers()) { 229 if (listenerContainer.isRunning()) { 230 return true; 231 } 232 } 233 return false; 234 } 235 236 /** 237 * Start the specified {@link MessageListenerContainer} if it should be started 238 * on startup or when start is called explicitly after startup. 239 * @see MessageListenerContainer#isAutoStartup() 240 */ 241 private void startIfNecessary(MessageListenerContainer listenerContainer) { 242 if (this.contextRefreshed || listenerContainer.isAutoStartup()) { 243 listenerContainer.start(); 244 } 245 } 246 247 @Override 248 public void destroy() { 249 for (MessageListenerContainer listenerContainer : getListenerContainers()) { 250 if (listenerContainer instanceof DisposableBean) { 251 try { 252 ((DisposableBean) listenerContainer).destroy(); 253 } 254 catch (Throwable ex) { 255 logger.warn("Failed to destroy message listener container", ex); 256 } 257 } 258 } 259 } 260 261 262 private static class AggregatingCallback implements Runnable { 263 264 private final AtomicInteger count; 265 266 private final Runnable finishCallback; 267 268 public AggregatingCallback(int count, Runnable finishCallback) { 269 this.count = new AtomicInteger(count); 270 this.finishCallback = finishCallback; 271 } 272 273 @Override 274 public void run() { 275 if (this.count.decrementAndGet() == 0) { 276 this.finishCallback.run(); 277 } 278 } 279 } 280 281}