/*
 * Copyright 2013-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core.jsr.partition;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

import javax.batch.api.partition.PartitionAnalyzer;
import javax.batch.api.partition.PartitionCollector;
import javax.batch.api.partition.PartitionMapper;
import javax.batch.api.partition.PartitionPlan;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.jsr.configuration.support.BatchPropertyContext;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.StepExecutionSplitter;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;

/**
 * Executes a step instance per thread using a {@link ThreadPoolTaskExecutor} in
 * accordance with JSR-352. The results from each step are aggregated into a
 * cumulative result.
 *
 * @author Michael Minella
 * @author Mahmoud Ben Hassine
 * @since 3.0
 */
public class JsrPartitionHandler implements PartitionHandler, InitializingBean {

	private static final int DEFAULT_POLLING_INTERVAL = 500;

	// TODO: Replace with proper Channel and Messages once minimum support level for Spring is 4
	private Queue<Serializable> partitionDataQueue;
	private ReentrantLock lock;
	private Step step;
	private int partitions;
	private PartitionAnalyzer analyzer;
	private PartitionMapper mapper;
	private int threads;
	private BatchPropertyContext propertyContext;
	private JobRepository jobRepository;
	private boolean allowStartIfComplete = false;
	private Set<String> partitionStepNames = new HashSet<String>();
	private int pollingInterval = DEFAULT_POLLING_INTERVAL;

	/**
	 * @return the step that will be executed by each partition
	 */
	public Step getStep() {
		return step;
	}

	/**
	 * @return the names of each partitioned step
	 */
	public Collection<String> getPartitionStepNames() {
		return partitionStepNames;
	}

	/**
	 * @param allowStartIfComplete flag stating if the step should restart if it
	 * was complete in a previous run
	 */
	public void setAllowStartIfComplete(boolean allowStartIfComplete) {
		this.allowStartIfComplete = allowStartIfComplete;
	}

	/**
	 * @param queue {@link Queue} to receive the output of the {@link PartitionCollector}
	 */
	public void setPartitionDataQueue(Queue<Serializable> queue) {
		this.partitionDataQueue = queue;
	}

	/**
	 * @param lock the {@link ReentrantLock} this handler holds while it drains the
	 * partition data queue and processes finished partitions. If none is provided,
	 * {@link #afterPropertiesSet()} creates one.
	 */
	public void setPartitionLock(ReentrantLock lock) {
		this.lock = lock;
	}

	/**
	 * @param context {@link BatchPropertyContext} to resolve partition level step properties
	 */
	public void setPropertyContext(BatchPropertyContext context) {
		this.propertyContext = context;
	}

	/**
	 * @param mapper {@link PartitionMapper} used to configure partitioning
	 */
	public void setPartitionMapper(PartitionMapper mapper) {
		this.mapper = mapper;
	}

	/**
	 * @param step the step to be executed as a partitioned step
	 */
	public void setStep(Step step) {
		this.step = step;
	}

	/**
	 * @param analyzer {@link PartitionAnalyzer}
	 */
	public void setPartitionAnalyzer(PartitionAnalyzer analyzer) {
		this.analyzer = analyzer;
	}

	/**
	 * @param threads the number of threads to execute the partitions to be run
	 * within. The default is the number of partitions.
	 */
	public void setThreads(int threads) {
		this.threads = threads;
	}

	/**
	 * @param partitions the number of partitions to be executed
	 */
	public void setPartitions(int partitions) {
		this.partitions = partitions;
	}

	/**
	 * @param jobRepository {@link JobRepository}
	 */
	public void setJobRepository(JobRepository jobRepository) {
		this.jobRepository = jobRepository;
	}

	/**
	 * @param pollingInterval the duration of partitions completion polling interval
	 * (in milliseconds). The default value is 500ms.
	 */
	public void setPollingInterval(int pollingInterval) {
		this.pollingInterval = pollingInterval;
	}

