001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core;
018
019import java.io.IOException;
020import java.io.ObjectInputStream;
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024import java.util.concurrent.CopyOnWriteArrayList;
025
026import org.springframework.batch.item.ExecutionContext;
027import org.springframework.util.Assert;
028
029/**
030 * Batch domain object representation the execution of a step. Unlike
031 * {@link JobExecution}, there are additional properties related the processing
032 * of items such as commit count, etc.
033 *
034 * @author Lucas Ward
035 * @author Dave Syer
036 *
037 */
038@SuppressWarnings("serial")
039public class StepExecution extends Entity {
040
041        private final JobExecution jobExecution;
042
043        private final String stepName;
044
045        private volatile BatchStatus status = BatchStatus.STARTING;
046
047        private volatile int readCount = 0;
048
049        private volatile int writeCount = 0;
050
051        private volatile int commitCount = 0;
052
053        private volatile int rollbackCount = 0;
054
055        private volatile int readSkipCount = 0;
056
057        private volatile int processSkipCount = 0;
058
059        private volatile int writeSkipCount = 0;
060
061        private volatile Date startTime = new Date(System.currentTimeMillis());
062
063        private volatile Date endTime = null;
064
065        private volatile Date lastUpdated = null;
066
067        private volatile ExecutionContext executionContext = new ExecutionContext();
068
069        private volatile ExitStatus exitStatus = ExitStatus.EXECUTING;
070
071        private volatile boolean terminateOnly;
072
073        private volatile int filterCount;
074
075        private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<Throwable>();
076
077        /**
078         * Constructor with mandatory properties.
079         *
080         * @param stepName the step to which this execution belongs
081         * @param jobExecution the current job execution
082         * @param id the id of this execution
083         */
084        public StepExecution(String stepName, JobExecution jobExecution, Long id) {
085                this(stepName, jobExecution);
086                Assert.notNull(jobExecution, "JobExecution must be provided to re-hydrate an existing StepExecution");
087                Assert.notNull(id, "The entity Id must be provided to re-hydrate an existing StepExecution");
088                setId(id);
089                jobExecution.addStepExecution(this);
090        }
091
092        /**
093         * Constructor that substitutes in null for the execution id
094         *
095         * @param stepName the step to which this execution belongs
096         * @param jobExecution the current job execution
097         */
098        public StepExecution(String stepName, JobExecution jobExecution) {
099                super();
100                Assert.hasLength(stepName, "A stepName is required");
101                this.stepName = stepName;
102                this.jobExecution = jobExecution;
103        }
104
105        /**
106         * Constructor that requires only a stepName.  Intended only to be
107         * used via serialization libraries to address the circular
108         * reference between {@link JobExecution} and StepExecution.
109         *
110         * @param stepName the name of the executed step
111         */
112        @SuppressWarnings("unused")
113        private StepExecution(String stepName) {
114                super();
115                Assert.hasLength(stepName, "A stepName is required");
116                this.stepName = stepName;
117                this.jobExecution = null;
118        }
119
120        /**
121         * Returns the {@link ExecutionContext} for this execution
122         *
123         * @return the attributes
124         */
125        public ExecutionContext getExecutionContext() {
126                return executionContext;
127        }
128
129        /**
130         * Sets the {@link ExecutionContext} for this execution
131         *
132         * @param executionContext the attributes
133         */
134        public void setExecutionContext(ExecutionContext executionContext) {
135                this.executionContext = executionContext;
136        }
137
138        /**
139         * Returns the current number of commits for this execution
140         *
141         * @return the current number of commits
142         */
143        public int getCommitCount() {
144                return commitCount;
145        }
146
147        /**
148         * Sets the current number of commits for this execution
149         *
150         * @param commitCount the current number of commits
151         */
152        public void setCommitCount(int commitCount) {
153                this.commitCount = commitCount;
154        }
155
156        /**
157         * Returns the time that this execution ended
158         *
159         * @return the time that this execution ended
160         */
161        public Date getEndTime() {
162                return endTime;
163        }
164
165        /**
166         * Sets the time that this execution ended
167         *
168         * @param endTime the time that this execution ended
169         */
170        public void setEndTime(Date endTime) {
171                this.endTime = endTime;
172        }
173
174        /**
175         * Returns the current number of items read for this execution
176         *
177         * @return the current number of items read for this execution
178         */
179        public int getReadCount() {
180                return readCount;
181        }
182
183        /**
184         * Sets the current number of read items for this execution
185         *
186         * @param readCount the current number of read items for this execution
187         */
188        public void setReadCount(int readCount) {
189                this.readCount = readCount;
190        }
191
192        /**
193         * Returns the current number of items written for this execution
194         *
195         * @return the current number of items written for this execution
196         */
197        public int getWriteCount() {
198                return writeCount;
199        }
200
201        /**
202         * Sets the current number of written items for this execution
203         *
204         * @param writeCount the current number of written items for this execution
205         */
206        public void setWriteCount(int writeCount) {
207                this.writeCount = writeCount;
208        }
209
210        /**
211         * Returns the current number of rollbacks for this execution
212         *
213         * @return the current number of rollbacks for this execution
214         */
215        public int getRollbackCount() {
216                return rollbackCount;
217        }
218
219        /**
220         * Returns the current number of items filtered out of this execution
221         *
222         * @return the current number of items filtered out of this execution
223         */
224        public int getFilterCount() {
225                return filterCount;
226        }
227
228        /**
229         * Public setter for the number of items filtered out of this execution.
230         * @param filterCount the number of items filtered out of this execution to
231         * set
232         */
233        public void setFilterCount(int filterCount) {
234                this.filterCount = filterCount;
235        }
236
237        /**
238         * Setter for number of rollbacks for this execution
239         * @param rollbackCount int the number of rollbacks.
240         */
241        public void setRollbackCount(int rollbackCount) {
242                this.rollbackCount = rollbackCount;
243        }
244
245        /**
246         * Gets the time this execution started
247         *
248         * @return the time this execution started
249         */
250        public Date getStartTime() {
251                return startTime;
252        }
253
254        /**
255         * Sets the time this execution started
256         *
257         * @param startTime the time this execution started
258         */
259        public void setStartTime(Date startTime) {
260                this.startTime = startTime;
261        }
262
263        /**
264         * Returns the current status of this step
265         *
266         * @return the current status of this step
267         */
268        public BatchStatus getStatus() {
269                return status;
270        }
271
272        /**
273         * Sets the current status of this step
274         *
275         * @param status the current status of this step
276         */
277        public void setStatus(BatchStatus status) {
278                this.status = status;
279        }
280
281        /**
282         * Upgrade the status field if the provided value is greater than the
283         * existing one. Clients using this method to set the status can be sure
284         * that they don't overwrite a failed status with an successful one.
285         *
286         * @param status the new status value
287         */
288        public void upgradeStatus(BatchStatus status) {
289                this.status = this.status.upgradeTo(status);
290        }
291
292        /**
293         * @return the name of the step
294         */
295        public String getStepName() {
296                return stepName;
297        }
298
299        /**
300         * Accessor for the job execution id.
301         *
302         * @return the jobExecutionId
303         */
304        public Long getJobExecutionId() {
305                if (jobExecution != null) {
306                        return jobExecution.getId();
307                }
308                return null;
309        }
310
311        /**
312         * @param exitStatus {@link ExitStatus} instance used to establish the exit status.
313         */
314        public void setExitStatus(ExitStatus exitStatus) {
315                this.exitStatus = exitStatus;
316        }
317
318        /**
319         * @return the exitCode
320         */
321        public ExitStatus getExitStatus() {
322                return exitStatus;
323        }
324
325        /**
326         * Accessor for the execution context information of the enclosing job.
327         *
328         * @return the {@link JobExecution} that was used to start this step
329         * execution.
330         */
331        public JobExecution getJobExecution() {
332                return jobExecution;
333        }
334
335        /**
336         * Factory method for {@link StepContribution}.
337         *
338         * @return a new {@link StepContribution}
339         */
340        public StepContribution createStepContribution() {
341                return new StepContribution(this);
342        }
343
344        /**
345         * On successful execution just before a chunk commit, this method should be
346         * called. Synchronizes access to the {@link StepExecution} so that changes
347         * are atomic.
348         *
349         * @param contribution {@link StepContribution} instance used to update the StepExecution state.
350         */
351        public synchronized void apply(StepContribution contribution) {
352                readSkipCount += contribution.getReadSkipCount();
353                writeSkipCount += contribution.getWriteSkipCount();
354                processSkipCount += contribution.getProcessSkipCount();
355                filterCount += contribution.getFilterCount();
356                readCount += contribution.getReadCount();
357                writeCount += contribution.getWriteCount();
358                exitStatus = exitStatus.and(contribution.getExitStatus());
359        }
360
361        /**
362         * On unsuccessful execution after a chunk has rolled back.
363         */
364        public synchronized void incrementRollbackCount() {
365                rollbackCount++;
366        }
367
368        /**
369         * @return flag to indicate that an execution should halt
370         */
371        public boolean isTerminateOnly() {
372                return this.terminateOnly;
373        }
374
375        /**
376         * Set a flag that will signal to an execution environment that this
377         * execution (and its surrounding job) wishes to exit.
378         */
379        public void setTerminateOnly() {
380                this.terminateOnly = true;
381        }
382
383        /**
384         * @return the total number of items skipped.
385         */
386        public int getSkipCount() {
387                return readSkipCount + processSkipCount + writeSkipCount;
388        }
389
390        /**
391         * Increment the number of commits
392         */
393        public void incrementCommitCount() {
394                commitCount++;
395        }
396
397        /**
398         * Convenience method to get the current job parameters.
399         *
400         * @return the {@link JobParameters} from the enclosing job, or empty if
401         * that is null
402         */
403        public JobParameters getJobParameters() {
404                if (jobExecution == null) {
405                        return new JobParameters();
406                }
407                return jobExecution.getJobParameters();
408        }
409
410        /**
411         * @return the number of records skipped on read
412         */
413        public int getReadSkipCount() {
414                return readSkipCount;
415        }
416
417        /**
418         * @return the number of records skipped on write
419         */
420        public int getWriteSkipCount() {
421                return writeSkipCount;
422        }
423
424        /**
425         * Set the number of records skipped on read
426         *
427         * @param readSkipCount int containing read skip count to be used for the step execution.
428         */
429        public void setReadSkipCount(int readSkipCount) {
430                this.readSkipCount = readSkipCount;
431        }
432
433        /**
434         * Set the number of records skipped on write
435         *
436         * @param writeSkipCount int containing write skip count to be used for the step execution.
437         */
438        public void setWriteSkipCount(int writeSkipCount) {
439                this.writeSkipCount = writeSkipCount;
440        }
441
442        /**
443         * @return the number of records skipped during processing
444         */
445        public int getProcessSkipCount() {
446                return processSkipCount;
447        }
448
449        /**
450         * Set the number of records skipped during processing.
451         *
452         * @param processSkipCount int containing process skip count to be used for the step execution.
453         */
454        public void setProcessSkipCount(int processSkipCount) {
455                this.processSkipCount = processSkipCount;
456        }
457
458        /**
459         * @return the Date representing the last time this execution was persisted.
460         */
461        public Date getLastUpdated() {
462                return lastUpdated;
463        }
464
465        /**
466         * Set the time when the StepExecution was last updated before persisting
467         *
468         * @param lastUpdated {@link Date} instance used to establish the last
469         * updated date for the Step Execution.
470         */
471        public void setLastUpdated(Date lastUpdated) {
472                this.lastUpdated = lastUpdated;
473        }
474
475        public List<Throwable> getFailureExceptions() {
476                return failureExceptions;
477        }
478
479        public void addFailureException(Throwable throwable) {
480                this.failureExceptions.add(throwable);
481        }
482
483        /*
484         * (non-Javadoc)
485         *
486         * @see
487         * org.springframework.batch.container.common.domain.Entity#equals(java.
488         * lang.Object)
489         */
490        @Override
491        public boolean equals(Object obj) {
492
493                Object jobExecutionId = getJobExecutionId();
494                if (jobExecutionId == null || !(obj instanceof StepExecution) || getId() == null) {
495                        return super.equals(obj);
496                }
497                StepExecution other = (StepExecution) obj;
498
499                return stepName.equals(other.getStepName()) && (jobExecutionId.equals(other.getJobExecutionId()))
500                                && getId().equals(other.getId());
501        }
502
503        /**
504         * Deserialize and ensure transient fields are re-instantiated when read
505         * back.
506         *
507         * @param stream instance of {@link ObjectInputStream}.
508         *
509         * @throws IOException thrown if error occurs during read.
510         * @throws ClassNotFoundException thrown if class is not found.
511         */
512        private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
513                stream.defaultReadObject();
514                failureExceptions = new ArrayList<Throwable>();
515        }
516
517        /*
518         * (non-Javadoc)
519         *
520         * @see org.springframework.batch.container.common.domain.Entity#hashCode()
521         */
522        @Override
523        public int hashCode() {
524                Object jobExecutionId = getJobExecutionId();
525                Long id = getId();
526                return super.hashCode() + 31 * (stepName != null ? stepName.hashCode() : 0) + 91
527                                * (jobExecutionId != null ? jobExecutionId.hashCode() : 0) + 59 * (id != null ? id.hashCode() : 0);
528        }
529
530        @Override
531        public String toString() {
532                return String.format(getSummary() + ", exitDescription=%s", exitStatus.getExitDescription());
533        }
534
535        public String getSummary() {
536                return super.toString()
537                                + String.format(
538                                                ", name=%s, status=%s, exitStatus=%s, readCount=%d, filterCount=%d, writeCount=%d readSkipCount=%d, writeSkipCount=%d"
539                                                                + ", processSkipCount=%d, commitCount=%d, rollbackCount=%d", stepName, status,
540                                                                exitStatus.getExitCode(), readCount, filterCount, writeCount, readSkipCount, writeSkipCount,
541                                                                processSkipCount, commitCount, rollbackCount);
542        }
543
544}