001/*
002 * Copyright 2013-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.jsr.partition;
017
018import java.io.Serializable;
019import java.util.Queue;
020import java.util.concurrent.locks.ReentrantLock;
021
022import javax.batch.api.partition.PartitionCollector;
023import javax.batch.operations.BatchRuntimeException;
024
025import org.springframework.batch.core.ChunkListener;
026import org.springframework.batch.core.scope.context.ChunkContext;
027import org.springframework.util.Assert;
028
029/**
030 * Adapter class used to wrap a {@link PartitionCollector} so that it can be consumed
031 * as a {@link ChunkListener}.  A thread-safe {@link Queue} is required along with the
032 * {@link PartitionCollector}.  The {@link Queue} is where the result of the call to
033 * the PartitionCollector will be placed.
034 *
035 * @author Michael Minella
036 * @author Mahmoud Ben Hassine
037 * @since 3.0
038 */
039public class PartitionCollectorAdapter implements ChunkListener {
040
041        private PartitionCollector collector;
042        private Queue<Serializable> partitionQueue;
043        private ReentrantLock lock;
044
045        public PartitionCollectorAdapter(Queue<Serializable> queue, PartitionCollector collector) {
046                Assert.notNull(queue, "A thread-safe Queue is required");
047                Assert.notNull(collector, "A PartitionCollector is required");
048
049                this.partitionQueue = queue;
050                this.collector = collector;
051        }
052
053        public void setPartitionLock(ReentrantLock lock) {
054                this.lock = lock;
055        }
056
057        @Override
058        public void beforeChunk(ChunkContext context) {
059        }
060
061        @Override
062        public void afterChunk(ChunkContext context) {
063                try {
064                        if(context.isComplete()) {
065                                lock.lock();
066                                Serializable collectPartitionData = collector.collectPartitionData();
067
068                                if(collectPartitionData != null) {
069                                        partitionQueue.add(collectPartitionData);
070                                }
071                        }
072                } catch (Throwable e) {
073                        throw new BatchRuntimeException("An error occurred while collecting data from the PartitionCollector", e);
074                } finally {
075                        if(lock.isHeldByCurrentThread()) {
076                                lock.unlock();
077                        }
078                }
079        }
080
081        @Override
082        public void afterChunkError(ChunkContext context) {
083                try {
084                        lock.lock();
085                        if(context.isComplete()) {
086                                Serializable collectPartitionData = collector.collectPartitionData();
087
088                                if(collectPartitionData != null) {
089                                        partitionQueue.add(collectPartitionData);
090                                }
091                        }
092                } catch (Throwable e) {
093                        throw new BatchRuntimeException("An error occurred while collecting data from the PartitionCollector", e);
094                } finally {
095                        if(lock.isHeldByCurrentThread()) {
096                                lock.unlock();
097                        }
098                }
099        }
100}