/*
 * Copyright 2002-2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.jms.listener.adapter;
018
019import javax.jms.BytesMessage;
020import javax.jms.Destination;
021import javax.jms.InvalidDestinationException;
022import javax.jms.JMSException;
023import javax.jms.Message;
024import javax.jms.MessageListener;
025import javax.jms.MessageProducer;
026import javax.jms.Session;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030
031import org.springframework.jms.listener.SessionAwareMessageListener;
032import org.springframework.jms.support.JmsHeaderMapper;
033import org.springframework.jms.support.JmsUtils;
034import org.springframework.jms.support.SimpleJmsHeaderMapper;
035import org.springframework.jms.support.converter.MessageConversionException;
036import org.springframework.jms.support.converter.MessageConverter;
037import org.springframework.jms.support.converter.MessagingMessageConverter;
038import org.springframework.jms.support.converter.SimpleMessageConverter;
039import org.springframework.jms.support.converter.SmartMessageConverter;
040import org.springframework.jms.support.destination.DestinationResolver;
041import org.springframework.jms.support.destination.DynamicDestinationResolver;
042import org.springframework.messaging.MessageHeaders;
043import org.springframework.util.Assert;
044
045/**
046 * An abstract JMS {@link MessageListener} adapter providing the necessary
047 * infrastructure to extract the payload of a JMS {@link Message}.
048 *
049 * @author Juergen Hoeller
050 * @author Stephane Nicoll
051 * @since 4.1
052 * @see MessageListener
053 * @see SessionAwareMessageListener
054 */
055public abstract class AbstractAdaptableMessageListener
056                implements MessageListener, SessionAwareMessageListener<Message> {
057
058        /** Logger available to subclasses */
059        protected final Log logger = LogFactory.getLog(getClass());
060
061        private Object defaultResponseDestination;
062
063        private DestinationResolver destinationResolver = new DynamicDestinationResolver();
064
065        private MessageConverter messageConverter = new SimpleMessageConverter();
066
067        private final MessagingMessageConverterAdapter messagingMessageConverter = new MessagingMessageConverterAdapter();
068
069
070        /**
071         * Set the default destination to send response messages to. This will be applied
072         * in case of a request message that does not carry a "JMSReplyTo" field.
073         * <p>Response destinations are only relevant for listener methods that return
074         * result objects, which will be wrapped in a response message and sent to a
075         * response destination.
076         * <p>Alternatively, specify a "defaultResponseQueueName" or "defaultResponseTopicName",
077         * to be dynamically resolved via the DestinationResolver.
078         * @see #setDefaultResponseQueueName(String)
079         * @see #setDefaultResponseTopicName(String)
080         * @see #getResponseDestination
081         */
082        public void setDefaultResponseDestination(Destination destination) {
083                this.defaultResponseDestination = destination;
084        }
085
086        /**
087         * Set the name of the default response queue to send response messages to.
088         * This will be applied in case of a request message that does not carry a
089         * "JMSReplyTo" field.
090         * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination".
091         * @see #setDestinationResolver
092         * @see #setDefaultResponseDestination(javax.jms.Destination)
093         */
094        public void setDefaultResponseQueueName(String destinationName) {
095                this.defaultResponseDestination = new DestinationNameHolder(destinationName, false);
096        }
097
098        /**
099         * Set the name of the default response topic to send response messages to.
100         * This will be applied in case of a request message that does not carry a
101         * "JMSReplyTo" field.
102         * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination".
103         * @see #setDestinationResolver
104         * @see #setDefaultResponseDestination(javax.jms.Destination)
105         */
106        public void setDefaultResponseTopicName(String destinationName) {
107                this.defaultResponseDestination = new DestinationNameHolder(destinationName, true);
108        }
109
110        /**
111         * Set the DestinationResolver that should be used to resolve response
112         * destination names for this adapter.
113         * <p>The default resolver is a DynamicDestinationResolver. Specify a
114         * JndiDestinationResolver for resolving destination names as JNDI locations.
115         * @see org.springframework.jms.support.destination.DynamicDestinationResolver
116         * @see org.springframework.jms.support.destination.JndiDestinationResolver
117         */
118        public void setDestinationResolver(DestinationResolver destinationResolver) {
119                Assert.notNull(destinationResolver, "DestinationResolver must not be null");
120                this.destinationResolver = destinationResolver;
121        }
122
123        /**
124         * Return the DestinationResolver for this adapter.
125         */
126        protected DestinationResolver getDestinationResolver() {
127                return this.destinationResolver;
128        }
129
130        /**
131         * Set the converter that will convert incoming JMS messages to
132         * listener method arguments, and objects returned from listener
133         * methods back to JMS messages.
134         * <p>The default converter is a {@link SimpleMessageConverter}, which is able
135         * to handle {@link javax.jms.BytesMessage BytesMessages},
136         * {@link javax.jms.TextMessage TextMessages} and
137         * {@link javax.jms.ObjectMessage ObjectMessages}.
138         */
139        public void setMessageConverter(MessageConverter messageConverter) {
140                this.messageConverter = messageConverter;
141        }
142
143        /**
144         * Return the converter that will convert incoming JMS messages to
145         * listener method arguments, and objects returned from listener
146         * methods back to JMS messages.
147         */
148        protected MessageConverter getMessageConverter() {
149                return this.messageConverter;
150        }
151
152        /**
153         * Set the {@link JmsHeaderMapper} implementation to use to map the standard
154         * JMS headers. By default, a {@link SimpleJmsHeaderMapper} is used.
155         * @see SimpleJmsHeaderMapper
156         */
157        public void setHeaderMapper(JmsHeaderMapper headerMapper) {
158                Assert.notNull(headerMapper, "HeaderMapper must not be null");
159                this.messagingMessageConverter.setHeaderMapper(headerMapper);
160        }
161
162        /**
163         * Return the {@link MessagingMessageConverter} for this listener,
164         * being able to convert {@link org.springframework.messaging.Message}.
165         */
166        protected final MessagingMessageConverter getMessagingMessageConverter() {
167                return this.messagingMessageConverter;
168        }
169
170
171        /**
172         * Standard JMS {@link MessageListener} entry point.
173         * <p>Delegates the message to the target listener method, with appropriate
174         * conversion of the message argument. In case of an exception, the
175         * {@link #handleListenerException(Throwable)} method will be invoked.
176         * <p><b>Note:</b> Does not support sending response messages based on
177         * result objects returned from listener methods. Use the
178         * {@link SessionAwareMessageListener} entry point (typically through a Spring
179         * message listener container) for handling result objects as well.
180         * @param message the incoming JMS message
181         * @see #handleListenerException
182         * @see #onMessage(javax.jms.Message, javax.jms.Session)
183         */
184        @Override
185        public void onMessage(Message message) {
186                try {
187                        onMessage(message, null);
188                }
189                catch (Throwable ex) {
190                        handleListenerException(ex);
191                }
192        }
193
194        @Override
195        public abstract void onMessage(Message message, Session session) throws JMSException;
196
197        /**
198         * Handle the given exception that arose during listener execution.
199         * The default implementation logs the exception at error level.
200         * <p>This method only applies when used as standard JMS {@link MessageListener}.
201         * In case of the Spring {@link SessionAwareMessageListener} mechanism,
202         * exceptions get handled by the caller instead.
203         * @param ex the exception to handle
204         * @see #onMessage(javax.jms.Message)
205         */
206        protected void handleListenerException(Throwable ex) {
207                logger.error("Listener execution failed", ex);
208        }
209
210
211        /**
212         * Extract the message body from the given JMS message.
213         * @param message the JMS {@code Message}
214         * @return the content of the message, to be passed into the listener method
215         * as an argument
216         * @throws MessageConversionException if the message could not be extracted
217         */
218        protected Object extractMessage(Message message)  {
219                try {
220                        MessageConverter converter = getMessageConverter();
221                        if (converter != null) {
222                                return converter.fromMessage(message);
223                        }
224                        return message;
225                }
226                catch (JMSException ex) {
227                        throw new MessageConversionException("Could not convert JMS message", ex);
228                }
229        }
230
231        /**
232         * Handle the given result object returned from the listener method,
233         * sending a response message back.
234         * @param result the result object to handle (never {@code null})
235         * @param request the original request message
236         * @param session the JMS Session to operate on (may be {@code null})
237         * @throws ReplyFailureException if the response message could not be sent
238         * @see #buildMessage
239         * @see #postProcessResponse
240         * @see #getResponseDestination
241         * @see #sendResponse
242         */
243        protected void handleResult(Object result, Message request, Session session) {
244                if (session != null) {
245                        if (logger.isDebugEnabled()) {
246                                logger.debug("Listener method returned result [" + result +
247                                                "] - generating response message for it");
248                        }
249                        try {
250                                Message response = buildMessage(session, result);
251                                postProcessResponse(request, response);
252                                Destination destination = getResponseDestination(request, response, session, result);
253                                sendResponse(session, destination, response);
254                        }
255                        catch (Exception ex) {
256                                throw new ReplyFailureException("Failed to send reply with payload [" + result + "]", ex);
257                        }
258                }
259
260                else {
261                        // No JMS Session available
262                        if (logger.isWarnEnabled()) {
263                                logger.warn("Listener method returned result [" + result +
264                                                "]: not generating response message for it because of no JMS Session given");
265                        }
266                }
267        }
268
269        /**
270         * Build a JMS message to be sent as response based on the given result object.
271         * @param session the JMS Session to operate on
272         * @param result the content of the message, as returned from the listener method
273         * @return the JMS {@code Message} (never {@code null})
274         * @throws JMSException if thrown by JMS API methods
275         * @see #setMessageConverter
276         */
277        protected Message buildMessage(Session session, Object result) throws JMSException {
278                Object content = preProcessResponse(result instanceof JmsResponse
279                                ? ((JmsResponse<?>) result).getResponse() : result);
280
281                MessageConverter converter = getMessageConverter();
282                if (converter != null) {
283                        if (content instanceof org.springframework.messaging.Message) {
284                                return this.messagingMessageConverter.toMessage(content, session);
285                        }
286                        else {
287                                return converter.toMessage(content, session);
288                        }
289                }
290
291                if (!(content instanceof Message)) {
292                        throw new MessageConversionException(
293                                        "No MessageConverter specified - cannot handle message [" + content + "]");
294                }
295                return (Message) content;
296        }
297
298        /**
299         * Pre-process the given result before it is converted to a {@link Message}.
300         * @param result the result of the invocation
301         * @return the payload response to handle, either the {@code result} argument
302         * or any other object (for instance wrapping the result).
303         * @since 4.3
304         */
305        protected Object preProcessResponse(Object result) {
306                return result;
307        }
308
309        /**
310         * Post-process the given response message before it will be sent.
311         * <p>The default implementation sets the response's correlation id
312         * to the request message's correlation id, if any; otherwise to the
313         * request message id.
314         * @param request the original incoming JMS message
315         * @param response the outgoing JMS message about to be sent
316         * @throws JMSException if thrown by JMS API methods
317         * @see javax.jms.Message#setJMSCorrelationID
318         */
319        protected void postProcessResponse(Message request, Message response) throws JMSException {
320                String correlation = request.getJMSCorrelationID();
321                if (correlation == null) {
322                        correlation = request.getJMSMessageID();
323                }
324                response.setJMSCorrelationID(correlation);
325        }
326
327        private Destination getResponseDestination(Message request, Message response, Session session, Object result)
328                        throws JMSException {
329
330                if (result instanceof JmsResponse) {
331                        JmsResponse<?> jmsResponse = (JmsResponse) result;
332                        Destination destination = jmsResponse.resolveDestination(getDestinationResolver(), session);
333                        if (destination != null) {
334                                return destination;
335                        }
336                }
337                return getResponseDestination(request, response, session);
338        }
339
340        /**
341         * Determine a response destination for the given message.
342         * <p>The default implementation first checks the JMS Reply-To
343         * {@link Destination} of the supplied request; if that is not {@code null}
344         * it is returned; if it is {@code null}, then the configured
345         * {@link #resolveDefaultResponseDestination default response destination}
346         * is returned; if this too is {@code null}, then an
347         * {@link javax.jms.InvalidDestinationException} is thrown.
348         * @param request the original incoming JMS message
349         * @param response the outgoing JMS message about to be sent
350         * @param session the JMS Session to operate on
351         * @return the response destination (never {@code null})
352         * @throws JMSException if thrown by JMS API methods
353         * @throws javax.jms.InvalidDestinationException if no {@link Destination} can be determined
354         * @see #setDefaultResponseDestination
355         * @see javax.jms.Message#getJMSReplyTo()
356         */
357        protected Destination getResponseDestination(Message request, Message response, Session session)
358                        throws JMSException {
359
360                Destination replyTo = request.getJMSReplyTo();
361                if (replyTo == null) {
362                        replyTo = resolveDefaultResponseDestination(session);
363                        if (replyTo == null) {
364                                throw new InvalidDestinationException("Cannot determine response destination: " +
365                                                "Request message does not contain reply-to destination, and no default response destination set.");
366                        }
367                }
368                return replyTo;
369        }
370
371        /**
372         * Resolve the default response destination into a JMS {@link Destination}, using this
373         * accessor's {@link DestinationResolver} in case of a destination name.
374         * @return the located {@link Destination}
375         * @throws javax.jms.JMSException if resolution failed
376         * @see #setDefaultResponseDestination
377         * @see #setDefaultResponseQueueName
378         * @see #setDefaultResponseTopicName
379         * @see #setDestinationResolver
380         */
381        protected Destination resolveDefaultResponseDestination(Session session) throws JMSException {
382                if (this.defaultResponseDestination instanceof Destination) {
383                        return (Destination) this.defaultResponseDestination;
384                }
385                if (this.defaultResponseDestination instanceof DestinationNameHolder) {
386                        DestinationNameHolder nameHolder = (DestinationNameHolder) this.defaultResponseDestination;
387                        return getDestinationResolver().resolveDestinationName(session, nameHolder.name, nameHolder.isTopic);
388                }
389                return null;
390        }
391
392        /**
393         * Send the given response message to the given destination.
394         * @param response the JMS message to send
395         * @param destination the JMS destination to send to
396         * @param session the JMS session to operate on
397         * @throws JMSException if thrown by JMS API methods
398         * @see #postProcessProducer
399         * @see javax.jms.Session#createProducer
400         * @see javax.jms.MessageProducer#send
401         */
402        protected void sendResponse(Session session, Destination destination, Message response) throws JMSException {
403                MessageProducer producer = session.createProducer(destination);
404                try {
405                        postProcessProducer(producer, response);
406                        producer.send(response);
407                }
408                finally {
409                        JmsUtils.closeMessageProducer(producer);
410                }
411        }
412
413        /**
414         * Post-process the given message producer before using it to send the response.
415         * <p>The default implementation is empty.
416         * @param producer the JMS message producer that will be used to send the message
417         * @param response the outgoing JMS message about to be sent
418         * @throws JMSException if thrown by JMS API methods
419         */
420        protected void postProcessProducer(MessageProducer producer, Message response) throws JMSException {
421        }
422
423
424        /**
425         * A {@link MessagingMessageConverter} that lazily invoke payload extraction and
426         * delegate it to {@link #extractMessage(javax.jms.Message)} in order to enforce
427         * backward compatibility.
428         */
429        private class MessagingMessageConverterAdapter extends MessagingMessageConverter {
430
431                @SuppressWarnings("unchecked")
432                @Override
433                public Object fromMessage(javax.jms.Message message) throws JMSException, MessageConversionException {
434                        if (message == null) {
435                                return null;
436                        }
437                        return new LazyResolutionMessage(message);
438                }
439
440                @Override
441                protected Object extractPayload(Message message) throws JMSException {
442                        Object payload = extractMessage(message);
443                        if (message instanceof BytesMessage) {
444                                try {
445                                        // In case the BytesMessage is going to be received as a user argument:
446                                        // reset it, otherwise it would appear empty to such processing code...
447                                        ((BytesMessage) message).reset();
448                                }
449                                catch (JMSException ex) {
450                                        // Continue since the BytesMessage typically won't be used any further.
451                                        logger.debug("Failed to reset BytesMessage after payload extraction", ex);
452                                }
453                        }
454                        return payload;
455                }
456
457                @Override
458                protected Message createMessageForPayload(Object payload, Session session, Object conversionHint)
459                                throws JMSException {
460
461                        MessageConverter converter = getMessageConverter();
462                        if (converter == null) {
463                                throw new IllegalStateException("No message converter, cannot handle '" + payload + "'");
464                        }
465                        if (converter instanceof SmartMessageConverter) {
466                                return ((SmartMessageConverter) converter).toMessage(payload, session, conversionHint);
467
468                        }
469                        return converter.toMessage(payload, session);
470                }
471
472
473                protected class LazyResolutionMessage implements org.springframework.messaging.Message<Object> {
474
475                        private final javax.jms.Message message;
476
477                        private Object payload;
478
479                        private MessageHeaders headers;
480
481                        public LazyResolutionMessage(javax.jms.Message message) {
482                                this.message = message;
483                        }
484
485                        @Override
486                        public Object getPayload() {
487                                if (this.payload == null) {
488                                        try {
489                                                this.payload = unwrapPayload();
490                                        }
491                                        catch (JMSException ex) {
492                                                throw new MessageConversionException(
493                                                                "Failed to extract payload from [" + this.message + "]", ex);
494                                        }
495                                }
496                                return this.payload;
497                        }
498
499                        /**
500                         * Extract the payload of the current message. Since we deferred the resolution
501                         * of the payload, a custom converter may still return a full message for it. In
502                         * this case, its payload is returned.
503                         * @return the payload of the message
504                         */
505                        private Object unwrapPayload() throws JMSException {
506                                Object payload = extractPayload(this.message);
507                                if (payload instanceof org.springframework.messaging.Message) {
508                                        return ((org.springframework.messaging.Message) payload).getPayload();
509                                }
510                                return payload;
511                        }
512
513                        @Override
514                        public MessageHeaders getHeaders() {
515                                if (this.headers == null) {
516                                        this.headers = extractHeaders(this.message);
517                                }
518                                return this.headers;
519                        }
520                }
521        }
522
523
524        /**
525         * Internal class combining a destination name
526         * and its target destination type (queue or topic).
527         */
528        private static class DestinationNameHolder {
529
530                public final String name;
531
532                public final boolean isTopic;
533
534                public DestinationNameHolder(String name, boolean isTopic) {
535                        this.name = name;
536                        this.isTopic = isTopic;
537                }
538        }
539
540}