001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.listener.adapter;
018
019import javax.jms.BytesMessage;
020import javax.jms.Destination;
021import javax.jms.InvalidDestinationException;
022import javax.jms.JMSException;
023import javax.jms.Message;
024import javax.jms.MessageListener;
025import javax.jms.MessageProducer;
026import javax.jms.Session;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030
031import org.springframework.jms.listener.SessionAwareMessageListener;
032import org.springframework.jms.support.JmsHeaderMapper;
033import org.springframework.jms.support.JmsUtils;
034import org.springframework.jms.support.QosSettings;
035import org.springframework.jms.support.SimpleJmsHeaderMapper;
036import org.springframework.jms.support.converter.MessageConversionException;
037import org.springframework.jms.support.converter.MessageConverter;
038import org.springframework.jms.support.converter.MessagingMessageConverter;
039import org.springframework.jms.support.converter.SimpleMessageConverter;
040import org.springframework.jms.support.converter.SmartMessageConverter;
041import org.springframework.jms.support.destination.DestinationResolver;
042import org.springframework.jms.support.destination.DynamicDestinationResolver;
043import org.springframework.lang.Nullable;
044import org.springframework.messaging.MessageHeaders;
045import org.springframework.util.Assert;
046
047/**
048 * An abstract JMS {@link MessageListener} adapter providing the necessary
049 * infrastructure to extract the payload of a JMS {@link Message}.
050 *
051 * @author Juergen Hoeller
052 * @author Stephane Nicoll
053 * @since 4.1
054 * @see MessageListener
055 * @see SessionAwareMessageListener
056 */
057public abstract class AbstractAdaptableMessageListener
058                implements MessageListener, SessionAwareMessageListener<Message> {
059
060        /** Logger available to subclasses. */
061        protected final Log logger = LogFactory.getLog(getClass());
062
063        @Nullable
064        private Object defaultResponseDestination;
065
066        private DestinationResolver destinationResolver = new DynamicDestinationResolver();
067
068        @Nullable
069        private MessageConverter messageConverter = new SimpleMessageConverter();
070
071        private final MessagingMessageConverterAdapter messagingMessageConverter = new MessagingMessageConverterAdapter();
072
073        @Nullable
074        private QosSettings responseQosSettings;
075
076
077        /**
078         * Set the default destination to send response messages to. This will be applied
079         * in case of a request message that does not carry a "JMSReplyTo" field.
080         * <p>Response destinations are only relevant for listener methods that return
081         * result objects, which will be wrapped in a response message and sent to a
082         * response destination.
083         * <p>Alternatively, specify a "defaultResponseQueueName" or "defaultResponseTopicName",
084         * to be dynamically resolved via the DestinationResolver.
085         * @see #setDefaultResponseQueueName(String)
086         * @see #setDefaultResponseTopicName(String)
087         * @see #getResponseDestination
088         */
089        public void setDefaultResponseDestination(Destination destination) {
090                this.defaultResponseDestination = destination;
091        }
092
093        /**
094         * Set the name of the default response queue to send response messages to.
095         * This will be applied in case of a request message that does not carry a
096         * "JMSReplyTo" field.
097         * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination".
098         * @see #setDestinationResolver
099         * @see #setDefaultResponseDestination(javax.jms.Destination)
100         */
101        public void setDefaultResponseQueueName(String destinationName) {
102                this.defaultResponseDestination = new DestinationNameHolder(destinationName, false);
103        }
104
105        /**
106         * Set the name of the default response topic to send response messages to.
107         * This will be applied in case of a request message that does not carry a
108         * "JMSReplyTo" field.
109         * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination".
110         * @see #setDestinationResolver
111         * @see #setDefaultResponseDestination(javax.jms.Destination)
112         */
113        public void setDefaultResponseTopicName(String destinationName) {
114                this.defaultResponseDestination = new DestinationNameHolder(destinationName, true);
115        }
116
117        /**
118         * Set the DestinationResolver that should be used to resolve response
119         * destination names for this adapter.
120         * <p>The default resolver is a DynamicDestinationResolver. Specify a
121         * JndiDestinationResolver for resolving destination names as JNDI locations.
122         * @see org.springframework.jms.support.destination.DynamicDestinationResolver
123         * @see org.springframework.jms.support.destination.JndiDestinationResolver
124         */
125        public void setDestinationResolver(DestinationResolver destinationResolver) {
126                Assert.notNull(destinationResolver, "DestinationResolver must not be null");
127                this.destinationResolver = destinationResolver;
128        }
129
130        /**
131         * Return the DestinationResolver for this adapter.
132         */
133        protected DestinationResolver getDestinationResolver() {
134                return this.destinationResolver;
135        }
136
137        /**
138         * Set the converter that will convert incoming JMS messages to
139         * listener method arguments, and objects returned from listener
140         * methods back to JMS messages.
141         * <p>The default converter is a {@link SimpleMessageConverter}, which is able
142         * to handle {@link javax.jms.BytesMessage BytesMessages},
143         * {@link javax.jms.TextMessage TextMessages} and
144         * {@link javax.jms.ObjectMessage ObjectMessages}.
145         */
146        public void setMessageConverter(@Nullable MessageConverter messageConverter) {
147                this.messageConverter = messageConverter;
148        }
149
150        /**
151         * Return the converter that will convert incoming JMS messages to
152         * listener method arguments, and objects returned from listener
153         * methods back to JMS messages.
154         */
155        @Nullable
156        protected MessageConverter getMessageConverter() {
157                return this.messageConverter;
158        }
159
160        /**
161         * Set the {@link JmsHeaderMapper} implementation to use to map the standard
162         * JMS headers. By default, a {@link SimpleJmsHeaderMapper} is used.
163         * @see SimpleJmsHeaderMapper
164         */
165        public void setHeaderMapper(JmsHeaderMapper headerMapper) {
166                Assert.notNull(headerMapper, "HeaderMapper must not be null");
167                this.messagingMessageConverter.setHeaderMapper(headerMapper);
168        }
169
170        /**
171         * Return the {@link MessagingMessageConverter} for this listener,
172         * being able to convert {@link org.springframework.messaging.Message}.
173         */
174        protected final MessagingMessageConverter getMessagingMessageConverter() {
175                return this.messagingMessageConverter;
176        }
177
178        /**
179         * Set the {@link QosSettings} to use when sending a response. Can be set to
180         * {@code null} to indicate that the broker's defaults should be used.
181         * @param responseQosSettings the QoS settings to use when sending a response or
182         * {@code null} to use the default values.
183         * @since 5.0
184         */
185        public void setResponseQosSettings(@Nullable QosSettings responseQosSettings) {
186                this.responseQosSettings = responseQosSettings;
187        }
188
189        /**
190         * Return the {@link QosSettings} to use when sending a response,
191         * or {@code null} if the defaults should be used.
192         * @since 5.0
193         */
194        @Nullable
195        protected QosSettings getResponseQosSettings() {
196                return this.responseQosSettings;
197        }
198
199
200        /**
201         * Standard JMS {@link MessageListener} entry point.
202         * <p>Delegates the message to the target listener method, with appropriate
203         * conversion of the message argument. In case of an exception, the
204         * {@link #handleListenerException(Throwable)} method will be invoked.
205         * <p><b>Note:</b> Does not support sending response messages based on
206         * result objects returned from listener methods. Use the
207         * {@link SessionAwareMessageListener} entry point (typically through a Spring
208         * message listener container) for handling result objects as well.
209         * @param message the incoming JMS message
210         * @see #handleListenerException
211         * @see #onMessage(javax.jms.Message, javax.jms.Session)
212         */
213        @Override
214        public void onMessage(Message message) {
215                try {
216                        onMessage(message, null);
217                }
218                catch (Throwable ex) {
219                        handleListenerException(ex);
220                }
221        }
222
223        @Override
224        public abstract void onMessage(Message message, @Nullable Session session) throws JMSException;
225
226        /**
227         * Handle the given exception that arose during listener execution.
228         * The default implementation logs the exception at error level.
229         * <p>This method only applies when used as standard JMS {@link MessageListener}.
230         * In case of the Spring {@link SessionAwareMessageListener} mechanism,
231         * exceptions get handled by the caller instead.
232         * @param ex the exception to handle
233         * @see #onMessage(javax.jms.Message)
234         */
235        protected void handleListenerException(Throwable ex) {
236                logger.error("Listener execution failed", ex);
237        }
238
239
240        /**
241         * Extract the message body from the given JMS message.
242         * @param message the JMS {@code Message}
243         * @return the content of the message, to be passed into the listener method
244         * as an argument
245         * @throws MessageConversionException if the message could not be extracted
246         */
247        protected Object extractMessage(Message message)  {
248                try {
249                        MessageConverter converter = getMessageConverter();
250                        if (converter != null) {
251                                return converter.fromMessage(message);
252                        }
253                        return message;
254                }
255                catch (JMSException ex) {
256                        throw new MessageConversionException("Could not convert JMS message", ex);
257                }
258        }
259
260        /**
261         * Handle the given result object returned from the listener method,
262         * sending a response message back.
263         * @param result the result object to handle (never {@code null})
264         * @param request the original request message
265         * @param session the JMS Session to operate on (may be {@code null})
266         * @throws ReplyFailureException if the response message could not be sent
267         * @see #buildMessage
268         * @see #postProcessResponse
269         * @see #getResponseDestination
270         * @see #sendResponse
271         */
272        protected void handleResult(Object result, Message request, @Nullable Session session) {
273                if (session != null) {
274                        if (logger.isDebugEnabled()) {
275                                logger.debug("Listener method returned result [" + result +
276                                                "] - generating response message for it");
277                        }
278                        try {
279                                Message response = buildMessage(session, result);
280                                postProcessResponse(request, response);
281                                Destination destination = getResponseDestination(request, response, session, result);
282                                sendResponse(session, destination, response);
283                        }
284                        catch (Exception ex) {
285                                throw new ReplyFailureException("Failed to send reply with payload [" + result + "]", ex);
286                        }
287                }
288
289                else {
290                        // No JMS Session available
291                        if (logger.isWarnEnabled()) {
292                                logger.warn("Listener method returned result [" + result +
293                                                "]: not generating response message for it because of no JMS Session given");
294                        }
295                }
296        }
297
298        /**
299         * Build a JMS message to be sent as response based on the given result object.
300         * @param session the JMS Session to operate on
301         * @param result the content of the message, as returned from the listener method
302         * @return the JMS {@code Message} (never {@code null})
303         * @throws JMSException if thrown by JMS API methods
304         * @see #setMessageConverter
305         */
306        protected Message buildMessage(Session session, Object result) throws JMSException {
307                Object content = preProcessResponse(result instanceof JmsResponse
308                                ? ((JmsResponse<?>) result).getResponse() : result);
309
310                MessageConverter converter = getMessageConverter();
311                if (converter != null) {
312                        if (content instanceof org.springframework.messaging.Message) {
313                                return this.messagingMessageConverter.toMessage(content, session);
314                        }
315                        else {
316                                return converter.toMessage(content, session);
317                        }
318                }
319
320                if (!(content instanceof Message)) {
321                        throw new MessageConversionException(
322                                        "No MessageConverter specified - cannot handle message [" + content + "]");
323                }
324                return (Message) content;
325        }
326
327        /**
328         * Pre-process the given result before it is converted to a {@link Message}.
329         * @param result the result of the invocation
330         * @return the payload response to handle, either the {@code result} argument
331         * or any other object (for instance wrapping the result).
332         * @since 4.3
333         */
334        protected Object preProcessResponse(Object result) {
335                return result;
336        }
337
338        /**
339         * Post-process the given response message before it will be sent.
340         * <p>The default implementation sets the response's correlation id
341         * to the request message's correlation id, if any; otherwise to the
342         * request message id.
343         * @param request the original incoming JMS message
344         * @param response the outgoing JMS message about to be sent
345         * @throws JMSException if thrown by JMS API methods
346         * @see javax.jms.Message#setJMSCorrelationID
347         */
348        protected void postProcessResponse(Message request, Message response) throws JMSException {
349                String correlation = request.getJMSCorrelationID();
350                if (correlation == null) {
351                        correlation = request.getJMSMessageID();
352                }
353                response.setJMSCorrelationID(correlation);
354        }
355
356        private Destination getResponseDestination(Message request, Message response, Session session, Object result)
357                        throws JMSException {
358
359                if (result instanceof JmsResponse) {
360                        JmsResponse<?> jmsResponse = (JmsResponse<?>) result;
361                        Destination destination = jmsResponse.resolveDestination(getDestinationResolver(), session);
362                        if (destination != null) {
363                                return destination;
364                        }
365                }
366                return getResponseDestination(request, response, session);
367        }
368
369        /**
370         * Determine a response destination for the given message.
371         * <p>The default implementation first checks the JMS Reply-To
372         * {@link Destination} of the supplied request; if that is not {@code null}
373         * it is returned; if it is {@code null}, then the configured
374         * {@link #resolveDefaultResponseDestination default response destination}
375         * is returned; if this too is {@code null}, then an
376         * {@link javax.jms.InvalidDestinationException} is thrown.
377         * @param request the original incoming JMS message
378         * @param response the outgoing JMS message about to be sent
379         * @param session the JMS Session to operate on
380         * @return the response destination (never {@code null})
381         * @throws JMSException if thrown by JMS API methods
382         * @throws javax.jms.InvalidDestinationException if no {@link Destination} can be determined
383         * @see #setDefaultResponseDestination
384         * @see javax.jms.Message#getJMSReplyTo()
385         */
386        protected Destination getResponseDestination(Message request, Message response, Session session)
387                        throws JMSException {
388
389                Destination replyTo = request.getJMSReplyTo();
390                if (replyTo == null) {
391                        replyTo = resolveDefaultResponseDestination(session);
392                        if (replyTo == null) {
393                                throw new InvalidDestinationException("Cannot determine response destination: " +
394                                                "Request message does not contain reply-to destination, and no default response destination set.");
395                        }
396                }
397                return replyTo;
398        }
399
400        /**
401         * Resolve the default response destination into a JMS {@link Destination}, using this
402         * accessor's {@link DestinationResolver} in case of a destination name.
403         * @return the located {@link Destination}
404         * @throws javax.jms.JMSException if resolution failed
405         * @see #setDefaultResponseDestination
406         * @see #setDefaultResponseQueueName
407         * @see #setDefaultResponseTopicName
408         * @see #setDestinationResolver
409         */
410        @Nullable
411        protected Destination resolveDefaultResponseDestination(Session session) throws JMSException {
412                if (this.defaultResponseDestination instanceof Destination) {
413                        return (Destination) this.defaultResponseDestination;
414                }
415                if (this.defaultResponseDestination instanceof DestinationNameHolder) {
416                        DestinationNameHolder nameHolder = (DestinationNameHolder) this.defaultResponseDestination;
417                        return getDestinationResolver().resolveDestinationName(session, nameHolder.name, nameHolder.isTopic);
418                }
419                return null;
420        }
421
422        /**
423         * Send the given response message to the given destination.
424         * @param response the JMS message to send
425         * @param destination the JMS destination to send to
426         * @param session the JMS session to operate on
427         * @throws JMSException if thrown by JMS API methods
428         * @see #postProcessProducer
429         * @see javax.jms.Session#createProducer
430         * @see javax.jms.MessageProducer#send
431         */
432        protected void sendResponse(Session session, Destination destination, Message response) throws JMSException {
433                MessageProducer producer = session.createProducer(destination);
434                try {
435                        postProcessProducer(producer, response);
436                        QosSettings settings = getResponseQosSettings();
437                        if (settings != null) {
438                                producer.send(response, settings.getDeliveryMode(), settings.getPriority(),
439                                                settings.getTimeToLive());
440                        }
441                        else {
442                                producer.send(response);
443                        }
444                }
445                finally {
446                        JmsUtils.closeMessageProducer(producer);
447                }
448        }
449
450        /**
451         * Post-process the given message producer before using it to send the response.
452         * <p>The default implementation is empty.
453         * @param producer the JMS message producer that will be used to send the message
454         * @param response the outgoing JMS message about to be sent
455         * @throws JMSException if thrown by JMS API methods
456         */
457        protected void postProcessProducer(MessageProducer producer, Message response) throws JMSException {
458        }
459
460
461        /**
462         * A {@link MessagingMessageConverter} that lazily invoke payload extraction and
463         * delegate it to {@link #extractMessage(javax.jms.Message)} in order to enforce
464         * backward compatibility.
465         */
466        private class MessagingMessageConverterAdapter extends MessagingMessageConverter {
467
468                @SuppressWarnings("unchecked")
469                @Override
470                public Object fromMessage(javax.jms.Message message) throws JMSException, MessageConversionException {
471                        return new LazyResolutionMessage(message);
472                }
473
474                @Override
475                protected Object extractPayload(Message message) throws JMSException {
476                        Object payload = extractMessage(message);
477                        if (message instanceof BytesMessage) {
478                                try {
479                                        // In case the BytesMessage is going to be received as a user argument:
480                                        // reset it, otherwise it would appear empty to such processing code...
481                                        ((BytesMessage) message).reset();
482                                }
483                                catch (JMSException ex) {
484                                        // Continue since the BytesMessage typically won't be used any further.
485                                        logger.debug("Failed to reset BytesMessage after payload extraction", ex);
486                                }
487                        }
488                        return payload;
489                }
490
491                @Override
492                protected Message createMessageForPayload(Object payload, Session session, @Nullable Object conversionHint)
493                                throws JMSException {
494
495                        MessageConverter converter = getMessageConverter();
496                        if (converter == null) {
497                                throw new IllegalStateException("No message converter, cannot handle '" + payload + "'");
498                        }
499                        if (converter instanceof SmartMessageConverter) {
500                                return ((SmartMessageConverter) converter).toMessage(payload, session, conversionHint);
501
502                        }
503                        return converter.toMessage(payload, session);
504                }
505
506
507                protected class LazyResolutionMessage implements org.springframework.messaging.Message<Object> {
508
509                        private final javax.jms.Message message;
510
511                        @Nullable
512                        private Object payload;
513
514                        @Nullable
515                        private MessageHeaders headers;
516
517                        public LazyResolutionMessage(javax.jms.Message message) {
518                                this.message = message;
519                        }
520
521                        @Override
522                        public Object getPayload() {
523                                if (this.payload == null) {
524                                        try {
525                                                this.payload = unwrapPayload();
526                                        }
527                                        catch (JMSException ex) {
528                                                throw new MessageConversionException(
529                                                                "Failed to extract payload from [" + this.message + "]", ex);
530                                        }
531                                }
532                                return this.payload;
533                        }
534
535                        /**
536                         * Extract the payload of the current message. Since we deferred the resolution
537                         * of the payload, a custom converter may still return a full message for it. In
538                         * this case, its payload is returned.
539                         * @return the payload of the message
540                         */
541                        @SuppressWarnings("rawtypes")
542                        private Object unwrapPayload() throws JMSException {
543                                Object payload = extractPayload(this.message);
544                                if (payload instanceof org.springframework.messaging.Message) {
545                                        return ((org.springframework.messaging.Message) payload).getPayload();
546                                }
547                                return payload;
548                        }
549
550                        @Override
551                        public MessageHeaders getHeaders() {
552                                if (this.headers == null) {
553                                        this.headers = extractHeaders(this.message);
554                                }
555                                return this.headers;
556                        }
557                }
558        }
559
560
561        /**
562         * Internal class combining a destination name
563         * and its target destination type (queue or topic).
564         */
565        private static class DestinationNameHolder {
566
567                public final String name;
568
569                public final boolean isTopic;
570
571                public DestinationNameHolder(String name, boolean isTopic) {
572                        this.name = name;
573                        this.isTopic = isTopic;
574                }
575        }
576
577}