	/**
	 * Splits the master step execution into partition step executions, runs each
	 * of them on a dedicated {@link ThreadPoolTaskExecutor} and blocks until all
	 * submitted partitions have finished.
	 *
	 * @param stepSplitter not used by this implementation; the splitter is created
	 * internally
	 * @param stepExecution the master {@link StepExecution}
	 * @return the partition {@link StepExecution}s that finished or were rejected
	 * by the task executor
	 * @throws Exception if splitting, executing or analyzing a partition fails
	 * @see PartitionHandler#handle(StepExecutionSplitter, StepExecution)
	 */
	@Override
	public Collection<StepExecution> handle(StepExecutionSplitter stepSplitter,
			StepExecution stepExecution) throws Exception {
		final List<Future<StepExecution>> tasks = new ArrayList<Future<StepExecution>>();
		final Set<StepExecution> result = new HashSet<StepExecution>();
		final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

		int stepExecutionCount = jobRepository.getStepExecutionCount(
				stepExecution.getJobExecution().getJobInstance(), stepExecution.getStepName());

		boolean isRestart = stepExecutionCount > 1;

		Set<StepExecution> partitionStepExecutions = splitStepExecution(stepExecution, isRestart);

		for (StepExecution curStepExecution : partitionStepExecutions) {
			partitionStepNames.add(curStepExecution.getStepName());
		}

		// Resolved after the split because applying a partition plan may update the thread count.
		int poolSize = resolvePoolSize();
		taskExecutor.setCorePoolSize(poolSize);
		taskExecutor.setMaxPoolSize(poolSize);

		taskExecutor.initialize();

		try {
			for (final StepExecution curStepExecution : partitionStepExecutions) {
				final FutureTask<StepExecution> task = createTask(step, curStepExecution);

				try {
					taskExecutor.execute(task);
					tasks.add(task);
				} catch (TaskRejectedException e) {
					// couldn't execute one of the tasks
					ExitStatus exitStatus = ExitStatus.FAILED
							.addExitDescription("TaskExecutor rejected the task for this step.");
					/*
					 * Set the status in case the caller is tracking it through the
					 * JobExecution.
					 */
					curStepExecution.setStatus(BatchStatus.FAILED);
					curStepExecution.setExitStatus(exitStatus);
					// Report the rejected partition itself, not the master step execution.
					result.add(curStepExecution);
				}
			}

			processPartitionResults(tasks, result);
		}
		finally {
			taskExecutor.shutdown();
		}

		return result;
	}

	/**
	 * Determines the size of the thread pool used to run the partitions. Falls back
	 * to the configured number of partitions when no thread count is set, as
	 * documented on {@link #setThreads(int)}, and never returns less than one
	 * because a thread pool rejects a maximum size of zero.
	 *
	 * @return the pool size to use, always at least 1
	 */
	private int resolvePoolSize() {
		if (threads > 0) {
			return threads;
		}
		return Math.max(1, partitions);
	}

	/**
	 * Blocks until all partitioned steps have completed. As each step completes
	 * the PartitionAnalyzer analyzes the collector data received from each
	 * partition (if there is any).
	 *
	 * @param tasks The {@link Future} that contains the reference to the executing step
	 * @param result Set of completed {@link StepExecution}s
	 * @throws Exception if the thread is interrupted while waiting, or if analyzing
	 * or retrieving a partition result fails
	 */
	private void processPartitionResults(
			final List<Future<StepExecution>> tasks,
			final Set<StepExecution> result) throws Exception {
		while (true) {
			Thread.sleep(pollingInterval);

			// Acquire before the try block so the finally clause only runs when the lock is held.
			lock.lock();
			try {
				while (!partitionDataQueue.isEmpty()) {
					Serializable collectorData = partitionDataQueue.remove();

					// The analyzer is optional; keep draining so the queue does not grow.
					if (analyzer != null) {
						analyzer.analyzeCollectorData(collectorData);
					}
				}

				processFinishedPartitions(tasks, result);

				if (tasks.isEmpty()) {
					return;
				}
			} finally {
				lock.unlock();
			}
		}
	}

