001/*
002 * Copyright 2006-2007 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.repeat.support;
018
019import org.springframework.batch.repeat.RepeatCallback;
020import org.springframework.batch.repeat.RepeatContext;
021import org.springframework.batch.repeat.RepeatException;
022import org.springframework.batch.repeat.RepeatOperations;
023import org.springframework.batch.repeat.RepeatStatus;
024import org.springframework.core.task.SyncTaskExecutor;
025import org.springframework.core.task.TaskExecutor;
026import org.springframework.util.Assert;
027
028/**
029 * Provides {@link RepeatOperations} support including interceptors that can be
030 * used to modify or monitor the behaviour at run time.<br>
031 * 
032 * This implementation is sufficient to be used to configure transactional
033 * behaviour for each item by making the {@link RepeatCallback} transactional,
034 * or for the whole batch by making the execute method transactional (but only
035 * then if the task executor is synchronous).<br>
036 * 
037 * This class is thread-safe if its collaborators are thread-safe (interceptors,
038 * terminationPolicy, callback). Normally this will be the case, but clients
039 * need to be aware that if the task executor is asynchronous, then the other
040 * collaborators should be also. In particular the {@link RepeatCallback} that
041 * is wrapped in the execute method must be thread-safe - often it is based on
042 * some form of data source, which itself should be both thread-safe and
043 * transactional (multiple threads could be accessing it at any given time, and
044 * each thread would have its own transaction).<br>
045 * 
046 * @author Dave Syer
047 * 
048 */
049public class TaskExecutorRepeatTemplate extends RepeatTemplate {
050
051        /**
052         * Default limit for maximum number of concurrent unfinished results allowed
053         * by the template.
054         * {@link #getNextResult(RepeatContext, RepeatCallback, RepeatInternalState)}
055         * .
056         */
057        public static final int DEFAULT_THROTTLE_LIMIT = 4;
058
059        private int throttleLimit = DEFAULT_THROTTLE_LIMIT;
060
061        private TaskExecutor taskExecutor = new SyncTaskExecutor();
062
063        /**
064         * Public setter for the throttle limit. The throttle limit is the largest
065         * number of concurrent tasks that can be executing at one time - if a new
066         * task arrives and the throttle limit is breached we wait for one of the
067         * executing tasks to finish before submitting the new one to the
068         * {@link TaskExecutor}. Default value is {@link #DEFAULT_THROTTLE_LIMIT}.
069         * N.B. when used with a thread pooled {@link TaskExecutor} the thread pool
070         * might prevent the throttle limit actually being reached (so make the core
071         * pool size larger than the throttle limit if possible).
072         * 
073         * @param throttleLimit the throttleLimit to set.
074         */
075        public void setThrottleLimit(int throttleLimit) {
076                this.throttleLimit = throttleLimit;
077        }
078
079        /**
080         * Setter for task executor to be used to run the individual item callbacks.
081         * 
082         * @param taskExecutor a TaskExecutor
083         * @throws IllegalArgumentException if the argument is null
084         */
085        public void setTaskExecutor(TaskExecutor taskExecutor) {
086                Assert.notNull(taskExecutor, "A TaskExecutor is required");
087                this.taskExecutor = taskExecutor;
088        }
089
090        /**
091         * Use the {@link #setTaskExecutor(TaskExecutor)} to generate a result. The
092         * internal state in this case is a queue of unfinished result holders of
093         * type {@link ResultHolder}. The holder with the return value should not be
094         * on the queue when this method exits. The queue is scoped in the calling
095         * method so there is no need to synchronize access.
096         * 
097         */
098    @Override
099        protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
100                        throws Throwable {
101
102                ExecutingRunnable runnable;
103
104                ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
105
106                do {
107
108                        /*
109                         * Wrap the callback in a runnable that will add its result to the
110                         * queue when it is ready.
111                         */
112                        runnable = new ExecutingRunnable(callback, context, queue);
113
114                        /**
115                         * Tell the runnable that it can expect a result. This could have
116                         * been in-lined with the constructor, but it might block, so it's
117                         * better to do it here, since we have the option (it's a private
118                         * class).
119                         */
120                        runnable.expect();
121
122                        /*
123                         * Start the task possibly concurrently / in the future.
124                         */
125                        taskExecutor.execute(runnable);
126
127                        /*
128                         * Allow termination policy to update its state. This must happen
129                         * immediately before or after the call to the task executor.
130                         */
131                        update(context);
132
133                        /*
134                         * Keep going until we get a result that is finished, or early
135                         * termination...
136                         */
137                } while (queue.isEmpty() && !isComplete(context));
138
139                /*
140                 * N.B. If the queue is empty then take() blocks until a result appears,
141                 * and there must be at least one because we just submitted one to the
142                 * task executor.
143                 */
144                ResultHolder result = queue.take();
145                if (result.getError() != null) {
146                        throw result.getError();
147                }
148                return result.getResult();
149        }
150
151        /**
152         * Wait for all the results to appear on the queue and execute the after
153         * interceptors for each one.
154         * 
155         * @see org.springframework.batch.repeat.support.RepeatTemplate#waitForResults(org.springframework.batch.repeat.support.RepeatInternalState)
156         */
157    @Override
158        protected boolean waitForResults(RepeatInternalState state) {
159
160                ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
161
162                boolean result = true;
163
164                while (queue.isExpecting()) {
165
166                        /*
167                         * Careful that no runnables that are not going to finish ever get
168                         * onto the queue, else this may block forever.
169                         */
170                        ResultHolder future;
171                        try {
172                                future = queue.take();
173                        }
174                        catch (InterruptedException e) {
175                                Thread.currentThread().interrupt();
176                                throw new RepeatException("InterruptedException while waiting for result.");
177                        }
178
179                        if (future.getError() != null) {
180                                state.getThrowables().add(future.getError());
181                                result = false;
182                        }
183                        else {
184                                RepeatStatus status = future.getResult();
185                                result = result && canContinue(status);
186                                executeAfterInterceptors(future.getContext(), status);
187                        }
188
189                }
190
191                Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch.");
192
193                return result;
194        }
195
196    @Override
197        protected RepeatInternalState createInternalState(RepeatContext context) {
198                // Queue of pending results:
199                return new ResultQueueInternalState(throttleLimit);
200        }
201
202        /**
203         * A runnable that puts its result on a queue when it is done.
204         * 
205         * @author Dave Syer
206         * 
207         */
208        private class ExecutingRunnable implements Runnable, ResultHolder {
209
210                private final RepeatCallback callback;
211
212                private final RepeatContext context;
213
214                private final ResultQueue<ResultHolder> queue;
215
216                private volatile RepeatStatus result;
217
218                private volatile Throwable error;
219
220                public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) {
221
222                        super();
223
224                        this.callback = callback;
225                        this.context = context;
226                        this.queue = queue;
227
228                }
229
230                /**
231                 * Tell the queue to expect a result.
232                 */
233                public void expect() {
234                        try {
235                                queue.expect();
236                        }
237                        catch (InterruptedException e) {
238                                Thread.currentThread().interrupt();
239                                throw new RepeatException("InterruptedException waiting for to acquire lock on input.");
240                        }
241                }
242
243                /**
244                 * Execute the batch callback, and store the result, or any exception
245                 * that is thrown for retrieval later by caller.
246                 * 
247                 * @see java.lang.Runnable#run()
248                 */
249        @Override
250                public void run() {
251                        boolean clearContext = false;
252                        try {
253                                if (RepeatSynchronizationManager.getContext() == null) {
254                                        clearContext = true;
255                                        RepeatSynchronizationManager.register(context);
256                                }
257
258                                if (logger.isDebugEnabled()) {
259                                        logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
260                                }
261
262                                result = callback.doInIteration(context);
263
264                        }
265                        catch (Throwable e) {
266                                error = e;
267                        }
268                        finally {
269
270                                if (clearContext) {
271                                        RepeatSynchronizationManager.clear();
272                                }
273
274                                queue.put(this);
275
276                        }
277                }
278
279                /**
280                 * Get the result - never blocks because the queue manages waiting for
281                 * the task to finish.
282                 */
283        @Override
284                public RepeatStatus getResult() {
285                        return result;
286                }
287
288                /**
289                 * Get the error - never blocks because the queue manages waiting for
290                 * the task to finish.
291                 */
292        @Override
293                public Throwable getError() {
294                        return error;
295                }
296
297                /**
298                 * Getter for the context.
299                 */
300        @Override
301                public RepeatContext getContext() {
302                        return this.context;
303                }
304
305        }
306
307        /**
308         * @author Dave Syer
309         * 
310         */
311        private static class ResultQueueInternalState extends RepeatInternalStateSupport {
312
313                private final ResultQueue<ResultHolder> results;
314
315                /**
316                 * @param throttleLimit the throttle limit for the result queue
317                 */
318                public ResultQueueInternalState(int throttleLimit) {
319                        super();
320                        this.results = new ResultHolderResultQueue(throttleLimit);
321                }
322
323                /**
324                 * @return the result queue
325                 */
326                public ResultQueue<ResultHolder> getResultQueue() {
327                        return results;
328                }
329
330        }
331
332}