001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.step; 017 018import java.util.Date; 019 020import org.apache.commons.logging.Log; 021import org.apache.commons.logging.LogFactory; 022import org.springframework.batch.core.BatchStatus; 023import org.springframework.batch.core.ExitStatus; 024import org.springframework.batch.core.JobInterruptedException; 025import org.springframework.batch.core.Step; 026import org.springframework.batch.core.StepExecution; 027import org.springframework.batch.core.StepExecutionListener; 028import org.springframework.batch.core.UnexpectedJobExecutionException; 029import org.springframework.batch.core.configuration.annotation.StepScope; 030import org.springframework.batch.core.launch.NoSuchJobException; 031import org.springframework.batch.core.launch.support.ExitCodeMapper; 032import org.springframework.batch.core.listener.CompositeStepExecutionListener; 033import org.springframework.batch.core.repository.JobRepository; 034import org.springframework.batch.core.scope.context.StepSynchronizationManager; 035import org.springframework.batch.item.ExecutionContext; 036import org.springframework.batch.repeat.RepeatException; 037import org.springframework.beans.factory.BeanNameAware; 038import org.springframework.beans.factory.InitializingBean; 039import org.springframework.util.Assert; 040import org.springframework.util.ClassUtils; 041 042/** 043 * A {@link Step} implementation that provides common behavior to subclasses, including registering and calling 044 * listeners. 045 * 046 * @author Dave Syer 047 * @author Ben Hale 048 * @author Robert Kasanicky 049 * @author Michael Minella 050 * @author Chris Schaefer 051 * @author Mahmoud Ben Hassine 052 */ 053public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware { 054 055 private static final Log logger = LogFactory.getLog(AbstractStep.class); 056 057 private String name; 058 059 private int startLimit = Integer.MAX_VALUE; 060 061 private boolean allowStartIfComplete = false; 062 063 private CompositeStepExecutionListener stepExecutionListener = new CompositeStepExecutionListener(); 064 065 private JobRepository jobRepository; 066 067 /** 068 * Default constructor. 069 */ 070 public AbstractStep() { 071 super(); 072 } 073 074 @Override 075 public void afterPropertiesSet() throws Exception { 076 Assert.state(name != null, "A Step must have a name"); 077 Assert.state(jobRepository != null, "JobRepository is mandatory"); 078 } 079 080 @Override 081 public String getName() { 082 return this.name; 083 } 084 085 /** 086 * Set the name property. Always overrides the default value if this object is a Spring bean. 087 * @param name the name of the {@link Step}. 088 * @see #setBeanName(java.lang.String) 089 */ 090 public void setName(String name) { 091 this.name = name; 092 } 093 094 /** 095 * Set the name property if it is not already set. Because of the order of the callbacks in a Spring container the 096 * name property will be set first if it is present. Care is needed with bean definition inheritance - if a parent 097 * bean has a name, then its children need an explicit name as well, otherwise they will not be unique. 098 * 099 * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String) 100 */ 101 @Override 102 public void setBeanName(String name) { 103 if (this.name == null) { 104 this.name = name; 105 } 106 } 107 108 @Override 109 public int getStartLimit() { 110 return this.startLimit; 111 } 112 113 /** 114 * Public setter for the startLimit. 115 * 116 * @param startLimit the startLimit to set 117 */ 118 public void setStartLimit(int startLimit) { 119 this.startLimit = startLimit == 0 ? Integer.MAX_VALUE : startLimit; 120 } 121 122 @Override 123 public boolean isAllowStartIfComplete() { 124 return this.allowStartIfComplete; 125 } 126 127 /** 128 * Public setter for flag that determines whether the step should start again if it is already complete. Defaults to 129 * false. 130 * 131 * @param allowStartIfComplete the value of the flag to set 132 */ 133 public void setAllowStartIfComplete(boolean allowStartIfComplete) { 134 this.allowStartIfComplete = allowStartIfComplete; 135 } 136 137 /** 138 * Convenient constructor for setting only the name property. 139 * 140 * @param name Name of the step 141 */ 142 public AbstractStep(String name) { 143 this.name = name; 144 } 145 146 /** 147 * Extension point for subclasses to execute business logic. Subclasses should set the {@link ExitStatus} on the 148 * {@link StepExecution} before returning. 149 * 150 * @param stepExecution the current step context 151 * @throws Exception checked exception thrown by implementation 152 */ 153 protected abstract void doExecute(StepExecution stepExecution) throws Exception; 154 155 /** 156 * Extension point for subclasses to provide callbacks to their collaborators at the beginning of a step, to open or 157 * acquire resources. Does nothing by default. 158 * 159 * @param ctx the {@link ExecutionContext} to use 160 * @throws Exception checked exception thrown by implementation 161 */ 162 protected void open(ExecutionContext ctx) throws Exception { 163 } 164 165 /** 166 * Extension point for subclasses to provide callbacks to their collaborators at the end of a step (right at the end 167 * of the finally block), to close or release resources. Does nothing by default. 168 * 169 * @param ctx the {@link ExecutionContext} to use 170 * @throws Exception checked exception thrown by implementation 171 */ 172 protected void close(ExecutionContext ctx) throws Exception { 173 } 174 175 /** 176 * Template method for step execution logic - calls abstract methods for resource initialization ( 177 * {@link #open(ExecutionContext)}), execution logic ({@link #doExecute(StepExecution)}) and resource closing ( 178 * {@link #close(ExecutionContext)}). 179 */ 180 @Override 181 public final void execute(StepExecution stepExecution) throws JobInterruptedException, 182 UnexpectedJobExecutionException { 183 184 Assert.notNull(stepExecution, "stepExecution must not be null"); 185 186 if (logger.isDebugEnabled()) { 187 logger.debug("Executing: id=" + stepExecution.getId()); 188 } 189 stepExecution.setStartTime(new Date()); 190 stepExecution.setStatus(BatchStatus.STARTED); 191 getJobRepository().update(stepExecution); 192 193 // Start with a default value that will be trumped by anything 194 ExitStatus exitStatus = ExitStatus.EXECUTING; 195 196 doExecutionRegistration(stepExecution); 197 198 try { 199 getCompositeListener().beforeStep(stepExecution); 200 open(stepExecution.getExecutionContext()); 201 202 try { 203 doExecute(stepExecution); 204 } 205 catch (RepeatException e) { 206 throw e.getCause(); 207 } 208 exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus()); 209 210 // Check if someone is trying to stop us 211 if (stepExecution.isTerminateOnly()) { 212 throw new JobInterruptedException("JobExecution interrupted."); 213 } 214 215 // Need to upgrade here not set, in case the execution was stopped 216 stepExecution.upgradeStatus(BatchStatus.COMPLETED); 217 if (logger.isDebugEnabled()) { 218 logger.debug("Step execution success: id=" + stepExecution.getId()); 219 } 220 } 221 catch (Throwable e) { 222 stepExecution.upgradeStatus(determineBatchStatus(e)); 223 exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e)); 224 stepExecution.addFailureException(e); 225 if (stepExecution.getStatus() == BatchStatus.STOPPED) { 226 logger.info(String.format("Encountered interruption executing step %s in job %s : %s", name, stepExecution.getJobExecution().getJobInstance().getJobName(), e.getMessage())); 227 if (logger.isDebugEnabled()) { 228 logger.debug("Full exception", e); 229 } 230 } 231 else { 232 logger.error(String.format("Encountered an error executing step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); 233 } 234 } 235 finally { 236 237 try { 238 // Update the step execution to the latest known value so the 239 // listeners can act on it 240 exitStatus = exitStatus.and(stepExecution.getExitStatus()); 241 stepExecution.setExitStatus(exitStatus); 242 exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution)); 243 } 244 catch (Exception e) { 245 logger.error(String.format("Exception in afterStep callback in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); 246 } 247 248 try { 249 getJobRepository().updateExecutionContext(stepExecution); 250 } 251 catch (Exception e) { 252 stepExecution.setStatus(BatchStatus.UNKNOWN); 253 exitStatus = exitStatus.and(ExitStatus.UNKNOWN); 254 stepExecution.addFailureException(e); 255 logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. " 256 + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); 257 } 258 259 stepExecution.setEndTime(new Date()); 260 stepExecution.setExitStatus(exitStatus); 261 262 try { 263 getJobRepository().update(stepExecution); 264 } 265 catch (Exception e) { 266 stepExecution.setStatus(BatchStatus.UNKNOWN); 267 stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN)); 268 stepExecution.addFailureException(e); 269 logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. " 270 + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); 271 } 272 273 try { 274 close(stepExecution.getExecutionContext()); 275 } 276 catch (Exception e) { 277 logger.error(String.format("Exception while closing step execution resources in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); 278 stepExecution.addFailureException(e); 279 } 280 281 doExecutionRelease(); 282 283 if (logger.isDebugEnabled()) { 284 logger.debug("Step execution complete: " + stepExecution.getSummary()); 285 } 286 } 287 } 288 289 /** 290 * Releases the most recent {@link StepExecution} 291 */ 292 protected void doExecutionRelease() { 293 StepSynchronizationManager.release(); 294 } 295 296 /** 297 * Registers the {@link StepExecution} for property resolution via {@link StepScope} 298 * 299 * @param stepExecution StepExecution to use when hydrating the StepScoped beans 300 */ 301 protected void doExecutionRegistration(StepExecution stepExecution) { 302 StepSynchronizationManager.register(stepExecution); 303 } 304 305 /** 306 * Determine the step status based on the exception. 307 */ 308 private static BatchStatus determineBatchStatus(Throwable e) { 309 if (e instanceof JobInterruptedException || e.getCause() instanceof JobInterruptedException) { 310 return BatchStatus.STOPPED; 311 } 312 else { 313 return BatchStatus.FAILED; 314 } 315 } 316 317 /** 318 * Register a step listener for callbacks at the appropriate stages in a step execution. 319 * 320 * @param listener a {@link StepExecutionListener} 321 */ 322 public void registerStepExecutionListener(StepExecutionListener listener) { 323 this.stepExecutionListener.register(listener); 324 } 325 326 /** 327 * Register each of the objects as listeners. 328 * 329 * @param listeners an array of listener objects of known types. 330 */ 331 public void setStepExecutionListeners(StepExecutionListener[] listeners) { 332 for (int i = 0; i < listeners.length; i++) { 333 registerStepExecutionListener(listeners[i]); 334 } 335 } 336 337 /** 338 * @return composite listener that delegates to all registered listeners. 339 */ 340 protected StepExecutionListener getCompositeListener() { 341 return stepExecutionListener; 342 } 343 344 /** 345 * Public setter for {@link JobRepository}. 346 * 347 * @param jobRepository is a mandatory dependence (no default). 348 */ 349 public void setJobRepository(JobRepository jobRepository) { 350 this.jobRepository = jobRepository; 351 } 352 353 protected JobRepository getJobRepository() { 354 return jobRepository; 355 } 356 357 @Override 358 public String toString() { 359 return ClassUtils.getShortName(getClass()) + ": [name=" + name + "]"; 360 } 361 362 /** 363 * Default mapping from throwable to {@link ExitStatus}. Clients can modify the exit code using a 364 * {@link StepExecutionListener}. 365 * 366 * @param ex the cause of the failure 367 * @return an {@link ExitStatus} 368 */ 369 private ExitStatus getDefaultExitStatusForFailure(Throwable ex) { 370 ExitStatus exitStatus; 371 if (ex instanceof JobInterruptedException || ex.getCause() instanceof JobInterruptedException) { 372 exitStatus = ExitStatus.STOPPED.addExitDescription(JobInterruptedException.class.getName()); 373 } 374 else if (ex instanceof NoSuchJobException || ex.getCause() instanceof NoSuchJobException) { 375 exitStatus = new ExitStatus(ExitCodeMapper.NO_SUCH_JOB, ex.getClass().getName()); 376 } 377 else { 378 exitStatus = ExitStatus.FAILED.addExitDescription(ex); 379 } 380 381 return exitStatus; 382 } 383 384}