/*
 * Copyright 2006-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.integration.chunk;

import java.lang.reflect.Field;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.step.item.Chunk;
import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
import org.springframework.batch.core.step.item.ChunkProcessor;
import org.springframework.batch.core.step.item.FaultTolerantChunkProcessor;
import org.springframework.batch.core.step.item.SimpleChunkProcessor;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.PassThroughItemProcessor;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

/**
 * Convenient factory bean for a chunk handler that also converts an existing chunk-oriented step into a remote chunk
 * master. The idea is to lift the existing chunk processor out of a Step that works locally, and replace it with one
 * that writes chunks into a message channel. The existing step hands its business chunk processing responsibility over
 * to the handler produced by the factory, which then needs to be set up as a worker on the other end of the channel the
 * chunks are being sent to. Once this chunk handler is installed the application is playing the role of both the master
 * and the slave listeners in the Remote Chunking pattern for the Step in question.
 *
 * <p>
 * The step, its tasklet and the tasklet's chunk processor are read and modified through reflection on the private
 * fields named {@code tasklet}, {@code chunkProcessor} and {@code itemWriter}.
 *
 * @author Dave Syer
 * @author Mahmoud Ben Hassine
 *
 */
public class RemoteChunkHandlerFactoryBean<T> implements FactoryBean<ChunkHandler<T>> {

	private static final Log logger = LogFactory.getLog(RemoteChunkHandlerFactoryBean.class);

	private TaskletStep step;

	private ItemWriter<T> chunkWriter;

	private StepContributionSource stepContributionSource;

	/**
	 * The local step that is to be converted to a remote chunk master.
	 *
	 * @param step the step to set
	 */
	public void setStep(TaskletStep step) {
		this.step = step;
	}

	/**
	 * The item writer to be injected into the step. Its responsibility is to send chunks of items to remote workers.
	 * Usually in practice it will be a {@link ChunkMessageChannelItemWriter}.
	 *
	 * @param chunkWriter the chunk writer to set
	 */
	public void setChunkWriter(ItemWriter<T> chunkWriter) {
		this.chunkWriter = chunkWriter;
	}

	/**
	 * A source of {@link StepContribution} instances coming back from remote workers.
	 *
	 * @param stepContributionSource the step contribution source to set (defaults to the chunk writer)
	 */
	public void setStepContributionSource(StepContributionSource stepContributionSource) {
		this.stepContributionSource = stepContributionSource;
	}

	/**
	 * The type of object created by this factory. Returns {@link ChunkHandler} class.
	 *
	 * @return {@code ChunkHandler.class}
	 * @see FactoryBean#getObjectType()
	 */
	public Class<?> getObjectType() {
		return ChunkHandler.class;
	}

	/**
	 * Optimization for the bean factory (always returns true).
	 *
	 * @return {@code true}
	 * @see FactoryBean#isSingleton()
	 */
	public boolean isSingleton() {
		return true;
	}

	/**
	 * Builds a {@link ChunkHandler} from the {@link ChunkProcessor} extracted from the {@link #setStep(TaskletStep)
	 * step} provided. Also modifies the step to send chunks to the chunk handler via the
	 * {@link #setChunkWriter(ItemWriter) chunk writer}.
	 *
	 * @return a chunk handler wrapping the chunk processor that was originally installed in the step
	 * @throws IllegalStateException if no step contribution source is available, if no step was provided, if the
	 * step's tasklet is not a {@link ChunkOrientedTasklet}, if its chunk processor cannot be read, or if the step
	 * already writes through a {@link ChunkMessageChannelItemWriter}
	 * @see FactoryBean#getObject()
	 */
	public ChunkHandler<T> getObject() throws Exception {

		if (stepContributionSource == null) {
			Assert.state(chunkWriter instanceof StepContributionSource,
					"The chunk writer must be a StepContributionSource or else the source must be provided explicitly");
			stepContributionSource = (StepContributionSource) chunkWriter;
		}

		// The field is already typed as TaskletStep, so the only thing left to verify is that it was set.
		// The message must not dereference the step: it is evaluated before the assertion runs.
		Assert.state(step != null, "A TaskletStep must be provided");
		if (logger.isDebugEnabled()) {
			logger.debug("Converting TaskletStep with name=" + step.getName());
		}

		Tasklet tasklet = getTasklet(step);
		Assert.state(tasklet instanceof ChunkOrientedTasklet<?>, "Tasklet must be ChunkOrientedTasklet in step="
				+ step.getName());

		ChunkProcessor<T> chunkProcessor = getChunkProcessor((ChunkOrientedTasklet<?>) tasklet);
		Assert.state(chunkProcessor != null, "ChunkProcessor must be accessible in Tasklet in step=" + step.getName());

		ItemWriter<T> itemWriter = getItemWriter(chunkProcessor);
		Assert.state(!(itemWriter instanceof ChunkMessageChannelItemWriter<?>), "Cannot adapt step [" + step.getName()
				+ "] because it already has a remote chunk writer.  Use a local writer in the step.");

		// From here on the step is mutated: its processor is swapped for one that only forwards chunks.
		replaceChunkProcessor((ChunkOrientedTasklet<?>) tasklet, chunkWriter, stepContributionSource);
		if (chunkWriter instanceof StepExecutionListener) {
			step.registerStepExecutionListener((StepExecutionListener) chunkWriter);
		}

		// The original processor (extracted before the swap) becomes the business logic of the handler.
		ChunkProcessorChunkHandler<T> handler = new ChunkProcessorChunkHandler<T>();
		setNonBuffering(chunkProcessor);
		handler.setChunkProcessor(chunkProcessor);
		// TODO: create step context for the processor in case it has
		// scope="step" dependencies
		handler.afterPropertiesSet();

		return handler;

	}

