001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.step.tasklet;
018
019import java.io.File;
020import java.util.concurrent.Callable;
021import java.util.concurrent.FutureTask;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025
026import org.springframework.batch.core.ExitStatus;
027import org.springframework.batch.core.JobExecution;
028import org.springframework.batch.core.JobInterruptedException;
029import org.springframework.batch.core.StepContribution;
030import org.springframework.batch.core.StepExecution;
031import org.springframework.batch.core.explore.JobExplorer;
032import org.springframework.batch.core.listener.StepExecutionListenerSupport;
033import org.springframework.batch.core.scope.context.ChunkContext;
034import org.springframework.batch.repeat.RepeatStatus;
035import org.springframework.beans.factory.InitializingBean;
036import org.springframework.core.task.SimpleAsyncTaskExecutor;
037import org.springframework.core.task.TaskExecutor;
038import org.springframework.util.Assert;
039
040/**
041 * {@link Tasklet} that executes a system command.
042 *
043 * The system command is executed asynchronously using injected
044 * {@link #setTaskExecutor(TaskExecutor)} - timeout value is required to be set,
045 * so that the batch job does not hang forever if the external process hangs.
046 *
047 * Tasklet periodically checks for termination status (i.e.
048 * {@link #setCommand(String)} finished its execution or
049 * {@link #setTimeout(long)} expired or job was interrupted). The check interval
050 * is given by {@link #setTerminationCheckInterval(long)}.
051 *
052 * When job interrupt is detected tasklet's execution is terminated immediately
053 * by throwing {@link JobInterruptedException}.
054 *
055 * {@link #setInterruptOnCancel(boolean)} specifies whether the tasklet should
056 * attempt to interrupt the thread that executes the system command if it is
057 * still running when tasklet exits (abnormally).
058 *
059 * @author Robert Kasanicky
060 * @author Will Schipp
061 */
062public class SystemCommandTasklet extends StepExecutionListenerSupport implements StoppableTasklet, InitializingBean {
063
064        protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class);
065
066        private String command;
067
068        private String[] environmentParams = null;
069
070        private File workingDirectory = null;
071
072        private SystemProcessExitCodeMapper systemProcessExitCodeMapper = new SimpleSystemProcessExitCodeMapper();
073
074        private long timeout = 0;
075
076        private long checkInterval = 1000;
077
078        private StepExecution execution = null;
079
080        private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
081
082        private boolean interruptOnCancel = false;
083
084        private volatile boolean stopped = false;
085
086        private JobExplorer jobExplorer;
087
088        private boolean stoppable = false;
089
090        /**
091         * Execute system command and map its exit code to {@link ExitStatus} using
092         * {@link SystemProcessExitCodeMapper}.
093         */
094        @Override
095        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
096
097                FutureTask<Integer> systemCommandTask = new FutureTask<Integer>(new Callable<Integer>() {
098
099                        @Override
100                        public Integer call() throws Exception {
101                                Process process = Runtime.getRuntime().exec(command, environmentParams, workingDirectory);
102                                return process.waitFor();
103                        }
104
105                });
106
107                long t0 = System.currentTimeMillis();
108
109                taskExecutor.execute(systemCommandTask);
110
111                while (true) {
112                        Thread.sleep(checkInterval);//moved to the end of the logic
113
114                        if(stoppable) {
115                                JobExecution jobExecution =
116                                                jobExplorer.getJobExecution(chunkContext.getStepContext().getStepExecution().getJobExecutionId());
117
118                                if(jobExecution.isStopping()) {
119                                        stopped = true;
120                                }
121                        }
122
123                        if (systemCommandTask.isDone()) {
124                                contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get()));
125                                return RepeatStatus.FINISHED;
126                        }
127                        else if (System.currentTimeMillis() - t0 > timeout) {
128                                systemCommandTask.cancel(interruptOnCancel);
129                                throw new SystemCommandException("Execution of system command did not finish within the timeout");
130                        }
131                        else if (execution.isTerminateOnly()) {
132                                systemCommandTask.cancel(interruptOnCancel);
133                                throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'");
134                        }
135                        else if (stopped) {
136                                systemCommandTask.cancel(interruptOnCancel);
137                                contribution.setExitStatus(ExitStatus.STOPPED);
138                                return RepeatStatus.FINISHED;
139                        }
140                }
141        }
142
143        /**
144         * @param command command to be executed in a separate system process
145         */
146        public void setCommand(String command) {
147                this.command = command;
148        }
149
150        /**
151         * @param envp environment parameter values, inherited from parent process
152         * when not set (or set to null).
153         */
154        public void setEnvironmentParams(String[] envp) {
155                this.environmentParams = envp;
156        }
157
158        /**
159         * @param dir working directory of the spawned process, inherited from
160         * parent process when not set (or set to null).
161         */
162        public void setWorkingDirectory(String dir) {
163                if (dir == null) {
164                        this.workingDirectory = null;
165                        return;
166                }
167                this.workingDirectory = new File(dir);
168                Assert.isTrue(workingDirectory.exists(), "working directory must exist");
169                Assert.isTrue(workingDirectory.isDirectory(), "working directory value must be a directory");
170
171        }
172
173        @Override
174        public void afterPropertiesSet() throws Exception {
175                Assert.hasLength(command, "'command' property value is required");
176                Assert.notNull(systemProcessExitCodeMapper, "SystemProcessExitCodeMapper must be set");
177                Assert.isTrue(timeout > 0, "timeout value must be greater than zero");
178                Assert.notNull(taskExecutor, "taskExecutor is required");
179                stoppable = jobExplorer != null;
180        }
181
182        public void setJobExplorer(JobExplorer jobExplorer) {
183                this.jobExplorer = jobExplorer;
184        }
185
186        /**
187         * @param systemProcessExitCodeMapper maps system process return value to
188         * <code>ExitStatus</code> returned by Tasklet.
189         * {@link SimpleSystemProcessExitCodeMapper} is used by default.
190         */
191        public void setSystemProcessExitCodeMapper(SystemProcessExitCodeMapper systemProcessExitCodeMapper) {
192                this.systemProcessExitCodeMapper = systemProcessExitCodeMapper;
193        }
194
195        /**
196         * Timeout in milliseconds.
197         * @param timeout upper limit for how long the execution of the external
198         * program is allowed to last.
199         */
200        public void setTimeout(long timeout) {
201                this.timeout = timeout;
202        }
203
204        /**
205         * The time interval how often the tasklet will check for termination
206         * status.
207         *
208         * @param checkInterval time interval in milliseconds (1 second by default).
209         */
210        public void setTerminationCheckInterval(long checkInterval) {
211                this.checkInterval = checkInterval;
212        }
213
214        /**
215         * Get a reference to {@link StepExecution} for interrupt checks during
216         * system command execution.
217         */
218        @Override
219        public void beforeStep(StepExecution stepExecution) {
220                this.execution = stepExecution;
221        }
222
223        /**
224         * Sets the task executor that will be used to execute the system command
225         * NB! Avoid using a synchronous task executor
226         *
227         * @param taskExecutor instance of {@link TaskExecutor}.
228         */
229        public void setTaskExecutor(TaskExecutor taskExecutor) {
230                this.taskExecutor = taskExecutor;
231        }
232
233        /**
234         * If <code>true</code> tasklet will attempt to interrupt the thread
235         * executing the system command if {@link #setTimeout(long)} has been
236         * exceeded or user interrupts the job. <code>false</code> by default
237         *
238         * @param interruptOnCancel boolean determines if process should be interrupted
239         */
240        public void setInterruptOnCancel(boolean interruptOnCancel) {
241                this.interruptOnCancel = interruptOnCancel;
242        }
243
244        /**
245         * Will interrupt the thread executing the system command only if
246         * {@link #setInterruptOnCancel(boolean)} has been set to true.  Otherwise
247         * the underlying command will be allowed to finish before the tasklet
248         * ends.
249         *
250         * @since 3.0
251         * @see StoppableTasklet#stop()
252         */
253        @Override
254        public void stop() {
255                stopped = true;
256        }
257
258}