001/* 002 * Copyright 2002-2012 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jca.work; 018 019import javax.resource.spi.work.ExecutionContext; 020import javax.resource.spi.work.Work; 021import javax.resource.spi.work.WorkAdapter; 022import javax.resource.spi.work.WorkCompletedException; 023import javax.resource.spi.work.WorkEvent; 024import javax.resource.spi.work.WorkException; 025import javax.resource.spi.work.WorkListener; 026import javax.resource.spi.work.WorkManager; 027import javax.resource.spi.work.WorkRejectedException; 028 029import org.springframework.core.task.AsyncTaskExecutor; 030import org.springframework.core.task.SimpleAsyncTaskExecutor; 031import org.springframework.core.task.SyncTaskExecutor; 032import org.springframework.core.task.TaskExecutor; 033import org.springframework.core.task.TaskRejectedException; 034import org.springframework.core.task.TaskTimeoutException; 035import org.springframework.util.Assert; 036 037/** 038 * Simple JCA 1.5 {@link javax.resource.spi.work.WorkManager} implementation that 039 * delegates to a Spring {@link org.springframework.core.task.TaskExecutor}. 040 * Provides simple task execution including start timeouts, but without support 041 * for a JCA ExecutionContext (i.e. without support for imported transactions). 042 * 043 * <p>Uses a {@link org.springframework.core.task.SyncTaskExecutor} for {@link #doWork} 044 * calls and a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} 045 * for {@link #startWork} and {@link #scheduleWork} calls, by default. 046 * These default task executors can be overridden through configuration. 047 * 048 * <p><b>NOTE: This WorkManager does not provide thread pooling by default!</b> 049 * Specify a {@link org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor} 050 * (or any other thread-pooling TaskExecutor) as "asyncTaskExecutor" in order to 051 * achieve actual thread pooling. 052 * 053 * <p>This WorkManager automatically detects a specified 054 * {@link org.springframework.core.task.AsyncTaskExecutor} implementation 055 * and uses its extended timeout functionality where appropriate. 056 * JCA WorkListeners are fully supported in any case. 057 * 058 * @author Juergen Hoeller 059 * @since 2.0.3 060 * @see #setSyncTaskExecutor 061 * @see #setAsyncTaskExecutor 062 */ 063public class SimpleTaskWorkManager implements WorkManager { 064 065 private TaskExecutor syncTaskExecutor = new SyncTaskExecutor(); 066 067 private AsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor(); 068 069 070 /** 071 * Specify the TaskExecutor to use for <i>synchronous</i> work execution 072 * (i.e. {@link #doWork} calls). 073 * <p>Default is a {@link org.springframework.core.task.SyncTaskExecutor}. 074 */ 075 public void setSyncTaskExecutor(TaskExecutor syncTaskExecutor) { 076 this.syncTaskExecutor = syncTaskExecutor; 077 } 078 079 /** 080 * Specify the TaskExecutor to use for <i>asynchronous</i> work execution 081 * (i.e. {@link #startWork} and {@link #scheduleWork} calls). 082 * <p>This will typically (but not necessarily) be an 083 * {@link org.springframework.core.task.AsyncTaskExecutor} implementation. 084 * Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}. 085 */ 086 public void setAsyncTaskExecutor(AsyncTaskExecutor asyncTaskExecutor) { 087 this.asyncTaskExecutor = asyncTaskExecutor; 088 } 089 090 091 @Override 092 public void doWork(Work work) throws WorkException { 093 doWork(work, WorkManager.INDEFINITE, null, null); 094 } 095 096 @Override 097 public void doWork(Work work, long startTimeout, ExecutionContext executionContext, WorkListener workListener) 098 throws WorkException { 099 100 Assert.state(this.syncTaskExecutor != null, "No 'syncTaskExecutor' set"); 101 executeWork(this.syncTaskExecutor, work, startTimeout, false, executionContext, workListener); 102 } 103 104 @Override 105 public long startWork(Work work) throws WorkException { 106 return startWork(work, WorkManager.INDEFINITE, null, null); 107 } 108 109 @Override 110 public long startWork(Work work, long startTimeout, ExecutionContext executionContext, WorkListener workListener) 111 throws WorkException { 112 113 Assert.state(this.asyncTaskExecutor != null, "No 'asyncTaskExecutor' set"); 114 return executeWork(this.asyncTaskExecutor, work, startTimeout, true, executionContext, workListener); 115 } 116 117 @Override 118 public void scheduleWork(Work work) throws WorkException { 119 scheduleWork(work, WorkManager.INDEFINITE, null, null); 120 } 121 122 @Override 123 public void scheduleWork(Work work, long startTimeout, ExecutionContext executionContext, WorkListener workListener) 124 throws WorkException { 125 126 Assert.state(this.asyncTaskExecutor != null, "No 'asyncTaskExecutor' set"); 127 executeWork(this.asyncTaskExecutor, work, startTimeout, false, executionContext, workListener); 128 } 129 130 131 /** 132 * Execute the given Work on the specified TaskExecutor. 133 * @param taskExecutor the TaskExecutor to use 134 * @param work the Work to execute 135 * @param startTimeout the time duration within which the Work is supposed to start 136 * @param blockUntilStarted whether to block until the Work has started 137 * @param executionContext the JCA ExecutionContext for the given Work 138 * @param workListener the WorkListener to clal for the given Work 139 * @return the time elapsed from Work acceptance until start of execution 140 * (or -1 if not applicable or not known) 141 * @throws WorkException if the TaskExecutor did not accept the Work 142 */ 143 protected long executeWork(TaskExecutor taskExecutor, Work work, long startTimeout, 144 boolean blockUntilStarted, ExecutionContext executionContext, WorkListener workListener) 145 throws WorkException { 146 147 if (executionContext != null && executionContext.getXid() != null) { 148 throw new WorkException("SimpleTaskWorkManager does not supported imported XIDs: " + executionContext.getXid()); 149 } 150 WorkListener workListenerToUse = workListener; 151 if (workListenerToUse == null) { 152 workListenerToUse = new WorkAdapter(); 153 } 154 155 boolean isAsync = (taskExecutor instanceof AsyncTaskExecutor); 156 DelegatingWorkAdapter workHandle = new DelegatingWorkAdapter(work, workListenerToUse, !isAsync); 157 try { 158 if (isAsync) { 159 ((AsyncTaskExecutor) taskExecutor).execute(workHandle, startTimeout); 160 } 161 else { 162 taskExecutor.execute(workHandle); 163 } 164 } 165 catch (TaskTimeoutException ex) { 166 WorkException wex = new WorkRejectedException("TaskExecutor rejected Work because of timeout: " + work, ex); 167 wex.setErrorCode(WorkException.START_TIMED_OUT); 168 workListenerToUse.workRejected(new WorkEvent(this, WorkEvent.WORK_REJECTED, work, wex)); 169 throw wex; 170 } 171 catch (TaskRejectedException ex) { 172 WorkException wex = new WorkRejectedException("TaskExecutor rejected Work: " + work, ex); 173 wex.setErrorCode(WorkException.INTERNAL); 174 workListenerToUse.workRejected(new WorkEvent(this, WorkEvent.WORK_REJECTED, work, wex)); 175 throw wex; 176 } 177 catch (Throwable ex) { 178 WorkException wex = new WorkException("TaskExecutor failed to execute Work: " + work, ex); 179 wex.setErrorCode(WorkException.INTERNAL); 180 throw wex; 181 } 182 if (isAsync) { 183 workListenerToUse.workAccepted(new WorkEvent(this, WorkEvent.WORK_ACCEPTED, work, null)); 184 } 185 186 if (blockUntilStarted) { 187 long acceptanceTime = System.currentTimeMillis(); 188 synchronized (workHandle.monitor) { 189 try { 190 while (!workHandle.started) { 191 workHandle.monitor.wait(); 192 } 193 } 194 catch (InterruptedException ex) { 195 Thread.currentThread().interrupt(); 196 } 197 } 198 return (System.currentTimeMillis() - acceptanceTime); 199 } 200 else { 201 return WorkManager.UNKNOWN; 202 } 203 } 204 205 206 /** 207 * Work adapter that supports start timeouts and WorkListener callbacks 208 * for a given Work that it delegates to. 209 */ 210 private static class DelegatingWorkAdapter implements Work { 211 212 private final Work work; 213 214 private final WorkListener workListener; 215 216 private final boolean acceptOnExecution; 217 218 public final Object monitor = new Object(); 219 220 public boolean started = false; 221 222 public DelegatingWorkAdapter(Work work, WorkListener workListener, boolean acceptOnExecution) { 223 this.work = work; 224 this.workListener = workListener; 225 this.acceptOnExecution = acceptOnExecution; 226 } 227 228 @Override 229 public void run() { 230 if (this.acceptOnExecution) { 231 this.workListener.workAccepted(new WorkEvent(this, WorkEvent.WORK_ACCEPTED, work, null)); 232 } 233 synchronized (this.monitor) { 234 this.started = true; 235 this.monitor.notify(); 236 } 237 this.workListener.workStarted(new WorkEvent(this, WorkEvent.WORK_STARTED, this.work, null)); 238 try { 239 this.work.run(); 240 } 241 catch (RuntimeException ex) { 242 this.workListener.workCompleted( 243 new WorkEvent(this, WorkEvent.WORK_COMPLETED, this.work, new WorkCompletedException(ex))); 244 throw ex; 245 } 246 catch (Error err) { 247 this.workListener.workCompleted( 248 new WorkEvent(this, WorkEvent.WORK_COMPLETED, this.work, new WorkCompletedException(err))); 249 throw err; 250 } 251 this.workListener.workCompleted(new WorkEvent(this, WorkEvent.WORK_COMPLETED, this.work, null)); 252 } 253 254 @Override 255 public void release() { 256 this.work.release(); 257 } 258 } 259 260}