	/**
	 * Uses either the {@link PartitionMapper} or the hard coded configuration to split
	 * the supplied master StepExecution into the slave StepExecutions.
	 *
	 * @param stepExecution master {@link StepExecution}
	 * @param isRestart true if this step is being restarted
	 * @return a {@link Set} of {@link StepExecution}s to be executed
	 * @throws Exception if the mapper fails to produce a plan or the split fails
	 */
	private Set<StepExecution> splitStepExecution(StepExecution stepExecution,
			boolean isRestart) throws Exception {
		if (mapper == null) {
			// Without a mapper the configured partition count is used, restart or not.
			StepExecutionSplitter stepSplitter = new JsrStepExecutionSplitter(jobRepository,
					allowStartIfComplete, stepExecution.getStepName(), true);
			return stepSplitter.split(stepExecution, partitions);
		}

		PartitionPlan plan = mapper.mapPartitions();

		if (!isRestart) {
			return applyPartitionPlan(stepExecution, plan, true);
		}

		if (plan.getPartitionsOverride()) {
			// The new plan replaces the previous one: start each partition with an empty context.
			Set<StepExecution> partitionStepExecutions = applyPartitionPlan(stepExecution, plan, false);

			for (StepExecution curStepExecution : partitionStepExecutions) {
				curStepExecution.setExecutionContext(new ExecutionContext());
			}

			return partitionStepExecutions;
		}

		// Reuse the plan saved by the previous run, but with the freshly mapped properties.
		Properties[] partitionProps = plan.getPartitionProperties();

		PartitionPlan savedPlan = (PartitionPlanState) stepExecution.getExecutionContext().get("partitionPlanState");
		savedPlan.setPartitionProperties(partitionProps);

		return applyPartitionPlan(stepExecution, savedPlan, true);
	}

	/**
	 * Applies the given plan: takes the thread count from it, stores a snapshot of
	 * the plan in the master step's execution context, splits the master step
	 * execution and registers the partition level properties.
	 *
	 * @param stepExecution master {@link StepExecution}
	 * @param plan the {@link PartitionPlan} to apply
	 * @param restoreState flag handed to the {@link JsrStepExecutionSplitter}
	 * @return the partition {@link StepExecution}s produced by the splitter
	 * @throws JobExecutionException if the split fails
	 * @throws IllegalArgumentException if the plan defines neither threads nor partitions
	 */
	private Set<StepExecution> applyPartitionPlan(StepExecution stepExecution,
			PartitionPlan plan, boolean restoreState) throws JobExecutionException {
		if (plan.getThreads() > 0) {
			threads = plan.getThreads();
		} else if (plan.getPartitions() > 0) {
			threads = plan.getPartitions();
		} else {
			throw new IllegalArgumentException("Either a number of threads or partitions are required");
		}

		// Saved so that a restart can pick the plan up again.
		stepExecution.getExecutionContext().put("partitionPlanState", new PartitionPlanState(plan));

		StepExecutionSplitter stepSplitter = new JsrStepExecutionSplitter(jobRepository,
				allowStartIfComplete, stepExecution.getStepName(), restoreState);
		Set<StepExecution> partitionStepExecutions = stepSplitter.split(stepExecution, plan.getPartitions());
		registerPartitionProperties(partitionStepExecutions, plan);
		return partitionStepExecutions;
	}

	/**
	 * Moves every finished task from {@code tasks} to {@code result}, notifying the
	 * analyzer (if any) of the partition's batch status and exit code.
	 *
	 * @param tasks the tasks still being tracked; finished ones are removed
	 * @param result receives the {@link StepExecution} of each finished task
	 * @throws Exception if retrieving a task result or analyzing its status fails
	 */
	private void processFinishedPartitions(
			final List<Future<StepExecution>> tasks,
			final Set<StepExecution> result) throws Exception {
		Iterator<Future<StepExecution>> pendingTasks = tasks.iterator();

		while (pendingTasks.hasNext()) {
			Future<StepExecution> curTask = pendingTasks.next();

			if (!curTask.isDone()) {
				continue;
			}

			StepExecution curStepExecution = curTask.get();

			if (analyzer != null) {
				analyzer.analyzeStatus(curStepExecution.getStatus().getBatchStatus(),
						curStepExecution.getExitStatus().getExitCode());
			}

			result.add(curStepExecution);
			pendingTasks.remove();
		}
	}

	/**
	 * Registers the plan's partition properties with the {@link BatchPropertyContext},
	 * pairing the n-th properties entry with the n-th step execution in the set's
	 * iteration order. Null entries and surplus step executions are skipped.
	 *
	 * @param partitionStepExecutions the partition step executions
	 * @param plan the plan providing the partition properties
	 */
	private void registerPartitionProperties(
			Set<StepExecution> partitionStepExecutions, PartitionPlan plan) {
		Properties[] partitionProperties = plan.getPartitionProperties();
		if (partitionProperties == null) {
			return;
		}

		int index = 0;
		for (StepExecution curExecution : partitionStepExecutions) {
			if (index >= partitionProperties.length) {
				break;
			}

			Properties partitionPropertyValues = partitionProperties[index++];
			if (partitionPropertyValues != null) {
				propertyContext.setStepProperties(curExecution.getStepName(), partitionPropertyValues);
			}
		}
	}

