# Execution Environments
Execution environments encapsulate the logic for where your Flow should execute in Prefect Cloud.
DEPRECATED: Environment-based configuration is deprecated. Please transition to configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
# DaskKubernetesEnvironment
class `prefect.environments.execution.dask.k8s.DaskKubernetesEnvironment(min_workers=1, max_workers=2, work_stealing=True, scheduler_logs=False, private_registry=False, docker_secret=None, labels=None, on_start=None, on_exit=None, metadata=None, scheduler_spec_file=None, worker_spec_file=None, image_pull_secret=None, log_k8s_errors=False)`

DaskKubernetesEnvironment is an environment which deploys your flow on Kubernetes by spinning up a temporary Dask cluster (using dask-kubernetes) and running the Prefect `DaskExecutor` on this cluster.

DEPRECATED: Environment-based configuration is deprecated. Please transition to configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
When running flows that are registered with a private container registry, you should either specify the name of an `image_pull_secret` on the flow's `DaskKubernetesEnvironment` or directly set the `imagePullSecrets` on your custom worker/scheduler specs.
You can provide custom scheduler and worker spec YAML files through the `scheduler_spec_file` and `worker_spec_file` arguments. These specs (if provided) will be used in place of the defaults. Your spec files should be modeled after the default `job.yaml` and `worker_pod.yaml`. The main aspects to be aware of are the `command` and `args` on the container. The following environment variables, required for Cloud, do not need to be included; they are automatically added and populated during execution:
- `PREFECT__CLOUD__GRAPHQL`
- `PREFECT__CLOUD__AUTH_TOKEN`
- `PREFECT__CONTEXT__FLOW_RUN_ID`
- `PREFECT__CONTEXT__NAMESPACE`
- `PREFECT__CONTEXT__IMAGE`
- `PREFECT__CLOUD__USE_LOCAL_SECRETS`
- `PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS`
- `PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS`
- `PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS`
- `PREFECT__LOGGING__LEVEL`
- `PREFECT__CLOUD__SEND_FLOW_RUN_LOGS`
- `PREFECT__LOGGING__EXTRA_LOGGERS`
Note: the logging attributes are only populated if they are not already provided.
Args:
- `min_workers (int, optional)`: the minimum allowed number of Dask worker pods; defaults to 1
- `max_workers (int, optional)`: the maximum allowed number of Dask worker pods; defaults to 2
- `work_stealing (bool, optional)`: toggle Dask Distributed scheduler work stealing; defaults to `True`. Only used when a custom scheduler spec is not provided. Enabling this may cause ClientErrors to appear when multiple Dask workers try to run the same Prefect Task. Warning: `work_stealing`, if provided, won't be appended to your custom `scheduler_spec_file`. If you want it, don't forget to add it to your container env (`DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING`).
- `scheduler_logs (bool, optional)`: log all Dask scheduler logs; defaults to `False`
- `private_registry (bool, optional, DEPRECATED)`: a boolean specifying whether your Flow's Docker container will be in a private Docker registry; if so, requires a Prefect Secret containing your Docker credentials to be set. Defaults to `False`.
- `docker_secret (str, optional, DEPRECATED)`: the name of the Prefect Secret containing your Docker credentials; defaults to `"DOCKER_REGISTRY_CREDENTIALS"`. This Secret should be a dictionary containing the following keys: `"docker-server"`, `"docker-username"`, `"docker-password"`, and `"docker-email"`.
- `labels (List[str], optional)`: a list of labels, which are arbitrary string identifiers used by Prefect Agents when polling for work
- `on_start (Callable, optional)`: a function callback which will be called before the flow begins to run
- `on_exit (Callable, optional)`: a function callback which will be called after the flow finishes its run
- `metadata (dict, optional)`: extra metadata to be set and serialized on this environment
- `scheduler_spec_file (str, optional)`: path to a scheduler spec YAML file
- `worker_spec_file (str, optional)`: path to a worker spec YAML file
- `image_pull_secret (str, optional)`: optional name of an `imagePullSecret` to use for the scheduler and worker pods. To specify multiple image pull secrets, provide a comma-delimited string with no spaces, like `"some-secret,other-secret"`. For more information, see the Kubernetes documentation on `imagePullSecrets`. Warning: `image_pull_secret`, if provided, won't be appended to your custom `worker_spec_file` or `scheduler_spec_file`. If you want it, don't forget to add it to your spec files.
- `log_k8s_errors (bool, optional)`: optional toggle to also log Kubernetes errors that may occur using the Prefect logger. Defaults to `False`.
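The two warnings above mean that a custom spec has to carry these settings itself. The following is a minimal sketch of doing so, assuming the spec has already been loaded into a Python dictionary shaped like a Kubernetes pod spec. The helpers `split_pull_secrets` and `patch_pod_spec`, the sample spec, and the choice to write the flag as the string `"False"` are all illustrative assumptions and not part of Prefect.

```python
import copy
from typing import Any, Dict, List

WORK_STEALING_VAR = "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING"


def split_pull_secrets(image_pull_secret: str) -> List[Dict[str, str]]:
    """Turn "some-secret,other-secret" into Kubernetes imagePullSecrets entries."""
    return [{"name": name} for name in image_pull_secret.split(",") if name]


def patch_pod_spec(
    pod_spec: Dict[str, Any], image_pull_secret: str, work_stealing: bool
) -> Dict[str, Any]:
    """Return a copy of a pod spec with pull secrets and the work-stealing flag set.

    Hypothetical helper: it only mirrors what the warnings ask you to add by hand.
    """
    patched = copy.deepcopy(pod_spec)
    patched["imagePullSecrets"] = split_pull_secrets(image_pull_secret)
    for container in patched.get("containers", []):
        env = container.setdefault("env", [])
        # Replace any existing entry so the variable appears exactly once.
        env[:] = [entry for entry in env if entry.get("name") != WORK_STEALING_VAR]
        env.append({"name": WORK_STEALING_VAR, "value": str(work_stealing)})
    return patched


# Sample scheduler pod spec (image name is a placeholder).
example_spec = {
    "containers": [
        {"name": "dask-scheduler", "image": "my-registry/my-flow:latest", "env": []}
    ]
}

patched_spec = patch_pod_spec(example_spec, "some-secret,other-secret", False)
print(patched_spec["imagePullSecrets"])
print(patched_spec["containers"][0]["env"])
```

The patched dictionary would then be written back to the YAML file referenced by `scheduler_spec_file` or `worker_spec_file`. For a Job-based scheduler spec, the pod spec is nested under the Job's pod template rather than at the top level.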
| methods: |
|---|
| `prefect.environments.execution.dask.k8s.DaskKubernetesEnvironment.execute(flow)`: Create a single Kubernetes job that spins up a Dask scheduler, dynamically creates worker pods, and runs the flow. |
| `prefect.environments.execution.dask.k8s.DaskKubernetesEnvironment.run(flow)`: Run the flow using a temporary dask-kubernetes cluster. |
| `prefect.environments.execution.dask.k8s.DaskKubernetesEnvironment.setup(flow)`: Sets up any infrastructure needed for this environment. |
# DaskCloudProviderEnvironment
class `prefect.environments.execution.dask.cloud_provider.DaskCloudProviderEnvironment(provider_class, adaptive_min_workers=None, adaptive_max_workers=None, security=None, executor_kwargs=None, labels=None, on_execute=None, on_start=None, on_exit=None, metadata=None, **kwargs)`

DaskCloudProviderEnvironment creates Dask clusters using the Dask Cloud Provider project.

DEPRECATED: Environment-based configuration is deprecated. Please transition to configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
For each flow run, a new Dask cluster will be dynamically created and the flow will run using a `DaskExecutor` with the Dask scheduler address from the newly created Dask cluster. You can specify the number of Dask workers manually (for example, passing the kwarg `n_workers`) or enable adaptive mode by passing `adaptive_min_workers` and, optionally, `adaptive_max_workers`. This environment aims to provide a very easy path to Dask scalability for users of cloud platforms, like AWS.
NOTE: AWS Fargate Task (not Prefect Task) startup time can be slow, depending on Docker image size. Total startup time for a Dask scheduler and workers can be several minutes. This environment is a much better fit for production deployments of scheduled Flows where there's little sensitivity to startup time. `DaskCloudProviderEnvironment` is a particularly good fit for automated deployment of Flows in a CI/CD pipeline where the infrastructure for each Flow should be as independent as possible, e.g. each Flow could have its own Docker image, dynamically create the Dask cluster to run on, etc. However, for development and interactive testing, creating a Dask cluster manually with Dask Cloud Provider and then using `LocalEnvironment` with a `DaskExecutor` will result in a much better development experience.
(Dask Cloud Provider currently only supports AWS using either Fargate or ECS. Support for AzureML is coming soon.)
IMPORTANT: By default, Dask Cloud Provider may create a Dask cluster in some environments (e.g. Fargate) that is accessible via a public IP, without any authentication, and configured to NOT encrypt network traffic. Please be conscious of security issues if you test this environment. (Also see pull requests 85 and 91 in the Dask Cloud Provider project.)
Args:
- `provider_class (class)`: class of a provider from the Dask Cloud Provider project. Currently supported options are `ECSCluster` and `FargateCluster`.
- `adaptive_min_workers (int, optional)`: minimum number of workers for adaptive mode. If this value is `None`, then adaptive mode will not be used and you should pass `n_workers` or the appropriate kwarg for the provider class you are using.
- `adaptive_max_workers (int, optional)`: maximum number of workers for adaptive mode.
- `security (Type[Security], optional)`: a Dask Security object from `distributed.security.Security`. Use this to connect to a Dask cluster that is enabled with TLS encryption. For more on using TLS with Dask see https://distributed.dask.org/en/latest/tls.html
- `executor_kwargs (dict, optional)`: a dictionary of kwargs to be passed to the executor; defaults to an empty dictionary
- `labels (List[str], optional)`: a list of labels, which are arbitrary string identifiers used by Prefect Agents when polling for work
- `on_execute (Callable[[Dict[str, Any], Dict[str, Any]], None], optional)`: a function callback which will be called before the flow begins to run. The callback function can examine the Flow run parameters and modify kwargs to be passed to the Dask Cloud Provider class's constructor prior to launching the Dask cluster for the Flow run. This allows for dynamically sizing the cluster based on the Flow run parameters, e.g. setting `n_workers`. The callback function's signature should be: `on_execute(parameters: Dict[str, Any], provider_kwargs: Dict[str, Any]) -> None`. The callback function may modify `provider_kwargs` (e.g. `provider_kwargs["n_workers"] = 3`) and any relevant changes will be used when creating the Dask cluster via a Dask Cloud Provider class.
- `on_start (Callable, optional)`: a function callback which will be called before the flow begins to run
- `on_exit (Callable, optional)`: a function callback which will be called after the flow finishes its run
- `metadata (dict, optional)`: extra metadata to be set and serialized on this environment
- `**kwargs (dict, optional)`: additional keyword arguments to pass to boto3 for `register_task_definition` and `run_task`
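As an illustration of the `on_execute` hook described above, here is a minimal sketch of a callback that sizes the cluster from a flow run parameter. The parameter name `num_partitions`, the ratio of one worker per ten partitions, and the cap of 20 workers are hypothetical choices made for this example; only the callback signature and the `n_workers` key come from the description above.

```python
from typing import Any, Dict


def on_execute(parameters: Dict[str, Any], provider_kwargs: Dict[str, Any]) -> None:
    """Pick a worker count from the flow run parameters.

    Mutates provider_kwargs in place and returns None, matching the
    documented callback signature.
    """
    # "num_partitions" is a hypothetical flow parameter used only for illustration.
    partitions = int(parameters.get("num_partitions", 1))
    # One worker per 10 partitions (rounded up), clamped to the range 1..20.
    workers = max(1, min(20, -(-partitions // 10)))
    provider_kwargs["n_workers"] = workers


# Simulate what happens before the cluster is created for a flow run.
provider_kwargs = {"n_workers": 1}
on_execute({"num_partitions": 45}, provider_kwargs)
print(provider_kwargs["n_workers"])  # 5
```

The function would be passed as the `on_execute` argument when constructing the environment, alongside the provider class and its other kwargs.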
| methods: |
|---|
| `prefect.environments.execution.dask.cloud_provider.DaskCloudProviderEnvironment.execute(flow, **kwargs)`: Execute a flow run on a dask-cloudprovider cluster. |
# FargateTaskEnvironment
class `prefect.environments.execution.fargate.fargate_task.FargateTaskEnvironment(launch_type="FARGATE", aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, region_name=None, executor=None, labels=None, on_start=None, on_exit=None, metadata=None, **kwargs)`

FargateTaskEnvironment is an environment which deploys your flow as a Fargate task.

DEPRECATED: Environment-based configuration is deprecated. Please transition to configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
This environment requires AWS credentials and extra boto3 kwargs which are used in the creation and running of the Fargate task. When providing a custom container definition spec the first container in the spec must be the container that the flow runner will be executed on.
The following environment variables, required for Cloud, do not need to be included; they are automatically added and populated during execution:
- `PREFECT__CLOUD__GRAPHQL`
- `PREFECT__CLOUD__AUTH_TOKEN`
- `PREFECT__CONTEXT__FLOW_RUN_ID`
- `PREFECT__CONTEXT__IMAGE`
- `PREFECT__CLOUD__USE_LOCAL_SECRETS`
- `PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS`
- `PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS`
- `PREFECT__CLOUD__SEND_FLOW_RUN_LOGS`
- `PREFECT__LOGGING__EXTRA_LOGGERS`
Additionally, the following command will be applied to the first container:
$ /bin/sh -c "python -c 'import prefect; prefect.environments.execution.load_and_run_flow()'"
All `kwargs` that one would normally pass to boto3 for `register_task_definition` and `run_task` are accepted. For information on the supported kwargs, visit the following links:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.register_task_definition
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task
Note: You must provide `family` and `taskDefinition` with the same string so they match when the task is run.
The secrets and kwargs that are provided at initialization time of this environment are not serialized and will only ever exist on this object.
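To make the `family` / `taskDefinition` requirement concrete, the sketch below builds one dictionary of boto3 kwargs in which both keys are derived from the same name. The helper `build_fargate_kwargs`, the task name, the image, the cluster name, the subnet ID, and the CPU and memory sizes are placeholder assumptions; the keys themselves are standard `register_task_definition` and `run_task` arguments.

```python
from typing import Any, Dict


def build_fargate_kwargs(task_name: str, image: str) -> Dict[str, Any]:
    """Assemble boto3 kwargs so that family and taskDefinition always match."""
    return {
        # Arguments consumed by register_task_definition.
        "family": task_name,
        "containerDefinitions": [
            # The first container is the one the flow runner executes on.
            {"name": "flow", "image": image, "essential": True}
        ],
        "requiresCompatibilities": ["FARGATE"],
        "networkMode": "awsvpc",
        "cpu": "256",
        "memory": "512",
        # Arguments consumed by run_task.
        "taskDefinition": task_name,
        "cluster": "default",
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": ["subnet-00000000"],  # placeholder subnet ID
                "assignPublicIp": "ENABLED",
            }
        },
    }


fargate_kwargs = build_fargate_kwargs("my-prefect-flow", "my-registry/my-flow:latest")
print(fargate_kwargs["family"] == fargate_kwargs["taskDefinition"])  # True
```

Under these assumptions the dictionary would be unpacked into the constructor, for example `FargateTaskEnvironment(launch_type="FARGATE", **fargate_kwargs)`.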
Args:
- `launch_type (str, optional)`: either `FARGATE` or `EC2`; defaults to `FARGATE`
- `aws_access_key_id (str, optional)`: AWS access key id for connecting the boto3 client. Defaults to the value set in the environment variable `AWS_ACCESS_KEY_ID` or `None`
- `aws_secret_access_key (str, optional)`: AWS secret access key for connecting the boto3 client. Defaults to the value set in the environment variable `AWS_SECRET_ACCESS_KEY` or `None`
- `aws_session_token (str, optional)`: AWS session key for connecting the boto3 client. Defaults to the value set in the environment variable `AWS_SESSION_TOKEN` or `None`
- `region_name (str, optional)`: AWS region name for connecting the boto3 client. Defaults to the value set in the environment variable `REGION_NAME` or `None`
- `executor (Executor, optional)`: the executor to run the flow with. If not provided, the default executor will be used.
- `labels (List[str], optional)`: a list of labels, which are arbitrary string identifiers used by Prefect Agents when polling for work
- `on_start (Callable, optional)`: a function callback which will be called before the flow begins to run
- `on_exit (Callable, optional)`: a function callback which will be called after the flow finishes its run
- `metadata (dict, optional)`: extra metadata to be set and serialized on this environment
- `**kwargs (dict, optional)`: additional keyword arguments to pass to boto3 for `register_task_definition` and `run_task`
| methods: |
|---|
| `prefect.environments.execution.fargate.fargate_task.FargateTaskEnvironment.execute(flow)`: Run the Fargate task that was defined for this flow. |
| `prefect.environments.execution.fargate.fargate_task.FargateTaskEnvironment.setup(flow)`: Register the task definition if it does not already exist. |
# KubernetesJobEnvironment
class `prefect.environments.execution.k8s.job.KubernetesJobEnvironment(job_spec_file=None, unique_job_name=False, executor=None, labels=None, on_start=None, on_exit=None, metadata=None)`

KubernetesJobEnvironment is an environment which deploys your flow as a Kubernetes job. This environment allows (and requires) a custom job YAML spec to be provided.

DEPRECATED: Environment-based configuration is deprecated. Please transition to configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
When providing a custom YAML job spec the first container in the spec must be the container that the flow runner will be executed on.
The following environment variables, required for Cloud, do not need to be included; they are automatically added and populated during execution:
- `PREFECT__CLOUD__GRAPHQL`
- `PREFECT__CLOUD__AUTH_TOKEN`
- `PREFECT__CONTEXT__FLOW_RUN_ID`
- `PREFECT__CONTEXT__NAMESPACE`
- `PREFECT__CONTEXT__IMAGE`
- `PREFECT__CLOUD__USE_LOCAL_SECRETS`
- `PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS`
- `PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS`
- `PREFECT__CLOUD__SEND_FLOW_RUN_LOGS`
- `PREFECT__LOGGING__EXTRA_LOGGERS`
Additionally, the following command will be applied to the first container: $ /bin/sh -c "python -c 'import prefect; prefect.environments.execution.load_and_run_flow()'"
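Because the command is applied to the first container, the ordering of containers in a custom job spec matters. The sketch below assumes the job spec has been loaded into a Python dictionary with the usual Kubernetes Job layout and reorders the containers so that a named flow container comes first. The helper `put_flow_container_first`, the container names, and the image names are hypothetical and serve only to illustrate the requirement.

```python
import copy
from typing import Any, Dict


def put_flow_container_first(job_spec: Dict[str, Any], flow_container: str) -> Dict[str, Any]:
    """Return a copy of a Job spec whose first container is the flow container."""
    spec = copy.deepcopy(job_spec)
    containers = spec["spec"]["template"]["spec"]["containers"]
    if flow_container not in [c["name"] for c in containers]:
        raise ValueError(f"no container named {flow_container!r} in job spec")
    # Stable sort: the flow container (key False) moves to the front,
    # all other containers keep their relative order.
    containers.sort(key=lambda c: c["name"] != flow_container)
    return spec


# Sample Job spec with a sidecar listed before the flow container.
job_spec = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "prefect-flow-job"},
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {"name": "log-shipper", "image": "my-registry/log-shipper:latest"},
                    {"name": "flow", "image": "my-registry/my-flow:latest"},
                ],
                "restartPolicy": "Never",
            }
        }
    },
}

reordered = put_flow_container_first(job_spec, "flow")
print([c["name"] for c in reordered["spec"]["template"]["spec"]["containers"]])
```

The reordered dictionary would then be saved as the YAML file that `job_spec_file` points to.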
Args:
- `job_spec_file (str, optional)`: path to a job spec YAML file. This path is only used when the environment is built, so it should refer to a file on the machine used to build the flow.
- `unique_job_name (bool, optional)`: whether to use a unique name for each job created with this environment. Defaults to `False`
- `executor (Executor, optional)`: the executor to run the flow with. If not provided, the default executor will be used.
- `labels (List[str], optional)`: a list of labels, which are arbitrary string identifiers used by Prefect Agents when polling for work
- `on_start (Callable, optional)`: a function callback which will be called before the flow begins to run
- `on_exit (Callable, optional)`: a function callback which will be called after the flow finishes its run
- `metadata (dict, optional)`: extra metadata to be set and serialized on this environment
| methods: |
|---|
| `prefect.environments.execution.k8s.job.KubernetesJobEnvironment.execute(flow, **kwargs)`: Create a single Kubernetes job that runs the flow. |
# LocalEnvironment
class `prefect.environments.execution.local.LocalEnvironment(executor=None, labels=None, on_start=None, on_exit=None, metadata=None)`

A LocalEnvironment class for executing a flow in the local process.

DEPRECATED: Environment-based configuration is deprecated. Please transition to configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
Args:
- `executor (Executor, optional)`: the executor to run the flow with. If not provided, the default executor will be used.
- `labels (List[str], optional)`: a list of labels, which are arbitrary string identifiers used by Prefect Agents when polling for work
- `on_start (Callable, optional)`: a function callback which will be called before the flow begins to run
- `on_exit (Callable, optional)`: a function callback which will be called after the flow finishes its run
- `metadata (dict, optional)`: extra metadata to be set and serialized on this environment
| methods: |
|---|
| `prefect.environments.execution.local.LocalEnvironment.execute(flow)`: Executes the flow in the local process. |
This documentation was auto-generated from commit n/a on July 1, 2021 at 18:35 UTC