States are rich objects that contain information about the status of a particular task run or flow run. While you don't need to know the details of states to use Prefect, you can give your workflows superpowers by taking advantage of them.
At any moment, you can learn anything you need to know about a task or flow by examining its current state or the history of its states. For example, a state could tell you that a task:
- is scheduled to make a third run attempt in an hour
- succeeded and what data it produced
- was scheduled to run, but later cancelled
- used the cached result of a previous run instead of re-running
- failed because it timed out
By manipulating a relatively small number of task states, Prefect flows can harness the complexity that emerges in workflows.
Only runs have states
Though we often refer to the "state" of a flow or a task, what we really mean is the state of a flow run or a task run. Flows and tasks are templates that describe what a system does; only when we run the system does it also take on a state. So while we might refer to a task as "running" or being "successful", we really mean that a specific instance of the task is in that state.
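The template-versus-run distinction can be sketched in plain Python. This is a toy model for illustration only, not Prefect's implementation; the names `TaskTemplate`, `RunRecord`, `start_run`, and `set_state` are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class RunRecord:
    """One execution of a template; only the run carries state."""
    template_name: str
    history: list = field(default_factory=list)  # state names, oldest first

    def set_state(self, name: str) -> None:
        self.history.append(name)

    @property
    def current_state(self):
        # A run with no recorded states has no current state yet.
        return self.history[-1] if self.history else None


@dataclass(frozen=True)
class TaskTemplate:
    """Describes what to do; has no state of its own."""
    name: str

    def start_run(self) -> RunRecord:
        run = RunRecord(template_name=self.name)
        run.set_state("Pending")
        return run


template = TaskTemplate("extract")
first = template.start_run()
second = template.start_run()

first.set_state("Running")
first.set_state("Completed")
second.set_state("Running")
second.set_state("Failed")

# Two runs of the same template end up in different states.
print(first.current_state, second.current_state)
```

The same template produced one run that completed and one that failed, which is why "the task is successful" is only ever shorthand for a statement about a specific run.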
States have names and types. State types are canonical, with specific orchestration rules that apply to transitions into and out of each state type. A state's name is often, but not always, synonymous with its type. For example, a task run that is running for the first time has a state with the name Running and the type RUNNING. However, if the task retries, that same task run will have a state with the name Retrying and the type RUNNING. Each time the task run transitions into the RUNNING state, the same orchestration rules are applied.
Terminal state types are those from which there are no orchestrated transitions to any other state type:
- COMPLETED
- CANCELLED
- FAILED
- CRASHED
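As a rough illustration, the terminal check can be modeled with a small enum. This is a standalone sketch, not Prefect's own classes: the `StateType` enum here is redefined locally from the type names used in this document, and `TERMINAL_TYPES` and `is_terminal` are hypothetical helper names.

```python
from enum import Enum


class StateType(Enum):
    # Local stand-in for the state types named in this document.
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    CANCELLING = "CANCELLING"
    CANCELLED = "CANCELLED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CRASHED = "CRASHED"


# The four terminal types listed above.
TERMINAL_TYPES = frozenset(
    {
        StateType.COMPLETED,
        StateType.CANCELLED,
        StateType.FAILED,
        StateType.CRASHED,
    }
)


def is_terminal(state_type: StateType) -> bool:
    """Return True if no orchestrated transition leaves this state type."""
    return state_type in TERMINAL_TYPES


non_terminal = [t.name for t in StateType if not is_terminal(t)]
print(non_terminal)
```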
The full complement of states and state types includes:
| Name | Type | Terminal? | Description |
| --- | --- | --- | --- |
| Scheduled | SCHEDULED | No | The run will begin at a particular time in the future. |
| Late | SCHEDULED | No | The run's scheduled start time has passed, but it has not transitioned to PENDING (a run is marked Late after 15 seconds by default). |
| AwaitingRetry | SCHEDULED | No | The run did not complete successfully because of a code issue and had remaining retry attempts. |
| Pending | PENDING | No | The run has been submitted to run, but is waiting on necessary preconditions to be satisfied. |
| Running | RUNNING | No | The run code is currently executing. |
| Retrying | RUNNING | No | The run code is currently executing after previously not completing successfully. |
| Paused | PAUSED | No | The run code has stopped executing until it receives manual approval to proceed. |
| Cancelling | CANCELLING | No | The infrastructure on which the code was running is being cleaned up. |
| Cancelled | CANCELLED | Yes | The run did not complete because a user determined that it should not. |
| Completed | COMPLETED | Yes | The run completed successfully. |
| Failed | FAILED | Yes | The run did not complete because of a code issue and had no remaining retry attempts. |
| Crashed | CRASHED | Yes | The run did not complete because of an infrastructure issue. |
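To make the name-to-type relationship concrete, the table above can be written as a plain lookup and grouped by type. This is only a sketch built from the table's contents; `STATE_NAME_TO_TYPE`, `names_by_type`, and `shared` are hypothetical names, not part of Prefect.

```python
from collections import defaultdict

# (name, type) pairs copied from the table above.
STATE_NAME_TO_TYPE = {
    "Scheduled": "SCHEDULED",
    "Late": "SCHEDULED",
    "AwaitingRetry": "SCHEDULED",
    "Pending": "PENDING",
    "Running": "RUNNING",
    "Retrying": "RUNNING",
    "Paused": "PAUSED",
    "Cancelling": "CANCELLING",
    "Cancelled": "CANCELLED",
    "Completed": "COMPLETED",
    "Failed": "FAILED",
    "Crashed": "CRASHED",
}

# Group the state names under their canonical type.
names_by_type = defaultdict(list)
for name, state_type in STATE_NAME_TO_TYPE.items():
    names_by_type[state_type].append(name)

# Types that appear under more than one name.
shared = {t: names for t, names in names_by_type.items() if len(names) > 1}
print(shared)
```

Only SCHEDULED and RUNNING are shared by several names; every other type has exactly one name, which is why name and type are usually interchangeable.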
State change hooks execute code in response to changes in flow or task run states, enabling you to define actions for specific state transitions in a workflow.
    from prefect import flow

    def my_success_hook(flow, flow_run, state):
        print("Flow run succeeded!")

    @flow(on_completion=[my_success_hook])
    def my_flow():
        return 42

    my_flow()
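    # Import paths below assume a recent Prefect release.
    from prefect import Flow, State, flow
    from prefect.client.schemas.objects import FlowRun

    def my_flow_hook(flow: Flow, flow_run: FlowRun, state: State):
        """This is the required signature for a flow run state change hook.
        This hook can only be passed into flows.
        """

    # pass hook as a list of callables
    @flow(on_completion=[my_flow_hook])
    def my_flow():
        ...  # placeholder flow body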
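    # Import paths below assume a recent Prefect release.
    from prefect import State, Task, task
    from prefect.client.schemas.objects import TaskRun

    def my_task_hook(task: Task, task_run: TaskRun, state: State):
        """This is the required signature for a task run state change hook.
        This hook can only be passed into tasks.
        """

    # pass hook as a list of callables
    @task(on_failure=[my_task_hook])
    def my_task():
        ...  # placeholder task body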
State change hooks are versatile, allowing you to specify multiple state change hooks for the same state transition, or to use the same state change hook for different transitions:
    from prefect import task

    def my_success_hook(task, task_run, state):
        print("Task run succeeded!")

    def my_failure_hook(task, task_run, state):
        print("Task run failed!")

    def my_succeed_or_fail_hook(task, task_run, state):
        print("If the task run succeeds or fails, this hook runs.")

    @task(
        on_completion=[my_success_hook, my_succeed_or_fail_hook],
        on_failure=[my_failure_hook, my_succeed_or_fail_hook],
    )
    def my_task():
        ...  # placeholder task body
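The idea of attaching several hooks to one transition, and one hook to several transitions, can be illustrated without Prefect at all. The following is a toy dispatcher written for this explanation, not how Prefect implements hooks; `HookRegistry`, `register`, and `fire` are hypothetical names, and the strings passed as `task`, `task_run`, and `state` stand in for the real objects.

```python
from collections import defaultdict


class HookRegistry:
    """Toy registry: maps a transition name to an ordered list of hooks."""

    def __init__(self):
        self._hooks = defaultdict(list)

    def register(self, transition, *hooks):
        self._hooks[transition].extend(hooks)

    def fire(self, transition, task, task_run, state):
        # Call every hook registered for this transition, in registration
        # order, with the same three-argument signature used above.
        for hook in self._hooks[transition]:
            hook(task, task_run, state)


calls = []


def on_success(task, task_run, state):
    calls.append(("success", state))


def on_failure(task, task_run, state):
    calls.append(("failure", state))


def on_either(task, task_run, state):
    calls.append(("either", state))


registry = HookRegistry()
registry.register("completion", on_success, on_either)
registry.register("failure", on_failure, on_either)

registry.fire("completion", task="my_task", task_run="run-1", state="Completed")
registry.fire("failure", task="my_task", task_run="run-2", state="Failed")

print(calls)
```

In this sketch `on_either` is registered under both transitions, so it is called once per transition, mirroring the role of `my_succeed_or_fail_hook` above.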
You can also define a hook that accepts arbitrary keyword arguments and bind their values with functools.partial:
    from functools import partial
    from prefect import flow, task

    data = {}

    def my_hook(task, task_run, state, **kwargs):
        data.update(state=state, **kwargs)

    @task
    def bad_task():
        raise ValueError("meh")

    @flow
    def ok_with_failure_flow(x: str = "foo", y: int = 42):
        bad_task_with_a_hook = bad_task.with_options(
            on_failure=[partial(my_hook, **dict(x=x, y=y))]
        )
        # return a tuple of "bar" and the task run state
        # to avoid raising the task's exception
        return "bar", bad_task_with_a_hook(return_state=True)

    _, task_run_state = ok_with_failure_flow()

    assert data == {"x": "foo", "y": 42, "state": task_run_state}