# Flow


class

prefect.core.flow.Flow

(name, schedule=None, executor=None, environment=None, run_config=None, storage=None, tasks=None, edges=None, reference_tasks=None, state_handlers=None, on_failure=None, validate=None, result=None, terminal_state_handler=None)[source]

The Flow class represents a collection of dependent Tasks. Flows track Task dependencies and parameters, and provide the main API for constructing and managing workflows.

Initializing Flow example:

from prefect import Flow, Task

class MyTask(Task):
    def run(self):
        return "hello"

task_1 = MyTask()
flow = Flow(name="my_flow", tasks=[task_1])

flow.run()

Initializing Flow as context manager example:

from prefect import Flow, task

@task
def my_task():
    return "hello"

with Flow("my_flow") as flow:
    task_1 = my_task()

flow.run()

Args:

  • name (str): The name of the flow. Cannot be None or an empty string.
  • schedule (prefect.schedules.Schedule, optional): A default schedule for the flow
  • executor (prefect.executors.Executor, optional): The executor that the flow should use. If None, the default executor configured in the runtime environment will be used.
  • environment (prefect.environments.Environment, optional, DEPRECATED): The environment that the flow should be run in.
  • run_config (prefect.run_configs.RunConfig, optional): The runtime configuration to use when deploying this flow.
  • storage (prefect.storage.Storage, optional): The unit of storage that the flow will be written into.
  • tasks ([Task], optional): If provided, a list of tasks that will initialize the flow
  • edges ([Edge], optional): A list of edges between tasks
  • reference_tasks ([Task], optional): A list of tasks that determine the final state of a flow
  • result (Result, optional): the result instance used to retrieve and store task results during execution
  • state_handlers (Iterable[Callable], optional): A list of state change handlers that will be called whenever the flow changes state, providing an opportunity to inspect or modify the new state. The handler will be passed the flow instance, the old (prior) state, and the new (current) state, with the following signature: state_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State] If multiple functions are passed, then the new_state argument will be the result of the previous handler.
  • on_failure (Callable, optional): A function with signature fn(flow: Flow, state: State) -> None which will be called anytime this Flow enters a failure state
  • validate (bool, optional): Whether or not to check the validity of the flow (e.g., presence of cycles and illegal keys) after adding the edges passed in the edges argument. Defaults to the value of eager_edge_validation in your prefect configuration file.
  • terminal_state_handler (Callable, optional): A state handler that can be used to inspect or modify the final state of the flow run. Expects a callable with signature handler(flow: Flow, state: State, reference_task_states: Set[State]) -> Optional[State], where flow is the current Flow, state is the current state of the Flow run, and reference_task_states is the set of states for all reference tasks in the flow. It should return either a new state for the flow run, or None (in which case the existing state will be used).
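
The chaining rule for state_handlers (each handler receives the result of the previous one as new_state) can be illustrated with a small standalone sketch. It does not use Prefect: call_handlers and the example handlers are hypothetical names, states are plain strings standing in for State objects, and the sketch assumes that a handler returning None leaves the proposed state unchanged.

```python
from typing import Callable, Iterable, Optional

# Hypothetical stand-ins: states are plain strings, the flow is any object.
Handler = Callable[[object, str, str], Optional[str]]


def call_handlers(flow: object, old_state: str, new_state: str,
                  handlers: Iterable[Handler]) -> str:
    """Call each handler in order, feeding it the previous handler's result."""
    for handler in handlers:
        result = handler(flow, old_state, new_state)
        # Assumption: a handler that returns None keeps the current state.
        if result is not None:
            new_state = result
    return new_state


def log_only(flow, old_state, new_state):
    # Inspects the transition without changing it.
    return None


def failed_to_retrying(flow, old_state, new_state):
    # Modifies the proposed state.
    return "Retrying" if new_state == "Failed" else new_state


def tag_state(flow, old_state, new_state):
    # Sees the output of the previous handler, not the original new_state.
    return new_state + "!"


final_state = call_handlers(None, "Running", "Failed",
                            [log_only, failed_to_retrying, tag_state])
```

Here tag_state receives "Retrying" rather than "Failed", because the earlier handler already replaced the proposed state.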

methods:

prefect.core.flow.Flow.add_edge

(upstream_task, downstream_task, key=None, mapped=False, flattened=False, validate=None)[source]

Add an edge in the flow between two tasks. All edges are directed beginning with an upstream task and ending with a downstream task.

Args:

  • upstream_task (Any): The task that the edge should start from. If it is not a Task, it will be converted into one.
  • downstream_task (Any): The task that the edge should end with. If it is not a Task, it will be converted into one.
  • key (str, optional): The key to be set for the new edge; the result of the upstream task will be passed to the downstream task's run() method under this keyword argument
  • mapped (bool, optional): Whether this edge represents a call to Task.map(); defaults to False
  • flattened (bool, optional): Whether the upstream task result is flattened
  • validate (bool, optional): Whether or not to check the validity of the flow (e.g., presence of cycles and illegal keys). Defaults to the value of eager_edge_validation in your prefect configuration file.
Returns:
  • prefect.core.edge.Edge: The Edge object that was successfully added to the flow
Raises:
  • ValueError: if the downstream_task is of type Parameter
  • ValueError: if the edge exists with this key and downstream_task
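
A toy model of directed, optionally keyed edges may make the second ValueError easier to picture. This is a sketch under assumptions, not Prefect's implementation: ToyEdge and add_toy_edge are hypothetical names, tasks are plain strings, and the sketch assumes that only edges with a key take part in the duplicate check.

```python
from typing import NamedTuple, Optional, Set


class ToyEdge(NamedTuple):
    upstream: str
    downstream: str
    key: Optional[str] = None


def add_toy_edge(edges: Set[ToyEdge], upstream: str, downstream: str,
                 key: Optional[str] = None) -> ToyEdge:
    """Add a directed edge, rejecting a second edge for the same keyword."""
    if key is not None:
        for existing in edges:
            if existing.downstream == downstream and existing.key == key:
                raise ValueError(
                    f"{downstream!r} already receives an argument named {key!r}"
                )
    edge = ToyEdge(upstream, downstream, key)
    edges.add(edge)
    return edge


edges: Set[ToyEdge] = set()
add_toy_edge(edges, "extract", "transform", key="data")
add_toy_edge(edges, "configure", "transform")  # ordering only, no key

try:
    # A second upstream cannot also feed the "data" argument of "transform".
    add_toy_edge(edges, "other_source", "transform", key="data")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```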

prefect.core.flow.Flow.add_task

(task)[source]

Add a task to the flow if the task does not already exist. The tasks are uniquely identified by their slug.

Args:

  • task (Task): the new Task to be added to the flow
Returns:
  • Task: the Task object passed in if the task was successfully added
Raises:
  • TypeError: if the task is not of type Task
  • ValueError: if the task.slug matches that of a task already in the flow

prefect.core.flow.Flow.all_downstream_edges

()[source]

Returns a dictionary relating each task in the Flow to the set of all downstream edges for the task

Returns:

  • dict with the key as tasks and the value as a set of downstream edges

prefect.core.flow.Flow.all_upstream_edges

()[source]

Returns a dictionary relating each task in the Flow to the set of all upstream edges for the task

Returns:

  • dict with the key as tasks and the value as a set of upstream edges
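
The shape of the dictionaries returned by all_downstream_edges and all_upstream_edges can be sketched without Prefect. In this illustration, index_edges is a hypothetical helper, tasks are strings, and an edge is an (upstream, downstream) tuple.

```python
from typing import Dict, List, Set, Tuple

ToyEdge = Tuple[str, str]  # (upstream_task, downstream_task)


def index_edges(tasks: List[str], edges: List[ToyEdge]):
    """Map every task to its set of upstream and downstream edges."""
    upstream: Dict[str, Set[ToyEdge]] = {t: set() for t in tasks}
    downstream: Dict[str, Set[ToyEdge]] = {t: set() for t in tasks}
    for edge in edges:
        up, down = edge
        downstream[up].add(edge)  # the edge leaves `up`
        upstream[down].add(edge)  # the edge arrives at `down`
    return upstream, downstream


tasks = ["extract", "transform", "load"]
edges = [("extract", "transform"), ("transform", "load")]
upstream_index, downstream_index = index_edges(tasks, edges)
```

Every task appears as a key, so a task with no edges in a given direction maps to an empty set.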

prefect.core.flow.Flow.chain

(*tasks, validate=None)[source]

Adds a sequence of dependent tasks to the flow; each task should be provided as an argument (or splatted from a list).

Args:

  • *tasks (list): A list of tasks to chain together
  • validate (bool, optional): Whether or not to check the validity of the flow (e.g., presence of cycles). Defaults to the value of eager_edge_validation in your prefect configuration file.
Returns:
  • A list of Edge objects added to the flow
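
Chaining can be pictured as adding one edge between each consecutive pair of tasks. The following standalone sketch uses a hypothetical chain_edges helper with string task names; it illustrates the idea only and does not call Prefect.

```python
from typing import List, Tuple


def chain_edges(*tasks: str) -> List[Tuple[str, str]]:
    """Return one (upstream, downstream) pair per consecutive pair of tasks."""
    return list(zip(tasks, tasks[1:]))


steps = ["extract", "transform", "load"]
chained = chain_edges(*steps)  # tasks "splatted" from a list
```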

prefect.core.flow.Flow.copy

()[source]

Creates and returns a copy of the current Flow.

prefect.core.flow.Flow.diagnostics

(include_secret_names=False)[source]

Get flow and Prefect diagnostic information

Args:

  • include_secret_names (bool, optional): toggle output of Secret names, defaults to False. Note: Secret values are never returned, only their names.

prefect.core.flow.Flow.downstream_tasks

(task)[source]

Get all of the tasks downstream of a task

Args:

  • task (Task): The task that we want to find downstream tasks from
Returns:
  • set of Task objects which are downstream of task

prefect.core.flow.Flow.edges_from

(task)[source]

Get all of the edges leading from a task (i.e., the downstream edges)

Args:

  • task (Task): The task that we want to find edges leading from
Returns:
  • Set: set of all edges leading from that task
Raises:
  • ValueError: if task is not found in this flow

prefect.core.flow.Flow.edges_to

(task)[source]

Get all of the edges leading to a task (i.e., the upstream edges)

Args:

  • task (Task): The task that we want to find edges leading to
Returns:
  • Set: set of all edges leading to that task
Raises:
  • ValueError: if task is not found in this flow

prefect.core.flow.Flow.get_tasks

(name=None, slug=None, tags=None, task_type=None)[source]

Helper method for retrieving tasks from this flow based on certain attributes. The intersection of all provided attributes is taken, i.e., only those tasks which match all provided conditions are returned.

Args:

  • name (str, optional): the name of the task
  • slug (str, optional): the slug of the task
  • tags ([str], optional): an iterable of task tags
  • task_type (type, optional): a possible task class type
Returns:
  • [Task]: a list of tasks that meet the required conditions
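
The "intersection of all provided attributes" rule can be sketched as a filter in which every supplied condition must hold and omitted conditions are ignored. ToyTask and filter_tasks are hypothetical names, not Prefect objects, and two details are assumptions: a task must carry every requested tag, and task_type is checked with isinstance.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, List, Optional


@dataclass(frozen=True)
class ToyTask:
    name: str
    slug: str
    tags: FrozenSet[str] = frozenset()


def filter_tasks(tasks: Iterable[ToyTask],
                 name: Optional[str] = None,
                 slug: Optional[str] = None,
                 tags: Optional[Iterable[str]] = None,
                 task_type: Optional[type] = None) -> List[ToyTask]:
    """Keep only the tasks that satisfy every condition that was provided."""
    def matches(task: ToyTask) -> bool:
        if name is not None and task.name != name:
            return False
        if slug is not None and task.slug != slug:
            return False
        # Assumption: all requested tags must be present on the task.
        if tags is not None and not set(tags) <= task.tags:
            return False
        # Assumption: subclasses of task_type also match.
        if task_type is not None and not isinstance(task, task_type):
            return False
        return True

    return [task for task in tasks if matches(task)]


toy_tasks = [
    ToyTask("load", "load-1", frozenset({"db", "prod"})),
    ToyTask("load", "load-2", frozenset({"db"})),
    ToyTask("extract", "extract-1", frozenset({"prod"})),
]
prod_loads = filter_tasks(toy_tasks, name="load", tags=["prod"])
```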

prefect.core.flow.Flow.load

(fpath)[source]

Reads a Flow from a file that was created with flow.save().

Args:

  • fpath (str): either the absolute filepath where your Flow will be loaded from, or the name of the Flow you wish to load

prefect.core.flow.Flow.parameters

()[source]

Returns any parameters of the flow.

Returns:

  • set: a set of any Parameters in this flow

prefect.core.flow.Flow.reference_tasks

()[source]

A flow's "reference tasks" are used to determine its state when it runs. If all the reference tasks are successful, then the flow run is considered successful. However, if any of the reference tasks fail, the flow is considered to fail. (Note that skips are counted as successes; see the state documentation for a full description of what is considered failure, success, etc.)

By default, a flow's reference tasks are its terminal tasks. This means the state of a flow is determined by those tasks that have no downstream dependencies.

In some situations, users may want to customize this behavior; for example, if a flow's terminal tasks are "clean up" tasks for the rest of the flow that only run if certain (more relevant) tasks fail, we might not want them determining the overall state of the flow run. The flow.set_reference_tasks() method can be used to set such custom reference_tasks.

Please note that even if reference_tasks are provided that are not terminal tasks, the flow will not be considered "finished" until all terminal tasks have completed. Only then will state be determined, using the reference tasks.

Returns:

  • set of Task objects which are the reference tasks in the flow
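
The "clean up" scenario described above can be traced with a toy model. This sketch is not Prefect code: toy_flow_state is a hypothetical helper, states are plain strings, and anything other than "Success" or "Skipped" is treated as a failure for simplicity.

```python
from typing import Dict, Iterable, Optional, Set, Tuple

SUCCESS_LIKE = {"Success", "Skipped"}  # skips are counted as successes


def toy_flow_state(task_states: Dict[str, str],
                   edges: Iterable[Tuple[str, str]],
                   reference_tasks: Optional[Set[str]] = None) -> str:
    """Derive a flow state from the states of its reference tasks."""
    if reference_tasks is None:
        # Default: the terminal tasks, i.e. tasks with no downstream edge.
        has_downstream = {upstream for upstream, _ in edges}
        reference_tasks = set(task_states) - has_downstream
    if all(task_states[t] in SUCCESS_LIKE for t in reference_tasks):
        return "Success"
    return "Failed"


# "clean_up" runs after "process" and succeeds even though "process" failed.
task_states = {"process": "Failed", "clean_up": "Success"}
edges = [("process", "clean_up")]

default_result = toy_flow_state(task_states, edges)
custom_result = toy_flow_state(task_states, edges, reference_tasks={"process"})
```

With the default reference tasks the successful clean-up task hides the failure; naming "process" as the reference task makes the flow state reflect it.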

prefect.core.flow.Flow.register

(project_name=None, build=True, labels=None, set_schedule_active=True, version_group_id=None, no_url=False, idempotency_key=None, **kwargs)[source]

Register the flow with Prefect Cloud; if no storage is present on the Flow, the default value from your config will be used and initialized with **kwargs.

Args:

  • project_name (str, optional): the project that should contain this flow.
  • build (bool, optional): if True, the flow's environment is built prior to serialization; defaults to True
  • labels (List[str], optional): a list of labels to add to this Flow's environment; useful for associating Flows with individual Agents; see http://docs.prefect.io/orchestration/agents/overview.html#labels
  • set_schedule_active (bool, optional): if False, will set the schedule to inactive in the database to prevent auto-scheduling runs (if the Flow has a schedule). Defaults to True. This can be changed later.
  • version_group_id (str, optional): the UUID version group ID to use for versioning this Flow in Cloud; if not provided, the version group ID associated with this Flow's project and name will be used.
  • no_url (bool, optional): if True, the stdout from this function will not contain the URL link to the newly-registered flow in the Cloud UI
  • idempotency_key (str, optional): a key that, if matching the most recent registration call for this flow group, will prevent the creation of another flow version and return the existing flow id instead.
  • **kwargs (Any): if instantiating a Storage object from default settings, these keyword arguments will be passed to the initialization method of the default Storage class
Returns:
  • str: the ID of the flow that was registered

prefect.core.flow.Flow.replace

(old, new, validate=True)[source]

Performs an inplace replacement of the old task with the provided new task.

Args:

  • old (Task): the old task to replace
  • new (Task): the new task to replace the old with; if not a Prefect Task, Prefect will attempt to convert it to one
  • validate (boolean, optional): whether to validate the Flow after the replace has been completed; defaults to True
Raises:
  • ValueError: if the old task is not a part of this flow

prefect.core.flow.Flow.root_tasks

()[source]

Get the tasks in the flow that have no upstream dependencies; these are the tasks that, by default, flow execution begins with.

Returns:

  • set of Task objects that have no upstream dependencies

prefect.core.flow.Flow.run

(parameters=None, run_on_schedule=None, runner_cls=None, **kwargs)[source]

Run the flow on its schedule using an instance of a FlowRunner. If the Flow has no schedule, a single stateful run will occur (including retries).

Note that this command will block and run this Flow on its schedule indefinitely (if it has one); all task states will be stored in memory, and task retries will not occur until every Task in the Flow has had a chance to run.

Args:

  • parameters (Dict[str, Any], optional): values to pass into the runner
  • run_on_schedule (bool, optional): whether to run this flow on its schedule, or run a single execution; if not provided, will default to the value set in your user config
  • runner_cls (type): an optional FlowRunner class (will use the default if not provided)
  • **kwargs: additional keyword arguments; if any provided keywords match known parameter names, they will be used as such. Otherwise they will be passed to the FlowRunner.run() method
Raises:
  • ValueError: if this Flow has a Schedule with no more scheduled runs
  • ValueError: if the return_tasks keyword argument is provided
Returns:
  • State: the state of the flow after its final run
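
The handling of **kwargs described above (keywords that match known parameter names become parameters, the rest go to the runner) can be sketched as a simple split. split_run_kwargs and some_runner_option are hypothetical names used only for illustration; this is not how Prefect is implemented internally.

```python
from typing import Any, Dict, Optional, Set, Tuple


def split_run_kwargs(parameter_names: Set[str],
                     parameters: Optional[Dict[str, Any]] = None,
                     **kwargs: Any) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Separate keyword arguments into flow parameters and runner options."""
    merged = dict(parameters or {})
    runner_kwargs: Dict[str, Any] = {}
    for key, value in kwargs.items():
        if key in parameter_names:
            merged[key] = value          # matches a known parameter name
        else:
            runner_kwargs[key] = value   # forwarded to the runner
    return merged, runner_kwargs


params, runner_kwargs = split_run_kwargs(
    {"x", "y"},
    parameters={"x": 1},
    y=2,
    some_runner_option=True,  # hypothetical runner keyword
)
```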

prefect.core.flow.Flow.run_agent

(token=None, show_flow_logs=False, log_to_cloud=None)[source]

Runs a Cloud agent for this Flow in-process.

Args:

  • token (str, optional): A Prefect Cloud API token with a RUNNER scope; will default to the token found in config.cloud.agent.auth_token
  • show_flow_logs (bool, optional): a boolean specifying whether the agent should re-route Flow run logs to stdout; defaults to False
  • log_to_cloud (bool, optional): a boolean specifying whether Flow run logs should be sent to Prefect Cloud; defaults to None which uses the config value

prefect.core.flow.Flow.save

(fpath=None)[source]

Saves the Flow to a file by serializing it with cloudpickle. This method is recommended if you wish to separate out the building of your Flow from its registration.

Args:

  • fpath (str, optional): the filepath where your Flow will be saved; defaults to ~/.prefect/flows/FLOW-NAME.prefect
Returns:
  • str: the full location the Flow was saved to

prefect.core.flow.Flow.serialize

(build=False)[source]

Creates a serialized representation of the flow.

Args:

  • build (bool, optional): if True, the flow's environment is built prior to serialization
Returns:
  • dict representing the flow
Raises:
  • ValueError: if build=True and the flow has no storage

prefect.core.flow.Flow.serialized_hash

(build=False)[source]

Generate a deterministic hash of the serialized flow. This is useful for determining if the flow has changed. If this hash is equal to a previous hash, no new information would be passed to the server on a call to flow.register().

Note that this will not always detect code changes since the task code is not included in the serialized flow sent to the server. That said, as long as the flow is "built" during registration, the code changes will be in effect even if a new version is not registered with the server.

Args:

  • build (bool, optional): if True, the flow's environment is built prior to serialization. Passed through to Flow.serialize().
Returns:
  • str: the hash of the serialized flow
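
One way to obtain a deterministic hash of a serialized dictionary is to dump it as JSON with sorted keys and hash the bytes. The sketch below assumes SHA-256 and JSON, neither of which is stated in this reference, and deterministic_hash is a hypothetical helper name.

```python
import hashlib
import json


def deterministic_hash(serialized: dict) -> str:
    """Hash a JSON-compatible dict so that key order does not matter."""
    payload = json.dumps(serialized, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


flow_a = {"name": "my_flow", "tasks": ["extract", "load"]}
flow_b = {"tasks": ["extract", "load"], "name": "my_flow"}  # same content
flow_c = {"name": "my_flow", "tasks": ["extract", "transform", "load"]}

unchanged = deterministic_hash(flow_a) == deterministic_hash(flow_b)
changed = deterministic_hash(flow_a) != deterministic_hash(flow_c)
```

As with serialized_hash, such a hash only reflects what is in the serialized structure; a change to task code that is not part of that structure would go unnoticed.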

prefect.core.flow.Flow.set_dependencies

(task, upstream_tasks=None, downstream_tasks=None, keyword_tasks=None, mapped=False, validate=None)[source]

Convenience function for adding task dependencies.

Args:

  • task (Any): a Task that will become part of the Flow. If the task is not a Task subclass, Prefect will attempt to convert it to one.
  • upstream_tasks ([Any], optional): Tasks that will run before the task runs. If any task is not a Task subclass, Prefect will attempt to convert it to one.
  • downstream_tasks ([Any], optional): Tasks that will run after the task runs. If any task is not a Task subclass, Prefect will attempt to convert it to one.
  • keyword_tasks ({key: Any}, optional): The results of these tasks will be provided to the task under the specified keyword arguments. If any task is not a Task subclass, Prefect will attempt to convert it to one.
  • mapped (bool, optional): Whether the upstream tasks (both keyed and non-keyed) should be mapped over; defaults to False. If True, any tasks wrapped in the prefect.utilities.edges.unmapped container will not be mapped over.
  • validate (bool, optional): Whether or not to check the validity of the flow (e.g., presence of cycles). Defaults to the value of eager_edge_validation in your Prefect configuration file.
Returns:
  • None

prefect.core.flow.Flow.set_reference_tasks

(tasks)[source]

Sets the reference_tasks for the flow. See flow.reference_tasks for more details.

Args:

  • tasks ([Task]): the tasks that should be set as a flow's reference tasks
Returns:
  • None

prefect.core.flow.Flow.sorted_tasks

(root_tasks=None)[source]

Get the tasks in this flow in a sorted manner. This allows us to find if any cycles exist in this flow's DAG.

Args:

  • root_tasks ([Task], optional): an Iterable of Task objects to start the sorting from
Returns:
  • tuple of task objects that were sorted
Raises:
  • ValueError: if a cycle is found in the flow's DAG
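
The link between sorting and cycle detection can be illustrated with Kahn's algorithm for topological sorting: if some tasks can never be emitted, the graph contains a cycle. This is a generic sketch, not necessarily the algorithm Prefect uses; toy_sorted_tasks is a hypothetical name and the root_tasks argument is not modeled.

```python
from collections import deque
from typing import Dict, List, Sequence, Tuple


def toy_sorted_tasks(tasks: Sequence[str],
                     edges: Sequence[Tuple[str, str]]) -> Tuple[str, ...]:
    """Topologically sort tasks; raise ValueError if the graph has a cycle."""
    indegree: Dict[str, int] = {t: 0 for t in tasks}
    children: Dict[str, List[str]] = {t: [] for t in tasks}
    for upstream, downstream in edges:
        children[upstream].append(downstream)
        indegree[downstream] += 1

    ready = deque(t for t in tasks if indegree[t] == 0)
    ordered: List[str] = []
    while ready:
        task = ready.popleft()
        ordered.append(task)
        for child in children[task]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # Tasks on a cycle never reach indegree 0, so they are never emitted.
    if len(ordered) != len(tasks):
        raise ValueError("Cycle found in the flow's DAG")
    return tuple(ordered)


ordered_tasks = toy_sorted_tasks(
    ["load", "transform", "extract"],
    [("extract", "transform"), ("transform", "load")],
)

try:
    toy_sorted_tasks(["a", "b"], [("a", "b"), ("b", "a")])
    cycle_detected = False
except ValueError:
    cycle_detected = True
```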

prefect.core.flow.Flow.terminal_tasks

()[source]

Get the tasks in the flow that have no downstream dependencies

Returns:

  • set of Task objects that have no downstream dependencies
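
Root and terminal tasks can both be derived from the edge list alone: a root task never appears as a downstream endpoint, and a terminal task never appears as an upstream endpoint. The helper below is a hypothetical standalone sketch with string task names, not Prefect code.

```python
from typing import Iterable, Set, Tuple


def root_and_terminal(tasks: Iterable[str],
                      edges: Iterable[Tuple[str, str]]
                      ) -> Tuple[Set[str], Set[str]]:
    """Return (root tasks, terminal tasks) for a set of directed edges."""
    edge_list = list(edges)
    has_upstream = {downstream for _, downstream in edge_list}
    has_downstream = {upstream for upstream, _ in edge_list}
    all_tasks = set(tasks)
    return all_tasks - has_upstream, all_tasks - has_downstream


roots, terminals = root_and_terminal(
    ["extract", "clean", "audit", "load", "standalone"],
    [("extract", "clean"), ("extract", "audit"), ("clean", "load")],
)
```

A task with no edges at all, such as "standalone" here, is both a root task and a terminal task.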

prefect.core.flow.Flow.update

(flow, merge_parameters=False, validate=None, merge_reference_tasks=False)[source]

Take all tasks and edges in another flow and add them to this flow. When merge_parameters is set to True, duplicate parameters in the input flow are replaced with those in the flow being updated.

Args:

  • flow (Flow): A flow which is used to update this flow.
  • merge_parameters (bool, False): If True, duplicate parameters are replaced with parameters from the provided flow. Defaults to False. If True, validate will also be set to True.
  • validate (bool, optional): Whether or not to check the validity of the flow.
  • merge_reference_tasks (bool, False): If True, add reference tasks from the provided flow to the current flow reference tasks set.
Returns:
  • None

prefect.core.flow.Flow.upstream_tasks

(task)[source]

Get all of the tasks upstream of a task

Args:

  • task (Task): The task that we want to find upstream tasks of
Returns:
  • set of Task objects which are upstream of task

prefect.core.flow.Flow.validate

()[source]

Checks that the flow is valid.

Returns:

  • None
Raises:
  • ValueError: if edges refer to tasks that are not in this flow
  • ValueError: if specified reference tasks are not in this flow
  • ValueError: if any tasks do not have assigned IDs

prefect.core.flow.Flow.visualize

(flow_state=None, filename=None, format=None)[source]

Creates a graphviz object for representing the current flow; this graphviz object will be rendered inline if called from an IPython notebook, otherwise it will be rendered in a new window. If a filename is provided, the object will not be rendered and will instead be saved to the location specified.

Args:

  • flow_state (State, optional): flow state object used to optionally color the nodes
  • filename (str, optional): a filename specifying a location to save this visualization to; if provided, the visualization will not be rendered automatically
  • format (str, optional): a format specifying the output file type; defaults to 'pdf'. Refer to http://www.graphviz.org/doc/info/output.html for valid formats
Raises:
  • ImportError: if graphviz is not installed



This documentation was auto-generated from commit n/a on July 1, 2021 at 18:35 UTC