001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.integration.chunk;
018
019import java.lang.reflect.Field;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.springframework.batch.core.StepContribution;
024import org.springframework.batch.core.StepExecutionListener;
025import org.springframework.batch.core.step.item.Chunk;
026import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
027import org.springframework.batch.core.step.item.ChunkProcessor;
028import org.springframework.batch.core.step.item.FaultTolerantChunkProcessor;
029import org.springframework.batch.core.step.item.SimpleChunkProcessor;
030import org.springframework.batch.core.step.tasklet.Tasklet;
031import org.springframework.batch.core.step.tasklet.TaskletStep;
032import org.springframework.batch.item.ItemWriter;
033import org.springframework.batch.item.support.PassThroughItemProcessor;
034import org.springframework.beans.factory.FactoryBean;
035import org.springframework.util.Assert;
036import org.springframework.util.ReflectionUtils;
037
038/**
039 * Convenient factory bean for a chunk handler that also converts an existing chunk-oriented step into a remote chunk
040 * master. The idea is to lift the existing chunk processor out of a Step that works locally, and replace it with a one
041 * that writes chunks into a message channel. The existing step hands its business chunk processing responsibility over
042 * to the handler produced by the factory, which then needs to be set up as a worker on the other end of the channel the
043 * chunks are being sent to. Once this chunk handler is installed the application is playing the role of both the master
044 * and the slave listeners in the Remote Chunking pattern for the Step in question.
045 * 
046 * @author Dave Syer
047 * @author Mahmoud Ben Hassine
048 * 
049 */
050public class RemoteChunkHandlerFactoryBean<T> implements FactoryBean<ChunkHandler<T>> {
051
052        private static Log logger = LogFactory.getLog(RemoteChunkHandlerFactoryBean.class);
053
054        private TaskletStep step;
055
056        private ItemWriter<T> chunkWriter;
057
058        private StepContributionSource stepContributionSource;
059
060        /**
061         * The local step that is to be converted to a remote chunk master.
062         * 
063         * @param step the step to set
064         */
065        public void setStep(TaskletStep step) {
066                this.step = step;
067        }
068
069        /**
070         * The item writer to be injected into the step. Its responsibility is to send chunks of items to remote workers.
071         * Usually in practice it will be a {@link ChunkMessageChannelItemWriter}.
072         * 
073         * @param chunkWriter the chunk writer to set
074         */
075        public void setChunkWriter(ItemWriter<T> chunkWriter) {
076                this.chunkWriter = chunkWriter;
077        }
078
079        /**
080         * A source of {@link StepContribution} instances coming back from remote workers.
081         * 
082         * @param stepContributionSource the step contribution source to set (defaults to the chunk writer)
083         */
084        public void setStepContributionSource(StepContributionSource stepContributionSource) {
085                this.stepContributionSource = stepContributionSource;
086        }
087
088        /**
089         * The type of object created by this factory. Returns {@link ChunkHandler} class.
090         * 
091         * @see FactoryBean#getObjectType()
092         */
093        public Class<?> getObjectType() {
094                return ChunkHandler.class;
095        }
096
097        /**
098         * Optimization for the bean factory (always returns true).
099         * 
100         * @see FactoryBean#isSingleton()
101         */
102        public boolean isSingleton() {
103                return true;
104        }
105
106        /**
107         * Builds a {@link ChunkHandler} from the {@link ChunkProcessor} extracted from the {@link #setStep(TaskletStep)
108         * step} provided. Also modifies the step to send chunks to the chunk handler via the
109         * {@link #setChunkWriter(ItemWriter) chunk writer}.
110         * 
111         * @see FactoryBean#getObject()
112         */
113        public ChunkHandler<T> getObject() throws Exception {
114
115                if (stepContributionSource == null) {
116                        Assert.state(chunkWriter instanceof StepContributionSource,
117                                        "The chunk writer must be a StepContributionSource or else the source must be provided explicitly");
118                        stepContributionSource = (StepContributionSource) chunkWriter;
119                }
120
121                Assert.state(step instanceof TaskletStep, "Step [" + step.getName() + "] must be a TaskletStep");
122                if (logger.isDebugEnabled()) {
123                        logger.debug("Converting TaskletStep with name=" + step.getName());
124                }
125
126                Tasklet tasklet = getTasklet(step);
127                Assert.state(tasklet instanceof ChunkOrientedTasklet<?>, "Tasklet must be ChunkOrientedTasklet in step="
128                                + step.getName());
129
130                ChunkProcessor<T> chunkProcessor = getChunkProcessor((ChunkOrientedTasklet<?>) tasklet);
131                Assert.state(chunkProcessor != null, "ChunkProcessor must be accessible in Tasklet in step=" + step.getName());
132
133                ItemWriter<T> itemWriter = getItemWriter(chunkProcessor);
134                Assert.state(!(itemWriter instanceof ChunkMessageChannelItemWriter<?>), "Cannot adapt step [" + step.getName()
135                                + "] because it already has a remote chunk writer.  Use a local writer in the step.");
136
137                replaceChunkProcessor((ChunkOrientedTasklet<?>) tasklet, chunkWriter, stepContributionSource);
138                if (chunkWriter instanceof StepExecutionListener) {
139                        step.registerStepExecutionListener((StepExecutionListener) chunkWriter);
140                }
141
142                ChunkProcessorChunkHandler<T> handler = new ChunkProcessorChunkHandler<T>();
143                setNonBuffering(chunkProcessor);
144                handler.setChunkProcessor(chunkProcessor);
145                // TODO: create step context for the processor in case it has
146                // scope="step" dependencies
147                handler.afterPropertiesSet();
148
149                return handler;
150
151        }
152
153        /**
154         * Overrides the buffering settings in the chunk processor if it is fault tolerant.
155         * @param chunkProcessor the chunk processor that is going to be used in the workers
156         */
157        private void setNonBuffering(ChunkProcessor<T> chunkProcessor) {
158                if (chunkProcessor instanceof FaultTolerantChunkProcessor<?, ?>) {
159                        ((FaultTolerantChunkProcessor<?, ?>) chunkProcessor).setBuffering(false);
160                }
161        }
162
163        /**
164         * Replace the chunk processor in the tasklet provided with one that can act as a master in the Remote Chunking
165         * pattern.
166         * 
167         * @param tasklet a ChunkOrientedTasklet
168         * @param chunkWriter an ItemWriter that can send the chunks to remote workers
169         * @param stepContributionSource a StepContributionSource used to gather results from the workers
170         */
171        private void replaceChunkProcessor(ChunkOrientedTasklet<?> tasklet, ItemWriter<T> chunkWriter,
172                        final StepContributionSource stepContributionSource) {
173                setField(tasklet, "chunkProcessor", new SimpleChunkProcessor<T, T>(new PassThroughItemProcessor<T>(),
174                                chunkWriter) {
175                        @Override
176                        protected void write(StepContribution contribution, Chunk<T> inputs, Chunk<T> outputs) throws Exception {
177                                doWrite(outputs.getItems());
178                                // Do not update the step contribution until the chunks are
179                                // actually processed
180                                updateStepContribution(contribution, stepContributionSource);
181                        }
182                });
183        }
184
185        /**
186         * Update a StepContribution with all the data from a StepContributionSource. The filter and write counts plus the
187         * exit status will be updated to reflect the data in the source.
188         * 
189         * @param contribution the current contribution
190         * @param stepContributionSource a source of StepContributions
191         */
192        protected void updateStepContribution(StepContribution contribution, StepContributionSource stepContributionSource) {
193                for (StepContribution result : stepContributionSource.getStepContributions()) {
194                        contribution.incrementFilterCount(result.getFilterCount());
195                        contribution.incrementWriteCount(result.getWriteCount());
196                        for (int i = 0; i < result.getProcessSkipCount(); i++) {
197                                contribution.incrementProcessSkipCount();
198                        }
199                        for (int i = 0; i < result.getWriteSkipCount(); i++) {
200                                contribution.incrementWriteSkipCount();
201                        }
202                        contribution.setExitStatus(contribution.getExitStatus().and(result.getExitStatus()));
203                }
204        }
205
206        /**
207         * Pull out an item writer from a ChunkProcessor
208         * @param chunkProcessor a ChunkProcessor
209         * @return its ItemWriter
210         */
211        @SuppressWarnings("unchecked")
212        private ItemWriter<T> getItemWriter(ChunkProcessor<T> chunkProcessor) {
213                return (ItemWriter<T>) getField(chunkProcessor, "itemWriter");
214        }
215
216        /**
217         * Pull the ChunkProcessor out of a tasklet.
218         * @param tasklet a ChunkOrientedTasklet
219         * @return the ChunkProcessor
220         */
221        @SuppressWarnings("unchecked")
222        private ChunkProcessor<T> getChunkProcessor(ChunkOrientedTasklet<?> tasklet) {
223                return (ChunkProcessor<T>) getField(tasklet, "chunkProcessor");
224        }
225
226        /**
227         * Pull a Tasklet out of a step.
228         * @param step a TaskletStep
229         * @return the Tasklet
230         */
231        private Tasklet getTasklet(TaskletStep step) {
232                return (Tasklet) getField(step, "tasklet");
233        }
234
235        private static Object getField(Object target, String name) {
236                Assert.notNull(target, "Target object must not be null");
237                Field field = ReflectionUtils.findField(target.getClass(), name);
238                if (field == null) {
239                        if (logger.isDebugEnabled()) {
240                                logger.debug("Could not find field [" + name + "] on target [" + target + "]");
241                        }
242                        return null;
243                }
244
245                if (logger.isDebugEnabled()) {
246                        logger.debug("Getting field [" + name + "] from target [" + target + "]");
247                }
248                ReflectionUtils.makeAccessible(field);
249                return ReflectionUtils.getField(field, target);
250        }
251
252        private static void setField(Object target, String name, Object value) {
253                Assert.notNull(target, "Target object must not be null");
254                Field field = ReflectionUtils.findField(target.getClass(), name);
255                if (field == null) {
256                        throw new IllegalStateException("Could not find field [" + name + "] on target [" + target + "]");
257                }
258
259                if (logger.isDebugEnabled()) {
260                        logger.debug("Getting field [" + name + "] from target [" + target + "]");
261                }
262                ReflectionUtils.makeAccessible(field);
263                ReflectionUtils.setField(field, target, value);
264        }
265
266}