001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.core; 018 019import javax.jms.Connection; 020import javax.jms.ConnectionFactory; 021import javax.jms.DeliveryMode; 022import javax.jms.Destination; 023import javax.jms.JMSException; 024import javax.jms.Message; 025import javax.jms.MessageConsumer; 026import javax.jms.MessageProducer; 027import javax.jms.Queue; 028import javax.jms.QueueBrowser; 029import javax.jms.Session; 030import javax.jms.TemporaryQueue; 031 032import org.springframework.jms.JmsException; 033import org.springframework.jms.connection.ConnectionFactoryUtils; 034import org.springframework.jms.connection.JmsResourceHolder; 035import org.springframework.jms.support.JmsUtils; 036import org.springframework.jms.support.QosSettings; 037import org.springframework.jms.support.converter.MessageConverter; 038import org.springframework.jms.support.converter.SimpleMessageConverter; 039import org.springframework.jms.support.destination.JmsDestinationAccessor; 040import org.springframework.lang.Nullable; 041import org.springframework.transaction.support.TransactionSynchronizationManager; 042import org.springframework.util.Assert; 043 044/** 045 * Helper class that simplifies synchronous JMS access code. 046 * 047 * <p>If you want to use dynamic destination creation, you must specify 048 * the type of JMS destination to create, using the "pubSubDomain" property. 049 * For other operations, this is not necessary. Point-to-Point (Queues) is the default 050 * domain. 051 * 052 * <p>Default settings for JMS Sessions are "not transacted" and "auto-acknowledge". 053 * As defined by the Java EE specification, the transaction and acknowledgement 054 * parameters are ignored when a JMS Session is created inside an active 055 * transaction, no matter if a JTA transaction or a Spring-managed transaction. 056 * To configure them for native JMS usage, specify appropriate values for 057 * the "sessionTransacted" and "sessionAcknowledgeMode" bean properties. 058 * 059 * <p>This template uses a 060 * {@link org.springframework.jms.support.destination.DynamicDestinationResolver} 061 * and a {@link org.springframework.jms.support.converter.SimpleMessageConverter} 062 * as default strategies for resolving a destination name or converting a message, 063 * respectively. These defaults can be overridden through the "destinationResolver" 064 * and "messageConverter" bean properties. 065 * 066 * <p><b>NOTE: The {@code ConnectionFactory} used with this template should 067 * return pooled Connections (or a single shared Connection) as well as pooled 068 * Sessions and MessageProducers. Otherwise, performance of ad-hoc JMS operations 069 * is going to suffer.</b> The simplest option is to use the Spring-provided 070 * {@link org.springframework.jms.connection.SingleConnectionFactory} as a 071 * decorator for your target {@code ConnectionFactory}, reusing a single 072 * JMS Connection in a thread-safe fashion; this is often good enough for the 073 * purpose of sending messages via this template. In a Java EE environment, 074 * make sure that the {@code ConnectionFactory} is obtained from the 075 * application's environment naming context via JNDI; application servers 076 * typically expose pooled, transaction-aware factories there. 077 * 078 * @author Mark Pollack 079 * @author Juergen Hoeller 080 * @author Stephane Nicoll 081 * @since 1.1 082 * @see #setConnectionFactory 083 * @see #setPubSubDomain 084 * @see #setDestinationResolver 085 * @see #setMessageConverter 086 * @see javax.jms.MessageProducer 087 * @see javax.jms.MessageConsumer 088 */ 089public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations { 090 091 /** Internal ResourceFactory adapter for interacting with ConnectionFactoryUtils. */ 092 private final JmsTemplateResourceFactory transactionalResourceFactory = new JmsTemplateResourceFactory(); 093 094 095 @Nullable 096 private Object defaultDestination; 097 098 @Nullable 099 private MessageConverter messageConverter; 100 101 102 private boolean messageIdEnabled = true; 103 104 private boolean messageTimestampEnabled = true; 105 106 private boolean pubSubNoLocal = false; 107 108 private long receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT; 109 110 private long deliveryDelay = -1; 111 112 113 private boolean explicitQosEnabled = false; 114 115 private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; 116 117 private int priority = Message.DEFAULT_PRIORITY; 118 119 private long timeToLive = Message.DEFAULT_TIME_TO_LIVE; 120 121 122 /** 123 * Create a new JmsTemplate for bean-style usage. 124 * <p>Note: The ConnectionFactory has to be set before using the instance. 125 * This constructor can be used to prepare a JmsTemplate via a BeanFactory, 126 * typically setting the ConnectionFactory via setConnectionFactory. 127 * @see #setConnectionFactory 128 */ 129 public JmsTemplate() { 130 initDefaultStrategies(); 131 } 132 133 /** 134 * Create a new JmsTemplate, given a ConnectionFactory. 135 * @param connectionFactory the ConnectionFactory to obtain Connections from 136 */ 137 public JmsTemplate(ConnectionFactory connectionFactory) { 138 this(); 139 setConnectionFactory(connectionFactory); 140 afterPropertiesSet(); 141 } 142 143 /** 144 * Initialize the default implementations for the template's strategies: 145 * DynamicDestinationResolver and SimpleMessageConverter. 146 * @see #setDestinationResolver 147 * @see #setMessageConverter 148 * @see org.springframework.jms.support.destination.DynamicDestinationResolver 149 * @see org.springframework.jms.support.converter.SimpleMessageConverter 150 */ 151 protected void initDefaultStrategies() { 152 setMessageConverter(new SimpleMessageConverter()); 153 } 154 155 156 /** 157 * Set the destination to be used on send/receive operations that do not 158 * have a destination parameter. 159 * <p>Alternatively, specify a "defaultDestinationName", to be 160 * dynamically resolved via the DestinationResolver. 161 * @see #send(MessageCreator) 162 * @see #convertAndSend(Object) 163 * @see #convertAndSend(Object, MessagePostProcessor) 164 * @see #setDefaultDestinationName(String) 165 */ 166 public void setDefaultDestination(@Nullable Destination destination) { 167 this.defaultDestination = destination; 168 } 169 170 /** 171 * Return the destination to be used on send/receive operations that do not 172 * have a destination parameter. 173 */ 174 @Nullable 175 public Destination getDefaultDestination() { 176 return (this.defaultDestination instanceof Destination ? (Destination) this.defaultDestination : null); 177 } 178 179 @Nullable 180 private Queue getDefaultQueue() { 181 Destination defaultDestination = getDefaultDestination(); 182 if (defaultDestination != null && !(defaultDestination instanceof Queue)) { 183 throw new IllegalStateException( 184 "'defaultDestination' does not correspond to a Queue. Check configuration of JmsTemplate."); 185 } 186 return (Queue) defaultDestination; 187 } 188 189 /** 190 * Set the destination name to be used on send/receive operations that 191 * do not have a destination parameter. The specified name will be 192 * dynamically resolved via the DestinationResolver. 193 * <p>Alternatively, specify a JMS Destination object as "defaultDestination". 194 * @see #send(MessageCreator) 195 * @see #convertAndSend(Object) 196 * @see #convertAndSend(Object, MessagePostProcessor) 197 * @see #setDestinationResolver 198 * @see #setDefaultDestination(javax.jms.Destination) 199 */ 200 public void setDefaultDestinationName(@Nullable String destinationName) { 201 this.defaultDestination = destinationName; 202 } 203 204 /** 205 * Return the destination name to be used on send/receive operations that 206 * do not have a destination parameter. 207 */ 208 @Nullable 209 public String getDefaultDestinationName() { 210 return (this.defaultDestination instanceof String ? (String) this.defaultDestination : null); 211 } 212 213 private String getRequiredDefaultDestinationName() throws IllegalStateException { 214 String name = getDefaultDestinationName(); 215 if (name == null) { 216 throw new IllegalStateException( 217 "No 'defaultDestination' or 'defaultDestinationName' specified. Check configuration of JmsTemplate."); 218 } 219 return name; 220 } 221 222 /** 223 * Set the message converter for this template. Used to resolve 224 * Object parameters to convertAndSend methods and Object results 225 * from receiveAndConvert methods. 226 * <p>The default converter is a SimpleMessageConverter, which is able 227 * to handle BytesMessages, TextMessages and ObjectMessages. 228 * @see #convertAndSend 229 * @see #receiveAndConvert 230 * @see org.springframework.jms.support.converter.SimpleMessageConverter 231 */ 232 public void setMessageConverter(@Nullable MessageConverter messageConverter) { 233 this.messageConverter = messageConverter; 234 } 235 236 /** 237 * Return the message converter for this template. 238 */ 239 @Nullable 240 public MessageConverter getMessageConverter() { 241 return this.messageConverter; 242 } 243 244 private MessageConverter getRequiredMessageConverter() throws IllegalStateException { 245 MessageConverter converter = getMessageConverter(); 246 if (converter == null) { 247 throw new IllegalStateException("No 'messageConverter' specified. Check configuration of JmsTemplate."); 248 } 249 return converter; 250 } 251 252 253 /** 254 * Set whether message IDs are enabled. Default is "true". 255 * <p>This is only a hint to the JMS producer. 256 * See the JMS javadocs for details. 257 * @see javax.jms.MessageProducer#setDisableMessageID 258 */ 259 public void setMessageIdEnabled(boolean messageIdEnabled) { 260 this.messageIdEnabled = messageIdEnabled; 261 } 262 263 /** 264 * Return whether message IDs are enabled. 265 */ 266 public boolean isMessageIdEnabled() { 267 return this.messageIdEnabled; 268 } 269 270 /** 271 * Set whether message timestamps are enabled. Default is "true". 272 * <p>This is only a hint to the JMS producer. 273 * See the JMS javadocs for details. 274 * @see javax.jms.MessageProducer#setDisableMessageTimestamp 275 */ 276 public void setMessageTimestampEnabled(boolean messageTimestampEnabled) { 277 this.messageTimestampEnabled = messageTimestampEnabled; 278 } 279 280 /** 281 * Return whether message timestamps are enabled. 282 */ 283 public boolean isMessageTimestampEnabled() { 284 return this.messageTimestampEnabled; 285 } 286 287 /** 288 * Set whether to inhibit the delivery of messages published by its own connection. 289 * Default is "false". 290 * @see javax.jms.Session#createConsumer(javax.jms.Destination, String, boolean) 291 */ 292 public void setPubSubNoLocal(boolean pubSubNoLocal) { 293 this.pubSubNoLocal = pubSubNoLocal; 294 } 295 296 /** 297 * Return whether to inhibit the delivery of messages published by its own connection. 298 */ 299 public boolean isPubSubNoLocal() { 300 return this.pubSubNoLocal; 301 } 302 303 /** 304 * Set the timeout to use for receive calls (in milliseconds). 305 * <p>The default is {@link #RECEIVE_TIMEOUT_INDEFINITE_WAIT}, which indicates 306 * a blocking receive without timeout. 307 * <p>Specify {@link #RECEIVE_TIMEOUT_NO_WAIT} (or any other negative value) 308 * to indicate that a receive operation should check if a message is 309 * immediately available without blocking. 310 * @see #receiveFromConsumer(MessageConsumer, long) 311 * @see javax.jms.MessageConsumer#receive(long) 312 * @see javax.jms.MessageConsumer#receiveNoWait() 313 * @see javax.jms.MessageConsumer#receive() 314 */ 315 public void setReceiveTimeout(long receiveTimeout) { 316 this.receiveTimeout = receiveTimeout; 317 } 318 319 /** 320 * Return the timeout to use for receive calls (in milliseconds). 321 */ 322 public long getReceiveTimeout() { 323 return this.receiveTimeout; 324 } 325 326 /** 327 * Set the delivery delay to use for send calls (in milliseconds). 328 * <p>The default is -1 (no delivery delay passed on to the broker). 329 * Note that this feature requires JMS 2.0. 330 */ 331 public void setDeliveryDelay(long deliveryDelay) { 332 this.deliveryDelay = deliveryDelay; 333 } 334 335 /** 336 * Return the delivery delay to use for send calls (in milliseconds). 337 */ 338 public long getDeliveryDelay() { 339 return this.deliveryDelay; 340 } 341 342 343 /** 344 * Set if the QOS values (deliveryMode, priority, timeToLive) 345 * should be used for sending a message. 346 * @see #setDeliveryMode 347 * @see #setPriority 348 * @see #setTimeToLive 349 */ 350 public void setExplicitQosEnabled(boolean explicitQosEnabled) { 351 this.explicitQosEnabled = explicitQosEnabled; 352 } 353 354 /** 355 * If "true", then the values of deliveryMode, priority, and timeToLive 356 * will be used when sending a message. Otherwise, the default values, 357 * that may be set administratively, will be used. 358 * @return true if overriding default values of QOS parameters 359 * (deliveryMode, priority, and timeToLive) 360 * @see #setDeliveryMode 361 * @see #setPriority 362 * @see #setTimeToLive 363 */ 364 public boolean isExplicitQosEnabled() { 365 return this.explicitQosEnabled; 366 } 367 368 /** 369 * Set the {@link QosSettings} to use when sending a message. 370 * @param settings the deliveryMode, priority, and timeToLive settings to use 371 * @since 5.0 372 * @see #setExplicitQosEnabled(boolean) 373 * @see #setDeliveryMode(int) 374 * @see #setPriority(int) 375 * @see #setTimeToLive(long) 376 */ 377 public void setQosSettings(QosSettings settings) { 378 Assert.notNull(settings, "Settings must not be null"); 379 setExplicitQosEnabled(true); 380 setDeliveryMode(settings.getDeliveryMode()); 381 setPriority(settings.getPriority()); 382 setTimeToLive(settings.getTimeToLive()); 383 } 384 385 /** 386 * Set whether message delivery should be persistent or non-persistent, 387 * specified as boolean value ("true" or "false"). This will set the delivery 388 * mode accordingly, to either "PERSISTENT" (2) or "NON_PERSISTENT" (1). 389 * <p>Default is "true" a.k.a. delivery mode "PERSISTENT". 390 * @see #setDeliveryMode(int) 391 * @see javax.jms.DeliveryMode#PERSISTENT 392 * @see javax.jms.DeliveryMode#NON_PERSISTENT 393 */ 394 public void setDeliveryPersistent(boolean deliveryPersistent) { 395 this.deliveryMode = (deliveryPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 396 } 397 398 /** 399 * Set the delivery mode to use when sending a message. 400 * Default is the JMS Message default: "PERSISTENT". 401 * <p>Since a default value may be defined administratively, 402 * this is only used when "isExplicitQosEnabled" equals "true". 403 * @param deliveryMode the delivery mode to use 404 * @see #isExplicitQosEnabled 405 * @see javax.jms.DeliveryMode#PERSISTENT 406 * @see javax.jms.DeliveryMode#NON_PERSISTENT 407 * @see javax.jms.Message#DEFAULT_DELIVERY_MODE 408 * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long) 409 */ 410 public void setDeliveryMode(int deliveryMode) { 411 this.deliveryMode = deliveryMode; 412 } 413 414 /** 415 * Return the delivery mode to use when sending a message. 416 */ 417 public int getDeliveryMode() { 418 return this.deliveryMode; 419 } 420 421 /** 422 * Set the priority of a message when sending. 423 * <p>Since a default value may be defined administratively, 424 * this is only used when "isExplicitQosEnabled" equals "true". 425 * @see #isExplicitQosEnabled 426 * @see javax.jms.Message#DEFAULT_PRIORITY 427 * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long) 428 */ 429 public void setPriority(int priority) { 430 this.priority = priority; 431 } 432 433 /** 434 * Return the priority of a message when sending. 435 */ 436 public int getPriority() { 437 return this.priority; 438 } 439 440 /** 441 * Set the time-to-live of the message when sending. 442 * <p>Since a default value may be defined administratively, 443 * this is only used when "isExplicitQosEnabled" equals "true". 444 * @param timeToLive the message's lifetime (in milliseconds) 445 * @see #isExplicitQosEnabled 446 * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE 447 * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long) 448 */ 449 public void setTimeToLive(long timeToLive) { 450 this.timeToLive = timeToLive; 451 } 452 453 /** 454 * Return the time-to-live of the message when sending. 455 */ 456 public long getTimeToLive() { 457 return this.timeToLive; 458 } 459 460 461 //--------------------------------------------------------------------------------------- 462 // JmsOperations execute methods 463 //--------------------------------------------------------------------------------------- 464 465 @Override 466 @Nullable 467 public <T> T execute(SessionCallback<T> action) throws JmsException { 468 return execute(action, false); 469 } 470 471 /** 472 * Execute the action specified by the given action object within a 473 * JMS Session. Generalized version of {@code execute(SessionCallback)}, 474 * allowing the JMS Connection to be started on the fly. 475 * <p>Use {@code execute(SessionCallback)} for the general case. 476 * Starting the JMS Connection is just necessary for receiving messages, 477 * which is preferably achieved through the {@code receive} methods. 478 * @param action callback object that exposes the Session 479 * @param startConnection whether to start the Connection 480 * @return the result object from working with the Session 481 * @throws JmsException if there is any problem 482 * @see #execute(SessionCallback) 483 * @see #receive 484 */ 485 @Nullable 486 public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException { 487 Assert.notNull(action, "Callback object must not be null"); 488 Connection conToClose = null; 489 Session sessionToClose = null; 490 try { 491 Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 492 obtainConnectionFactory(), this.transactionalResourceFactory, startConnection); 493 if (sessionToUse == null) { 494 conToClose = createConnection(); 495 sessionToClose = createSession(conToClose); 496 if (startConnection) { 497 conToClose.start(); 498 } 499 sessionToUse = sessionToClose; 500 } 501 if (logger.isDebugEnabled()) { 502 logger.debug("Executing callback on JMS Session: " + sessionToUse); 503 } 504 return action.doInJms(sessionToUse); 505 } 506 catch (JMSException ex) { 507 throw convertJmsAccessException(ex); 508 } 509 finally { 510 JmsUtils.closeSession(sessionToClose); 511 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection); 512 } 513 } 514 515 @Override 516 @Nullable 517 public <T> T execute(ProducerCallback<T> action) throws JmsException { 518 String defaultDestinationName = getDefaultDestinationName(); 519 if (defaultDestinationName != null) { 520 return execute(defaultDestinationName, action); 521 } 522 else { 523 return execute(getDefaultDestination(), action); 524 } 525 } 526 527 @Override 528 @Nullable 529 public <T> T execute(final @Nullable Destination destination, final ProducerCallback<T> action) throws JmsException { 530 Assert.notNull(action, "Callback object must not be null"); 531 return execute(session -> { 532 MessageProducer producer = createProducer(session, destination); 533 try { 534 return action.doInJms(session, producer); 535 } 536 finally { 537 JmsUtils.closeMessageProducer(producer); 538 } 539 }, false); 540 } 541 542 @Override 543 @Nullable 544 public <T> T execute(final String destinationName, final ProducerCallback<T> action) throws JmsException { 545 Assert.notNull(action, "Callback object must not be null"); 546 return execute(session -> { 547 Destination destination = resolveDestinationName(session, destinationName); 548 MessageProducer producer = createProducer(session, destination); 549 try { 550 return action.doInJms(session, producer); 551 } 552 finally { 553 JmsUtils.closeMessageProducer(producer); 554 } 555 }, false); 556 } 557 558 559 //--------------------------------------------------------------------------------------- 560 // Convenience methods for sending messages 561 //--------------------------------------------------------------------------------------- 562 563 @Override 564 public void send(MessageCreator messageCreator) throws JmsException { 565 Destination defaultDestination = getDefaultDestination(); 566 if (defaultDestination != null) { 567 send(defaultDestination, messageCreator); 568 } 569 else { 570 send(getRequiredDefaultDestinationName(), messageCreator); 571 } 572 } 573 574 @Override 575 public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException { 576 execute(session -> { 577 doSend(session, destination, messageCreator); 578 return null; 579 }, false); 580 } 581 582 @Override 583 public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException { 584 execute(session -> { 585 Destination destination = resolveDestinationName(session, destinationName); 586 doSend(session, destination, messageCreator); 587 return null; 588 }, false); 589 } 590 591 /** 592 * Send the given JMS message. 593 * @param session the JMS Session to operate on 594 * @param destination the JMS Destination to send to 595 * @param messageCreator callback to create a JMS Message 596 * @throws JMSException if thrown by JMS API methods 597 */ 598 protected void doSend(Session session, Destination destination, MessageCreator messageCreator) 599 throws JMSException { 600 601 Assert.notNull(messageCreator, "MessageCreator must not be null"); 602 MessageProducer producer = createProducer(session, destination); 603 try { 604 Message message = messageCreator.createMessage(session); 605 if (logger.isDebugEnabled()) { 606 logger.debug("Sending created message: " + message); 607 } 608 doSend(producer, message); 609 // Check commit - avoid commit call within a JTA transaction. 610 if (session.getTransacted() && isSessionLocallyTransacted(session)) { 611 // Transacted session created by this template -> commit. 612 JmsUtils.commitIfNecessary(session); 613 } 614 } 615 finally { 616 JmsUtils.closeMessageProducer(producer); 617 } 618 } 619 620 /** 621 * Actually send the given JMS message. 622 * @param producer the JMS MessageProducer to send with 623 * @param message the JMS Message to send 624 * @throws JMSException if thrown by JMS API methods 625 */ 626 protected void doSend(MessageProducer producer, Message message) throws JMSException { 627 if (this.deliveryDelay >= 0) { 628 producer.setDeliveryDelay(this.deliveryDelay); 629 } 630 if (isExplicitQosEnabled()) { 631 producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive()); 632 } 633 else { 634 producer.send(message); 635 } 636 } 637 638 639 //--------------------------------------------------------------------------------------- 640 // Convenience methods for sending auto-converted messages 641 //--------------------------------------------------------------------------------------- 642 643 @Override 644 public void convertAndSend(Object message) throws JmsException { 645 Destination defaultDestination = getDefaultDestination(); 646 if (defaultDestination != null) { 647 convertAndSend(defaultDestination, message); 648 } 649 else { 650 convertAndSend(getRequiredDefaultDestinationName(), message); 651 } 652 } 653 654 @Override 655 public void convertAndSend(Destination destination, final Object message) throws JmsException { 656 send(destination, session -> getRequiredMessageConverter().toMessage(message, session)); 657 } 658 659 @Override 660 public void convertAndSend(String destinationName, final Object message) throws JmsException { 661 send(destinationName, session -> getRequiredMessageConverter().toMessage(message, session)); 662 } 663 664 @Override 665 public void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException { 666 Destination defaultDestination = getDefaultDestination(); 667 if (defaultDestination != null) { 668 convertAndSend(defaultDestination, message, postProcessor); 669 } 670 else { 671 convertAndSend(getRequiredDefaultDestinationName(), message, postProcessor); 672 } 673 } 674 675 @Override 676 public void convertAndSend( 677 Destination destination, final Object message, final MessagePostProcessor postProcessor) 678 throws JmsException { 679 680 send(destination, session -> { 681 Message msg = getRequiredMessageConverter().toMessage(message, session); 682 return postProcessor.postProcessMessage(msg); 683 }); 684 } 685 686 @Override 687 public void convertAndSend( 688 String destinationName, final Object message, final MessagePostProcessor postProcessor) 689 throws JmsException { 690 691 send(destinationName, session -> { 692 Message msg = getRequiredMessageConverter().toMessage(message, session); 693 return postProcessor.postProcessMessage(msg); 694 }); 695 } 696 697 698 //--------------------------------------------------------------------------------------- 699 // Convenience methods for receiving messages 700 //--------------------------------------------------------------------------------------- 701 702 @Override 703 @Nullable 704 public Message receive() throws JmsException { 705 Destination defaultDestination = getDefaultDestination(); 706 if (defaultDestination != null) { 707 return receive(defaultDestination); 708 } 709 else { 710 return receive(getRequiredDefaultDestinationName()); 711 } 712 } 713 714 @Override 715 @Nullable 716 public Message receive(Destination destination) throws JmsException { 717 return receiveSelected(destination, null); 718 } 719 720 @Override 721 @Nullable 722 public Message receive(String destinationName) throws JmsException { 723 return receiveSelected(destinationName, null); 724 } 725 726 @Override 727 @Nullable 728 public Message receiveSelected(String messageSelector) throws JmsException { 729 Destination defaultDestination = getDefaultDestination(); 730 if (defaultDestination != null) { 731 return receiveSelected(defaultDestination, messageSelector); 732 } 733 else { 734 return receiveSelected(getRequiredDefaultDestinationName(), messageSelector); 735 } 736 } 737 738 @Override 739 @Nullable 740 public Message receiveSelected(final Destination destination, @Nullable final String messageSelector) throws JmsException { 741 return execute(session -> doReceive(session, destination, messageSelector), true); 742 } 743 744 @Override 745 @Nullable 746 public Message receiveSelected(final String destinationName, @Nullable final String messageSelector) throws JmsException { 747 return execute(session -> { 748 Destination destination = resolveDestinationName(session, destinationName); 749 return doReceive(session, destination, messageSelector); 750 }, true); 751 } 752 753 /** 754 * Receive a JMS message. 755 * @param session the JMS Session to operate on 756 * @param destination the JMS Destination to receive from 757 * @param messageSelector the message selector for this consumer (can be {@code null}) 758 * @return the JMS Message received, or {@code null} if none 759 * @throws JMSException if thrown by JMS API methods 760 */ 761 @Nullable 762 protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector) 763 throws JMSException { 764 765 return doReceive(session, createConsumer(session, destination, messageSelector)); 766 } 767 768 /** 769 * Actually receive a JMS message. 770 * @param session the JMS Session to operate on 771 * @param consumer the JMS MessageConsumer to receive with 772 * @return the JMS Message received, or {@code null} if none 773 * @throws JMSException if thrown by JMS API methods 774 */ 775 @Nullable 776 protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { 777 try { 778 // Use transaction timeout (if available). 779 long timeout = getReceiveTimeout(); 780 ConnectionFactory connectionFactory = getConnectionFactory(); 781 JmsResourceHolder resourceHolder = null; 782 if (connectionFactory != null) { 783 resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); 784 } 785 if (resourceHolder != null && resourceHolder.hasTimeout()) { 786 timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis()); 787 } 788 Message message = receiveFromConsumer(consumer, timeout); 789 if (session.getTransacted()) { 790 // Commit necessary - but avoid commit call within a JTA transaction. 791 if (isSessionLocallyTransacted(session)) { 792 // Transacted session created by this template -> commit. 793 JmsUtils.commitIfNecessary(session); 794 } 795 } 796 else if (isClientAcknowledge(session)) { 797 // Manually acknowledge message, if any. 798 if (message != null) { 799 message.acknowledge(); 800 } 801 } 802 return message; 803 } 804 finally { 805 JmsUtils.closeMessageConsumer(consumer); 806 } 807 } 808 809 810 //--------------------------------------------------------------------------------------- 811 // Convenience methods for receiving auto-converted messages 812 //--------------------------------------------------------------------------------------- 813 814 @Override 815 @Nullable 816 public Object receiveAndConvert() throws JmsException { 817 return doConvertFromMessage(receive()); 818 } 819 820 @Override 821 @Nullable 822 public Object receiveAndConvert(Destination destination) throws JmsException { 823 return doConvertFromMessage(receive(destination)); 824 } 825 826 @Override 827 @Nullable 828 public Object receiveAndConvert(String destinationName) throws JmsException { 829 return doConvertFromMessage(receive(destinationName)); 830 } 831 832 @Override 833 @Nullable 834 public Object receiveSelectedAndConvert(String messageSelector) throws JmsException { 835 return doConvertFromMessage(receiveSelected(messageSelector)); 836 } 837 838 @Override 839 @Nullable 840 public Object receiveSelectedAndConvert(Destination destination, String messageSelector) throws JmsException { 841 return doConvertFromMessage(receiveSelected(destination, messageSelector)); 842 } 843 844 @Override 845 @Nullable 846 public Object receiveSelectedAndConvert(String destinationName, String messageSelector) throws JmsException { 847 return doConvertFromMessage(receiveSelected(destinationName, messageSelector)); 848 } 849 850 /** 851 * Extract the content from the given JMS message. 852 * @param message the JMS Message to convert (can be {@code null}) 853 * @return the content of the message, or {@code null} if none 854 */ 855 @Nullable 856 protected Object doConvertFromMessage(@Nullable Message message) { 857 if (message != null) { 858 try { 859 return getRequiredMessageConverter().fromMessage(message); 860 } 861 catch (JMSException ex) { 862 throw convertJmsAccessException(ex); 863 } 864 } 865 return null; 866 } 867 868 869 //--------------------------------------------------------------------------------------- 870 // Convenience methods for sending messages to and receiving the reply from a destination 871 //--------------------------------------------------------------------------------------- 872 873 @Override 874 @Nullable 875 public Message sendAndReceive(MessageCreator messageCreator) throws JmsException { 876 Destination defaultDestination = getDefaultDestination(); 877 if (defaultDestination != null) { 878 return sendAndReceive(defaultDestination, messageCreator); 879 } 880 else { 881 return sendAndReceive(getRequiredDefaultDestinationName(), messageCreator); 882 } 883 } 884 885 @Override 886 @Nullable 887 public Message sendAndReceive(final Destination destination, final MessageCreator messageCreator) throws JmsException { 888 return executeLocal(session -> doSendAndReceive(session, destination, messageCreator), true); 889 } 890 891 @Override 892 @Nullable 893 public Message sendAndReceive(final String destinationName, final MessageCreator messageCreator) throws JmsException { 894 return executeLocal(session -> { 895 Destination destination = resolveDestinationName(session, destinationName); 896 return doSendAndReceive(session, destination, messageCreator); 897 }, true); 898 } 899 900 /** 901 * Send a request message to the given {@link Destination} and block until 902 * a reply has been received on a temporary queue created on-the-fly. 903 * <p>Return the response message or {@code null} if no message has 904 * @throws JMSException if thrown by JMS API methods 905 */ 906 @Nullable 907 protected Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator) 908 throws JMSException { 909 910 Assert.notNull(messageCreator, "MessageCreator must not be null"); 911 TemporaryQueue responseQueue = null; 912 MessageProducer producer = null; 913 MessageConsumer consumer = null; 914 try { 915 Message requestMessage = messageCreator.createMessage(session); 916 responseQueue = session.createTemporaryQueue(); 917 producer = session.createProducer(destination); 918 consumer = session.createConsumer(responseQueue); 919 requestMessage.setJMSReplyTo(responseQueue); 920 if (logger.isDebugEnabled()) { 921 logger.debug("Sending created message: " + requestMessage); 922 } 923 doSend(producer, requestMessage); 924 return receiveFromConsumer(consumer, getReceiveTimeout()); 925 } 926 finally { 927 JmsUtils.closeMessageConsumer(consumer); 928 JmsUtils.closeMessageProducer(producer); 929 if (responseQueue != null) { 930 responseQueue.delete(); 931 } 932 } 933 } 934 935 /** 936 * A variant of {@link #execute(SessionCallback, boolean)} that explicitly 937 * creates a non-transactional {@link Session}. The given {@link SessionCallback} 938 * does not participate in an existing transaction. 939 */ 940 @Nullable 941 private <T> T executeLocal(SessionCallback<T> action, boolean startConnection) throws JmsException { 942 Assert.notNull(action, "Callback object must not be null"); 943 Connection con = null; 944 Session session = null; 945 try { 946 con = createConnection(); 947 session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); 948 if (startConnection) { 949 con.start(); 950 } 951 if (logger.isDebugEnabled()) { 952 logger.debug("Executing callback on JMS Session: " + session); 953 } 954 return action.doInJms(session); 955 } 956 catch (JMSException ex) { 957 throw convertJmsAccessException(ex); 958 } 959 finally { 960 JmsUtils.closeSession(session); 961 ConnectionFactoryUtils.releaseConnection(con, getConnectionFactory(), startConnection); 962 } 963 } 964 965 966 //--------------------------------------------------------------------------------------- 967 // Convenience methods for browsing messages 968 //--------------------------------------------------------------------------------------- 969 970 @Override 971 @Nullable 972 public <T> T browse(BrowserCallback<T> action) throws JmsException { 973 Queue defaultQueue = getDefaultQueue(); 974 if (defaultQueue != null) { 975 return browse(defaultQueue, action); 976 } 977 else { 978 return browse(getRequiredDefaultDestinationName(), action); 979 } 980 } 981 982 @Override 983 @Nullable 984 public <T> T browse(Queue queue, BrowserCallback<T> action) throws JmsException { 985 return browseSelected(queue, null, action); 986 } 987 988 @Override 989 @Nullable 990 public <T> T browse(String queueName, BrowserCallback<T> action) throws JmsException { 991 return browseSelected(queueName, null, action); 992 } 993 994 @Override 995 @Nullable 996 public <T> T browseSelected(String messageSelector, BrowserCallback<T> action) throws JmsException { 997 Queue defaultQueue = getDefaultQueue(); 998 if (defaultQueue != null) { 999 return browseSelected(defaultQueue, messageSelector, action); 1000 } 1001 else { 1002 return browseSelected(getRequiredDefaultDestinationName(), messageSelector, action); 1003 } 1004 } 1005 1006 @Override 1007 @Nullable 1008 public <T> T browseSelected(final Queue queue, @Nullable final String messageSelector, final BrowserCallback<T> action) 1009 throws JmsException { 1010 1011 Assert.notNull(action, "Callback object must not be null"); 1012 return execute(session -> { 1013 QueueBrowser browser = createBrowser(session, queue, messageSelector); 1014 try { 1015 return action.doInJms(session, browser); 1016 } 1017 finally { 1018 JmsUtils.closeQueueBrowser(browser); 1019 } 1020 }, true); 1021 } 1022 1023 @Override 1024 @Nullable 1025 public <T> T browseSelected(final String queueName, @Nullable final String messageSelector, final BrowserCallback<T> action) 1026 throws JmsException { 1027 1028 Assert.notNull(action, "Callback object must not be null"); 1029 return execute(session -> { 1030 Queue queue = (Queue) getDestinationResolver().resolveDestinationName(session, queueName, false); 1031 QueueBrowser browser = createBrowser(session, queue, messageSelector); 1032 try { 1033 return action.doInJms(session, browser); 1034 } 1035 finally { 1036 JmsUtils.closeQueueBrowser(browser); 1037 } 1038 }, true); 1039 } 1040 1041 1042 /** 1043 * Fetch an appropriate Connection from the given JmsResourceHolder. 1044 * <p>This implementation accepts any JMS 1.1 Connection. 1045 * @param holder the JmsResourceHolder 1046 * @return an appropriate Connection fetched from the holder, 1047 * or {@code null} if none found 1048 */ 1049 @Nullable 1050 protected Connection getConnection(JmsResourceHolder holder) { 1051 return holder.getConnection(); 1052 } 1053 1054 /** 1055 * Fetch an appropriate Session from the given JmsResourceHolder. 1056 * <p>This implementation accepts any JMS 1.1 Session. 1057 * @param holder the JmsResourceHolder 1058 * @return an appropriate Session fetched from the holder, 1059 * or {@code null} if none found 1060 */ 1061 @Nullable 1062 protected Session getSession(JmsResourceHolder holder) { 1063 return holder.getSession(); 1064 } 1065 1066 /** 1067 * Check whether the given Session is locally transacted, that is, whether 1068 * its transaction is managed by this listener container's Session handling 1069 * and not by an external transaction coordinator. 1070 * <p>Note: The Session's own transacted flag will already have been checked 1071 * before. This method is about finding out whether the Session's transaction 1072 * is local or externally coordinated. 1073 * @param session the Session to check 1074 * @return whether the given Session is locally transacted 1075 * @see #isSessionTransacted() 1076 * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional 1077 */ 1078 protected boolean isSessionLocallyTransacted(Session session) { 1079 return isSessionTransacted() && 1080 !ConnectionFactoryUtils.isSessionTransactional(session, getConnectionFactory()); 1081 } 1082 1083 /** 1084 * Create a JMS MessageProducer for the given Session and Destination, 1085 * configuring it to disable message ids and/or timestamps (if necessary). 1086 * <p>Delegates to {@link #doCreateProducer} for creation of the raw 1087 * JMS MessageProducer. 1088 * @param session the JMS Session to create a MessageProducer for 1089 * @param destination the JMS Destination to create a MessageProducer for 1090 * @return the new JMS MessageProducer 1091 * @throws JMSException if thrown by JMS API methods 1092 * @see #setMessageIdEnabled 1093 * @see #setMessageTimestampEnabled 1094 */ 1095 protected MessageProducer createProducer(Session session, @Nullable Destination destination) throws JMSException { 1096 MessageProducer producer = doCreateProducer(session, destination); 1097 if (!isMessageIdEnabled()) { 1098 producer.setDisableMessageID(true); 1099 } 1100 if (!isMessageTimestampEnabled()) { 1101 producer.setDisableMessageTimestamp(true); 1102 } 1103 return producer; 1104 } 1105 1106 /** 1107 * Create a raw JMS MessageProducer for the given Session and Destination. 1108 * <p>This implementation uses JMS 1.1 API. 1109 * @param session the JMS Session to create a MessageProducer for 1110 * @param destination the JMS Destination to create a MessageProducer for 1111 * @return the new JMS MessageProducer 1112 * @throws JMSException if thrown by JMS API methods 1113 */ 1114 protected MessageProducer doCreateProducer(Session session, @Nullable Destination destination) throws JMSException { 1115 return session.createProducer(destination); 1116 } 1117 1118 /** 1119 * Create a JMS MessageConsumer for the given Session and Destination. 1120 * <p>This implementation uses JMS 1.1 API. 1121 * @param session the JMS Session to create a MessageConsumer for 1122 * @param destination the JMS Destination to create a MessageConsumer for 1123 * @param messageSelector the message selector for this consumer (can be {@code null}) 1124 * @return the new JMS MessageConsumer 1125 * @throws JMSException if thrown by JMS API methods 1126 */ 1127 protected MessageConsumer createConsumer(Session session, Destination destination, @Nullable String messageSelector) 1128 throws JMSException { 1129 1130 // Only pass in the NoLocal flag in case of a Topic: 1131 // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException 1132 // in case of the NoLocal flag being specified for a Queue. 1133 if (isPubSubDomain()) { 1134 return session.createConsumer(destination, messageSelector, isPubSubNoLocal()); 1135 } 1136 else { 1137 return session.createConsumer(destination, messageSelector); 1138 } 1139 } 1140 1141 /** 1142 * Create a JMS MessageProducer for the given Session and Destination, 1143 * configuring it to disable message ids and/or timestamps (if necessary). 1144 * <p>Delegates to {@link #doCreateProducer} for creation of the raw 1145 * JMS MessageProducer. 1146 * @param session the JMS Session to create a QueueBrowser for 1147 * @param queue the JMS Queue to create a QueueBrowser for 1148 * @param messageSelector the message selector for this consumer (can be {@code null}) 1149 * @return the new JMS QueueBrowser 1150 * @throws JMSException if thrown by JMS API methods 1151 * @see #setMessageIdEnabled 1152 * @see #setMessageTimestampEnabled 1153 */ 1154 protected QueueBrowser createBrowser(Session session, Queue queue, @Nullable String messageSelector) 1155 throws JMSException { 1156 1157 return session.createBrowser(queue, messageSelector); 1158 } 1159 1160 1161 /** 1162 * ResourceFactory implementation that delegates to this template's protected callback methods. 1163 */ 1164 private class JmsTemplateResourceFactory implements ConnectionFactoryUtils.ResourceFactory { 1165 1166 @Override 1167 @Nullable 1168 public Connection getConnection(JmsResourceHolder holder) { 1169 return JmsTemplate.this.getConnection(holder); 1170 } 1171 1172 @Override 1173 @Nullable 1174 public Session getSession(JmsResourceHolder holder) { 1175 return JmsTemplate.this.getSession(holder); 1176 } 1177 1178 @Override 1179 public Connection createConnection() throws JMSException { 1180 return JmsTemplate.this.createConnection(); 1181 } 1182 1183 @Override 1184 public Session createSession(Connection con) throws JMSException { 1185 return JmsTemplate.this.createSession(con); 1186 } 1187 1188 @Override 1189 public boolean isSynchedLocalTransactionAllowed() { 1190 return JmsTemplate.this.isSessionTransacted(); 1191 } 1192 } 1193 1194}