001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.listener.adapter; 018 019import javax.jms.BytesMessage; 020import javax.jms.Destination; 021import javax.jms.InvalidDestinationException; 022import javax.jms.JMSException; 023import javax.jms.Message; 024import javax.jms.MessageListener; 025import javax.jms.MessageProducer; 026import javax.jms.Session; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030 031import org.springframework.jms.listener.SessionAwareMessageListener; 032import org.springframework.jms.support.JmsHeaderMapper; 033import org.springframework.jms.support.JmsUtils; 034import org.springframework.jms.support.QosSettings; 035import org.springframework.jms.support.SimpleJmsHeaderMapper; 036import org.springframework.jms.support.converter.MessageConversionException; 037import org.springframework.jms.support.converter.MessageConverter; 038import org.springframework.jms.support.converter.MessagingMessageConverter; 039import org.springframework.jms.support.converter.SimpleMessageConverter; 040import org.springframework.jms.support.converter.SmartMessageConverter; 041import org.springframework.jms.support.destination.DestinationResolver; 042import org.springframework.jms.support.destination.DynamicDestinationResolver; 043import org.springframework.lang.Nullable; 044import org.springframework.messaging.MessageHeaders; 045import org.springframework.util.Assert; 046 047/** 048 * An abstract JMS {@link MessageListener} adapter providing the necessary 049 * infrastructure to extract the payload of a JMS {@link Message}. 050 * 051 * @author Juergen Hoeller 052 * @author Stephane Nicoll 053 * @since 4.1 054 * @see MessageListener 055 * @see SessionAwareMessageListener 056 */ 057public abstract class AbstractAdaptableMessageListener 058 implements MessageListener, SessionAwareMessageListener<Message> { 059 060 /** Logger available to subclasses. */ 061 protected final Log logger = LogFactory.getLog(getClass()); 062 063 @Nullable 064 private Object defaultResponseDestination; 065 066 private DestinationResolver destinationResolver = new DynamicDestinationResolver(); 067 068 @Nullable 069 private MessageConverter messageConverter = new SimpleMessageConverter(); 070 071 private final MessagingMessageConverterAdapter messagingMessageConverter = new MessagingMessageConverterAdapter(); 072 073 @Nullable 074 private QosSettings responseQosSettings; 075 076 077 /** 078 * Set the default destination to send response messages to. This will be applied 079 * in case of a request message that does not carry a "JMSReplyTo" field. 080 * <p>Response destinations are only relevant for listener methods that return 081 * result objects, which will be wrapped in a response message and sent to a 082 * response destination. 083 * <p>Alternatively, specify a "defaultResponseQueueName" or "defaultResponseTopicName", 084 * to be dynamically resolved via the DestinationResolver. 085 * @see #setDefaultResponseQueueName(String) 086 * @see #setDefaultResponseTopicName(String) 087 * @see #getResponseDestination 088 */ 089 public void setDefaultResponseDestination(Destination destination) { 090 this.defaultResponseDestination = destination; 091 } 092 093 /** 094 * Set the name of the default response queue to send response messages to. 095 * This will be applied in case of a request message that does not carry a 096 * "JMSReplyTo" field. 097 * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination". 098 * @see #setDestinationResolver 099 * @see #setDefaultResponseDestination(javax.jms.Destination) 100 */ 101 public void setDefaultResponseQueueName(String destinationName) { 102 this.defaultResponseDestination = new DestinationNameHolder(destinationName, false); 103 } 104 105 /** 106 * Set the name of the default response topic to send response messages to. 107 * This will be applied in case of a request message that does not carry a 108 * "JMSReplyTo" field. 109 * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination". 110 * @see #setDestinationResolver 111 * @see #setDefaultResponseDestination(javax.jms.Destination) 112 */ 113 public void setDefaultResponseTopicName(String destinationName) { 114 this.defaultResponseDestination = new DestinationNameHolder(destinationName, true); 115 } 116 117 /** 118 * Set the DestinationResolver that should be used to resolve response 119 * destination names for this adapter. 120 * <p>The default resolver is a DynamicDestinationResolver. Specify a 121 * JndiDestinationResolver for resolving destination names as JNDI locations. 122 * @see org.springframework.jms.support.destination.DynamicDestinationResolver 123 * @see org.springframework.jms.support.destination.JndiDestinationResolver 124 */ 125 public void setDestinationResolver(DestinationResolver destinationResolver) { 126 Assert.notNull(destinationResolver, "DestinationResolver must not be null"); 127 this.destinationResolver = destinationResolver; 128 } 129 130 /** 131 * Return the DestinationResolver for this adapter. 132 */ 133 protected DestinationResolver getDestinationResolver() { 134 return this.destinationResolver; 135 } 136 137 /** 138 * Set the converter that will convert incoming JMS messages to 139 * listener method arguments, and objects returned from listener 140 * methods back to JMS messages. 141 * <p>The default converter is a {@link SimpleMessageConverter}, which is able 142 * to handle {@link javax.jms.BytesMessage BytesMessages}, 143 * {@link javax.jms.TextMessage TextMessages} and 144 * {@link javax.jms.ObjectMessage ObjectMessages}. 145 */ 146 public void setMessageConverter(@Nullable MessageConverter messageConverter) { 147 this.messageConverter = messageConverter; 148 } 149 150 /** 151 * Return the converter that will convert incoming JMS messages to 152 * listener method arguments, and objects returned from listener 153 * methods back to JMS messages. 154 */ 155 @Nullable 156 protected MessageConverter getMessageConverter() { 157 return this.messageConverter; 158 } 159 160 /** 161 * Set the {@link JmsHeaderMapper} implementation to use to map the standard 162 * JMS headers. By default, a {@link SimpleJmsHeaderMapper} is used. 163 * @see SimpleJmsHeaderMapper 164 */ 165 public void setHeaderMapper(JmsHeaderMapper headerMapper) { 166 Assert.notNull(headerMapper, "HeaderMapper must not be null"); 167 this.messagingMessageConverter.setHeaderMapper(headerMapper); 168 } 169 170 /** 171 * Return the {@link MessagingMessageConverter} for this listener, 172 * being able to convert {@link org.springframework.messaging.Message}. 173 */ 174 protected final MessagingMessageConverter getMessagingMessageConverter() { 175 return this.messagingMessageConverter; 176 } 177 178 /** 179 * Set the {@link QosSettings} to use when sending a response. Can be set to 180 * {@code null} to indicate that the broker's defaults should be used. 181 * @param responseQosSettings the QoS settings to use when sending a response or 182 * {@code null} to use the default values. 183 * @since 5.0 184 */ 185 public void setResponseQosSettings(@Nullable QosSettings responseQosSettings) { 186 this.responseQosSettings = responseQosSettings; 187 } 188 189 /** 190 * Return the {@link QosSettings} to use when sending a response, 191 * or {@code null} if the defaults should be used. 192 * @since 5.0 193 */ 194 @Nullable 195 protected QosSettings getResponseQosSettings() { 196 return this.responseQosSettings; 197 } 198 199 200 /** 201 * Standard JMS {@link MessageListener} entry point. 202 * <p>Delegates the message to the target listener method, with appropriate 203 * conversion of the message argument. In case of an exception, the 204 * {@link #handleListenerException(Throwable)} method will be invoked. 205 * <p><b>Note:</b> Does not support sending response messages based on 206 * result objects returned from listener methods. Use the 207 * {@link SessionAwareMessageListener} entry point (typically through a Spring 208 * message listener container) for handling result objects as well. 209 * @param message the incoming JMS message 210 * @see #handleListenerException 211 * @see #onMessage(javax.jms.Message, javax.jms.Session) 212 */ 213 @Override 214 public void onMessage(Message message) { 215 try { 216 onMessage(message, null); 217 } 218 catch (Throwable ex) { 219 handleListenerException(ex); 220 } 221 } 222 223 @Override 224 public abstract void onMessage(Message message, @Nullable Session session) throws JMSException; 225 226 /** 227 * Handle the given exception that arose during listener execution. 228 * The default implementation logs the exception at error level. 229 * <p>This method only applies when used as standard JMS {@link MessageListener}. 230 * In case of the Spring {@link SessionAwareMessageListener} mechanism, 231 * exceptions get handled by the caller instead. 232 * @param ex the exception to handle 233 * @see #onMessage(javax.jms.Message) 234 */ 235 protected void handleListenerException(Throwable ex) { 236 logger.error("Listener execution failed", ex); 237 } 238 239 240 /** 241 * Extract the message body from the given JMS message. 242 * @param message the JMS {@code Message} 243 * @return the content of the message, to be passed into the listener method 244 * as an argument 245 * @throws MessageConversionException if the message could not be extracted 246 */ 247 protected Object extractMessage(Message message) { 248 try { 249 MessageConverter converter = getMessageConverter(); 250 if (converter != null) { 251 return converter.fromMessage(message); 252 } 253 return message; 254 } 255 catch (JMSException ex) { 256 throw new MessageConversionException("Could not convert JMS message", ex); 257 } 258 } 259 260 /** 261 * Handle the given result object returned from the listener method, 262 * sending a response message back. 263 * @param result the result object to handle (never {@code null}) 264 * @param request the original request message 265 * @param session the JMS Session to operate on (may be {@code null}) 266 * @throws ReplyFailureException if the response message could not be sent 267 * @see #buildMessage 268 * @see #postProcessResponse 269 * @see #getResponseDestination 270 * @see #sendResponse 271 */ 272 protected void handleResult(Object result, Message request, @Nullable Session session) { 273 if (session != null) { 274 if (logger.isDebugEnabled()) { 275 logger.debug("Listener method returned result [" + result + 276 "] - generating response message for it"); 277 } 278 try { 279 Message response = buildMessage(session, result); 280 postProcessResponse(request, response); 281 Destination destination = getResponseDestination(request, response, session, result); 282 sendResponse(session, destination, response); 283 } 284 catch (Exception ex) { 285 throw new ReplyFailureException("Failed to send reply with payload [" + result + "]", ex); 286 } 287 } 288 289 else { 290 // No JMS Session available 291 if (logger.isWarnEnabled()) { 292 logger.warn("Listener method returned result [" + result + 293 "]: not generating response message for it because of no JMS Session given"); 294 } 295 } 296 } 297 298 /** 299 * Build a JMS message to be sent as response based on the given result object. 300 * @param session the JMS Session to operate on 301 * @param result the content of the message, as returned from the listener method 302 * @return the JMS {@code Message} (never {@code null}) 303 * @throws JMSException if thrown by JMS API methods 304 * @see #setMessageConverter 305 */ 306 protected Message buildMessage(Session session, Object result) throws JMSException { 307 Object content = preProcessResponse(result instanceof JmsResponse 308 ? ((JmsResponse<?>) result).getResponse() : result); 309 310 MessageConverter converter = getMessageConverter(); 311 if (converter != null) { 312 if (content instanceof org.springframework.messaging.Message) { 313 return this.messagingMessageConverter.toMessage(content, session); 314 } 315 else { 316 return converter.toMessage(content, session); 317 } 318 } 319 320 if (!(content instanceof Message)) { 321 throw new MessageConversionException( 322 "No MessageConverter specified - cannot handle message [" + content + "]"); 323 } 324 return (Message) content; 325 } 326 327 /** 328 * Pre-process the given result before it is converted to a {@link Message}. 329 * @param result the result of the invocation 330 * @return the payload response to handle, either the {@code result} argument 331 * or any other object (for instance wrapping the result). 332 * @since 4.3 333 */ 334 protected Object preProcessResponse(Object result) { 335 return result; 336 } 337 338 /** 339 * Post-process the given response message before it will be sent. 340 * <p>The default implementation sets the response's correlation id 341 * to the request message's correlation id, if any; otherwise to the 342 * request message id. 343 * @param request the original incoming JMS message 344 * @param response the outgoing JMS message about to be sent 345 * @throws JMSException if thrown by JMS API methods 346 * @see javax.jms.Message#setJMSCorrelationID 347 */ 348 protected void postProcessResponse(Message request, Message response) throws JMSException { 349 String correlation = request.getJMSCorrelationID(); 350 if (correlation == null) { 351 correlation = request.getJMSMessageID(); 352 } 353 response.setJMSCorrelationID(correlation); 354 } 355 356 private Destination getResponseDestination(Message request, Message response, Session session, Object result) 357 throws JMSException { 358 359 if (result instanceof JmsResponse) { 360 JmsResponse<?> jmsResponse = (JmsResponse<?>) result; 361 Destination destination = jmsResponse.resolveDestination(getDestinationResolver(), session); 362 if (destination != null) { 363 return destination; 364 } 365 } 366 return getResponseDestination(request, response, session); 367 } 368 369 /** 370 * Determine a response destination for the given message. 371 * <p>The default implementation first checks the JMS Reply-To 372 * {@link Destination} of the supplied request; if that is not {@code null} 373 * it is returned; if it is {@code null}, then the configured 374 * {@link #resolveDefaultResponseDestination default response destination} 375 * is returned; if this too is {@code null}, then an 376 * {@link javax.jms.InvalidDestinationException} is thrown. 377 * @param request the original incoming JMS message 378 * @param response the outgoing JMS message about to be sent 379 * @param session the JMS Session to operate on 380 * @return the response destination (never {@code null}) 381 * @throws JMSException if thrown by JMS API methods 382 * @throws javax.jms.InvalidDestinationException if no {@link Destination} can be determined 383 * @see #setDefaultResponseDestination 384 * @see javax.jms.Message#getJMSReplyTo() 385 */ 386 protected Destination getResponseDestination(Message request, Message response, Session session) 387 throws JMSException { 388 389 Destination replyTo = request.getJMSReplyTo(); 390 if (replyTo == null) { 391 replyTo = resolveDefaultResponseDestination(session); 392 if (replyTo == null) { 393 throw new InvalidDestinationException("Cannot determine response destination: " + 394 "Request message does not contain reply-to destination, and no default response destination set."); 395 } 396 } 397 return replyTo; 398 } 399 400 /** 401 * Resolve the default response destination into a JMS {@link Destination}, using this 402 * accessor's {@link DestinationResolver} in case of a destination name. 403 * @return the located {@link Destination} 404 * @throws javax.jms.JMSException if resolution failed 405 * @see #setDefaultResponseDestination 406 * @see #setDefaultResponseQueueName 407 * @see #setDefaultResponseTopicName 408 * @see #setDestinationResolver 409 */ 410 @Nullable 411 protected Destination resolveDefaultResponseDestination(Session session) throws JMSException { 412 if (this.defaultResponseDestination instanceof Destination) { 413 return (Destination) this.defaultResponseDestination; 414 } 415 if (this.defaultResponseDestination instanceof DestinationNameHolder) { 416 DestinationNameHolder nameHolder = (DestinationNameHolder) this.defaultResponseDestination; 417 return getDestinationResolver().resolveDestinationName(session, nameHolder.name, nameHolder.isTopic); 418 } 419 return null; 420 } 421 422 /** 423 * Send the given response message to the given destination. 424 * @param response the JMS message to send 425 * @param destination the JMS destination to send to 426 * @param session the JMS session to operate on 427 * @throws JMSException if thrown by JMS API methods 428 * @see #postProcessProducer 429 * @see javax.jms.Session#createProducer 430 * @see javax.jms.MessageProducer#send 431 */ 432 protected void sendResponse(Session session, Destination destination, Message response) throws JMSException { 433 MessageProducer producer = session.createProducer(destination); 434 try { 435 postProcessProducer(producer, response); 436 QosSettings settings = getResponseQosSettings(); 437 if (settings != null) { 438 producer.send(response, settings.getDeliveryMode(), settings.getPriority(), 439 settings.getTimeToLive()); 440 } 441 else { 442 producer.send(response); 443 } 444 } 445 finally { 446 JmsUtils.closeMessageProducer(producer); 447 } 448 } 449 450 /** 451 * Post-process the given message producer before using it to send the response. 452 * <p>The default implementation is empty. 453 * @param producer the JMS message producer that will be used to send the message 454 * @param response the outgoing JMS message about to be sent 455 * @throws JMSException if thrown by JMS API methods 456 */ 457 protected void postProcessProducer(MessageProducer producer, Message response) throws JMSException { 458 } 459 460 461 /** 462 * A {@link MessagingMessageConverter} that lazily invoke payload extraction and 463 * delegate it to {@link #extractMessage(javax.jms.Message)} in order to enforce 464 * backward compatibility. 465 */ 466 private class MessagingMessageConverterAdapter extends MessagingMessageConverter { 467 468 @SuppressWarnings("unchecked") 469 @Override 470 public Object fromMessage(javax.jms.Message message) throws JMSException, MessageConversionException { 471 return new LazyResolutionMessage(message); 472 } 473 474 @Override 475 protected Object extractPayload(Message message) throws JMSException { 476 Object payload = extractMessage(message); 477 if (message instanceof BytesMessage) { 478 try { 479 // In case the BytesMessage is going to be received as a user argument: 480 // reset it, otherwise it would appear empty to such processing code... 481 ((BytesMessage) message).reset(); 482 } 483 catch (JMSException ex) { 484 // Continue since the BytesMessage typically won't be used any further. 485 logger.debug("Failed to reset BytesMessage after payload extraction", ex); 486 } 487 } 488 return payload; 489 } 490 491 @Override 492 protected Message createMessageForPayload(Object payload, Session session, @Nullable Object conversionHint) 493 throws JMSException { 494 495 MessageConverter converter = getMessageConverter(); 496 if (converter == null) { 497 throw new IllegalStateException("No message converter, cannot handle '" + payload + "'"); 498 } 499 if (converter instanceof SmartMessageConverter) { 500 return ((SmartMessageConverter) converter).toMessage(payload, session, conversionHint); 501 502 } 503 return converter.toMessage(payload, session); 504 } 505 506 507 protected class LazyResolutionMessage implements org.springframework.messaging.Message<Object> { 508 509 private final javax.jms.Message message; 510 511 @Nullable 512 private Object payload; 513 514 @Nullable 515 private MessageHeaders headers; 516 517 public LazyResolutionMessage(javax.jms.Message message) { 518 this.message = message; 519 } 520 521 @Override 522 public Object getPayload() { 523 if (this.payload == null) { 524 try { 525 this.payload = unwrapPayload(); 526 } 527 catch (JMSException ex) { 528 throw new MessageConversionException( 529 "Failed to extract payload from [" + this.message + "]", ex); 530 } 531 } 532 return this.payload; 533 } 534 535 /** 536 * Extract the payload of the current message. Since we deferred the resolution 537 * of the payload, a custom converter may still return a full message for it. In 538 * this case, its payload is returned. 539 * @return the payload of the message 540 */ 541 @SuppressWarnings("rawtypes") 542 private Object unwrapPayload() throws JMSException { 543 Object payload = extractPayload(this.message); 544 if (payload instanceof org.springframework.messaging.Message) { 545 return ((org.springframework.messaging.Message) payload).getPayload(); 546 } 547 return payload; 548 } 549 550 @Override 551 public MessageHeaders getHeaders() { 552 if (this.headers == null) { 553 this.headers = extractHeaders(this.message); 554 } 555 return this.headers; 556 } 557 } 558 } 559 560 561 /** 562 * Internal class combining a destination name 563 * and its target destination type (queue or topic). 564 */ 565 private static class DestinationNameHolder { 566 567 public final String name; 568 569 public final boolean isTopic; 570 571 public DestinationNameHolder(String name, boolean isTopic) { 572 this.name = name; 573 this.isTopic = isTopic; 574 } 575 } 576 577}