# Run Configuration
# RunConfig
Base class for RunConfigs.
A "run config" is an object for configuring a flow run, which maps to a specific agent backend.
Args:
env (dict, optional)
: Additional environment variables to setlabels (Iterable[str], optional)
: an iterable of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work
methods: |
---|
prefect.run_configs.base.RunConfig.serialize ()[source] |
Returns a serialized version of the RunConfig.
|
# UniversalRun
Configure a flow-run to run universally on any Agent.
Unlike the other agent-specific RunConfig
classes (e.g. LocalRun
for the Local Agent), the UniversalRun
run config is compatible with any agent. This can be useful for flows that don't require any custom configuration other than flow labels, allowing for transitioning a flow between agent types without any config changes.
Args:
env (dict, optional)
: Additional environment variables to setlabels (Iterable[str], optional)
: an iterable of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work
Use the defaults set on the agent:
flow.run_config = UniversalRun()
Configure additional labels:
flow.run_config = UniversalRun(env={"SOME_VAR": "value"}, labels=["label-1", "label-2"])
# LocalRun
Configure a flow-run to run as a Local Process.
Args:
env (dict, optional)
: Additional environment variables to set for the processworking_dir (str, optional)
: Working directory in which to start the process, must already exist. If not provided, will be run in the same directory as the agent.labels (Iterable[str], optional)
: an iterable of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work
Use the defaults set on the agent:
flow.run_config = LocalRun()
Run in a specified working directory:
flow.run_config = LocalRun(working_dir="/path/to/working-directory")
Set an environment variable in the flow run process:
flow.run_config = LocalRun(env={"SOME_VAR": "value"})
# DockerRun
class
prefect.run_configs.docker.DockerRun
(image=None, env=None, labels=None, ports=None, host_config=None)[source]Configure a flow-run to run as a Docker container.
Args:
image (str, optional)
: The image to useenv (dict, optional)
: Additional environment variables to set in the containerlabels (Iterable[str], optional)
: an iterable of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for workports (Iterable[int], optional)
: an iterable of ports to pass to the Docker Agent which are used to expose container ports.host_config (dict, optional)
: A dictionary or runtime arguments to pass to the Docker Agent. See link below for options. https://docker-py.readthedocs.io/en/stable/api.html#docker.api.container.ContainerApiMixin.create_host_config
Use the defaults set on the agent:
flow.run_config = DockerRun()
Set an environment variable in the flow run process:
flow.run_config = DockerRun(env={"SOME_VAR": "value"})
# KubernetesRun
class
prefect.run_configs.kubernetes.KubernetesRun
(job_template_path=None, job_template=None, image=None, env=None, cpu_limit=None, cpu_request=None, memory_limit=None, memory_request=None, service_account_name=None, image_pull_secrets=None, labels=None, image_pull_policy=None)[source]Configure a flow-run to run as a Kubernetes Job.
Kubernetes jobs are configured by filling in a job template at runtime. A job template can be specified either as a path (to be read in at runtime) or an in-memory object (which will be stored along with the flow in Prefect Cloud/Server). By default the job template configured on the Agent is used.
Args:
job_template_path (str, optional)
: Path to a job template to use. If a local path (no file scheme, or afile
/local
scheme), the job template will be loaded on initialization and stored on theKubernetesRun
object as thejob_template
field. Otherwise the job template will be loaded at runtime on the agent. Supported runtime file schemes include (s3
,gcs
, andagent
(for paths local to the runtime agent)).job_template (str or dict, optional)
: An in-memory job template to use. Can be either a dict, or a YAML string.image (str, optional)
: The image to useenv (dict, optional)
: Additional environment variables to set on the jobcpu_limit (float or str, optional)
: The CPU limit to use for the jobcpu_request (float or str, optional)
: The CPU request to use for the jobmemory_limit (str, optional)
: The memory limit to use for the jobmemory_request (str, optional)
: The memory request to use for the jobservice_account_name (str, optional)
: A service account name to use for this job. If present, overrides any service account configured on the agent or in the job template.image_pull_secrets (list, optional)
: A list of image pull secrets to use for this job. If present, overrides any image pull secrets configured on the agent or in the job template.labels (Iterable[str], optional)
: an iterable of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for workimage_pull_policy (str, optional)
: The imagePullPolicy to use for the job. https://kubernetes.io/docs/concepts/configuration/overview/#container-images
Use the defaults set on the agent:
flow.run_config = KubernetesRun()
Use a local job template, which is stored along with the Flow in Prefect Cloud/Server:
flow.run_config = KubernetesRun(
job_template_path="my_custom_template.yaml"
)
Use a job template stored in S3, but override the image and CPU limit:
flow.run_config = KubernetesRun(
job_template_path="s3://example-bucket/my_custom_template.yaml",
image="example/my-custom-image:latest",
cpu_limit=2,
)
Use an image not tagged with :latest, and set the image pull policy to Always
:
flow.run_config = KubernetesRun(
image="example/my-custom-image:my-tag,
image_pull_policy="Always"
)
Use a custom job_template
with a custom label (or any other necessary changes to the default).
Note: you can use the default job template, found at /prefect/src/prefect/agent/kubernetes/job_template.yaml
in the repository, as a base to build on. Once a job_template
is specified, the default is no longer used:
flow.run_config = KubernetesRun(
image="example/my-custom-image:my-tag,
job_template={
"apiVersion": "batch/v1",
"kind": "Job",
"spec": {
"template": {
"metadata": {
"labels": {
"my-custom-label": "something"
}
},
"spec": {
"containers": [
{
"name": "flow"
}
]
}
}
}
}
)
# ECSRun
class
prefect.run_configs.ecs.ECSRun
(task_definition=None, task_definition_path=None, task_definition_arn=None, image=None, env=None, cpu=None, memory=None, task_role_arn=None, execution_role_arn=None, run_task_kwargs=None, labels=None)[source]Configure a flow-run to run as an ECS Task.
ECS Tasks are composed of task definitions and runtime parameters.
Task definitions can be configured using either the task_definition
, task_definition_path
, or task_definition_arn
parameters. If neither is specified, the default configured on the agent will be used. At runtime this task definition will be registered once per flow version - subsequent runs of the same flow version will reuse the existing definition.
Runtime parameters can be specified via run_task_kwargs
. These will be merged with any runtime parameters configured on the agent when starting the task.
Note that certain Prefect specific environment variables defined within the task_definition
will be overwritten by the ECS Agent when a new task is run (see method get_run_task_kwargs of the ECSAgent). For example, although the variable PREFECT__LOGGING__LEVEL
might be defined within containerDefinitions
(of the task_definition
), it will first be ovewritten with the Prefect config and then by env
(if defined). Therefore, do not set any Prefect specific environment variables within task_definition
, task_definition_path
or task_definition_arn
. Instead, use the top-level ECSRun.env
setting.
Args:
task_definition (dict, optional)
: An in-memory task definition spec to use. The flow will be executed in a container namedflow
- if a container namedflow
isn't part of the task definition, Prefect will add a new container with that name. See the ECS.Client.register_task_definition docs for more information on task definitions. Note that this definition will be stored directly in Prefect Cloud/Server - usetask_definition_path
instead if you wish to avoid this.task_definition_path (str, optional)
: Path to a task definition spec to use. If you started your agent in a local process (withprefect agent ecs start --label your-label
), then you can reference a path on that machine (use either no file scheme, or afile
/local
scheme). The task definition will then be loaded on initialization and stored on theECSRun
object as thetask_definition
field. Otherwise the task definition will be loaded at runtime on the agent. Supported runtime file schemes includes3
,gcs
, andagent
(for paths local to the runtime agent).task_definition_arn (str, optional)
: A pre-registered task definition ARN to use (eitherfamily:revision
, or a full task definition ARN). This task definition must include a container namedflow
(which will be used to run the flow).image (str, optional)
: The image to use for this task. If not provided, will be either inferred from the flow's storage (if usingDocker
storage), or use the default configured on the agent. Note that when using an ECR image, you need to explicitly provide theexecution_role_arn
, unless you set this role explicitly in a customtask_definition
. This means: either you provide a customtask_definition
that contains both the image andexecution_role_arn
(with ECR permissions), or you provide both a customimage
and anexecution_role_arn
simultaneously.env (dict, optional)
: Additional environment variables to set on the task.cpu (int or str, optional)
: The amount of CPU available to the task. Can be ether an integer in CPU units (e.g.1024
), or a string using vCPUs (e.g."1 vcpu"
). Note that ECS imposes strict limits on what values this can take, see the ECS documentation for more information.memory (int or str, optional)
: The amount of memory available to the task. Can be ether an integer in MiB (e.g.1024
), or a string with units (e.g."1 GB"
). Note that ECS imposes strict limits on what values this can take, see the ECS documentation for more information.task_role_arn (str, optional)
: The full ARN of the IAM role to use for this task. If you provide a customtask_definition
that already contains thetask_role_arn
, then you can skip this argument. You can also skip it when your flow doesn't need access to any AWS resources such as S3. But if you use S3 storage (or results) and you don't set a customtask_definition
, this role must be set explicitly.execution_role_arn (str, optional)
: The execution role ARN to use when registering a task definition for this task. If you provide a customtask_definition
that already contains theexecution_role_arn
, then you can skip this argument. But if you don't set any customtask_definition
, and you supply an ECRimage
, then you need to set anexecution_role_arn
explicitly to grant ECR access.run_task_kwargs (dict, optional)
: Additional keyword arguments to pass torun_task
when starting this task. It should be used only for runtime-specific arguments such ascpu
,memory
,cluster
, orlaunchType
, rather thantask_definition
-specific arguments. See the ECS.Client.run_task docs for more information.labels (Iterable[str], optional)
: An iterable of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work
- Use the defaults set on the agent (assuming label "prod"):
flow.run_config = ECSRun(labels=["prod"])
- Use a custom task definition uploaded to S3 as a YAML file. This task definition contains a container named "flow", as well as a custom execution role and task role. The flow should be picked up for execution by an agent with label "prod" and should be deployed to a cluster called "prefectEcsCluster".
flow.run_config = ECSRun(
labels=["prod"],
task_definition_path="s3://bucket/flow_task_definition.yaml",
run_task_kwargs=dict(cluster="prefectEcsCluster"),
)
An example of flow_task_definition.yaml
:
family: prefectFlow
requiresCompatibilities:
- FARGATE
networkMode: awsvpc
cpu: 1024
memory: 2048
taskRoleArn: arn:aws:iam::XXX:role/prefectTaskRole
executionRoleArn: arn:aws:iam::XXX:role/prefectECSAgentTaskExecutionRole
containerDefinitions:
- name: flow
image: "XXX.dkr.ecr.us-east-1.amazonaws.com/image_name:latest"
essential: true
environment:
- name: AWS_RETRY_MODE
value: "adaptive"
- name: AWS_MAX_ATTEMPTS
value: "10"
logConfiguration:
logDriver: awslogs
options:
awslogs-group: "/ecs/prefectEcsAgent"
awslogs-region: "us-east-1"
awslogs-stream-prefix: "ecs"
awslogs-create-group: "true"
- Use a custom pre-registered task definition (referenced by
family:revision
or by a full ARN), then deploy the flow to a cluster called "prefectEcsCluster":
flow.run_config = ECSRun(
labels=["prod"],
task_definition_arn="prefectFlow:1",
# or using a full ARN:
# task_definition_arn="arn:aws:ecs:us-east-1:XXX:task-definition/prefectFlow:1"
run_task_kwargs=dict(cluster="prefectEcsCluster"),
)
- Let Prefect register a new task definition for a flow with S3 storage and a custom ECR image:
flow.run_config = ECSRun(
labels=["prod"],
task_role_arn="arn:aws:iam::XXXX:role/prefectTaskRole",
execution_role_arn="arn:aws:iam::XXXX:role/prefectECSAgentTaskExecutionRole",
image="XXXX.dkr.ecr.us-east-1.amazonaws.com/image_name:latest",
run_task_kwargs=dict(cluster="prefectEcsCluster"),
)
- Use the default task definition, but override the image and CPU:
flow.run_config = ECSRun(
execution_role_arn="arn:aws:iam::XXXX:role/prefectECSAgentTaskExecutionRole",
image="XXXX.dkr.ecr.us-east-1.amazonaws.com/image_name:latest",
cpu="2 vcpu",
)
- Use an explicit task definition stored in S3 as a YAML file, but override the image and CPU:
flow.run_config = ECSRun(
task_definition="s3://bucket/flow_task_definition.yaml",
image="XXXX.dkr.ecr.us-east-1.amazonaws.com/image_name:latest",
cpu="2 vcpu",
)
- An example flow showing ECSRun with a
task_definition
defined as dictionary, and S3 storage:
import prefect
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect import task, Flow
from prefect.client.secrets import Secret
FLOW_NAME = "ecs_demo_ecr"
ACCOUNT_ID = Secret("AWS_ACCOUNT_ID").get()
STORAGE = S3(
bucket="your_bucket_name",
key=f"flows/{FLOW_NAME}.py",
stored_as_script=True,
local_script_path=f"{FLOW_NAME}.py",
)
RUN_CONFIG = ECSRun(
labels=["prod"],
task_definition=dict(
family=FLOW_NAME,
requiresCompatibilities=["FARGATE"],
networkMode="awsvpc",
cpu=1024,
memory=2048,
taskRoleArn=f"arn:aws:iam::{ACCOUNT_ID}:role/prefectTaskRole",
executionRoleArn=f"arn:aws:iam::{ACCOUNT_ID}:role/prefectECSAgentTaskExecutionRole",
containerDefinitions=[
dict(
name="flow",
image=f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/your_image_name:latest",
)
],
),
run_task_kwargs=dict(cluster="prefectEcsCluster"),
)
@task
def say_hi():
logger = prefect.context.get("logger")
logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
say_hi()
# VertexRun
class
prefect.run_configs.vertex.VertexRun
(env=None, labels=None, image=None, machine_type="e2-standard-4", scheduling=None, service_account=None, network=None, worker_pool_specs=None)[source]Configure a flow-run to run as a Vertex CustomJob.
The most common configuration is for the run to use a single machine, and prefect will provide a default workerPoolSpec from the machine_type argument. But this can be customized by providing the worker_pool_spec arg with the content described in workerPoolSpec. Prefect will always provide the container spec for the 0th workerPoolSpec, which is used to actually run the flow
Args:
env (dict, optional)
: Additional environment variables to set.labels (Iterable[str], optional)
: an iterable of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for workimage (str, optional)
: The image to use for this task. If not provided, will be either inferred from the flow's storage (if usingDocker
storage), or use the default configured on the agent.machine_type (str, optional)
: The machine type to use for the run, which controls the available CPU and memory. See machine_types for valid choices.scheduling (dictionary, optional)
: The scheduling for the custom job, which can be used to set a timeout (maximum run time). See CustomJobSpec for the expected format.service_account (str, optional)
: Specifies the service account to use as the run-as account in vertex. The agent submitting jobs must have act-as permission on this run-as account. If unspecified, the AI Platform Custom Code Service Agent for the CustomJob's project is used.network (str, optional)
: The full name of the Compute Engine network to which the Job should be peered. Private services access must already be configured for the network. If left unspecified, the job is not peered with any network.worker_pool_specs (list, optional)
: The full worker pool specs for the custom job, which can be used for advanced configuration. If provided, it will overwrite the default constructed from the machine type arg.
Examples:
Use the defaults set on the agent:
flow.run_config = VertexRun()
Use the default task definition, but override the image and machine type:
flow.run_config = VertexRun(
image="example/my-custom-image:latest",
machine_type="e2-highmem-8",
)
1: https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec 2: https://cloud.google.com/vertex-ai/docs/training/configure-compute#machine-types
This documentation was auto-generated from commit bd9182e
on July 31, 2024 at 18:02 UTC