/*
 * Copyright 2006-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core.listener;

import java.lang.reflect.InvocationTargetException;
import java.util.List;

import javax.batch.api.chunk.listener.RetryProcessListener;
import javax.batch.api.chunk.listener.RetryReadListener;
import javax.batch.api.chunk.listener.RetryWriteListener;
import javax.batch.operations.BatchRuntimeException;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.StepListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.lang.Nullable;

/**
 * Single entry point that forwards every step-level listener callback to the
 * composite listener of the matching type.
 *
 * <p>Error handling differs per callback family:
 * <ul>
 * <li>step, chunk, read, process and write callbacks wrap any
 * {@link RuntimeException} raised by a delegate in a
 * {@link StepListenerFailedException};</li>
 * <li>skip callbacks are forwarded as is, so delegate exceptions propagate
 * unchanged;</li>
 * <li>retry callbacks wrap any {@link Exception} raised by a delegate in a
 * {@link BatchRuntimeException}.</li>
 * </ul>
 *
 * @param <T> the item type seen by read and process listeners
 * @param <S> the item type seen by write listeners (the processing result)
 *
 * @author Dave Syer
 * @author Michael Minella
 * @author Chris Schaefer
 * @author Mahmoud Ben Hassine
 */
public class MulticasterBatchListener<T, S> implements StepExecutionListener, ChunkListener, ItemReadListener<T>,
        ItemProcessListener<T, S>, ItemWriteListener<S>, SkipListener<T, S>, RetryReadListener, RetryProcessListener,
        RetryWriteListener {

    private final CompositeStepExecutionListener stepListener = new CompositeStepExecutionListener();

    private final CompositeChunkListener chunkListener = new CompositeChunkListener();

    private final CompositeItemReadListener<T> itemReadListener = new CompositeItemReadListener<>();

    private final CompositeItemProcessListener<T, S> itemProcessListener = new CompositeItemProcessListener<>();

    private final CompositeItemWriteListener<S> itemWriteListener = new CompositeItemWriteListener<>();

    private final CompositeSkipListener<T, S> skipListener = new CompositeSkipListener<>();

    private final CompositeRetryReadListener retryReadListener = new CompositeRetryReadListener();

    private final CompositeRetryProcessListener retryProcessListener = new CompositeRetryProcessListener();

    private final CompositeRetryWriteListener retryWriteListener = new CompositeRetryWriteListener();

    /**
     * Initialize the listener instance with empty composites.
     */
    public MulticasterBatchListener() {
        super();
    }

    /**
     * Register each of the objects as listeners. Once registered, calls to the
     * {@link MulticasterBatchListener} broadcast to the individual listeners.
     *
     * @param listeners listener objects of types known to the multicaster.
     */
    public void setListeners(List<? extends StepListener> listeners) {
        for (StepListener listener : listeners) {
            register(listener);
        }
    }

    /**
     * Register the listener for callbacks on every supported listener
     * interface it implements. A listener implementing several interfaces is
     * registered once per interface; a listener implementing none of them is
     * ignored. This method performs no {@link ItemStream} registration: only
     * the listener interfaces checked below are considered.
     *
     * @param listener the {@link StepListener} instance to be registered.
     */
    public void register(StepListener listener) {
        if (listener instanceof StepExecutionListener) {
            this.stepListener.register((StepExecutionListener) listener);
        }
        if (listener instanceof ChunkListener) {
            this.chunkListener.register((ChunkListener) listener);
        }
        // The generic casts below cannot be checked at runtime because of type
        // erasure; the warning is suppressed on the narrowest possible scope.
        if (listener instanceof ItemReadListener<?>) {
            @SuppressWarnings("unchecked")
            ItemReadListener<T> readDelegate = (ItemReadListener<T>) listener;
            this.itemReadListener.register(readDelegate);
        }
        if (listener instanceof ItemProcessListener<?, ?>) {
            @SuppressWarnings("unchecked")
            ItemProcessListener<T, S> processDelegate = (ItemProcessListener<T, S>) listener;
            this.itemProcessListener.register(processDelegate);
        }
        if (listener instanceof ItemWriteListener<?>) {
            @SuppressWarnings("unchecked")
            ItemWriteListener<S> writeDelegate = (ItemWriteListener<S>) listener;
            this.itemWriteListener.register(writeDelegate);
        }
        if (listener instanceof SkipListener<?, ?>) {
            @SuppressWarnings("unchecked")
            SkipListener<T, S> skipDelegate = (SkipListener<T, S>) listener;
            this.skipListener.register(skipDelegate);
        }
        if (listener instanceof RetryReadListener) {
            this.retryReadListener.register((RetryReadListener) listener);
        }
        if (listener instanceof RetryProcessListener) {
            this.retryProcessListener.register((RetryProcessListener) listener);
        }
        if (listener instanceof RetryWriteListener) {
            this.retryWriteListener.register((RetryWriteListener) listener);
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see org.springframework.batch.core.listener.CompositeItemProcessListener#afterProcess(java.lang.Object,
     * java.lang.Object)
     */
    @Override
    public void afterProcess(T item, @Nullable S result) {
        try {
            itemProcessListener.afterProcess(item, result);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in afterProcess.", getTargetException(e));
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see org.springframework.batch.core.listener.CompositeItemProcessListener#beforeProcess(java.lang.Object)
     */
    @Override
    public void beforeProcess(T item) {
        try {
            itemProcessListener.beforeProcess(item);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in beforeProcess.", getTargetException(e));
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see org.springframework.batch.core.listener.CompositeItemProcessListener#onProcessError(java.lang.Object,
     * java.lang.Exception)
     */
    @Override
    public void onProcessError(T item, Exception ex) {
        try {
            itemProcessListener.onProcessError(item, ex);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in onProcessError.", getTargetException(e));
        }
    }

    /**
     * @return the exit status returned by the composite step listener
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see org.springframework.batch.core.listener.CompositeStepExecutionListener#afterStep(StepExecution)
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        try {
            return stepListener.afterStep(stepExecution);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in afterStep.", getTargetException(e));
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see org.springframework.batch.core.listener.CompositeStepExecutionListener#beforeStep(org.springframework.batch.core.StepExecution)
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        try {
            stepListener.beforeStep(stepExecution);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in beforeStep.", getTargetException(e));
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see org.springframework.batch.core.listener.CompositeChunkListener#afterChunk(ChunkContext context)
     */
    @Override
    public void afterChunk(ChunkContext context) {
        try {
            chunkListener.afterChunk(context);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in afterChunk.", getTargetException(e));
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see org.springframework.batch.core.listener.CompositeChunkListener#beforeChunk(ChunkContext context)
     */
    @Override
    public void beforeChunk(ChunkContext context) {
        try {
            chunkListener.beforeChunk(context);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in beforeChunk.", getTargetException(e));
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see org.springframework.batch.core.listener.CompositeItemReadListener#afterRead(java.lang.Object)
     */
    @Override
    public void afterRead(T item) {
        try {
            itemReadListener.afterRead(item);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in afterRead.", getTargetException(e));
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see org.springframework.batch.core.listener.CompositeItemReadListener#beforeRead()
     */
    @Override
    public void beforeRead() {
        try {
            itemReadListener.beforeRead();
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in beforeRead.", getTargetException(e));
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see org.springframework.batch.core.listener.CompositeItemReadListener#onReadError(java.lang.Exception)
     */
    @Override
    public void onReadError(Exception ex) {
        try {
            itemReadListener.onReadError(ex);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in onReadError.", getTargetException(e));
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see ItemWriteListener#afterWrite(List)
     */
    @Override
    public void afterWrite(List<? extends S> items) {
        try {
            itemWriteListener.afterWrite(items);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in afterWrite.", getTargetException(e));
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see ItemWriteListener#beforeWrite(List)
     */
    @Override
    public void beforeWrite(List<? extends S> items) {
        try {
            itemWriteListener.beforeWrite(items);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in beforeWrite.", getTargetException(e));
        }
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see ItemWriteListener#onWriteError(Exception, List)
     */
    @Override
    public void onWriteError(Exception ex, List<? extends S> items) {
        try {
            itemWriteListener.onWriteError(ex, items);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in onWriteError.", getTargetException(e));
        }
    }

    /**
     * Forwarded without wrapping: any exception from a delegate propagates
     * unchanged.
     *
     * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInRead(java.lang.Throwable)
     */
    @Override
    public void onSkipInRead(Throwable t) {
        skipListener.onSkipInRead(t);
    }

    /**
     * Forwarded without wrapping: any exception from a delegate propagates
     * unchanged.
     *
     * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInWrite(java.lang.Object,
     * java.lang.Throwable)
     */
    @Override
    public void onSkipInWrite(S item, Throwable t) {
        skipListener.onSkipInWrite(item, t);
    }

    /**
     * Forwarded without wrapping: any exception from a delegate propagates
     * unchanged.
     *
     * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInProcess(Object,
     * Throwable)
     */
    @Override
    public void onSkipInProcess(T item, Throwable t) {
        skipListener.onSkipInProcess(item, t);
    }

    /**
     * @throws StepListenerFailedException if a delegate throws a {@link RuntimeException}
     * @see org.springframework.batch.core.listener.CompositeChunkListener#afterChunkError(ChunkContext)
     */
    @Override
    public void afterChunkError(ChunkContext context) {
        try {
            chunkListener.afterChunkError(context);
        }
        catch (RuntimeException e) {
            throw new StepListenerFailedException("Error in afterChunkError.", getTargetException(e));
        }
    }

    /**
     * Forward the retry-read callback to the registered retry read listeners.
     *
     * @param ex the exception handed to the listeners
     * @throws BatchRuntimeException wrapping any exception raised by a delegate
     */
    @Override
    public void onRetryReadException(Exception ex) throws Exception {
        try {
            retryReadListener.onRetryReadException(ex);
        }
        catch (Exception e) {
            throw new BatchRuntimeException(e);
        }
    }

    /**
     * Forward the retry-process callback to the registered retry process
     * listeners.
     *
     * @param item the item handed to the listeners
     * @param ex the exception handed to the listeners
     * @throws BatchRuntimeException wrapping any exception raised by a delegate
     */
    @Override
    public void onRetryProcessException(Object item, Exception ex) throws Exception {
        try {
            retryProcessListener.onRetryProcessException(item, ex);
        }
        catch (Exception e) {
            throw new BatchRuntimeException(e);
        }
    }

    /**
     * Forward the retry-write callback to the registered retry write listeners.
     *
     * @param items the items handed to the listeners
     * @param ex the exception handed to the listeners
     * @throws BatchRuntimeException wrapping any exception raised by a delegate
     */
    @Override
    public void onRetryWriteException(List<Object> items, Exception ex) throws Exception {
        try {
            retryWriteListener.onRetryWriteException(items, ex);
        }
        catch (Exception e) {
            throw new BatchRuntimeException(e);
        }
    }

    /**
     * Unwrap the target exception from a wrapped {@link InvocationTargetException}.
     * Such a wrapper is presumably produced when a listener method is invoked
     * reflectively (to be confirmed against the listener adapters).
     *
     * @param e the exception to introspect
     * @return the target of the {@link InvocationTargetException} that caused
     * {@code e}, or {@code e} itself when its cause is of another type or the
     * wrapper carries no target
     */
    private Throwable getTargetException(RuntimeException e) {
        Throwable cause = e.getCause();
        // instanceof is false for null, so no separate null check is needed.
        if (cause instanceof InvocationTargetException) {
            Throwable target = ((InvocationTargetException) cause).getTargetException();
            // Never hand back null: that would silently drop the failure cause.
            if (target != null) {
                return target;
            }
        }
        return e;
    }

}