# Task Utilities


# Functions

top-level functions:                                                                                                                                                       

prefect.utilities.tasks.tags

(*tags)[source]

Context manager for setting task tags.

Args:

  • *tags ([str]): a list of tags to apply to the tasks created within the context manager
Example:
@task
def add(x, y):
return x + y

with Flow("My Flow") as f:
with tags("math", "function"):
result = add(1, 5)

print(result.tags) # {"function", "math"}

prefect.utilities.tasks.as_task

(x, flow=None)[source]

Wraps a function, collection, or constant with the appropriate Task type. If a constant or collection of constants is passed, a Constant task is returned.

Args:

  • x (object): any Python object to convert to a prefect Task
  • flow (Flow, optional): Flow to which the prefect Task will be bound
Returns:
  • a prefect Task representing the passed object

prefect.utilities.tasks.pause_task

(message=None, duration=None)[source]

Utility function for pausing a task during execution to wait for manual intervention. Note that the entire task will be rerun if the user decides to run this task again! The only difference is that this utility will not raise a PAUSE signal. To bypass a PAUSE signal being raised, put the task into a Resume state.

Args:

  • message (str): an optional message for the Pause state.
  • duration (timedelta): an optional pause duration; otherwise infinite (well, 10 years)
Example:
    from prefect import Flow
from prefect.utilities.tasks import task, pause_task

@task
def add(x, y):
z = y - x ## this code will be rerun after resuming from the pause!
if z == 0: ## this code will be rerun after resuming from the pause!
pause_task()
return x + y

with Flow("My Flow") as f:
res = add(4, 4)

state = f.run()
state.result[res] # a Paused state

state = f.run(task_states={res: Resume()})
state.result[res] # a Success state


prefect.utilities.tasks.task

(fn=None, **task_init_kwargs)[source]

A decorator for creating Tasks from functions.

Args:

  • fn (Callable): the decorated function
  • **task_init_kwargs (Any): keyword arguments that will be passed to the Task constructor on initialization.
Returns:
  • FunctionTask: A instance of a FunctionTask
Raises:
  • ValueError: if the provided function violates signature requirements for Task run methods
Usage:


@task(name='hello', max_retries=3, retry_delay=1)
def hello(name):
print('hello, {}'.format(name))

with Flow("My Flow") as flow:
t1 = hello('foo')
t2 = hello('bar')



The decorator is best suited to Prefect's functional API, but can also be used with the imperative API.


@task
def fn_without_args():
return 1

@task
def fn_with_args(x):
return x

# both tasks work inside a functional flow context
with Flow("My Flow"):
fn_without_args()
fn_with_args(1)

prefect.utilities.tasks.defaults_from_attrs

(*attr_args)[source]

Helper decorator for dealing with Task classes with attributes that serve as defaults for Task.run. Specifically, this decorator allows the author of a Task to identify certain keyword arguments to the run method which will fall back to self.ATTR_NAME if not explicitly provided to self.run. This pattern allows users to create a Task "template", whose default settings can be created at initialization but overrided in individual instances when the Task is called.

Args:

  • *attr_args (str): a splatted list of strings specifying which kwargs should fallback to attributes, if not provided at runtime. Note that the strings provided here must match keyword arguments in the run call signature, as well as the names of attributes of this Task.
Returns:
  • Callable: the decorated / altered Task.run method
Example:
class MyTask(Task):
def init(self, a=None, b=None):
self.a = a
self.b = b

@defaults_from_attrs('a', 'b')
def run(self, a=None, b=None):
return a, b

task = MyTask(a=1, b=2)

task.run() # (1, 2)
task.run(a=99) # (99, 2)
task.run(a=None, b=None) # (None, None)

prefect.utilities.tasks.apply_map

(func, *args, flow=None, **kwargs)[source]

Map a function that adds tasks to a flow elementwise across one or more tasks. Arguments that should not be mapped over should be wrapped with prefect.unmapped. Nested arguments that should be unnested before mapping over should be wrapped with prefect.flatten

This can be useful when wanting to create complicated mapped pipelines (e.g. ones using control flow components like case).

Args:

  • func (Callable): a function that adds tasks to a flow
  • *args: task arguments to map over
  • flow (Flow, optional): The flow to use, defaults to the current flow in context if no flow is specified. If specified, func must accept a flow keyword argument.
  • **kwargs: keyword task arguments to map over
Returns:
  • Any: the output of func, if any
Example:


from prefect import task, case, apply_map
from prefect.tasks.control_flow import merge

@task
def inc(x):
return x + 1

@task
def is_even(x):
return x % 2 == 0

def inc_if_even(x):
with case(is_even(x), True):
x2 = inc(x)
return merge(x, x2)

with Flow("example") as flow:
apply_map(inc_if_even, range(10))

This documentation was auto-generated from commit bd9182e
on July 31, 2024 at 18:02 UTC