001/*
002 * Copyright 2002-2014 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.scheduling.concurrent;
018
019import java.util.concurrent.ForkJoinPool;
020import java.util.concurrent.TimeUnit;
021
022import org.springframework.beans.factory.DisposableBean;
023import org.springframework.beans.factory.FactoryBean;
024import org.springframework.beans.factory.InitializingBean;
025import org.springframework.lang.UsesJava7;
026
027/**
028 * A Spring {@link FactoryBean} that builds and exposes a preconfigured {@link ForkJoinPool}.
029 * May be used on Java 7 and 8 as well as on Java 6 with {@code jsr166.jar} on the classpath
030 * (ideally on the VM bootstrap classpath).
031 *
032 * <p>For details on the ForkJoinPool API and its use with RecursiveActions, see the
033 * <a href="https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html">JDK 7 javadoc</a>.
034 *
035 * <p>{@code jsr166.jar}, containing {@code java.util.concurrent} updates for Java 6, can be obtained
036 * from the <a href="http://gee.cs.oswego.edu/dl/concurrency-interest/">concurrency interest website</a>.
037 *
038 * @author Juergen Hoeller
039 * @since 3.1
040 */
041@UsesJava7
042public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean {
043
044        private boolean commonPool = false;
045
046        private int parallelism = Runtime.getRuntime().availableProcessors();
047
048        private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;
049
050        private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
051
052        private boolean asyncMode = false;
053
054        private int awaitTerminationSeconds = 0;
055
056        private ForkJoinPool forkJoinPool;
057
058
059        /**
060         * Set whether to expose JDK 8's 'common' {@link ForkJoinPool}.
061         * <p>Default is "false", creating a local {@link ForkJoinPool} instance based on the
062         * {@link #setParallelism "parallelism"}, {@link #setThreadFactory "threadFactory"},
063         * {@link #setUncaughtExceptionHandler "uncaughtExceptionHandler"} and
064         * {@link #setAsyncMode "asyncMode"} properties on this FactoryBean.
065         * <p><b>NOTE:</b> Setting this flag to "true" effectively ignores all other
066         * properties on this FactoryBean, reusing the shared common JDK {@link ForkJoinPool}
067         * instead. This is a fine choice on JDK 8 but does remove the application's ability
068         * to customize ForkJoinPool behavior, in particular the use of custom threads.
069         * @since 3.2
070         * @see java.util.concurrent.ForkJoinPool#commonPool()
071         */
072        public void setCommonPool(boolean commonPool) {
073                this.commonPool = commonPool;
074        }
075
076        /**
077         * Specify the parallelism level. Default is {@link Runtime#availableProcessors()}.
078         */
079        public void setParallelism(int parallelism) {
080                this.parallelism = parallelism;
081        }
082
083        /**
084         * Set the factory for creating new ForkJoinWorkerThreads.
085         * Default is {@link ForkJoinPool#defaultForkJoinWorkerThreadFactory}.
086         */
087        public void setThreadFactory(ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory) {
088                this.threadFactory = threadFactory;
089        }
090
091        /**
092         * Set the handler for internal worker threads that terminate due to unrecoverable errors
093         * encountered while executing tasks. Default is none.
094         */
095        public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
096                this.uncaughtExceptionHandler = uncaughtExceptionHandler;
097        }
098
099        /**
100         * Specify whether to establish a local first-in-first-out scheduling mode for forked tasks
101         * that are never joined. This mode (asyncMode = {@code true}) may be more appropriate
102         * than the default locally stack-based mode in applications in which worker threads only
103         * process event-style asynchronous tasks. Default is {@code false}.
104         */
105        public void setAsyncMode(boolean asyncMode) {
106                this.asyncMode = asyncMode;
107        }
108
109        /**
110         * Set the maximum number of seconds that this ForkJoinPool is supposed to block
111         * on shutdown in order to wait for remaining tasks to complete their execution
112         * before the rest of the container continues to shut down. This is particularly
113         * useful if your remaining tasks are likely to need access to other resources
114         * that are also managed by the container.
115         * <p>By default, this ForkJoinPool won't wait for the termination of tasks at all.
116         * It will continue to fully execute all ongoing tasks as well as all remaining
117         * tasks in the queue, in parallel to the rest of the container shutting down.
118         * In contrast, if you specify an await-termination period using this property,
119         * this executor will wait for the given time (max) for the termination of tasks.
120         * <p>Note that this feature works for the {@link #setCommonPool "commonPool"}
121         * mode as well. The underlying ForkJoinPool won't actually terminate in that
122         * case but will wait for all tasks to terminate.
123         * @see java.util.concurrent.ForkJoinPool#shutdown()
124         * @see java.util.concurrent.ForkJoinPool#awaitTermination
125         */
126        public void setAwaitTerminationSeconds(int awaitTerminationSeconds) {
127                this.awaitTerminationSeconds = awaitTerminationSeconds;
128        }
129
130        @Override
131        public void afterPropertiesSet() {
132                this.forkJoinPool = (this.commonPool ? ForkJoinPool.commonPool() :
133                                new ForkJoinPool(this.parallelism, this.threadFactory, this.uncaughtExceptionHandler, this.asyncMode));
134        }
135
136
137        @Override
138        public ForkJoinPool getObject() {
139                return this.forkJoinPool;
140        }
141
142        @Override
143        public Class<?> getObjectType() {
144                return ForkJoinPool.class;
145        }
146
147        @Override
148        public boolean isSingleton() {
149                return true;
150        }
151
152
153        @Override
154        public void destroy() {
155                // Ignored for the common pool.
156                this.forkJoinPool.shutdown();
157
158                // Wait for all tasks to terminate - works for the common pool as well.
159                if (this.awaitTerminationSeconds > 0) {
160                        try {
161                                this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS);
162                        }
163                        catch (InterruptedException ex) {
164                                Thread.currentThread().interrupt();
165                        }
166                }
167        }
168
169}