001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jca.work;
018
019import javax.resource.spi.work.ExecutionContext;
020import javax.resource.spi.work.Work;
021import javax.resource.spi.work.WorkAdapter;
022import javax.resource.spi.work.WorkCompletedException;
023import javax.resource.spi.work.WorkEvent;
024import javax.resource.spi.work.WorkException;
025import javax.resource.spi.work.WorkListener;
026import javax.resource.spi.work.WorkManager;
027import javax.resource.spi.work.WorkRejectedException;
028
029import org.springframework.core.task.AsyncTaskExecutor;
030import org.springframework.core.task.SimpleAsyncTaskExecutor;
031import org.springframework.core.task.SyncTaskExecutor;
032import org.springframework.core.task.TaskExecutor;
033import org.springframework.core.task.TaskRejectedException;
034import org.springframework.core.task.TaskTimeoutException;
035import org.springframework.lang.Nullable;
036import org.springframework.util.Assert;
037
038/**
039 * Simple JCA 1.7 {@link javax.resource.spi.work.WorkManager} implementation that
040 * delegates to a Spring {@link org.springframework.core.task.TaskExecutor}.
041 * Provides simple task execution including start timeouts, but without support
042 * for a JCA ExecutionContext (i.e. without support for imported transactions).
043 *
044 * <p>Uses a {@link org.springframework.core.task.SyncTaskExecutor} for {@link #doWork}
045 * calls and a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
046 * for {@link #startWork} and {@link #scheduleWork} calls, by default.
047 * These default task executors can be overridden through configuration.
048 *
049 * <p><b>NOTE: This WorkManager does not provide thread pooling by default!</b>
050 * Specify a {@link org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor}
051 * (or any other thread-pooling TaskExecutor) as "asyncTaskExecutor" in order to
052 * achieve actual thread pooling.
053 *
054 * <p>This WorkManager automatically detects a specified
055 * {@link org.springframework.core.task.AsyncTaskExecutor} implementation
056 * and uses its extended timeout functionality where appropriate.
057 * JCA WorkListeners are fully supported in any case.
058 *
059 * @author Juergen Hoeller
060 * @since 2.0.3
061 * @see #setSyncTaskExecutor
062 * @see #setAsyncTaskExecutor
063 */
064public class SimpleTaskWorkManager implements WorkManager {
065
066        @Nullable
067        private TaskExecutor syncTaskExecutor = new SyncTaskExecutor();
068
069        @Nullable
070        private AsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
071
072
073        /**
074         * Specify the TaskExecutor to use for <i>synchronous</i> work execution
075         * (i.e. {@link #doWork} calls).
076         * <p>Default is a {@link org.springframework.core.task.SyncTaskExecutor}.
077         */
078        public void setSyncTaskExecutor(TaskExecutor syncTaskExecutor) {
079                this.syncTaskExecutor = syncTaskExecutor;
080        }
081
082        /**
083         * Specify the TaskExecutor to use for <i>asynchronous</i> work execution
084         * (i.e. {@link #startWork} and {@link #scheduleWork} calls).
085         * <p>This will typically (but not necessarily) be an
086         * {@link org.springframework.core.task.AsyncTaskExecutor} implementation.
087         * Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}.
088         */
089        public void setAsyncTaskExecutor(AsyncTaskExecutor asyncTaskExecutor) {
090                this.asyncTaskExecutor = asyncTaskExecutor;
091        }
092
093
094        @Override
095        public void doWork(Work work) throws WorkException {
096                doWork(work, WorkManager.INDEFINITE, null, null);
097        }
098
099        @Override
100        public void doWork(Work work, long startTimeout, @Nullable ExecutionContext executionContext, @Nullable WorkListener workListener)
101                        throws WorkException {
102
103                Assert.state(this.syncTaskExecutor != null, "No 'syncTaskExecutor' set");
104                executeWork(this.syncTaskExecutor, work, startTimeout, false, executionContext, workListener);
105        }
106
107        @Override
108        public long startWork(Work work) throws WorkException {
109                return startWork(work, WorkManager.INDEFINITE, null, null);
110        }
111
112        @Override
113        public long startWork(Work work, long startTimeout, @Nullable ExecutionContext executionContext, @Nullable WorkListener workListener)
114                        throws WorkException {
115
116                Assert.state(this.asyncTaskExecutor != null, "No 'asyncTaskExecutor' set");
117                return executeWork(this.asyncTaskExecutor, work, startTimeout, true, executionContext, workListener);
118        }
119
120        @Override
121        public void scheduleWork(Work work) throws WorkException {
122                scheduleWork(work, WorkManager.INDEFINITE, null, null);
123        }
124
125        @Override
126        public void scheduleWork(Work work, long startTimeout, @Nullable ExecutionContext executionContext, @Nullable WorkListener workListener)
127                        throws WorkException {
128
129                Assert.state(this.asyncTaskExecutor != null, "No 'asyncTaskExecutor' set");
130                executeWork(this.asyncTaskExecutor, work, startTimeout, false, executionContext, workListener);
131        }
132
133
134        /**
135         * Execute the given Work on the specified TaskExecutor.
136         * @param taskExecutor the TaskExecutor to use
137         * @param work the Work to execute
138         * @param startTimeout the time duration within which the Work is supposed to start
139         * @param blockUntilStarted whether to block until the Work has started
140         * @param executionContext the JCA ExecutionContext for the given Work
141         * @param workListener the WorkListener to clal for the given Work
142         * @return the time elapsed from Work acceptance until start of execution
143         * (or -1 if not applicable or not known)
144         * @throws WorkException if the TaskExecutor did not accept the Work
145         */
146        protected long executeWork(TaskExecutor taskExecutor, Work work, long startTimeout, boolean blockUntilStarted,
147                        @Nullable ExecutionContext executionContext, @Nullable WorkListener workListener) throws WorkException {
148
149                if (executionContext != null && executionContext.getXid() != null) {
150                        throw new WorkException("SimpleTaskWorkManager does not supported imported XIDs: " + executionContext.getXid());
151                }
152                WorkListener workListenerToUse = workListener;
153                if (workListenerToUse == null) {
154                        workListenerToUse = new WorkAdapter();
155                }
156
157                boolean isAsync = (taskExecutor instanceof AsyncTaskExecutor);
158                DelegatingWorkAdapter workHandle = new DelegatingWorkAdapter(work, workListenerToUse, !isAsync);
159                try {
160                        if (isAsync) {
161                                ((AsyncTaskExecutor) taskExecutor).execute(workHandle, startTimeout);
162                        }
163                        else {
164                                taskExecutor.execute(workHandle);
165                        }
166                }
167                catch (TaskTimeoutException ex) {
168                        WorkException wex = new WorkRejectedException("TaskExecutor rejected Work because of timeout: " + work, ex);
169                        wex.setErrorCode(WorkException.START_TIMED_OUT);
170                        workListenerToUse.workRejected(new WorkEvent(this, WorkEvent.WORK_REJECTED, work, wex));
171                        throw wex;
172                }
173                catch (TaskRejectedException ex) {
174                        WorkException wex = new WorkRejectedException("TaskExecutor rejected Work: " + work, ex);
175                        wex.setErrorCode(WorkException.INTERNAL);
176                        workListenerToUse.workRejected(new WorkEvent(this, WorkEvent.WORK_REJECTED, work, wex));
177                        throw wex;
178                }
179                catch (Throwable ex) {
180                        WorkException wex = new WorkException("TaskExecutor failed to execute Work: " + work, ex);
181                        wex.setErrorCode(WorkException.INTERNAL);
182                        throw wex;
183                }
184                if (isAsync) {
185                        workListenerToUse.workAccepted(new WorkEvent(this, WorkEvent.WORK_ACCEPTED, work, null));
186                }
187
188                if (blockUntilStarted) {
189                        long acceptanceTime = System.currentTimeMillis();
190                        synchronized (workHandle.monitor) {
191                                try {
192                                        while (!workHandle.started) {
193                                                workHandle.monitor.wait();
194                                        }
195                                }
196                                catch (InterruptedException ex) {
197                                        Thread.currentThread().interrupt();
198                                }
199                        }
200                        return (System.currentTimeMillis() - acceptanceTime);
201                }
202                else {
203                        return WorkManager.UNKNOWN;
204                }
205        }
206
207
208        /**
209         * Work adapter that supports start timeouts and WorkListener callbacks
210         * for a given Work that it delegates to.
211         */
212        private static class DelegatingWorkAdapter implements Work {
213
214                private final Work work;
215
216                private final WorkListener workListener;
217
218                private final boolean acceptOnExecution;
219
220                public final Object monitor = new Object();
221
222                public boolean started = false;
223
224                public DelegatingWorkAdapter(Work work, WorkListener workListener, boolean acceptOnExecution) {
225                        this.work = work;
226                        this.workListener = workListener;
227                        this.acceptOnExecution = acceptOnExecution;
228                }
229
230                @Override
231                public void run() {
232                        if (this.acceptOnExecution) {
233                                this.workListener.workAccepted(new WorkEvent(this, WorkEvent.WORK_ACCEPTED, this.work, null));
234                        }
235                        synchronized (this.monitor) {
236                                this.started = true;
237                                this.monitor.notify();
238                        }
239                        this.workListener.workStarted(new WorkEvent(this, WorkEvent.WORK_STARTED, this.work, null));
240                        try {
241                                this.work.run();
242                        }
243                        catch (RuntimeException | Error ex) {
244                                this.workListener.workCompleted(
245                                                new WorkEvent(this, WorkEvent.WORK_COMPLETED, this.work, new WorkCompletedException(ex)));
246                                throw ex;
247                        }
248                        this.workListener.workCompleted(new WorkEvent(this, WorkEvent.WORK_COMPLETED, this.work, null));
249                }
250
251                @Override
252                public void release() {
253                        this.work.release();
254                }
255        }
256
257}