/*
 * Copyright 2002-2007 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.batch.repeat.support;
018
019import org.springframework.batch.repeat.RepeatStatus;
020
021import java.util.Comparator;
022import java.util.NoSuchElementException;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.PriorityBlockingQueue;
025import java.util.concurrent.Semaphore;
026
027/**
028 * An implementation of the {@link ResultQueue} that throttles the number of
029 * expected results, limiting it to a maximum at any given time.
030 * 
031 * @author Dave Syer
032 */
033public class ResultHolderResultQueue implements ResultQueue<ResultHolder> {
034
035        // Accumulation of result objects as they finish.
036        private final BlockingQueue<ResultHolder> results;
037
038        // Accumulation of dummy objects flagging expected results in the future.
039        private final Semaphore waits;
040
041        private final Object lock = new Object();
042
043        private volatile int count = 0;
044
045        /**
046         * @param throttleLimit the maximum number of results that can be expected
047         * at any given time.
048         */
049        public ResultHolderResultQueue(int throttleLimit) {
050                results = new PriorityBlockingQueue<ResultHolder>(throttleLimit, new ResultHolderComparator());
051                waits = new Semaphore(throttleLimit);
052        }
053
054    @Override
055        public boolean isEmpty() {
056                return results.isEmpty();
057        }
058
059        /*
060         * (non-Javadoc)
061         * 
062         * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting()
063         */
064    @Override
065        public boolean isExpecting() {
066                // Base the decision about whether we expect more results on a
067                // counter of the number of expected results actually collected.
068                // Do not synchronize! Otherwise put and expect can deadlock.
069                return count > 0;
070        }
071
072        /**
073         * Tell the queue to expect one more result. Blocks until a new result is
074         * available if already expecting too many (as determined by the throttle
075         * limit).
076         * 
077         * @see ResultQueue#expect()
078         */
079    @Override
080        public void expect() throws InterruptedException {
081                waits.acquire();
082                // Don't acquire the lock in a synchronized block - might deadlock
083                synchronized (lock) {
084                        count++;
085                }
086        }
087
088    @Override
089        public void put(ResultHolder holder) throws IllegalArgumentException {
090                if (!isExpecting()) {
091                        throw new IllegalArgumentException("Not expecting a result.  Call expect() before put().");
092                }
093                results.add(holder);
094                // Take from the waits queue now to allow another result to
095                // accumulate. But don't decrement the counter.
096                waits.release();
097                synchronized (lock) {
098                        lock.notifyAll();
099                }
100        }
101
102        /**
103         * Get the next result as soon as it becomes available. <br>
104         * <br>
105         * Release result immediately if:
106         * <ul>
107         * <li>There is a result that is continuable.</li>
108         * </ul>
109         * Otherwise block if either:
110         * <ul>
111         * <li>There is no result (as per contract of {@link ResultQueue}).</li>
112         * <li>The number of results is less than the number expected.</li>
113         * </ul>
114         * Error if either:
115         * <ul>
116         * <li>Not expecting.</li>
117         * <li>Interrupted.</li>
118         * </ul>
119         * 
120         * @see ResultQueue#take()
121         */
122    @Override
123        public ResultHolder take() throws NoSuchElementException, InterruptedException {
124                if (!isExpecting()) {
125                        throw new NoSuchElementException("Not expecting a result.  Call expect() before take().");
126                }
127                ResultHolder value;
128                synchronized (lock) {
129                        value = results.take();
130                        if (isContinuable(value)) {
131                                // Decrement the counter only when the result is collected.
132                                count--;
133                                return value;
134                        }
135                }
136                results.put(value);
137                synchronized (lock) {
138                        while (count > results.size()) {
139                                lock.wait();
140                        }
141                        value = results.take();
142                        count--;
143                }
144                return value;
145        }
146
147        private boolean isContinuable(ResultHolder value) {
148                return value.getResult() != null && value.getResult().isContinuable();
149        }
150
151        /**
152         * Compares ResultHolders so that one that is continuable ranks lowest.
153         * 
154         * @author Dave Syer
155         * 
156         */
157        private static class ResultHolderComparator implements Comparator<ResultHolder> {
158        @Override
159                public int compare(ResultHolder h1, ResultHolder h2) {
160                        RepeatStatus result1 = h1.getResult();
161                        RepeatStatus result2 = h2.getResult();
162                        if (result1 == null && result2 == null) {
163                                return 0;
164                        }
165                        if (result1 == null) {
166                                return -1;
167                        }
168                        else if (result2 == null) {
169                                return 1;
170                        }
171                        if ((result1.isContinuable() && result2.isContinuable())
172                                        || (!result1.isContinuable() && !result2.isContinuable())) {
173                                return 0;
174                        }
175                        if (result1.isContinuable()) {
176                                return -1;
177                        }
178                        return 1;
179                }
180        }
181
182}