001/* 002 * Copyright 2006-2007 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.integration.chunk; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.List; 022import java.util.Queue; 023import java.util.concurrent.LinkedBlockingQueue; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.springframework.batch.core.BatchStatus; 029import org.springframework.batch.core.ExitStatus; 030import org.springframework.batch.core.StepContribution; 031import org.springframework.batch.core.StepExecution; 032import org.springframework.batch.core.listener.StepExecutionListenerSupport; 033import org.springframework.batch.item.ExecutionContext; 034import org.springframework.batch.item.ItemStream; 035import org.springframework.batch.item.ItemStreamException; 036import org.springframework.batch.item.ItemWriter; 037import org.springframework.integration.core.MessagingTemplate; 038import org.springframework.messaging.Message; 039import org.springframework.messaging.PollableChannel; 040import org.springframework.messaging.support.GenericMessage; 041import org.springframework.util.Assert; 042 043public class ChunkMessageChannelItemWriter<T> extends StepExecutionListenerSupport implements ItemWriter<T>, 044 ItemStream, StepContributionSource { 045 046 private static final Log logger = LogFactory.getLog(ChunkMessageChannelItemWriter.class); 047 048 static final String ACTUAL = ChunkMessageChannelItemWriter.class.getName() + ".ACTUAL"; 049 050 static final String EXPECTED = ChunkMessageChannelItemWriter.class.getName() + ".EXPECTED"; 051 052 private static final long DEFAULT_THROTTLE_LIMIT = 6; 053 054 private MessagingTemplate messagingGateway; 055 056 private final LocalState localState = new LocalState(); 057 058 private long throttleLimit = DEFAULT_THROTTLE_LIMIT; 059 060 private final int DEFAULT_MAX_WAIT_TIMEOUTS = 40; 061 062 private int maxWaitTimeouts = DEFAULT_MAX_WAIT_TIMEOUTS; 063 064 private PollableChannel replyChannel; 065 066 /** 067 * The maximum number of times to wait at the end of a step for a non-null result from the remote workers. This is a 068 * multiplier on the receive timeout set separately on the gateway. The ideal value is a compromise between allowing 069 * slow workers time to finish, and responsiveness if there is a dead worker. Defaults to 40. 070 * 071 * @param maxWaitTimeouts the maximum number of wait timeouts 072 */ 073 public void setMaxWaitTimeouts(int maxWaitTimeouts) { 074 this.maxWaitTimeouts = maxWaitTimeouts; 075 } 076 077 /** 078 * Public setter for the throttle limit. This limits the number of pending requests for chunk processing to avoid 079 * overwhelming the receivers. 080 * @param throttleLimit the throttle limit to set 081 */ 082 public void setThrottleLimit(long throttleLimit) { 083 this.throttleLimit = throttleLimit; 084 } 085 086 public void setMessagingOperations(MessagingTemplate messagingGateway) { 087 this.messagingGateway = messagingGateway; 088 } 089 090 public void setReplyChannel(PollableChannel replyChannel) { 091 this.replyChannel = replyChannel; 092 } 093 094 public void write(List<? extends T> items) throws Exception { 095 096 // Block until expecting <= throttle limit 097 while (localState.getExpecting() > throttleLimit) { 098 getNextResult(); 099 } 100 101 if (!items.isEmpty()) { 102 103 ChunkRequest<T> request = localState.getRequest(items); 104 if (logger.isDebugEnabled()) { 105 logger.debug("Dispatching chunk: " + request); 106 } 107 messagingGateway.send(new GenericMessage<ChunkRequest<T>>(request)); 108 localState.incrementExpected(); 109 110 } 111 112 } 113 114 @Override 115 public void beforeStep(StepExecution stepExecution) { 116 localState.setStepExecution(stepExecution); 117 } 118 119 @Override 120 public ExitStatus afterStep(StepExecution stepExecution) { 121 if (!(stepExecution.getStatus() == BatchStatus.COMPLETED)) { 122 return ExitStatus.EXECUTING; 123 } 124 long expecting = localState.getExpecting(); 125 boolean timedOut; 126 try { 127 logger.debug("Waiting for results in step listener..."); 128 timedOut = !waitForResults(); 129 logger.debug("Finished waiting for results in step listener."); 130 } 131 catch (RuntimeException e) { 132 logger.debug("Detected failure waiting for results in step listener.", e); 133 stepExecution.setStatus(BatchStatus.FAILED); 134 return ExitStatus.FAILED.addExitDescription(e.getClass().getName() + ": " + e.getMessage()); 135 } 136 finally { 137 138 if (logger.isDebugEnabled()) { 139 logger.debug("Finished waiting for results in step listener. Still expecting: " 140 + localState.getExpecting()); 141 } 142 143 for (StepContribution contribution : getStepContributions()) { 144 stepExecution.apply(contribution); 145 } 146 } 147 if (timedOut) { 148 stepExecution.setStatus(BatchStatus.FAILED); 149 return ExitStatus.FAILED.addExitDescription("Timed out waiting for " + localState.getExpecting() 150 + " backlog at end of step"); 151 } 152 return ExitStatus.COMPLETED.addExitDescription("Waited for " + expecting + " results."); 153 } 154 155 public void close() throws ItemStreamException { 156 localState.reset(); 157 } 158 159 public void open(ExecutionContext executionContext) throws ItemStreamException { 160 if (executionContext.containsKey(EXPECTED)) { 161 localState.open(executionContext.getInt(EXPECTED), executionContext.getInt(ACTUAL)); 162 if (!waitForResults()) { 163 throw new ItemStreamException("Timed out waiting for back log on open"); 164 } 165 } 166 } 167 168 public void update(ExecutionContext executionContext) throws ItemStreamException { 169 executionContext.putInt(EXPECTED, localState.expected.intValue()); 170 executionContext.putInt(ACTUAL, localState.actual.intValue()); 171 } 172 173 public Collection<StepContribution> getStepContributions() { 174 List<StepContribution> contributions = new ArrayList<StepContribution>(); 175 for (ChunkResponse response : localState.pollChunkResponses()) { 176 StepContribution contribution = response.getStepContribution(); 177 if (logger.isDebugEnabled()) { 178 logger.debug("Applying: " + response); 179 } 180 contributions.add(contribution); 181 } 182 return contributions; 183 } 184 185 /** 186 * Wait until all the results that are in the pipeline come back to the reply channel. 187 * 188 * @return true if successfully received a result, false if timed out 189 */ 190 private boolean waitForResults() throws AsynchronousFailureException { 191 int count = 0; 192 int maxCount = maxWaitTimeouts; 193 Throwable failure = null; 194 logger.info("Waiting for " + localState.getExpecting() + " results"); 195 while (localState.getExpecting() > 0 && count++ < maxCount) { 196 try { 197 getNextResult(); 198 } 199 catch (Throwable t) { 200 logger.error("Detected error in remote result. Trying to recover " + localState.getExpecting() 201 + " outstanding results before completing.", t); 202 failure = t; 203 } 204 } 205 if (failure != null) { 206 throw wrapIfNecessary(failure); 207 } 208 return count < maxCount; 209 } 210 211 /** 212 * Get the next result if it is available (within the timeout specified in the gateway), otherwise do nothing. 213 * 214 * @throws AsynchronousFailureException If there is a response and it contains a failed chunk response. 215 * 216 * @throws IllegalStateException if the result contains the wrong job instance id (maybe we are sharing a channel 217 * and we shouldn't be) 218 */ 219 @SuppressWarnings("unchecked") 220 private void getNextResult() throws AsynchronousFailureException { 221 Message<ChunkResponse> message = (Message<ChunkResponse>) messagingGateway.receive(replyChannel); 222 if (message != null) { 223 ChunkResponse payload = message.getPayload(); 224 if (logger.isDebugEnabled()) { 225 logger.debug("Found result: " + payload); 226 } 227 Long jobInstanceId = payload.getJobId(); 228 Assert.state(jobInstanceId != null, "Message did not contain job instance id."); 229 Assert.state(jobInstanceId.equals(localState.getJobId()), "Message contained wrong job instance id [" 230 + jobInstanceId + "] should have been [" + localState.getJobId() + "]."); 231 if (payload.isRedelivered()) { 232 logger 233 .warn("Redelivered result detected, which may indicate stale state. In the best case, we just picked up a timed out message " 234 + "from a previous failed execution. In the worst case (and if this is not a restart), " 235 + "the step may now timeout. In that case if you believe that all messages " 236 + "from workers have been sent, the business state " 237 + "is probably inconsistent, and the step will fail."); 238 localState.incrementRedelivered(); 239 } 240 localState.pushResponse(payload); 241 localState.incrementActual(); 242 if (!payload.isSuccessful()) { 243 throw new AsynchronousFailureException("Failure or interrupt detected in handler: " 244 + payload.getMessage()); 245 } 246 } 247 } 248 249 /** 250 * Re-throws the original throwable if it is unchecked, wraps checked exceptions into 251 * {@link AsynchronousFailureException}. 252 */ 253 private static AsynchronousFailureException wrapIfNecessary(Throwable throwable) { 254 if (throwable instanceof Error) { 255 throw (Error) throwable; 256 } 257 else if (throwable instanceof AsynchronousFailureException) { 258 return (AsynchronousFailureException) throwable; 259 } 260 else { 261 return new AsynchronousFailureException("Exception in remote process", throwable); 262 } 263 } 264 265 private static class LocalState { 266 267 private final AtomicInteger current = new AtomicInteger(-1); 268 269 private final AtomicInteger actual = new AtomicInteger(); 270 271 private final AtomicInteger expected = new AtomicInteger(); 272 273 private final AtomicInteger redelivered = new AtomicInteger(); 274 275 private StepExecution stepExecution; 276 277 private final Queue<ChunkResponse> contributions = new LinkedBlockingQueue<ChunkResponse>(); 278 279 public int getExpecting() { 280 return expected.get() - actual.get(); 281 } 282 283 public <T> ChunkRequest<T> getRequest(List<? extends T> items) { 284 return new ChunkRequest<T>(current.incrementAndGet(), items, getJobId(), createStepContribution()); 285 } 286 287 public void open(int expectedValue, int actualValue) { 288 actual.set(actualValue); 289 expected.set(expectedValue); 290 } 291 292 public Collection<ChunkResponse> pollChunkResponses() { 293 Collection<ChunkResponse> set = new ArrayList<ChunkResponse>(); 294 synchronized (contributions) { 295 ChunkResponse item = contributions.poll(); 296 while (item != null) { 297 set.add(item); 298 item = contributions.poll(); 299 } 300 } 301 return set; 302 } 303 304 public void pushResponse(ChunkResponse stepContribution) { 305 synchronized (contributions) { 306 contributions.add(stepContribution); 307 } 308 } 309 310 public void incrementRedelivered() { 311 redelivered.incrementAndGet(); 312 } 313 314 public void incrementActual() { 315 actual.incrementAndGet(); 316 } 317 318 public void incrementExpected() { 319 expected.incrementAndGet(); 320 } 321 322 public StepContribution createStepContribution() { 323 return stepExecution.createStepContribution(); 324 } 325 326 public Long getJobId() { 327 return stepExecution.getJobExecution().getJobId(); 328 } 329 330 public void setStepExecution(StepExecution stepExecution) { 331 this.stepExecution = stepExecution; 332 } 333 334 public void reset() { 335 expected.set(0); 336 actual.set(0); 337 } 338 } 339 340}