001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.integration.chunk;
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021import org.springframework.batch.core.JobInterruptedException;
022import org.springframework.batch.core.StepContribution;
023import org.springframework.batch.core.step.item.Chunk;
024import org.springframework.batch.core.step.item.ChunkProcessor;
025import org.springframework.batch.core.step.item.FaultTolerantChunkProcessor;
026import org.springframework.batch.core.step.skip.NonSkippableReadException;
027import org.springframework.batch.core.step.skip.SkipLimitExceededException;
028import org.springframework.batch.core.step.skip.SkipListenerFailedException;
029import org.springframework.beans.factory.InitializingBean;
030import org.springframework.integration.annotation.MessageEndpoint;
031import org.springframework.integration.annotation.ServiceActivator;
032import org.springframework.retry.RetryException;
033import org.springframework.util.Assert;
034
035/**
036 * A {@link ChunkHandler} based on a {@link ChunkProcessor}. Knows how to distinguish between a processor that is fault
037 * tolerant, and one that is not. If the processor is fault tolerant then exceptions can be propagated on the assumption
038 * that there will be a roll back and the request will be re-delivered.
039 *
040 * @author Dave Syer
041 * @author Michael Minella
042 *
043 * @param <S> the type of the items in the chunk to be handled
044 */
045@MessageEndpoint
046public class ChunkProcessorChunkHandler<S> implements ChunkHandler<S>, InitializingBean {
047
048        private static final Log logger = LogFactory.getLog(ChunkProcessorChunkHandler.class);
049
050        private ChunkProcessor<S> chunkProcessor;
051
052        /*
053         * (non-Javadoc)
054         *
055         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
056         */
057        public void afterPropertiesSet() throws Exception {
058                Assert.notNull(chunkProcessor, "A ChunkProcessor must be provided");
059        }
060
061        /**
062         * Public setter for the {@link ChunkProcessor}.
063         *
064         * @param chunkProcessor the chunkProcessor to set
065         */
066        public void setChunkProcessor(ChunkProcessor<S> chunkProcessor) {
067                this.chunkProcessor = chunkProcessor;
068        }
069
070        /**
071         *
072         * @see ChunkHandler#handleChunk(ChunkRequest)
073         */
074        @ServiceActivator
075        public ChunkResponse handleChunk(ChunkRequest<S> chunkRequest) throws Exception {
076
077                if (logger.isDebugEnabled()) {
078                        logger.debug("Handling chunk: " + chunkRequest);
079                }
080
081                StepContribution stepContribution = chunkRequest.getStepContribution();
082
083                Throwable failure = process(chunkRequest, stepContribution);
084                if (failure != null) {
085                        logger.debug("Failed chunk", failure);
086                        return new ChunkResponse(false, chunkRequest.getSequence(), chunkRequest.getJobId(), stepContribution, failure.getClass().getName()
087                                        + ": " + failure.getMessage());
088                }
089
090                if (logger.isDebugEnabled()) {
091                        logger.debug("Completed chunk handling with " + stepContribution);
092                }
093                return new ChunkResponse(true, chunkRequest.getSequence(), chunkRequest.getJobId(), stepContribution);
094
095        }
096
097        /**
098         * @param chunkRequest the current request
099         * @param stepContribution the step contribution to update
100         * @throws Exception if there is a fatal exception
101         */
102        private Throwable process(ChunkRequest<S> chunkRequest, StepContribution stepContribution) throws Exception {
103
104                Chunk<S> chunk = new Chunk<S>(chunkRequest.getItems());
105                Throwable failure = null;
106                try {
107                        chunkProcessor.process(stepContribution, chunk);
108                }
109                catch (SkipLimitExceededException e) {
110                        failure = e;
111                }
112                catch (NonSkippableReadException e) {
113                        failure = e;
114                }
115                catch (SkipListenerFailedException e) {
116                        failure = e;
117                }
118                catch (RetryException e) {
119                        failure = e;
120                }
121                catch (JobInterruptedException e) {
122                        failure = e;
123                }
124                catch (Exception e) {
125                        if (chunkProcessor instanceof FaultTolerantChunkProcessor<?, ?>) {
126                                // try again...
127                                throw e;
128                        }
129                        else {
130                                failure = e;
131                        }
132                }
133
134                return failure;
135
136        }
137
138}