001/* 002 * Copyright 2002-2016 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.core; 018 019import java.lang.reflect.Method; 020import javax.jms.Connection; 021import javax.jms.ConnectionFactory; 022import javax.jms.DeliveryMode; 023import javax.jms.Destination; 024import javax.jms.JMSException; 025import javax.jms.Message; 026import javax.jms.MessageConsumer; 027import javax.jms.MessageProducer; 028import javax.jms.Queue; 029import javax.jms.QueueBrowser; 030import javax.jms.Session; 031import javax.jms.TemporaryQueue; 032 033import org.springframework.jms.JmsException; 034import org.springframework.jms.connection.ConnectionFactoryUtils; 035import org.springframework.jms.connection.JmsResourceHolder; 036import org.springframework.jms.support.JmsUtils; 037import org.springframework.jms.support.converter.MessageConverter; 038import org.springframework.jms.support.converter.SimpleMessageConverter; 039import org.springframework.jms.support.destination.JmsDestinationAccessor; 040import org.springframework.transaction.support.TransactionSynchronizationManager; 041import org.springframework.util.Assert; 042import org.springframework.util.ClassUtils; 043import org.springframework.util.ReflectionUtils; 044 045/** 046 * Helper class that simplifies synchronous JMS access code. 
047 * 048 * <p>If you want to use dynamic destination creation, you must specify 049 * the type of JMS destination to create, using the "pubSubDomain" property. 050 * For other operations, this is not necessary. Point-to-Point (Queues) is the default 051 * domain. 052 * 053 * <p>Default settings for JMS Sessions are "not transacted" and "auto-acknowledge". 054 * As defined by the Java EE specification, the transaction and acknowledgement 055 * parameters are ignored when a JMS Session is created inside an active 056 * transaction, no matter if a JTA transaction or a Spring-managed transaction. 057 * To configure them for native JMS usage, specify appropriate values for 058 * the "sessionTransacted" and "sessionAcknowledgeMode" bean properties. 059 * 060 * <p>This template uses a 061 * {@link org.springframework.jms.support.destination.DynamicDestinationResolver} 062 * and a {@link org.springframework.jms.support.converter.SimpleMessageConverter} 063 * as default strategies for resolving a destination name or converting a message, 064 * respectively. These defaults can be overridden through the "destinationResolver" 065 * and "messageConverter" bean properties. 066 * 067 * <p><b>NOTE: The {@code ConnectionFactory} used with this template should 068 * return pooled Connections (or a single shared Connection) as well as pooled 069 * Sessions and MessageProducers. Otherwise, performance of ad-hoc JMS operations 070 * is going to suffer.</b> The simplest option is to use the Spring-provided 071 * {@link org.springframework.jms.connection.SingleConnectionFactory} as a 072 * decorator for your target {@code ConnectionFactory}, reusing a single 073 * JMS Connection in a thread-safe fashion; this is often good enough for the 074 * purpose of sending messages via this template. 
In a Java EE environment, 075 * make sure that the {@code ConnectionFactory} is obtained from the 076 * application's environment naming context via JNDI; application servers 077 * typically expose pooled, transaction-aware factories there. 078 * 079 * @author Mark Pollack 080 * @author Juergen Hoeller 081 * @author Stephane Nicoll 082 * @since 1.1 083 * @see #setConnectionFactory 084 * @see #setPubSubDomain 085 * @see #setDestinationResolver 086 * @see #setMessageConverter 087 * @see javax.jms.MessageProducer 088 * @see javax.jms.MessageConsumer 089 */ 090public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations { 091 092 /** The JMS 2.0 MessageProducer.setDeliveryDelay method, if available */ 093 private static final Method setDeliveryDelayMethod = 094 ClassUtils.getMethodIfAvailable(MessageProducer.class, "setDeliveryDelay", long.class); 095 096 /** Internal ResourceFactory adapter for interacting with ConnectionFactoryUtils */ 097 private final JmsTemplateResourceFactory transactionalResourceFactory = new JmsTemplateResourceFactory(); 098 099 100 private Object defaultDestination; 101 102 private MessageConverter messageConverter; 103 104 105 private boolean messageIdEnabled = true; 106 107 private boolean messageTimestampEnabled = true; 108 109 private boolean pubSubNoLocal = false; 110 111 private long receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT; 112 113 private long deliveryDelay = -1; 114 115 116 private boolean explicitQosEnabled = false; 117 118 private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; 119 120 private int priority = Message.DEFAULT_PRIORITY; 121 122 private long timeToLive = Message.DEFAULT_TIME_TO_LIVE; 123 124 125 /** 126 * Create a new JmsTemplate for bean-style usage. 127 * <p>Note: The ConnectionFactory has to be set before using the instance. 128 * This constructor can be used to prepare a JmsTemplate via a BeanFactory, 129 * typically setting the ConnectionFactory via setConnectionFactory. 
130 * @see #setConnectionFactory 131 */ 132 public JmsTemplate() { 133 initDefaultStrategies(); 134 } 135 136 /** 137 * Create a new JmsTemplate, given a ConnectionFactory. 138 * @param connectionFactory the ConnectionFactory to obtain Connections from 139 */ 140 public JmsTemplate(ConnectionFactory connectionFactory) { 141 this(); 142 setConnectionFactory(connectionFactory); 143 afterPropertiesSet(); 144 } 145 146 /** 147 * Initialize the default implementations for the template's strategies: 148 * DynamicDestinationResolver and SimpleMessageConverter. 149 * @see #setDestinationResolver 150 * @see #setMessageConverter 151 * @see org.springframework.jms.support.destination.DynamicDestinationResolver 152 * @see org.springframework.jms.support.converter.SimpleMessageConverter 153 */ 154 protected void initDefaultStrategies() { 155 setMessageConverter(new SimpleMessageConverter()); 156 } 157 158 159 /** 160 * Set the destination to be used on send/receive operations that do not 161 * have a destination parameter. 162 * <p>Alternatively, specify a "defaultDestinationName", to be 163 * dynamically resolved via the DestinationResolver. 164 * @see #send(MessageCreator) 165 * @see #convertAndSend(Object) 166 * @see #convertAndSend(Object, MessagePostProcessor) 167 * @see #setDefaultDestinationName(String) 168 */ 169 public void setDefaultDestination(Destination destination) { 170 this.defaultDestination = destination; 171 } 172 173 /** 174 * Return the destination to be used on send/receive operations that do not 175 * have a destination parameter. 176 */ 177 public Destination getDefaultDestination() { 178 return (this.defaultDestination instanceof Destination ? 
(Destination) this.defaultDestination : null); 179 } 180 181 private Queue getDefaultQueue() { 182 Destination defaultDestination = getDefaultDestination(); 183 if (defaultDestination != null && !(defaultDestination instanceof Queue)) { 184 throw new IllegalStateException( 185 "'defaultDestination' does not correspond to a Queue. Check configuration of JmsTemplate."); 186 } 187 return (Queue) defaultDestination; 188 } 189 190 /** 191 * Set the destination name to be used on send/receive operations that 192 * do not have a destination parameter. The specified name will be 193 * dynamically resolved via the DestinationResolver. 194 * <p>Alternatively, specify a JMS Destination object as "defaultDestination". 195 * @see #send(MessageCreator) 196 * @see #convertAndSend(Object) 197 * @see #convertAndSend(Object, MessagePostProcessor) 198 * @see #setDestinationResolver 199 * @see #setDefaultDestination(javax.jms.Destination) 200 */ 201 public void setDefaultDestinationName(String destinationName) { 202 this.defaultDestination = destinationName; 203 } 204 205 /** 206 * Return the destination name to be used on send/receive operations that 207 * do not have a destination parameter. 208 */ 209 public String getDefaultDestinationName() { 210 return (this.defaultDestination instanceof String ? (String) this.defaultDestination : null); 211 } 212 213 private String getRequiredDefaultDestinationName() throws IllegalStateException { 214 String name = getDefaultDestinationName(); 215 if (name == null) { 216 throw new IllegalStateException( 217 "No 'defaultDestination' or 'defaultDestinationName' specified. Check configuration of JmsTemplate."); 218 } 219 return name; 220 } 221 222 /** 223 * Set the message converter for this template. Used to resolve 224 * Object parameters to convertAndSend methods and Object results 225 * from receiveAndConvert methods. 
226 * <p>The default converter is a SimpleMessageConverter, which is able 227 * to handle BytesMessages, TextMessages and ObjectMessages. 228 * @see #convertAndSend 229 * @see #receiveAndConvert 230 * @see org.springframework.jms.support.converter.SimpleMessageConverter 231 */ 232 public void setMessageConverter(MessageConverter messageConverter) { 233 this.messageConverter = messageConverter; 234 } 235 236 /** 237 * Return the message converter for this template. 238 */ 239 public MessageConverter getMessageConverter() { 240 return this.messageConverter; 241 } 242 243 private MessageConverter getRequiredMessageConverter() throws IllegalStateException { 244 MessageConverter converter = getMessageConverter(); 245 if (converter == null) { 246 throw new IllegalStateException("No 'messageConverter' specified. Check configuration of JmsTemplate."); 247 } 248 return converter; 249 } 250 251 252 /** 253 * Set whether message IDs are enabled. Default is "true". 254 * <p>This is only a hint to the JMS producer. 255 * See the JMS javadocs for details. 256 * @see javax.jms.MessageProducer#setDisableMessageID 257 */ 258 public void setMessageIdEnabled(boolean messageIdEnabled) { 259 this.messageIdEnabled = messageIdEnabled; 260 } 261 262 /** 263 * Return whether message IDs are enabled. 264 */ 265 public boolean isMessageIdEnabled() { 266 return this.messageIdEnabled; 267 } 268 269 /** 270 * Set whether message timestamps are enabled. Default is "true". 271 * <p>This is only a hint to the JMS producer. 272 * See the JMS javadocs for details. 273 * @see javax.jms.MessageProducer#setDisableMessageTimestamp 274 */ 275 public void setMessageTimestampEnabled(boolean messageTimestampEnabled) { 276 this.messageTimestampEnabled = messageTimestampEnabled; 277 } 278 279 /** 280 * Return whether message timestamps are enabled. 
281 */ 282 public boolean isMessageTimestampEnabled() { 283 return this.messageTimestampEnabled; 284 } 285 286 /** 287 * Set whether to inhibit the delivery of messages published by its own connection. 288 * Default is "false". 289 * @see javax.jms.Session#createConsumer(javax.jms.Destination, String, boolean) 290 */ 291 public void setPubSubNoLocal(boolean pubSubNoLocal) { 292 this.pubSubNoLocal = pubSubNoLocal; 293 } 294 295 /** 296 * Return whether to inhibit the delivery of messages published by its own connection. 297 */ 298 public boolean isPubSubNoLocal() { 299 return this.pubSubNoLocal; 300 } 301 302 /** 303 * Set the timeout to use for receive calls (in milliseconds). 304 * <p>The default is {@link #RECEIVE_TIMEOUT_INDEFINITE_WAIT}, which indicates 305 * a blocking receive without timeout. 306 * <p>Specify {@link #RECEIVE_TIMEOUT_NO_WAIT} (or any other negative value) 307 * to indicate that a receive operation should check if a message is 308 * immediately available without blocking. 309 * @see #receiveFromConsumer(MessageConsumer, long) 310 * @see javax.jms.MessageConsumer#receive(long) 311 * @see javax.jms.MessageConsumer#receiveNoWait() 312 * @see javax.jms.MessageConsumer#receive() 313 */ 314 public void setReceiveTimeout(long receiveTimeout) { 315 this.receiveTimeout = receiveTimeout; 316 } 317 318 /** 319 * Return the timeout to use for receive calls (in milliseconds). 320 */ 321 public long getReceiveTimeout() { 322 return this.receiveTimeout; 323 } 324 325 /** 326 * Set the delivery delay to use for send calls (in milliseconds). 327 * <p>The default is -1 (no delivery delay passed on to the broker). 328 * Note that this feature requires JMS 2.0. 329 */ 330 public void setDeliveryDelay(long deliveryDelay) { 331 this.deliveryDelay = deliveryDelay; 332 } 333 334 /** 335 * Return the delivery delay to use for send calls (in milliseconds). 
336 */ 337 public long getDeliveryDelay() { 338 return this.deliveryDelay; 339 } 340 341 342 /** 343 * Set if the QOS values (deliveryMode, priority, timeToLive) 344 * should be used for sending a message. 345 * @see #setDeliveryMode 346 * @see #setPriority 347 * @see #setTimeToLive 348 */ 349 public void setExplicitQosEnabled(boolean explicitQosEnabled) { 350 this.explicitQosEnabled = explicitQosEnabled; 351 } 352 353 /** 354 * If "true", then the values of deliveryMode, priority, and timeToLive 355 * will be used when sending a message. Otherwise, the default values, 356 * that may be set administratively, will be used. 357 * @return true if overriding default values of QOS parameters 358 * (deliveryMode, priority, and timeToLive) 359 * @see #setDeliveryMode 360 * @see #setPriority 361 * @see #setTimeToLive 362 */ 363 public boolean isExplicitQosEnabled() { 364 return this.explicitQosEnabled; 365 } 366 367 /** 368 * Set whether message delivery should be persistent or non-persistent, 369 * specified as boolean value ("true" or "false"). This will set the delivery 370 * mode accordingly, to either "PERSISTENT" (2) or "NON_PERSISTENT" (1). 371 * <p>Default is "true" a.k.a. delivery mode "PERSISTENT". 372 * @see #setDeliveryMode(int) 373 * @see javax.jms.DeliveryMode#PERSISTENT 374 * @see javax.jms.DeliveryMode#NON_PERSISTENT 375 */ 376 public void setDeliveryPersistent(boolean deliveryPersistent) { 377 this.deliveryMode = (deliveryPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 378 } 379 380 /** 381 * Set the delivery mode to use when sending a message. 382 * Default is the JMS Message default: "PERSISTENT". 383 * <p>Since a default value may be defined administratively, 384 * this is only used when "isExplicitQosEnabled" equals "true". 
385 * @param deliveryMode the delivery mode to use 386 * @see #isExplicitQosEnabled 387 * @see javax.jms.DeliveryMode#PERSISTENT 388 * @see javax.jms.DeliveryMode#NON_PERSISTENT 389 * @see javax.jms.Message#DEFAULT_DELIVERY_MODE 390 * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long) 391 */ 392 public void setDeliveryMode(int deliveryMode) { 393 this.deliveryMode = deliveryMode; 394 } 395 396 /** 397 * Return the delivery mode to use when sending a message. 398 */ 399 public int getDeliveryMode() { 400 return this.deliveryMode; 401 } 402 403 /** 404 * Set the priority of a message when sending. 405 * <p>Since a default value may be defined administratively, 406 * this is only used when "isExplicitQosEnabled" equals "true". 407 * @see #isExplicitQosEnabled 408 * @see javax.jms.Message#DEFAULT_PRIORITY 409 * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long) 410 */ 411 public void setPriority(int priority) { 412 this.priority = priority; 413 } 414 415 /** 416 * Return the priority of a message when sending. 417 */ 418 public int getPriority() { 419 return this.priority; 420 } 421 422 /** 423 * Set the time-to-live of the message when sending. 424 * <p>Since a default value may be defined administratively, 425 * this is only used when "isExplicitQosEnabled" equals "true". 426 * @param timeToLive the message's lifetime (in milliseconds) 427 * @see #isExplicitQosEnabled 428 * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE 429 * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long) 430 */ 431 public void setTimeToLive(long timeToLive) { 432 this.timeToLive = timeToLive; 433 } 434 435 /** 436 * Return the time-to-live of the message when sending. 
*/
    public long getTimeToLive() {
        return this.timeToLive;
    }


    //---------------------------------------------------------------------------------------
    // JmsOperations execute methods
    //---------------------------------------------------------------------------------------

    @Override
    public <T> T execute(SessionCallback<T> action) throws JmsException {
        return execute(action, false);
    }

    /**
     * Execute the action specified by the given action object within a
     * JMS Session. Generalized version of {@code execute(SessionCallback)},
     * allowing the JMS Connection to be started on the fly.
     * <p>Use {@code execute(SessionCallback)} for the general case.
     * Starting the JMS Connection is just necessary for receiving messages,
     * which is preferably achieved through the {@code receive} methods.
     * <p>If no transactional Session is available, a Connection and Session are
     * created for the duration of the callback and released again afterwards.
     * @param action callback object that exposes the Session
     * @param startConnection whether to start the Connection
     * @return the result object from working with the Session
     * @throws JmsException if there is any problem
     * @see #execute(SessionCallback)
     * @see #receive
     */
    public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
        Assert.notNull(action, "Callback object must not be null");
        // Only resources created here are tracked for cleanup; both stay null
        // when a transactional Session is used.
        Connection conToClose = null;
        Session sessionToClose = null;
        try {
            Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
                    getConnectionFactory(), this.transactionalResourceFactory, startConnection);
            if (sessionToUse == null) {
                conToClose = createConnection();
                sessionToClose = createSession(conToClose);
                if (startConnection) {
                    conToClose.start();
                }
                sessionToUse = sessionToClose;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Executing callback on JMS Session: " + sessionToUse);
            }
            return action.doInJms(sessionToUse);
        }
        catch (JMSException ex) {
            throw convertJmsAccessException(ex);
        }
        finally {
            JmsUtils.closeSession(sessionToClose);
            ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
        }
    }

    @Override
    public <T> T execute(ProducerCallback<T> action) throws JmsException {
        // A configured default destination name takes precedence; otherwise the
        // default Destination object (possibly null) is used.
        String defaultDestinationName = getDefaultDestinationName();
        if (defaultDestinationName != null) {
            return execute(defaultDestinationName, action);
        }
        else {
            return execute(getDefaultDestination(), action);
        }
    }

    @Override
    public <T> T execute(final Destination destination, final ProducerCallback<T> action) throws JmsException {
        Assert.notNull(action, "Callback object must not be null");
        return execute(new SessionCallback<T>() {
            @Override
            public T doInJms(Session session) throws JMSException {
                MessageProducer producer = createProducer(session, destination);
                try {
                    return action.doInJms(session, producer);
                }
                finally {
                    JmsUtils.closeMessageProducer(producer);
                }
            }
        }, false);
    }

    @Override
    public <T> T execute(final String destinationName, final ProducerCallback<T> action) throws JmsException {
        Assert.notNull(action, "Callback object must not be null");
        return execute(new SessionCallback<T>() {
            @Override
            public T doInJms(Session session) throws JMSException {
                Destination destination = resolveDestinationName(session, destinationName);
                MessageProducer producer = createProducer(session, destination);
                try {
                    return action.doInJms(session, producer);
                }
                finally {
                    JmsUtils.closeMessageProducer(producer);
                }
            }
        }, false);
    }


    //---------------------------------------------------------------------------------------
    // Convenience methods for sending messages
    //---------------------------------------------------------------------------------------

    @Override
    public void send(MessageCreator messageCreator) throws JmsException {
        Destination defaultDestination = getDefaultDestination();
        if (defaultDestination != null) {
            send(defaultDestination, messageCreator);
        }
        else {
            send(getRequiredDefaultDestinationName(), messageCreator);
        }
    }

    @Override
    public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {
        execute(new SessionCallback<Object>() {
            @Override
            public Object doInJms(Session session) throws JMSException {
                doSend(session, destination, messageCreator);
                return null;
            }
        }, false);
    }

    @Override
    public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
        execute(new SessionCallback<Object>() {
            @Override
            public Object doInJms(Session session) throws JMSException {
                Destination destination = resolveDestinationName(session, destinationName);
                doSend(session, destination, messageCreator);
                return null;
            }
        }, false);
    }

    /**
     * Send the given JMS message.
     * <p>Creates a MessageProducer for the destination, lets the MessageCreator
     * build the message, sends it, and commits the Session if it is a locally
     * transacted one. The producer is always closed afterwards.
     * @param session the JMS Session to operate on
     * @param destination the JMS Destination to send to
     * @param messageCreator callback to create a JMS Message
     * @throws JMSException if thrown by JMS API methods
     */
    protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
            throws JMSException {

        Assert.notNull(messageCreator, "MessageCreator must not be null");
        MessageProducer producer = createProducer(session, destination);
        try {
            Message message = messageCreator.createMessage(session);
            if (logger.isDebugEnabled()) {
                logger.debug("Sending created message: " + message);
            }
            doSend(producer, message);
            // Check commit - avoid commit call within a JTA transaction.
            if (session.getTransacted() && isSessionLocallyTransacted(session)) {
                // Transacted session created by this template -> commit.
                JmsUtils.commitIfNecessary(session);
            }
        }
        finally {
            JmsUtils.closeMessageProducer(producer);
        }
    }

    /**
     * Actually send the given JMS message.
     * <p>Applies the configured delivery delay (if non-negative) to the producer
     * first, then sends with either this template's explicit QOS values or the
     * producer's defaults, depending on {@link #isExplicitQosEnabled()}.
     * @param producer the JMS MessageProducer to send with
     * @param message the JMS Message to send
     * @throws JMSException if thrown by JMS API methods
     * @throws IllegalStateException if a delivery delay is configured but the
     * JMS 2.0 {@code setDeliveryDelay} method is not available
     */
    protected void doSend(MessageProducer producer, Message message) throws JMSException {
        if (this.deliveryDelay >= 0) {
            if (setDeliveryDelayMethod == null) {
                throw new IllegalStateException("setDeliveryDelay requires JMS 2.0");
            }
            try {
                ReflectionUtils.invokeMethod(setDeliveryDelayMethod, producer, this.deliveryDelay);
            }
            catch (java.lang.reflect.UndeclaredThrowableException ex) {
                // ReflectionUtils rethrows checked exceptions from the target method
                // wrapped in UndeclaredThrowableException. Unwrap a provider
                // JMSException so that it is reported as declared and goes through
                // the regular JMSException translation of the execute methods.
                Throwable cause = ex.getUndeclaredThrowable();
                if (cause instanceof JMSException) {
                    throw (JMSException) cause;
                }
                throw ex;
            }
        }
        if (isExplicitQosEnabled()) {
            producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
        }
        else {
            producer.send(message);
        }
    }


    //---------------------------------------------------------------------------------------
    // Convenience methods for sending auto-converted messages
    //---------------------------------------------------------------------------------------

    @Override
    public void convertAndSend(Object message) throws JmsException {
        Destination defaultDestination = getDefaultDestination();
        if (defaultDestination != null) {
            convertAndSend(defaultDestination, message);
        }
        else {
            convertAndSend(getRequiredDefaultDestinationName(), message);
        }
    }

    @Override
    public void convertAndSend(Destination destination, final Object message) throws JmsException {
        send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return getRequiredMessageConverter().toMessage(message, session);
            }
        });
    }

    @Override
    public void convertAndSend(String destinationName, final Object message) throws JmsException {
        send(destinationName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return getRequiredMessageConverter().toMessage(message, session);
            }
        });
    }

    @Override
    public void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException {
        Destination defaultDestination = getDefaultDestination();
        if (defaultDestination != null) {
            convertAndSend(defaultDestination, message, postProcessor);
        }
        else {
            convertAndSend(getRequiredDefaultDestinationName(), message, postProcessor);
        }
    }

    @Override
    public void convertAndSend(
            Destination destination, final Object message, final MessagePostProcessor postProcessor)
            throws JmsException {

        send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                // Convert first, then give the post-processor a chance to modify the message.
                Message msg = getRequiredMessageConverter().toMessage(message, session);
                return postProcessor.postProcessMessage(msg);
            }
        });
    }

    @Override
    public void convertAndSend(
            String destinationName, final Object message, final MessagePostProcessor postProcessor)
            throws JmsException {

        send(destinationName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                // Convert first, then give the post-processor a chance to modify the message.
                Message msg = getRequiredMessageConverter().toMessage(message, session);
                return postProcessor.postProcessMessage(msg);
            }
        });
    }


    //---------------------------------------------------------------------------------------
    // Convenience methods for receiving messages
    //---------------------------------------------------------------------------------------

    @Override
    public Message receive() throws JmsException {
        Destination defaultDestination = getDefaultDestination();
        if (defaultDestination != null) {
            return receive(defaultDestination);
        }
        else {
            return receive(getRequiredDefaultDestinationName());
        }
    }

