|
Context manager for setting task tags.
Args: *tags ([str]) : a list of tags to apply to the tasks created within the context manager Example:
@task def add(x, y): return x + y
with Flow("My Flow") as f: with tags("math", "function"): result = add(1, 5)
print(result.tags)
|
prefect.utilities.tasks.as_task (x, flow=None) [source] |
Wraps a function, collection, or constant with the appropriate Task type. If a constant or collection of constants is passed, a Constant task is returned.
Args: x (object) : any Python object to convert to a prefect Task flow (Flow, optional) : Flow to which the prefect Task will be bound Returns: - a prefect Task representing the passed object
|
prefect.utilities.tasks.pause_task (message=None, duration=None) [source] |
Utility function for pausing a task during execution to wait for manual intervention. Note that the entire task will be rerun if the user decides to run this task again! The only difference is that this utility will not raise a PAUSE signal. To bypass a PAUSE signal being raised, put the task into a Resume state.
Args: message (str) : an optional message for the Pause state. duration (timedelta) : an optional pause duration; otherwise infinite (well, 10 years) Example:
from prefect import Flow from prefect.utilities.tasks import task, pause_task
@task def add(x, y): z = y - x if z == 0: pause_task() return x + y
with Flow("My Flow") as f: res = add(4, 4)
state = f.run() state.result[res]
state = f.run(task_states={res: Resume()}) state.result[res]
|
prefect.utilities.tasks.task (fn=None, **task_init_kwargs) [source] |
A decorator for creating Tasks from functions.
Args: fn (Callable) : the decorated function **task_init_kwargs (Any) : keyword arguments that will be passed to the Task constructor on initialization. Returns: FunctionTask : A instance of a FunctionTask Raises: ValueError : if the provided function violates signature requirements for Task run methods Usage:
@task(name='hello', max_retries=3, retry_delay=1) def hello(name): print('hello, {}'.format(name))
with Flow("My Flow") as flow: t1 = hello('foo') t2 = hello('bar')
The decorator is best suited to Prefect's functional API, but can also be used with the imperative API.
@task def fn_without_args(): return 1
@task def fn_with_args(x): return x
with Flow("My Flow"): fn_without_args() fn_with_args(1)
|
prefect.utilities.tasks.defaults_from_attrs (*attr_args) [source] |
Helper decorator for dealing with Task classes with attributes that serve as defaults for Task.run . Specifically, this decorator allows the author of a Task to identify certain keyword arguments to the run method which will fall back to self.ATTR_NAME if not explicitly provided to self.run . This pattern allows users to create a Task "template", whose default settings can be created at initialization but overrided in individual instances when the Task is called.
Args: *attr_args (str) : a splatted list of strings specifying which kwargs should fallback to attributes, if not provided at runtime. Note that the strings provided here must match keyword arguments in the run call signature, as well as the names of attributes of this Task. Returns: Callable : the decorated / altered Task.run method Example:
class MyTask(Task): def init(self, a=None, b=None): self.a = a self.b = b
@defaults_from_attrs('a', 'b') def run(self, a=None, b=None): return a, b
task = MyTask(a=1, b=2)
task.run() task.run(a=99) task.run(a=None, b=None)
|
prefect.utilities.tasks.apply_map (func, *args, flow=None, **kwargs) [source] |
Map a function that adds tasks to a flow elementwise across one or more tasks. Arguments that should not be mapped over should be wrapped with prefect.unmapped . Nested arguments that should be unnested before mapping over should be wrapped with prefect.flatten
This can be useful when wanting to create complicated mapped pipelines (e.g. ones using control flow components like case ).
Args: func (Callable) : a function that adds tasks to a flow *args : task arguments to map over flow (Flow, optional) : The flow to use, defaults to the current flow in context if no flow is specified. If specified, func must accept a flow keyword argument. **kwargs : keyword task arguments to map over Returns: Any : the output of func , if any Example:
from prefect import task, case, apply_map from prefect.tasks.control_flow import merge
@task def inc(x): return x + 1
@task def is_even(x): return x % 2 == 0
def inc_if_even(x): with case(is_even(x), True): x2 = inc(x) return merge(x, x2)
with Flow("example") as flow: apply_map(inc_if_even, range(10))
|