001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.step.tasklet; 017 018import org.apache.commons.logging.Log; 019import org.apache.commons.logging.LogFactory; 020import org.springframework.batch.core.BatchStatus; 021import org.springframework.batch.core.ChunkListener; 022import org.springframework.batch.core.JobInterruptedException; 023import org.springframework.batch.core.StepContribution; 024import org.springframework.batch.core.StepExecution; 025import org.springframework.batch.core.StepExecutionListener; 026import org.springframework.batch.core.listener.CompositeChunkListener; 027import org.springframework.batch.core.repository.JobRepository; 028import org.springframework.batch.core.scope.context.ChunkContext; 029import org.springframework.batch.core.scope.context.StepContextRepeatCallback; 030import org.springframework.batch.core.step.AbstractStep; 031import org.springframework.batch.core.step.FatalStepExecutionException; 032import org.springframework.batch.core.step.StepInterruptionPolicy; 033import org.springframework.batch.core.step.ThreadStepInterruptionPolicy; 034import org.springframework.batch.item.ExecutionContext; 035import org.springframework.batch.item.ItemReader; 036import org.springframework.batch.item.ItemStream; 037import org.springframework.batch.item.ItemWriter; 038import org.springframework.batch.item.support.CompositeItemStream; 039import org.springframework.batch.repeat.RepeatContext; 040import org.springframework.batch.repeat.RepeatOperations; 041import org.springframework.batch.repeat.RepeatStatus; 042import org.springframework.batch.repeat.support.RepeatTemplate; 043import org.springframework.transaction.PlatformTransactionManager; 044import org.springframework.transaction.TransactionStatus; 045import org.springframework.transaction.interceptor.DefaultTransactionAttribute; 046import org.springframework.transaction.interceptor.TransactionAttribute; 047import org.springframework.transaction.support.TransactionCallback; 048import org.springframework.transaction.support.TransactionSynchronization; 049import org.springframework.transaction.support.TransactionSynchronizationAdapter; 050import org.springframework.transaction.support.TransactionSynchronizationManager; 051import org.springframework.transaction.support.TransactionTemplate; 052import org.springframework.util.Assert; 053 054import java.util.concurrent.Semaphore; 055 056/** 057 * Simple implementation of executing the step as a call to a {@link Tasklet}, 058 * possibly repeated, and each call surrounded by a transaction. The structure 059 * is therefore that of a loop with transaction boundary inside the loop. The 060 * loop is controlled by the step operations ( 061 * {@link #setStepOperations(RepeatOperations)}).<br> 062 * <br> 063 * 064 * Clients can use interceptors in the step operations to intercept or listen to 065 * the iteration on a step-wide basis, for instance to get a callback when the 066 * step is complete. Those that want callbacks at the level of an individual 067 * tasks, can specify interceptors for the chunk operations. 068 * 069 * @author Dave Syer 070 * @author Lucas Ward 071 * @author Ben Hale 072 * @author Robert Kasanicky 073 * @author Michael Minella 074 * @author Will Schipp 075 * @author Mahmoud Ben Hassine 076 */ 077@SuppressWarnings("serial") 078public class TaskletStep extends AbstractStep { 079 080 private static final Log logger = LogFactory.getLog(TaskletStep.class); 081 082 private RepeatOperations stepOperations = new RepeatTemplate(); 083 084 private CompositeChunkListener chunkListener = new CompositeChunkListener(); 085 086 // default to checking current thread for interruption. 087 private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy(); 088 089 private CompositeItemStream stream = new CompositeItemStream(); 090 091 private PlatformTransactionManager transactionManager; 092 093 private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() { 094 095 @Override 096 public boolean rollbackOn(Throwable ex) { 097 return true; 098 } 099 100 }; 101 102 private Tasklet tasklet; 103 104 public static final String TASKLET_TYPE_KEY = "batch.taskletType"; 105 106 /** 107 * Default constructor. 108 */ 109 public TaskletStep() { 110 this(null); 111 } 112 113 /** 114 * @param name the name for the {@link TaskletStep} 115 */ 116 public TaskletStep(String name) { 117 super(name); 118 } 119 120 /* 121 * (non-Javadoc) 122 * 123 * @see 124 * org.springframework.batch.core.step.AbstractStep#afterPropertiesSet() 125 */ 126 @Override 127 public void afterPropertiesSet() throws Exception { 128 super.afterPropertiesSet(); 129 Assert.state(transactionManager != null, "A transaction manager must be provided"); 130 } 131 132 /** 133 * Public setter for the {@link PlatformTransactionManager}. 134 * 135 * @param transactionManager the transaction manager to set 136 */ 137 public void setTransactionManager(PlatformTransactionManager transactionManager) { 138 this.transactionManager = transactionManager; 139 } 140 141 /** 142 * Public setter for the {@link TransactionAttribute}. 143 * 144 * @param transactionAttribute the {@link TransactionAttribute} to set 145 */ 146 public void setTransactionAttribute(TransactionAttribute transactionAttribute) { 147 this.transactionAttribute = transactionAttribute; 148 } 149 150 /** 151 * Public setter for the {@link Tasklet}. 152 * 153 * @param tasklet the {@link Tasklet} to set 154 */ 155 public void setTasklet(Tasklet tasklet) { 156 this.tasklet = tasklet; 157 if (tasklet instanceof StepExecutionListener) { 158 registerStepExecutionListener((StepExecutionListener) tasklet); 159 } 160 } 161 162 /** 163 * Register a chunk listener for callbacks at the appropriate stages in a 164 * step execution. 165 * 166 * @param listener a {@link ChunkListener} 167 */ 168 public void registerChunkListener(ChunkListener listener) { 169 this.chunkListener.register(listener); 170 } 171 172 /** 173 * Register each of the objects as listeners. 174 * 175 * @param listeners an array of listener objects of known types. 176 */ 177 public void setChunkListeners(ChunkListener[] listeners) { 178 for (int i = 0; i < listeners.length; i++) { 179 registerChunkListener(listeners[i]); 180 } 181 } 182 183 /** 184 * Register each of the streams for callbacks at the appropriate time in the 185 * step. The {@link ItemReader} and {@link ItemWriter} are automatically 186 * registered, but it doesn't hurt to also register them here. Injected 187 * dependencies of the reader and writer are not automatically registered, 188 * so if you implement {@link ItemWriter} using delegation to another object 189 * which itself is a {@link ItemStream}, you need to register the delegate 190 * here. 191 * 192 * @param streams an array of {@link ItemStream} objects. 193 */ 194 public void setStreams(ItemStream[] streams) { 195 for (int i = 0; i < streams.length; i++) { 196 registerStream(streams[i]); 197 } 198 } 199 200 /** 201 * Register a single {@link ItemStream} for callbacks to the stream 202 * interface. 203 * 204 * @param stream instance of {@link ItemStream} 205 */ 206 public void registerStream(ItemStream stream) { 207 this.stream.register(stream); 208 } 209 210 /** 211 * The {@link RepeatOperations} to use for the outer loop of the batch 212 * processing. Should be set up by the caller through a factory. Defaults to 213 * a plain {@link RepeatTemplate}. 214 * 215 * @param stepOperations a {@link RepeatOperations} instance. 216 */ 217 public void setStepOperations(RepeatOperations stepOperations) { 218 this.stepOperations = stepOperations; 219 } 220 221 /** 222 * Setter for the {@link StepInterruptionPolicy}. The policy is used to 223 * check whether an external request has been made to interrupt the job 224 * execution. 225 * 226 * @param interruptionPolicy a {@link StepInterruptionPolicy} 227 */ 228 public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) { 229 this.interruptionPolicy = interruptionPolicy; 230 } 231 232 /** 233 * Process the step and update its context so that progress can be monitored 234 * by the caller. The step is broken down into chunks, each one executing in 235 * a transaction. The step and its execution and execution context are all 236 * given an up to date {@link BatchStatus}, and the {@link JobRepository} is 237 * used to store the result. Various reporting information are also added to 238 * the current context governing the step execution, which would normally be 239 * available to the caller through the step's {@link ExecutionContext}.<br> 240 * 241 * @throws JobInterruptedException if the step or a chunk is interrupted 242 * @throws RuntimeException if there is an exception during a chunk 243 * execution 244 * 245 */ 246 @Override 247 protected void doExecute(StepExecution stepExecution) throws Exception { 248 stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName()); 249 stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName()); 250 251 stream.update(stepExecution.getExecutionContext()); 252 getJobRepository().updateExecutionContext(stepExecution); 253 254 // Shared semaphore per step execution, so other step executions can run 255 // in parallel without needing the lock 256 final Semaphore semaphore = createSemaphore(); 257 258 stepOperations.iterate(new StepContextRepeatCallback(stepExecution) { 259 260 @Override 261 public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext) 262 throws Exception { 263 264 StepExecution stepExecution = chunkContext.getStepContext().getStepExecution(); 265 266 // Before starting a new transaction, check for 267 // interruption. 268 interruptionPolicy.checkInterrupted(stepExecution); 269 270 RepeatStatus result; 271 try { 272 result = new TransactionTemplate(transactionManager, transactionAttribute) 273 .execute(new ChunkTransactionCallback(chunkContext, semaphore)); 274 } 275 catch (UncheckedTransactionException e) { 276 // Allow checked exceptions to be thrown inside callback 277 throw (Exception) e.getCause(); 278 } 279 280 chunkListener.afterChunk(chunkContext); 281 282 // Check for interruption after transaction as well, so that 283 // the interrupted exception is correctly propagated up to 284 // caller 285 interruptionPolicy.checkInterrupted(stepExecution); 286 287 return result == null ? RepeatStatus.FINISHED : result; 288 } 289 290 }); 291 292 } 293 294 /** 295 * Extension point mainly for test purposes so that the behaviour of the 296 * lock can be manipulated to simulate various pathologies. 297 * 298 * @return a semaphore for locking access to the JobRepository 299 */ 300 protected Semaphore createSemaphore() { 301 return new Semaphore(1); 302 } 303 304 @Override 305 protected void close(ExecutionContext ctx) throws Exception { 306 stream.close(); 307 } 308 309 @Override 310 protected void open(ExecutionContext ctx) throws Exception { 311 stream.open(ctx); 312 } 313 314 /** 315 * retrieve the tasklet - helper method for JobOperator 316 * @return the {@link Tasklet} instance being executed within this step 317 */ 318 public Tasklet getTasklet() { 319 return tasklet; 320 } 321 322 /** 323 * A callback for the transactional work inside a chunk. Also detects 324 * failures in the transaction commit and rollback, only panicking if the 325 * transaction status is unknown (i.e. if a commit failure leads to a clean 326 * rollback then we assume the state is consistent). 327 * 328 * @author Dave Syer 329 * 330 */ 331 private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback<RepeatStatus> { 332 333 private final StepExecution stepExecution; 334 335 private final ChunkContext chunkContext; 336 337 private boolean rolledBack = false; 338 339 private boolean stepExecutionUpdated = false; 340 341 private StepExecution oldVersion; 342 343 private boolean locked = false; 344 345 private final Semaphore semaphore; 346 347 public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) { 348 this.chunkContext = chunkContext; 349 this.stepExecution = chunkContext.getStepContext().getStepExecution(); 350 this.semaphore = semaphore; 351 } 352 353 @Override 354 public void afterCompletion(int status) { 355 try { 356 if (status != TransactionSynchronization.STATUS_COMMITTED) { 357 if (stepExecutionUpdated) { 358 // Wah! the commit failed. We need to rescue the step 359 // execution data. 360 logger.info("Commit failed while step execution data was already updated. " 361 + "Reverting to old version."); 362 copy(oldVersion, stepExecution); 363 if (status == TransactionSynchronization.STATUS_ROLLED_BACK) { 364 rollback(stepExecution); 365 } 366 } 367 chunkListener.afterChunkError(chunkContext); 368 } 369 370 if (status == TransactionSynchronization.STATUS_UNKNOWN) { 371 logger.error("Rolling back with transaction in unknown state"); 372 rollback(stepExecution); 373 stepExecution.upgradeStatus(BatchStatus.UNKNOWN); 374 stepExecution.setTerminateOnly(); 375 } 376 } 377 finally { 378 // Only release the lock if we acquired it, and release as late 379 // as possible 380 if (locked) { 381 semaphore.release(); 382 } 383 384 locked = false; 385 } 386 } 387 388 @Override 389 public RepeatStatus doInTransaction(TransactionStatus status) { 390 TransactionSynchronizationManager.registerSynchronization(this); 391 392 RepeatStatus result = RepeatStatus.CONTINUABLE; 393 394 StepContribution contribution = stepExecution.createStepContribution(); 395 396 chunkListener.beforeChunk(chunkContext); 397 398 // In case we need to push it back to its old value 399 // after a commit fails... 400 oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution()); 401 copy(stepExecution, oldVersion); 402 403 try { 404 405 try { 406 try { 407 result = tasklet.execute(contribution, chunkContext); 408 if (result == null) { 409 result = RepeatStatus.FINISHED; 410 } 411 } 412 catch (Exception e) { 413 if (transactionAttribute.rollbackOn(e)) { 414 chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e); 415 throw e; 416 } 417 } 418 } 419 finally { 420 421 // If the step operations are asynchronous then we need 422 // to synchronize changes to the step execution (at a 423 // minimum). Take the lock *before* changing the step 424 // execution. 425 try { 426 semaphore.acquire(); 427 locked = true; 428 } 429 catch (InterruptedException e) { 430 logger.error("Thread interrupted while locking for repository update"); 431 stepExecution.setStatus(BatchStatus.STOPPED); 432 stepExecution.setTerminateOnly(); 433 Thread.currentThread().interrupt(); 434 } 435 436 // Apply the contribution to the step 437 // even if unsuccessful 438 if (logger.isDebugEnabled()) { 439 logger.debug("Applying contribution: " + contribution); 440 } 441 stepExecution.apply(contribution); 442 443 } 444 445 stepExecutionUpdated = true; 446 447 stream.update(stepExecution.getExecutionContext()); 448 449 try { 450 // Going to attempt a commit. If it fails this flag will 451 // stay false and we can use that later. 452 getJobRepository().updateExecutionContext(stepExecution); 453 stepExecution.incrementCommitCount(); 454 if (logger.isDebugEnabled()) { 455 logger.debug("Saving step execution before commit: " + stepExecution); 456 } 457 getJobRepository().update(stepExecution); 458 } 459 catch (Exception e) { 460 // If we get to here there was a problem saving the step 461 // execution and we have to fail. 462 String msg = "JobRepository failure forcing rollback"; 463 logger.error(msg, e); 464 throw new FatalStepExecutionException(msg, e); 465 } 466 } 467 catch (Error e) { 468 if (logger.isDebugEnabled()) { 469 logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage()); 470 } 471 rollback(stepExecution); 472 throw e; 473 } 474 catch (RuntimeException e) { 475 if (logger.isDebugEnabled()) { 476 logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage()); 477 } 478 rollback(stepExecution); 479 throw e; 480 } 481 catch (Exception e) { 482 if (logger.isDebugEnabled()) { 483 logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage()); 484 } 485 rollback(stepExecution); 486 // Allow checked exceptions 487 throw new UncheckedTransactionException(e); 488 } 489 490 return result; 491 492 } 493 494 private void rollback(StepExecution stepExecution) { 495 if (!rolledBack) { 496 stepExecution.incrementRollbackCount(); 497 rolledBack = true; 498 } 499 } 500 501 private void copy(final StepExecution source, final StepExecution target) { 502 target.setVersion(source.getVersion()); 503 target.setWriteCount(source.getWriteCount()); 504 target.setFilterCount(source.getFilterCount()); 505 target.setCommitCount(source.getCommitCount()); 506 target.setExecutionContext(new ExecutionContext(source.getExecutionContext())); 507 } 508 509 } 510}