001/* 002 * Copyright 2002-2007 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.repeat.support; 018 019import org.springframework.batch.repeat.RepeatStatus; 020 021import java.util.Comparator; 022import java.util.NoSuchElementException; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.PriorityBlockingQueue; 025import java.util.concurrent.Semaphore; 026 027/** 028 * An implementation of the {@link ResultQueue} that throttles the number of 029 * expected results, limiting it to a maximum at any given time. 030 * 031 * @author Dave Syer 032 */ 033public class ResultHolderResultQueue implements ResultQueue<ResultHolder> { 034 035 // Accumulation of result objects as they finish. 036 private final BlockingQueue<ResultHolder> results; 037 038 // Accumulation of dummy objects flagging expected results in the future. 039 private final Semaphore waits; 040 041 private final Object lock = new Object(); 042 043 private volatile int count = 0; 044 045 /** 046 * @param throttleLimit the maximum number of results that can be expected 047 * at any given time. 048 */ 049 public ResultHolderResultQueue(int throttleLimit) { 050 results = new PriorityBlockingQueue<ResultHolder>(throttleLimit, new ResultHolderComparator()); 051 waits = new Semaphore(throttleLimit); 052 } 053 054 @Override 055 public boolean isEmpty() { 056 return results.isEmpty(); 057 } 058 059 /* 060 * (non-Javadoc) 061 * 062 * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting() 063 */ 064 @Override 065 public boolean isExpecting() { 066 // Base the decision about whether we expect more results on a 067 // counter of the number of expected results actually collected. 068 // Do not synchronize! Otherwise put and expect can deadlock. 069 return count > 0; 070 } 071 072 /** 073 * Tell the queue to expect one more result. Blocks until a new result is 074 * available if already expecting too many (as determined by the throttle 075 * limit). 076 * 077 * @see ResultQueue#expect() 078 */ 079 @Override 080 public void expect() throws InterruptedException { 081 waits.acquire(); 082 // Don't acquire the lock in a synchronized block - might deadlock 083 synchronized (lock) { 084 count++; 085 } 086 } 087 088 @Override 089 public void put(ResultHolder holder) throws IllegalArgumentException { 090 if (!isExpecting()) { 091 throw new IllegalArgumentException("Not expecting a result. Call expect() before put()."); 092 } 093 results.add(holder); 094 // Take from the waits queue now to allow another result to 095 // accumulate. But don't decrement the counter. 096 waits.release(); 097 synchronized (lock) { 098 lock.notifyAll(); 099 } 100 } 101 102 /** 103 * Get the next result as soon as it becomes available. <br> 104 * <br> 105 * Release result immediately if: 106 * <ul> 107 * <li>There is a result that is continuable.</li> 108 * </ul> 109 * Otherwise block if either: 110 * <ul> 111 * <li>There is no result (as per contract of {@link ResultQueue}).</li> 112 * <li>The number of results is less than the number expected.</li> 113 * </ul> 114 * Error if either: 115 * <ul> 116 * <li>Not expecting.</li> 117 * <li>Interrupted.</li> 118 * </ul> 119 * 120 * @see ResultQueue#take() 121 */ 122 @Override 123 public ResultHolder take() throws NoSuchElementException, InterruptedException { 124 if (!isExpecting()) { 125 throw new NoSuchElementException("Not expecting a result. Call expect() before take()."); 126 } 127 ResultHolder value; 128 synchronized (lock) { 129 value = results.take(); 130 if (isContinuable(value)) { 131 // Decrement the counter only when the result is collected. 132 count--; 133 return value; 134 } 135 } 136 results.put(value); 137 synchronized (lock) { 138 while (count > results.size()) { 139 lock.wait(); 140 } 141 value = results.take(); 142 count--; 143 } 144 return value; 145 } 146 147 private boolean isContinuable(ResultHolder value) { 148 return value.getResult() != null && value.getResult().isContinuable(); 149 } 150 151 /** 152 * Compares ResultHolders so that one that is continuable ranks lowest. 153 * 154 * @author Dave Syer 155 * 156 */ 157 private static class ResultHolderComparator implements Comparator<ResultHolder> { 158 @Override 159 public int compare(ResultHolder h1, ResultHolder h2) { 160 RepeatStatus result1 = h1.getResult(); 161 RepeatStatus result2 = h2.getResult(); 162 if (result1 == null && result2 == null) { 163 return 0; 164 } 165 if (result1 == null) { 166 return -1; 167 } 168 else if (result2 == null) { 169 return 1; 170 } 171 if ((result1.isContinuable() && result2.isContinuable()) 172 || (!result1.isContinuable() && !result2.isContinuable())) { 173 return 0; 174 } 175 if (result1.isContinuable()) { 176 return -1; 177 } 178 return 1; 179 } 180 } 181 182}