	/**
	 * Overrides the buffering settings in the chunk processor if it is fault tolerant.
	 * @param chunkProcessor the chunk processor that is going to be used in the workers
	 */
	private void setNonBuffering(ChunkProcessor<T> chunkProcessor) {
		if (chunkProcessor instanceof FaultTolerantChunkProcessor<?, ?>) {
			((FaultTolerantChunkProcessor<?, ?>) chunkProcessor).setBuffering(false);
		}
	}

	/**
	 * Replace the chunk processor in the tasklet provided with one that can act as a master in the Remote Chunking
	 * pattern.
	 *
	 * @param tasklet a ChunkOrientedTasklet
	 * @param chunkWriter an ItemWriter that can send the chunks to remote workers
	 * @param stepContributionSource a StepContributionSource used to gather results from the workers
	 */
	private void replaceChunkProcessor(ChunkOrientedTasklet<?> tasklet, ItemWriter<T> chunkWriter,
			final StepContributionSource stepContributionSource) {
		setField(tasklet, "chunkProcessor", new SimpleChunkProcessor<T, T>(new PassThroughItemProcessor<T>(),
				chunkWriter) {
			@Override
			protected void write(StepContribution contribution, Chunk<T> inputs, Chunk<T> outputs) throws Exception {
				doWrite(outputs.getItems());
				// Do not update the step contribution until the chunks are
				// actually processed
				updateStepContribution(contribution, stepContributionSource);
			}
		});
	}

	/**
	 * Update a StepContribution with all the data from a StepContributionSource. The filter and write counts plus the
	 * exit status will be updated to reflect the data in the source. Process and write skip counts are carried over
	 * as well.
	 *
	 * @param contribution the current contribution
	 * @param stepContributionSource a source of StepContributions
	 */
	protected void updateStepContribution(StepContribution contribution, StepContributionSource stepContributionSource) {
		for (StepContribution result : stepContributionSource.getStepContributions()) {
			contribution.incrementFilterCount(result.getFilterCount());
			contribution.incrementWriteCount(result.getWriteCount());
			// The skip counters are bumped one at a time through the no-argument increment methods.
			for (int i = 0; i < result.getProcessSkipCount(); i++) {
				contribution.incrementProcessSkipCount();
			}
			for (int i = 0; i < result.getWriteSkipCount(); i++) {
				contribution.incrementWriteSkipCount();
			}
			contribution.setExitStatus(contribution.getExitStatus().and(result.getExitStatus()));
		}
	}

	/**
	 * Pull out an item writer from a ChunkProcessor
	 * @param chunkProcessor a ChunkProcessor
	 * @return its ItemWriter, or {@code null} if the processor has no field named {@code itemWriter}
	 */
	@SuppressWarnings("unchecked")
	private ItemWriter<T> getItemWriter(ChunkProcessor<T> chunkProcessor) {
		return (ItemWriter<T>) getField(chunkProcessor, "itemWriter");
	}

	/**
	 * Pull the ChunkProcessor out of a tasklet.
	 * @param tasklet a ChunkOrientedTasklet
	 * @return the ChunkProcessor, or {@code null} if the tasklet has no field named {@code chunkProcessor}
	 */
	@SuppressWarnings("unchecked")
	private ChunkProcessor<T> getChunkProcessor(ChunkOrientedTasklet<?> tasklet) {
		return (ChunkProcessor<T>) getField(tasklet, "chunkProcessor");
	}

	/**
	 * Pull a Tasklet out of a step.
	 * @param step a TaskletStep
	 * @return the Tasklet, or {@code null} if the step has no field named {@code tasklet}
	 */
	private Tasklet getTasklet(TaskletStep step) {
		return (Tasklet) getField(step, "tasklet");
	}

	/**
	 * Read a (possibly private) field reflectively.
	 * @param target the object to read from, must not be {@code null}
	 * @param name the name of the field
	 * @return the current value of the field, or {@code null} if no such field exists (callers assert on the result)
	 */
	private static Object getField(Object target, String name) {
		Assert.notNull(target, "Target object must not be null");
		Field field = ReflectionUtils.findField(target.getClass(), name);
		if (field == null) {
			if (logger.isDebugEnabled()) {
				logger.debug("Could not find field [" + name + "] on target [" + target + "]");
			}
			return null;
		}

		if (logger.isDebugEnabled()) {
			logger.debug("Getting field [" + name + "] from target [" + target + "]");
		}
		ReflectionUtils.makeAccessible(field);
		return ReflectionUtils.getField(field, target);
	}

	/**
	 * Write a (possibly private) field reflectively.
	 * @param target the object to modify, must not be {@code null}
	 * @param name the name of the field
	 * @param value the new value of the field
	 * @throws IllegalStateException if no field with that name exists on the target
	 */
	private static void setField(Object target, String name, Object value) {
		Assert.notNull(target, "Target object must not be null");
		Field field = ReflectionUtils.findField(target.getClass(), name);
		if (field == null) {
			throw new IllegalStateException("Could not find field [" + name + "] on target [" + target + "]");
		}

		if (logger.isDebugEnabled()) {
			logger.debug("Setting field [" + name + "] on target [" + target + "]");
		}
		ReflectionUtils.makeAccessible(field);
		ReflectionUtils.setField(field, target, value);
	}

}