# Task Utilities

## Functions

Top-level functions:

tags(*tags)

Context manager for setting task tags.


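Args:
  • *tags ([str]): one or more tag strings to apply to the tasks created within the context manager

Example:

    from prefect import Flow, task
    from prefect.utilities.tasks import tags

    @task
    def add(x, y):
        return x + y

    with Flow("My Flow") as f:
        # every task created inside this block receives both tags
        with tags("math", "function"):
            result = add(1, 5)

    print(result.tags) # {"function", "math"}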


as_task(x, flow=None)

Wraps a function, collection, or constant with the appropriate Task type. If a constant or collection of constants is passed, a Constant task is returned.


Args:
  • x (object): any Python object to convert to a Prefect Task
  • flow (Flow, optional): the Flow to which the Prefect Task will be bound

Returns:
  • a Prefect Task representing the passed object
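
The dispatch described above can be illustrated with a small pure-Python sketch. It is only a rough model under stated assumptions, not the library's code: `SketchTask`, `ConstantSketch`, `ListSketch`, and `as_task_sketch` are hypothetical names, only lists are handled as collections, functions are left out, and passing an existing task through unchanged is an assumption.

```python
class SketchTask:
    """Hypothetical base class standing in for a Task."""


class ConstantSketch(SketchTask):
    """Hypothetical task that simply holds a constant value."""

    def __init__(self, value):
        self.value = value


class ListSketch(SketchTask):
    """Hypothetical task that holds a list of other tasks."""

    def __init__(self, items):
        self.items = items


def as_task_sketch(x):
    # Assumption: something that is already a task is returned unchanged.
    if isinstance(x, SketchTask):
        return x
    # A list containing at least one task becomes a collection task whose
    # elements are themselves wrapped.
    if isinstance(x, list) and any(isinstance(item, SketchTask) for item in x):
        return ListSketch([as_task_sketch(item) for item in x])
    # A constant, or a collection made only of constants, becomes one constant task.
    return ConstantSketch(x)


existing = SketchTask()
wrapped_number = as_task_sketch(5)
wrapped_constants = as_task_sketch([1, 2, 3])
wrapped_mixed = as_task_sketch([1, existing])

print(type(wrapped_number).__name__)
print(type(wrapped_constants).__name__)
print(type(wrapped_mixed).__name__)
```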


pause_task(message=None, duration=None)

Utility function for pausing a task during execution to wait for manual intervention. Note that the entire task is rerun when the user decides to run it again, so any code before the pause executes a second time. The only difference on the rerun is that this utility does not raise a PAUSE signal. To bypass the PAUSE signal, put the task into a Resume state.


Args:
  • message (str): an optional message for the Paused state
  • duration (timedelta): an optional pause duration; if omitted, the pause is effectively indefinite (10 years)

Example:

    from prefect import Flow
    from prefect.engine.state import Resume
    from prefect.utilities.tasks import task, pause_task

    @task
    def add(x, y):
        z = y - x  ## this code will be rerun after resuming from the pause!
        if z == 0: ## this code will be rerun after resuming from the pause!
            pause_task()
        return x + y

    with Flow("My Flow") as f:
        res = add(4, 4)

    state = f.run()
    state.result[res] # a Paused state

    state = f.run(task_states={res: Resume()})
    state.result[res] # a Success state
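
The rerun behaviour is easy to overlook, so here is a minimal pure-Python sketch of it. It does not use Prefect: `PauseSignal`, `pause_sketch`, and the explicit `resume` flag are hypothetical stand-ins for the PAUSE signal and the Resume state, chosen only to show that the lines before the pause execute on both runs.

```python
class PauseSignal(Exception):
    """Hypothetical stand-in for a PAUSE signal."""


def pause_sketch(resume=False):
    # On the first run the signal is raised; when resuming, it is skipped.
    if not resume:
        raise PauseSignal("waiting for manual intervention")


calls = []  # records each time the code before the pause runs


def add(x, y, resume=False):
    calls.append("pre-pause work")  # runs on the first run and on the rerun
    if y - x == 0:
        pause_sketch(resume=resume)
    return x + y


# First run: the task body starts, then stops at the pause.
try:
    add(4, 4)
    first = "finished"
except PauseSignal:
    first = "paused"

# Rerun in a "resumed" condition: the whole body runs again from the top.
second = add(4, 4, resume=True)

print(first, second, len(calls))
```

Because the body restarts from the top, work placed before the pause is best kept cheap and safe to repeat.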


task(fn=None, **task_init_kwargs)

A decorator for creating Tasks from functions.


Args:
  • fn (Callable): the decorated function
  • **task_init_kwargs (Any): keyword arguments that will be passed to the Task constructor on initialization

Returns:
  • FunctionTask: an instance of a FunctionTask

Raises:
  • ValueError: if the provided function violates signature requirements for Task run methods

Example:

    from datetime import timedelta
    from prefect import Flow, task

    # retry_delay is given as a timedelta, the type the Task constructor expects
    @task(name='hello', max_retries=3, retry_delay=timedelta(seconds=1))
    def hello(name):
        print('hello, {}'.format(name))

    with Flow("My Flow") as flow:
        t1 = hello('foo')
        t2 = hello('bar')

The decorator is best suited to Prefect's functional API, but can also be used with the imperative API.

    @task
    def fn_without_args():
        return 1

    @task
    def fn_with_args(x):
        return x

    # both tasks work inside a functional flow context
    with Flow("My Flow"):
        fn_without_args()
        fn_with_args(1)



defaults_from_attrs(*attr_args)

Helper decorator for Task classes whose attributes serve as defaults for Task.run. Specifically, this decorator allows the author of a Task to identify keyword arguments of the run method that fall back to self.ATTR_NAME if not explicitly provided to self.run. This pattern allows users to create a Task "template" whose default settings are set at initialization but can be overridden each time the Task is called.


Args:
  • *attr_args (str): a splatted list of strings specifying which kwargs should fall back to attributes if not provided at runtime. The strings provided here must match keyword arguments in the run call signature, as well as the names of attributes of this Task.

Returns:
  • Callable: the decorated / altered Task.run method

Example:

    from prefect import Task
    from prefect.utilities.tasks import defaults_from_attrs

    class MyTask(Task):
        def __init__(self, a=None, b=None):
            self.a = a
            self.b = b
            super().__init__()

        @defaults_from_attrs('a', 'b')
        def run(self, a=None, b=None):
            return a, b

    task = MyTask(a=1, b=2)

    task.run() # (1, 2)
    task.run(a=99) # (99, 2)
    task.run(a=None, b=None) # (None, None)
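
For readers curious how such a fallback could be wired up, the following is a minimal pure-Python sketch of the pattern. It is an illustration under assumptions rather than Prefect's source: `defaults_from_attrs_sketch` and `Greeter` are hypothetical names, and the sketch only handles arguments passed by keyword.

```python
import functools


def defaults_from_attrs_sketch(*attr_args):
    """Hypothetical decorator: fill missing kwargs from same-named attributes."""

    def wrapper(run_method):
        @functools.wraps(run_method)
        def method(self, *args, **kwargs):
            for attr in attr_args:
                # Fill in only when the caller omitted the keyword entirely;
                # an explicitly passed None is left untouched.
                kwargs.setdefault(attr, getattr(self, attr))
            return run_method(self, *args, **kwargs)

        return method

    return wrapper


class Greeter:
    """Hypothetical class playing the role of a Task 'template'."""

    def __init__(self, greeting="hello", name="world"):
        self.greeting = greeting
        self.name = name

    @defaults_from_attrs_sketch("greeting", "name")
    def run(self, greeting=None, name=None):
        return f"{greeting}, {name}"


greeter = Greeter()
default_result = greeter.run()
override_result = greeter.run(name="prefect")
explicit_none = greeter.run(greeting=None)

print(default_result)
print(override_result)
print(explicit_none)
```

The sketch mirrors the last line of the example above: an argument that is omitted falls back to the attribute, while an argument explicitly set to None stays None.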


apply_map(func, *args, flow=None, **kwargs)

Map a function that adds tasks to a flow elementwise across one or more tasks. Arguments that should not be mapped over should be wrapped with prefect.unmapped. Nested arguments that should be unnested before mapping over should be wrapped with prefect.flatten.

This is useful for building complicated mapped pipelines (e.g. ones using control flow components like case).


Args:
  • func (Callable): a function that adds tasks to a flow
  • *args: task arguments to map over
  • flow (Flow, optional): the flow to use; defaults to the current flow in context if no flow is specified. If specified, func must accept a flow keyword argument.
  • **kwargs: keyword task arguments to map over

Returns:
  • Any: the output of func, if any

Example:

    from prefect import Flow, task, case, apply_map
    from prefect.tasks.control_flow import merge

    @task
    def inc(x):
        return x + 1

    @task
    def is_even(x):
        return x % 2 == 0

    # a plain function (not a task) that adds tasks to the flow
    def inc_if_even(x):
        with case(is_even(x), True):
            x2 = inc(x)
        return merge(x, x2)

    with Flow("example") as flow:
        apply_map(inc_if_even, range(10))

This documentation was auto-generated from commit ffa9a6c
on February 1, 2023 at 18:44 UTC