torch.futures
This package provides a Future type that encapsulates an asynchronous execution and a set of utility functions to simplify operations on Future objects. Currently, the Future type is primarily used by the Distributed RPC Framework.
- class torch.futures.Future(*, devices=None)
- 
    Wrapper around a torch._C.Future which encapsulates an asynchronous execution of a callable, e.g. rpc_async(). It also exposes a set of APIs to add callback functions and set results.

    Warning: GPU support is a beta feature, subject to changes.

 - add_done_callback(callback)[source]
- 
      Append the given callback function to this Future, which will be run when the Future is completed. Multiple callbacks can be added to the same Future, but the order in which they will be executed cannot be guaranteed. The callback must take one argument, which is the reference to this Future. The callback function can use the value() method to get the value. Note that if this Future is already completed, the given callback will be run inline.

      We recommend that you use the then() method as it provides a way to synchronize after your callback has completed. add_done_callback can be cheaper if your callback does not return anything. But both then() and add_done_callback use the same callback registration API under the hood.

      With respect to GPU tensors, this method behaves in the same way as then().
- Parameters
- 
        callback (Callable) – a Callable that takes in one argument, which is the reference to this Future.
 Note: if the callback function throws, either through the original future being completed with an exception and calling fut.wait(), or through other code in the callback, error handling must be carefully taken care of. For example, if this callback later completes additional futures, those futures are not marked as completed with an error and the user is responsible for handling completion/waiting on those futures independently.
- Example::
- 
        >>> import torch
        >>> def callback(fut):
        ...     print("This will run after the future has finished.")
        ...     print(fut.wait())
        >>> fut = torch.futures.Future()
        >>> fut.add_done_callback(callback)
        >>> fut.set_result(5)
        This will run after the future has finished.
        5
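The inline behaviour for an already-completed Future can be sketched as follows. This is a minimal illustration, not part of the reference; the names `record` and `results` are hypothetical.

```python
import torch

results = []

def record(fut):
    # value() is safe here: the callback only runs once the Future is complete.
    results.append(fut.value())

fut = torch.futures.Future()
fut.set_result(7)

# The Future is already complete, so the callback runs inline during this call.
ret = fut.add_done_callback(record)
```

Unlike then(), add_done_callback does not hand back a new Future, so there is nothing to wait on afterwards.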
 
 - done()[source]
- 
      Return True if this Future is done. A Future is done if it has a result or an exception.

      If the value contains tensors that reside on GPUs, Future.done() will return True even if the asynchronous kernels that are populating those tensors haven’t yet completed running on the device, because at that stage the result is already usable, provided one performs the appropriate synchronizations (see wait()).
- Return type
- 
        bool
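A small sketch of how done() changes over the life of a Future, for both the result and the exception case. Variable names are illustrative only.

```python
import torch

fut = torch.futures.Future()
before = fut.done()          # no result or exception yet
fut.set_result("ready")
after = fut.done()           # a result has been set

failed = torch.futures.Future()
failed.set_exception(RuntimeError("boom"))
failed_done = failed.done()  # an exception also counts as done
```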
 
 - set_exception(result)[source]
- 
      Set an exception for this Future, which will mark this Future as completed with an error and trigger all attached callbacks. Note that when calling wait()/value() on this Future, the exception set here will be raised inline.
- Parameters
- 
        result (BaseException) – the exception for this Future.
 - Example::
- 
        >>> import torch
        >>> fut = torch.futures.Future()
        >>> fut.set_exception(ValueError("foo"))
        >>> fut.wait()
        Traceback (most recent call last):
        ...
        ValueError: foo
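Because set_exception also triggers attached callbacks, a callback can inspect the failure by calling wait() inside a try block. The following is a sketch under that reading; `on_done` and `errors` are hypothetical names.

```python
import torch

errors = []

def on_done(fut):
    # wait() re-raises the exception that was stored with set_exception().
    try:
        fut.wait()
    except ValueError as exc:
        errors.append(str(exc))

fut = torch.futures.Future()
fut.add_done_callback(on_done)
fut.set_exception(ValueError("foo"))  # completes the Future and runs on_done
```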
 
 - set_result(result)[source]
- 
      Set the result for this Future, which will mark this Future as completed and trigger all attached callbacks. Note that a Future cannot be marked completed twice.

      If the result contains tensors that reside on GPUs, this method can be called even if the asynchronous kernels that are populating those tensors haven’t yet completed running on the device, provided that the streams on which those kernels were enqueued are set as the current ones when this method is called. Put simply, it’s safe to call this method immediately after launching those kernels, without any additional synchronization, as long as one doesn’t change streams in between. This method will record events on all the relevant current streams and will use them to ensure proper scheduling for all the consumers of this Future.
- Parameters
- 
        result (object) – the result object of this Future.
 - Example::
- 
        >>> import threading
        >>> import time
        >>> import torch
        >>> def slow_set_future(fut, value):
        ...     time.sleep(0.5)
        ...     fut.set_result(value)
        >>> fut = torch.futures.Future()
        >>> t = threading.Thread(
        ...     target=slow_set_future,
        ...     args=(fut, torch.ones(2) * 3)
        ... )
        >>> t.start()
        >>> print(fut.wait())
        tensor([3., 3.])
        >>> t.join()
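Since a Future cannot be marked completed twice, code with more than one potential producer may want to check done() first. The helper below is a hypothetical sketch, not a PyTorch API, and the check-then-set sequence is not atomic, so it is only suitable when producers are already serialized.

```python
import torch

def complete_once(fut, value):
    """Hypothetical helper: set the result only if the Future is still pending."""
    if fut.done():
        return False
    fut.set_result(value)
    return True

fut = torch.futures.Future()
first = complete_once(fut, 1)   # sets the result
second = complete_once(fut, 2)  # skipped, the Future is already complete
kept = fut.wait()
```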
 
 - then(callback)[source]
- 
      Append the given callback function to this Future, which will be run when the Future is completed. Multiple callbacks can be added to the same Future, but the order in which they will be executed cannot be guaranteed (to enforce a certain order consider chaining: fut.then(cb1).then(cb2)). The callback must take one argument, which is the reference to this Future. The callback function can use the value() method to get the value. Note that if this Future is already completed, the given callback will be run immediately inline.

      If the Future’s value contains tensors that reside on GPUs, the callback might be invoked while the async kernels that are populating those tensors haven’t yet finished executing on the device. However, the callback will be invoked with some dedicated streams set as current (fetched from a global pool) which will be synchronized with those kernels. Hence any operation performed by the callback on these tensors will be scheduled on the device after the kernels complete. In other words, as long as the callback doesn’t switch streams, it can safely manipulate the result without any additional synchronization. This is similar to the non-blocking behavior of wait().

      Similarly, if the callback returns a value that contains tensors that reside on a GPU, it can do so even if the kernels that are producing these tensors are still running on the device, as long as the callback didn’t change streams during its execution. If one wants to change streams, one must be careful to re-synchronize them with the original streams, that is, those that were current when the callback was invoked.
- Parameters
- 
        callback (Callable) – a Callable that takes this Future as the only argument.
- Returns
- 
        A new Future object that holds the return value of the callback and will be marked as completed when the given callback finishes.
- Return type
- 
        Future[S] 
 Note: if the callback function throws, either through the original future being completed with an exception and calling fut.wait(), or through other code in the callback, the future returned by then will be marked appropriately with the encountered error. However, if this callback later completes additional futures, those futures are not marked as completed with an error and the user is responsible for handling completion/waiting on those futures independently.
- Example::
- 
        >>> import torch
        >>> def callback(fut):
        ...     print(f"RPC return value is {fut.wait()}.")
        >>> fut = torch.futures.Future()
        >>> # The inserted callback will print the return value when
        >>> # receiving the response from "worker1"
        >>> cb_fut = fut.then(callback)
        >>> chain_cb_fut = cb_fut.then(
        ...     lambda x : print(f"Chained cb done. {x.wait()}")
        ... )
        >>> fut.set_result(5)
        RPC return value is 5.
        Chained cb done. None
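The example above chains callbacks that return None. A complementary sketch, with illustrative names, shows callbacks that return values and what happens to the Future returned by then() when a callback raises. The exact exception type surfaced by the child Future is deliberately not assumed.

```python
import torch

fut = torch.futures.Future()
# Each then() returns a new Future holding the callback's return value.
doubled = fut.then(lambda f: f.value() * 2)
plus_one = doubled.then(lambda f: f.value() + 1)
fut.set_result(5)
chained = plus_one.wait()  # (5 * 2) + 1

# A callback that raises marks the Future returned by then() with the error.
failing = torch.futures.Future()
child = failing.then(lambda f: f.wait() + 1)
failing.set_exception(ValueError("bad input"))
try:
    child.wait()
    child_failed = False
except Exception:  # the concrete exception type is not assumed here
    child_failed = True
```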
 
 - value()[source]
- 
      Obtain the value of an already-completed future. This method should only be called after a call to wait() has completed, or inside a callback function passed to then(). In other cases this Future may not yet hold a value and calling value() could fail.

      If the value contains tensors that reside on GPUs, then this method will not perform any additional synchronization. This should be done beforehand, separately, through a call to wait() (except within callbacks, for which it’s already being taken care of by then()).
- Returns
- 
        The value held by this Future. If the function (callback or RPC) creating the value has thrown an error, this value() method will also throw an error.
- Return type
- 
        T 
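The two situations in which value() is described as safe can be sketched as follows, using a CPU tensor so that no device synchronization is involved. Variable names are illustrative.

```python
import torch

fut = torch.futures.Future()
fut.set_result(torch.arange(3))

waited = fut.wait()   # returns immediately here, the Future is already complete
stored = fut.value()  # safe after wait(): no blocking, no extra synchronization

# Inside a then() callback the Future is complete by construction.
total = fut.then(lambda f: f.value().sum().item()).wait()
```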
 
 - wait()[source]
- 
      Block until the value of this Future is ready.

      If the value contains tensors that reside on GPUs, then an additional synchronization is performed with the kernels (executing on the device) which may be asynchronously populating those tensors. Such sync is non-blocking, which means that wait() will insert the necessary instructions in the current streams to ensure that further operations enqueued on those streams will be properly scheduled after the async kernels but, once that is done, wait() will return, even if those kernels are still running. No further synchronization is required when accessing and using the values, as long as one doesn’t change streams.
- Returns
- 
        The value held by this Future. If the function (callback or RPC) creating the value has thrown an error, this wait method will also throw an error.
- Return type
- 
        T 
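One way to see the blocking behaviour of wait() outside of RPC is to complete a Future from a worker in a standard-library thread pool. The helper `submit_as_torch_future` is a hypothetical name introduced for this sketch and is not part of torch.futures.

```python
import torch
from concurrent.futures import ThreadPoolExecutor

def submit_as_torch_future(pool, fn, *args):
    """Hypothetical helper: run fn in pool and expose the outcome as a torch Future."""
    fut = torch.futures.Future()

    def run():
        try:
            fut.set_result(fn(*args))
        except Exception as exc:
            # Forward failures so that wait() raises instead of blocking forever.
            fut.set_exception(exc)

    pool.submit(run)
    return fut

with ThreadPoolExecutor(max_workers=1) as pool:
    fut = submit_as_torch_future(pool, torch.add, torch.ones(2), 2)
    total = fut.wait()  # blocks until the worker calls set_result()
```

Forwarding exceptions through set_exception matters here: a Future that is never completed would leave the waiting thread blocked.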
 
 
- torch.futures.collect_all(futures)[source]
- 
    Collects the provided Future objects into a single combined Future that is completed when all of the sub-futures are completed.
- Parameters
- 
      futures (list) – a list of Future objects.
- Returns
- 
      Returns a Future object to a list of the passed in Futures.
- Return type
- 
      Future[List[Future]]
 - Example::
- 
      >>> import torch
      >>> fut0 = torch.futures.Future()
      >>> fut1 = torch.futures.Future()
      >>> fut = torch.futures.collect_all([fut0, fut1])
      >>> fut0.set_result(0)
      >>> fut1.set_result(1)
      >>> fut_list = fut.wait()
      >>> print(f"fut0 result = {fut_list[0].wait()}")
      fut0 result = 0
      >>> print(f"fut1 result = {fut_list[1].wait()}")
      fut1 result = 1
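Because collect_all returns a Future, it can be combined with then() to reduce the sub-results once all of them are available. The sketch below sums three results; the names are illustrative.

```python
import torch

futs = [torch.futures.Future() for _ in range(3)]
combined = torch.futures.collect_all(futs)

# The callback receives the combined Future; its value is the list of sub-futures.
total_fut = combined.then(lambda f: sum(sub.wait() for sub in f.wait()))

pending = combined.done()  # still False: no sub-future has completed yet
for i, fut in enumerate(futs):
    fut.set_result(i + 1)

total = total_fut.wait()   # 1 + 2 + 3
```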
 
- torch.futures.wait_all(futures)[source]
- 
    Waits for all provided futures to be complete, and returns the list of completed values. If any of the futures encounters an error, the method will exit early and report the error, without waiting for other futures to complete.
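A minimal sketch of both outcomes of wait_all, using Futures that are completed up front so that nothing blocks. The exception type reported in the failure case is not assumed, so the example catches Exception.

```python
import torch

futs = [torch.futures.Future() for _ in range(2)]
futs[0].set_result("a")
futs[1].set_result("b")
values = torch.futures.wait_all(futs)  # values, not Future objects

bad = torch.futures.Future()
bad.set_exception(RuntimeError("boom"))
try:
    torch.futures.wait_all([futs[0], bad])
    raised = False
except Exception:  # the concrete exception type is not assumed here
    raised = True
```

In contrast to collect_all, which yields a list of Future objects, wait_all returns the unwrapped values directly.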
© 2024, PyTorch Contributors
PyTorch has a BSD-style license, as found in the LICENSE file.
 https://pytorch.org/docs/2.1/futures.html