001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.listener;
017
018import java.lang.reflect.InvocationTargetException;
019import java.util.List;
020
021import javax.batch.api.chunk.listener.RetryProcessListener;
022import javax.batch.api.chunk.listener.RetryReadListener;
023import javax.batch.api.chunk.listener.RetryWriteListener;
024import javax.batch.operations.BatchRuntimeException;
025import org.springframework.batch.core.ChunkListener;
026import org.springframework.batch.core.ExitStatus;
027import org.springframework.batch.core.ItemProcessListener;
028import org.springframework.batch.core.ItemReadListener;
029import org.springframework.batch.core.ItemWriteListener;
030import org.springframework.batch.core.SkipListener;
031import org.springframework.batch.core.StepExecution;
032import org.springframework.batch.core.StepExecutionListener;
033import org.springframework.batch.core.StepListener;
034import org.springframework.batch.core.scope.context.ChunkContext;
035import org.springframework.batch.item.ItemStream;
036import org.springframework.lang.Nullable;
037
038/**
039 * @author Dave Syer
040 * @author Michael Minella
041 * @author Chris Schaefer
042 * @author Mahmoud Ben Hassine
043 */
044public class MulticasterBatchListener<T, S> implements StepExecutionListener, ChunkListener, ItemReadListener<T>,
045ItemProcessListener<T, S>, ItemWriteListener<S>, SkipListener<T, S>, RetryReadListener, RetryProcessListener, RetryWriteListener {
046
047        private CompositeStepExecutionListener stepListener = new CompositeStepExecutionListener();
048
049        private CompositeChunkListener chunkListener = new CompositeChunkListener();
050
051        private CompositeItemReadListener<T> itemReadListener = new CompositeItemReadListener<T>();
052
053        private CompositeItemProcessListener<T, S> itemProcessListener = new CompositeItemProcessListener<T, S>();
054
055        private CompositeItemWriteListener<S> itemWriteListener = new CompositeItemWriteListener<S>();
056
057        private CompositeSkipListener<T, S> skipListener = new CompositeSkipListener<T, S>();
058
059        private CompositeRetryReadListener retryReadListener = new CompositeRetryReadListener();
060
061        private CompositeRetryProcessListener retryProcessListener = new CompositeRetryProcessListener();
062
063        private CompositeRetryWriteListener retryWriteListener = new CompositeRetryWriteListener();
064
065        /**
066         * Initialize the listener instance.
067         */
068        public MulticasterBatchListener() {
069                super();
070        }
071
072        /**
073         * Register each of the objects as listeners. Once registered, calls to the
074         * {@link MulticasterBatchListener} broadcast to the individual listeners.
075         *
076         * @param listeners listener objects of types known to the multicaster.
077         */
078        public void setListeners(List<? extends StepListener> listeners) {
079                for (StepListener stepListener : listeners) {
080                        register(stepListener);
081                }
082        }
083
084        /**
085         * Register the listener for callbacks on the appropriate interfaces
086         * implemented. Any {@link StepListener} can be provided, or an
087         * {@link ItemStream}. Other types will be ignored.
088         *
089         * @param listener the {@link StepListener} instance to be registered.
090         */
091        public void register(StepListener listener) {
092                if (listener instanceof StepExecutionListener) {
093                        this.stepListener.register((StepExecutionListener) listener);
094                }
095                if (listener instanceof ChunkListener) {
096                        this.chunkListener.register((ChunkListener) listener);
097                }
098                if (listener instanceof ItemReadListener<?>) {
099                        @SuppressWarnings("unchecked")
100                        ItemReadListener<T> itemReadListener = (ItemReadListener<T>) listener;
101                        this.itemReadListener.register(itemReadListener);
102                }
103                if (listener instanceof ItemProcessListener<?, ?>) {
104                        @SuppressWarnings("unchecked")
105                        ItemProcessListener<T, S> itemProcessListener = (ItemProcessListener<T, S>) listener;
106                        this.itemProcessListener.register(itemProcessListener);
107                }
108                if (listener instanceof ItemWriteListener<?>) {
109                        @SuppressWarnings("unchecked")
110                        ItemWriteListener<S> itemWriteListener = (ItemWriteListener<S>) listener;
111                        this.itemWriteListener.register(itemWriteListener);
112                }
113                if (listener instanceof SkipListener<?, ?>) {
114                        @SuppressWarnings("unchecked")
115                        SkipListener<T, S> skipListener = (SkipListener<T, S>) listener;
116                        this.skipListener.register(skipListener);
117                }
118                if(listener instanceof RetryReadListener) {
119                        this.retryReadListener.register((RetryReadListener) listener);
120                }
121                if(listener instanceof RetryProcessListener) {
122                        this.retryProcessListener.register((RetryProcessListener) listener);
123                }
124                if(listener instanceof RetryWriteListener) {
125                        this.retryWriteListener.register((RetryWriteListener) listener);
126                }
127        }
128
129        /**
130         * @see org.springframework.batch.core.listener.CompositeItemProcessListener#afterProcess(java.lang.Object,
131         * java.lang.Object)
132         */
133        @Override
134        public void afterProcess(T item, @Nullable S result) {
135                try {
136                        itemProcessListener.afterProcess(item, result);
137                }
138                catch (RuntimeException e) {
139                        throw new StepListenerFailedException("Error in afterProcess.", getTargetException(e));
140                }
141        }
142
143        /**
144         * @see org.springframework.batch.core.listener.CompositeItemProcessListener#beforeProcess(java.lang.Object)
145         */
146        @Override
147        public void beforeProcess(T item) {
148                try {
149                        itemProcessListener.beforeProcess(item);
150                }
151                catch (RuntimeException e) {
152                        throw new StepListenerFailedException("Error in beforeProcess.", getTargetException(e));
153                }
154        }
155
156        /**
157         * @see org.springframework.batch.core.listener.CompositeItemProcessListener#onProcessError(java.lang.Object,
158         * java.lang.Exception)
159         */
160        @Override
161        public void onProcessError(T item, Exception ex) {
162                try {
163                        itemProcessListener.onProcessError(item, ex);
164                }
165                catch (RuntimeException e) {
166                        throw new StepListenerFailedException("Error in onProcessError.", e);
167                }
168        }
169
170        /**
171         * @see org.springframework.batch.core.listener.CompositeStepExecutionListener#afterStep(StepExecution)
172         */
173        @Override
174        public ExitStatus afterStep(StepExecution stepExecution) {
175                try {
176                        return stepListener.afterStep(stepExecution);
177                }
178                catch (RuntimeException e) {
179                        throw new StepListenerFailedException("Error in afterStep.", e);
180                }
181        }
182
183        /**
184         * @see org.springframework.batch.core.listener.CompositeStepExecutionListener#beforeStep(org.springframework.batch.core.StepExecution)
185         */
186        @Override
187        public void beforeStep(StepExecution stepExecution) {
188                try {
189                        stepListener.beforeStep(stepExecution);
190                }
191                catch (RuntimeException e) {
192                        throw new StepListenerFailedException("Error in beforeStep.", e);
193                }
194        }
195
196        /**
197         * @see org.springframework.batch.core.listener.CompositeChunkListener#afterChunk(ChunkContext context)
198         */
199        @Override
200        public void afterChunk(ChunkContext context) {
201                try {
202                        chunkListener.afterChunk(context);
203                }
204                catch (RuntimeException e) {
205                        throw new StepListenerFailedException("Error in afterChunk.", getTargetException(e));
206                }
207        }
208
209        /**
210         * @see org.springframework.batch.core.listener.CompositeChunkListener#beforeChunk(ChunkContext context)
211         */
212        @Override
213        public void beforeChunk(ChunkContext context) {
214                try {
215                        chunkListener.beforeChunk(context);
216                }
217                catch (RuntimeException e) {
218                        throw new StepListenerFailedException("Error in beforeChunk.", getTargetException(e));
219                }
220        }
221
222        /**
223         * @see org.springframework.batch.core.listener.CompositeItemReadListener#afterRead(java.lang.Object)
224         */
225        @Override
226        public void afterRead(T item) {
227                try {
228                        itemReadListener.afterRead(item);
229                }
230                catch (RuntimeException e) {
231                        throw new StepListenerFailedException("Error in afterRead.", getTargetException(e));
232                }
233        }
234
235        /**
236         * @see org.springframework.batch.core.listener.CompositeItemReadListener#beforeRead()
237         */
238        @Override
239        public void beforeRead() {
240                try {
241                        itemReadListener.beforeRead();
242                }
243                catch (RuntimeException e) {
244                        throw new StepListenerFailedException("Error in beforeRead.", getTargetException(e));
245                }
246        }
247
248        /**
249         * @see org.springframework.batch.core.listener.CompositeItemReadListener#onReadError(java.lang.Exception)
250         */
251        @Override
252        public void onReadError(Exception ex) {
253                try {
254                        itemReadListener.onReadError(ex);
255                }
256                catch (RuntimeException e) {
257                        throw new StepListenerFailedException("Error in onReadError.", e);
258                }
259        }
260
261        /**
262         * @see ItemWriteListener#afterWrite(List)
263         */
264        @Override
265        public void afterWrite(List<? extends S> items) {
266                try {
267                        itemWriteListener.afterWrite(items);
268                }
269                catch (RuntimeException e) {
270                        throw new StepListenerFailedException("Error in afterWrite.", getTargetException(e));
271                }
272        }
273
274        /**
275         * @see ItemWriteListener#beforeWrite(List)
276         */
277        @Override
278        public void beforeWrite(List<? extends S> items) {
279                try {
280                        itemWriteListener.beforeWrite(items);
281                }
282                catch (RuntimeException e) {
283                        throw new StepListenerFailedException("Error in beforeWrite.", getTargetException(e));
284                }
285        }
286
287        /**
288         * @see ItemWriteListener#onWriteError(Exception, List)
289         */
290        @Override
291        public void onWriteError(Exception ex, List<? extends S> items) {
292                try {
293                        itemWriteListener.onWriteError(ex, items);
294                }
295                catch (RuntimeException e) {
296                        throw new StepListenerFailedException("Error in onWriteError.", e);
297                }
298        }
299
300        /**
301         * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInRead(java.lang.Throwable)
302         */
303        @Override
304        public void onSkipInRead(Throwable t) {
305                skipListener.onSkipInRead(t);
306        }
307
308        /**
309         * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInWrite(java.lang.Object,
310         * java.lang.Throwable)
311         */
312        @Override
313        public void onSkipInWrite(S item, Throwable t) {
314                skipListener.onSkipInWrite(item, t);
315        }
316
317        /**
318         * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInProcess(Object,
319         * Throwable)
320         */
321        @Override
322        public void onSkipInProcess(T item, Throwable t) {
323                skipListener.onSkipInProcess(item, t);
324        }
325
326        @Override
327        public void afterChunkError(ChunkContext context) {
328                try {
329                        chunkListener.afterChunkError(context);
330                }
331                catch (RuntimeException e) {
332                        throw new StepListenerFailedException("Error in afterFailedChunk.", e);
333                }
334        }
335
336        @Override
337        public void onRetryReadException(Exception ex) throws Exception {
338                try {
339                        retryReadListener.onRetryReadException(ex);
340                } catch (Exception e) {
341                        throw new BatchRuntimeException(e);
342                }
343        }
344
345        @Override
346        public void onRetryProcessException(Object item, Exception ex) throws Exception {
347                try {
348                        retryProcessListener.onRetryProcessException(item, ex);
349                } catch (Exception e) {
350                        throw new BatchRuntimeException(e);
351                }
352        }
353
354        @Override
355        public void onRetryWriteException(List<Object> items, Exception ex) throws Exception {
356                try {
357                        retryWriteListener.onRetryWriteException(items, ex);
358                } catch (Exception e) {
359                        throw new BatchRuntimeException(e);
360                }
361        }
362
363        /**
364         * Unwrap the target exception from a wrapped {@link InvocationTargetException}.
365         * @param e the exception to introspect
366         * @return the target exception if any
367         */
368        private Throwable getTargetException(RuntimeException e) {
369                Throwable cause = e.getCause();
370                if (cause != null && cause instanceof InvocationTargetException) {
371                        return ((InvocationTargetException) cause).getTargetException();
372                }
373                return e;
374        }
375}