001/* 002 * Copyright 2002-2007 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.repeat.support; 018 019import java.util.NoSuchElementException; 020import java.util.concurrent.BlockingQueue; 021import java.util.concurrent.LinkedBlockingQueue; 022import java.util.concurrent.Semaphore; 023 024/** 025 * An implementation of the {@link ResultQueue} that throttles the number of 026 * expected results, limiting it to a maximum at any given time. 027 * 028 * @author Dave Syer 029 */ 030public class ThrottleLimitResultQueue<T> implements ResultQueue<T> { 031 032 // Accumulation of result objects as they finish. 033 private final BlockingQueue<T> results; 034 035 // Accumulation of dummy objects flagging expected results in the future. 036 private final Semaphore waits; 037 038 private final Object lock = new Object(); 039 040 private volatile int count = 0; 041 042 /** 043 * @param throttleLimit the maximum number of results that can be expected 044 * at any given time. 045 */ 046 public ThrottleLimitResultQueue(int throttleLimit) { 047 results = new LinkedBlockingQueue<T>(); 048 waits = new Semaphore(throttleLimit); 049 } 050 051 @Override 052 public boolean isEmpty() { 053 return results.isEmpty(); 054 } 055 056 /* 057 * (non-Javadoc) 058 * 059 * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting() 060 */ 061 @Override 062 public boolean isExpecting() { 063 // Base the decision about whether we expect more results on a 064 // counter of the number of expected results actually collected. 065 // Do not synchronize! Otherwise put and expect can deadlock. 066 return count > 0; 067 } 068 069 /** 070 * Tell the queue to expect one more result. Blocks until a new result is 071 * available if already expecting too many (as determined by the throttle 072 * limit). 073 * 074 * @see ResultQueue#expect() 075 */ 076 @Override 077 public void expect() throws InterruptedException { 078 synchronized (lock) { 079 waits.acquire(); 080 count++; 081 } 082 } 083 084 @Override 085 public void put(T holder) throws IllegalArgumentException { 086 if (!isExpecting()) { 087 throw new IllegalArgumentException("Not expecting a result. Call expect() before put()."); 088 } 089 // There should be no need to block here, or to use offer() 090 results.add(holder); 091 // Take from the waits queue now to allow another result to 092 // accumulate. But don't decrement the counter. 093 waits.release(); 094 } 095 096 @Override 097 public T take() throws NoSuchElementException, InterruptedException { 098 if (!isExpecting()) { 099 throw new NoSuchElementException("Not expecting a result. Call expect() before take()."); 100 } 101 T value; 102 synchronized (lock) { 103 value = results.take(); 104 // Decrement the counter only when the result is collected. 105 count--; 106 } 107 return value; 108 } 109 110}