001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.core;
018
019import javax.jms.Connection;
020import javax.jms.ConnectionFactory;
021import javax.jms.DeliveryMode;
022import javax.jms.Destination;
023import javax.jms.JMSException;
024import javax.jms.Message;
025import javax.jms.MessageConsumer;
026import javax.jms.MessageProducer;
027import javax.jms.Queue;
028import javax.jms.QueueBrowser;
029import javax.jms.Session;
030import javax.jms.TemporaryQueue;
031
032import org.springframework.jms.JmsException;
033import org.springframework.jms.connection.ConnectionFactoryUtils;
034import org.springframework.jms.connection.JmsResourceHolder;
035import org.springframework.jms.support.JmsUtils;
036import org.springframework.jms.support.QosSettings;
037import org.springframework.jms.support.converter.MessageConverter;
038import org.springframework.jms.support.converter.SimpleMessageConverter;
039import org.springframework.jms.support.destination.JmsDestinationAccessor;
040import org.springframework.lang.Nullable;
041import org.springframework.transaction.support.TransactionSynchronizationManager;
042import org.springframework.util.Assert;
043
044/**
045 * Helper class that simplifies synchronous JMS access code.
046 *
047 * <p>If you want to use dynamic destination creation, you must specify
048 * the type of JMS destination to create, using the "pubSubDomain" property.
049 * For other operations, this is not necessary. Point-to-Point (Queues) is the default
050 * domain.
051 *
052 * <p>Default settings for JMS Sessions are "not transacted" and "auto-acknowledge".
053 * As defined by the Java EE specification, the transaction and acknowledgement
054 * parameters are ignored when a JMS Session is created inside an active
055 * transaction, no matter if a JTA transaction or a Spring-managed transaction.
056 * To configure them for native JMS usage, specify appropriate values for
057 * the "sessionTransacted" and "sessionAcknowledgeMode" bean properties.
058 *
059 * <p>This template uses a
060 * {@link org.springframework.jms.support.destination.DynamicDestinationResolver}
061 * and a {@link org.springframework.jms.support.converter.SimpleMessageConverter}
062 * as default strategies for resolving a destination name or converting a message,
063 * respectively. These defaults can be overridden through the "destinationResolver"
064 * and "messageConverter" bean properties.
065 *
066 * <p><b>NOTE: The {@code ConnectionFactory} used with this template should
067 * return pooled Connections (or a single shared Connection) as well as pooled
068 * Sessions and MessageProducers. Otherwise, performance of ad-hoc JMS operations
069 * is going to suffer.</b> The simplest option is to use the Spring-provided
070 * {@link org.springframework.jms.connection.SingleConnectionFactory} as a
071 * decorator for your target {@code ConnectionFactory}, reusing a single
072 * JMS Connection in a thread-safe fashion; this is often good enough for the
073 * purpose of sending messages via this template. In a Java EE environment,
074 * make sure that the {@code ConnectionFactory} is obtained from the
075 * application's environment naming context via JNDI; application servers
076 * typically expose pooled, transaction-aware factories there.
077 *
078 * @author Mark Pollack
079 * @author Juergen Hoeller
080 * @author Stephane Nicoll
081 * @since 1.1
082 * @see #setConnectionFactory
083 * @see #setPubSubDomain
084 * @see #setDestinationResolver
085 * @see #setMessageConverter
086 * @see javax.jms.MessageProducer
087 * @see javax.jms.MessageConsumer
088 */
089public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {
090
091        /** Internal ResourceFactory adapter for interacting with ConnectionFactoryUtils. */
092        private final JmsTemplateResourceFactory transactionalResourceFactory = new JmsTemplateResourceFactory();
093
094
095        @Nullable
096        private Object defaultDestination;
097
098        @Nullable
099        private MessageConverter messageConverter;
100
101
102        private boolean messageIdEnabled = true;
103
104        private boolean messageTimestampEnabled = true;
105
106        private boolean pubSubNoLocal = false;
107
108        private long receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;
109
110        private long deliveryDelay = -1;
111
112
113        private boolean explicitQosEnabled = false;
114
115        private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
116
117        private int priority = Message.DEFAULT_PRIORITY;
118
119        private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
120
121
122        /**
123         * Create a new JmsTemplate for bean-style usage.
124         * <p>Note: The ConnectionFactory has to be set before using the instance.
125         * This constructor can be used to prepare a JmsTemplate via a BeanFactory,
126         * typically setting the ConnectionFactory via setConnectionFactory.
127         * @see #setConnectionFactory
128         */
129        public JmsTemplate() {
130                initDefaultStrategies();
131        }
132
133        /**
134         * Create a new JmsTemplate, given a ConnectionFactory.
135         * @param connectionFactory the ConnectionFactory to obtain Connections from
136         */
137        public JmsTemplate(ConnectionFactory connectionFactory) {
138                this();
139                setConnectionFactory(connectionFactory);
140                afterPropertiesSet();
141        }
142
143        /**
144         * Initialize the default implementations for the template's strategies:
145         * DynamicDestinationResolver and SimpleMessageConverter.
146         * @see #setDestinationResolver
147         * @see #setMessageConverter
148         * @see org.springframework.jms.support.destination.DynamicDestinationResolver
149         * @see org.springframework.jms.support.converter.SimpleMessageConverter
150         */
151        protected void initDefaultStrategies() {
152                setMessageConverter(new SimpleMessageConverter());
153        }
154
155
156        /**
157         * Set the destination to be used on send/receive operations that do not
158         * have a destination parameter.
159         * <p>Alternatively, specify a "defaultDestinationName", to be
160         * dynamically resolved via the DestinationResolver.
161         * @see #send(MessageCreator)
162         * @see #convertAndSend(Object)
163         * @see #convertAndSend(Object, MessagePostProcessor)
164         * @see #setDefaultDestinationName(String)
165         */
166        public void setDefaultDestination(@Nullable Destination destination) {
167                this.defaultDestination = destination;
168        }
169
170        /**
171         * Return the destination to be used on send/receive operations that do not
172         * have a destination parameter.
173         */
174        @Nullable
175        public Destination getDefaultDestination() {
176                return (this.defaultDestination instanceof Destination ? (Destination) this.defaultDestination : null);
177        }
178
179        @Nullable
180        private Queue getDefaultQueue() {
181                Destination defaultDestination = getDefaultDestination();
182                if (defaultDestination != null && !(defaultDestination instanceof Queue)) {
183                        throw new IllegalStateException(
184                                        "'defaultDestination' does not correspond to a Queue. Check configuration of JmsTemplate.");
185                }
186                return (Queue) defaultDestination;
187        }
188
189        /**
190         * Set the destination name to be used on send/receive operations that
191         * do not have a destination parameter. The specified name will be
192         * dynamically resolved via the DestinationResolver.
193         * <p>Alternatively, specify a JMS Destination object as "defaultDestination".
194         * @see #send(MessageCreator)
195         * @see #convertAndSend(Object)
196         * @see #convertAndSend(Object, MessagePostProcessor)
197         * @see #setDestinationResolver
198         * @see #setDefaultDestination(javax.jms.Destination)
199         */
200        public void setDefaultDestinationName(@Nullable String destinationName) {
201                this.defaultDestination = destinationName;
202        }
203
204        /**
205         * Return the destination name to be used on send/receive operations that
206         * do not have a destination parameter.
207         */
208        @Nullable
209        public String getDefaultDestinationName() {
210                return (this.defaultDestination instanceof String ? (String) this.defaultDestination : null);
211        }
212
213        private String getRequiredDefaultDestinationName() throws IllegalStateException {
214                String name = getDefaultDestinationName();
215                if (name == null) {
216                        throw new IllegalStateException(
217                                        "No 'defaultDestination' or 'defaultDestinationName' specified. Check configuration of JmsTemplate.");
218                }
219                return name;
220        }
221
222        /**
223         * Set the message converter for this template. Used to resolve
224         * Object parameters to convertAndSend methods and Object results
225         * from receiveAndConvert methods.
226         * <p>The default converter is a SimpleMessageConverter, which is able
227         * to handle BytesMessages, TextMessages and ObjectMessages.
228         * @see #convertAndSend
229         * @see #receiveAndConvert
230         * @see org.springframework.jms.support.converter.SimpleMessageConverter
231         */
232        public void setMessageConverter(@Nullable MessageConverter messageConverter) {
233                this.messageConverter = messageConverter;
234        }
235
236        /**
237         * Return the message converter for this template.
238         */
239        @Nullable
240        public MessageConverter getMessageConverter() {
241                return this.messageConverter;
242        }
243
244        private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
245                MessageConverter converter = getMessageConverter();
246                if (converter == null) {
247                        throw new IllegalStateException("No 'messageConverter' specified. Check configuration of JmsTemplate.");
248                }
249                return converter;
250        }
251
252
253        /**
254         * Set whether message IDs are enabled. Default is "true".
255         * <p>This is only a hint to the JMS producer.
256         * See the JMS javadocs for details.
257         * @see javax.jms.MessageProducer#setDisableMessageID
258         */
259        public void setMessageIdEnabled(boolean messageIdEnabled) {
260                this.messageIdEnabled = messageIdEnabled;
261        }
262
263        /**
264         * Return whether message IDs are enabled.
265         */
266        public boolean isMessageIdEnabled() {
267                return this.messageIdEnabled;
268        }
269
270        /**
271         * Set whether message timestamps are enabled. Default is "true".
272         * <p>This is only a hint to the JMS producer.
273         * See the JMS javadocs for details.
274         * @see javax.jms.MessageProducer#setDisableMessageTimestamp
275         */
276        public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
277                this.messageTimestampEnabled = messageTimestampEnabled;
278        }
279
280        /**
281         * Return whether message timestamps are enabled.
282         */
283        public boolean isMessageTimestampEnabled() {
284                return this.messageTimestampEnabled;
285        }
286
287        /**
288         * Set whether to inhibit the delivery of messages published by its own connection.
289         * Default is "false".
290         * @see javax.jms.Session#createConsumer(javax.jms.Destination, String, boolean)
291         */
292        public void setPubSubNoLocal(boolean pubSubNoLocal) {
293                this.pubSubNoLocal = pubSubNoLocal;
294        }
295
296        /**
297         * Return whether to inhibit the delivery of messages published by its own connection.
298         */
299        public boolean isPubSubNoLocal() {
300                return this.pubSubNoLocal;
301        }
302
303        /**
304         * Set the timeout to use for receive calls (in milliseconds).
305         * <p>The default is {@link #RECEIVE_TIMEOUT_INDEFINITE_WAIT}, which indicates
306         * a blocking receive without timeout.
307         * <p>Specify {@link #RECEIVE_TIMEOUT_NO_WAIT} (or any other negative value)
308         * to indicate that a receive operation should check if a message is
309         * immediately available without blocking.
310         * @see #receiveFromConsumer(MessageConsumer, long)
311         * @see javax.jms.MessageConsumer#receive(long)
312         * @see javax.jms.MessageConsumer#receiveNoWait()
313         * @see javax.jms.MessageConsumer#receive()
314         */
315        public void setReceiveTimeout(long receiveTimeout) {
316                this.receiveTimeout = receiveTimeout;
317        }
318
319        /**
320         * Return the timeout to use for receive calls (in milliseconds).
321         */
322        public long getReceiveTimeout() {
323                return this.receiveTimeout;
324        }
325
326        /**
327         * Set the delivery delay to use for send calls (in milliseconds).
328         * <p>The default is -1 (no delivery delay passed on to the broker).
329         * Note that this feature requires JMS 2.0.
330         */
331        public void setDeliveryDelay(long deliveryDelay) {
332                this.deliveryDelay = deliveryDelay;
333        }
334
335        /**
336         * Return the delivery delay to use for send calls (in milliseconds).
337         */
338        public long getDeliveryDelay() {
339                return this.deliveryDelay;
340        }
341
342
343        /**
344         * Set if the QOS values (deliveryMode, priority, timeToLive)
345         * should be used for sending a message.
346         * @see #setDeliveryMode
347         * @see #setPriority
348         * @see #setTimeToLive
349         */
350        public void setExplicitQosEnabled(boolean explicitQosEnabled) {
351                this.explicitQosEnabled = explicitQosEnabled;
352        }
353
354        /**
355         * If "true", then the values of deliveryMode, priority, and timeToLive
356         * will be used when sending a message. Otherwise, the default values,
357         * that may be set administratively, will be used.
358         * @return true if overriding default values of QOS parameters
359         * (deliveryMode, priority, and timeToLive)
360         * @see #setDeliveryMode
361         * @see #setPriority
362         * @see #setTimeToLive
363         */
364        public boolean isExplicitQosEnabled() {
365                return this.explicitQosEnabled;
366        }
367
368        /**
369         * Set the {@link QosSettings} to use when sending a message.
370         * @param settings the deliveryMode, priority, and timeToLive settings to use
371         * @since 5.0
372         * @see #setExplicitQosEnabled(boolean)
373         * @see #setDeliveryMode(int)
374         * @see #setPriority(int)
375         * @see #setTimeToLive(long)
376         */
377        public void setQosSettings(QosSettings settings) {
378                Assert.notNull(settings, "Settings must not be null");
379                setExplicitQosEnabled(true);
380                setDeliveryMode(settings.getDeliveryMode());
381                setPriority(settings.getPriority());
382                setTimeToLive(settings.getTimeToLive());
383        }
384
385        /**
386         * Set whether message delivery should be persistent or non-persistent,
387         * specified as boolean value ("true" or "false"). This will set the delivery
388         * mode accordingly, to either "PERSISTENT" (2) or "NON_PERSISTENT" (1).
389         * <p>Default is "true" a.k.a. delivery mode "PERSISTENT".
390         * @see #setDeliveryMode(int)
391         * @see javax.jms.DeliveryMode#PERSISTENT
392         * @see javax.jms.DeliveryMode#NON_PERSISTENT
393         */
394        public void setDeliveryPersistent(boolean deliveryPersistent) {
395                this.deliveryMode = (deliveryPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
396        }
397
398        /**
399         * Set the delivery mode to use when sending a message.
400         * Default is the JMS Message default: "PERSISTENT".
401         * <p>Since a default value may be defined administratively,
402         * this is only used when "isExplicitQosEnabled" equals "true".
403         * @param deliveryMode the delivery mode to use
404         * @see #isExplicitQosEnabled
405         * @see javax.jms.DeliveryMode#PERSISTENT
406         * @see javax.jms.DeliveryMode#NON_PERSISTENT
407         * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
408         * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
409         */
410        public void setDeliveryMode(int deliveryMode) {
411                this.deliveryMode = deliveryMode;
412        }
413
414        /**
415         * Return the delivery mode to use when sending a message.
416         */
417        public int getDeliveryMode() {
418                return this.deliveryMode;
419        }
420
421        /**
422         * Set the priority of a message when sending.
423         * <p>Since a default value may be defined administratively,
424         * this is only used when "isExplicitQosEnabled" equals "true".
425         * @see #isExplicitQosEnabled
426         * @see javax.jms.Message#DEFAULT_PRIORITY
427         * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
428         */
429        public void setPriority(int priority) {
430                this.priority = priority;
431        }
432
433        /**
434         * Return the priority of a message when sending.
435         */
436        public int getPriority() {
437                return this.priority;
438        }
439
440        /**
441         * Set the time-to-live of the message when sending.
442         * <p>Since a default value may be defined administratively,
443         * this is only used when "isExplicitQosEnabled" equals "true".
444         * @param timeToLive the message's lifetime (in milliseconds)
445         * @see #isExplicitQosEnabled
446         * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
447         * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
448         */
449        public void setTimeToLive(long timeToLive) {
450                this.timeToLive = timeToLive;
451        }
452
453        /**
454         * Return the time-to-live of the message when sending.
455         */
456        public long getTimeToLive() {
457                return this.timeToLive;
458        }
459
460
461        //---------------------------------------------------------------------------------------
462        // JmsOperations execute methods
463        //---------------------------------------------------------------------------------------
464
465        @Override
466        @Nullable
467        public <T> T execute(SessionCallback<T> action) throws JmsException {
468                return execute(action, false);
469        }
470
471        /**
472         * Execute the action specified by the given action object within a
473         * JMS Session. Generalized version of {@code execute(SessionCallback)},
474         * allowing the JMS Connection to be started on the fly.
475         * <p>Use {@code execute(SessionCallback)} for the general case.
476         * Starting the JMS Connection is just necessary for receiving messages,
477         * which is preferably achieved through the {@code receive} methods.
478         * @param action callback object that exposes the Session
479         * @param startConnection whether to start the Connection
480         * @return the result object from working with the Session
481         * @throws JmsException if there is any problem
482         * @see #execute(SessionCallback)
483         * @see #receive
484         */
485        @Nullable
486        public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
487                Assert.notNull(action, "Callback object must not be null");
488                Connection conToClose = null;
489                Session sessionToClose = null;
490                try {
491                        Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
492                                        obtainConnectionFactory(), this.transactionalResourceFactory, startConnection);
493                        if (sessionToUse == null) {
494                                conToClose = createConnection();
495                                sessionToClose = createSession(conToClose);
496                                if (startConnection) {
497                                        conToClose.start();
498                                }
499                                sessionToUse = sessionToClose;
500                        }
501                        if (logger.isDebugEnabled()) {
502                                logger.debug("Executing callback on JMS Session: " + sessionToUse);
503                        }
504                        return action.doInJms(sessionToUse);
505                }
506                catch (JMSException ex) {
507                        throw convertJmsAccessException(ex);
508                }
509                finally {
510                        JmsUtils.closeSession(sessionToClose);
511                        ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
512                }
513        }
514
515        @Override
516        @Nullable
517        public <T> T execute(ProducerCallback<T> action) throws JmsException {
518                String defaultDestinationName = getDefaultDestinationName();
519                if (defaultDestinationName != null) {
520                        return execute(defaultDestinationName, action);
521                }
522                else {
523                        return execute(getDefaultDestination(), action);
524                }
525        }
526
527        @Override
528        @Nullable
529        public <T> T execute(final @Nullable Destination destination, final ProducerCallback<T> action) throws JmsException {
530                Assert.notNull(action, "Callback object must not be null");
531                return execute(session -> {
532                        MessageProducer producer = createProducer(session, destination);
533                        try {
534                                return action.doInJms(session, producer);
535                        }
536                        finally {
537                                JmsUtils.closeMessageProducer(producer);
538                        }
539                }, false);
540        }
541
542        @Override
543        @Nullable
544        public <T> T execute(final String destinationName, final ProducerCallback<T> action) throws JmsException {
545                Assert.notNull(action, "Callback object must not be null");
546                return execute(session -> {
547                        Destination destination = resolveDestinationName(session, destinationName);
548                        MessageProducer producer = createProducer(session, destination);
549                        try {
550                                return action.doInJms(session, producer);
551                        }
552                        finally {
553                                JmsUtils.closeMessageProducer(producer);
554                        }
555                }, false);
556        }
557
558
559        //---------------------------------------------------------------------------------------
560        // Convenience methods for sending messages
561        //---------------------------------------------------------------------------------------
562
563        @Override
564        public void send(MessageCreator messageCreator) throws JmsException {
565                Destination defaultDestination = getDefaultDestination();
566                if (defaultDestination != null) {
567                        send(defaultDestination, messageCreator);
568                }
569                else {
570                        send(getRequiredDefaultDestinationName(), messageCreator);
571                }
572        }
573
574        @Override
575        public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {
576                execute(session -> {
577                        doSend(session, destination, messageCreator);
578                        return null;
579                }, false);
580        }
581
582        @Override
583        public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
584                execute(session -> {
585                        Destination destination = resolveDestinationName(session, destinationName);
586                        doSend(session, destination, messageCreator);
587                        return null;
588                }, false);
589        }
590
591        /**
592         * Send the given JMS message.
593         * @param session the JMS Session to operate on
594         * @param destination the JMS Destination to send to
595         * @param messageCreator callback to create a JMS Message
596         * @throws JMSException if thrown by JMS API methods
597         */
598        protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
599                        throws JMSException {
600
601                Assert.notNull(messageCreator, "MessageCreator must not be null");
602                MessageProducer producer = createProducer(session, destination);
603                try {
604                        Message message = messageCreator.createMessage(session);
605                        if (logger.isDebugEnabled()) {
606                                logger.debug("Sending created message: " + message);
607                        }
608                        doSend(producer, message);
609                        // Check commit - avoid commit call within a JTA transaction.
610                        if (session.getTransacted() && isSessionLocallyTransacted(session)) {
611                                // Transacted session created by this template -> commit.
612                                JmsUtils.commitIfNecessary(session);
613                        }
614                }
615                finally {
616                        JmsUtils.closeMessageProducer(producer);
617                }
618        }
619
620        /**
621         * Actually send the given JMS message.
622         * @param producer the JMS MessageProducer to send with
623         * @param message the JMS Message to send
624         * @throws JMSException if thrown by JMS API methods
625         */
626        protected void doSend(MessageProducer producer, Message message) throws JMSException {
627                if (this.deliveryDelay >= 0) {
628                        producer.setDeliveryDelay(this.deliveryDelay);
629                }
630                if (isExplicitQosEnabled()) {
631                        producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
632                }
633                else {
634                        producer.send(message);
635                }
636        }
637
638
639        //---------------------------------------------------------------------------------------
640        // Convenience methods for sending auto-converted messages
641        //---------------------------------------------------------------------------------------
642
643        @Override
644        public void convertAndSend(Object message) throws JmsException {
645                Destination defaultDestination = getDefaultDestination();
646                if (defaultDestination != null) {
647                        convertAndSend(defaultDestination, message);
648                }
649                else {
650                        convertAndSend(getRequiredDefaultDestinationName(), message);
651                }
652        }
653
654        @Override
655        public void convertAndSend(Destination destination, final Object message) throws JmsException {
656                send(destination, session -> getRequiredMessageConverter().toMessage(message, session));
657        }
658
659        @Override
660        public void convertAndSend(String destinationName, final Object message) throws JmsException {
661                send(destinationName, session -> getRequiredMessageConverter().toMessage(message, session));
662        }
663
664        @Override
665        public void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException {
666                Destination defaultDestination = getDefaultDestination();
667                if (defaultDestination != null) {
668                        convertAndSend(defaultDestination, message, postProcessor);
669                }
670                else {
671                        convertAndSend(getRequiredDefaultDestinationName(), message, postProcessor);
672                }
673        }
674
675        @Override
676        public void convertAndSend(
677                        Destination destination, final Object message, final MessagePostProcessor postProcessor)
678                        throws JmsException {
679
680                send(destination, session -> {
681                        Message msg = getRequiredMessageConverter().toMessage(message, session);
682                        return postProcessor.postProcessMessage(msg);
683                });
684        }
685
686        @Override
687        public void convertAndSend(
688                        String destinationName, final Object message, final MessagePostProcessor postProcessor)
689                throws JmsException {
690
691                send(destinationName, session -> {
692                        Message msg = getRequiredMessageConverter().toMessage(message, session);
693                        return postProcessor.postProcessMessage(msg);
694                });
695        }
696
697
698        //---------------------------------------------------------------------------------------
699        // Convenience methods for receiving messages
700        //---------------------------------------------------------------------------------------
701
702        @Override
703        @Nullable
704        public Message receive() throws JmsException {
705                Destination defaultDestination = getDefaultDestination();
706                if (defaultDestination != null) {
707                        return receive(defaultDestination);
708                }
709                else {
710                        return receive(getRequiredDefaultDestinationName());
711                }
712        }
713
714        @Override
715        @Nullable
716        public Message receive(Destination destination) throws JmsException {
717                return receiveSelected(destination, null);
718        }
719
720        @Override
721        @Nullable
722        public Message receive(String destinationName) throws JmsException {
723                return receiveSelected(destinationName, null);
724        }
725
726        @Override
727        @Nullable
728        public Message receiveSelected(String messageSelector) throws JmsException {
729                Destination defaultDestination = getDefaultDestination();
730                if (defaultDestination != null) {
731                        return receiveSelected(defaultDestination, messageSelector);
732                }
733                else {
734                        return receiveSelected(getRequiredDefaultDestinationName(), messageSelector);
735                }
736        }
737
738        @Override
739        @Nullable
740        public Message receiveSelected(final Destination destination, @Nullable final String messageSelector) throws JmsException {
741                return execute(session -> doReceive(session, destination, messageSelector), true);
742        }
743
744        @Override
745        @Nullable
746        public Message receiveSelected(final String destinationName, @Nullable final String messageSelector) throws JmsException {
747                return execute(session -> {
748                        Destination destination = resolveDestinationName(session, destinationName);
749                        return doReceive(session, destination, messageSelector);
750                }, true);
751        }
752
753        /**
754         * Receive a JMS message.
755         * @param session the JMS Session to operate on
756         * @param destination the JMS Destination to receive from
757         * @param messageSelector the message selector for this consumer (can be {@code null})
758         * @return the JMS Message received, or {@code null} if none
759         * @throws JMSException if thrown by JMS API methods
760         */
761        @Nullable
762        protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector)
763                        throws JMSException {
764
765                return doReceive(session, createConsumer(session, destination, messageSelector));
766        }
767
768        /**
769         * Actually receive a JMS message.
770         * @param session the JMS Session to operate on
771         * @param consumer the JMS MessageConsumer to receive with
772         * @return the JMS Message received, or {@code null} if none
773         * @throws JMSException if thrown by JMS API methods
774         */
775        @Nullable
776        protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
777                try {
778                        // Use transaction timeout (if available).
779                        long timeout = getReceiveTimeout();
780                        ConnectionFactory connectionFactory = getConnectionFactory();
781                        JmsResourceHolder resourceHolder = null;
782                        if (connectionFactory != null) {
783                                resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory);
784                        }
785                        if (resourceHolder != null && resourceHolder.hasTimeout()) {
786                                timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
787                        }
788                        Message message = receiveFromConsumer(consumer, timeout);
789                        if (session.getTransacted()) {
790                                // Commit necessary - but avoid commit call within a JTA transaction.
791                                if (isSessionLocallyTransacted(session)) {
792                                        // Transacted session created by this template -> commit.
793                                        JmsUtils.commitIfNecessary(session);
794                                }
795                        }
796                        else if (isClientAcknowledge(session)) {
797                                // Manually acknowledge message, if any.
798                                if (message != null) {
799                                        message.acknowledge();
800                                }
801                        }
802                        return message;
803                }
804                finally {
805                        JmsUtils.closeMessageConsumer(consumer);
806                }
807        }
808
809
810        //---------------------------------------------------------------------------------------
811        // Convenience methods for receiving auto-converted messages
812        //---------------------------------------------------------------------------------------
813
814        @Override
815        @Nullable
816        public Object receiveAndConvert() throws JmsException {
817                return doConvertFromMessage(receive());
818        }
819
820        @Override
821        @Nullable
822        public Object receiveAndConvert(Destination destination) throws JmsException {
823                return doConvertFromMessage(receive(destination));
824        }
825
826        @Override
827        @Nullable
828        public Object receiveAndConvert(String destinationName) throws JmsException {
829                return doConvertFromMessage(receive(destinationName));
830        }
831
832        @Override
833        @Nullable
834        public Object receiveSelectedAndConvert(String messageSelector) throws JmsException {
835                return doConvertFromMessage(receiveSelected(messageSelector));
836        }
837
838        @Override
839        @Nullable
840        public Object receiveSelectedAndConvert(Destination destination, String messageSelector) throws JmsException {
841                return doConvertFromMessage(receiveSelected(destination, messageSelector));
842        }
843
844        @Override
845        @Nullable
846        public Object receiveSelectedAndConvert(String destinationName, String messageSelector) throws JmsException {
847                return doConvertFromMessage(receiveSelected(destinationName, messageSelector));
848        }
849
850        /**
851         * Extract the content from the given JMS message.
852         * @param message the JMS Message to convert (can be {@code null})
853         * @return the content of the message, or {@code null} if none
854         */
855        @Nullable
856        protected Object doConvertFromMessage(@Nullable Message message) {
857                if (message != null) {
858                        try {
859                                return getRequiredMessageConverter().fromMessage(message);
860                        }
861                        catch (JMSException ex) {
862                                throw convertJmsAccessException(ex);
863                        }
864                }
865                return null;
866        }
867
868
869        //---------------------------------------------------------------------------------------
870        // Convenience methods for sending messages to and receiving the reply from a destination
871        //---------------------------------------------------------------------------------------
872
873        @Override
874        @Nullable
875        public Message sendAndReceive(MessageCreator messageCreator) throws JmsException {
876                Destination defaultDestination = getDefaultDestination();
877                if (defaultDestination != null) {
878                        return sendAndReceive(defaultDestination, messageCreator);
879                }
880                else {
881                        return sendAndReceive(getRequiredDefaultDestinationName(), messageCreator);
882                }
883        }
884
885        @Override
886        @Nullable
887        public Message sendAndReceive(final Destination destination, final MessageCreator messageCreator) throws JmsException {
888                return executeLocal(session -> doSendAndReceive(session, destination, messageCreator), true);
889        }
890
891        @Override
892        @Nullable
893        public Message sendAndReceive(final String destinationName, final MessageCreator messageCreator) throws JmsException {
894                return executeLocal(session -> {
895                        Destination destination = resolveDestinationName(session, destinationName);
896                        return doSendAndReceive(session, destination, messageCreator);
897                }, true);
898        }
899
900        /**
901         * Send a request message to the given {@link Destination} and block until
902         * a reply has been received on a temporary queue created on-the-fly.
903         * <p>Return the response message or {@code null} if no message has
904         * @throws JMSException if thrown by JMS API methods
905         */
906        @Nullable
907        protected Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
908                        throws JMSException {
909
910                Assert.notNull(messageCreator, "MessageCreator must not be null");
911                TemporaryQueue responseQueue = null;
912                MessageProducer producer = null;
913                MessageConsumer consumer = null;
914                try {
915                        Message requestMessage = messageCreator.createMessage(session);
916                        responseQueue = session.createTemporaryQueue();
917                        producer = session.createProducer(destination);
918                        consumer = session.createConsumer(responseQueue);
919                        requestMessage.setJMSReplyTo(responseQueue);
920                        if (logger.isDebugEnabled()) {
921                                logger.debug("Sending created message: " + requestMessage);
922                        }
923                        doSend(producer, requestMessage);
924                        return receiveFromConsumer(consumer, getReceiveTimeout());
925                }
926                finally {
927                        JmsUtils.closeMessageConsumer(consumer);
928                        JmsUtils.closeMessageProducer(producer);
929                        if (responseQueue != null) {
930                                responseQueue.delete();
931                        }
932                }
933        }
934
935        /**
936         * A variant of {@link #execute(SessionCallback, boolean)} that explicitly
937         * creates a non-transactional {@link Session}. The given {@link SessionCallback}
938         * does not participate in an existing transaction.
939         */
940        @Nullable
941        private <T> T executeLocal(SessionCallback<T> action, boolean startConnection) throws JmsException {
942                Assert.notNull(action, "Callback object must not be null");
943                Connection con = null;
944                Session session = null;
945                try {
946                        con = createConnection();
947                        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
948                        if (startConnection) {
949                                con.start();
950                        }
951                        if (logger.isDebugEnabled()) {
952                                logger.debug("Executing callback on JMS Session: " + session);
953                        }
954                        return action.doInJms(session);
955                }
956                catch (JMSException ex) {
957                        throw convertJmsAccessException(ex);
958                }
959                finally {
960                        JmsUtils.closeSession(session);
961                        ConnectionFactoryUtils.releaseConnection(con, getConnectionFactory(), startConnection);
962                }
963        }
964
965
966        //---------------------------------------------------------------------------------------
967        // Convenience methods for browsing messages
968        //---------------------------------------------------------------------------------------
969
970        @Override
971        @Nullable
972        public <T> T browse(BrowserCallback<T> action) throws JmsException {
973                Queue defaultQueue = getDefaultQueue();
974                if (defaultQueue != null) {
975                        return browse(defaultQueue, action);
976                }
977                else {
978                        return browse(getRequiredDefaultDestinationName(), action);
979                }
980        }
981
982        @Override
983        @Nullable
984        public <T> T browse(Queue queue, BrowserCallback<T> action) throws JmsException {
985                return browseSelected(queue, null, action);
986        }
987
988        @Override
989        @Nullable
990        public <T> T browse(String queueName, BrowserCallback<T> action) throws JmsException {
991                return browseSelected(queueName, null, action);
992        }
993
994        @Override
995        @Nullable
996        public <T> T browseSelected(String messageSelector, BrowserCallback<T> action) throws JmsException {
997                Queue defaultQueue = getDefaultQueue();
998                if (defaultQueue != null) {
999                        return browseSelected(defaultQueue, messageSelector, action);
1000                }
1001                else {
1002                        return browseSelected(getRequiredDefaultDestinationName(), messageSelector, action);
1003                }
1004        }
1005
1006        @Override
1007        @Nullable
1008        public <T> T browseSelected(final Queue queue, @Nullable final String messageSelector, final BrowserCallback<T> action)
1009                        throws JmsException {
1010
1011                Assert.notNull(action, "Callback object must not be null");
1012                return execute(session -> {
1013                        QueueBrowser browser = createBrowser(session, queue, messageSelector);
1014                        try {
1015                                return action.doInJms(session, browser);
1016                        }
1017                        finally {
1018                                JmsUtils.closeQueueBrowser(browser);
1019                        }
1020                }, true);
1021        }
1022
1023        @Override
1024        @Nullable
1025        public <T> T browseSelected(final String queueName, @Nullable final String messageSelector, final BrowserCallback<T> action)
1026                        throws JmsException {
1027
1028                Assert.notNull(action, "Callback object must not be null");
1029                return execute(session -> {
1030                        Queue queue = (Queue) getDestinationResolver().resolveDestinationName(session, queueName, false);
1031                        QueueBrowser browser = createBrowser(session, queue, messageSelector);
1032                        try {
1033                                return action.doInJms(session, browser);
1034                        }
1035                        finally {
1036                                JmsUtils.closeQueueBrowser(browser);
1037                        }
1038                }, true);
1039        }
1040
1041
1042        /**
1043         * Fetch an appropriate Connection from the given JmsResourceHolder.
1044         * <p>This implementation accepts any JMS 1.1 Connection.
1045         * @param holder the JmsResourceHolder
1046         * @return an appropriate Connection fetched from the holder,
1047         * or {@code null} if none found
1048         */
1049        @Nullable
1050        protected Connection getConnection(JmsResourceHolder holder) {
1051                return holder.getConnection();
1052        }
1053
1054        /**
1055         * Fetch an appropriate Session from the given JmsResourceHolder.
1056         * <p>This implementation accepts any JMS 1.1 Session.
1057         * @param holder the JmsResourceHolder
1058         * @return an appropriate Session fetched from the holder,
1059         * or {@code null} if none found
1060         */
1061        @Nullable
1062        protected Session getSession(JmsResourceHolder holder) {
1063                return holder.getSession();
1064        }
1065
1066        /**
1067         * Check whether the given Session is locally transacted, that is, whether
1068         * its transaction is managed by this listener container's Session handling
1069         * and not by an external transaction coordinator.
1070         * <p>Note: The Session's own transacted flag will already have been checked
1071         * before. This method is about finding out whether the Session's transaction
1072         * is local or externally coordinated.
1073         * @param session the Session to check
1074         * @return whether the given Session is locally transacted
1075         * @see #isSessionTransacted()
1076         * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
1077         */
1078        protected boolean isSessionLocallyTransacted(Session session) {
1079                return isSessionTransacted() &&
1080                                !ConnectionFactoryUtils.isSessionTransactional(session, getConnectionFactory());
1081        }
1082
1083        /**
1084         * Create a JMS MessageProducer for the given Session and Destination,
1085         * configuring it to disable message ids and/or timestamps (if necessary).
1086         * <p>Delegates to {@link #doCreateProducer} for creation of the raw
1087         * JMS MessageProducer.
1088         * @param session the JMS Session to create a MessageProducer for
1089         * @param destination the JMS Destination to create a MessageProducer for
1090         * @return the new JMS MessageProducer
1091         * @throws JMSException if thrown by JMS API methods
1092         * @see #setMessageIdEnabled
1093         * @see #setMessageTimestampEnabled
1094         */
1095        protected MessageProducer createProducer(Session session, @Nullable Destination destination) throws JMSException {
1096                MessageProducer producer = doCreateProducer(session, destination);
1097                if (!isMessageIdEnabled()) {
1098                        producer.setDisableMessageID(true);
1099                }
1100                if (!isMessageTimestampEnabled()) {
1101                        producer.setDisableMessageTimestamp(true);
1102                }
1103                return producer;
1104        }
1105
1106        /**
1107         * Create a raw JMS MessageProducer for the given Session and Destination.
1108         * <p>This implementation uses JMS 1.1 API.
1109         * @param session the JMS Session to create a MessageProducer for
1110         * @param destination the JMS Destination to create a MessageProducer for
1111         * @return the new JMS MessageProducer
1112         * @throws JMSException if thrown by JMS API methods
1113         */
1114        protected MessageProducer doCreateProducer(Session session, @Nullable Destination destination) throws JMSException {
1115                return session.createProducer(destination);
1116        }
1117
1118        /**
1119         * Create a JMS MessageConsumer for the given Session and Destination.
1120         * <p>This implementation uses JMS 1.1 API.
1121         * @param session the JMS Session to create a MessageConsumer for
1122         * @param destination the JMS Destination to create a MessageConsumer for
1123         * @param messageSelector the message selector for this consumer (can be {@code null})
1124         * @return the new JMS MessageConsumer
1125         * @throws JMSException if thrown by JMS API methods
1126         */
1127        protected MessageConsumer createConsumer(Session session, Destination destination, @Nullable String messageSelector)
1128                        throws JMSException {
1129
1130                // Only pass in the NoLocal flag in case of a Topic:
1131                // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
1132                // in case of the NoLocal flag being specified for a Queue.
1133                if (isPubSubDomain()) {
1134                        return session.createConsumer(destination, messageSelector, isPubSubNoLocal());
1135                }
1136                else {
1137                        return session.createConsumer(destination, messageSelector);
1138                }
1139        }
1140
1141        /**
1142         * Create a JMS MessageProducer for the given Session and Destination,
1143         * configuring it to disable message ids and/or timestamps (if necessary).
1144         * <p>Delegates to {@link #doCreateProducer} for creation of the raw
1145         * JMS MessageProducer.
1146         * @param session the JMS Session to create a QueueBrowser for
1147         * @param queue the JMS Queue to create a QueueBrowser for
1148         * @param messageSelector the message selector for this consumer (can be {@code null})
1149         * @return the new JMS QueueBrowser
1150         * @throws JMSException if thrown by JMS API methods
1151         * @see #setMessageIdEnabled
1152         * @see #setMessageTimestampEnabled
1153         */
1154        protected QueueBrowser createBrowser(Session session, Queue queue, @Nullable String messageSelector)
1155                        throws JMSException {
1156
1157                return session.createBrowser(queue, messageSelector);
1158        }
1159
1160
1161        /**
1162         * ResourceFactory implementation that delegates to this template's protected callback methods.
1163         */
1164        private class JmsTemplateResourceFactory implements ConnectionFactoryUtils.ResourceFactory {
1165
1166                @Override
1167                @Nullable
1168                public Connection getConnection(JmsResourceHolder holder) {
1169                        return JmsTemplate.this.getConnection(holder);
1170                }
1171
1172                @Override
1173                @Nullable
1174                public Session getSession(JmsResourceHolder holder) {
1175                        return JmsTemplate.this.getSession(holder);
1176                }
1177
1178                @Override
1179                public Connection createConnection() throws JMSException {
1180                        return JmsTemplate.this.createConnection();
1181                }
1182
1183                @Override
1184                public Session createSession(Connection con) throws JMSException {
1185                        return JmsTemplate.this.createSession(con);
1186                }
1187
1188                @Override
1189                public boolean isSynchedLocalTransactionAllowed() {
1190                        return JmsTemplate.this.isSessionTransacted();
1191                }
1192        }
1193
1194}