001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.util;
018
019import java.io.IOException;
020import java.io.ObjectInputStream;
021import java.io.Serializable;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025
026/**
027 * Support class for throttling concurrent access to a specific resource.
028 *
029 * <p>Designed for use as a base class, with the subclass invoking
030 * the {@link #beforeAccess()} and {@link #afterAccess()} methods at
031 * appropriate points of its workflow. Note that {@code afterAccess}
032 * should usually be called in a finally block!
033 *
034 * <p>The default concurrency limit of this support class is -1
035 * ("unbounded concurrency"). Subclasses may override this default;
036 * check the javadoc of the concrete class that you're using.
037 *
038 * @author Juergen Hoeller
039 * @since 1.2.5
040 * @see #setConcurrencyLimit
041 * @see #beforeAccess()
042 * @see #afterAccess()
043 * @see org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor
044 * @see java.io.Serializable
045 */
046@SuppressWarnings("serial")
047public abstract class ConcurrencyThrottleSupport implements Serializable {
048
049        /**
050         * Permit any number of concurrent invocations: that is, don't throttle concurrency.
051         */
052        public static final int UNBOUNDED_CONCURRENCY = -1;
053
054        /**
055         * Switch concurrency 'off': that is, don't allow any concurrent invocations.
056         */
057        public static final int NO_CONCURRENCY = 0;
058
059
060        /** Transient to optimize serialization */
061        protected transient Log logger = LogFactory.getLog(getClass());
062
063        private transient Object monitor = new Object();
064
065        private int concurrencyLimit = UNBOUNDED_CONCURRENCY;
066
067        private int concurrencyCount = 0;
068
069
070        /**
071         * Set the maximum number of concurrent access attempts allowed.
072         * -1 indicates unbounded concurrency.
073         * <p>In principle, this limit can be changed at runtime,
074         * although it is generally designed as a config time setting.
075         * <p>NOTE: Do not switch between -1 and any concrete limit at runtime,
076         * as this will lead to inconsistent concurrency counts: A limit
077         * of -1 effectively turns off concurrency counting completely.
078         */
079        public void setConcurrencyLimit(int concurrencyLimit) {
080                this.concurrencyLimit = concurrencyLimit;
081        }
082
083        /**
084         * Return the maximum number of concurrent access attempts allowed.
085         */
086        public int getConcurrencyLimit() {
087                return this.concurrencyLimit;
088        }
089
090        /**
091         * Return whether this throttle is currently active.
092         * @return {@code true} if the concurrency limit for this instance is active
093         * @see #getConcurrencyLimit()
094         */
095        public boolean isThrottleActive() {
096                return (this.concurrencyLimit >= 0);
097        }
098
099
100        /**
101         * To be invoked before the main execution logic of concrete subclasses.
102         * <p>This implementation applies the concurrency throttle.
103         * @see #afterAccess()
104         */
105        protected void beforeAccess() {
106                if (this.concurrencyLimit == NO_CONCURRENCY) {
107                        throw new IllegalStateException(
108                                        "Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
109                }
110                if (this.concurrencyLimit > 0) {
111                        boolean debug = logger.isDebugEnabled();
112                        synchronized (this.monitor) {
113                                boolean interrupted = false;
114                                while (this.concurrencyCount >= this.concurrencyLimit) {
115                                        if (interrupted) {
116                                                throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
117                                                                "but concurrency limit still does not allow for entering");
118                                        }
119                                        if (debug) {
120                                                logger.debug("Concurrency count " + this.concurrencyCount +
121                                                                " has reached limit " + this.concurrencyLimit + " - blocking");
122                                        }
123                                        try {
124                                                this.monitor.wait();
125                                        }
126                                        catch (InterruptedException ex) {
127                                                // Re-interrupt current thread, to allow other threads to react.
128                                                Thread.currentThread().interrupt();
129                                                interrupted = true;
130                                        }
131                                }
132                                if (debug) {
133                                        logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
134                                }
135                                this.concurrencyCount++;
136                        }
137                }
138        }
139
140        /**
141         * To be invoked after the main execution logic of concrete subclasses.
142         * @see #beforeAccess()
143         */
144        protected void afterAccess() {
145                if (this.concurrencyLimit >= 0) {
146                        synchronized (this.monitor) {
147                                this.concurrencyCount--;
148                                if (logger.isDebugEnabled()) {
149                                        logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
150                                }
151                                this.monitor.notify();
152                        }
153                }
154        }
155
156
157        //---------------------------------------------------------------------
158        // Serialization support
159        //---------------------------------------------------------------------
160
161        private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
162                // Rely on default serialization, just initialize state after deserialization.
163                ois.defaultReadObject();
164
165                // Initialize transient fields.
166                this.logger = LogFactory.getLog(getClass());
167                this.monitor = new Object();
168        }
169
170}