001/*
002 * Copyright 2006-2010 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.poller;
017
018import java.util.concurrent.Callable;
019import java.util.concurrent.ExecutionException;
020import java.util.concurrent.Future;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.TimeoutException;
023
/**
 * A {@link Poller} that uses the caller's thread to poll for a result as soon as
 * it is asked for. This is often appropriate if you expect a result relatively
 * quickly, or if there is only one such result expected (otherwise it is more
 * efficient to use a background thread to do the polling).
 * 
 * @author Dave Syer
 * 
 * @param <S> the type of the result
 */
034public class DirectPoller<S> implements Poller<S> {
035
036        private final long interval;
037
038        public DirectPoller(long interval) {
039                this.interval = interval;
040        }
041
042        /**
043         * Get a future for a non-null result from the callback. Only when the
044         * result is asked for (using {@link Future#get()} or
045         * {@link Future#get(long, TimeUnit)} will the polling actually start.
046         * 
047         * @see Poller#poll(Callable)
048         */
049    @Override
050        public Future<S> poll(Callable<S> callable) throws Exception {
051                return new DirectPollingFuture<S>(interval, callable);
052        }
053
054        private static class DirectPollingFuture<S> implements Future<S> {
055
056                private final long startTime = System.currentTimeMillis();
057
058                private volatile boolean cancelled;
059
060                private volatile S result = null;
061
062                private final long interval;
063
064                private final Callable<S> callable;
065
066                public DirectPollingFuture(long interval, Callable<S> callable) {
067                        this.interval = interval;
068                        this.callable = callable;
069                }
070
071        @Override
072                public boolean cancel(boolean mayInterruptIfRunning) {
073                        cancelled = true;
074                        return true;
075                }
076
077        @Override
078                public S get() throws InterruptedException, ExecutionException {
079                        try {
080                                return get(-1, TimeUnit.MILLISECONDS);
081                        }
082                        catch (TimeoutException e) {
083                                throw new IllegalStateException("Unexpected timeout waiting for result", e);
084                        }
085                }
086
087        @Override
088                public S get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
089
090                        try {
091                                result = callable.call();
092                        }
093                        catch (Exception e) {
094                                throw new ExecutionException(e);
095                        }
096
097                        Long nextExecutionTime = startTime + interval;
098                        long currentTimeMillis = System.currentTimeMillis();
099                        long timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, unit);
100
101                        while (result == null && !cancelled) {
102
103                                long delta = nextExecutionTime - startTime;
104                                if (delta >= timeoutMillis && timeoutMillis > 0) {
105                                        throw new TimeoutException("Timed out waiting for task to return non-null result");
106                                }
107
108                                if (nextExecutionTime > currentTimeMillis) {
109                                        Thread.sleep(nextExecutionTime - currentTimeMillis);
110                                }
111
112                                currentTimeMillis = System.currentTimeMillis();
113                                nextExecutionTime = currentTimeMillis + interval;
114
115                                try {
116                                        result = callable.call();
117                                }
118                                catch (Exception e) {
119                                        throw new ExecutionException(e);
120                                }
121
122                        }
123
124                        return result;
125
126                }
127
128        @Override
129                public boolean isCancelled() {
130                        return cancelled;
131                }
132
133        @Override
134                public boolean isDone() {
135                        return cancelled || result != null;
136                }
137
138        }
139
140}