001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.scheduling.concurrent;
018
019import java.util.concurrent.ForkJoinPool;
020import java.util.concurrent.TimeUnit;
021
022import org.springframework.beans.factory.DisposableBean;
023import org.springframework.beans.factory.FactoryBean;
024import org.springframework.beans.factory.InitializingBean;
025import org.springframework.lang.Nullable;
026
027/**
028 * A Spring {@link FactoryBean} that builds and exposes a preconfigured {@link ForkJoinPool}.
029 *
030 * @author Juergen Hoeller
031 * @since 3.1
032 */
033public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean {
034
035        private boolean commonPool = false;
036
037        private int parallelism = Runtime.getRuntime().availableProcessors();
038
039        private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;
040
041        @Nullable
042        private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
043
044        private boolean asyncMode = false;
045
046        private int awaitTerminationSeconds = 0;
047
048        @Nullable
049        private ForkJoinPool forkJoinPool;
050
051
052        /**
053         * Set whether to expose JDK 8's 'common' {@link ForkJoinPool}.
054         * <p>Default is "false", creating a local {@link ForkJoinPool} instance based on the
055         * {@link #setParallelism "parallelism"}, {@link #setThreadFactory "threadFactory"},
056         * {@link #setUncaughtExceptionHandler "uncaughtExceptionHandler"} and
057         * {@link #setAsyncMode "asyncMode"} properties on this FactoryBean.
058         * <p><b>NOTE:</b> Setting this flag to "true" effectively ignores all other
059         * properties on this FactoryBean, reusing the shared common JDK {@link ForkJoinPool}
060         * instead. This is a fine choice on JDK 8 but does remove the application's ability
061         * to customize ForkJoinPool behavior, in particular the use of custom threads.
062         * @since 3.2
063         * @see java.util.concurrent.ForkJoinPool#commonPool()
064         */
065        public void setCommonPool(boolean commonPool) {
066                this.commonPool = commonPool;
067        }
068
069        /**
070         * Specify the parallelism level. Default is {@link Runtime#availableProcessors()}.
071         */
072        public void setParallelism(int parallelism) {
073                this.parallelism = parallelism;
074        }
075
076        /**
077         * Set the factory for creating new ForkJoinWorkerThreads.
078         * Default is {@link ForkJoinPool#defaultForkJoinWorkerThreadFactory}.
079         */
080        public void setThreadFactory(ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory) {
081                this.threadFactory = threadFactory;
082        }
083
084        /**
085         * Set the handler for internal worker threads that terminate due to unrecoverable errors
086         * encountered while executing tasks. Default is none.
087         */
088        public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
089                this.uncaughtExceptionHandler = uncaughtExceptionHandler;
090        }
091
092        /**
093         * Specify whether to establish a local first-in-first-out scheduling mode for forked tasks
094         * that are never joined. This mode (asyncMode = {@code true}) may be more appropriate
095         * than the default locally stack-based mode in applications in which worker threads only
096         * process event-style asynchronous tasks. Default is {@code false}.
097         */
098        public void setAsyncMode(boolean asyncMode) {
099                this.asyncMode = asyncMode;
100        }
101
102        /**
103         * Set the maximum number of seconds that this ForkJoinPool is supposed to block
104         * on shutdown in order to wait for remaining tasks to complete their execution
105         * before the rest of the container continues to shut down. This is particularly
106         * useful if your remaining tasks are likely to need access to other resources
107         * that are also managed by the container.
108         * <p>By default, this ForkJoinPool won't wait for the termination of tasks at all.
109         * It will continue to fully execute all ongoing tasks as well as all remaining
110         * tasks in the queue, in parallel to the rest of the container shutting down.
111         * In contrast, if you specify an await-termination period using this property,
112         * this executor will wait for the given time (max) for the termination of tasks.
113         * <p>Note that this feature works for the {@link #setCommonPool "commonPool"}
114         * mode as well. The underlying ForkJoinPool won't actually terminate in that
115         * case but will wait for all tasks to terminate.
116         * @see java.util.concurrent.ForkJoinPool#shutdown()
117         * @see java.util.concurrent.ForkJoinPool#awaitTermination
118         */
119        public void setAwaitTerminationSeconds(int awaitTerminationSeconds) {
120                this.awaitTerminationSeconds = awaitTerminationSeconds;
121        }
122
123        @Override
124        public void afterPropertiesSet() {
125                this.forkJoinPool = (this.commonPool ? ForkJoinPool.commonPool() :
126                                new ForkJoinPool(this.parallelism, this.threadFactory, this.uncaughtExceptionHandler, this.asyncMode));
127        }
128
129
130        @Override
131        @Nullable
132        public ForkJoinPool getObject() {
133                return this.forkJoinPool;
134        }
135
136        @Override
137        public Class<?> getObjectType() {
138                return ForkJoinPool.class;
139        }
140
141        @Override
142        public boolean isSingleton() {
143                return true;
144        }
145
146
147        @Override
148        public void destroy() {
149                if (this.forkJoinPool != null) {
150                        // Ignored for the common pool.
151                        this.forkJoinPool.shutdown();
152
153                        // Wait for all tasks to terminate - works for the common pool as well.
154                        if (this.awaitTerminationSeconds > 0) {
155                                try {
156                                        this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS);
157                                }
158                                catch (InterruptedException ex) {
159                                        Thread.currentThread().interrupt();
160                                }
161                        }
162                }
163        }
164
165}