001package org.springframework.batch.integration.chunk; 002 003import org.apache.commons.logging.Log; 004import org.apache.commons.logging.LogFactory; 005import org.springframework.beans.factory.InitializingBean; 006import org.springframework.integration.core.MessageSource; 007import org.springframework.messaging.Message; 008import org.springframework.messaging.MessageChannel; 009import org.springframework.messaging.support.ChannelInterceptor; 010import org.springframework.messaging.support.ChannelInterceptorAdapter; 011import org.springframework.util.Assert; 012 013 014/** 015 * A {@link ChannelInterceptor} that turns a pollable channel into a "pass-thru channel": if a client calls 016 * <code>receive()</code> on the channel it will delegate to a {@link MessageSource} to pull the message directly from 017 * an external source. This is particularly useful in combination with a message channel in thread scope, in which case 018 * the <code>receive()</code> can join a transaction which was started by the caller. 019 * 020 * @author Dave Syer 021 * 022 */ 023public class MessageSourcePollerInterceptor extends ChannelInterceptorAdapter implements InitializingBean { 024 025 private static Log logger = LogFactory.getLog(MessageSourcePollerInterceptor.class); 026 027 private MessageSource<?> source; 028 029 private MessageChannel channel; 030 031 /** 032 * Convenient default constructor for configuration purposes. 033 */ 034 public MessageSourcePollerInterceptor() { 035 } 036 037 /** 038 * @param source a message source to poll for messages on receive. 039 */ 040 public MessageSourcePollerInterceptor(MessageSource<?> source) { 041 this.source = source; 042 } 043 044 /** 045 * Optional MessageChannel for injecting the message received from the source (defaults to the channel intercepted 046 * in {@link #preReceive(MessageChannel)}). 047 * 048 * @param channel the channel to set 049 */ 050 public void setChannel(MessageChannel channel) { 051 this.channel = channel; 052 } 053 054 /** 055 * Asserts that mandatory properties are set. 056 * @see InitializingBean#afterPropertiesSet() 057 */ 058 public void afterPropertiesSet() throws Exception { 059 Assert.state(source != null, "A MessageSource must be provided"); 060 } 061 062 /** 063 * @param source a message source to poll for messages on receive. 064 */ 065 public void setMessageSource(MessageSource<?> source) { 066 this.source = source; 067 } 068 069 /** 070 * Receive from the {@link MessageSource} and send immediately to the input channel, so that the call that we are 071 * intercepting always a message to receive. 072 * 073 * @see ChannelInterceptorAdapter#preReceive(MessageChannel) 074 */ 075 @Override 076 public boolean preReceive(MessageChannel channel) { 077 Message<?> message = source.receive(); 078 if (message != null) { 079 if (this.channel != null) { 080 channel = this.channel; 081 } 082 channel.send(message); 083 if (logger.isDebugEnabled()) { 084 logger.debug("Sent " + message + " to channel " + channel); 085 } 086 return true; 087 } 088 return true; 089 } 090 091}