001/* 002 * Copyright 2006-2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.integration.chunk; 018 019import org.apache.commons.logging.Log; 020import org.apache.commons.logging.LogFactory; 021import org.springframework.batch.core.JobInterruptedException; 022import org.springframework.batch.core.StepContribution; 023import org.springframework.batch.core.step.item.Chunk; 024import org.springframework.batch.core.step.item.ChunkProcessor; 025import org.springframework.batch.core.step.item.FaultTolerantChunkProcessor; 026import org.springframework.batch.core.step.skip.NonSkippableReadException; 027import org.springframework.batch.core.step.skip.SkipLimitExceededException; 028import org.springframework.batch.core.step.skip.SkipListenerFailedException; 029import org.springframework.beans.factory.InitializingBean; 030import org.springframework.integration.annotation.MessageEndpoint; 031import org.springframework.integration.annotation.ServiceActivator; 032import org.springframework.retry.RetryException; 033import org.springframework.util.Assert; 034 035/** 036 * A {@link ChunkHandler} based on a {@link ChunkProcessor}. Knows how to distinguish between a processor that is fault 037 * tolerant, and one that is not. If the processor is fault tolerant then exceptions can be propagated on the assumption 038 * that there will be a roll back and the request will be re-delivered. 039 * 040 * @author Dave Syer 041 * @author Michael Minella 042 * 043 * @param <S> the type of the items in the chunk to be handled 044 */ 045@MessageEndpoint 046public class ChunkProcessorChunkHandler<S> implements ChunkHandler<S>, InitializingBean { 047 048 private static final Log logger = LogFactory.getLog(ChunkProcessorChunkHandler.class); 049 050 private ChunkProcessor<S> chunkProcessor; 051 052 /* 053 * (non-Javadoc) 054 * 055 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() 056 */ 057 public void afterPropertiesSet() throws Exception { 058 Assert.notNull(chunkProcessor, "A ChunkProcessor must be provided"); 059 } 060 061 /** 062 * Public setter for the {@link ChunkProcessor}. 063 * 064 * @param chunkProcessor the chunkProcessor to set 065 */ 066 public void setChunkProcessor(ChunkProcessor<S> chunkProcessor) { 067 this.chunkProcessor = chunkProcessor; 068 } 069 070 /** 071 * 072 * @see ChunkHandler#handleChunk(ChunkRequest) 073 */ 074 @ServiceActivator 075 public ChunkResponse handleChunk(ChunkRequest<S> chunkRequest) throws Exception { 076 077 if (logger.isDebugEnabled()) { 078 logger.debug("Handling chunk: " + chunkRequest); 079 } 080 081 StepContribution stepContribution = chunkRequest.getStepContribution(); 082 083 Throwable failure = process(chunkRequest, stepContribution); 084 if (failure != null) { 085 logger.debug("Failed chunk", failure); 086 return new ChunkResponse(false, chunkRequest.getSequence(), chunkRequest.getJobId(), stepContribution, failure.getClass().getName() 087 + ": " + failure.getMessage()); 088 } 089 090 if (logger.isDebugEnabled()) { 091 logger.debug("Completed chunk handling with " + stepContribution); 092 } 093 return new ChunkResponse(true, chunkRequest.getSequence(), chunkRequest.getJobId(), stepContribution); 094 095 } 096 097 /** 098 * @param chunkRequest the current request 099 * @param stepContribution the step contribution to update 100 * @throws Exception if there is a fatal exception 101 */ 102 private Throwable process(ChunkRequest<S> chunkRequest, StepContribution stepContribution) throws Exception { 103 104 Chunk<S> chunk = new Chunk<S>(chunkRequest.getItems()); 105 Throwable failure = null; 106 try { 107 chunkProcessor.process(stepContribution, chunk); 108 } 109 catch (SkipLimitExceededException e) { 110 failure = e; 111 } 112 catch (NonSkippableReadException e) { 113 failure = e; 114 } 115 catch (SkipListenerFailedException e) { 116 failure = e; 117 } 118 catch (RetryException e) { 119 failure = e; 120 } 121 catch (JobInterruptedException e) { 122 failure = e; 123 } 124 catch (Exception e) { 125 if (chunkProcessor instanceof FaultTolerantChunkProcessor<?, ?>) { 126 // try again... 127 throw e; 128 } 129 else { 130 failure = e; 131 } 132 } 133 134 return failure; 135 136 } 137 138}