001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.step.item;
017
018import org.apache.commons.logging.Log;
019import org.apache.commons.logging.LogFactory;
020import org.springframework.batch.item.ExecutionContext;
021import org.springframework.batch.item.ItemReader;
022import org.springframework.batch.item.ItemStream;
023import org.springframework.batch.item.ItemStreamException;
024import org.springframework.batch.item.ItemStreamSupport;
025import org.springframework.batch.item.support.CompositeItemStream;
026
027/**
028 * Manage the offset data between the last successful commit and updates made to
029 * an input chunk. Only works with single threaded steps because it has to use a
030 * {@link ThreadLocal} to manage the state and coordinate between the caller
031 * and the wrapped {@link ItemStream}.
032 *
033 * @author Dave Syer
034 * @since 2.0
035 */
036public class ChunkMonitor extends ItemStreamSupport {
037
038        private Log logger = LogFactory.getLog(getClass());
039
040        private boolean streamsRegistered = false;
041
042        public static class ChunkMonitorData {
043                public int offset;
044
045                public int chunkSize;
046
047                public ChunkMonitorData(int offset, int chunkSize) {
048                        this.offset = offset;
049                        this.chunkSize = chunkSize;
050                }
051        }
052
053        private static final String OFFSET = "OFFSET";
054
055        private CompositeItemStream stream = new CompositeItemStream();
056
057        private ThreadLocal<ChunkMonitorData> holder = new ThreadLocal<ChunkMonitorData>();
058
059        private ItemReader<?> reader;
060
061        public ChunkMonitor() {
062                this.setExecutionContextName(ChunkMonitor.class.getName());
063        }
064
065        /**
066         * @param stream the stream to set
067         */
068        public void registerItemStream(ItemStream stream) {
069                streamsRegistered = true;
070                this.stream.register(stream);
071        }
072
073        /**
074         * @param reader the reader to set
075         */
076        public void setItemReader(ItemReader<?> reader) {
077                this.reader = reader;
078        }
079
080        public void incrementOffset() {
081                ChunkMonitorData data = getData();
082                data.offset ++;
083                if (data.offset >= data.chunkSize) {
084                        resetOffset();
085                }
086        }
087
088        public int getOffset() {
089                return getData().offset;
090        }
091
092        public void resetOffset() {
093                getData().offset = 0;
094        }
095
096        public void setChunkSize(int chunkSize) {
097                getData().chunkSize = chunkSize;
098                resetOffset();
099        }
100
101        @Override
102        public void close() throws ItemStreamException {
103                super.close();
104                holder.set(null);
105                if (streamsRegistered) {
106                        stream.close();
107                }
108        }
109
110        @Override
111        public void open(ExecutionContext executionContext) throws ItemStreamException {
112                super.open(executionContext);
113                if (streamsRegistered) {
114                        stream.open(executionContext);
115                        ChunkMonitorData data = new ChunkMonitorData(executionContext.getInt(getExecutionContextKey(OFFSET), 0), 0);
116                        holder.set(data);
117                        if (reader == null) {
118                                logger.warn("No ItemReader set (must be concurrent step), so ignoring offset data.");
119                                return;
120                        }
121                        for (int i = 0; i < data.offset; i++) {
122                                try {
123                                        reader.read();
124                                }
125                                catch (Exception e) {
126                                        throw new ItemStreamException("Could not position reader with offset: " + data.offset, e);
127                                }
128                        }
129
130                        resetOffset();
131                }
132        }
133
134        @Override
135        public void update(ExecutionContext executionContext) throws ItemStreamException {
136                super.update(executionContext);
137                if (streamsRegistered) {
138                        ChunkMonitorData data = getData();
139                        if (data.offset == 0) {
140                                // Only call the underlying update method if we are on a chunk
141                                // boundary
142                                stream.update(executionContext);
143                                executionContext.remove(getExecutionContextKey(OFFSET));
144                        }
145                        else {
146                                executionContext.putInt(getExecutionContextKey(OFFSET), data.offset);
147                        }
148                }
149        }
150
151        private ChunkMonitorData getData() {
152                ChunkMonitorData data = holder.get();
153                if (data==null) {
154                        if (streamsRegistered) {
155                                logger.warn("ItemStream was opened in a different thread.  Restart data could be compromised.");
156                        }
157                        data = new ChunkMonitorData(0,0);
158                        holder.set(data);
159                }
160                return data;
161        }
162
163}