001/*
002 * Copyright 2006-2010 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.item.support;
017
018import java.util.Map.Entry;
019
020import org.springframework.batch.item.ExecutionContext;
021import org.springframework.batch.item.ItemReader;
022import org.springframework.batch.item.ItemStream;
023import org.springframework.batch.item.ItemStreamException;
024import org.springframework.batch.item.ItemStreamReader;
025import org.springframework.batch.item.ParseException;
026import org.springframework.batch.item.PeekableItemReader;
027import org.springframework.batch.item.UnexpectedInputException;
028
029/**
030 * <p>
031 * A {@link PeekableItemReader} that allows the user to peek one item ahead.
032 * Repeated calls to {@link #peek()} will return the same item, and this will be
033 * the next item returned from {@link #read()}.
034 * </p>
035 * 
036 * <p>
037 * Intentionally <b>not</b> thread-safe: it wouldn't be possible to honour the peek in
038 * multiple threads because only one of the threads that peeked would get that
039 * item in the next call to read.
040 * </p>
041 * 
042 * @author Dave Syer
043 * 
044 */
045public class SingleItemPeekableItemReader<T> implements ItemStreamReader<T>, PeekableItemReader<T> {
046
047        private ItemReader<T> delegate;
048
049        private T next;
050
051        private ExecutionContext executionContext = new ExecutionContext();
052
053        /**
054         * The item reader to use as a delegate. Items are read from the delegate
055         * and passed to the caller in {@link #read()}.
056         * 
057         * @param delegate the delegate to set
058         */
059        public void setDelegate(ItemReader<T> delegate) {
060                this.delegate = delegate;
061        }
062
063        /**
064         * Get the next item from the delegate (whether or not it has already been
065         * peeked at).
066         * 
067         * @see ItemReader#read()
068         */
069    @Override
070        public T read() throws Exception, UnexpectedInputException, ParseException {
071                if (next != null) {
072                        T item = next;
073                        next = null;
074                        // executionContext = new ExecutionContext();
075                        return item;
076                }
077                return delegate.read();
078        }
079
080        /**
081         * Peek at the next item, ensuring that if the delegate is an
082         * {@link ItemStream} the state is stored for the next call to
083         * {@link #update(ExecutionContext)}.
084         * 
085         * @return the next item (or null if there is none).
086         * 
087         * @see PeekableItemReader#peek()
088         */
089    @Override
090        public T peek() throws Exception, UnexpectedInputException, ParseException {
091                if (next == null) {
092                        updateDelegate(executionContext);
093                        next = delegate.read();
094                }
095                return next;
096        }
097
098        /**
099         * If the delegate is an {@link ItemStream}, just pass the call on,
100         * otherwise reset the peek cache.
101         * 
102         * @throws ItemStreamException if there is a problem
103         * @see ItemStream#close()
104         */
105    @Override
106        public void close() throws ItemStreamException {
107                next = null;
108                if (delegate instanceof ItemStream) {
109                        ((ItemStream) delegate).close();
110                }
111                executionContext = new ExecutionContext();
112        }
113
114        /**
115         * If the delegate is an {@link ItemStream}, just pass the call on,
116         * otherwise reset the peek cache.
117         * 
118         * @param executionContext the current context
119         * @throws ItemStreamException if there is a problem
120         * @see ItemStream#open(ExecutionContext)
121         */
122    @Override
123        public void open(ExecutionContext executionContext) throws ItemStreamException {
124                next = null;
125                if (delegate instanceof ItemStream) {
126                        ((ItemStream) delegate).open(executionContext);
127                }
128                executionContext = new ExecutionContext();
129        }
130
131        /**
132         * If there is a cached peek, then retrieve the execution context state from
133         * that point. If there is no peek cached, then call directly to the
134         * delegate.
135         * 
136         * @param executionContext the current context
137         * @throws ItemStreamException if there is a problem
138         * @see ItemStream#update(ExecutionContext)
139         */
140    @Override
141        public void update(ExecutionContext executionContext) throws ItemStreamException {
142                if (next != null) {
143                        // Get the last state from the delegate instead of using
144                        // current value.
145                        for (Entry<String, Object> entry : this.executionContext.entrySet()) {
146                                executionContext.put(entry.getKey(), entry.getValue());
147                        }
148                        return;
149                }
150                updateDelegate(executionContext);
151        }
152
153        private void updateDelegate(ExecutionContext executionContext) {
154                if (delegate instanceof ItemStream) {
155                        ((ItemStream) delegate).update(executionContext);
156                }
157        }
158
159}