001/* 002 * Copyright 2012-2016 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.item.data; 018 019import java.util.Map; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.neo4j.ogm.session.SessionFactory; 024 025import org.springframework.batch.item.ItemReader; 026import org.springframework.beans.factory.InitializingBean; 027import org.springframework.util.Assert; 028import org.springframework.util.StringUtils; 029 030/** 031 * <p> 032 * Restartable {@link ItemReader} that reads objects from the graph database Neo4j 033 * via a paging technique. 034 * </p> 035 * 036 * <p> 037 * It executes cypher queries built from the statement fragments provided to 038 * retrieve the requested data. The query is executed using paged requests of 039 * a size specified in {@link #setPageSize(int)}. Additional pages are requested 040 * as needed when the {@link #read()} method is called. On restart, the reader 041 * will begin again at the same number item it left off at. 042 * </p> 043 * 044 * <p> 045 * Performance is dependent on your Neo4J configuration (embedded or remote) as 046 * well as page size. Setting a fairly large page size and using a commit 047 * interval that matches the page size should provide better performance. 048 * </p> 049 * 050 * <p> 051 * This implementation is thread-safe between calls to 052 * {@link #open(org.springframework.batch.item.ExecutionContext)}, however you 053 * should set <code>saveState=false</code> if used in a multi-threaded 054 * environment (no restart available). 055 * </p> 056 * 057 * @author Michael Minella 058 * @since 3.07 059 */ 060public abstract class AbstractNeo4jItemReader<T> extends 061 AbstractPaginatedDataItemReader<T> implements InitializingBean { 062 063 protected Log logger = LogFactory.getLog(getClass()); 064 065 private SessionFactory sessionFactory; 066 067 private String startStatement; 068 private String returnStatement; 069 private String matchStatement; 070 private String whereStatement; 071 private String orderByStatement; 072 073 private Class<T> targetType; 074 075 private Map<String, Object> parameterValues; 076 077 /** 078 * Optional parameters to be used in the cypher query. 079 * 080 * @param parameterValues the parameter values to be used in the cypher query 081 */ 082 public void setParameterValues(Map<String, Object> parameterValues) { 083 this.parameterValues = parameterValues; 084 } 085 086 protected final Map<String, Object> getParameterValues() { 087 return this.parameterValues; 088 } 089 090 /** 091 * The start segment of the cypher query. START is prepended 092 * to the statement provided and should <em>not</em> be 093 * included. 094 * 095 * @param startStatement the start fragment of the cypher query. 096 */ 097 public void setStartStatement(String startStatement) { 098 this.startStatement = startStatement; 099 } 100 101 /** 102 * The return statement of the cypher query. RETURN is prepended 103 * to the statement provided and should <em>not</em> be 104 * included 105 * 106 * @param returnStatement the return fragment of the cypher query. 107 */ 108 public void setReturnStatement(String returnStatement) { 109 this.returnStatement = returnStatement; 110 } 111 112 /** 113 * An optional match fragment of the cypher query. MATCH is 114 * prepended to the statement provided and should <em>not</em> 115 * be included. 116 * 117 * @param matchStatement the match fragment of the cypher query 118 */ 119 public void setMatchStatement(String matchStatement) { 120 this.matchStatement = matchStatement; 121 } 122 123 /** 124 * An optional where fragment of the cypher query. WHERE is 125 * prepended to the statement provided and should <em>not</em> 126 * be included. 127 * 128 * @param whereStatement where fragment of the cypher query 129 */ 130 public void setWhereStatement(String whereStatement) { 131 this.whereStatement = whereStatement; 132 } 133 134 /** 135 * A list of properties to order the results by. This is 136 * required so that subsequent page requests pull back the 137 * segment of results correctly. ORDER BY is prepended to 138 * the statement provided and should <em>not</em> be included. 139 * 140 * @param orderByStatement order by fragment of the cypher query. 141 */ 142 public void setOrderByStatement(String orderByStatement) { 143 this.orderByStatement = orderByStatement; 144 } 145 146 protected SessionFactory getSessionFactory() { 147 return sessionFactory; 148 } 149 150 /** 151 * Establish the session factory for the reader. 152 * @param sessionFactory the factory to use for the reader. 153 */ 154 public void setSessionFactory(SessionFactory sessionFactory) { 155 this.sessionFactory = sessionFactory; 156 } 157 158 /** 159 * The object type to be returned from each call to {@link #read()} 160 * 161 * @param targetType the type of object to return. 162 */ 163 public void setTargetType(Class<T> targetType) { 164 this.targetType = targetType; 165 } 166 167 protected final Class<T> getTargetType() { 168 return this.targetType; 169 } 170 171 protected String generateLimitCypherQuery() { 172 StringBuilder query = new StringBuilder(); 173 174 query.append("START ").append(startStatement); 175 query.append(matchStatement != null ? " MATCH " + matchStatement : ""); 176 query.append(whereStatement != null ? " WHERE " + whereStatement : ""); 177 query.append(" RETURN ").append(returnStatement); 178 query.append(" ORDER BY ").append(orderByStatement); 179 query.append(" SKIP " + (pageSize * page)); 180 query.append(" LIMIT " + pageSize); 181 182 String resultingQuery = query.toString(); 183 184 if (logger.isDebugEnabled()) { 185 logger.debug(resultingQuery); 186 } 187 188 return resultingQuery; 189 } 190 191 /** 192 * Checks mandatory properties 193 * 194 * @see InitializingBean#afterPropertiesSet() 195 */ 196 @Override 197 public void afterPropertiesSet() throws Exception { 198 Assert.state(sessionFactory != null,"A SessionFactory is required"); 199 Assert.state(targetType != null, "The type to be returned is required"); 200 Assert.state(StringUtils.hasText(startStatement), "A START statement is required"); 201 Assert.state(StringUtils.hasText(returnStatement), "A RETURN statement is required"); 202 Assert.state(StringUtils.hasText(orderByStatement), "A ORDER BY statement is required"); 203 } 204}