001/*
002 * Copyright 2012-2016 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.data;
018
019import java.util.Map;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.neo4j.ogm.session.SessionFactory;
024
025import org.springframework.batch.item.ItemReader;
026import org.springframework.beans.factory.InitializingBean;
027import org.springframework.util.Assert;
028import org.springframework.util.StringUtils;
029
030/**
031 * <p>
032 * Restartable {@link ItemReader} that reads objects from the graph database Neo4j
033 * via a paging technique.
034 * </p>
035 *
036 * <p>
037 * It executes cypher queries built from the statement fragments provided to
038 * retrieve the requested data.  The query is executed using paged requests of
039 * a size specified in {@link #setPageSize(int)}.  Additional pages are requested
040 * as needed when the {@link #read()} method is called.  On restart, the reader
041 * will begin again at the same number item it left off at.
042 * </p>
043 *
044 * <p>
045 * Performance is dependent on your Neo4J configuration (embedded or remote) as
046 * well as page size.  Setting a fairly large page size and using a commit
047 * interval that matches the page size should provide better performance.
048 * </p>
049 *
050 * <p>
051 * This implementation is thread-safe between calls to
052 * {@link #open(org.springframework.batch.item.ExecutionContext)}, however you
053 * should set <code>saveState=false</code> if used in a multi-threaded
054 * environment (no restart available).
055 * </p>
056 *
057 * @author Michael Minella
058 * @since 3.07
059 */
060public abstract class AbstractNeo4jItemReader<T> extends
061                AbstractPaginatedDataItemReader<T> implements InitializingBean {
062
063        protected Log logger = LogFactory.getLog(getClass());
064
065        private SessionFactory sessionFactory;
066
067        private String startStatement;
068        private String returnStatement;
069        private String matchStatement;
070        private String whereStatement;
071        private String orderByStatement;
072
073        private Class<T> targetType;
074
075        private Map<String, Object> parameterValues;
076
077        /**
078         * Optional parameters to be used in the cypher query.
079         *
080         * @param parameterValues the parameter values to be used in the cypher query
081         */
082        public void setParameterValues(Map<String, Object> parameterValues) {
083                this.parameterValues = parameterValues;
084        }
085
086        protected final Map<String, Object> getParameterValues() {
087                return this.parameterValues;
088        }
089
090        /**
091         * The start segment of the cypher query.  START is prepended
092         * to the statement provided and should <em>not</em> be
093         * included.
094         *
095         * @param startStatement the start fragment of the cypher query.
096         */
097        public void setStartStatement(String startStatement) {
098                this.startStatement = startStatement;
099        }
100
101        /**
102         * The return statement of the cypher query.  RETURN is prepended
103         * to the statement provided and should <em>not</em> be
104         * included
105         *
106         * @param returnStatement the return fragment of the cypher query.
107         */
108        public void setReturnStatement(String returnStatement) {
109                this.returnStatement = returnStatement;
110        }
111
112        /**
113         * An optional match fragment of the cypher query.  MATCH is
114         * prepended to the statement provided and should <em>not</em>
115         * be included.
116         *
117         * @param matchStatement the match fragment of the cypher query
118         */
119        public void setMatchStatement(String matchStatement) {
120                this.matchStatement = matchStatement;
121        }
122
123        /**
124         * An optional where fragment of the cypher query.  WHERE is
125         * prepended to the statement provided and should <em>not</em>
126         * be included.
127         *
128         * @param whereStatement where fragment of the cypher query
129         */
130        public void setWhereStatement(String whereStatement) {
131                this.whereStatement = whereStatement;
132        }
133
134        /**
135         * A list of properties to order the results by.  This is
136         * required so that subsequent page requests pull back the
137         * segment of results correctly.  ORDER BY is prepended to
138         * the statement provided and should <em>not</em> be included.
139         *
140         * @param orderByStatement order by fragment of the cypher query.
141         */
142        public void setOrderByStatement(String orderByStatement) {
143                this.orderByStatement = orderByStatement;
144        }
145
146        protected SessionFactory getSessionFactory() {
147                return sessionFactory;
148        }
149
150        /**
151         * Establish the session factory for the reader.
152         * @param sessionFactory the factory to use for the reader.
153         */
154        public void setSessionFactory(SessionFactory sessionFactory) {
155                this.sessionFactory = sessionFactory;
156        }
157
158        /**
159         * The object type to be returned from each call to {@link #read()}
160         *
161         * @param targetType the type of object to return.
162         */
163        public void setTargetType(Class<T> targetType) {
164                this.targetType = targetType;
165        }
166
167        protected final Class<T> getTargetType() {
168                return this.targetType;
169        }
170
171        protected String generateLimitCypherQuery() {
172                StringBuilder query = new StringBuilder();
173
174                query.append("START ").append(startStatement);
175                query.append(matchStatement != null ? " MATCH " + matchStatement : "");
176                query.append(whereStatement != null ? " WHERE " + whereStatement : "");
177                query.append(" RETURN ").append(returnStatement);
178                query.append(" ORDER BY ").append(orderByStatement);
179                query.append(" SKIP " + (pageSize * page));
180                query.append(" LIMIT " + pageSize);
181
182                String resultingQuery = query.toString();
183
184                if (logger.isDebugEnabled()) {
185                        logger.debug(resultingQuery);
186                }
187
188                return resultingQuery;
189        }
190
191        /**
192         * Checks mandatory properties
193         *
194         * @see InitializingBean#afterPropertiesSet()
195         */
196        @Override
197        public void afterPropertiesSet() throws Exception {
198                Assert.state(sessionFactory != null,"A SessionFactory is required");
199                Assert.state(targetType != null, "The type to be returned is required");
200                Assert.state(StringUtils.hasText(startStatement), "A START statement is required");
201                Assert.state(StringUtils.hasText(returnStatement), "A RETURN statement is required");
202                Assert.state(StringUtils.hasText(orderByStatement), "A ORDER BY statement is required");
203        }
204}