/*
 * Copyright 2006-2010 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.poller;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A {@link Poller} that uses the caller's thread to poll for a result as soon as
 * it is asked for. This is often appropriate if you expect a result relatively
 * quickly, or if there is only one such result expected (otherwise it is more
 * efficient to use a background thread to do the polling).
 *
 * @author Dave Syer
 *
 * @param <S> the type of the result
 */
public class DirectPoller<S> implements Poller<S> {

	private final long interval;

	/**
	 * Create a poller that waits the given interval between successive polls.
	 *
	 * @param interval the polling interval in milliseconds
	 */
	public DirectPoller(long interval) {
		this.interval = interval;
	}

	/**
	 * Get a future for a non-null result from the callback. Only when the
	 * result is asked for (using {@link Future#get()} or
	 * {@link Future#get(long, TimeUnit)}) will the polling actually start.
	 *
	 * @param callable the callback to invoke repeatedly until it returns a
	 * non-null value
	 * @return a future that polls the callback in the thread that calls
	 * <code>get</code>
	 *
	 * @see Poller#poll(Callable)
	 */
	@Override
	public Future<S> poll(Callable<S> callable) throws Exception {
		return new DirectPollingFuture<S>(interval, callable);
	}

	/**
	 * A {@link Future} that does its work in the thread calling one of the
	 * <code>get</code> methods: the callback is invoked, and re-invoked after
	 * each polling interval, until it yields a non-null value or the future is
	 * cancelled.
	 */
	private static class DirectPollingFuture<S> implements Future<S> {

		// Captured when the future is created; timeouts are measured from here.
		private final long startTime = System.currentTimeMillis();

		private volatile boolean cancelled;

		private volatile S result = null;

		private final long interval;

		private final Callable<S> callable;

		public DirectPollingFuture(long interval, Callable<S> callable) {
			this.interval = interval;
			this.callable = callable;
		}

		/**
		 * Flag this future as cancelled so that a polling loop stops before its
		 * next iteration. The polling thread is never interrupted, so the
		 * argument is ignored.
		 *
		 * @param mayInterruptIfRunning ignored
		 * @return always <code>true</code>
		 */
		@Override
		public boolean cancel(boolean mayInterruptIfRunning) {
			cancelled = true;
			return true;
		}

		/**
		 * Poll without a timeout until the callback returns a non-null value or
		 * this future is cancelled.
		 *
		 * @return the callback result, or <code>null</code> if polling stopped
		 * because of cancellation
		 * @throws InterruptedException if the thread is interrupted while
		 * sleeping between polls
		 * @throws ExecutionException if the callback throws an exception
		 */
		@Override
		public S get() throws InterruptedException, ExecutionException {
			try {
				// A negative timeout means "wait indefinitely" (see below).
				return get(-1, TimeUnit.MILLISECONDS);
			}
			catch (TimeoutException e) {
				throw new IllegalStateException("Unexpected timeout waiting for result", e);
			}
		}

		/**
		 * Poll until the callback returns a non-null value, this future is
		 * cancelled, or the timeout expires. The callback is always invoked at
		 * least once. The timeout is measured from the creation of this future
		 * and is checked against the time of the next scheduled poll, so it is
		 * raised before sleeping when the next poll would fall outside of it.
		 *
		 * @param timeout the maximum time to wait; a value that is zero or
		 * negative disables the timeout
		 * @param unit the unit of the timeout
		 * @return the callback result, or <code>null</code> if polling stopped
		 * because of cancellation
		 * @throws InterruptedException if the thread is interrupted while
		 * sleeping between polls
		 * @throws ExecutionException if the callback throws an exception
		 * @throws TimeoutException if no non-null result was obtained in time
		 */
		@Override
		public S get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

			result = callOnce();

			// Decide whether a timeout applies from the caller's value, not
			// from the converted one: the conversion truncates, so a positive
			// sub-millisecond timeout becomes 0 and must not be mistaken for
			// "no timeout".
			boolean bounded = timeout > 0;
			long timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, unit);

			long nextExecutionTime = startTime + interval;
			long currentTimeMillis = System.currentTimeMillis();

			while (result == null && !cancelled) {

				long delta = nextExecutionTime - startTime;
				if (bounded && delta >= timeoutMillis) {
					throw new TimeoutException("Timed out waiting for task to return non-null result");
				}

				if (nextExecutionTime > currentTimeMillis) {
					Thread.sleep(nextExecutionTime - currentTimeMillis);
				}

				currentTimeMillis = System.currentTimeMillis();
				nextExecutionTime = currentTimeMillis + interval;

				result = callOnce();

			}

			return result;

		}

		@Override
		public boolean isCancelled() {
			return cancelled;
		}

		/**
		 * @return <code>true</code> if this future was cancelled or a non-null
		 * result has been obtained
		 */
		@Override
		public boolean isDone() {
			return cancelled || result != null;
		}

		/**
		 * Invoke the callback once, wrapping any exception it throws.
		 *
		 * @return the value returned by the callback (possibly <code>null</code>)
		 * @throws ExecutionException wrapping any exception from the callback
		 */
		private S callOnce() throws ExecutionException {
			try {
				return callable.call();
			}
			catch (Exception e) {
				throw new ExecutionException(e);
			}
		}

	}

}