# Control Flow Tasks
# FilterTask
`class prefect.tasks.control_flow.filter.FilterTask(filter_func=None, log_func=None, **kwargs)`

Task for filtering lists of results. The default filter removes `NoResult`s, `None`s and `Exception`s, and is intended for filtering out mapped results. Note that this task has a default trigger of `all_finished` and `skip_on_upstream_skip=False`.
Args:

- `filter_func (Callable, optional)`: a function to use for filtering results; this function should accept a single positional argument and return a boolean indicating whether this result should be kept or not. The default is to filter out `NoResult`s and `Exception`s
- `log_func (Callable, optional)`: a function to use for logging the result of the `filter_func` with info log level. If no function is passed in, no logging happens (default).
- `**kwargs (optional)`: additional keyword arguments to pass to the Task constructor
Example:

    from prefect import task, Flow
    from prefect.tasks.control_flow import FilterTask

    default_filter = FilterTask()
    even_filter = FilterTask(filter_func=lambda x: x % 2 == 0)
    log_filter = FilterTask(
        filter_func=lambda x: x % 2 == 0,
        log_func=lambda x: f"Even numbers: {', '.join([str(y) for y in x])}",
    )

    @task
    def add(x):
        return x + 1

    @task
    def div(x):
        return 1 / x

    with Flow("filter-numbers") as flow:
        even_numbers = even_filter(add.map(x=[-1, 0, 1, 2, 3, 99, 314]))
        even_numbers_log = log_filter(add.map(x=[-1, 0, 1, 2, 3, 99, 314]))
        final_numbers = default_filter(div.map(even_numbers))

    flow_state = flow.run()

    print(flow_state.result[final_numbers].result)
    # [0.5, 0.25, 0.01]
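
The filtering rules described above can also be illustrated without Prefect. The following is a plain-Python sketch, not Prefect's implementation: `NO_RESULT` is a hypothetical stand-in for Prefect's `NoResult`, and `default_filter` and `filter_results` are names invented for this illustration. It assumes, as the `log_func` example above suggests, that `log_func` receives the already-filtered list.

```python
from typing import Any, Callable, List, Optional

# Hypothetical stand-in for Prefect's NoResult sentinel (assumption for illustration).
NO_RESULT = object()


def default_filter(result: Any) -> bool:
    """Keep a result unless it is the sentinel, None, or an exception instance."""
    if result is NO_RESULT or result is None:
        return False
    return not isinstance(result, Exception)


def filter_results(
    task_results: List[Any],
    filter_func: Optional[Callable[[Any], bool]] = None,
    log_func: Optional[Callable[[List[Any]], str]] = None,
) -> List[Any]:
    """Apply the filter to each upstream result and optionally log the kept list."""
    keep = filter_func or default_filter
    kept = [r for r in task_results if keep(r)]
    if log_func is not None:
        # Prefect would send this message to its logger at info level;
        # print is used here only to keep the sketch dependency-free.
        print(log_func(kept))
    return kept


# Mirrors the flow above: 1 / 0 fails, so that mapped result is an exception.
mapped = [ZeroDivisionError("division by zero"), 0.5, 0.25, 0.01]
final_numbers = filter_results(mapped)
evens = filter_results([0, 1, 2, 3, 4, 100, 315], filter_func=lambda x: x % 2 == 0)
```

Passing no `filter_func` falls back to the default rule, which is why the failed division disappears from `final_numbers` while the three successful results are kept.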
| methods: |
|---|
| `prefect.tasks.control_flow.filter.FilterTask.run(task_results)`: Task run method. |
# case
A conditional block in a flow definition.

Used as a context-manager, `case` creates a block of tasks that are only run if the result of `task` is equal to `value`.

Args:

- `task (Task)`: The task to use in the comparison
- `value (Any)`: A constant the result of `task` will be compared with

A `case` block is similar to Python's if-blocks. It delimits a block of tasks that will only be run if the result of `task` is equal to `value`:

    # Standard python code
    if task == value:
        res = run_if_task_equals_value()
        other_task(res)

    # Equivalent prefect code
    with case(task, value):
        # Tasks created in this block are only run if the
        # result of `task` is equal to `value`
        res = run_if_task_equals_value()
        other_task(res)

The `value` argument can be any non-task object. Here we branch on a string result:

    from prefect import Flow, case

    with Flow("example") as flow:
        cond = condition()

        with case(cond, "a"):
            run_if_cond_is_a()

        with case(cond, "b"):
            run_if_cond_is_b()

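
For readers who want to see the branching rule in isolation, here is a plain-Python sketch of the idea behind a `case` block. It is a conceptual model only and does not reflect how Prefect builds or executes flows: `case_sketch` is a hypothetical helper, and the callables registered inside the block stand in for tasks.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator, List


@contextmanager
def case_sketch(task_result: Any, value: Any) -> Iterator[Callable[[Callable[[], Any]], None]]:
    """Collect callables registered in the block; run them on exit only if task_result == value."""
    registered: List[Callable[[], Any]] = []
    # The block receives a function that registers work, loosely like add_task.
    yield registered.append
    if task_result == value:
        for fn in registered:
            fn()


ran: List[str] = []
cond = "a"  # stands in for the result of the `condition()` task

with case_sketch(cond, "a") as add_task:
    add_task(lambda: ran.append("run_if_cond_is_a"))

with case_sketch(cond, "b") as add_task:
    add_task(lambda: ran.append("run_if_cond_is_b"))
```

Only the first block's callable runs, because `cond` equals `"a"` and not `"b"`. Both blocks still register their work, which loosely parallels a flow in which every task is defined but some are not run.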
| methods: |
|---|
| `prefect.tasks.control_flow.case.case.add_task(task, flow)`: Add a new task under the case statement. |
# Functions
| top-level functions: |
|---|
| `prefect.tasks.control_flow.conditional.switch(condition, cases, mapped=False)`: Adds a SWITCH to a workflow. |
| `prefect.tasks.control_flow.conditional.ifelse(condition, true_task, false_task, mapped=False)`: Builds a conditional branch into a workflow. |
| `prefect.tasks.control_flow.conditional.merge(*tasks, flow=None, mapped=False, **kwargs)`: Merges conditional branches back together. |
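
The relationship between these three functions may be easier to see in a small plain-Python model. This is a sketch under stated assumptions, not Prefect's implementation: `switch_sketch`, `ifelse_sketch`, `merge_sketch` and the `SKIPPED` marker are hypothetical names, branches are modelled as zero-argument callables, and treating `ifelse` as a two-way switch on truthiness is an assumption made for illustration.

```python
from typing import Any, Callable, Dict

# Hypothetical marker for a branch that did not run (assumption for illustration).
SKIPPED = object()


def switch_sketch(condition: Any, cases: Dict[Any, Callable[[], Any]]) -> Dict[Any, Any]:
    """Run only the branch whose key equals condition; mark the others as skipped."""
    return {
        key: (branch() if key == condition else SKIPPED)
        for key, branch in cases.items()
    }


def ifelse_sketch(
    condition: Any,
    true_branch: Callable[[], Any],
    false_branch: Callable[[], Any],
) -> Dict[Any, Any]:
    """Model a two-way branch as a switch keyed on the truthiness of condition."""
    return switch_sketch(bool(condition), {True: true_branch, False: false_branch})


def merge_sketch(*results: Any) -> Any:
    """Return the first branch result that was not skipped, or None if all were."""
    return next((r for r in results if r is not SKIPPED), None)


branches = ifelse_sketch(3 > 2, lambda: "true branch", lambda: "false branch")
merged = merge_sketch(*branches.values())
```

In this model `merged` holds the value of whichever branch actually ran, which is the role the description above assigns to `merge`: bringing conditional branches back together so that downstream work has a single input.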
This documentation was auto-generated from commit bd9182e on July 31, 2024 at 18:02 UTC