prefect.core.flow.Flow(name, schedule=None, executor=None, run_config=None, storage=None, tasks=None, edges=None, reference_tasks=None, state_handlers=None, on_failure=None, validate=None, result=None, terminal_state_handler=None)[source]
The Flow class is used as the representation of a collection of dependent Tasks. Flows track Task dependencies, parameters and provide the main API for constructing and managing workflows.
Initializing Flow example:
class MyTask(Task): def run(self): return "hello" task_1 = MyTask() flow = Flow(name="my_flow", tasks=[task_1]) flow.run()
Initializing Flow as context manager example:
@task def my_task(): return "hello" with Flow("my_flow") as flow: task_1 = my_task() flow.run()
name (str): The name of the flow. Cannot be
Noneor an empty string
schedule (prefect.schedules.Schedule, optional): A default schedule for the flow
executor (prefect.executors.Executor, optional): The executor that the flow should use. If
None, the default executor configured in the runtime environment will be used.
run_config (prefect.run_configs.RunConfig, optional): The runtime configuration to use when deploying this flow.
storage (prefect.storage.Storage, optional): The unit of storage that the flow will be written into.
tasks ([Task], optional): If provided, a list of tasks that will initialize the flow
edges ([Edge], optional): A list of edges between tasks
reference_tasks ([Task], optional): A list of tasks that determine the final state of a flow
result (Result, optional): the result instance used to retrieve and store task results during execution
state_handlers (Iterable[Callable], optional): A list of state change handlers that will be called whenever the flow changes state, providing an opportunity to inspect or modify the new state. The handler will be passed the flow instance, the old (prior) state, and the new (current) state, with the following signature:
state_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]If multiple functions are passed, then the
new_stateargument will be the result of the previous handler.
on_failure (Callable, optional): A function with signature
fn(flow: Flow, state: State) -> Nonewhich will be called anytime this Flow enters a failure state
validate (bool, optional): Whether or not to check the validity of the flow (e.g., presence of cycles and illegal keys) after adding the edges passed in the
edgesargument. Defaults to the value of
eager_edge_validationin your prefect configuration file.
terminal_state_handler (Callable, optional): A state handler that can be used to inspect or modify the final state of the flow run. Expects a callable with signature
handler(flow: Flow, state: State, reference_task_states: Set[State]) -> Optional[State], where
flowis the current Flow,
stateis the current state of the Flow run, and
reference_task_statesis set of states for all reference tasks in the flow. It should return either a new state for the flow run, or
None(in which case the existing state will be used).
prefect.core.flow.Flow.add_edge(upstream_task, downstream_task, key=None, mapped=False, flattened=False, validate=None)[source]
Add an edge in the flow between two tasks. All edges are directed beginning with an upstream task and ending with a downstream task.
Add a task to the flow if the task does not already exist. The tasks are uniquely identified by their
Returns a dictionary relating each task in the Flow to the set of all downstream edges for the task
Returns a dictionary relating each task in the Flow to the set of all upstream edges for the task
Adds a sequence of dependent tasks to the flow; each task should be provided as an argument (or splatted from a list).
Create and returns a copy of the current Flow.
Get flow and Prefect diagnostic information
Get all of the tasks downstream of a task
Get all of the edges leading from a task (i.e., the downstream edges)
Get all of the edges leading to a task (i.e., the upstream edges)
prefect.core.flow.Flow.get_tasks(name=None, slug=None, tags=None, task_type=None)[source]
Helper method for retrieving tasks from this flow based on certain attributes. The intersection of all provided attributes is taken, i.e., only those tasks which match all provided conditions are returned.
Reads a Flow from a file that was created with
Returns any parameters of the flow.
A flow's "reference tasks" are used to determine its state when it runs. If all the reference tasks are successful, then the flow run is considered successful. However, if any of the reference tasks fail, the flow is considered to fail. (Note that skips are counted as successes; see the state documentation for a full description of what is considered failure, success, etc.)
prefect.core.flow.Flow.register(project_name=None, build=True, labels=None, set_schedule_active=True, version_group_id=None, no_url=False, idempotency_key=None, **kwargs)[source]
Register the flow with Prefect Cloud; if no storage is present on the Flow, the default value from your config will be used and initialized with
prefect.core.flow.Flow.replace(old, new, validate=True)[source]
Performs an inplace replacement of the old task with the provided new task.
Get the tasks in the flow that have no upstream dependencies; these are the tasks that, by default, flow execution begins with.
prefect.core.flow.Flow.run(parameters=None, run_on_schedule=None, runner_cls=None, **kwargs)[source]
Run the flow on its schedule using an instance of a FlowRunner. If the Flow has no schedule, a single stateful run will occur (including retries).
prefect.core.flow.Flow.run_agent(token=None, show_flow_logs=False, log_to_cloud=None, api_key=None)[source]
Runs a Cloud agent for this Flow in-process.
Saves the Flow to a file by serializing it with cloudpickle. This method is recommended if you wish to separate out the building of your Flow from its registration.
Creates a serialized representation of the flow.
Generate a deterministic hash of the serialized flow. This is useful for determining if the flow has changed. If this hash is equal to a previous hash, no new information would be passed to the server on a call to
prefect.core.flow.Flow.set_dependencies(task, upstream_tasks=None, downstream_tasks=None, keyword_tasks=None, mapped=False, validate=None)[source]
Convenience function for adding task dependencies.
Get the tasks in this flow in a sorted manner. This allows us to find if any cycles exist in this flow's DAG.
Get the tasks in the flow that have no downstream dependencies
prefect.core.flow.Flow.update(flow, merge_parameters=False, validate=None, merge_reference_tasks=False)[source]
Take all tasks and edges in another flow and add it to this flow. When
Get all of the tasks upstream of a task
Checks that the flow is valid.
prefect.core.flow.Flow.visualize(flow_state=None, filename=None, format=None, horizontal=False)[source]
Creates graphviz object for representing the current flow; this graphviz object will be rendered inline if called from an IPython notebook, otherwise it will be rendered in a new window. If a
This documentation was auto-generated from commit ffa9a6c
on February 1, 2023 at 18:44 UTC