001/* 002 * Copyright 2006-2010 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.item.support; 017 018import java.util.Map.Entry; 019 020import org.springframework.batch.item.ExecutionContext; 021import org.springframework.batch.item.ItemReader; 022import org.springframework.batch.item.ItemStream; 023import org.springframework.batch.item.ItemStreamException; 024import org.springframework.batch.item.ItemStreamReader; 025import org.springframework.batch.item.ParseException; 026import org.springframework.batch.item.PeekableItemReader; 027import org.springframework.batch.item.UnexpectedInputException; 028 029/** 030 * <p> 031 * A {@link PeekableItemReader} that allows the user to peek one item ahead. 032 * Repeated calls to {@link #peek()} will return the same item, and this will be 033 * the next item returned from {@link #read()}. 034 * </p> 035 * 036 * <p> 037 * Intentionally <b>not</b> thread-safe: it wouldn't be possible to honour the peek in 038 * multiple threads because only one of the threads that peeked would get that 039 * item in the next call to read. 040 * </p> 041 * 042 * @author Dave Syer 043 * 044 */ 045public class SingleItemPeekableItemReader<T> implements ItemStreamReader<T>, PeekableItemReader<T> { 046 047 private ItemReader<T> delegate; 048 049 private T next; 050 051 private ExecutionContext executionContext = new ExecutionContext(); 052 053 /** 054 * The item reader to use as a delegate. Items are read from the delegate 055 * and passed to the caller in {@link #read()}. 056 * 057 * @param delegate the delegate to set 058 */ 059 public void setDelegate(ItemReader<T> delegate) { 060 this.delegate = delegate; 061 } 062 063 /** 064 * Get the next item from the delegate (whether or not it has already been 065 * peeked at). 066 * 067 * @see ItemReader#read() 068 */ 069 @Override 070 public T read() throws Exception, UnexpectedInputException, ParseException { 071 if (next != null) { 072 T item = next; 073 next = null; 074 // executionContext = new ExecutionContext(); 075 return item; 076 } 077 return delegate.read(); 078 } 079 080 /** 081 * Peek at the next item, ensuring that if the delegate is an 082 * {@link ItemStream} the state is stored for the next call to 083 * {@link #update(ExecutionContext)}. 084 * 085 * @return the next item (or null if there is none). 086 * 087 * @see PeekableItemReader#peek() 088 */ 089 @Override 090 public T peek() throws Exception, UnexpectedInputException, ParseException { 091 if (next == null) { 092 updateDelegate(executionContext); 093 next = delegate.read(); 094 } 095 return next; 096 } 097 098 /** 099 * If the delegate is an {@link ItemStream}, just pass the call on, 100 * otherwise reset the peek cache. 101 * 102 * @throws ItemStreamException if there is a problem 103 * @see ItemStream#close() 104 */ 105 @Override 106 public void close() throws ItemStreamException { 107 next = null; 108 if (delegate instanceof ItemStream) { 109 ((ItemStream) delegate).close(); 110 } 111 executionContext = new ExecutionContext(); 112 } 113 114 /** 115 * If the delegate is an {@link ItemStream}, just pass the call on, 116 * otherwise reset the peek cache. 117 * 118 * @param executionContext the current context 119 * @throws ItemStreamException if there is a problem 120 * @see ItemStream#open(ExecutionContext) 121 */ 122 @Override 123 public void open(ExecutionContext executionContext) throws ItemStreamException { 124 next = null; 125 if (delegate instanceof ItemStream) { 126 ((ItemStream) delegate).open(executionContext); 127 } 128 executionContext = new ExecutionContext(); 129 } 130 131 /** 132 * If there is a cached peek, then retrieve the execution context state from 133 * that point. If there is no peek cached, then call directly to the 134 * delegate. 135 * 136 * @param executionContext the current context 137 * @throws ItemStreamException if there is a problem 138 * @see ItemStream#update(ExecutionContext) 139 */ 140 @Override 141 public void update(ExecutionContext executionContext) throws ItemStreamException { 142 if (next != null) { 143 // Get the last state from the delegate instead of using 144 // current value. 145 for (Entry<String, Object> entry : this.executionContext.entrySet()) { 146 executionContext.put(entry.getKey(), entry.getValue()); 147 } 148 return; 149 } 150 updateDelegate(executionContext); 151 } 152 153 private void updateDelegate(ExecutionContext executionContext) { 154 if (delegate instanceof ItemStream) { 155 ((ItemStream) delegate).update(executionContext); 156 } 157 } 158 159}