001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jca.work; 018 019import javax.resource.spi.work.ExecutionContext; 020import javax.resource.spi.work.Work; 021import javax.resource.spi.work.WorkAdapter; 022import javax.resource.spi.work.WorkCompletedException; 023import javax.resource.spi.work.WorkEvent; 024import javax.resource.spi.work.WorkException; 025import javax.resource.spi.work.WorkListener; 026import javax.resource.spi.work.WorkManager; 027import javax.resource.spi.work.WorkRejectedException; 028 029import org.springframework.core.task.AsyncTaskExecutor; 030import org.springframework.core.task.SimpleAsyncTaskExecutor; 031import org.springframework.core.task.SyncTaskExecutor; 032import org.springframework.core.task.TaskExecutor; 033import org.springframework.core.task.TaskRejectedException; 034import org.springframework.core.task.TaskTimeoutException; 035import org.springframework.lang.Nullable; 036import org.springframework.util.Assert; 037 038/** 039 * Simple JCA 1.7 {@link javax.resource.spi.work.WorkManager} implementation that 040 * delegates to a Spring {@link org.springframework.core.task.TaskExecutor}. 041 * Provides simple task execution including start timeouts, but without support 042 * for a JCA ExecutionContext (i.e. without support for imported transactions). 043 * 044 * <p>Uses a {@link org.springframework.core.task.SyncTaskExecutor} for {@link #doWork} 045 * calls and a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} 046 * for {@link #startWork} and {@link #scheduleWork} calls, by default. 047 * These default task executors can be overridden through configuration. 048 * 049 * <p><b>NOTE: This WorkManager does not provide thread pooling by default!</b> 050 * Specify a {@link org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor} 051 * (or any other thread-pooling TaskExecutor) as "asyncTaskExecutor" in order to 052 * achieve actual thread pooling. 053 * 054 * <p>This WorkManager automatically detects a specified 055 * {@link org.springframework.core.task.AsyncTaskExecutor} implementation 056 * and uses its extended timeout functionality where appropriate. 057 * JCA WorkListeners are fully supported in any case. 058 * 059 * @author Juergen Hoeller 060 * @since 2.0.3 061 * @see #setSyncTaskExecutor 062 * @see #setAsyncTaskExecutor 063 */ 064public class SimpleTaskWorkManager implements WorkManager { 065 066 @Nullable 067 private TaskExecutor syncTaskExecutor = new SyncTaskExecutor(); 068 069 @Nullable 070 private AsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor(); 071 072 073 /** 074 * Specify the TaskExecutor to use for <i>synchronous</i> work execution 075 * (i.e. {@link #doWork} calls). 076 * <p>Default is a {@link org.springframework.core.task.SyncTaskExecutor}. 077 */ 078 public void setSyncTaskExecutor(TaskExecutor syncTaskExecutor) { 079 this.syncTaskExecutor = syncTaskExecutor; 080 } 081 082 /** 083 * Specify the TaskExecutor to use for <i>asynchronous</i> work execution 084 * (i.e. {@link #startWork} and {@link #scheduleWork} calls). 085 * <p>This will typically (but not necessarily) be an 086 * {@link org.springframework.core.task.AsyncTaskExecutor} implementation. 087 * Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}. 088 */ 089 public void setAsyncTaskExecutor(AsyncTaskExecutor asyncTaskExecutor) { 090 this.asyncTaskExecutor = asyncTaskExecutor; 091 } 092 093 094 @Override 095 public void doWork(Work work) throws WorkException { 096 doWork(work, WorkManager.INDEFINITE, null, null); 097 } 098 099 @Override 100 public void doWork(Work work, long startTimeout, @Nullable ExecutionContext executionContext, @Nullable WorkListener workListener) 101 throws WorkException { 102 103 Assert.state(this.syncTaskExecutor != null, "No 'syncTaskExecutor' set"); 104 executeWork(this.syncTaskExecutor, work, startTimeout, false, executionContext, workListener); 105 } 106 107 @Override 108 public long startWork(Work work) throws WorkException { 109 return startWork(work, WorkManager.INDEFINITE, null, null); 110 } 111 112 @Override 113 public long startWork(Work work, long startTimeout, @Nullable ExecutionContext executionContext, @Nullable WorkListener workListener) 114 throws WorkException { 115 116 Assert.state(this.asyncTaskExecutor != null, "No 'asyncTaskExecutor' set"); 117 return executeWork(this.asyncTaskExecutor, work, startTimeout, true, executionContext, workListener); 118 } 119 120 @Override 121 public void scheduleWork(Work work) throws WorkException { 122 scheduleWork(work, WorkManager.INDEFINITE, null, null); 123 } 124 125 @Override 126 public void scheduleWork(Work work, long startTimeout, @Nullable ExecutionContext executionContext, @Nullable WorkListener workListener) 127 throws WorkException { 128 129 Assert.state(this.asyncTaskExecutor != null, "No 'asyncTaskExecutor' set"); 130 executeWork(this.asyncTaskExecutor, work, startTimeout, false, executionContext, workListener); 131 } 132 133 134 /** 135 * Execute the given Work on the specified TaskExecutor. 136 * @param taskExecutor the TaskExecutor to use 137 * @param work the Work to execute 138 * @param startTimeout the time duration within which the Work is supposed to start 139 * @param blockUntilStarted whether to block until the Work has started 140 * @param executionContext the JCA ExecutionContext for the given Work 141 * @param workListener the WorkListener to clal for the given Work 142 * @return the time elapsed from Work acceptance until start of execution 143 * (or -1 if not applicable or not known) 144 * @throws WorkException if the TaskExecutor did not accept the Work 145 */ 146 protected long executeWork(TaskExecutor taskExecutor, Work work, long startTimeout, boolean blockUntilStarted, 147 @Nullable ExecutionContext executionContext, @Nullable WorkListener workListener) throws WorkException { 148 149 if (executionContext != null && executionContext.getXid() != null) { 150 throw new WorkException("SimpleTaskWorkManager does not supported imported XIDs: " + executionContext.getXid()); 151 } 152 WorkListener workListenerToUse = workListener; 153 if (workListenerToUse == null) { 154 workListenerToUse = new WorkAdapter(); 155 } 156 157 boolean isAsync = (taskExecutor instanceof AsyncTaskExecutor); 158 DelegatingWorkAdapter workHandle = new DelegatingWorkAdapter(work, workListenerToUse, !isAsync); 159 try { 160 if (isAsync) { 161 ((AsyncTaskExecutor) taskExecutor).execute(workHandle, startTimeout); 162 } 163 else { 164 taskExecutor.execute(workHandle); 165 } 166 } 167 catch (TaskTimeoutException ex) { 168 WorkException wex = new WorkRejectedException("TaskExecutor rejected Work because of timeout: " + work, ex); 169 wex.setErrorCode(WorkException.START_TIMED_OUT); 170 workListenerToUse.workRejected(new WorkEvent(this, WorkEvent.WORK_REJECTED, work, wex)); 171 throw wex; 172 } 173 catch (TaskRejectedException ex) { 174 WorkException wex = new WorkRejectedException("TaskExecutor rejected Work: " + work, ex); 175 wex.setErrorCode(WorkException.INTERNAL); 176 workListenerToUse.workRejected(new WorkEvent(this, WorkEvent.WORK_REJECTED, work, wex)); 177 throw wex; 178 } 179 catch (Throwable ex) { 180 WorkException wex = new WorkException("TaskExecutor failed to execute Work: " + work, ex); 181 wex.setErrorCode(WorkException.INTERNAL); 182 throw wex; 183 } 184 if (isAsync) { 185 workListenerToUse.workAccepted(new WorkEvent(this, WorkEvent.WORK_ACCEPTED, work, null)); 186 } 187 188 if (blockUntilStarted) { 189 long acceptanceTime = System.currentTimeMillis(); 190 synchronized (workHandle.monitor) { 191 try { 192 while (!workHandle.started) { 193 workHandle.monitor.wait(); 194 } 195 } 196 catch (InterruptedException ex) { 197 Thread.currentThread().interrupt(); 198 } 199 } 200 return (System.currentTimeMillis() - acceptanceTime); 201 } 202 else { 203 return WorkManager.UNKNOWN; 204 } 205 } 206 207 208 /** 209 * Work adapter that supports start timeouts and WorkListener callbacks 210 * for a given Work that it delegates to. 211 */ 212 private static class DelegatingWorkAdapter implements Work { 213 214 private final Work work; 215 216 private final WorkListener workListener; 217 218 private final boolean acceptOnExecution; 219 220 public final Object monitor = new Object(); 221 222 public boolean started = false; 223 224 public DelegatingWorkAdapter(Work work, WorkListener workListener, boolean acceptOnExecution) { 225 this.work = work; 226 this.workListener = workListener; 227 this.acceptOnExecution = acceptOnExecution; 228 } 229 230 @Override 231 public void run() { 232 if (this.acceptOnExecution) { 233 this.workListener.workAccepted(new WorkEvent(this, WorkEvent.WORK_ACCEPTED, this.work, null)); 234 } 235 synchronized (this.monitor) { 236 this.started = true; 237 this.monitor.notify(); 238 } 239 this.workListener.workStarted(new WorkEvent(this, WorkEvent.WORK_STARTED, this.work, null)); 240 try { 241 this.work.run(); 242 } 243 catch (RuntimeException | Error ex) { 244 this.workListener.workCompleted( 245 new WorkEvent(this, WorkEvent.WORK_COMPLETED, this.work, new WorkCompletedException(ex))); 246 throw ex; 247 } 248 this.workListener.workCompleted(new WorkEvent(this, WorkEvent.WORK_COMPLETED, this.work, null)); 249 } 250 251 @Override 252 public void release() { 253 this.work.release(); 254 } 255 } 256 257}