classPrefectFuture(Generic[R,A]):""" Represents the result of a computation happening in a task runner. When tasks are called, they are submitted to a task runner which creates a future for access to the state and result of the task. Examples: Define a task that returns a string >>> from prefect import flow, task >>> @task >>> def my_task() -> str: >>> return "hello" Calls of this task in a flow will return a future >>> @flow >>> def my_flow(): >>> future = my_task.submit() # PrefectFuture[str, Sync] includes result type >>> future.task_run.id # UUID for the task run Wait for the task to complete >>> @flow >>> def my_flow(): >>> future = my_task.submit() >>> final_state = future.wait() Wait N sconds for the task to complete >>> @flow >>> def my_flow(): >>> future = my_task.submit() >>> final_state = future.wait(0.1) >>> if final_state: >>> ... # Task done >>> else: >>> ... # Task not done yet Wait for a task to complete and retrieve its result >>> @flow >>> def my_flow(): >>> future = my_task.submit() >>> result = future.result() >>> assert result == "hello" Wait N seconds for a task to complete and retrieve its result >>> @flow >>> def my_flow(): >>> future = my_task.submit() >>> result = future.result(timeout=5) >>> assert result == "hello" Retrieve the state of a task without waiting for completion >>> @flow >>> def my_flow(): >>> future = my_task.submit() >>> state = future.get_state() """def__init__(self,name:str,key:UUID,task_runner:"BaseTaskRunner",asynchronous:A=True,_final_state:State[R]=None,# Exposed for testing)->None:self.key=keyself.name=nameself.asynchronous=asynchronousself.task_run=Noneself._final_state=_final_stateself._exception:Optional[Exception]=Noneself._task_runner=task_runnerself._submitted=anyio.Event()self._loop=asyncio.get_running_loop()@overloaddefwait(self:"PrefectFuture[R, Async]",timeout:None=None)->Awaitable[State[R]]:...@overloaddefwait(self:"PrefectFuture[R, Sync]",timeout:None=None)->State[R]:...@overloaddefwait(self:"PrefectFuture[R, Async]",timeout:float)->Awaitable[Optional[State[R]]]:...@overloaddefwait(self:"PrefectFuture[R, Sync]",timeout:float)->Optional[State[R]]:...defwait(self,timeout=None):""" Wait for the run to finish and return the final state If the timeout is reached before the run reaches a final state, `None` is returned. """ifself.asynchronous:returnself._wait(timeout=timeout)else:# type checking cannot handle the overloaded timeout passingreturnfrom_sync.call_soon_in_loop_thread(create_call(self._wait,timeout=timeout)).result()# type: ignore@overloadasyncdef_wait(self,timeout:None=None)->State[R]:...@overloadasyncdef_wait(self,timeout:float)->Optional[State[R]]:...asyncdef_wait(self,timeout=None):""" Async implementation for `wait` """awaitself._wait_for_submission()ifself._final_state:returnself._final_stateself._final_state=awaitself._task_runner.wait(self.key,timeout)returnself._final_state@overloaddefresult(self:"PrefectFuture[R, Sync]",timeout:float=None,raise_on_failure:bool=True,)->R:...@overloaddefresult(self:"PrefectFuture[R, Sync]",timeout:float=None,raise_on_failure:bool=False,)->Union[R,Exception]:...@overloaddefresult(self:"PrefectFuture[R, Async]",timeout:float=None,raise_on_failure:bool=True,)->Awaitable[R]:...@overloaddefresult(self:"PrefectFuture[R, Async]",timeout:float=None,raise_on_failure:bool=False,)->Awaitable[Union[R,Exception]]:...defresult(self,timeout:float=None,raise_on_failure:bool=True):""" Wait for the run to finish and return the final state. If the timeout is reached before the run reaches a final state, a `TimeoutError` will be raised. If `raise_on_failure` is `True` and the task run failed, the task run's exception will be raised. """ifself.asynchronous:returnself._result(timeout=timeout,raise_on_failure=raise_on_failure)else:returnfrom_sync.call_soon_in_loop_thread(create_call(self._result,timeout=timeout,raise_on_failure=raise_on_failure)).result()asyncdef_result(self,timeout:float=None,raise_on_failure:bool=True):""" Async implementation of `result` """final_state=awaitself._wait(timeout=timeout)ifnotfinal_state:raiseTimeoutError("Call timed out before task finished.")returnawaitfinal_state.result(raise_on_failure=raise_on_failure,fetch=True)@overloaddefget_state(self:"PrefectFuture[R, Async]",client:PrefectClient=None)->Awaitable[State[R]]:...@overloaddefget_state(self:"PrefectFuture[R, Sync]",client:PrefectClient=None)->State[R]:...defget_state(self,client:PrefectClient=None):""" Get the current state of the task run. """ifself.asynchronous:returncast(Awaitable[State[R]],self._get_state(client=client))else:returncast(State[R],sync(self._get_state,client=client))@inject_clientasyncdef_get_state(self,client:PrefectClient=None)->State[R]:assertclientisnotNone# always injected# We must wait for the task run id to be populatedawaitself._wait_for_submission()task_run=awaitclient.read_task_run(self.task_run.id)ifnottask_run:raiseRuntimeError("Future has no associated task run in the server.")# Update the task run referenceself.task_run=task_runreturntask_run.stateasyncdef_wait_for_submission(self):importasyncio# TODO: This spin lock is not performant but is necessary for cases where a# future is created in a separate event loop i.e. when a sync task is# called in an async flowifnotasyncio.get_running_loop()==self._loop:whilenotself._submitted.is_set():awaitanyio.sleep(0)else:awaitself._submitted.wait()def__hash__(self)->int:returnhash(self.key)def__repr__(self)->str:returnf"PrefectFuture({self.name!r})"def__bool__(self)->bool:warnings.warn(("A 'PrefectFuture' from a task call was cast to a boolean; ""did you mean to check the result of the task instead? ""e.g. `if my_task().result(): ...`"),stacklevel=2,)returnTrue
defget_state(self,client:PrefectClient=None):""" Get the current state of the task run. """ifself.asynchronous:returncast(Awaitable[State[R]],self._get_state(client=client))else:returncast(State[R],sync(self._get_state,client=client))
defresult(self,timeout:float=None,raise_on_failure:bool=True):""" Wait for the run to finish and return the final state. If the timeout is reached before the run reaches a final state, a `TimeoutError` will be raised. If `raise_on_failure` is `True` and the task run failed, the task run's exception will be raised. """ifself.asynchronous:returnself._result(timeout=timeout,raise_on_failure=raise_on_failure)else:returnfrom_sync.call_soon_in_loop_thread(create_call(self._result,timeout=timeout,raise_on_failure=raise_on_failure)).result()
Wait for the run to finish and return the final state
If the timeout is reached before the run reaches a final state,
None is returned.
Source code in src/prefect/futures.py
149150151152153154155156157158159160
defwait(self,timeout=None):""" Wait for the run to finish and return the final state If the timeout is reached before the run reaches a final state, `None` is returned. """ifself.asynchronous:returnself._wait(timeout=timeout)else:# type checking cannot handle the overloaded timeout passingreturnfrom_sync.call_soon_in_loop_thread(create_call(self._wait,timeout=timeout)).result()# type: ignore
defcall_repr(__fn:Callable,*args:Any,**kwargs:Any)->str:""" Generate a repr for a function call as "fn_name(arg_value, kwarg_name=kwarg_value)" """name=__fn.__name__# TODO: If this computation is concerningly expensive, we can iterate checking the# length at each arg or avoid calling `repr` on args with large amounts of# datacall_args=", ".join([repr(arg)forarginargs]+[f"{key}={repr(val)}"forkey,valinkwargs.items()])# Enforce a maximum lengthiflen(call_args)>100:call_args=call_args[:100]+"..."returnf"{name}({call_args})"
Given a Python built-in collection, recursively find PrefectFutures and build a
new collection with the same structure with futures resolved to their results.
Resolving futures to their results may wait for execution to complete and require
communication with the API.
Unsupported object types will be returned without modification.
asyncdefresolve_futures_to_data(expr:Union[PrefectFuture[R,Any],Any])->Union[R,Any]:""" Given a Python built-in collection, recursively find `PrefectFutures` and build a new collection with the same structure with futures resolved to their results. Resolving futures to their results may wait for execution to complete and require communication with the API. Unsupported object types will be returned without modification. """defresolve_future(expr):ifisinstance(expr,PrefectFuture):returnrun_async_from_worker_thread(expr._result)else:returnexprreturnawaitrun_sync_in_worker_thread(visit_collection,expr,visit_fn=resolve_future,return_data=True)
Given a Python built-in collection, recursively find PrefectFutures and build a
new collection with the same structure with futures resolved to their final states.
Resolving futures to their final states may wait for execution to complete.
Unsupported object types will be returned without modification.
asyncdefresolve_futures_to_states(expr:Union[PrefectFuture[R,Any],Any])->Union[State[R],Any]:""" Given a Python built-in collection, recursively find `PrefectFutures` and build a new collection with the same structure with futures resolved to their final states. Resolving futures to their final states may wait for execution to complete. Unsupported object types will be returned without modification. """defresolve_future(expr):ifisinstance(expr,PrefectFuture):returnrun_async_from_worker_thread(expr._wait)else:returnexprreturnawaitrun_sync_in_worker_thread(visit_collection,expr,visit_fn=resolve_future,return_data=True)