001/*
002 * Copyright 2006-2007 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.integration.async;
017
018import java.util.concurrent.Callable;
019import java.util.concurrent.Future;
020import java.util.concurrent.FutureTask;
021
022import org.springframework.batch.core.StepExecution;
023import org.springframework.batch.core.scope.context.StepContext;
024import org.springframework.batch.core.scope.context.StepSynchronizationManager;
025import org.springframework.batch.item.ItemProcessor;
026import org.springframework.batch.item.ItemWriter;
027import org.springframework.beans.factory.InitializingBean;
028import org.springframework.core.task.SyncTaskExecutor;
029import org.springframework.core.task.TaskExecutor;
030import org.springframework.util.Assert;
031
032/**
033 * An {@link ItemProcessor} that delegates to a nested processor and in the
034 * background. To allow for background processing the return value from the
035 * processor is a {@link Future} which needs to be unpacked before the item can
036 * be used by a client.
037 *
038 * Because the {@link Future} is typically unwrapped in the {@link ItemWriter},
039 * there are lifecycle and stats limitations (since the framework doesn't know
040 * what the result of the processor is).  While not an exhaustive list, things like
041 * {@link StepExecution#filterCount} will not reflect the number of filtered items
042 * and {@link org.springframework.batch.core.ItemProcessListener#onProcessError(Object, Exception)}
043 * will not be called.
044 * 
045 * @author Dave Syer
046 * 
047 * @param <I> the input object type
048 * @param <O> the output object type (will be wrapped in a Future)
049 * @see AsyncItemWriter
050 */
051public class AsyncItemProcessor<I, O> implements ItemProcessor<I, Future<O>>, InitializingBean {
052
053        private ItemProcessor<I, O> delegate;
054
055        private TaskExecutor taskExecutor = new SyncTaskExecutor();
056
057        /**
058         * Check mandatory properties (the {@link #setDelegate(ItemProcessor)}).
059         * 
060         * @see InitializingBean#afterPropertiesSet()
061         */
062        public void afterPropertiesSet() throws Exception {
063                Assert.notNull(delegate, "The delegate must be set.");
064        }
065
066        /**
067         * The {@link ItemProcessor} to use to delegate processing to in a
068         * background thread.
069         * 
070         * @param delegate the {@link ItemProcessor} to use as a delegate
071         */
072        public void setDelegate(ItemProcessor<I, O> delegate) {
073                this.delegate = delegate;
074        }
075
076        /**
077         * The {@link TaskExecutor} to use to allow the item processing to proceed
078         * in the background. Defaults to a {@link SyncTaskExecutor} so no threads
079         * are created unless this is overridden.
080         * 
081         * @param taskExecutor a {@link TaskExecutor}
082         */
083        public void setTaskExecutor(TaskExecutor taskExecutor) {
084                this.taskExecutor = taskExecutor;
085        }
086
087        /**
088         * Transform the input by delegating to the provided item processor. The
089         * return value is wrapped in a {@link Future} so that clients can unpack it
090         * later.
091         * 
092         * @see ItemProcessor#process(Object)
093         */
094        public Future<O> process(final I item) throws Exception {
095                final StepExecution stepExecution = getStepExecution();
096                FutureTask<O> task = new FutureTask<O>(new Callable<O>() {
097                        public O call() throws Exception {
098                                if (stepExecution != null) {
099                                        StepSynchronizationManager.register(stepExecution);
100                                }
101                                try {
102                                        return delegate.process(item);
103                                }
104                                finally {
105                                        if (stepExecution != null) {
106                                                StepSynchronizationManager.close();
107                                        }
108                                }
109                        }
110                });
111                taskExecutor.execute(task);
112                return task;
113        }
114
115        /**
116         * @return the current step execution if there is one
117         */
118        private StepExecution getStepExecution() {
119                StepContext context = StepSynchronizationManager.getContext();
120                if (context==null) {
121                        return null;
122                }
123                StepExecution stepExecution = context.getStepExecution();
124                return stepExecution;
125        }
126
127}