001/* 002 * Copyright 2006-2007 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.integration.async; 017 018import java.util.concurrent.Callable; 019import java.util.concurrent.Future; 020import java.util.concurrent.FutureTask; 021 022import org.springframework.batch.core.StepExecution; 023import org.springframework.batch.core.scope.context.StepContext; 024import org.springframework.batch.core.scope.context.StepSynchronizationManager; 025import org.springframework.batch.item.ItemProcessor; 026import org.springframework.batch.item.ItemWriter; 027import org.springframework.beans.factory.InitializingBean; 028import org.springframework.core.task.SyncTaskExecutor; 029import org.springframework.core.task.TaskExecutor; 030import org.springframework.util.Assert; 031 032/** 033 * An {@link ItemProcessor} that delegates to a nested processor and in the 034 * background. To allow for background processing the return value from the 035 * processor is a {@link Future} which needs to be unpacked before the item can 036 * be used by a client. 037 * 038 * Because the {@link Future} is typically unwrapped in the {@link ItemWriter}, 039 * there are lifecycle and stats limitations (since the framework doesn't know 040 * what the result of the processor is). While not an exhaustive list, things like 041 * {@link StepExecution#filterCount} will not reflect the number of filtered items 042 * and {@link org.springframework.batch.core.ItemProcessListener#onProcessError(Object, Exception)} 043 * will not be called. 044 * 045 * @author Dave Syer 046 * 047 * @param <I> the input object type 048 * @param <O> the output object type (will be wrapped in a Future) 049 * @see AsyncItemWriter 050 */ 051public class AsyncItemProcessor<I, O> implements ItemProcessor<I, Future<O>>, InitializingBean { 052 053 private ItemProcessor<I, O> delegate; 054 055 private TaskExecutor taskExecutor = new SyncTaskExecutor(); 056 057 /** 058 * Check mandatory properties (the {@link #setDelegate(ItemProcessor)}). 059 * 060 * @see InitializingBean#afterPropertiesSet() 061 */ 062 public void afterPropertiesSet() throws Exception { 063 Assert.notNull(delegate, "The delegate must be set."); 064 } 065 066 /** 067 * The {@link ItemProcessor} to use to delegate processing to in a 068 * background thread. 069 * 070 * @param delegate the {@link ItemProcessor} to use as a delegate 071 */ 072 public void setDelegate(ItemProcessor<I, O> delegate) { 073 this.delegate = delegate; 074 } 075 076 /** 077 * The {@link TaskExecutor} to use to allow the item processing to proceed 078 * in the background. Defaults to a {@link SyncTaskExecutor} so no threads 079 * are created unless this is overridden. 080 * 081 * @param taskExecutor a {@link TaskExecutor} 082 */ 083 public void setTaskExecutor(TaskExecutor taskExecutor) { 084 this.taskExecutor = taskExecutor; 085 } 086 087 /** 088 * Transform the input by delegating to the provided item processor. The 089 * return value is wrapped in a {@link Future} so that clients can unpack it 090 * later. 091 * 092 * @see ItemProcessor#process(Object) 093 */ 094 public Future<O> process(final I item) throws Exception { 095 final StepExecution stepExecution = getStepExecution(); 096 FutureTask<O> task = new FutureTask<O>(new Callable<O>() { 097 public O call() throws Exception { 098 if (stepExecution != null) { 099 StepSynchronizationManager.register(stepExecution); 100 } 101 try { 102 return delegate.process(item); 103 } 104 finally { 105 if (stepExecution != null) { 106 StepSynchronizationManager.close(); 107 } 108 } 109 } 110 }); 111 taskExecutor.execute(task); 112 return task; 113 } 114 115 /** 116 * @return the current step execution if there is one 117 */ 118 private StepExecution getStepExecution() { 119 StepContext context = StepSynchronizationManager.getContext(); 120 if (context==null) { 121 return null; 122 } 123 StepExecution stepExecution = context.getStepExecution(); 124 return stepExecution; 125 } 126 127}