	/**
	 * Creates the task executing the given step in the context of the given execution.
	 *
	 * @param step the step to execute
	 * @param stepExecution the given execution
	 * @return the task executing the given step
	 */
	protected FutureTask<StepExecution> createTask(final Step step,
			final StepExecution stepExecution) {
		return new FutureTask<StepExecution>(new Callable<StepExecution>() {
			@Override
			public StepExecution call() throws Exception {
				step.execute(stepExecution);
				return stepExecution;
			}
		});
	}

	/**
	 * Validates the required configuration and creates the partition data queue
	 * and the lock when none were provided.
	 *
	 * @see InitializingBean#afterPropertiesSet()
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		Assert.notNull(propertyContext, "A BatchPropertyContext is required");
		Assert.isTrue(mapper != null || (threads > 0 || partitions > 0), "Either a mapper implementation or the number of partitions/threads is required");
		Assert.notNull(jobRepository, "A JobRepository is required");
		Assert.isTrue(pollingInterval >= 0, "The polling interval must be positive");

		if (partitionDataQueue == null) {
			partitionDataQueue = new LinkedBlockingQueue<Serializable>();
		}

		if (lock == null) {
			lock = new ReentrantLock();
		}
	}

	/**
	 * Since a {@link PartitionPlan} could provide dynamic data (different results from run to run),
	 * the batch runtime needs to save off the results for restarts. This class serves as a container
	 * used to save off that state.
	 *
	 * @author Michael Minella
	 * @since 3.0
	 */
	public static class PartitionPlanState implements PartitionPlan, Serializable {

		private static final long serialVersionUID = 1L;
		private Properties[] partitionProperties;
		private int partitions;
		private int threads;

		/**
		 * @param plan the {@link PartitionPlan} that is the source of the state
		 */
		public PartitionPlanState(PartitionPlan plan) {
			partitionProperties = plan.getPartitionProperties();
			partitions = plan.getPartitions();
			threads = plan.getThreads();
		}

		public PartitionPlanState() {
		}

		/**
		 * Copies the partition properties, partition count and thread count from the
		 * given plan.
		 *
		 * @param plan the {@link PartitionPlan} that is the source of the state
		 */
		public void setPartitionPlan(PartitionPlan plan) {
			this.partitionProperties = plan.getPartitionProperties();
			this.partitions = plan.getPartitions();
			this.threads = plan.getThreads();
		}

		/* (non-Javadoc)
		 * @see javax.batch.api.partition.PartitionPlan#getPartitionProperties()
		 */
		@Override
		public Properties[] getPartitionProperties() {
			return partitionProperties;
		}

		/* (non-Javadoc)
		 * @see javax.batch.api.partition.PartitionPlan#getPartitions()
		 */
		@Override
		public int getPartitions() {
			return partitions;
		}

		/* (non-Javadoc)
		 * @see javax.batch.api.partition.PartitionPlan#getThreads()
		 */
		@Override
		public int getThreads() {
			return threads;
		}

		/* (non-Javadoc)
		 * @see javax.batch.api.partition.PartitionPlan#setPartitions(int)
		 */
		@Override
		public void setPartitions(int count) {
			this.partitions = count;
		}

		/* (non-Javadoc)
		 * @see javax.batch.api.partition.PartitionPlan#setPartitionsOverride(boolean)
		 */
		@Override
		public void setPartitionsOverride(boolean override) {
			// Intentional No-op
		}

		/* (non-Javadoc)
		 * @see javax.batch.api.partition.PartitionPlan#getPartitionsOverride()
		 */
		@Override
		public boolean getPartitionsOverride() {
			return false;
		}

		/* (non-Javadoc)
		 * @see javax.batch.api.partition.PartitionPlan#setThreads(int)
		 */
		@Override
		public void setThreads(int count) {
			this.threads = count;
		}

		/* (non-Javadoc)
		 * @see javax.batch.api.partition.PartitionPlan#setPartitionProperties(java.util.Properties[])
		 */
		@Override
		public void setPartitionProperties(Properties[] props) {
			this.partitionProperties = props;
		}
	}
}