001/* 002 * Copyright 2006-2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core; 018 019import java.io.IOException; 020import java.io.ObjectInputStream; 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.List; 024import java.util.concurrent.CopyOnWriteArrayList; 025 026import org.springframework.batch.item.ExecutionContext; 027import org.springframework.util.Assert; 028 029/** 030 * Batch domain object representation the execution of a step. Unlike 031 * {@link JobExecution}, there are additional properties related the processing 032 * of items such as commit count, etc. 033 * 034 * @author Lucas Ward 035 * @author Dave Syer 036 * 037 */ 038@SuppressWarnings("serial") 039public class StepExecution extends Entity { 040 041 private final JobExecution jobExecution; 042 043 private final String stepName; 044 045 private volatile BatchStatus status = BatchStatus.STARTING; 046 047 private volatile int readCount = 0; 048 049 private volatile int writeCount = 0; 050 051 private volatile int commitCount = 0; 052 053 private volatile int rollbackCount = 0; 054 055 private volatile int readSkipCount = 0; 056 057 private volatile int processSkipCount = 0; 058 059 private volatile int writeSkipCount = 0; 060 061 private volatile Date startTime = new Date(System.currentTimeMillis()); 062 063 private volatile Date endTime = null; 064 065 private volatile Date lastUpdated = null; 066 067 private volatile ExecutionContext executionContext = new ExecutionContext(); 068 069 private volatile ExitStatus exitStatus = ExitStatus.EXECUTING; 070 071 private volatile boolean terminateOnly; 072 073 private volatile int filterCount; 074 075 private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<Throwable>(); 076 077 /** 078 * Constructor with mandatory properties. 079 * 080 * @param stepName the step to which this execution belongs 081 * @param jobExecution the current job execution 082 * @param id the id of this execution 083 */ 084 public StepExecution(String stepName, JobExecution jobExecution, Long id) { 085 this(stepName, jobExecution); 086 Assert.notNull(jobExecution, "JobExecution must be provided to re-hydrate an existing StepExecution"); 087 Assert.notNull(id, "The entity Id must be provided to re-hydrate an existing StepExecution"); 088 setId(id); 089 jobExecution.addStepExecution(this); 090 } 091 092 /** 093 * Constructor that substitutes in null for the execution id 094 * 095 * @param stepName the step to which this execution belongs 096 * @param jobExecution the current job execution 097 */ 098 public StepExecution(String stepName, JobExecution jobExecution) { 099 super(); 100 Assert.hasLength(stepName, "A stepName is required"); 101 this.stepName = stepName; 102 this.jobExecution = jobExecution; 103 } 104 105 /** 106 * Constructor that requires only a stepName. Intended only to be 107 * used via serialization libraries to address the circular 108 * reference between {@link JobExecution} and StepExecution. 109 * 110 * @param stepName the name of the executed step 111 */ 112 @SuppressWarnings("unused") 113 private StepExecution(String stepName) { 114 super(); 115 Assert.hasLength(stepName, "A stepName is required"); 116 this.stepName = stepName; 117 this.jobExecution = null; 118 } 119 120 /** 121 * Returns the {@link ExecutionContext} for this execution 122 * 123 * @return the attributes 124 */ 125 public ExecutionContext getExecutionContext() { 126 return executionContext; 127 } 128 129 /** 130 * Sets the {@link ExecutionContext} for this execution 131 * 132 * @param executionContext the attributes 133 */ 134 public void setExecutionContext(ExecutionContext executionContext) { 135 this.executionContext = executionContext; 136 } 137 138 /** 139 * Returns the current number of commits for this execution 140 * 141 * @return the current number of commits 142 */ 143 public int getCommitCount() { 144 return commitCount; 145 } 146 147 /** 148 * Sets the current number of commits for this execution 149 * 150 * @param commitCount the current number of commits 151 */ 152 public void setCommitCount(int commitCount) { 153 this.commitCount = commitCount; 154 } 155 156 /** 157 * Returns the time that this execution ended 158 * 159 * @return the time that this execution ended 160 */ 161 public Date getEndTime() { 162 return endTime; 163 } 164 165 /** 166 * Sets the time that this execution ended 167 * 168 * @param endTime the time that this execution ended 169 */ 170 public void setEndTime(Date endTime) { 171 this.endTime = endTime; 172 } 173 174 /** 175 * Returns the current number of items read for this execution 176 * 177 * @return the current number of items read for this execution 178 */ 179 public int getReadCount() { 180 return readCount; 181 } 182 183 /** 184 * Sets the current number of read items for this execution 185 * 186 * @param readCount the current number of read items for this execution 187 */ 188 public void setReadCount(int readCount) { 189 this.readCount = readCount; 190 } 191 192 /** 193 * Returns the current number of items written for this execution 194 * 195 * @return the current number of items written for this execution 196 */ 197 public int getWriteCount() { 198 return writeCount; 199 } 200 201 /** 202 * Sets the current number of written items for this execution 203 * 204 * @param writeCount the current number of written items for this execution 205 */ 206 public void setWriteCount(int writeCount) { 207 this.writeCount = writeCount; 208 } 209 210 /** 211 * Returns the current number of rollbacks for this execution 212 * 213 * @return the current number of rollbacks for this execution 214 */ 215 public int getRollbackCount() { 216 return rollbackCount; 217 } 218 219 /** 220 * Returns the current number of items filtered out of this execution 221 * 222 * @return the current number of items filtered out of this execution 223 */ 224 public int getFilterCount() { 225 return filterCount; 226 } 227 228 /** 229 * Public setter for the number of items filtered out of this execution. 230 * @param filterCount the number of items filtered out of this execution to 231 * set 232 */ 233 public void setFilterCount(int filterCount) { 234 this.filterCount = filterCount; 235 } 236 237 /** 238 * Setter for number of rollbacks for this execution 239 * @param rollbackCount int the number of rollbacks. 240 */ 241 public void setRollbackCount(int rollbackCount) { 242 this.rollbackCount = rollbackCount; 243 } 244 245 /** 246 * Gets the time this execution started 247 * 248 * @return the time this execution started 249 */ 250 public Date getStartTime() { 251 return startTime; 252 } 253 254 /** 255 * Sets the time this execution started 256 * 257 * @param startTime the time this execution started 258 */ 259 public void setStartTime(Date startTime) { 260 this.startTime = startTime; 261 } 262 263 /** 264 * Returns the current status of this step 265 * 266 * @return the current status of this step 267 */ 268 public BatchStatus getStatus() { 269 return status; 270 } 271 272 /** 273 * Sets the current status of this step 274 * 275 * @param status the current status of this step 276 */ 277 public void setStatus(BatchStatus status) { 278 this.status = status; 279 } 280 281 /** 282 * Upgrade the status field if the provided value is greater than the 283 * existing one. Clients using this method to set the status can be sure 284 * that they don't overwrite a failed status with an successful one. 285 * 286 * @param status the new status value 287 */ 288 public void upgradeStatus(BatchStatus status) { 289 this.status = this.status.upgradeTo(status); 290 } 291 292 /** 293 * @return the name of the step 294 */ 295 public String getStepName() { 296 return stepName; 297 } 298 299 /** 300 * Accessor for the job execution id. 301 * 302 * @return the jobExecutionId 303 */ 304 public Long getJobExecutionId() { 305 if (jobExecution != null) { 306 return jobExecution.getId(); 307 } 308 return null; 309 } 310 311 /** 312 * @param exitStatus {@link ExitStatus} instance used to establish the exit status. 313 */ 314 public void setExitStatus(ExitStatus exitStatus) { 315 this.exitStatus = exitStatus; 316 } 317 318 /** 319 * @return the exitCode 320 */ 321 public ExitStatus getExitStatus() { 322 return exitStatus; 323 } 324 325 /** 326 * Accessor for the execution context information of the enclosing job. 327 * 328 * @return the {@link JobExecution} that was used to start this step 329 * execution. 330 */ 331 public JobExecution getJobExecution() { 332 return jobExecution; 333 } 334 335 /** 336 * Factory method for {@link StepContribution}. 337 * 338 * @return a new {@link StepContribution} 339 */ 340 public StepContribution createStepContribution() { 341 return new StepContribution(this); 342 } 343 344 /** 345 * On successful execution just before a chunk commit, this method should be 346 * called. Synchronizes access to the {@link StepExecution} so that changes 347 * are atomic. 348 * 349 * @param contribution {@link StepContribution} instance used to update the StepExecution state. 350 */ 351 public synchronized void apply(StepContribution contribution) { 352 readSkipCount += contribution.getReadSkipCount(); 353 writeSkipCount += contribution.getWriteSkipCount(); 354 processSkipCount += contribution.getProcessSkipCount(); 355 filterCount += contribution.getFilterCount(); 356 readCount += contribution.getReadCount(); 357 writeCount += contribution.getWriteCount(); 358 exitStatus = exitStatus.and(contribution.getExitStatus()); 359 } 360 361 /** 362 * On unsuccessful execution after a chunk has rolled back. 363 */ 364 public synchronized void incrementRollbackCount() { 365 rollbackCount++; 366 } 367 368 /** 369 * @return flag to indicate that an execution should halt 370 */ 371 public boolean isTerminateOnly() { 372 return this.terminateOnly; 373 } 374 375 /** 376 * Set a flag that will signal to an execution environment that this 377 * execution (and its surrounding job) wishes to exit. 378 */ 379 public void setTerminateOnly() { 380 this.terminateOnly = true; 381 } 382 383 /** 384 * @return the total number of items skipped. 385 */ 386 public int getSkipCount() { 387 return readSkipCount + processSkipCount + writeSkipCount; 388 } 389 390 /** 391 * Increment the number of commits 392 */ 393 public void incrementCommitCount() { 394 commitCount++; 395 } 396 397 /** 398 * Convenience method to get the current job parameters. 399 * 400 * @return the {@link JobParameters} from the enclosing job, or empty if 401 * that is null 402 */ 403 public JobParameters getJobParameters() { 404 if (jobExecution == null) { 405 return new JobParameters(); 406 } 407 return jobExecution.getJobParameters(); 408 } 409 410 /** 411 * @return the number of records skipped on read 412 */ 413 public int getReadSkipCount() { 414 return readSkipCount; 415 } 416 417 /** 418 * @return the number of records skipped on write 419 */ 420 public int getWriteSkipCount() { 421 return writeSkipCount; 422 } 423 424 /** 425 * Set the number of records skipped on read 426 * 427 * @param readSkipCount int containing read skip count to be used for the step execution. 428 */ 429 public void setReadSkipCount(int readSkipCount) { 430 this.readSkipCount = readSkipCount; 431 } 432 433 /** 434 * Set the number of records skipped on write 435 * 436 * @param writeSkipCount int containing write skip count to be used for the step execution. 437 */ 438 public void setWriteSkipCount(int writeSkipCount) { 439 this.writeSkipCount = writeSkipCount; 440 } 441 442 /** 443 * @return the number of records skipped during processing 444 */ 445 public int getProcessSkipCount() { 446 return processSkipCount; 447 } 448 449 /** 450 * Set the number of records skipped during processing. 451 * 452 * @param processSkipCount int containing process skip count to be used for the step execution. 453 */ 454 public void setProcessSkipCount(int processSkipCount) { 455 this.processSkipCount = processSkipCount; 456 } 457 458 /** 459 * @return the Date representing the last time this execution was persisted. 460 */ 461 public Date getLastUpdated() { 462 return lastUpdated; 463 } 464 465 /** 466 * Set the time when the StepExecution was last updated before persisting 467 * 468 * @param lastUpdated {@link Date} instance used to establish the last 469 * updated date for the Step Execution. 470 */ 471 public void setLastUpdated(Date lastUpdated) { 472 this.lastUpdated = lastUpdated; 473 } 474 475 public List<Throwable> getFailureExceptions() { 476 return failureExceptions; 477 } 478 479 public void addFailureException(Throwable throwable) { 480 this.failureExceptions.add(throwable); 481 } 482 483 /* 484 * (non-Javadoc) 485 * 486 * @see 487 * org.springframework.batch.container.common.domain.Entity#equals(java. 488 * lang.Object) 489 */ 490 @Override 491 public boolean equals(Object obj) { 492 493 Object jobExecutionId = getJobExecutionId(); 494 if (jobExecutionId == null || !(obj instanceof StepExecution) || getId() == null) { 495 return super.equals(obj); 496 } 497 StepExecution other = (StepExecution) obj; 498 499 return stepName.equals(other.getStepName()) && (jobExecutionId.equals(other.getJobExecutionId())) 500 && getId().equals(other.getId()); 501 } 502 503 /** 504 * Deserialize and ensure transient fields are re-instantiated when read 505 * back. 506 * 507 * @param stream instance of {@link ObjectInputStream}. 508 * 509 * @throws IOException thrown if error occurs during read. 510 * @throws ClassNotFoundException thrown if class is not found. 511 */ 512 private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { 513 stream.defaultReadObject(); 514 failureExceptions = new ArrayList<Throwable>(); 515 } 516 517 /* 518 * (non-Javadoc) 519 * 520 * @see org.springframework.batch.container.common.domain.Entity#hashCode() 521 */ 522 @Override 523 public int hashCode() { 524 Object jobExecutionId = getJobExecutionId(); 525 Long id = getId(); 526 return super.hashCode() + 31 * (stepName != null ? stepName.hashCode() : 0) + 91 527 * (jobExecutionId != null ? jobExecutionId.hashCode() : 0) + 59 * (id != null ? id.hashCode() : 0); 528 } 529 530 @Override 531 public String toString() { 532 return String.format(getSummary() + ", exitDescription=%s", exitStatus.getExitDescription()); 533 } 534 535 public String getSummary() { 536 return super.toString() 537 + String.format( 538 ", name=%s, status=%s, exitStatus=%s, readCount=%d, filterCount=%d, writeCount=%d readSkipCount=%d, writeSkipCount=%d" 539 + ", processSkipCount=%d, commitCount=%d, rollbackCount=%d", stepName, status, 540 exitStatus.getExitCode(), readCount, filterCount, writeCount, readSkipCount, writeSkipCount, 541 processSkipCount, commitCount, rollbackCount); 542 } 543 544}