001/* 002 * Copyright 2006-2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.step.tasklet; 018 019import java.io.File; 020import java.util.concurrent.Callable; 021import java.util.concurrent.FutureTask; 022 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; 025 026import org.springframework.batch.core.ExitStatus; 027import org.springframework.batch.core.JobExecution; 028import org.springframework.batch.core.JobInterruptedException; 029import org.springframework.batch.core.StepContribution; 030import org.springframework.batch.core.StepExecution; 031import org.springframework.batch.core.explore.JobExplorer; 032import org.springframework.batch.core.listener.StepExecutionListenerSupport; 033import org.springframework.batch.core.scope.context.ChunkContext; 034import org.springframework.batch.repeat.RepeatStatus; 035import org.springframework.beans.factory.InitializingBean; 036import org.springframework.core.task.SimpleAsyncTaskExecutor; 037import org.springframework.core.task.TaskExecutor; 038import org.springframework.util.Assert; 039 040/** 041 * {@link Tasklet} that executes a system command. 042 * 043 * The system command is executed asynchronously using injected 044 * {@link #setTaskExecutor(TaskExecutor)} - timeout value is required to be set, 045 * so that the batch job does not hang forever if the external process hangs. 046 * 047 * Tasklet periodically checks for termination status (i.e. 048 * {@link #setCommand(String)} finished its execution or 049 * {@link #setTimeout(long)} expired or job was interrupted). The check interval 050 * is given by {@link #setTerminationCheckInterval(long)}. 051 * 052 * When job interrupt is detected tasklet's execution is terminated immediately 053 * by throwing {@link JobInterruptedException}. 054 * 055 * {@link #setInterruptOnCancel(boolean)} specifies whether the tasklet should 056 * attempt to interrupt the thread that executes the system command if it is 057 * still running when tasklet exits (abnormally). 058 * 059 * @author Robert Kasanicky 060 * @author Will Schipp 061 */ 062public class SystemCommandTasklet extends StepExecutionListenerSupport implements StoppableTasklet, InitializingBean { 063 064 protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class); 065 066 private String command; 067 068 private String[] environmentParams = null; 069 070 private File workingDirectory = null; 071 072 private SystemProcessExitCodeMapper systemProcessExitCodeMapper = new SimpleSystemProcessExitCodeMapper(); 073 074 private long timeout = 0; 075 076 private long checkInterval = 1000; 077 078 private StepExecution execution = null; 079 080 private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); 081 082 private boolean interruptOnCancel = false; 083 084 private volatile boolean stopped = false; 085 086 private JobExplorer jobExplorer; 087 088 private boolean stoppable = false; 089 090 /** 091 * Execute system command and map its exit code to {@link ExitStatus} using 092 * {@link SystemProcessExitCodeMapper}. 093 */ 094 @Override 095 public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { 096 097 FutureTask<Integer> systemCommandTask = new FutureTask<Integer>(new Callable<Integer>() { 098 099 @Override 100 public Integer call() throws Exception { 101 Process process = Runtime.getRuntime().exec(command, environmentParams, workingDirectory); 102 return process.waitFor(); 103 } 104 105 }); 106 107 long t0 = System.currentTimeMillis(); 108 109 taskExecutor.execute(systemCommandTask); 110 111 while (true) { 112 Thread.sleep(checkInterval);//moved to the end of the logic 113 114 if(stoppable) { 115 JobExecution jobExecution = 116 jobExplorer.getJobExecution(chunkContext.getStepContext().getStepExecution().getJobExecutionId()); 117 118 if(jobExecution.isStopping()) { 119 stopped = true; 120 } 121 } 122 123 if (systemCommandTask.isDone()) { 124 contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get())); 125 return RepeatStatus.FINISHED; 126 } 127 else if (System.currentTimeMillis() - t0 > timeout) { 128 systemCommandTask.cancel(interruptOnCancel); 129 throw new SystemCommandException("Execution of system command did not finish within the timeout"); 130 } 131 else if (execution.isTerminateOnly()) { 132 systemCommandTask.cancel(interruptOnCancel); 133 throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'"); 134 } 135 else if (stopped) { 136 systemCommandTask.cancel(interruptOnCancel); 137 contribution.setExitStatus(ExitStatus.STOPPED); 138 return RepeatStatus.FINISHED; 139 } 140 } 141 } 142 143 /** 144 * @param command command to be executed in a separate system process 145 */ 146 public void setCommand(String command) { 147 this.command = command; 148 } 149 150 /** 151 * @param envp environment parameter values, inherited from parent process 152 * when not set (or set to null). 153 */ 154 public void setEnvironmentParams(String[] envp) { 155 this.environmentParams = envp; 156 } 157 158 /** 159 * @param dir working directory of the spawned process, inherited from 160 * parent process when not set (or set to null). 161 */ 162 public void setWorkingDirectory(String dir) { 163 if (dir == null) { 164 this.workingDirectory = null; 165 return; 166 } 167 this.workingDirectory = new File(dir); 168 Assert.isTrue(workingDirectory.exists(), "working directory must exist"); 169 Assert.isTrue(workingDirectory.isDirectory(), "working directory value must be a directory"); 170 171 } 172 173 @Override 174 public void afterPropertiesSet() throws Exception { 175 Assert.hasLength(command, "'command' property value is required"); 176 Assert.notNull(systemProcessExitCodeMapper, "SystemProcessExitCodeMapper must be set"); 177 Assert.isTrue(timeout > 0, "timeout value must be greater than zero"); 178 Assert.notNull(taskExecutor, "taskExecutor is required"); 179 stoppable = jobExplorer != null; 180 } 181 182 public void setJobExplorer(JobExplorer jobExplorer) { 183 this.jobExplorer = jobExplorer; 184 } 185 186 /** 187 * @param systemProcessExitCodeMapper maps system process return value to 188 * <code>ExitStatus</code> returned by Tasklet. 189 * {@link SimpleSystemProcessExitCodeMapper} is used by default. 190 */ 191 public void setSystemProcessExitCodeMapper(SystemProcessExitCodeMapper systemProcessExitCodeMapper) { 192 this.systemProcessExitCodeMapper = systemProcessExitCodeMapper; 193 } 194 195 /** 196 * Timeout in milliseconds. 197 * @param timeout upper limit for how long the execution of the external 198 * program is allowed to last. 199 */ 200 public void setTimeout(long timeout) { 201 this.timeout = timeout; 202 } 203 204 /** 205 * The time interval how often the tasklet will check for termination 206 * status. 207 * 208 * @param checkInterval time interval in milliseconds (1 second by default). 209 */ 210 public void setTerminationCheckInterval(long checkInterval) { 211 this.checkInterval = checkInterval; 212 } 213 214 /** 215 * Get a reference to {@link StepExecution} for interrupt checks during 216 * system command execution. 217 */ 218 @Override 219 public void beforeStep(StepExecution stepExecution) { 220 this.execution = stepExecution; 221 } 222 223 /** 224 * Sets the task executor that will be used to execute the system command 225 * NB! Avoid using a synchronous task executor 226 * 227 * @param taskExecutor instance of {@link TaskExecutor}. 228 */ 229 public void setTaskExecutor(TaskExecutor taskExecutor) { 230 this.taskExecutor = taskExecutor; 231 } 232 233 /** 234 * If <code>true</code> tasklet will attempt to interrupt the thread 235 * executing the system command if {@link #setTimeout(long)} has been 236 * exceeded or user interrupts the job. <code>false</code> by default 237 * 238 * @param interruptOnCancel boolean determines if process should be interrupted 239 */ 240 public void setInterruptOnCancel(boolean interruptOnCancel) { 241 this.interruptOnCancel = interruptOnCancel; 242 } 243 244 /** 245 * Will interrupt the thread executing the system command only if 246 * {@link #setInterruptOnCancel(boolean)} has been set to true. Otherwise 247 * the underlying command will be allowed to finish before the tasklet 248 * ends. 249 * 250 * @since 3.0 251 * @see StoppableTasklet#stop() 252 */ 253 @Override 254 public void stop() { 255 stopped = true; 256 } 257 258}