@Override 722 public Message receive(Destination destination) throws JmsException { 723 return receiveSelected(destination, null); 724 } 725 726 @Override 727 public Message receive(String destinationName) throws JmsException { 728 return receiveSelected(destinationName, null); 729 } 730 731 @Override 732 public Message receiveSelected(String messageSelector) throws JmsException { 733 Destination defaultDestination = getDefaultDestination(); 734 if (defaultDestination != null) { 735 return receiveSelected(defaultDestination, messageSelector); 736 } 737 else { 738 return receiveSelected(getRequiredDefaultDestinationName(), messageSelector); 739 } 740 } 741 742 @Override 743 public Message receiveSelected(final Destination destination, final String messageSelector) throws JmsException { 744 return execute(new SessionCallback<Message>() { 745 @Override 746 public Message doInJms(Session session) throws JMSException { 747 return doReceive(session, destination, messageSelector); 748 } 749 }, true); 750 } 751 752 @Override 753 public Message receiveSelected(final String destinationName, final String messageSelector) throws JmsException { 754 return execute(new SessionCallback<Message>() { 755 @Override 756 public Message doInJms(Session session) throws JMSException { 757 Destination destination = resolveDestinationName(session, destinationName); 758 return doReceive(session, destination, messageSelector); 759 } 760 }, true); 761 } 762 763 /** 764 * Receive a JMS message. 
765 * @param session the JMS Session to operate on 766 * @param destination the JMS Destination to receive from 767 * @param messageSelector the message selector for this consumer (can be {@code null}) 768 * @return the JMS Message received, or {@code null} if none 769 * @throws JMSException if thrown by JMS API methods 770 */ 771 protected Message doReceive(Session session, Destination destination, String messageSelector) 772 throws JMSException { 773 774 return doReceive(session, createConsumer(session, destination, messageSelector)); 775 } 776 777 /** 778 * Actually receive a JMS message. 779 * @param session the JMS Session to operate on 780 * @param consumer the JMS MessageConsumer to receive with 781 * @return the JMS Message received, or {@code null} if none 782 * @throws JMSException if thrown by JMS API methods 783 */ 784 protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { 785 try { 786 // Use transaction timeout (if available). 787 long timeout = getReceiveTimeout(); 788 JmsResourceHolder resourceHolder = 789 (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory()); 790 if (resourceHolder != null && resourceHolder.hasTimeout()) { 791 timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis()); 792 } 793 Message message = receiveFromConsumer(consumer, timeout); 794 if (session.getTransacted()) { 795 // Commit necessary - but avoid commit call within a JTA transaction. 796 if (isSessionLocallyTransacted(session)) { 797 // Transacted session created by this template -> commit. 798 JmsUtils.commitIfNecessary(session); 799 } 800 } 801 else if (isClientAcknowledge(session)) { 802 // Manually acknowledge message, if any. 
803 if (message != null) { 804 message.acknowledge(); 805 } 806 } 807 return message; 808 } 809 finally { 810 JmsUtils.closeMessageConsumer(consumer); 811 } 812 } 813 814 815 //--------------------------------------------------------------------------------------- 816 // Convenience methods for receiving auto-converted messages 817 //--------------------------------------------------------------------------------------- 818 819 @Override 820 public Object receiveAndConvert() throws JmsException { 821 return doConvertFromMessage(receive()); 822 } 823 824 @Override 825 public Object receiveAndConvert(Destination destination) throws JmsException { 826 return doConvertFromMessage(receive(destination)); 827 } 828 829 @Override 830 public Object receiveAndConvert(String destinationName) throws JmsException { 831 return doConvertFromMessage(receive(destinationName)); 832 } 833 834 @Override 835 public Object receiveSelectedAndConvert(String messageSelector) throws JmsException { 836 return doConvertFromMessage(receiveSelected(messageSelector)); 837 } 838 839 @Override 840 public Object receiveSelectedAndConvert(Destination destination, String messageSelector) throws JmsException { 841 return doConvertFromMessage(receiveSelected(destination, messageSelector)); 842 } 843 844 @Override 845 public Object receiveSelectedAndConvert(String destinationName, String messageSelector) throws JmsException { 846 return doConvertFromMessage(receiveSelected(destinationName, messageSelector)); 847 } 848 849 /** 850 * Extract the content from the given JMS message. 
851 * @param message the JMS Message to convert (can be {@code null}) 852 * @return the content of the message, or {@code null} if none 853 */ 854 protected Object doConvertFromMessage(Message message) { 855 if (message != null) { 856 try { 857 return getRequiredMessageConverter().fromMessage(message); 858 } 859 catch (JMSException ex) { 860 throw convertJmsAccessException(ex); 861 } 862 } 863 return null; 864 } 865 866 867 //--------------------------------------------------------------------------------------- 868 // Convenience methods for sending messages to and receiving the reply from a destination 869 //--------------------------------------------------------------------------------------- 870 871 @Override 872 public Message sendAndReceive(MessageCreator messageCreator) throws JmsException { 873 Destination defaultDestination = getDefaultDestination(); 874 if (defaultDestination != null) { 875 return sendAndReceive(defaultDestination, messageCreator); 876 } 877 else { 878 return sendAndReceive(getRequiredDefaultDestinationName(), messageCreator); 879 } 880 } 881 882 @Override 883 public Message sendAndReceive(final Destination destination, final MessageCreator messageCreator) throws JmsException { 884 return executeLocal(new SessionCallback<Message>() { 885 @Override 886 public Message doInJms(Session session) throws JMSException { 887 return doSendAndReceive(session, destination, messageCreator); 888 } 889 }, true); 890 } 891 892 @Override 893 public Message sendAndReceive(final String destinationName, final MessageCreator messageCreator) throws JmsException { 894 return executeLocal(new SessionCallback<Message>() { 895 @Override 896 public Message doInJms(Session session) throws JMSException { 897 Destination destination = resolveDestinationName(session, destinationName); 898 return doSendAndReceive(session, destination, messageCreator); 899 } 900 }, true); 901 } 902 903 /** 904 * Send a request message to the given {@link Destination} and block until 905 
* a reply has been received on a temporary queue created on-the-fly. 906 * <p>Return the response message or {@code null} if no message has 907 * @throws JMSException if thrown by JMS API methods 908 */ 909 protected Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator) 910 throws JMSException { 911 912 Assert.notNull(messageCreator, "MessageCreator must not be null"); 913 TemporaryQueue responseQueue = null; 914 MessageProducer producer = null; 915 MessageConsumer consumer = null; 916 try { 917 Message requestMessage = messageCreator.createMessage(session); 918 responseQueue = session.createTemporaryQueue(); 919 producer = session.createProducer(destination); 920 consumer = session.createConsumer(responseQueue); 921 requestMessage.setJMSReplyTo(responseQueue); 922 if (logger.isDebugEnabled()) { 923 logger.debug("Sending created message: " + requestMessage); 924 } 925 doSend(producer, requestMessage); 926 return receiveFromConsumer(consumer, getReceiveTimeout()); 927 } 928 finally { 929 JmsUtils.closeMessageConsumer(consumer); 930 JmsUtils.closeMessageProducer(producer); 931 if (responseQueue != null) { 932 responseQueue.delete(); 933 } 934 } 935 } 936 937 /** 938 * A variant of {@link #execute(SessionCallback, boolean)} that explicitly 939 * creates a non-transactional {@link Session}. The given {@link SessionCallback} 940 * does not participate in an existing transaction. 
941 */ 942 private <T> T executeLocal(SessionCallback<T> action, boolean startConnection) throws JmsException { 943 Assert.notNull(action, "Callback object must not be null"); 944 Connection con = null; 945 Session session = null; 946 try { 947 con = getConnectionFactory().createConnection(); 948 session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); 949 if (startConnection) { 950 con.start(); 951 } 952 if (logger.isDebugEnabled()) { 953 logger.debug("Executing callback on JMS Session: " + session); 954 } 955 return action.doInJms(session); 956 } 957 catch (JMSException ex) { 958 throw convertJmsAccessException(ex); 959 } 960 finally { 961 JmsUtils.closeSession(session); 962 ConnectionFactoryUtils.releaseConnection(con, getConnectionFactory(), startConnection); 963 } 964 } 965 966 967 //--------------------------------------------------------------------------------------- 968 // Convenience methods for browsing messages 969 //--------------------------------------------------------------------------------------- 970 971 @Override 972 public <T> T browse(BrowserCallback<T> action) throws JmsException { 973 Queue defaultQueue = getDefaultQueue(); 974 if (defaultQueue != null) { 975 return browse(defaultQueue, action); 976 } 977 else { 978 return browse(getRequiredDefaultDestinationName(), action); 979 } 980 } 981 982 @Override 983 public <T> T browse(Queue queue, BrowserCallback<T> action) throws JmsException { 984 return browseSelected(queue, null, action); 985 } 986 987 @Override 988 public <T> T browse(String queueName, BrowserCallback<T> action) throws JmsException { 989 return browseSelected(queueName, null, action); 990 } 991 992 @Override 993 public <T> T browseSelected(String messageSelector, BrowserCallback<T> action) throws JmsException { 994 Queue defaultQueue = getDefaultQueue(); 995 if (defaultQueue != null) { 996 return browseSelected(defaultQueue, messageSelector, action); 997 } 998 else { 999 return 
browseSelected(getRequiredDefaultDestinationName(), messageSelector, action); 1000 } 1001 } 1002 1003 @Override 1004 public <T> T browseSelected(final Queue queue, final String messageSelector, final BrowserCallback<T> action) 1005 throws JmsException { 1006 1007 Assert.notNull(action, "Callback object must not be null"); 1008 return execute(new SessionCallback<T>() { 1009 @Override 1010 public T doInJms(Session session) throws JMSException { 1011 QueueBrowser browser = createBrowser(session, queue, messageSelector); 1012 try { 1013 return action.doInJms(session, browser); 1014 } 1015 finally { 1016 JmsUtils.closeQueueBrowser(browser); 1017 } 1018 } 1019 }, true); 1020 } 1021 1022 @Override 1023 public <T> T browseSelected(final String queueName, final String messageSelector, final BrowserCallback<T> action) 1024 throws JmsException { 1025 1026 Assert.notNull(action, "Callback object must not be null"); 1027 return execute(new SessionCallback<T>() { 1028 @Override 1029 public T doInJms(Session session) throws JMSException { 1030 Queue queue = (Queue) getDestinationResolver().resolveDestinationName(session, queueName, false); 1031 QueueBrowser browser = createBrowser(session, queue, messageSelector); 1032 try { 1033 return action.doInJms(session, browser); 1034 } 1035 finally { 1036 JmsUtils.closeQueueBrowser(browser); 1037 } 1038 } 1039 }, true); 1040 } 1041 1042 1043 /** 1044 * Fetch an appropriate Connection from the given JmsResourceHolder. 1045 * <p>This implementation accepts any JMS 1.1 Connection. 1046 * @param holder the JmsResourceHolder 1047 * @return an appropriate Connection fetched from the holder, 1048 * or {@code null} if none found 1049 */ 1050 protected Connection getConnection(JmsResourceHolder holder) { 1051 return holder.getConnection(); 1052 } 1053 1054 /** 1055 * Fetch an appropriate Session from the given JmsResourceHolder. 1056 * <p>This implementation accepts any JMS 1.1 Session. 
1057 * @param holder the JmsResourceHolder 1058 * @return an appropriate Session fetched from the holder, 1059 * or {@code null} if none found 1060 */ 1061 protected Session getSession(JmsResourceHolder holder) { 1062 return holder.getSession(); 1063 } 1064 1065 /** 1066 * Check whether the given Session is locally transacted, that is, whether 1067 * its transaction is managed by this listener container's Session handling 1068 * and not by an external transaction coordinator. 1069 * <p>Note: The Session's own transacted flag will already have been checked 1070 * before. This method is about finding out whether the Session's transaction 1071 * is local or externally coordinated. 1072 * @param session the Session to check 1073 * @return whether the given Session is locally transacted 1074 * @see #isSessionTransacted() 1075 * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional 1076 */ 1077 protected boolean isSessionLocallyTransacted(Session session) { 1078 return isSessionTransacted() && 1079 !ConnectionFactoryUtils.isSessionTransactional(session, getConnectionFactory()); 1080 } 1081 1082 /** 1083 * Create a JMS MessageProducer for the given Session and Destination, 1084 * configuring it to disable message ids and/or timestamps (if necessary). 1085 * <p>Delegates to {@link #doCreateProducer} for creation of the raw 1086 * JMS MessageProducer. 
1087 * @param session the JMS Session to create a MessageProducer for 1088 * @param destination the JMS Destination to create a MessageProducer for 1089 * @return the new JMS MessageProducer 1090 * @throws JMSException if thrown by JMS API methods 1091 * @see #setMessageIdEnabled 1092 * @see #setMessageTimestampEnabled 1093 */ 1094 protected MessageProducer createProducer(Session session, Destination destination) throws JMSException { 1095 MessageProducer producer = doCreateProducer(session, destination); 1096 if (!isMessageIdEnabled()) { 1097 producer.setDisableMessageID(true); 1098 } 1099 if (!isMessageTimestampEnabled()) { 1100 producer.setDisableMessageTimestamp(true); 1101 } 1102 return producer; 1103 } 1104 1105 /** 1106 * Create a raw JMS MessageProducer for the given Session and Destination. 1107 * <p>This implementation uses JMS 1.1 API. 1108 * @param session the JMS Session to create a MessageProducer for 1109 * @param destination the JMS Destination to create a MessageProducer for 1110 * @return the new JMS MessageProducer 1111 * @throws JMSException if thrown by JMS API methods 1112 */ 1113 protected MessageProducer doCreateProducer(Session session, Destination destination) throws JMSException { 1114 return session.createProducer(destination); 1115 } 1116 1117 /** 1118 * Create a JMS MessageConsumer for the given Session and Destination. 1119 * <p>This implementation uses JMS 1.1 API. 
1120 * @param session the JMS Session to create a MessageConsumer for 1121 * @param destination the JMS Destination to create a MessageConsumer for 1122 * @param messageSelector the message selector for this consumer (can be {@code null}) 1123 * @return the new JMS MessageConsumer 1124 * @throws JMSException if thrown by JMS API methods 1125 */ 1126 protected MessageConsumer createConsumer(Session session, Destination destination, String messageSelector) 1127 throws JMSException { 1128 1129 // Only pass in the NoLocal flag in case of a Topic: 1130 // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException 1131 // in case of the NoLocal flag being specified for a Queue. 1132 if (isPubSubDomain()) { 1133 return session.createConsumer(destination, messageSelector, isPubSubNoLocal()); 1134 } 1135 else { 1136 return session.createConsumer(destination, messageSelector); 1137 } 1138 } 1139 1140 /** 1141 * Create a JMS MessageProducer for the given Session and Destination, 1142 * configuring it to disable message ids and/or timestamps (if necessary). 1143 * <p>Delegates to {@link #doCreateProducer} for creation of the raw 1144 * JMS MessageProducer. 1145 * @param session the JMS Session to create a QueueBrowser for 1146 * @param queue the JMS Queue to create a QueueBrowser for 1147 * @param messageSelector the message selector for this consumer (can be {@code null}) 1148 * @return the new JMS QueueBrowser 1149 * @throws JMSException if thrown by JMS API methods 1150 * @see #setMessageIdEnabled 1151 * @see #setMessageTimestampEnabled 1152 */ 1153 protected QueueBrowser createBrowser(Session session, Queue queue, String messageSelector) 1154 throws JMSException { 1155 1156 return session.createBrowser(queue, messageSelector); 1157 } 1158 1159 1160 /** 1161 * ResourceFactory implementation that delegates to this template's protected callback methods. 
*/
	private class JmsTemplateResourceFactory implements ConnectionFactoryUtils.ResourceFactory {

		// Every method below forwards to the same-named (or closest matching)
		// method on the enclosing JmsTemplate instance.

		@Override
		public Connection getConnection(JmsResourceHolder resourceHolder) {
			return JmsTemplate.this.getConnection(resourceHolder);
		}

		@Override
		public Session getSession(JmsResourceHolder resourceHolder) {
			return JmsTemplate.this.getSession(resourceHolder);
		}

		@Override
		public Connection createConnection() throws JMSException {
			return JmsTemplate.this.createConnection();
		}

		@Override
		public Session createSession(Connection connection) throws JMSException {
			return JmsTemplate.this.createSession(connection);
		}

		@Override
		public boolean isSynchedLocalTransactionAllowed() {
			// Answered by the template's sessionTransacted setting.
			return JmsTemplate.this.isSessionTransacted();
		}
	}

}