001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.integration.async; 017 018import java.util.ArrayList; 019import java.util.List; 020import java.util.concurrent.ExecutionException; 021import java.util.concurrent.Future; 022 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; 025 026import org.springframework.batch.item.ExecutionContext; 027import org.springframework.batch.item.ItemStream; 028import org.springframework.batch.item.ItemStreamException; 029import org.springframework.batch.item.ItemStreamWriter; 030import org.springframework.batch.item.ItemWriter; 031import org.springframework.beans.factory.InitializingBean; 032import org.springframework.util.Assert; 033 034public class AsyncItemWriter<T> implements ItemStreamWriter<Future<T>>, InitializingBean { 035 036 private static final Log logger = LogFactory.getLog(AsyncItemWriter.class); 037 038 private ItemWriter<T> delegate; 039 040 public void afterPropertiesSet() throws Exception { 041 Assert.notNull(delegate, "A delegate ItemWriter must be provided."); 042 } 043 044 /** 045 * @param delegate ItemWriter that does the actual writing of the Future results 046 */ 047 public void setDelegate(ItemWriter<T> delegate) { 048 this.delegate = delegate; 049 } 050 051 /** 052 * In the processing of the {@link java.util.concurrent.Future}s passed, nulls are <em>not</em> passed to the 053 * delegate since they are considered filtered out by the {@link org.springframework.batch.integration.async.AsyncItemProcessor}'s 054 * delegated {@link org.springframework.batch.item.ItemProcessor}. If the unwrapping 055 * of the {@link Future} results in an {@link ExecutionException}, that will be 056 * unwrapped and the cause will be thrown. 057 * 058 * @param items {@link java.util.concurrent.Future}s to be unwrapped and passed to the delegate 059 * @throws Exception The exception returned by the Future if one was thrown 060 */ 061 public void write(List<? extends Future<T>> items) throws Exception { 062 List<T> list = new ArrayList<T>(); 063 for (Future<T> future : items) { 064 try { 065 T item = future.get(); 066 067 if(item != null) { 068 list.add(future.get()); 069 } 070 } 071 catch (ExecutionException e) { 072 Throwable cause = e.getCause(); 073 074 if(cause != null && cause instanceof Exception) { 075 logger.debug("An exception was thrown while processing an item", e); 076 077 throw (Exception) cause; 078 } 079 else { 080 throw e; 081 } 082 } 083 } 084 085 delegate.write(list); 086 } 087 088 @Override 089 public void open(ExecutionContext executionContext) throws ItemStreamException { 090 if (delegate instanceof ItemStream) { 091 ((ItemStream) delegate).open(executionContext); 092 } 093 } 094 095 @Override 096 public void update(ExecutionContext executionContext) throws ItemStreamException { 097 if (delegate instanceof ItemStream) { 098 ((ItemStream) delegate).update(executionContext); 099 } 100 } 101 102 @Override 103 public void close() throws ItemStreamException { 104 if (delegate instanceof ItemStream) { 105 ((ItemStream) delegate).close(); 106 } 107 } 108}