001/*
002 * Copyright 2002-2016 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.core;
018
019import java.lang.reflect.Method;
020import javax.jms.Connection;
021import javax.jms.ConnectionFactory;
022import javax.jms.DeliveryMode;
023import javax.jms.Destination;
024import javax.jms.JMSException;
025import javax.jms.Message;
026import javax.jms.MessageConsumer;
027import javax.jms.MessageProducer;
028import javax.jms.Queue;
029import javax.jms.QueueBrowser;
030import javax.jms.Session;
031import javax.jms.TemporaryQueue;
032
033import org.springframework.jms.JmsException;
034import org.springframework.jms.connection.ConnectionFactoryUtils;
035import org.springframework.jms.connection.JmsResourceHolder;
036import org.springframework.jms.support.JmsUtils;
037import org.springframework.jms.support.converter.MessageConverter;
038import org.springframework.jms.support.converter.SimpleMessageConverter;
039import org.springframework.jms.support.destination.JmsDestinationAccessor;
040import org.springframework.transaction.support.TransactionSynchronizationManager;
041import org.springframework.util.Assert;
042import org.springframework.util.ClassUtils;
043import org.springframework.util.ReflectionUtils;
044
045/**
046 * Helper class that simplifies synchronous JMS access code.
047 *
048 * <p>If you want to use dynamic destination creation, you must specify
049 * the type of JMS destination to create, using the "pubSubDomain" property.
050 * For other operations, this is not necessary. Point-to-Point (Queues) is the default
051 * domain.
052 *
053 * <p>Default settings for JMS Sessions are "not transacted" and "auto-acknowledge".
054 * As defined by the Java EE specification, the transaction and acknowledgement
055 * parameters are ignored when a JMS Session is created inside an active
056 * transaction, no matter if a JTA transaction or a Spring-managed transaction.
057 * To configure them for native JMS usage, specify appropriate values for
058 * the "sessionTransacted" and "sessionAcknowledgeMode" bean properties.
059 *
060 * <p>This template uses a
061 * {@link org.springframework.jms.support.destination.DynamicDestinationResolver}
062 * and a {@link org.springframework.jms.support.converter.SimpleMessageConverter}
063 * as default strategies for resolving a destination name or converting a message,
064 * respectively. These defaults can be overridden through the "destinationResolver"
065 * and "messageConverter" bean properties.
066 *
067 * <p><b>NOTE: The {@code ConnectionFactory} used with this template should
068 * return pooled Connections (or a single shared Connection) as well as pooled
069 * Sessions and MessageProducers. Otherwise, performance of ad-hoc JMS operations
070 * is going to suffer.</b> The simplest option is to use the Spring-provided
071 * {@link org.springframework.jms.connection.SingleConnectionFactory} as a
072 * decorator for your target {@code ConnectionFactory}, reusing a single
073 * JMS Connection in a thread-safe fashion; this is often good enough for the
074 * purpose of sending messages via this template. In a Java EE environment,
075 * make sure that the {@code ConnectionFactory} is obtained from the
076 * application's environment naming context via JNDI; application servers
077 * typically expose pooled, transaction-aware factories there.
078 *
079 * @author Mark Pollack
080 * @author Juergen Hoeller
081 * @author Stephane Nicoll
082 * @since 1.1
083 * @see #setConnectionFactory
084 * @see #setPubSubDomain
085 * @see #setDestinationResolver
086 * @see #setMessageConverter
087 * @see javax.jms.MessageProducer
088 * @see javax.jms.MessageConsumer
089 */
090public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {
091
092        /** The JMS 2.0 MessageProducer.setDeliveryDelay method, if available */
093        private static final Method setDeliveryDelayMethod =
094                        ClassUtils.getMethodIfAvailable(MessageProducer.class, "setDeliveryDelay", long.class);
095
096        /** Internal ResourceFactory adapter for interacting with ConnectionFactoryUtils */
097        private final JmsTemplateResourceFactory transactionalResourceFactory = new JmsTemplateResourceFactory();
098
099
100        private Object defaultDestination;
101
102        private MessageConverter messageConverter;
103
104
105        private boolean messageIdEnabled = true;
106
107        private boolean messageTimestampEnabled = true;
108
109        private boolean pubSubNoLocal = false;
110
111        private long receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;
112
113        private long deliveryDelay = -1;
114
115
116        private boolean explicitQosEnabled = false;
117
118        private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
119
120        private int priority = Message.DEFAULT_PRIORITY;
121
122        private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
123
124
125        /**
126         * Create a new JmsTemplate for bean-style usage.
127         * <p>Note: The ConnectionFactory has to be set before using the instance.
128         * This constructor can be used to prepare a JmsTemplate via a BeanFactory,
129         * typically setting the ConnectionFactory via setConnectionFactory.
130         * @see #setConnectionFactory
131         */
132        public JmsTemplate() {
133                initDefaultStrategies();
134        }
135
136        /**
137         * Create a new JmsTemplate, given a ConnectionFactory.
138         * @param connectionFactory the ConnectionFactory to obtain Connections from
139         */
140        public JmsTemplate(ConnectionFactory connectionFactory) {
141                this();
142                setConnectionFactory(connectionFactory);
143                afterPropertiesSet();
144        }
145
146        /**
147         * Initialize the default implementations for the template's strategies:
148         * DynamicDestinationResolver and SimpleMessageConverter.
149         * @see #setDestinationResolver
150         * @see #setMessageConverter
151         * @see org.springframework.jms.support.destination.DynamicDestinationResolver
152         * @see org.springframework.jms.support.converter.SimpleMessageConverter
153         */
154        protected void initDefaultStrategies() {
155                setMessageConverter(new SimpleMessageConverter());
156        }
157
158
159        /**
160         * Set the destination to be used on send/receive operations that do not
161         * have a destination parameter.
162         * <p>Alternatively, specify a "defaultDestinationName", to be
163         * dynamically resolved via the DestinationResolver.
164         * @see #send(MessageCreator)
165         * @see #convertAndSend(Object)
166         * @see #convertAndSend(Object, MessagePostProcessor)
167         * @see #setDefaultDestinationName(String)
168         */
169        public void setDefaultDestination(Destination destination) {
170                this.defaultDestination = destination;
171        }
172
173        /**
174         * Return the destination to be used on send/receive operations that do not
175         * have a destination parameter.
176         */
177        public Destination getDefaultDestination() {
178                return (this.defaultDestination instanceof Destination ? (Destination) this.defaultDestination : null);
179        }
180
181        private Queue getDefaultQueue() {
182                Destination defaultDestination = getDefaultDestination();
183                if (defaultDestination != null && !(defaultDestination instanceof Queue)) {
184                        throw new IllegalStateException(
185                                        "'defaultDestination' does not correspond to a Queue. Check configuration of JmsTemplate.");
186                }
187                return (Queue) defaultDestination;
188        }
189
190        /**
191         * Set the destination name to be used on send/receive operations that
192         * do not have a destination parameter. The specified name will be
193         * dynamically resolved via the DestinationResolver.
194         * <p>Alternatively, specify a JMS Destination object as "defaultDestination".
195         * @see #send(MessageCreator)
196         * @see #convertAndSend(Object)
197         * @see #convertAndSend(Object, MessagePostProcessor)
198         * @see #setDestinationResolver
199         * @see #setDefaultDestination(javax.jms.Destination)
200         */
201        public void setDefaultDestinationName(String destinationName) {
202                this.defaultDestination = destinationName;
203        }
204
205        /**
206         * Return the destination name to be used on send/receive operations that
207         * do not have a destination parameter.
208         */
209        public String getDefaultDestinationName() {
210                return (this.defaultDestination instanceof String ? (String) this.defaultDestination : null);
211        }
212
213        private String getRequiredDefaultDestinationName() throws IllegalStateException {
214                String name = getDefaultDestinationName();
215                if (name == null) {
216                        throw new IllegalStateException(
217                                        "No 'defaultDestination' or 'defaultDestinationName' specified. Check configuration of JmsTemplate.");
218                }
219                return name;
220        }
221
222        /**
223         * Set the message converter for this template. Used to resolve
224         * Object parameters to convertAndSend methods and Object results
225         * from receiveAndConvert methods.
226         * <p>The default converter is a SimpleMessageConverter, which is able
227         * to handle BytesMessages, TextMessages and ObjectMessages.
228         * @see #convertAndSend
229         * @see #receiveAndConvert
230         * @see org.springframework.jms.support.converter.SimpleMessageConverter
231         */
232        public void setMessageConverter(MessageConverter messageConverter) {
233                this.messageConverter = messageConverter;
234        }
235
236        /**
237         * Return the message converter for this template.
238         */
239        public MessageConverter getMessageConverter() {
240                return this.messageConverter;
241        }
242
243        private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
244                MessageConverter converter = getMessageConverter();
245                if (converter == null) {
246                        throw new IllegalStateException("No 'messageConverter' specified. Check configuration of JmsTemplate.");
247                }
248                return converter;
249        }
250
251
252        /**
253         * Set whether message IDs are enabled. Default is "true".
254         * <p>This is only a hint to the JMS producer.
255         * See the JMS javadocs for details.
256         * @see javax.jms.MessageProducer#setDisableMessageID
257         */
258        public void setMessageIdEnabled(boolean messageIdEnabled) {
259                this.messageIdEnabled = messageIdEnabled;
260        }
261
262        /**
263         * Return whether message IDs are enabled.
264         */
265        public boolean isMessageIdEnabled() {
266                return this.messageIdEnabled;
267        }
268
269        /**
270         * Set whether message timestamps are enabled. Default is "true".
271         * <p>This is only a hint to the JMS producer.
272         * See the JMS javadocs for details.
273         * @see javax.jms.MessageProducer#setDisableMessageTimestamp
274         */
275        public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
276                this.messageTimestampEnabled = messageTimestampEnabled;
277        }
278
279        /**
280         * Return whether message timestamps are enabled.
281         */
282        public boolean isMessageTimestampEnabled() {
283                return this.messageTimestampEnabled;
284        }
285
286        /**
287         * Set whether to inhibit the delivery of messages published by its own connection.
288         * Default is "false".
289         * @see javax.jms.Session#createConsumer(javax.jms.Destination, String, boolean)
290         */
291        public void setPubSubNoLocal(boolean pubSubNoLocal) {
292                this.pubSubNoLocal = pubSubNoLocal;
293        }
294
295        /**
296         * Return whether to inhibit the delivery of messages published by its own connection.
297         */
298        public boolean isPubSubNoLocal() {
299                return this.pubSubNoLocal;
300        }
301
302        /**
303         * Set the timeout to use for receive calls (in milliseconds).
304         * <p>The default is {@link #RECEIVE_TIMEOUT_INDEFINITE_WAIT}, which indicates
305         * a blocking receive without timeout.
306         * <p>Specify {@link #RECEIVE_TIMEOUT_NO_WAIT} (or any other negative value)
307         * to indicate that a receive operation should check if a message is
308         * immediately available without blocking.
309         * @see #receiveFromConsumer(MessageConsumer, long)
310         * @see javax.jms.MessageConsumer#receive(long)
311         * @see javax.jms.MessageConsumer#receiveNoWait()
312         * @see javax.jms.MessageConsumer#receive()
313         */
314        public void setReceiveTimeout(long receiveTimeout) {
315                this.receiveTimeout = receiveTimeout;
316        }
317
318        /**
319         * Return the timeout to use for receive calls (in milliseconds).
320         */
321        public long getReceiveTimeout() {
322                return this.receiveTimeout;
323        }
324
325        /**
326         * Set the delivery delay to use for send calls (in milliseconds).
327         * <p>The default is -1 (no delivery delay passed on to the broker).
328         * Note that this feature requires JMS 2.0.
329         */
330        public void setDeliveryDelay(long deliveryDelay) {
331                this.deliveryDelay = deliveryDelay;
332        }
333
334        /**
335         * Return the delivery delay to use for send calls (in milliseconds).
336         */
337        public long getDeliveryDelay() {
338                return this.deliveryDelay;
339        }
340
341
342        /**
343         * Set if the QOS values (deliveryMode, priority, timeToLive)
344         * should be used for sending a message.
345         * @see #setDeliveryMode
346         * @see #setPriority
347         * @see #setTimeToLive
348         */
349        public void setExplicitQosEnabled(boolean explicitQosEnabled) {
350                this.explicitQosEnabled = explicitQosEnabled;
351        }
352
353        /**
354         * If "true", then the values of deliveryMode, priority, and timeToLive
355         * will be used when sending a message. Otherwise, the default values,
356         * that may be set administratively, will be used.
357         * @return true if overriding default values of QOS parameters
358         * (deliveryMode, priority, and timeToLive)
359         * @see #setDeliveryMode
360         * @see #setPriority
361         * @see #setTimeToLive
362         */
363        public boolean isExplicitQosEnabled() {
364                return this.explicitQosEnabled;
365        }
366
367        /**
368         * Set whether message delivery should be persistent or non-persistent,
369         * specified as boolean value ("true" or "false"). This will set the delivery
370         * mode accordingly, to either "PERSISTENT" (2) or "NON_PERSISTENT" (1).
371         * <p>Default is "true" a.k.a. delivery mode "PERSISTENT".
372         * @see #setDeliveryMode(int)
373         * @see javax.jms.DeliveryMode#PERSISTENT
374         * @see javax.jms.DeliveryMode#NON_PERSISTENT
375         */
376        public void setDeliveryPersistent(boolean deliveryPersistent) {
377                this.deliveryMode = (deliveryPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
378        }
379
380        /**
381         * Set the delivery mode to use when sending a message.
382         * Default is the JMS Message default: "PERSISTENT".
383         * <p>Since a default value may be defined administratively,
384         * this is only used when "isExplicitQosEnabled" equals "true".
385         * @param deliveryMode the delivery mode to use
386         * @see #isExplicitQosEnabled
387         * @see javax.jms.DeliveryMode#PERSISTENT
388         * @see javax.jms.DeliveryMode#NON_PERSISTENT
389         * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
390         * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
391         */
392        public void setDeliveryMode(int deliveryMode) {
393                this.deliveryMode = deliveryMode;
394        }
395
396        /**
397         * Return the delivery mode to use when sending a message.
398         */
399        public int getDeliveryMode() {
400                return this.deliveryMode;
401        }
402
403        /**
404         * Set the priority of a message when sending.
405         * <p>Since a default value may be defined administratively,
406         * this is only used when "isExplicitQosEnabled" equals "true".
407         * @see #isExplicitQosEnabled
408         * @see javax.jms.Message#DEFAULT_PRIORITY
409         * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
410         */
411        public void setPriority(int priority) {
412                this.priority = priority;
413        }
414
415        /**
416         * Return the priority of a message when sending.
417         */
418        public int getPriority() {
419                return this.priority;
420        }
421
422        /**
423         * Set the time-to-live of the message when sending.
424         * <p>Since a default value may be defined administratively,
425         * this is only used when "isExplicitQosEnabled" equals "true".
426         * @param timeToLive the message's lifetime (in milliseconds)
427         * @see #isExplicitQosEnabled
428         * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
429         * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
430         */
431        public void setTimeToLive(long timeToLive) {
432                this.timeToLive = timeToLive;
433        }
434
435        /**
436         * Return the time-to-live of the message when sending.
437         */
438        public long getTimeToLive() {
439                return this.timeToLive;
440        }
441
442
443        //---------------------------------------------------------------------------------------
444        // JmsOperations execute methods
445        //---------------------------------------------------------------------------------------
446
447        @Override
448        public <T> T execute(SessionCallback<T> action) throws JmsException {
449                return execute(action, false);
450        }
451
452        /**
453         * Execute the action specified by the given action object within a
454         * JMS Session. Generalized version of {@code execute(SessionCallback)},
455         * allowing the JMS Connection to be started on the fly.
456         * <p>Use {@code execute(SessionCallback)} for the general case.
457         * Starting the JMS Connection is just necessary for receiving messages,
458         * which is preferably achieved through the {@code receive} methods.
459         * @param action callback object that exposes the Session
460         * @param startConnection whether to start the Connection
461         * @return the result object from working with the Session
462         * @throws JmsException if there is any problem
463         * @see #execute(SessionCallback)
464         * @see #receive
465         */
466        public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
467                Assert.notNull(action, "Callback object must not be null");
468                Connection conToClose = null;
469                Session sessionToClose = null;
470                try {
471                        Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
472                                        getConnectionFactory(), this.transactionalResourceFactory, startConnection);
473                        if (sessionToUse == null) {
474                                conToClose = createConnection();
475                                sessionToClose = createSession(conToClose);
476                                if (startConnection) {
477                                        conToClose.start();
478                                }
479                                sessionToUse = sessionToClose;
480                        }
481                        if (logger.isDebugEnabled()) {
482                                logger.debug("Executing callback on JMS Session: " + sessionToUse);
483                        }
484                        return action.doInJms(sessionToUse);
485                }
486                catch (JMSException ex) {
487                        throw convertJmsAccessException(ex);
488                }
489                finally {
490                        JmsUtils.closeSession(sessionToClose);
491                        ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
492                }
493        }
494
495        @Override
496        public <T> T execute(ProducerCallback<T> action) throws JmsException {
497                String defaultDestinationName = getDefaultDestinationName();
498                if (defaultDestinationName != null) {
499                        return execute(defaultDestinationName, action);
500                }
501                else {
502                        return execute(getDefaultDestination(), action);
503                }
504        }
505
506        @Override
507        public <T> T execute(final Destination destination, final ProducerCallback<T> action) throws JmsException {
508                Assert.notNull(action, "Callback object must not be null");
509                return execute(new SessionCallback<T>() {
510                        @Override
511                        public T doInJms(Session session) throws JMSException {
512                                MessageProducer producer = createProducer(session, destination);
513                                try {
514                                        return action.doInJms(session, producer);
515                                }
516                                finally {
517                                        JmsUtils.closeMessageProducer(producer);
518                                }
519                        }
520                }, false);
521        }
522
523        @Override
524        public <T> T execute(final String destinationName, final ProducerCallback<T> action) throws JmsException {
525                Assert.notNull(action, "Callback object must not be null");
526                return execute(new SessionCallback<T>() {
527                        @Override
528                        public T doInJms(Session session) throws JMSException {
529                                Destination destination = resolveDestinationName(session, destinationName);
530                                MessageProducer producer = createProducer(session, destination);
531                                try {
532                                        return action.doInJms(session, producer);
533                                }
534                                finally {
535                                        JmsUtils.closeMessageProducer(producer);
536                                }
537                        }
538                }, false);
539        }
540
541
542        //---------------------------------------------------------------------------------------
543        // Convenience methods for sending messages
544        //---------------------------------------------------------------------------------------
545
546        @Override
547        public void send(MessageCreator messageCreator) throws JmsException {
548                Destination defaultDestination = getDefaultDestination();
549                if (defaultDestination != null) {
550                        send(defaultDestination, messageCreator);
551                }
552                else {
553                        send(getRequiredDefaultDestinationName(), messageCreator);
554                }
555        }
556
557        @Override
558        public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {
559                execute(new SessionCallback<Object>() {
560                        @Override
561                        public Object doInJms(Session session) throws JMSException {
562                                doSend(session, destination, messageCreator);
563                                return null;
564                        }
565                }, false);
566        }
567
568        @Override
569        public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
570                execute(new SessionCallback<Object>() {
571                        @Override
572                        public Object doInJms(Session session) throws JMSException {
573                                Destination destination = resolveDestinationName(session, destinationName);
574                                doSend(session, destination, messageCreator);
575                                return null;
576                        }
577                }, false);
578        }
579
580        /**
581         * Send the given JMS message.
582         * @param session the JMS Session to operate on
583         * @param destination the JMS Destination to send to
584         * @param messageCreator callback to create a JMS Message
585         * @throws JMSException if thrown by JMS API methods
586         */
587        protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
588                        throws JMSException {
589
590                Assert.notNull(messageCreator, "MessageCreator must not be null");
591                MessageProducer producer = createProducer(session, destination);
592                try {
593                        Message message = messageCreator.createMessage(session);
594                        if (logger.isDebugEnabled()) {
595                                logger.debug("Sending created message: " + message);
596                        }
597                        doSend(producer, message);
598                        // Check commit - avoid commit call within a JTA transaction.
599                        if (session.getTransacted() && isSessionLocallyTransacted(session)) {
600                                // Transacted session created by this template -> commit.
601                                JmsUtils.commitIfNecessary(session);
602                        }
603                }
604                finally {
605                        JmsUtils.closeMessageProducer(producer);
606                }
607        }
608
609        /**
610         * Actually send the given JMS message.
611         * @param producer the JMS MessageProducer to send with
612         * @param message the JMS Message to send
613         * @throws JMSException if thrown by JMS API methods
614         */
615        protected void doSend(MessageProducer producer, Message message) throws JMSException {
616                if (this.deliveryDelay >= 0) {
617                        if (setDeliveryDelayMethod == null) {
618                                throw new IllegalStateException("setDeliveryDelay requires JMS 2.0");
619                        }
620                        ReflectionUtils.invokeMethod(setDeliveryDelayMethod, producer, this.deliveryDelay);
621                }
622                if (isExplicitQosEnabled()) {
623                        producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
624                }
625                else {
626                        producer.send(message);
627                }
628        }
629
630
631        //---------------------------------------------------------------------------------------
632        // Convenience methods for sending auto-converted messages
633        //---------------------------------------------------------------------------------------
634
635        @Override
636        public void convertAndSend(Object message) throws JmsException {
637                Destination defaultDestination = getDefaultDestination();
638                if (defaultDestination != null) {
639                        convertAndSend(defaultDestination, message);
640                }
641                else {
642                        convertAndSend(getRequiredDefaultDestinationName(), message);
643                }
644        }
645
646        @Override
647        public void convertAndSend(Destination destination, final Object message) throws JmsException {
648                send(destination, new MessageCreator() {
649                        @Override
650                        public Message createMessage(Session session) throws JMSException {
651                                return getRequiredMessageConverter().toMessage(message, session);
652                        }
653                });
654        }
655
656        @Override
657        public void convertAndSend(String destinationName, final Object message) throws JmsException {
658                send(destinationName, new MessageCreator() {
659                        @Override
660                        public Message createMessage(Session session) throws JMSException {
661                                return getRequiredMessageConverter().toMessage(message, session);
662                        }
663                });
664        }
665
666        @Override
667        public void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException {
668                Destination defaultDestination = getDefaultDestination();
669                if (defaultDestination != null) {
670                        convertAndSend(defaultDestination, message, postProcessor);
671                }
672                else {
673                        convertAndSend(getRequiredDefaultDestinationName(), message, postProcessor);
674                }
675        }
676
677        @Override
678        public void convertAndSend(
679                        Destination destination, final Object message, final MessagePostProcessor postProcessor)
680                        throws JmsException {
681
682                send(destination, new MessageCreator() {
683                        @Override
684                        public Message createMessage(Session session) throws JMSException {
685                                Message msg = getRequiredMessageConverter().toMessage(message, session);
686                                return postProcessor.postProcessMessage(msg);
687                        }
688                });
689        }
690
691        @Override
692        public void convertAndSend(
693                        String destinationName, final Object message, final MessagePostProcessor postProcessor)
694                throws JmsException {
695
696                send(destinationName, new MessageCreator() {
697                        @Override
698                        public Message createMessage(Session session) throws JMSException {
699                                Message msg = getRequiredMessageConverter().toMessage(message, session);
700                                return postProcessor.postProcessMessage(msg);
701                        }
702                });
703        }
704
705
706        //---------------------------------------------------------------------------------------
707        // Convenience methods for receiving messages
708        //---------------------------------------------------------------------------------------
709
710        @Override
711        public Message receive() throws JmsException {
712                Destination defaultDestination = getDefaultDestination();
713                if (defaultDestination != null) {
714                        return receive(defaultDestination);
715                }
716                else {
717                        return receive(getRequiredDefaultDestinationName());
718                }
719        }
720
721        @Override
722        public Message receive(Destination destination) throws JmsException {
723                return receiveSelected(destination, null);
724        }
725
726        @Override
727        public Message receive(String destinationName) throws JmsException {
728                return receiveSelected(destinationName, null);
729        }
730
731        @Override
732        public Message receiveSelected(String messageSelector) throws JmsException {
733                Destination defaultDestination = getDefaultDestination();
734                if (defaultDestination != null) {
735                        return receiveSelected(defaultDestination, messageSelector);
736                }
737                else {
738                        return receiveSelected(getRequiredDefaultDestinationName(), messageSelector);
739                }
740        }
741
742        @Override
743        public Message receiveSelected(final Destination destination, final String messageSelector) throws JmsException {
744                return execute(new SessionCallback<Message>() {
745                        @Override
746                        public Message doInJms(Session session) throws JMSException {
747                                return doReceive(session, destination, messageSelector);
748                        }
749                }, true);
750        }
751
752        @Override
753        public Message receiveSelected(final String destinationName, final String messageSelector) throws JmsException {
754                return execute(new SessionCallback<Message>() {
755                        @Override
756                        public Message doInJms(Session session) throws JMSException {
757                                Destination destination = resolveDestinationName(session, destinationName);
758                                return doReceive(session, destination, messageSelector);
759                        }
760                }, true);
761        }
762
763        /**
764         * Receive a JMS message.
765         * @param session the JMS Session to operate on
766         * @param destination the JMS Destination to receive from
767         * @param messageSelector the message selector for this consumer (can be {@code null})
768         * @return the JMS Message received, or {@code null} if none
769         * @throws JMSException if thrown by JMS API methods
770         */
771        protected Message doReceive(Session session, Destination destination, String messageSelector)
772                        throws JMSException {
773
774                return doReceive(session, createConsumer(session, destination, messageSelector));
775        }
776
777        /**
778         * Actually receive a JMS message.
779         * @param session the JMS Session to operate on
780         * @param consumer the JMS MessageConsumer to receive with
781         * @return the JMS Message received, or {@code null} if none
782         * @throws JMSException if thrown by JMS API methods
783         */
784        protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
785                try {
786                        // Use transaction timeout (if available).
787                        long timeout = getReceiveTimeout();
788                        JmsResourceHolder resourceHolder =
789                                        (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());
790                        if (resourceHolder != null && resourceHolder.hasTimeout()) {
791                                timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
792                        }
793                        Message message = receiveFromConsumer(consumer, timeout);
794                        if (session.getTransacted()) {
795                                // Commit necessary - but avoid commit call within a JTA transaction.
796                                if (isSessionLocallyTransacted(session)) {
797                                        // Transacted session created by this template -> commit.
798                                        JmsUtils.commitIfNecessary(session);
799                                }
800                        }
801                        else if (isClientAcknowledge(session)) {
802                                // Manually acknowledge message, if any.
803                                if (message != null) {
804                                        message.acknowledge();
805                                }
806                        }
807                        return message;
808                }
809                finally {
810                        JmsUtils.closeMessageConsumer(consumer);
811                }
812        }
813
814
815        //---------------------------------------------------------------------------------------
816        // Convenience methods for receiving auto-converted messages
817        //---------------------------------------------------------------------------------------
818
819        @Override
820        public Object receiveAndConvert() throws JmsException {
821                return doConvertFromMessage(receive());
822        }
823
824        @Override
825        public Object receiveAndConvert(Destination destination) throws JmsException {
826                return doConvertFromMessage(receive(destination));
827        }
828
829        @Override
830        public Object receiveAndConvert(String destinationName) throws JmsException {
831                return doConvertFromMessage(receive(destinationName));
832        }
833
834        @Override
835        public Object receiveSelectedAndConvert(String messageSelector) throws JmsException {
836                return doConvertFromMessage(receiveSelected(messageSelector));
837        }
838
839        @Override
840        public Object receiveSelectedAndConvert(Destination destination, String messageSelector) throws JmsException {
841                return doConvertFromMessage(receiveSelected(destination, messageSelector));
842        }
843
844        @Override
845        public Object receiveSelectedAndConvert(String destinationName, String messageSelector) throws JmsException {
846                return doConvertFromMessage(receiveSelected(destinationName, messageSelector));
847        }
848
849        /**
850         * Extract the content from the given JMS message.
851         * @param message the JMS Message to convert (can be {@code null})
852         * @return the content of the message, or {@code null} if none
853         */
854        protected Object doConvertFromMessage(Message message) {
855                if (message != null) {
856                        try {
857                                return getRequiredMessageConverter().fromMessage(message);
858                        }
859                        catch (JMSException ex) {
860                                throw convertJmsAccessException(ex);
861                        }
862                }
863                return null;
864        }
865
866
867        //---------------------------------------------------------------------------------------
868        // Convenience methods for sending messages to and receiving the reply from a destination
869        //---------------------------------------------------------------------------------------
870
871        @Override
872        public Message sendAndReceive(MessageCreator messageCreator) throws JmsException {
873                Destination defaultDestination = getDefaultDestination();
874                if (defaultDestination != null) {
875                        return sendAndReceive(defaultDestination, messageCreator);
876                }
877                else {
878                        return sendAndReceive(getRequiredDefaultDestinationName(), messageCreator);
879                }
880        }
881
882        @Override
883        public Message sendAndReceive(final Destination destination, final MessageCreator messageCreator) throws JmsException {
884                return executeLocal(new SessionCallback<Message>() {
885                        @Override
886                        public Message doInJms(Session session) throws JMSException {
887                                return doSendAndReceive(session, destination, messageCreator);
888                        }
889                }, true);
890        }
891
892        @Override
893        public Message sendAndReceive(final String destinationName, final MessageCreator messageCreator) throws JmsException {
894                return executeLocal(new SessionCallback<Message>() {
895                        @Override
896                        public Message doInJms(Session session) throws JMSException {
897                                Destination destination = resolveDestinationName(session, destinationName);
898                                return doSendAndReceive(session, destination, messageCreator);
899                        }
900                }, true);
901        }
902
903        /**
904         * Send a request message to the given {@link Destination} and block until
905         * a reply has been received on a temporary queue created on-the-fly.
906         * <p>Return the response message or {@code null} if no message has
907         * @throws JMSException if thrown by JMS API methods
908         */
909        protected Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
910                        throws JMSException {
911
912                Assert.notNull(messageCreator, "MessageCreator must not be null");
913                TemporaryQueue responseQueue = null;
914                MessageProducer producer = null;
915                MessageConsumer consumer = null;
916                try {
917                        Message requestMessage = messageCreator.createMessage(session);
918                        responseQueue = session.createTemporaryQueue();
919                        producer = session.createProducer(destination);
920                        consumer = session.createConsumer(responseQueue);
921                        requestMessage.setJMSReplyTo(responseQueue);
922                        if (logger.isDebugEnabled()) {
923                                logger.debug("Sending created message: " + requestMessage);
924                        }
925                        doSend(producer, requestMessage);
926                        return receiveFromConsumer(consumer, getReceiveTimeout());
927                }
928                finally {
929                        JmsUtils.closeMessageConsumer(consumer);
930                        JmsUtils.closeMessageProducer(producer);
931                        if (responseQueue != null) {
932                                responseQueue.delete();
933                        }
934                }
935        }
936
937        /**
938         * A variant of {@link #execute(SessionCallback, boolean)} that explicitly
939         * creates a non-transactional {@link Session}. The given {@link SessionCallback}
940         * does not participate in an existing transaction.
941         */
942        private <T> T executeLocal(SessionCallback<T> action, boolean startConnection) throws JmsException {
943                Assert.notNull(action, "Callback object must not be null");
944                Connection con = null;
945                Session session = null;
946                try {
947                        con = getConnectionFactory().createConnection();
948                        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
949                        if (startConnection) {
950                                con.start();
951                        }
952                        if (logger.isDebugEnabled()) {
953                                logger.debug("Executing callback on JMS Session: " + session);
954                        }
955                        return action.doInJms(session);
956                }
957                catch (JMSException ex) {
958                        throw convertJmsAccessException(ex);
959                }
960                finally {
961                        JmsUtils.closeSession(session);
962                        ConnectionFactoryUtils.releaseConnection(con, getConnectionFactory(), startConnection);
963                }
964        }
965
966
967        //---------------------------------------------------------------------------------------
968        // Convenience methods for browsing messages
969        //---------------------------------------------------------------------------------------
970
971        @Override
972        public <T> T browse(BrowserCallback<T> action) throws JmsException {
973                Queue defaultQueue = getDefaultQueue();
974                if (defaultQueue != null) {
975                        return browse(defaultQueue, action);
976                }
977                else {
978                        return browse(getRequiredDefaultDestinationName(), action);
979                }
980        }
981
982        @Override
983        public <T> T browse(Queue queue, BrowserCallback<T> action) throws JmsException {
984                return browseSelected(queue, null, action);
985        }
986
987        @Override
988        public <T> T browse(String queueName, BrowserCallback<T> action) throws JmsException {
989                return browseSelected(queueName, null, action);
990        }
991
992        @Override
993        public <T> T browseSelected(String messageSelector, BrowserCallback<T> action) throws JmsException {
994                Queue defaultQueue = getDefaultQueue();
995                if (defaultQueue != null) {
996                        return browseSelected(defaultQueue, messageSelector, action);
997                }
998                else {
999                        return browseSelected(getRequiredDefaultDestinationName(), messageSelector, action);
1000                }
1001        }
1002
1003        @Override
1004        public <T> T browseSelected(final Queue queue, final String messageSelector, final BrowserCallback<T> action)
1005                        throws JmsException {
1006
1007                Assert.notNull(action, "Callback object must not be null");
1008                return execute(new SessionCallback<T>() {
1009                        @Override
1010                        public T doInJms(Session session) throws JMSException {
1011                                QueueBrowser browser = createBrowser(session, queue, messageSelector);
1012                                try {
1013                                        return action.doInJms(session, browser);
1014                                }
1015                                finally {
1016                                        JmsUtils.closeQueueBrowser(browser);
1017                                }
1018                        }
1019                }, true);
1020        }
1021
1022        @Override
1023        public <T> T browseSelected(final String queueName, final String messageSelector, final BrowserCallback<T> action)
1024                        throws JmsException {
1025
1026                Assert.notNull(action, "Callback object must not be null");
1027                return execute(new SessionCallback<T>() {
1028                        @Override
1029                        public T doInJms(Session session) throws JMSException {
1030                                Queue queue = (Queue) getDestinationResolver().resolveDestinationName(session, queueName, false);
1031                                QueueBrowser browser = createBrowser(session, queue, messageSelector);
1032                                try {
1033                                        return action.doInJms(session, browser);
1034                                }
1035                                finally {
1036                                        JmsUtils.closeQueueBrowser(browser);
1037                                }
1038                        }
1039                }, true);
1040        }
1041
1042
1043        /**
1044         * Fetch an appropriate Connection from the given JmsResourceHolder.
1045         * <p>This implementation accepts any JMS 1.1 Connection.
1046         * @param holder the JmsResourceHolder
1047         * @return an appropriate Connection fetched from the holder,
1048         * or {@code null} if none found
1049         */
1050        protected Connection getConnection(JmsResourceHolder holder) {
1051                return holder.getConnection();
1052        }
1053
1054        /**
1055         * Fetch an appropriate Session from the given JmsResourceHolder.
1056         * <p>This implementation accepts any JMS 1.1 Session.
1057         * @param holder the JmsResourceHolder
1058         * @return an appropriate Session fetched from the holder,
1059         * or {@code null} if none found
1060         */
1061        protected Session getSession(JmsResourceHolder holder) {
1062                return holder.getSession();
1063        }
1064
1065        /**
1066         * Check whether the given Session is locally transacted, that is, whether
1067         * its transaction is managed by this listener container's Session handling
1068         * and not by an external transaction coordinator.
1069         * <p>Note: The Session's own transacted flag will already have been checked
1070         * before. This method is about finding out whether the Session's transaction
1071         * is local or externally coordinated.
1072         * @param session the Session to check
1073         * @return whether the given Session is locally transacted
1074         * @see #isSessionTransacted()
1075         * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
1076         */
1077        protected boolean isSessionLocallyTransacted(Session session) {
1078                return isSessionTransacted() &&
1079                                !ConnectionFactoryUtils.isSessionTransactional(session, getConnectionFactory());
1080        }
1081
1082        /**
1083         * Create a JMS MessageProducer for the given Session and Destination,
1084         * configuring it to disable message ids and/or timestamps (if necessary).
1085         * <p>Delegates to {@link #doCreateProducer} for creation of the raw
1086         * JMS MessageProducer.
1087         * @param session the JMS Session to create a MessageProducer for
1088         * @param destination the JMS Destination to create a MessageProducer for
1089         * @return the new JMS MessageProducer
1090         * @throws JMSException if thrown by JMS API methods
1091         * @see #setMessageIdEnabled
1092         * @see #setMessageTimestampEnabled
1093         */
1094        protected MessageProducer createProducer(Session session, Destination destination) throws JMSException {
1095                MessageProducer producer = doCreateProducer(session, destination);
1096                if (!isMessageIdEnabled()) {
1097                        producer.setDisableMessageID(true);
1098                }
1099                if (!isMessageTimestampEnabled()) {
1100                        producer.setDisableMessageTimestamp(true);
1101                }
1102                return producer;
1103        }
1104
1105        /**
1106         * Create a raw JMS MessageProducer for the given Session and Destination.
1107         * <p>This implementation uses JMS 1.1 API.
1108         * @param session the JMS Session to create a MessageProducer for
1109         * @param destination the JMS Destination to create a MessageProducer for
1110         * @return the new JMS MessageProducer
1111         * @throws JMSException if thrown by JMS API methods
1112         */
1113        protected MessageProducer doCreateProducer(Session session, Destination destination) throws JMSException {
1114                return session.createProducer(destination);
1115        }
1116
1117        /**
1118         * Create a JMS MessageConsumer for the given Session and Destination.
1119         * <p>This implementation uses JMS 1.1 API.
1120         * @param session the JMS Session to create a MessageConsumer for
1121         * @param destination the JMS Destination to create a MessageConsumer for
1122         * @param messageSelector the message selector for this consumer (can be {@code null})
1123         * @return the new JMS MessageConsumer
1124         * @throws JMSException if thrown by JMS API methods
1125         */
1126        protected MessageConsumer createConsumer(Session session, Destination destination, String messageSelector)
1127                        throws JMSException {
1128
1129                // Only pass in the NoLocal flag in case of a Topic:
1130                // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
1131                // in case of the NoLocal flag being specified for a Queue.
1132                if (isPubSubDomain()) {
1133                        return session.createConsumer(destination, messageSelector, isPubSubNoLocal());
1134                }
1135                else {
1136                        return session.createConsumer(destination, messageSelector);
1137                }
1138        }
1139
1140        /**
1141         * Create a JMS MessageProducer for the given Session and Destination,
1142         * configuring it to disable message ids and/or timestamps (if necessary).
1143         * <p>Delegates to {@link #doCreateProducer} for creation of the raw
1144         * JMS MessageProducer.
1145         * @param session the JMS Session to create a QueueBrowser for
1146         * @param queue the JMS Queue to create a QueueBrowser for
1147         * @param messageSelector the message selector for this consumer (can be {@code null})
1148         * @return the new JMS QueueBrowser
1149         * @throws JMSException if thrown by JMS API methods
1150         * @see #setMessageIdEnabled
1151         * @see #setMessageTimestampEnabled
1152         */
1153        protected QueueBrowser createBrowser(Session session, Queue queue, String messageSelector)
1154                        throws JMSException {
1155
1156                return session.createBrowser(queue, messageSelector);
1157        }
1158
1159
1160        /**
1161         * ResourceFactory implementation that delegates to this template's protected callback methods.
1162         */
1163        private class JmsTemplateResourceFactory implements ConnectionFactoryUtils.ResourceFactory {
1164
1165                @Override
1166                public Connection getConnection(JmsResourceHolder holder) {
1167                        return JmsTemplate.this.getConnection(holder);
1168                }
1169
1170                @Override
1171                public Session getSession(JmsResourceHolder holder) {
1172                        return JmsTemplate.this.getSession(holder);
1173                }
1174
1175                @Override
1176                public Connection createConnection() throws JMSException {
1177                        return JmsTemplate.this.createConnection();
1178                }
1179
1180                @Override
1181                public Session createSession(Connection con) throws JMSException {
1182                        return JmsTemplate.this.createSession(con);
1183                }
1184
1185                @Override
1186                public boolean isSynchedLocalTransactionAllowed() {
1187                        return JmsTemplate.this.isSessionTransacted();
1188                }
1189        }
1190
1191}