001/* 002 * Copyright 2006-2007 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.container.jms; 018 019import org.aopalliance.aop.Advice; 020import org.springframework.aop.framework.ProxyFactory; 021import org.springframework.aop.support.DefaultPointcutAdvisor; 022import org.springframework.aop.support.NameMatchMethodPointcut; 023import org.springframework.batch.repeat.RepeatOperations; 024import org.springframework.batch.repeat.interceptor.RepeatOperationsInterceptor; 025import org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy; 026import org.springframework.jms.listener.DefaultMessageListenerContainer; 027import org.springframework.transaction.interceptor.TransactionInterceptor; 028 029import javax.jms.JMSException; 030import javax.jms.MessageConsumer; 031import javax.jms.Session; 032 033/** 034 * Message listener container adapted for intercepting the message reception 035 * with advice provided through configuration.<br> 036 * 037 * To enable batching of messages in a single transaction, use the 038 * {@link TransactionInterceptor} and the {@link RepeatOperationsInterceptor} in 039 * the advice chain (with or without a transaction manager set in the base 040 * class). Instead of receiving a single message and processing it, the 041 * container will then use a {@link RepeatOperations} to receive multiple 042 * messages in the same thread. Use with a {@link RepeatOperations} and a 043 * transaction interceptor. If the transaction interceptor uses XA then use an 044 * XA connection factory, or else the 045 * {@link TransactionAwareConnectionFactoryProxy} to synchronize the JMS session 046 * with the ongoing transaction (opening up the possibility of duplicate 047 * messages after a failure). In the latter case you will not need to provide a 048 * transaction manager in the base class - it only gets on the way and prevents 049 * the JMS session from synchronizing with the database transaction. 050 * 051 * @author Dave Syer 052 * 053 */ 054public class BatchMessageListenerContainer extends DefaultMessageListenerContainer { 055 056 /** 057 * @author Dave Syer 058 * 059 */ 060 public static interface ContainerDelegate { 061 boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer) throws JMSException; 062 } 063 064 private Advice[] advices = new Advice[0]; 065 066 private ContainerDelegate delegate = new ContainerDelegate() { 067 @Override 068 public boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer) throws JMSException { 069 return BatchMessageListenerContainer.super.receiveAndExecute(invoker, session, consumer); 070 } 071 }; 072 073 private ContainerDelegate proxy = delegate; 074 075 /** 076 * Public setter for the {@link Advice}. 077 * @param advices the advice to set 078 */ 079 public void setAdviceChain(Advice[] advices) { 080 this.advices = advices; 081 } 082 083 /** 084 * Set up interceptor with provided advice on the 085 * {@link #receiveAndExecute(Object, Session, MessageConsumer)} method. 086 * 087 * @see org.springframework.jms.listener.AbstractJmsListeningContainer#afterPropertiesSet() 088 */ 089 @Override 090 public void afterPropertiesSet() { 091 super.afterPropertiesSet(); 092 initializeProxy(); 093 } 094 095 /** 096 * Override base class to prevent exceptions from being swallowed. Should be 097 * an injectable strategy (see SPR-4733). 098 * 099 * @see org.springframework.jms.listener.AbstractMessageListenerContainer#handleListenerException(java.lang.Throwable) 100 */ 101 @Override 102 protected void handleListenerException(Throwable ex) { 103 if (!isSessionTransacted()) { 104 // Log the exceptions in base class if not transactional anyway 105 super.handleListenerException(ex); 106 return; 107 } 108 logger.debug("Re-throwing exception in container."); 109 if (ex instanceof RuntimeException) { 110 // We need to re-throw so that an enclosing non-JMS transaction can 111 // rollback... 112 throw (RuntimeException) ex; 113 } 114 else if (ex instanceof Error) { 115 // Just re-throw Error instances because otherwise unit tests just 116 // swallow exceptions from EasyMock and JUnit. 117 throw (Error) ex; 118 } 119 } 120 121 /** 122 * Override base class method to wrap call in advice if provided. 123 * @see org.springframework.jms.listener.AbstractPollingMessageListenerContainer#receiveAndExecute(Object, 124 * javax.jms.Session, javax.jms.MessageConsumer) 125 */ 126 @Override 127 protected boolean receiveAndExecute(final Object invoker, final Session session, final MessageConsumer consumer) 128 throws JMSException { 129 return proxy.receiveAndExecute(invoker, session, consumer); 130 } 131 132 /** 133 * 134 */ 135 public void initializeProxy() { 136 ProxyFactory factory = new ProxyFactory(); 137 for (int i = 0; i < advices.length; i++) { 138 DefaultPointcutAdvisor advisor = new DefaultPointcutAdvisor(advices[i]); 139 NameMatchMethodPointcut pointcut = new NameMatchMethodPointcut(); 140 pointcut.addMethodName("receiveAndExecute"); 141 advisor.setPointcut(pointcut); 142 factory.addAdvisor(advisor); 143 } 144 factory.setProxyTargetClass(false); 145 factory.addInterface(ContainerDelegate.class); 146 factory.setTarget(delegate); 147 proxy = (ContainerDelegate) factory.getProxy(); 148 } 149 150}