001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.listener.adapter; 018 019import javax.jms.BytesMessage; 020import javax.jms.Destination; 021import javax.jms.InvalidDestinationException; 022import javax.jms.JMSException; 023import javax.jms.Message; 024import javax.jms.MessageListener; 025import javax.jms.MessageProducer; 026import javax.jms.Session; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030 031import org.springframework.jms.listener.SessionAwareMessageListener; 032import org.springframework.jms.support.JmsHeaderMapper; 033import org.springframework.jms.support.JmsUtils; 034import org.springframework.jms.support.SimpleJmsHeaderMapper; 035import org.springframework.jms.support.converter.MessageConversionException; 036import org.springframework.jms.support.converter.MessageConverter; 037import org.springframework.jms.support.converter.MessagingMessageConverter; 038import org.springframework.jms.support.converter.SimpleMessageConverter; 039import org.springframework.jms.support.converter.SmartMessageConverter; 040import org.springframework.jms.support.destination.DestinationResolver; 041import org.springframework.jms.support.destination.DynamicDestinationResolver; 042import org.springframework.messaging.MessageHeaders; 043import org.springframework.util.Assert; 044 045/** 046 * An abstract JMS {@link MessageListener} adapter providing the necessary 047 * infrastructure to extract the payload of a JMS {@link Message}. 048 * 049 * @author Juergen Hoeller 050 * @author Stephane Nicoll 051 * @since 4.1 052 * @see MessageListener 053 * @see SessionAwareMessageListener 054 */ 055public abstract class AbstractAdaptableMessageListener 056 implements MessageListener, SessionAwareMessageListener<Message> { 057 058 /** Logger available to subclasses */ 059 protected final Log logger = LogFactory.getLog(getClass()); 060 061 private Object defaultResponseDestination; 062 063 private DestinationResolver destinationResolver = new DynamicDestinationResolver(); 064 065 private MessageConverter messageConverter = new SimpleMessageConverter(); 066 067 private final MessagingMessageConverterAdapter messagingMessageConverter = new MessagingMessageConverterAdapter(); 068 069 070 /** 071 * Set the default destination to send response messages to. This will be applied 072 * in case of a request message that does not carry a "JMSReplyTo" field. 073 * <p>Response destinations are only relevant for listener methods that return 074 * result objects, which will be wrapped in a response message and sent to a 075 * response destination. 076 * <p>Alternatively, specify a "defaultResponseQueueName" or "defaultResponseTopicName", 077 * to be dynamically resolved via the DestinationResolver. 078 * @see #setDefaultResponseQueueName(String) 079 * @see #setDefaultResponseTopicName(String) 080 * @see #getResponseDestination 081 */ 082 public void setDefaultResponseDestination(Destination destination) { 083 this.defaultResponseDestination = destination; 084 } 085 086 /** 087 * Set the name of the default response queue to send response messages to. 088 * This will be applied in case of a request message that does not carry a 089 * "JMSReplyTo" field. 090 * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination". 091 * @see #setDestinationResolver 092 * @see #setDefaultResponseDestination(javax.jms.Destination) 093 */ 094 public void setDefaultResponseQueueName(String destinationName) { 095 this.defaultResponseDestination = new DestinationNameHolder(destinationName, false); 096 } 097 098 /** 099 * Set the name of the default response topic to send response messages to. 100 * This will be applied in case of a request message that does not carry a 101 * "JMSReplyTo" field. 102 * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination". 103 * @see #setDestinationResolver 104 * @see #setDefaultResponseDestination(javax.jms.Destination) 105 */ 106 public void setDefaultResponseTopicName(String destinationName) { 107 this.defaultResponseDestination = new DestinationNameHolder(destinationName, true); 108 } 109 110 /** 111 * Set the DestinationResolver that should be used to resolve response 112 * destination names for this adapter. 113 * <p>The default resolver is a DynamicDestinationResolver. Specify a 114 * JndiDestinationResolver for resolving destination names as JNDI locations. 115 * @see org.springframework.jms.support.destination.DynamicDestinationResolver 116 * @see org.springframework.jms.support.destination.JndiDestinationResolver 117 */ 118 public void setDestinationResolver(DestinationResolver destinationResolver) { 119 Assert.notNull(destinationResolver, "DestinationResolver must not be null"); 120 this.destinationResolver = destinationResolver; 121 } 122 123 /** 124 * Return the DestinationResolver for this adapter. 125 */ 126 protected DestinationResolver getDestinationResolver() { 127 return this.destinationResolver; 128 } 129 130 /** 131 * Set the converter that will convert incoming JMS messages to 132 * listener method arguments, and objects returned from listener 133 * methods back to JMS messages. 134 * <p>The default converter is a {@link SimpleMessageConverter}, which is able 135 * to handle {@link javax.jms.BytesMessage BytesMessages}, 136 * {@link javax.jms.TextMessage TextMessages} and 137 * {@link javax.jms.ObjectMessage ObjectMessages}. 138 */ 139 public void setMessageConverter(MessageConverter messageConverter) { 140 this.messageConverter = messageConverter; 141 } 142 143 /** 144 * Return the converter that will convert incoming JMS messages to 145 * listener method arguments, and objects returned from listener 146 * methods back to JMS messages. 147 */ 148 protected MessageConverter getMessageConverter() { 149 return this.messageConverter; 150 } 151 152 /** 153 * Set the {@link JmsHeaderMapper} implementation to use to map the standard 154 * JMS headers. By default, a {@link SimpleJmsHeaderMapper} is used. 155 * @see SimpleJmsHeaderMapper 156 */ 157 public void setHeaderMapper(JmsHeaderMapper headerMapper) { 158 Assert.notNull(headerMapper, "HeaderMapper must not be null"); 159 this.messagingMessageConverter.setHeaderMapper(headerMapper); 160 } 161 162 /** 163 * Return the {@link MessagingMessageConverter} for this listener, 164 * being able to convert {@link org.springframework.messaging.Message}. 165 */ 166 protected final MessagingMessageConverter getMessagingMessageConverter() { 167 return this.messagingMessageConverter; 168 } 169 170 171 /** 172 * Standard JMS {@link MessageListener} entry point. 173 * <p>Delegates the message to the target listener method, with appropriate 174 * conversion of the message argument. In case of an exception, the 175 * {@link #handleListenerException(Throwable)} method will be invoked. 176 * <p><b>Note:</b> Does not support sending response messages based on 177 * result objects returned from listener methods. Use the 178 * {@link SessionAwareMessageListener} entry point (typically through a Spring 179 * message listener container) for handling result objects as well. 180 * @param message the incoming JMS message 181 * @see #handleListenerException 182 * @see #onMessage(javax.jms.Message, javax.jms.Session) 183 */ 184 @Override 185 public void onMessage(Message message) { 186 try { 187 onMessage(message, null); 188 } 189 catch (Throwable ex) { 190 handleListenerException(ex); 191 } 192 } 193 194 @Override 195 public abstract void onMessage(Message message, Session session) throws JMSException; 196 197 /** 198 * Handle the given exception that arose during listener execution. 199 * The default implementation logs the exception at error level. 200 * <p>This method only applies when used as standard JMS {@link MessageListener}. 201 * In case of the Spring {@link SessionAwareMessageListener} mechanism, 202 * exceptions get handled by the caller instead. 203 * @param ex the exception to handle 204 * @see #onMessage(javax.jms.Message) 205 */ 206 protected void handleListenerException(Throwable ex) { 207 logger.error("Listener execution failed", ex); 208 } 209 210 211 /** 212 * Extract the message body from the given JMS message. 213 * @param message the JMS {@code Message} 214 * @return the content of the message, to be passed into the listener method 215 * as an argument 216 * @throws MessageConversionException if the message could not be extracted 217 */ 218 protected Object extractMessage(Message message) { 219 try { 220 MessageConverter converter = getMessageConverter(); 221 if (converter != null) { 222 return converter.fromMessage(message); 223 } 224 return message; 225 } 226 catch (JMSException ex) { 227 throw new MessageConversionException("Could not convert JMS message", ex); 228 } 229 } 230 231 /** 232 * Handle the given result object returned from the listener method, 233 * sending a response message back. 234 * @param result the result object to handle (never {@code null}) 235 * @param request the original request message 236 * @param session the JMS Session to operate on (may be {@code null}) 237 * @throws ReplyFailureException if the response message could not be sent 238 * @see #buildMessage 239 * @see #postProcessResponse 240 * @see #getResponseDestination 241 * @see #sendResponse 242 */ 243 protected void handleResult(Object result, Message request, Session session) { 244 if (session != null) { 245 if (logger.isDebugEnabled()) { 246 logger.debug("Listener method returned result [" + result + 247 "] - generating response message for it"); 248 } 249 try { 250 Message response = buildMessage(session, result); 251 postProcessResponse(request, response); 252 Destination destination = getResponseDestination(request, response, session, result); 253 sendResponse(session, destination, response); 254 } 255 catch (Exception ex) { 256 throw new ReplyFailureException("Failed to send reply with payload [" + result + "]", ex); 257 } 258 } 259 260 else { 261 // No JMS Session available 262 if (logger.isWarnEnabled()) { 263 logger.warn("Listener method returned result [" + result + 264 "]: not generating response message for it because of no JMS Session given"); 265 } 266 } 267 } 268 269 /** 270 * Build a JMS message to be sent as response based on the given result object. 271 * @param session the JMS Session to operate on 272 * @param result the content of the message, as returned from the listener method 273 * @return the JMS {@code Message} (never {@code null}) 274 * @throws JMSException if thrown by JMS API methods 275 * @see #setMessageConverter 276 */ 277 protected Message buildMessage(Session session, Object result) throws JMSException { 278 Object content = preProcessResponse(result instanceof JmsResponse 279 ? ((JmsResponse<?>) result).getResponse() : result); 280 281 MessageConverter converter = getMessageConverter(); 282 if (converter != null) { 283 if (content instanceof org.springframework.messaging.Message) { 284 return this.messagingMessageConverter.toMessage(content, session); 285 } 286 else { 287 return converter.toMessage(content, session); 288 } 289 } 290 291 if (!(content instanceof Message)) { 292 throw new MessageConversionException( 293 "No MessageConverter specified - cannot handle message [" + content + "]"); 294 } 295 return (Message) content; 296 } 297 298 /** 299 * Pre-process the given result before it is converted to a {@link Message}. 300 * @param result the result of the invocation 301 * @return the payload response to handle, either the {@code result} argument 302 * or any other object (for instance wrapping the result). 303 * @since 4.3 304 */ 305 protected Object preProcessResponse(Object result) { 306 return result; 307 } 308 309 /** 310 * Post-process the given response message before it will be sent. 311 * <p>The default implementation sets the response's correlation id 312 * to the request message's correlation id, if any; otherwise to the 313 * request message id. 314 * @param request the original incoming JMS message 315 * @param response the outgoing JMS message about to be sent 316 * @throws JMSException if thrown by JMS API methods 317 * @see javax.jms.Message#setJMSCorrelationID 318 */ 319 protected void postProcessResponse(Message request, Message response) throws JMSException { 320 String correlation = request.getJMSCorrelationID(); 321 if (correlation == null) { 322 correlation = request.getJMSMessageID(); 323 } 324 response.setJMSCorrelationID(correlation); 325 } 326 327 private Destination getResponseDestination(Message request, Message response, Session session, Object result) 328 throws JMSException { 329 330 if (result instanceof JmsResponse) { 331 JmsResponse<?> jmsResponse = (JmsResponse) result; 332 Destination destination = jmsResponse.resolveDestination(getDestinationResolver(), session); 333 if (destination != null) { 334 return destination; 335 } 336 } 337 return getResponseDestination(request, response, session); 338 } 339 340 /** 341 * Determine a response destination for the given message. 342 * <p>The default implementation first checks the JMS Reply-To 343 * {@link Destination} of the supplied request; if that is not {@code null} 344 * it is returned; if it is {@code null}, then the configured 345 * {@link #resolveDefaultResponseDestination default response destination} 346 * is returned; if this too is {@code null}, then an 347 * {@link javax.jms.InvalidDestinationException} is thrown. 348 * @param request the original incoming JMS message 349 * @param response the outgoing JMS message about to be sent 350 * @param session the JMS Session to operate on 351 * @return the response destination (never {@code null}) 352 * @throws JMSException if thrown by JMS API methods 353 * @throws javax.jms.InvalidDestinationException if no {@link Destination} can be determined 354 * @see #setDefaultResponseDestination 355 * @see javax.jms.Message#getJMSReplyTo() 356 */ 357 protected Destination getResponseDestination(Message request, Message response, Session session) 358 throws JMSException { 359 360 Destination replyTo = request.getJMSReplyTo(); 361 if (replyTo == null) { 362 replyTo = resolveDefaultResponseDestination(session); 363 if (replyTo == null) { 364 throw new InvalidDestinationException("Cannot determine response destination: " + 365 "Request message does not contain reply-to destination, and no default response destination set."); 366 } 367 } 368 return replyTo; 369 } 370 371 /** 372 * Resolve the default response destination into a JMS {@link Destination}, using this 373 * accessor's {@link DestinationResolver} in case of a destination name. 374 * @return the located {@link Destination} 375 * @throws javax.jms.JMSException if resolution failed 376 * @see #setDefaultResponseDestination 377 * @see #setDefaultResponseQueueName 378 * @see #setDefaultResponseTopicName 379 * @see #setDestinationResolver 380 */ 381 protected Destination resolveDefaultResponseDestination(Session session) throws JMSException { 382 if (this.defaultResponseDestination instanceof Destination) { 383 return (Destination) this.defaultResponseDestination; 384 } 385 if (this.defaultResponseDestination instanceof DestinationNameHolder) { 386 DestinationNameHolder nameHolder = (DestinationNameHolder) this.defaultResponseDestination; 387 return getDestinationResolver().resolveDestinationName(session, nameHolder.name, nameHolder.isTopic); 388 } 389 return null; 390 } 391 392 /** 393 * Send the given response message to the given destination. 394 * @param response the JMS message to send 395 * @param destination the JMS destination to send to 396 * @param session the JMS session to operate on 397 * @throws JMSException if thrown by JMS API methods 398 * @see #postProcessProducer 399 * @see javax.jms.Session#createProducer 400 * @see javax.jms.MessageProducer#send 401 */ 402 protected void sendResponse(Session session, Destination destination, Message response) throws JMSException { 403 MessageProducer producer = session.createProducer(destination); 404 try { 405 postProcessProducer(producer, response); 406 producer.send(response); 407 } 408 finally { 409 JmsUtils.closeMessageProducer(producer); 410 } 411 } 412 413 /** 414 * Post-process the given message producer before using it to send the response. 415 * <p>The default implementation is empty. 416 * @param producer the JMS message producer that will be used to send the message 417 * @param response the outgoing JMS message about to be sent 418 * @throws JMSException if thrown by JMS API methods 419 */ 420 protected void postProcessProducer(MessageProducer producer, Message response) throws JMSException { 421 } 422 423 424 /** 425 * A {@link MessagingMessageConverter} that lazily invoke payload extraction and 426 * delegate it to {@link #extractMessage(javax.jms.Message)} in order to enforce 427 * backward compatibility. 428 */ 429 private class MessagingMessageConverterAdapter extends MessagingMessageConverter { 430 431 @SuppressWarnings("unchecked") 432 @Override 433 public Object fromMessage(javax.jms.Message message) throws JMSException, MessageConversionException { 434 if (message == null) { 435 return null; 436 } 437 return new LazyResolutionMessage(message); 438 } 439 440 @Override 441 protected Object extractPayload(Message message) throws JMSException { 442 Object payload = extractMessage(message); 443 if (message instanceof BytesMessage) { 444 try { 445 // In case the BytesMessage is going to be received as a user argument: 446 // reset it, otherwise it would appear empty to such processing code... 447 ((BytesMessage) message).reset(); 448 } 449 catch (JMSException ex) { 450 // Continue since the BytesMessage typically won't be used any further. 451 logger.debug("Failed to reset BytesMessage after payload extraction", ex); 452 } 453 } 454 return payload; 455 } 456 457 @Override 458 protected Message createMessageForPayload(Object payload, Session session, Object conversionHint) 459 throws JMSException { 460 461 MessageConverter converter = getMessageConverter(); 462 if (converter == null) { 463 throw new IllegalStateException("No message converter, cannot handle '" + payload + "'"); 464 } 465 if (converter instanceof SmartMessageConverter) { 466 return ((SmartMessageConverter) converter).toMessage(payload, session, conversionHint); 467 468 } 469 return converter.toMessage(payload, session); 470 } 471 472 473 protected class LazyResolutionMessage implements org.springframework.messaging.Message<Object> { 474 475 private final javax.jms.Message message; 476 477 private Object payload; 478 479 private MessageHeaders headers; 480 481 public LazyResolutionMessage(javax.jms.Message message) { 482 this.message = message; 483 } 484 485 @Override 486 public Object getPayload() { 487 if (this.payload == null) { 488 try { 489 this.payload = unwrapPayload(); 490 } 491 catch (JMSException ex) { 492 throw new MessageConversionException( 493 "Failed to extract payload from [" + this.message + "]", ex); 494 } 495 } 496 return this.payload; 497 } 498 499 /** 500 * Extract the payload of the current message. Since we deferred the resolution 501 * of the payload, a custom converter may still return a full message for it. In 502 * this case, its payload is returned. 503 * @return the payload of the message 504 */ 505 private Object unwrapPayload() throws JMSException { 506 Object payload = extractPayload(this.message); 507 if (payload instanceof org.springframework.messaging.Message) { 508 return ((org.springframework.messaging.Message) payload).getPayload(); 509 } 510 return payload; 511 } 512 513 @Override 514 public MessageHeaders getHeaders() { 515 if (this.headers == null) { 516 this.headers = extractHeaders(this.message); 517 } 518 return this.headers; 519 } 520 } 521 } 522 523 524 /** 525 * Internal class combining a destination name 526 * and its target destination type (queue or topic). 527 */ 528 private static class DestinationNameHolder { 529 530 public final String name; 531 532 public final boolean isTopic; 533 534 public DestinationNameHolder(String name, boolean isTopic) { 535 this.name = name; 536 this.isTopic = isTopic; 537 } 538 } 539 540}