# Control Flow Tasks


# FilterTask

class prefect.tasks.control_flow.filter.FilterTask(filter_func=None, log_func=None, **kwargs)

Task for filtering lists of results. The default filter removes NoResults, Nones and Exceptions, which makes it suited to cleaning up the results of mapped tasks. Note that this task has a default trigger of all_finished and skip_on_upstream_skip=False.

Args:

  • filter_func (Callable, optional): a function to use for filtering results; this function should accept a single positional argument and return a boolean indicating whether this result should be kept or not. The default is to filter out NoResults, Nones and Exceptions.
  • log_func (Callable, optional): a function to use for logging the result of the filter_func with info log level. If no function is passed in, no logging happens (default).
  • **kwargs (optional): additional keyword arguments to pass to the Task constructor
Example:

from prefect import task, Flow
from prefect.tasks.control_flow import FilterTask

default_filter = FilterTask()
even_filter = FilterTask(filter_func=lambda x: x % 2 == 0)
log_filter = FilterTask(
    filter_func=lambda x: x % 2 == 0,
    log_func=lambda x: f"Even numbers: {', '.join([str(y) for y in x])}",
)

@task
def add(x):
    return x + 1

@task
def div(x):
    return 1 / x

with Flow("filter-numbers") as flow:
    even_numbers = even_filter(add.map(x=[-1, 0, 1, 2, 3, 99, 314]))
    even_numbers_log = log_filter(add.map(x=[-1, 0, 1, 2, 3, 99, 314]))
    final_numbers = default_filter(div.map(even_numbers))

flow_state = flow.run()

print(flow_state.result[final_numbers].result)
# [0.5, 0.25, 0.01]
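
To see why the final result is `[0.5, 0.25, 0.01]`, the mapped calls can be traced in plain Python. This is only a sketch that does not use Prefect: it assumes a failed mapped call reaches the filter as the raised exception object, and `map_capturing_errors` is a hypothetical helper, not a Prefect function.

```python
def map_capturing_errors(func, values):
    """Hypothetical stand-in for Task.map: a failing call yields its exception."""
    results = []
    for value in values:
        try:
            results.append(func(value))
        except Exception as exc:
            results.append(exc)
    return results


added = map_capturing_errors(lambda x: x + 1, [-1, 0, 1, 2, 3, 99, 314])
# added == [0, 1, 2, 3, 4, 100, 315]

evens = [x for x in added if x % 2 == 0]
# evens == [0, 2, 4, 100]

divided = map_capturing_errors(lambda x: 1 / x, evens)
# divided[0] is a ZeroDivisionError because 1 / 0 fails

# Assumed default filter: drop None and exception results.
final = [r for r in divided if r is not None and not isinstance(r, Exception)]
print(final)  # [0.5, 0.25, 0.01]
```

The division by zero does not abort the trace; it becomes one more result for the default filter to discard.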

methods:

prefect.tasks.control_flow.filter.FilterTask.run(task_results)

Task run method.

Args:

  • task_results (List[Any]): a list of results from upstream tasks, which will be filtered using self.filter_func
Returns:
  • List[Any]: a filtered list of results
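
The behaviour described for `run` can be modelled in a few lines of plain Python. This is a sketch, not Prefect's source: `run_filter` and the `filter_sketch` logger name are hypothetical, and the assumption that `log_func` receives the filtered list and returns the message to log is inferred from the `log_filter` example above.

```python
import logging
from typing import Any, Callable, List, Optional

logger = logging.getLogger("filter_sketch")  # hypothetical logger name


def run_filter(
    task_results: List[Any],
    filter_func: Callable[[Any], bool],
    log_func: Optional[Callable[[List[Any]], str]] = None,
) -> List[Any]:
    """Keep the results for which filter_func returns True."""
    filtered = [r for r in task_results if filter_func(r)]
    if log_func is not None:
        # Assumption: log_func turns the filtered list into an info-level message.
        logger.info(log_func(filtered))
    return filtered


kept = run_filter(
    [0, 1, 2, 3, 4, 100, 315],
    filter_func=lambda x: x % 2 == 0,
    log_func=lambda xs: f"Even numbers: {', '.join(str(y) for y in xs)}",
)
print(kept)  # [0, 2, 4, 100]
```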



# case

class prefect.tasks.control_flow.case.case(task, value)

A conditional block in a flow definition.

Used as a context-manager, case creates a block of tasks that are only run if the result of task is equal to value.

Args:

  • task (Task): The task to use in the comparison
  • value (Any): A constant the result of task will be compared with
Example:

A case block is similar to Python's if-blocks. It delimits a block of tasks that will only be run if the result of task is equal to value:

# Standard Python code
if task == value:
    res = run_if_task_equals_value()
    other_task(res)

# Equivalent Prefect code
with case(task, value):
    # Tasks created in this block are only run if the
    # result of `task` is equal to `value`
    res = run_if_task_equals_value()
    other_task(res)

The value argument can be any non-task object. Here we branch on a string result:

with Flow("example") as flow:
    cond = condition()

    with case(cond, "a"):
        run_if_cond_is_a()

    with case(cond, "b"):
        run_if_cond_is_b()
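
Because the block runs only when the result is equal to `value`, the comparison is an equality test rather than a truthiness test. The plain-Python model below illustrates that reading; `run_case` and the `SKIPPED` marker are hypothetical names for illustration and are not part of Prefect.

```python
SKIPPED = "Skipped"  # hypothetical marker standing in for a skipped task state


def run_case(cond_result, value, block):
    """Run block only when cond_result equals value; otherwise report a skip."""
    if cond_result == value:
        return block()
    return SKIPPED


cond_result = "b"
outcome_a = run_case(cond_result, "a", lambda: "ran a-block")
outcome_b = run_case(cond_result, "b", lambda: "ran b-block")

# A truthy result that is not equal to the value does not match.
outcome_true = run_case("yes", True, lambda: "ran true-block")

print(outcome_a, outcome_b, outcome_true)
```

Under this model only the `"b"` block runs, and a condition that returns `"yes"` does not enter a `case(cond, True)` block.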

methods:

prefect.tasks.control_flow.case.case.add_task(task, flow)

Add a new task under the case statement.

Args:

  • task (Task): the task to add
  • flow (Flow): the flow to use



# Functions

top-level functions:

prefect.tasks.control_flow.conditional.switch(condition, cases, mapped=False)

Adds a SWITCH to a workflow.

The condition task is evaluated and the result is compared to the keys of the cases dictionary. The task corresponding to the matching key is run; all other tasks are skipped. Any tasks downstream of the skipped tasks are also skipped unless they set skip_on_upstream_skip=False.

Example:

from prefect import task, Flow
from prefect.tasks.control_flow import switch

@task
def condition():
    return "b"  # returning 'b' will take the b_branch

@task
def a_branch():
    return "A Branch"

@task
def b_branch():
    return "B Branch"

with Flow("switch-flow") as flow:
    switch(condition, dict(a=a_branch, b=b_branch))

Args:
  • condition (Task): a task whose result forms the condition for the switch
  • cases (Dict[Any, Task]): a dict representing the "case" statements of the switch. The value of the condition task will be compared to the keys of this dict, and the matching task will be executed.
  • mapped (bool, optional): If true, the switch operation will be mapped over the arguments instead of applied directly. Defaults to False.
Raises:
  • PrefectWarning: if any of the tasks in "cases" have upstream dependencies, then this task will warn that those upstream tasks may run whether or not the switch condition matches their branch. The most common cause of this is passing a list of tasks as one of the cases, which adds the List task to the switch condition but leaves the tasks themselves upstream.
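
The key-matching rule can be pictured with a small plain-Python model. It is a sketch of the semantics described above, not Prefect's implementation: `run_switch` is a hypothetical function, and the string `"Skipped"` stands in for a skipped task state.

```python
from typing import Any, Callable, Dict


def run_switch(condition_result: Any, cases: Dict[Any, Callable[[], Any]]) -> Dict[Any, Any]:
    """Run the branch whose key equals condition_result; mark the rest as skipped."""
    outcomes = {}
    for key, branch in cases.items():
        outcomes[key] = branch() if key == condition_result else "Skipped"
    return outcomes


cases = {"a": lambda: "A Branch", "b": lambda: "B Branch"}

outcomes = run_switch("b", cases)
print(outcomes)  # {'a': 'Skipped', 'b': 'B Branch'}

# In this model, a condition that matches no key leaves every branch skipped.
no_match = run_switch("c", cases)
print(no_match)  # {'a': 'Skipped', 'b': 'Skipped'}
```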

prefect.tasks.control_flow.conditional.ifelse(condition, true_task, false_task, mapped=False)

Builds a conditional branch into a workflow.

If the condition evaluates True(ish), the true_task will run. If it evaluates False(ish), the false_task will run. The task that doesn't run is Skipped, as are all downstream tasks that don't set skip_on_upstream_skip=False.

Args:

  • condition (Task): a task whose boolean result forms the condition for the ifelse
  • true_task (Task): a task that will be executed if the condition is True
  • false_task (Task): a task that will be executed if the condition is False
  • mapped (bool, optional): If true, the ifelse operation will be mapped over the arguments instead of applied directly. Defaults to False.
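
One way to read "True(ish)" and "False(ish)" is ordinary Python truthiness. The model below assumes the condition result is coerced with `bool()` before a branch is chosen; that coercion, the `run_ifelse` function, and the `"Skipped"` marker are assumptions for illustration, not confirmed details of Prefect.

```python
def run_ifelse(condition_result, true_branch, false_branch):
    """Coerce the condition with bool() and run only the selected branch."""
    if bool(condition_result):
        return {"true_task": true_branch(), "false_task": "Skipped"}
    return {"true_task": "Skipped", "false_task": false_branch()}


def on_true():
    return "true branch ran"


def on_false():
    return "false branch ran"


non_empty = run_ifelse("some text", on_true, on_false)  # truthy string
empty_list = run_ifelse([], on_true, on_false)          # falsy empty list
zero = run_ifelse(0, on_true, on_false)                 # falsy number

print(non_empty)
print(empty_list)
print(zero)
```

If a flow needs a strict comparison instead of truthiness, the `case` block described earlier compares by equality.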

prefect.tasks.control_flow.conditional.merge(*tasks, flow=None, mapped=False, **kwargs)

Merges conditional branches back together.

A conditional branch in a flow results in one or more tasks proceeding and one or more tasks skipping. It is often convenient to merge those branches back into a single result. This function is a simple way to achieve that goal. By default this task will skip if all its upstream dependencies are also skipped.

The merge will return the first real result it encounters, or None. If multiple tasks might return a result, group them with a list.

Example:

with Flow("My Flow"):
    true_branch = ActionIfTrue()
    false_branch = ActionIfFalse()
    ifelse(CheckCondition(), true_branch, false_branch)

    merged_result = merge(true_branch, false_branch)

Args:
  • *tasks (Task): tasks whose results should be merged into a single result. The tasks are assumed to all sit downstream of different switch branches, such that only one of them will contain a result and the others will all be skipped.
  • flow (Flow, optional): The flow to use, defaults to the current flow in context if no flow is specified
  • mapped (bool, optional): If true, the merge operation will be mapped over the arguments instead of applied directly. Defaults to False.
  • **kwargs (optional): kwargs to be passed to the Merge constructor.
Returns:
  • Task: a Task representing the merged result.
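
The "first real result, or None" rule can be sketched in plain Python. The model assumes that a skipped branch contributes `None` and that any value other than `None` counts as a real result; both points, and the `merge_results` name, are assumptions for illustration and do not come from Prefect's source.

```python
def merge_results(*branch_results):
    """Return the first result that is not None, or None if every branch is None."""
    return next((r for r in branch_results if r is not None), None)


# Only the false branch produced a value; the true branch was skipped.
merged = merge_results(None, "false branch result")
print(merged)  # false branch result

# Falsy values such as 0 still count as real results in this model.
merged_zero = merge_results(None, 0)
print(merged_zero)  # 0

# Nothing to merge: every branch was skipped.
merged_none = merge_results(None, None)
print(merged_none)  # None
```

Under these assumptions a branch that legitimately returns `None` cannot be told apart from a skipped branch.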

This documentation was auto-generated from commit bd9182e
on July 31, 2024 at 18:02 UTC