001/*
002 * Copyright 2006-2007 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.integration.chunk;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.List;
022import java.util.Queue;
023import java.util.concurrent.LinkedBlockingQueue;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.springframework.batch.core.BatchStatus;
029import org.springframework.batch.core.ExitStatus;
030import org.springframework.batch.core.StepContribution;
031import org.springframework.batch.core.StepExecution;
032import org.springframework.batch.core.listener.StepExecutionListenerSupport;
033import org.springframework.batch.item.ExecutionContext;
034import org.springframework.batch.item.ItemStream;
035import org.springframework.batch.item.ItemStreamException;
036import org.springframework.batch.item.ItemWriter;
037import org.springframework.integration.core.MessagingTemplate;
038import org.springframework.messaging.Message;
039import org.springframework.messaging.PollableChannel;
040import org.springframework.messaging.support.GenericMessage;
041import org.springframework.util.Assert;
042
043public class ChunkMessageChannelItemWriter<T> extends StepExecutionListenerSupport implements ItemWriter<T>,
044                ItemStream, StepContributionSource {
045
046        private static final Log logger = LogFactory.getLog(ChunkMessageChannelItemWriter.class);
047
048        static final String ACTUAL = ChunkMessageChannelItemWriter.class.getName() + ".ACTUAL";
049
050        static final String EXPECTED = ChunkMessageChannelItemWriter.class.getName() + ".EXPECTED";
051
052        private static final long DEFAULT_THROTTLE_LIMIT = 6;
053
054        private MessagingTemplate messagingGateway;
055
056        private final LocalState localState = new LocalState();
057
058        private long throttleLimit = DEFAULT_THROTTLE_LIMIT;
059
060        private final int DEFAULT_MAX_WAIT_TIMEOUTS = 40;
061
062        private int maxWaitTimeouts = DEFAULT_MAX_WAIT_TIMEOUTS;
063
064        private PollableChannel replyChannel;
065
066        /**
067         * The maximum number of times to wait at the end of a step for a non-null result from the remote workers. This is a
068         * multiplier on the receive timeout set separately on the gateway. The ideal value is a compromise between allowing
069         * slow workers time to finish, and responsiveness if there is a dead worker. Defaults to 40.
070         *
071         * @param maxWaitTimeouts the maximum number of wait timeouts
072         */
073        public void setMaxWaitTimeouts(int maxWaitTimeouts) {
074                this.maxWaitTimeouts = maxWaitTimeouts;
075        }
076
077        /**
078         * Public setter for the throttle limit. This limits the number of pending requests for chunk processing to avoid
079         * overwhelming the receivers.
080         * @param throttleLimit the throttle limit to set
081         */
082        public void setThrottleLimit(long throttleLimit) {
083                this.throttleLimit = throttleLimit;
084        }
085
086        public void setMessagingOperations(MessagingTemplate messagingGateway) {
087                this.messagingGateway = messagingGateway;
088        }
089
090        public void setReplyChannel(PollableChannel replyChannel) {
091                this.replyChannel = replyChannel;
092        }
093
094        public void write(List<? extends T> items) throws Exception {
095
096                // Block until expecting <= throttle limit
097                while (localState.getExpecting() > throttleLimit) {
098                        getNextResult();
099                }
100
101                if (!items.isEmpty()) {
102
103                        ChunkRequest<T> request = localState.getRequest(items);
104                        if (logger.isDebugEnabled()) {
105                                logger.debug("Dispatching chunk: " + request);
106                        }
107                        messagingGateway.send(new GenericMessage<ChunkRequest<T>>(request));
108                        localState.incrementExpected();
109
110                }
111
112        }
113
114        @Override
115        public void beforeStep(StepExecution stepExecution) {
116                localState.setStepExecution(stepExecution);
117        }
118
119        @Override
120        public ExitStatus afterStep(StepExecution stepExecution) {
121                if (!(stepExecution.getStatus() == BatchStatus.COMPLETED)) {
122                        return ExitStatus.EXECUTING;
123                }
124                long expecting = localState.getExpecting();
125                boolean timedOut;
126                try {
127                        logger.debug("Waiting for results in step listener...");
128                        timedOut = !waitForResults();
129                        logger.debug("Finished waiting for results in step listener.");
130                }
131                catch (RuntimeException e) {
132                        logger.debug("Detected failure waiting for results in step listener.", e);
133                        stepExecution.setStatus(BatchStatus.FAILED);
134                        return ExitStatus.FAILED.addExitDescription(e.getClass().getName() + ": " + e.getMessage());
135                }
136                finally {
137
138                        if (logger.isDebugEnabled()) {
139                                logger.debug("Finished waiting for results in step listener.  Still expecting: "
140                                                + localState.getExpecting());
141                        }
142
143                        for (StepContribution contribution : getStepContributions()) {
144                                stepExecution.apply(contribution);
145                        }
146                }
147                if (timedOut) {
148                        stepExecution.setStatus(BatchStatus.FAILED);
149                        return ExitStatus.FAILED.addExitDescription("Timed out waiting for " + localState.getExpecting()
150                                        + " backlog at end of step");
151                }
152                return ExitStatus.COMPLETED.addExitDescription("Waited for " + expecting + " results.");
153        }
154
155        public void close() throws ItemStreamException {
156                localState.reset();
157        }
158
159        public void open(ExecutionContext executionContext) throws ItemStreamException {
160                if (executionContext.containsKey(EXPECTED)) {
161                        localState.open(executionContext.getInt(EXPECTED), executionContext.getInt(ACTUAL));
162                        if (!waitForResults()) {
163                                throw new ItemStreamException("Timed out waiting for back log on open");
164                        }
165                }
166        }
167
168        public void update(ExecutionContext executionContext) throws ItemStreamException {
169                executionContext.putInt(EXPECTED, localState.expected.intValue());
170                executionContext.putInt(ACTUAL, localState.actual.intValue());
171        }
172
173        public Collection<StepContribution> getStepContributions() {
174                List<StepContribution> contributions = new ArrayList<StepContribution>();
175                for (ChunkResponse response : localState.pollChunkResponses()) {
176                        StepContribution contribution = response.getStepContribution();
177                        if (logger.isDebugEnabled()) {
178                                logger.debug("Applying: " + response);
179                        }
180                        contributions.add(contribution);
181                }
182                return contributions;
183        }
184
185        /**
186         * Wait until all the results that are in the pipeline come back to the reply channel.
187         *
188         * @return true if successfully received a result, false if timed out
189         */
190        private boolean waitForResults() throws AsynchronousFailureException {
191                int count = 0;
192                int maxCount = maxWaitTimeouts;
193                Throwable failure = null;
194                logger.info("Waiting for " + localState.getExpecting() + " results");
195                while (localState.getExpecting() > 0 && count++ < maxCount) {
196                        try {
197                                getNextResult();
198                        }
199                        catch (Throwable t) {
200                                logger.error("Detected error in remote result. Trying to recover " + localState.getExpecting()
201                                                + " outstanding results before completing.", t);
202                                failure = t;
203                        }
204                }
205                if (failure != null) {
206                        throw wrapIfNecessary(failure);
207                }
208                return count < maxCount;
209        }
210
211        /**
212         * Get the next result if it is available (within the timeout specified in the gateway), otherwise do nothing.
213         *
214         * @throws AsynchronousFailureException If there is a response and it contains a failed chunk response.
215         *
216         * @throws IllegalStateException if the result contains the wrong job instance id (maybe we are sharing a channel
217         * and we shouldn't be)
218         */
219        @SuppressWarnings("unchecked")
220        private void getNextResult() throws AsynchronousFailureException {
221                Message<ChunkResponse> message = (Message<ChunkResponse>) messagingGateway.receive(replyChannel);
222                if (message != null) {
223                        ChunkResponse payload = message.getPayload();
224                        if (logger.isDebugEnabled()) {
225                                logger.debug("Found result: " + payload);
226                        }
227                        Long jobInstanceId = payload.getJobId();
228                        Assert.state(jobInstanceId != null, "Message did not contain job instance id.");
229                        Assert.state(jobInstanceId.equals(localState.getJobId()), "Message contained wrong job instance id ["
230                                        + jobInstanceId + "] should have been [" + localState.getJobId() + "].");
231                        if (payload.isRedelivered()) {
232                                logger
233                                                .warn("Redelivered result detected, which may indicate stale state. In the best case, we just picked up a timed out message "
234                                                                + "from a previous failed execution. In the worst case (and if this is not a restart), "
235                                                                + "the step may now timeout.  In that case if you believe that all messages "
236                                                                + "from workers have been sent, the business state "
237                                                                + "is probably inconsistent, and the step will fail.");
238                                localState.incrementRedelivered();
239                        }
240                        localState.pushResponse(payload);
241                        localState.incrementActual();
242                        if (!payload.isSuccessful()) {
243                                throw new AsynchronousFailureException("Failure or interrupt detected in handler: "
244                                                + payload.getMessage());
245                        }
246                }
247        }
248
249        /**
250         * Re-throws the original throwable if it is unchecked, wraps checked exceptions into
251         * {@link AsynchronousFailureException}.
252         */
253        private static AsynchronousFailureException wrapIfNecessary(Throwable throwable) {
254                if (throwable instanceof Error) {
255                        throw (Error) throwable;
256                }
257                else if (throwable instanceof AsynchronousFailureException) {
258                        return (AsynchronousFailureException) throwable;
259                }
260                else {
261                        return new AsynchronousFailureException("Exception in remote process", throwable);
262                }
263        }
264
265        private static class LocalState {
266
267                private final AtomicInteger current = new AtomicInteger(-1);
268
269                private final AtomicInteger actual = new AtomicInteger();
270
271                private final AtomicInteger expected = new AtomicInteger();
272
273                private final AtomicInteger redelivered = new AtomicInteger();
274
275                private StepExecution stepExecution;
276
277                private final Queue<ChunkResponse> contributions = new LinkedBlockingQueue<ChunkResponse>();
278
279                public int getExpecting() {
280                        return expected.get() - actual.get();
281                }
282
283                public <T> ChunkRequest<T> getRequest(List<? extends T> items) {
284                        return new ChunkRequest<T>(current.incrementAndGet(), items, getJobId(), createStepContribution());
285                }
286
287                public void open(int expectedValue, int actualValue) {
288                        actual.set(actualValue);
289                        expected.set(expectedValue);
290                }
291
292                public Collection<ChunkResponse> pollChunkResponses() {
293                        Collection<ChunkResponse> set = new ArrayList<ChunkResponse>();
294                        synchronized (contributions) {
295                                ChunkResponse item = contributions.poll();
296                                while (item != null) {
297                                        set.add(item);
298                                        item = contributions.poll();
299                                }
300                        }
301                        return set;
302                }
303
304                public void pushResponse(ChunkResponse stepContribution) {
305                        synchronized (contributions) {
306                                contributions.add(stepContribution);
307                        }
308                }
309
310                public void incrementRedelivered() {
311                        redelivered.incrementAndGet();
312                }
313
314                public void incrementActual() {
315                        actual.incrementAndGet();
316                }
317
318                public void incrementExpected() {
319                        expected.incrementAndGet();
320                }
321
322                public StepContribution createStepContribution() {
323                        return stepExecution.createStepContribution();
324                }
325
326                public Long getJobId() {
327                        return stepExecution.getJobExecution().getJobId();
328                }
329
330                public void setStepExecution(StepExecution stepExecution) {
331                        this.stepExecution = stepExecution;
332                }
333
334                public void reset() {
335                        expected.set(0);
336                        actual.set(0);
337                }
338        }
339
340}