001/*
002 * Copyright 2002-2012 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jca.work;
018
019import javax.resource.spi.work.ExecutionContext;
020import javax.resource.spi.work.Work;
021import javax.resource.spi.work.WorkAdapter;
022import javax.resource.spi.work.WorkCompletedException;
023import javax.resource.spi.work.WorkEvent;
024import javax.resource.spi.work.WorkException;
025import javax.resource.spi.work.WorkListener;
026import javax.resource.spi.work.WorkManager;
027import javax.resource.spi.work.WorkRejectedException;
028
029import org.springframework.core.task.AsyncTaskExecutor;
030import org.springframework.core.task.SimpleAsyncTaskExecutor;
031import org.springframework.core.task.SyncTaskExecutor;
032import org.springframework.core.task.TaskExecutor;
033import org.springframework.core.task.TaskRejectedException;
034import org.springframework.core.task.TaskTimeoutException;
035import org.springframework.util.Assert;
036
037/**
038 * Simple JCA 1.5 {@link javax.resource.spi.work.WorkManager} implementation that
039 * delegates to a Spring {@link org.springframework.core.task.TaskExecutor}.
040 * Provides simple task execution including start timeouts, but without support
041 * for a JCA ExecutionContext (i.e. without support for imported transactions).
042 *
043 * <p>Uses a {@link org.springframework.core.task.SyncTaskExecutor} for {@link #doWork}
044 * calls and a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
045 * for {@link #startWork} and {@link #scheduleWork} calls, by default.
046 * These default task executors can be overridden through configuration.
047 *
048 * <p><b>NOTE: This WorkManager does not provide thread pooling by default!</b>
049 * Specify a {@link org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor}
050 * (or any other thread-pooling TaskExecutor) as "asyncTaskExecutor" in order to
051 * achieve actual thread pooling.
052 *
053 * <p>This WorkManager automatically detects a specified
054 * {@link org.springframework.core.task.AsyncTaskExecutor} implementation
055 * and uses its extended timeout functionality where appropriate.
056 * JCA WorkListeners are fully supported in any case.
057 *
058 * @author Juergen Hoeller
059 * @since 2.0.3
060 * @see #setSyncTaskExecutor
061 * @see #setAsyncTaskExecutor
062 */
063public class SimpleTaskWorkManager implements WorkManager {
064
065        private TaskExecutor syncTaskExecutor = new SyncTaskExecutor();
066
067        private AsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
068
069
070        /**
071         * Specify the TaskExecutor to use for <i>synchronous</i> work execution
072         * (i.e. {@link #doWork} calls).
073         * <p>Default is a {@link org.springframework.core.task.SyncTaskExecutor}.
074         */
075        public void setSyncTaskExecutor(TaskExecutor syncTaskExecutor) {
076                this.syncTaskExecutor = syncTaskExecutor;
077        }
078
079        /**
080         * Specify the TaskExecutor to use for <i>asynchronous</i> work execution
081         * (i.e. {@link #startWork} and {@link #scheduleWork} calls).
082         * <p>This will typically (but not necessarily) be an
083         * {@link org.springframework.core.task.AsyncTaskExecutor} implementation.
084         * Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}.
085         */
086        public void setAsyncTaskExecutor(AsyncTaskExecutor asyncTaskExecutor) {
087                this.asyncTaskExecutor = asyncTaskExecutor;
088        }
089
090
091        @Override
092        public void doWork(Work work) throws WorkException {
093                doWork(work, WorkManager.INDEFINITE, null, null);
094        }
095
096        @Override
097        public void doWork(Work work, long startTimeout, ExecutionContext executionContext, WorkListener workListener)
098                        throws WorkException {
099
100                Assert.state(this.syncTaskExecutor != null, "No 'syncTaskExecutor' set");
101                executeWork(this.syncTaskExecutor, work, startTimeout, false, executionContext, workListener);
102        }
103
104        @Override
105        public long startWork(Work work) throws WorkException {
106                return startWork(work, WorkManager.INDEFINITE, null, null);
107        }
108
109        @Override
110        public long startWork(Work work, long startTimeout, ExecutionContext executionContext, WorkListener workListener)
111                        throws WorkException {
112
113                Assert.state(this.asyncTaskExecutor != null, "No 'asyncTaskExecutor' set");
114                return executeWork(this.asyncTaskExecutor, work, startTimeout, true, executionContext, workListener);
115        }
116
117        @Override
118        public void scheduleWork(Work work) throws WorkException {
119                scheduleWork(work, WorkManager.INDEFINITE, null, null);
120        }
121
122        @Override
123        public void scheduleWork(Work work, long startTimeout, ExecutionContext executionContext, WorkListener workListener)
124                        throws WorkException {
125
126                Assert.state(this.asyncTaskExecutor != null, "No 'asyncTaskExecutor' set");
127                executeWork(this.asyncTaskExecutor, work, startTimeout, false, executionContext, workListener);
128        }
129
130
131        /**
132         * Execute the given Work on the specified TaskExecutor.
133         * @param taskExecutor the TaskExecutor to use
134         * @param work the Work to execute
135         * @param startTimeout the time duration within which the Work is supposed to start
136         * @param blockUntilStarted whether to block until the Work has started
137         * @param executionContext the JCA ExecutionContext for the given Work
138         * @param workListener the WorkListener to clal for the given Work
139         * @return the time elapsed from Work acceptance until start of execution
140         * (or -1 if not applicable or not known)
141         * @throws WorkException if the TaskExecutor did not accept the Work
142         */
143        protected long executeWork(TaskExecutor taskExecutor, Work work, long startTimeout,
144                        boolean blockUntilStarted, ExecutionContext executionContext, WorkListener workListener)
145                        throws WorkException {
146
147                if (executionContext != null && executionContext.getXid() != null) {
148                        throw new WorkException("SimpleTaskWorkManager does not supported imported XIDs: " + executionContext.getXid());
149                }
150                WorkListener workListenerToUse = workListener;
151                if (workListenerToUse == null) {
152                        workListenerToUse = new WorkAdapter();
153                }
154
155                boolean isAsync = (taskExecutor instanceof AsyncTaskExecutor);
156                DelegatingWorkAdapter workHandle = new DelegatingWorkAdapter(work, workListenerToUse, !isAsync);
157                try {
158                        if (isAsync) {
159                                ((AsyncTaskExecutor) taskExecutor).execute(workHandle, startTimeout);
160                        }
161                        else {
162                                taskExecutor.execute(workHandle);
163                        }
164                }
165                catch (TaskTimeoutException ex) {
166                        WorkException wex = new WorkRejectedException("TaskExecutor rejected Work because of timeout: " + work, ex);
167                        wex.setErrorCode(WorkException.START_TIMED_OUT);
168                        workListenerToUse.workRejected(new WorkEvent(this, WorkEvent.WORK_REJECTED, work, wex));
169                        throw wex;
170                }
171                catch (TaskRejectedException ex) {
172                        WorkException wex = new WorkRejectedException("TaskExecutor rejected Work: " + work, ex);
173                        wex.setErrorCode(WorkException.INTERNAL);
174                        workListenerToUse.workRejected(new WorkEvent(this, WorkEvent.WORK_REJECTED, work, wex));
175                        throw wex;
176                }
177                catch (Throwable ex) {
178                        WorkException wex = new WorkException("TaskExecutor failed to execute Work: " + work, ex);
179                        wex.setErrorCode(WorkException.INTERNAL);
180                        throw wex;
181                }
182                if (isAsync) {
183                        workListenerToUse.workAccepted(new WorkEvent(this, WorkEvent.WORK_ACCEPTED, work, null));
184                }
185
186                if (blockUntilStarted) {
187                        long acceptanceTime = System.currentTimeMillis();
188                        synchronized (workHandle.monitor) {
189                                try {
190                                        while (!workHandle.started) {
191                                                workHandle.monitor.wait();
192                                        }
193                                }
194                                catch (InterruptedException ex) {
195                                        Thread.currentThread().interrupt();
196                                }
197                        }
198                        return (System.currentTimeMillis() - acceptanceTime);
199                }
200                else {
201                        return WorkManager.UNKNOWN;
202                }
203        }
204
205
206        /**
207         * Work adapter that supports start timeouts and WorkListener callbacks
208         * for a given Work that it delegates to.
209         */
210        private static class DelegatingWorkAdapter implements Work {
211
212                private final Work work;
213
214                private final WorkListener workListener;
215
216                private final boolean acceptOnExecution;
217
218                public final Object monitor = new Object();
219
220                public boolean started = false;
221
222                public DelegatingWorkAdapter(Work work, WorkListener workListener, boolean acceptOnExecution) {
223                        this.work = work;
224                        this.workListener = workListener;
225                        this.acceptOnExecution = acceptOnExecution;
226                }
227
228                @Override
229                public void run() {
230                        if (this.acceptOnExecution) {
231                                this.workListener.workAccepted(new WorkEvent(this, WorkEvent.WORK_ACCEPTED, work, null));
232                        }
233                        synchronized (this.monitor) {
234                                this.started = true;
235                                this.monitor.notify();
236                        }
237                        this.workListener.workStarted(new WorkEvent(this, WorkEvent.WORK_STARTED, this.work, null));
238                        try {
239                                this.work.run();
240                        }
241                        catch (RuntimeException ex) {
242                                this.workListener.workCompleted(
243                                                new WorkEvent(this, WorkEvent.WORK_COMPLETED, this.work, new WorkCompletedException(ex)));
244                                throw ex;
245                        }
246                        catch (Error err) {
247                                this.workListener.workCompleted(
248                                                new WorkEvent(this, WorkEvent.WORK_COMPLETED, this.work, new WorkCompletedException(err)));
249                                throw err;
250                        }
251                        this.workListener.workCompleted(new WorkEvent(this, WorkEvent.WORK_COMPLETED, this.work, null));
252                }
253
254                @Override
255                public void release() {
256                        this.work.release();
257                }
258        }
259
260}