# Flows
# Overview
A `Flow` is a container for `Tasks`. It represents an entire workflow or application by describing the dependencies between tasks.
Flows are DAGs, or "directed acyclic graphs." This is a mathematical way of describing certain organizational principles:
- A graph is a data structure that uses "edges" to connect "nodes." Prefect models each `Flow` as a graph in which `Task` dependencies are modeled by `Edges`.
- A directed graph means that edges have a start and an end: when two tasks are connected, one of them unambiguously runs first and the other one runs second.
- An acyclic directed graph has no circular dependencies: if you walk through the graph, you will never revisit a task you've seen before.
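The DAG properties above can be illustrated without Prefect at all. The following is a minimal sketch using only Python's standard library `graphlib` module (Python 3.9+); the task names and the `has_cycle` helper are hypothetical, and this is not how Prefect stores its graph internally.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task name to the set of tasks it depends on (its upstream tasks).
# The task names are hypothetical.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"extract"},
}

# static_order() yields every node only after all of its upstream nodes.
order = list(TopologicalSorter(dependencies).static_order())


def has_cycle(graph):
    """Return True if the dependency graph contains a circular dependency."""
    try:
        list(TopologicalSorter(graph).static_order())
    except CycleError:
        return True
    return False
```

Because the graph is directed and acyclic, a valid run order always exists; a graph such as `{"a": {"b"}, "b": {"a"}}` has none, and `has_cycle` reports it.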
# APIs
# Functional API
The most convenient way to build a Prefect pipeline is with the functional API. The functional API is available any time you enter a `Flow` context. In this mode, you can call `Tasks` on other `Tasks` as if they were functions, and Prefect will build up a computational graph in the background by modifying the flow appropriately.

For example:

    from prefect import task, Task, Flow
    import random

    @task
    def random_number():
        return random.randint(0, 100)

    @task
    def plus_one(x):
        return x + 1

    with Flow('My Functional Flow') as flow:
        r = random_number()
        y = plus_one(x=r)

Using Task Subclasses with the Functional API
Note that in order to use a `Task` subclass with the functional API (as opposed to a `@task`-decorated function), you need to instantiate the class before calling it:

    class PlusOneTask(Task):
        def run(self, x):
            return x + 1

    with Flow('Plus One Flow'):
        task = PlusOneTask()  # first create the Task instance
        result = task(10)  # then call it with arguments

Instantiation is when properties including the task's `retry_delay`, `trigger`, and caching mechanisms are set. With the functional API, these properties can be passed as arguments to the `@task` decorator.
# Imperative API
Prefect's imperative API allows more fine-grained control. Its main advantage over the functional API is that it allows tasks to be set as upstream or downstream dependencies without passing their results. This allows you to create a strict ordering of tasks through state dependencies without also creating data dependencies.

    from prefect import Task, Flow

    class RunMeFirst(Task):
        def run(self):
            print("I'm running first!")

    class PlusOneTask(Task):
        def run(self, x):
            return x + 1

    flow = Flow('My Imperative Flow')
    plus_one = PlusOneTask()
    flow.set_dependencies(
        task=plus_one,
        upstream_tasks=[RunMeFirst()],
        keyword_tasks=dict(x=10))

    flow.visualize()

TIP
`flow.set_dependencies()` and `task.set_dependencies()` (the latter is only available inside an active flow context) are the main entrypoints for the imperative API. Flows also provide some lower-level methods like `add_task()` and `add_edge()` that can be used to manipulate the graph directly.
# Running a flow
To run a flow, call `flow.run()`:

    from prefect import task, Flow

    @task
    def say_hello():
        print("Hello, world!")

    with Flow("Run Me") as flow:
        h = say_hello()

    flow.run()  # prints "Hello, world!"

This will return a `State` object representing the outcome of the run, including the `States` of all tasks.

    state = flow.run()
    state.result[h]  # the task state of the say_hello task

# Schedules
Prefect treats flows as functions, which means they can be run at any time, with any concurrency, for any reason.
However, flows may also have schedules. In Prefect terms, a schedule is nothing more than a way to indicate that you want to start a new run at a specific time. Even if a flow has a schedule, you may still run it manually.
For more information, see the Schedules concept doc.
# Running a flow on schedule
If `flow.run()` is called for a flow with a schedule attached, then it will run the flow on schedule. Note that it will wait for the next scheduled time and not start running immediately.
Concurrent flow runs are not supported by `flow.run()`
`flow.run()` is a convenient way to run a flow on schedule, but it does not support concurrent flow runs. It will wait for a run to completely finish, including things like tasks that require retries, before starting the next run. However, Prefect schedules never return start times in the past. This means that if a flow run is still running when another flow run is supposed to start, the second flow run won't happen at all. If you require concurrent runs in a local process, consider using the lower-level `FlowRunner` classes directly.
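The skipped-run behavior described above can be made concrete with a small simulation. This is a plain-Python sketch, not Prefect code: the `simulate_runs` function, its integer time units, and the assumption that the next start time must be strictly later than the moment the previous run finished are all illustrative choices.

```python
def simulate_runs(interval, run_duration, horizon):
    """Return the start times of the runs that actually execute.

    Hypothetical model: a run is scheduled every `interval` time units,
    each run takes `run_duration` units, and a single process executes
    one run at a time until `horizon`.
    """
    started = []
    now = 0
    while True:
        # The schedule only offers start times that are still in the future
        # (assumed here to mean strictly later than `now`).
        next_start = (now // interval + 1) * interval
        if next_start > horizon:
            break
        started.append(next_start)
        # The process is blocked until this run finishes.
        now = next_start + run_duration
    return started


# Runs take longer than the interval, so every other slot is missed.
slow = simulate_runs(interval=10, run_duration=15, horizon=60)

# Runs finish well within the interval, so no slot is missed.
fast = simulate_runs(interval=10, run_duration=3, horizon=60)
```

In the slow case the runs scheduled for 20, 40, and 60 never start, because each of those times has already passed when the previous run finishes.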
# Key tasks
# Terminal tasks
The terminal tasks of the flow are any tasks that have no downstream dependencies -- they are the last tasks to run.
Flows are not considered `Finished` until all of their terminal tasks finish, and will remain `Running` otherwise. By default, terminal tasks are also the flow's reference tasks, and therefore determine its state.
Run order
Prefect does not guarantee the order in which tasks will run, other than that tasks will not run before their upstream dependencies are evaluated. Therefore, you might have a terminal task that actually runs before other tasks in your flow, as long as it does not depend on those tasks.
# Reference tasks
When a flow runs, its state is determined by the state of its reference tasks. By default, a flow's reference tasks are its terminal tasks, which includes any task that has no downstream tasks. If the reference tasks are all successful (including any skipped tasks), the flow is considered a `Success`. If any reference tasks fail, the flow is considered `Failed`. No matter what state the reference tasks are in, the flow is considered `Pending` if any of its tasks are unfinished.

    from prefect import Task, Flow

    with Flow('Reference Task Flow') as flow:
        a, b, c = Task(), Task(), Task()
        flow.add_edge(a, b)
        flow.add_edge(b, c)

    # by default, the reference tasks are the terminal tasks
    assert flow.reference_tasks() == {c}

When should you change the reference tasks?
Generally, a flow's terminal tasks are appropriate reference tasks. However, there are times when that isn't the case.
Consider a flow that takes some action, and has a downstream task that only runs if the main action fails, in order to clean up the environment. If the main task fails and the clean up task is successful, was the flow as a whole successful? To some users, the answer is yes: the clean up operation worked as expected. To other users, the answer is no: the main purpose of the flow was not achieved.
Custom reference tasks allow you to alter this behavior to suit your needs.
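To see why the choice matters, here is a simplified plain-Python model of the rules described in this section, applied to the clean-up scenario. The `flow_state` function, the task names, and the use of strings for states are hypothetical stand-ins; this is not Prefect's implementation.

```python
def flow_state(task_states, reference_tasks):
    """Derive a flow state name from task state names (simplified model).

    task_states maps a task name to one of "Success", "Skipped",
    "Failed", or "Running".
    """
    finished = {"Success", "Skipped", "Failed"}
    # Any unfinished task keeps the flow from reaching a final state.
    if any(state not in finished for state in task_states.values()):
        return "Pending"
    # Otherwise only the reference tasks decide the outcome.
    if any(task_states[name] == "Failed" for name in reference_tasks):
        return "Failed"
    return "Success"


# The main action fails and the clean-up task succeeds.
states = {"main_action": "Failed", "clean_up": "Success"}

# Default: the terminal task (clean_up) is the reference task.
default_view = flow_state(states, reference_tasks={"clean_up"})

# Custom: the main action is the reference task instead.
custom_view = flow_state(states, reference_tasks={"main_action"})
```

With the default reference task the flow counts as a `Success`; with the main action as the reference task it counts as `Failed`. In Prefect itself, custom reference tasks are set on the flow (to the best of our knowledge via `flow.set_reference_tasks()`; check the API reference for your version).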
# Serialization
Flow metadata can be serialized by calling the flow's `serialize()` method.
# Retrieving tasks
Flows can contain many tasks, and it can be challenging to find the exact task you need. Fortunately, the `get_tasks()` method makes this simpler. Pass any of the various task identification keys to the function, and it will retrieve any matching tasks.

    # any tasks with the name "my task"
    flow.get_tasks(name="my task")

    # any tasks with the name "my task" and the "blue" tag
    flow.get_tasks(name="my task", tags=["blue"])

    # the task with the slug "x"
    flow.get_tasks(slug="x")

# State handlers
State handlers allow users to provide custom logic that fires whenever a flow changes state. For example, you could send a Slack notification if the flow failed -- we actually think that's so useful we included it here!
State handlers must have the following signature:

    state_handler(flow: Flow, old_state: State, new_state: State) -> State

The handler is called anytime the flow's state changes, and receives the flow itself, the old state, and the new state. The state that the handler returns is used as the flow's new state.
If multiple handlers are provided, they are called in sequence. Each one will receive the "true" `old_state` and the `new_state` generated by the previous handler.
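The chaining rule can be sketched in a few lines of plain Python. The `call_state_handlers` function and the two example handlers are hypothetical, and strings stand in for `State` objects; the sketch only models the calling convention described above, not Prefect's runner.

```python
def call_state_handlers(flow, old_state, new_state, handlers):
    """Call handlers in sequence and return the final new state."""
    for handler in handlers:
        # Every handler sees the original old_state, but the new_state
        # produced by the handler that ran before it.
        new_state = handler(flow, old_state, new_state)
    return new_state


def annotate(flow, old_state, new_state):
    # Hypothetical handler: record where the transition came from.
    return f"{new_state} (was {old_state})"


def shout(flow, old_state, new_state):
    # Hypothetical handler: receives annotate's output as new_state.
    return new_state.upper()


final = call_state_handlers("my-flow", "Running", "Failed", [annotate, shout])
```

Note that a handler must return a state: whatever it returns replaces `new_state` for every handler after it.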
# Terminal State Handlers
A flow's `terminal_state_handler` allows users to provide custom logic for determining the final `State` of a `Flow`.
Terminal state handlers must have the following signature:

    terminal_state_handler(flow: Flow, state: State, reference_task_states: Set[State]) -> Optional[State]

where:
- `flow` is the current `Flow`
- `state` is the current state of the `Flow`
- `reference_task_states` contains states for `Flow` reference tasks
For example, you may want to determine if reference tasks have failed and update the state with a custom message.

    from typing import Optional, Set

    from prefect import Flow, Task
    from prefect.engine.state import State

    def custom_terminal_state_handler(
        flow: Flow,
        state: State,
        reference_task_states: Set[State],
    ) -> Optional[State]:
        failed = False
        # iterate through reference task states looking for failures
        for task_state in reference_task_states:
            if task_state.is_failed():
                failed = True
        # update the terminal state of the Flow and return
        if failed:
            state.message = "Some important tasks have failed"
        return state

    class FailingTask(Task):
        def run(self):
            raise Exception

    flow = Flow(
        "my flow with custom terminal state handler",
        terminal_state_handler=custom_terminal_state_handler,
    )
