001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.item.database;
017
018import java.util.List;
019
020import org.apache.commons.logging.Log;
021import org.apache.commons.logging.LogFactory;
022import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
023import org.springframework.beans.factory.InitializingBean;
024import org.springframework.util.Assert;
025import org.springframework.util.ClassUtils;
026
/**
 * Abstract {@link org.springframework.batch.item.ItemStreamReader} to extend when
 * reading database records in a paging fashion.
 *
 * <p>
 * Implementations should execute queries using paged requests of the size
 * specified in {@link #setPageSize(int)}. Additional pages are requested when
 * needed as the {@link #read()} method is called, returning the object
 * corresponding to the current position.
 * </p>
 *
 * @author Thomas Risberg
 * @author Dave Syer
 * @since 2.0
 */
042public abstract class AbstractPagingItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> 
043        implements InitializingBean {
044
045        protected Log logger = LogFactory.getLog(getClass());
046
047        private volatile boolean initialized = false;
048
049        private int pageSize = 10;
050
051        private volatile int current = 0;
052
053        private volatile int page = 0;
054
055        protected volatile List<T> results;
056
057        private Object lock = new Object();
058
059        public AbstractPagingItemReader() {
060                setName(ClassUtils.getShortName(AbstractPagingItemReader.class));
061        }
062
063        /**
064         * The current page number.
065         * @return the current page
066         */
067        public int getPage() {
068                return page;
069        }
070
071        /**
072         * The page size configured for this reader.
073         * @return the page size
074         */
075        public int getPageSize() {
076                return pageSize;
077        }
078
079        /**
080         * The number of rows to retrieve at a time.
081         *
082         * @param pageSize the number of rows to fetch per page
083         */
084        public void setPageSize(int pageSize) {
085                this.pageSize = pageSize;
086        }
087
088        /**
089         * Check mandatory properties.
090         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
091         */
092        @Override
093        public void afterPropertiesSet() throws Exception {
094                Assert.isTrue(pageSize > 0, "pageSize must be greater than zero");
095        }
096
097        @Override
098        protected T doRead() throws Exception {
099
100                synchronized (lock) {
101
102                        if (results == null || current >= pageSize) {
103
104                                if (logger.isDebugEnabled()) {
105                                        logger.debug("Reading page " + getPage());
106                                }
107
108                                doReadPage();
109                                page++;
110                                if (current >= pageSize) {
111                                        current = 0;
112                                }
113
114                        }
115
116                        int next = current++;
117                        if (next < results.size()) {
118                                return results.get(next);
119                        }
120                        else {
121                                return null;
122                        }
123
124                }
125
126        }
127
128        abstract protected void doReadPage();
129
130        @Override
131        protected void doOpen() throws Exception {
132
133                Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first");
134                initialized = true;
135
136        }
137
138        @Override
139        protected void doClose() throws Exception {
140
141                synchronized (lock) {
142                        initialized = false;
143                        current = 0;
144                        page = 0;
145                        results = null;
146                }
147
148        }
149
150        @Override
151        protected void jumpToItem(int itemIndex) throws Exception {
152
153                synchronized (lock) {
154                        page = itemIndex / pageSize;
155                        current = itemIndex % pageSize;
156                }
157
158                doJumpToPage(itemIndex);
159
160                if (logger.isDebugEnabled()) {
161                        logger.debug("Jumping to page " + getPage() + " and index " + current);
162                }
163
164        }
165
166        abstract protected void doJumpToPage(int itemIndex);
167
168}