/*
 * Copyright 2002-2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.util;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Base class that limits how many callers may access a given resource
 * at the same time.
 *
 * <p>Intended to be extended: a subclass calls {@link #beforeAccess()}
 * before and {@link #afterAccess()} after the work it wants throttled.
 * The {@code afterAccess} call normally belongs in a finally block so
 * that the slot is released even if the work fails.
 *
 * <p>Out of the box the limit is -1 ("unbounded concurrency"), i.e. no
 * throttling at all. Concrete subclasses may choose a different default;
 * consult their javadoc.
 *
 * <p>The concurrency counter is read and updated only while holding an
 * internal monitor. The logger and that monitor are transient and are
 * recreated when an instance is deserialized.
 *
 * @author Juergen Hoeller
 * @since 1.2.5
 * @see #setConcurrencyLimit
 * @see #beforeAccess()
 * @see #afterAccess()
 * @see org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor
 * @see java.io.Serializable
 */
@SuppressWarnings("serial")
public abstract class ConcurrencyThrottleSupport implements Serializable {

	/**
	 * Limit value meaning "do not throttle": any number of concurrent
	 * invocations is permitted.
	 */
	public static final int UNBOUNDED_CONCURRENCY = -1;

	/**
	 * Limit value meaning "concurrency switched off": no invocations
	 * are allowed at all.
	 */
	public static final int NO_CONCURRENCY = 0;


	/** Logger for this instance; transient, rebuilt on deserialization. */
	protected transient Log logger = LogFactory.getLog(getClass());

	/** Lock and wait/notify target guarding {@link #concurrencyCount}; transient. */
	private transient Object monitor = new Object();

	/** Currently configured limit; see {@link #setConcurrencyLimit(int)}. */
	private int concurrencyLimit = UNBOUNDED_CONCURRENCY;

	/** Number of callers currently inside the throttle. */
	private int concurrencyCount = 0;


	/**
	 * Configure the maximum number of concurrent access attempts.
	 * A value of -1 means unbounded concurrency.
	 * <p>The limit may in principle be modified at runtime, but it is
	 * primarily meant to be set once at configuration time.
	 * <p>NOTE: Do not toggle between -1 and a concrete limit at runtime.
	 * With a limit of -1 no counting happens at all, so the concurrency
	 * count would become inconsistent.
	 * @param concurrencyLimit the new limit
	 */
	public void setConcurrencyLimit(int concurrencyLimit) {
		this.concurrencyLimit = concurrencyLimit;
	}

	/**
	 * Return the configured maximum number of concurrent access attempts.
	 * @return the current limit
	 */
	public int getConcurrencyLimit() {
		return this.concurrencyLimit;
	}

	/**
	 * Tell whether throttling is in effect for this instance.
	 * @return {@code true} if the configured limit is zero or positive
	 * @see #getConcurrencyLimit()
	 */
	public boolean isThrottleActive() {
		int limit = this.concurrencyLimit;
		return (limit >= 0);
	}


	/**
	 * Hook to call before the throttled work of a concrete subclass.
	 * <p>Applies the throttle: with a positive limit the calling thread
	 * blocks on the internal monitor until the count is below the limit,
	 * then takes a slot by incrementing the count. With a negative limit
	 * this method returns immediately without counting.
	 * @throws IllegalStateException if the limit is {@link #NO_CONCURRENCY},
	 * or if the thread was interrupted while waiting and the limit still
	 * does not permit entering
	 * @see #afterAccess()
	 */
	protected void beforeAccess() {
		if (this.concurrencyLimit == NO_CONCURRENCY) {
			throw new IllegalStateException(
					"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
		}
		if (this.concurrencyLimit <= 0) {
			// No positive limit configured: nothing to count, nothing to wait for.
			return;
		}

		final boolean debugEnabled = logger.isDebugEnabled();
		synchronized (this.monitor) {
			boolean sawInterrupt = false;
			while (true) {
				if (this.concurrencyCount < this.concurrencyLimit) {
					// A slot is free - stop waiting.
					break;
				}
				if (sawInterrupt) {
					// Interrupted on the previous wait and still no free slot: give up.
					throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
							"but concurrency limit still does not allow for entering");
				}
				if (debugEnabled) {
					String blockingMessage = "Concurrency count " + this.concurrencyCount +
							" has reached limit " + this.concurrencyLimit + " - blocking";
					logger.debug(blockingMessage);
				}
				try {
					this.monitor.wait();
				}
				catch (InterruptedException interruption) {
					// Restore the interrupt flag so code further up the stack can see it,
					// and remember it for the next loop iteration.
					Thread.currentThread().interrupt();
					sawInterrupt = true;
				}
			}
			if (debugEnabled) {
				logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
			}
			this.concurrencyCount++;
		}
	}

	/**
	 * Hook to call after the throttled work of a concrete subclass.
	 * <p>With a limit of zero or more, gives the slot back by decrementing
	 * the count and wakes one thread waiting on the internal monitor.
	 * With a negative limit this method does nothing.
	 * @see #beforeAccess()
	 */
	protected void afterAccess() {
		if (this.concurrencyLimit < 0) {
			// Throttle not active: no count to maintain.
			return;
		}
		synchronized (this.monitor) {
			this.concurrencyCount--;
			if (logger.isDebugEnabled()) {
				logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
			}
			this.monitor.notify();
		}
	}


	//---------------------------------------------------------------------
	// Serialization support
	//---------------------------------------------------------------------

	/**
	 * Restore the non-transient state via default deserialization, then
	 * recreate the transient monitor and logger, which are not part of
	 * the serialized form.
	 */
	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
		// Default mechanism handles the regular (non-transient) fields.
		ois.defaultReadObject();

		// Transient fields come back as null and need fresh instances.
		this.monitor = new Object();
		this.logger = LogFactory.getLog(getClass());
	}

}