/*
 * Copyright 2006-2007 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.item.support;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

/**
 * Simple {@link ItemStream} that delegates to a list of other streams.
 * <p>
 * Delegates are invoked in the order in which they were added. Only
 * {@link #register(ItemStream)} synchronizes on the internal list; the other
 * methods access it without locking.
 *
 * @author Dave Syer
 *
 */
public class CompositeItemStream implements ItemStream {

    private final List<ItemStream> streams = new ArrayList<>();

    /**
     * Public setter for the {@link ItemStream}s. The given streams are appended
     * to the ones already held: existing entries are not cleared and duplicates
     * are not filtered out.
     *
     * @param streams array of {@link ItemStream}.
     */
    public void setStreams(ItemStream[] streams) {
        this.streams.addAll(Arrays.asList(streams));
    }

    /**
     * Register an {@link ItemStream} as a delegate of this composite. A stream
     * that is already contained in the list is not added a second time.
     *
     * @param stream an instance of {@link ItemStream} to be added to the list of streams.
     */
    public void register(ItemStream stream) {
        synchronized (streams) {
            if (!streams.contains(stream)) {
                streams.add(stream);
            }
        }
    }

    /**
     * Create a composite with no delegate streams.
     */
    public CompositeItemStream() {
        super();
    }

    /**
     * Broadcast the call to update: every delegate receives the same
     * {@link ExecutionContext}, in list order.
     *
     * @see org.springframework.batch.item.ItemStream#update(ExecutionContext)
     */
    @Override
    public void update(ExecutionContext executionContext) {
        for (ItemStream itemStream : streams) {
            itemStream.update(executionContext);
        }
    }

    /**
     * Broadcast the call to close.
     * <p>
     * Every delegate is asked to close, even if an earlier one failed, so that a
     * single failure does not leave the remaining streams open.
     *
     * @throws ItemStreamException thrown if one of the {@link ItemStream}s in
     * the list fails to close. The exception of the first failing delegate is
     * rethrown unchanged once all delegates have been attempted; failures of
     * later delegates are attached to it as suppressed exceptions.
     */
    @Override
    public void close() throws ItemStreamException {
        RuntimeException firstFailure = null;
        for (ItemStream itemStream : streams) {
            try {
                itemStream.close();
            }
            catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
                else if (e != firstFailure) {
                    // Throwable.addSuppressed rejects self-suppression, hence the identity check.
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Broadcast the call to open.
     *
     * @throws ItemStreamException thrown if one of the {@link ItemStream}s in
     * the list fails to open. This is a sequential operation so all itemStreams
     * in the list after the one that failed to open will not be opened.
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        for (ItemStream itemStream : streams) {
            itemStream.open(executionContext);
        }
    }

}