/*
 * Copyright 2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core.jsr.step.builder;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepListener;
import org.springframework.batch.core.jsr.configuration.support.BatchPropertyContext;
import org.springframework.batch.core.jsr.step.BatchletStep;
import org.springframework.batch.core.jsr.step.item.JsrChunkProvider;
import org.springframework.batch.core.jsr.step.item.JsrFaultTolerantChunkProcessor;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.builder.StepBuilderException;
import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
import org.springframework.batch.core.step.item.ChunkProcessor;
import org.springframework.batch.core.step.item.ChunkProvider;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.repeat.support.RepeatTemplate;
import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;

/**
 * A step builder that extends the {@link FaultTolerantStepBuilder} to create JSR-352
 * specific {@link ChunkProvider} and {@link ChunkProcessor} supporting both the chunking
 * pattern defined by the spec as well as skip/retry logic.
 *
 * @author Michael Minella
 * @author Chris Schaefer
 *
 * @param <I> The input type for the step
 * @param <O> The output type for the step
 */
public class JsrFaultTolerantStepBuilder<I, O> extends FaultTolerantStepBuilder<I, O> {

	/** Property context handed to the {@link BatchletStep} created by {@link #build()}. */
	private BatchPropertyContext batchPropertyContext;

	/**
	 * Set the property context that is passed to the step created by {@link #build()}.
	 *
	 * @param batchPropertyContext the context to pass to the created step
	 */
	public void setBatchPropertyContext(BatchPropertyContext batchPropertyContext) {
		this.batchPropertyContext = batchPropertyContext;
	}

	/**
	 * Create a new builder initialized from the supplied parent builder.
	 *
	 * @param parent the builder whose state is passed to the superclass constructor
	 */
	public JsrFaultTolerantStepBuilder(StepBuilder parent) {
		super(parent);
	}

	/**
	 * Return this builder unchanged, so a chained call keeps working with the
	 * JSR-352 specific builder.
	 *
	 * @return this builder
	 */
	@Override
	public FaultTolerantStepBuilder<I, O> faultTolerant() {
		return this;
	}

	/**
	 * Build the step from the components collected by the fluent setters. Registers the
	 * listeners and streams, creates a {@link BatchletStep}, applies {@link #enhance(Step)}
	 * to it, and then configures its chunk listeners, transaction attribute, step
	 * operations, tasklet (via {@link #createTasklet()}) and streams.
	 *
	 * @return a tasklet step fully configured and ready to execute
	 * @throws StepBuilderException wrapping any exception thrown while the step
	 * validates its configuration
	 */
	@Override
	public TaskletStep build() {
		registerStepListenerAsSkipListener();
		registerAsStreamsAndListeners(getReader(), getProcessor(), getWriter());

		registerStepListenerAsChunkListener();

		BatchletStep step = new BatchletStep(getName(), batchPropertyContext);

		super.enhance(step);

		step.setChunkListeners(chunkListeners.toArray(new ChunkListener[0]));

		if (getTransactionAttribute() != null) {
			step.setTransactionAttribute(getTransactionAttribute());
		}

		if (getStepOperations() == null) {
			// No step operations were configured: choose the default template once,
			// using the task-executor variant only when a task executor is present.
			RepeatTemplate defaultOperations;

			if (getTaskExecutor() != null) {
				TaskExecutorRepeatTemplate concurrentOperations = new TaskExecutorRepeatTemplate();
				concurrentOperations.setTaskExecutor(getTaskExecutor());
				concurrentOperations.setThrottleLimit(getThrottleLimit());
				defaultOperations = concurrentOperations;
			}
			else {
				defaultOperations = new RepeatTemplate();
			}

			// Only the default template receives the builder's exception handler;
			// explicitly configured step operations are left untouched.
			defaultOperations.setExceptionHandler(getExceptionHandler());
			stepOperations(defaultOperations);
		}

		step.setStepOperations(getStepOperations());
		step.setTasklet(createTasklet());

		step.setStreams(getStreams().toArray(new ItemStream[0]));

		try {
			step.afterPropertiesSet();
		}
		catch (Exception e) {
			throw new StepBuilderException(e);
		}

		return step;
	}

	/**
	 * Provides a JSR-352 specific implementation of a {@link ChunkProvider}.
	 *
	 * @return a new {@link JsrChunkProvider}
	 */
	@Override
	protected ChunkProvider<I> createChunkProvider() {
		return new JsrChunkProvider<I>();
	}

	/**
	 * Provides a JSR-352 specific implementation of a {@link ChunkProcessor} for use
	 * within the {@link ChunkOrientedTasklet}
	 *
	 * @return a JSR-352 implementation of the {@link ChunkProcessor}
	 * @see JsrFaultTolerantChunkProcessor
	 */
	@Override
	protected ChunkProcessor<I> createChunkProcessor() {
		SkipPolicy skipPolicy = getFatalExceptionAwareProxy(createSkipPolicy());
		JsrFaultTolerantChunkProcessor<I, O> chunkProcessor =
				new JsrFaultTolerantChunkProcessor<I, O>(getReader(), getProcessor(),
						getWriter(), createChunkOperations(), createRetryOperations());
		chunkProcessor.setSkipPolicy(skipPolicy);
		chunkProcessor.setRollbackClassifier(getRollbackClassifier());
		// NOTE(review): invoked before the chunk monitor is handed over; presumably it
		// configures that monitor - confirm against FaultTolerantStepBuilder.
		detectStreamInReader();
		chunkProcessor.setChunkMonitor(getChunkMonitor());
		chunkProcessor.setListeners(getChunkListeners());

		return chunkProcessor;
	}

	/**
	 * Collect the listeners handed to the chunk processor: the item listeners, then the
	 * skip listeners, then the JSR retry listeners.
	 *
	 * @return a new list containing the listeners in that order
	 */
	private List<StepListener> getChunkListeners() {
		List<StepListener> listeners = new ArrayList<StepListener>();
		listeners.addAll(getItemListeners());
		listeners.addAll(getSkipListeners());
		listeners.addAll(getJsrRetryListeners());

		return listeners;
	}
}