001/*
002 * Copyright 2006-2007 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.container.jms;
018
019import org.aopalliance.aop.Advice;
020import org.springframework.aop.framework.ProxyFactory;
021import org.springframework.aop.support.DefaultPointcutAdvisor;
022import org.springframework.aop.support.NameMatchMethodPointcut;
023import org.springframework.batch.repeat.RepeatOperations;
024import org.springframework.batch.repeat.interceptor.RepeatOperationsInterceptor;
025import org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy;
026import org.springframework.jms.listener.DefaultMessageListenerContainer;
027import org.springframework.transaction.interceptor.TransactionInterceptor;
028
029import javax.jms.JMSException;
030import javax.jms.MessageConsumer;
031import javax.jms.Session;
032
033/**
034 * Message listener container adapted for intercepting the message reception
035 * with advice provided through configuration.<br>
036 * 
037 * To enable batching of messages in a single transaction, use the
038 * {@link TransactionInterceptor} and the {@link RepeatOperationsInterceptor} in
039 * the advice chain (with or without a transaction manager set in the base
040 * class). Instead of receiving a single message and processing it, the
041 * container will then use a {@link RepeatOperations} to receive multiple
042 * messages in the same thread. Use with a {@link RepeatOperations} and a
043 * transaction interceptor. If the transaction interceptor uses XA then use an
044 * XA connection factory, or else the
045 * {@link TransactionAwareConnectionFactoryProxy} to synchronize the JMS session
046 * with the ongoing transaction (opening up the possibility of duplicate
047 * messages after a failure). In the latter case you will not need to provide a
048 * transaction manager in the base class - it only gets on the way and prevents
049 * the JMS session from synchronizing with the database transaction.
050 * 
051 * @author Dave Syer
052 * 
053 */
054public class BatchMessageListenerContainer extends DefaultMessageListenerContainer {
055
056        /**
057         * @author Dave Syer
058         * 
059         */
060        public static interface ContainerDelegate {
061                boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer) throws JMSException;
062        }
063
064        private Advice[] advices = new Advice[0];
065
066        private ContainerDelegate delegate = new ContainerDelegate() {
067                @Override
068                public boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer) throws JMSException {
069                        return BatchMessageListenerContainer.super.receiveAndExecute(invoker, session, consumer);
070                }
071        };
072
073        private ContainerDelegate proxy = delegate;
074
075        /**
076         * Public setter for the {@link Advice}.
077         * @param advices the advice to set
078         */
079        public void setAdviceChain(Advice[] advices) {
080                this.advices = advices;
081        }
082
083        /**
084         * Set up interceptor with provided advice on the
085         * {@link #receiveAndExecute(Object, Session, MessageConsumer)} method.
086         * 
087         * @see org.springframework.jms.listener.AbstractJmsListeningContainer#afterPropertiesSet()
088         */
089        @Override
090        public void afterPropertiesSet() {
091                super.afterPropertiesSet();
092                initializeProxy();
093        }
094
095        /**
096         * Override base class to prevent exceptions from being swallowed. Should be
097         * an injectable strategy (see SPR-4733).
098         * 
099         * @see org.springframework.jms.listener.AbstractMessageListenerContainer#handleListenerException(java.lang.Throwable)
100         */
101        @Override
102        protected void handleListenerException(Throwable ex) {
103                if (!isSessionTransacted()) {
104                        // Log the exceptions in base class if not transactional anyway
105                        super.handleListenerException(ex);
106                        return;
107                }
108                logger.debug("Re-throwing exception in container.");
109                if (ex instanceof RuntimeException) {
110                        // We need to re-throw so that an enclosing non-JMS transaction can
111                        // rollback...
112                        throw (RuntimeException) ex;
113                }
114                else if (ex instanceof Error) {
115                        // Just re-throw Error instances because otherwise unit tests just
116                        // swallow exceptions from EasyMock and JUnit.
117                        throw (Error) ex;
118                }
119        }
120
121        /**
122         * Override base class method to wrap call in advice if provided.
123         * @see org.springframework.jms.listener.AbstractPollingMessageListenerContainer#receiveAndExecute(Object,
124         * javax.jms.Session, javax.jms.MessageConsumer)
125         */
126        @Override
127        protected boolean receiveAndExecute(final Object invoker, final Session session, final MessageConsumer consumer)
128                        throws JMSException {
129                return proxy.receiveAndExecute(invoker, session, consumer);
130        }
131
132        /**
133         * 
134         */
135        public void initializeProxy() {
136                ProxyFactory factory = new ProxyFactory();
137                for (int i = 0; i < advices.length; i++) {
138                        DefaultPointcutAdvisor advisor = new DefaultPointcutAdvisor(advices[i]);
139                        NameMatchMethodPointcut pointcut = new NameMatchMethodPointcut();
140                        pointcut.addMethodName("receiveAndExecute");
141                        advisor.setPointcut(pointcut);
142                        factory.addAdvisor(advisor);
143                }
144                factory.setProxyTargetClass(false);
145                factory.addInterface(ContainerDelegate.class);
146                factory.setTarget(delegate);
147                proxy = (ContainerDelegate) factory.getProxy();
148        }
149
150}