001package org.springframework.batch.integration.chunk;
002
003import org.apache.commons.logging.Log;
004import org.apache.commons.logging.LogFactory;
005import org.springframework.beans.factory.InitializingBean;
006import org.springframework.integration.core.MessageSource;
007import org.springframework.messaging.Message;
008import org.springframework.messaging.MessageChannel;
009import org.springframework.messaging.support.ChannelInterceptor;
010import org.springframework.messaging.support.ChannelInterceptorAdapter;
011import org.springframework.util.Assert;
012
013
014/**
015 * A {@link ChannelInterceptor} that turns a pollable channel into a "pass-thru channel": if a client calls
016 * <code>receive()</code> on the channel it will delegate to a {@link MessageSource} to pull the message directly from
017 * an external source. This is particularly useful in combination with a message channel in thread scope, in which case
018 * the <code>receive()</code> can join a transaction which was started by the caller.
019 *
020 * @author Dave Syer
021 *
022 */
023public class MessageSourcePollerInterceptor extends ChannelInterceptorAdapter implements InitializingBean {
024
025        private static Log logger = LogFactory.getLog(MessageSourcePollerInterceptor.class);
026
027        private MessageSource<?> source;
028
029        private MessageChannel channel;
030
031        /**
032         * Convenient default constructor for configuration purposes.
033         */
034        public MessageSourcePollerInterceptor() {
035        }
036
037        /**
038         * @param source a message source to poll for messages on receive.
039         */
040        public MessageSourcePollerInterceptor(MessageSource<?> source) {
041                this.source = source;
042        }
043
044        /**
045         * Optional MessageChannel for injecting the message received from the source (defaults to the channel intercepted
046         * in {@link #preReceive(MessageChannel)}).
047         *
048         * @param channel the channel to set
049         */
050        public void setChannel(MessageChannel channel) {
051                this.channel = channel;
052        }
053
054        /**
055         * Asserts that mandatory properties are set.
056         * @see InitializingBean#afterPropertiesSet()
057         */
058        public void afterPropertiesSet() throws Exception {
059                Assert.state(source != null, "A MessageSource must be provided");
060        }
061
062        /**
063         * @param source a message source to poll for messages on receive.
064         */
065        public void setMessageSource(MessageSource<?> source) {
066                this.source = source;
067        }
068
069        /**
070         * Receive from the {@link MessageSource} and send immediately to the input channel, so that the call that we are
071         * intercepting always a message to receive.
072         *
073         * @see ChannelInterceptorAdapter#preReceive(MessageChannel)
074         */
075        @Override
076        public boolean preReceive(MessageChannel channel) {
077                Message<?> message = source.receive();
078                if (message != null) {
079                        if (this.channel != null) {
080                                channel = this.channel;
081                        }
082                        channel.send(message);
083                        if (logger.isDebugEnabled()) {
084                                logger.debug("Sent " + message + " to channel " + channel);
085                        }
086                        return true;
087                }
088                return true;
089        }
090
091}