001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.integration.async;
017
018import java.util.ArrayList;
019import java.util.List;
020import java.util.concurrent.ExecutionException;
021import java.util.concurrent.Future;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025
026import org.springframework.batch.item.ExecutionContext;
027import org.springframework.batch.item.ItemStream;
028import org.springframework.batch.item.ItemStreamException;
029import org.springframework.batch.item.ItemStreamWriter;
030import org.springframework.batch.item.ItemWriter;
031import org.springframework.beans.factory.InitializingBean;
032import org.springframework.util.Assert;
033
034public class AsyncItemWriter<T> implements ItemStreamWriter<Future<T>>, InitializingBean {
035
036        private static final Log logger = LogFactory.getLog(AsyncItemWriter.class);
037
038        private ItemWriter<T> delegate;
039
040        public void afterPropertiesSet() throws Exception {
041                Assert.notNull(delegate, "A delegate ItemWriter must be provided.");
042        }
043
044        /**
045         * @param delegate ItemWriter that does the actual writing of the Future results
046         */
047        public void setDelegate(ItemWriter<T> delegate) {
048                this.delegate = delegate;
049        }
050
051        /**
052         * In the processing of the {@link java.util.concurrent.Future}s passed, nulls are <em>not</em> passed to the
053         * delegate since they are considered filtered out by the {@link org.springframework.batch.integration.async.AsyncItemProcessor}'s
054         * delegated {@link org.springframework.batch.item.ItemProcessor}.  If the unwrapping
055         * of the {@link Future} results in an {@link ExecutionException}, that will be
056         * unwrapped and the cause will be thrown.
057         *
058         * @param items {@link java.util.concurrent.Future}s to be unwrapped and passed to the delegate
059         * @throws Exception The exception returned by the Future if one was thrown
060         */
061        public void write(List<? extends Future<T>> items) throws Exception {
062                List<T> list = new ArrayList<T>();
063                for (Future<T> future : items) {
064                        try {
065                                T item = future.get();
066
067                                if(item != null) {
068                                        list.add(future.get());
069                                }
070                        }
071                        catch (ExecutionException e) {
072                                Throwable cause = e.getCause();
073
074                                if(cause != null && cause instanceof Exception) {
075                                        logger.debug("An exception was thrown while processing an item", e);
076
077                                        throw (Exception) cause;
078                                }
079                                else {
080                                        throw e;
081                                }
082                        }
083                }
084                
085                delegate.write(list);
086        }
087
088        @Override
089        public void open(ExecutionContext executionContext) throws ItemStreamException {
090                if (delegate instanceof ItemStream) {
091                        ((ItemStream) delegate).open(executionContext);
092                }
093        }
094
095        @Override
096        public void update(ExecutionContext executionContext) throws ItemStreamException {
097                if (delegate instanceof ItemStream) {
098                        ((ItemStream) delegate).update(executionContext);
099                }
100        }
101
102        @Override
103        public void close() throws ItemStreamException {
104                if (delegate instanceof ItemStream) {
105                        ((ItemStream) delegate).close();
106                }
107        }
108}