001/* 002 * Copyright 2013-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.jsr.partition; 017 018import java.io.Serializable; 019import java.util.Queue; 020import java.util.concurrent.locks.ReentrantLock; 021 022import javax.batch.api.partition.PartitionCollector; 023import javax.batch.operations.BatchRuntimeException; 024 025import org.springframework.batch.core.ChunkListener; 026import org.springframework.batch.core.scope.context.ChunkContext; 027import org.springframework.util.Assert; 028 029/** 030 * Adapter class used to wrap a {@link PartitionCollector} so that it can be consumed 031 * as a {@link ChunkListener}. A thread-safe {@link Queue} is required along with the 032 * {@link PartitionCollector}. The {@link Queue} is where the result of the call to 033 * the PartitionCollector will be placed. 034 * 035 * @author Michael Minella 036 * @author Mahmoud Ben Hassine 037 * @since 3.0 038 */ 039public class PartitionCollectorAdapter implements ChunkListener { 040 041 private PartitionCollector collector; 042 private Queue<Serializable> partitionQueue; 043 private ReentrantLock lock; 044 045 public PartitionCollectorAdapter(Queue<Serializable> queue, PartitionCollector collector) { 046 Assert.notNull(queue, "A thread-safe Queue is required"); 047 Assert.notNull(collector, "A PartitionCollector is required"); 048 049 this.partitionQueue = queue; 050 this.collector = collector; 051 } 052 053 public void setPartitionLock(ReentrantLock lock) { 054 this.lock = lock; 055 } 056 057 @Override 058 public void beforeChunk(ChunkContext context) { 059 } 060 061 @Override 062 public void afterChunk(ChunkContext context) { 063 try { 064 if(context.isComplete()) { 065 lock.lock(); 066 Serializable collectPartitionData = collector.collectPartitionData(); 067 068 if(collectPartitionData != null) { 069 partitionQueue.add(collectPartitionData); 070 } 071 } 072 } catch (Throwable e) { 073 throw new BatchRuntimeException("An error occurred while collecting data from the PartitionCollector", e); 074 } finally { 075 if(lock.isHeldByCurrentThread()) { 076 lock.unlock(); 077 } 078 } 079 } 080 081 @Override 082 public void afterChunkError(ChunkContext context) { 083 try { 084 lock.lock(); 085 if(context.isComplete()) { 086 Serializable collectPartitionData = collector.collectPartitionData(); 087 088 if(collectPartitionData != null) { 089 partitionQueue.add(collectPartitionData); 090 } 091 } 092 } catch (Throwable e) { 093 throw new BatchRuntimeException("An error occurred while collecting data from the PartitionCollector", e); 094 } finally { 095 if(lock.isHeldByCurrentThread()) { 096 lock.unlock(); 097 } 098 } 099 } 100}