001/*
002 * Copyright 2002-2007 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.repeat.support;
018
019import java.util.NoSuchElementException;
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.LinkedBlockingQueue;
022import java.util.concurrent.Semaphore;
023
024/**
025 * An implementation of the {@link ResultQueue} that throttles the number of
026 * expected results, limiting it to a maximum at any given time.
027 * 
028 * @author Dave Syer
029 */
030public class ThrottleLimitResultQueue<T> implements ResultQueue<T> {
031
032        // Accumulation of result objects as they finish.
033        private final BlockingQueue<T> results;
034
035        // Accumulation of dummy objects flagging expected results in the future.
036        private final Semaphore waits;
037
038        private final Object lock = new Object();
039
040        private volatile int count = 0;
041
042        /**
043         * @param throttleLimit the maximum number of results that can be expected
044         * at any given time.
045         */
046        public ThrottleLimitResultQueue(int throttleLimit) {
047                results = new LinkedBlockingQueue<T>();
048                waits = new Semaphore(throttleLimit);
049        }
050
051    @Override
052        public boolean isEmpty() {
053                return results.isEmpty();
054        }
055
056        /*
057         * (non-Javadoc)
058         * 
059         * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting()
060         */
061    @Override
062        public boolean isExpecting() {
063                // Base the decision about whether we expect more results on a
064                // counter of the number of expected results actually collected.
065                // Do not synchronize!  Otherwise put and expect can deadlock.
066                return count > 0;
067        }
068
069        /**
070         * Tell the queue to expect one more result. Blocks until a new result is
071         * available if already expecting too many (as determined by the throttle
072         * limit).
073         * 
074         * @see ResultQueue#expect()
075         */
076    @Override
077        public void expect() throws InterruptedException {
078                synchronized (lock) {
079                        waits.acquire();
080                        count++;
081                }
082        }
083
084    @Override
085        public void put(T holder) throws IllegalArgumentException {
086                if (!isExpecting()) {
087                        throw new IllegalArgumentException("Not expecting a result.  Call expect() before put().");
088                }
089                // There should be no need to block here, or to use offer()
090                results.add(holder);
091                // Take from the waits queue now to allow another result to
092                // accumulate. But don't decrement the counter.
093                waits.release();
094        }
095
096    @Override
097        public T take() throws NoSuchElementException, InterruptedException {
098                if (!isExpecting()) {
099                        throw new NoSuchElementException("Not expecting a result.  Call expect() before take().");
100                }
101                T value;
102                synchronized (lock) {
103                        value = results.take();
104                        // Decrement the counter only when the result is collected.
105                        count--;
106                }
107                return value;
108        }
109
110}