001/* 002 * Copyright 2006-2007 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.repeat.support; 018 019import org.springframework.batch.repeat.RepeatCallback; 020import org.springframework.batch.repeat.RepeatContext; 021import org.springframework.batch.repeat.RepeatException; 022import org.springframework.batch.repeat.RepeatOperations; 023import org.springframework.batch.repeat.RepeatStatus; 024import org.springframework.core.task.SyncTaskExecutor; 025import org.springframework.core.task.TaskExecutor; 026import org.springframework.util.Assert; 027 028/** 029 * Provides {@link RepeatOperations} support including interceptors that can be 030 * used to modify or monitor the behaviour at run time.<br> 031 * 032 * This implementation is sufficient to be used to configure transactional 033 * behaviour for each item by making the {@link RepeatCallback} transactional, 034 * or for the whole batch by making the execute method transactional (but only 035 * then if the task executor is synchronous).<br> 036 * 037 * This class is thread-safe if its collaborators are thread-safe (interceptors, 038 * terminationPolicy, callback). Normally this will be the case, but clients 039 * need to be aware that if the task executor is asynchronous, then the other 040 * collaborators should be also. In particular the {@link RepeatCallback} that 041 * is wrapped in the execute method must be thread-safe - often it is based on 042 * some form of data source, which itself should be both thread-safe and 043 * transactional (multiple threads could be accessing it at any given time, and 044 * each thread would have its own transaction).<br> 045 * 046 * @author Dave Syer 047 * 048 */ 049public class TaskExecutorRepeatTemplate extends RepeatTemplate { 050 051 /** 052 * Default limit for maximum number of concurrent unfinished results allowed 053 * by the template. 054 * {@link #getNextResult(RepeatContext, RepeatCallback, RepeatInternalState)} 055 * . 056 */ 057 public static final int DEFAULT_THROTTLE_LIMIT = 4; 058 059 private int throttleLimit = DEFAULT_THROTTLE_LIMIT; 060 061 private TaskExecutor taskExecutor = new SyncTaskExecutor(); 062 063 /** 064 * Public setter for the throttle limit. The throttle limit is the largest 065 * number of concurrent tasks that can be executing at one time - if a new 066 * task arrives and the throttle limit is breached we wait for one of the 067 * executing tasks to finish before submitting the new one to the 068 * {@link TaskExecutor}. Default value is {@link #DEFAULT_THROTTLE_LIMIT}. 069 * N.B. when used with a thread pooled {@link TaskExecutor} the thread pool 070 * might prevent the throttle limit actually being reached (so make the core 071 * pool size larger than the throttle limit if possible). 072 * 073 * @param throttleLimit the throttleLimit to set. 074 */ 075 public void setThrottleLimit(int throttleLimit) { 076 this.throttleLimit = throttleLimit; 077 } 078 079 /** 080 * Setter for task executor to be used to run the individual item callbacks. 081 * 082 * @param taskExecutor a TaskExecutor 083 * @throws IllegalArgumentException if the argument is null 084 */ 085 public void setTaskExecutor(TaskExecutor taskExecutor) { 086 Assert.notNull(taskExecutor, "A TaskExecutor is required"); 087 this.taskExecutor = taskExecutor; 088 } 089 090 /** 091 * Use the {@link #setTaskExecutor(TaskExecutor)} to generate a result. The 092 * internal state in this case is a queue of unfinished result holders of 093 * type {@link ResultHolder}. The holder with the return value should not be 094 * on the queue when this method exits. The queue is scoped in the calling 095 * method so there is no need to synchronize access. 096 * 097 */ 098 @Override 099 protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state) 100 throws Throwable { 101 102 ExecutingRunnable runnable; 103 104 ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue(); 105 106 do { 107 108 /* 109 * Wrap the callback in a runnable that will add its result to the 110 * queue when it is ready. 111 */ 112 runnable = new ExecutingRunnable(callback, context, queue); 113 114 /** 115 * Tell the runnable that it can expect a result. This could have 116 * been in-lined with the constructor, but it might block, so it's 117 * better to do it here, since we have the option (it's a private 118 * class). 119 */ 120 runnable.expect(); 121 122 /* 123 * Start the task possibly concurrently / in the future. 124 */ 125 taskExecutor.execute(runnable); 126 127 /* 128 * Allow termination policy to update its state. This must happen 129 * immediately before or after the call to the task executor. 130 */ 131 update(context); 132 133 /* 134 * Keep going until we get a result that is finished, or early 135 * termination... 136 */ 137 } while (queue.isEmpty() && !isComplete(context)); 138 139 /* 140 * N.B. If the queue is empty then take() blocks until a result appears, 141 * and there must be at least one because we just submitted one to the 142 * task executor. 143 */ 144 ResultHolder result = queue.take(); 145 if (result.getError() != null) { 146 throw result.getError(); 147 } 148 return result.getResult(); 149 } 150 151 /** 152 * Wait for all the results to appear on the queue and execute the after 153 * interceptors for each one. 154 * 155 * @see org.springframework.batch.repeat.support.RepeatTemplate#waitForResults(org.springframework.batch.repeat.support.RepeatInternalState) 156 */ 157 @Override 158 protected boolean waitForResults(RepeatInternalState state) { 159 160 ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue(); 161 162 boolean result = true; 163 164 while (queue.isExpecting()) { 165 166 /* 167 * Careful that no runnables that are not going to finish ever get 168 * onto the queue, else this may block forever. 169 */ 170 ResultHolder future; 171 try { 172 future = queue.take(); 173 } 174 catch (InterruptedException e) { 175 Thread.currentThread().interrupt(); 176 throw new RepeatException("InterruptedException while waiting for result."); 177 } 178 179 if (future.getError() != null) { 180 state.getThrowables().add(future.getError()); 181 result = false; 182 } 183 else { 184 RepeatStatus status = future.getResult(); 185 result = result && canContinue(status); 186 executeAfterInterceptors(future.getContext(), status); 187 } 188 189 } 190 191 Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch."); 192 193 return result; 194 } 195 196 @Override 197 protected RepeatInternalState createInternalState(RepeatContext context) { 198 // Queue of pending results: 199 return new ResultQueueInternalState(throttleLimit); 200 } 201 202 /** 203 * A runnable that puts its result on a queue when it is done. 204 * 205 * @author Dave Syer 206 * 207 */ 208 private class ExecutingRunnable implements Runnable, ResultHolder { 209 210 private final RepeatCallback callback; 211 212 private final RepeatContext context; 213 214 private final ResultQueue<ResultHolder> queue; 215 216 private volatile RepeatStatus result; 217 218 private volatile Throwable error; 219 220 public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) { 221 222 super(); 223 224 this.callback = callback; 225 this.context = context; 226 this.queue = queue; 227 228 } 229 230 /** 231 * Tell the queue to expect a result. 232 */ 233 public void expect() { 234 try { 235 queue.expect(); 236 } 237 catch (InterruptedException e) { 238 Thread.currentThread().interrupt(); 239 throw new RepeatException("InterruptedException waiting for to acquire lock on input."); 240 } 241 } 242 243 /** 244 * Execute the batch callback, and store the result, or any exception 245 * that is thrown for retrieval later by caller. 246 * 247 * @see java.lang.Runnable#run() 248 */ 249 @Override 250 public void run() { 251 boolean clearContext = false; 252 try { 253 if (RepeatSynchronizationManager.getContext() == null) { 254 clearContext = true; 255 RepeatSynchronizationManager.register(context); 256 } 257 258 if (logger.isDebugEnabled()) { 259 logger.debug("Repeat operation about to start at count=" + context.getStartedCount()); 260 } 261 262 result = callback.doInIteration(context); 263 264 } 265 catch (Throwable e) { 266 error = e; 267 } 268 finally { 269 270 if (clearContext) { 271 RepeatSynchronizationManager.clear(); 272 } 273 274 queue.put(this); 275 276 } 277 } 278 279 /** 280 * Get the result - never blocks because the queue manages waiting for 281 * the task to finish. 282 */ 283 @Override 284 public RepeatStatus getResult() { 285 return result; 286 } 287 288 /** 289 * Get the error - never blocks because the queue manages waiting for 290 * the task to finish. 291 */ 292 @Override 293 public Throwable getError() { 294 return error; 295 } 296 297 /** 298 * Getter for the context. 299 */ 300 @Override 301 public RepeatContext getContext() { 302 return this.context; 303 } 304 305 } 306 307 /** 308 * @author Dave Syer 309 * 310 */ 311 private static class ResultQueueInternalState extends RepeatInternalStateSupport { 312 313 private final ResultQueue<ResultHolder> results; 314 315 /** 316 * @param throttleLimit the throttle limit for the result queue 317 */ 318 public ResultQueueInternalState(int throttleLimit) { 319 super(); 320 this.results = new ResultHolderResultQueue(throttleLimit); 321 } 322 323 /** 324 * @return the result queue 325 */ 326 public ResultQueue<ResultHolder> getResultQueue() { 327 return results; 328 } 329 330 } 331 332}