# Execution Environments
Execution environments encapsulate the logic for where your Flow should execute in Prefect Cloud.
DEPRECATED: Environment-based configuration is deprecated. Please transition to
configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
# DaskKubernetesEnvironment
class `prefect.environments.execution.dask.k8s.DaskKubernetesEnvironment(min_workers=1, max_workers=2, work_stealing=True, scheduler_logs=False, private_registry=False, docker_secret=None, labels=None, on_start=None, on_exit=None, metadata=None, scheduler_spec_file=None, worker_spec_file=None, image_pull_secret=None, log_k8s_errors=False)`

DaskKubernetesEnvironment is an environment which deploys your flow on Kubernetes by spinning up a temporary Dask cluster (using dask-kubernetes) and running the Prefect DaskExecutor on this cluster.

DEPRECATED: Environment-based configuration is deprecated. Please transition to configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
When running your flows that are registered with a private container registry, you should either specify the name of an image_pull_secret on the flow's DaskKubernetesEnvironment or directly set the imagePullSecrets on your custom worker/scheduler specs.
You can provide custom scheduler and worker spec YAML files through the `scheduler_spec_file` and `worker_spec_file` arguments. These specs (if provided) will be used in place of the defaults. Your spec files should be modeled after the job.yaml and worker_pod.yaml found here. The main aspects to be aware of are the `command` and `args` on the container. The following environment variables, required for cloud, do not need to be included; they are automatically added and populated during execution:

- `PREFECT__CLOUD__GRAPHQL`
- `PREFECT__CLOUD__AUTH_TOKEN`
- `PREFECT__CONTEXT__FLOW_RUN_ID`
- `PREFECT__CONTEXT__NAMESPACE`
- `PREFECT__CONTEXT__IMAGE`
- `PREFECT__CLOUD__USE_LOCAL_SECRETS`
- `PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS`
- `PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS`
- `PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS`
- `PREFECT__LOGGING__LEVEL`
- `PREFECT__CLOUD__SEND_FLOW_RUN_LOGS`
- `PREFECT__LOGGING__EXTRA_LOGGERS`

Note: the logging attributes are only populated if they are not already provided.
Args:

- `min_workers (int, optional)`: the minimum allowed number of Dask worker pods; defaults to 1
- `max_workers (int, optional)`: the maximum allowed number of Dask worker pods; defaults to 2
- `work_stealing (bool, optional)`: toggle Dask Distributed scheduler work stealing; defaults to `True`. Only used when a custom scheduler spec is not provided. Enabling this may cause ClientErrors to appear when multiple Dask workers try to run the same Prefect Task. Warning: `work_stealing`, if provided, won't be appended to your custom `scheduler_spec_file`. If you want it, add it to your container env (`DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING`).
- `scheduler_logs (bool, optional)`: log all Dask scheduler logs; defaults to `False`
- `private_registry (bool, optional, DEPRECATED)`: a boolean specifying whether your Flow's Docker container will be in a private Docker registry; if so, requires a Prefect Secret containing your Docker credentials to be set. Defaults to `False`.
- `docker_secret (str, optional, DEPRECATED)`: the name of the Prefect Secret containing your Docker credentials; defaults to `"DOCKER_REGISTRY_CREDENTIALS"`. This Secret should be a dictionary containing the following keys: `"docker-server"`, `"docker-username"`, `"docker-password"`, and `"docker-email"`.
- `labels (List[str], optional)`: a list of labels, which are arbitrary string identifiers used by Prefect Agents when polling for work
- `on_start (Callable, optional)`: a function callback which will be called before the flow begins to run
- `on_exit (Callable, optional)`: a function callback which will be called after the flow finishes its run
- `metadata (dict, optional)`: extra metadata to be set and serialized on this environment
- `scheduler_spec_file (str, optional)`: path to a scheduler spec YAML file
- `worker_spec_file (str, optional)`: path to a worker spec YAML file
- `image_pull_secret (str, optional)`: optional name of an `imagePullSecret` to use for the scheduler and worker pods. To specify multiple image pull secrets, provide a comma-delimited string with no spaces, like `"some-secret,other-secret"`. For more information go here. Warning: `image_pull_secret`, if provided, won't be appended to your custom `worker_spec_file` or `scheduler_spec_file`. If you want it, add it to your spec files.
- `log_k8s_errors (bool, optional)`: optional toggle to also log Kubernetes errors that may occur using the Prefect logger. Defaults to `False`.
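
The two warnings above mean that a custom spec has to carry its own `imagePullSecrets` and work-stealing setting. The following is a minimal sketch of that idea, not Prefect's implementation: it assumes the spec has already been loaded into a Python dict shaped like a Kubernetes pod spec (the dict that holds `containers`), and the helper names `image_pull_secrets_from_string` and `patch_pod_spec` are hypothetical.

```python
from typing import Any, Dict, List

# Environment variable named in the work_stealing warning.
WORK_STEALING_VAR = "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING"


def image_pull_secrets_from_string(image_pull_secret: str) -> List[Dict[str, str]]:
    """Turn "a,b" into the list-of-names form used by a Kubernetes pod spec."""
    return [{"name": name} for name in image_pull_secret.split(",") if name]


def patch_pod_spec(
    pod_spec: Dict[str, Any], image_pull_secret: str, work_stealing: bool
) -> Dict[str, Any]:
    """Add imagePullSecrets and the work-stealing variable to a pod spec (in place)."""
    pod_spec["imagePullSecrets"] = image_pull_secrets_from_string(image_pull_secret)
    for container in pod_spec.get("containers", []):
        env = container.setdefault("env", [])
        # Leave the variable alone if the spec already sets it.
        if not any(item.get("name") == WORK_STEALING_VAR for item in env):
            # Kubernetes env values must be strings.
            env.append({"name": WORK_STEALING_VAR, "value": str(work_stealing)})
    return pod_spec


# Hypothetical, minimal pod spec; a real worker or scheduler spec has more fields.
spec = {"containers": [{"name": "flow", "image": "example/image:latest"}]}
patched = patch_pod_spec(spec, "some-secret,other-secret", work_stealing=False)
print(patched["imagePullSecrets"])
```

In a scheduler spec modeled on a Kubernetes Job, the pod spec usually sits under `spec.template.spec`, so the same helper would be applied to that nested dict before writing the YAML file.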

| Method | Description |
|---|---|
| `prefect.environments.execution.dask.k8s.DaskKubernetesEnvironment.execute(flow)` | Create a single Kubernetes job that spins up a Dask scheduler, dynamically creates worker pods, and runs the flow. |
| `prefect.environments.execution.dask.k8s.DaskKubernetesEnvironment.run(flow)` | Run the flow using a temporary dask-kubernetes cluster. |
| `prefect.environments.execution.dask.k8s.DaskKubernetesEnvironment.setup(flow)` | Sets up any infrastructure needed for this environment. |

# DaskCloudProviderEnvironment
class `prefect.environments.execution.dask.cloud_provider.DaskCloudProviderEnvironment(provider_class, adaptive_min_workers=None, adaptive_max_workers=None, security=None, executor_kwargs=None, labels=None, on_execute=None, on_start=None, on_exit=None, metadata=None, **kwargs)`

DaskCloudProviderEnvironment creates Dask clusters using the Dask Cloud Provider project.

DEPRECATED: Environment-based configuration is deprecated. Please transition to configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
For each flow run, a new Dask cluster will be dynamically created and the flow will run using a DaskExecutor with the Dask scheduler address from the newly created Dask cluster. You can specify the number of Dask workers manually (for example, passing the kwarg n_workers) or enable adaptive mode by passing adaptive_min_workers and, optionally, adaptive_max_workers. This environment aims to provide a very easy path to Dask scalability for users of cloud platforms, like AWS.
NOTE: AWS Fargate Task (not Prefect Task) startup time can be slow, depending on docker image size. Total startup time for a Dask scheduler and workers can be several minutes. This environment is a much better fit for production deployments of scheduled Flows where there's little sensitivity to startup time. DaskCloudProviderEnvironment is a particularly good fit for automated deployment of Flows in a CI/CD pipeline where the infrastructure for each Flow should be as independent as possible, e.g. each Flow could have its own docker image, dynamically create the Dask cluster to run on, etc. However, for development and interactive testing, creating a Dask cluster manually with Dask Cloud Provider and then using LocalEnvironment with a DaskExecutor will result in a much better development experience.
(Dask Cloud Provider currently only supports AWS using either Fargate or ECS. Support for AzureML is coming soon.)
IMPORTANT: By default, Dask Cloud Provider may create a Dask cluster in some environments (e.g. Fargate) that is accessible via a public IP, without any authentication, and configured to NOT encrypt network traffic. Please be conscious of security issues if you test this environment. (Also see pull requests 85 and 91 in the Dask Cloud Provider project.)
Args:

- `provider_class (class)`: class of a provider from the Dask Cloud Provider project. Currently supported options are `ECSCluster` and `FargateCluster`.
- `adaptive_min_workers (int, optional)`: minimum number of workers for adaptive mode. If this value is None, then adaptive mode will not be used and you should pass `n_workers` or the appropriate kwarg for the provider class you are using.
- `adaptive_max_workers (int, optional)`: maximum number of workers for adaptive mode.
- `security (Type[Security], optional)`: a Dask Security object from `distributed.security.Security`. Use this to connect to a Dask cluster that is enabled with TLS encryption. For more on using TLS with Dask see https://distributed.dask.org/en/latest/tls.html
- `executor_kwargs (dict, optional)`: a dictionary of kwargs to be passed to the executor; defaults to an empty dictionary
- `labels (List[str], optional)`: a list of labels, which are arbitrary string identifiers used by Prefect Agents when polling for work
- `on_execute (Callable[[Dict[str, Any], Dict[str, Any]], None], optional)`: a function callback which will be called before the flow begins to run. The callback function can examine the Flow run parameters and modify kwargs to be passed to the Dask Cloud Provider class's constructor prior to launching the Dask cluster for the Flow run. This allows for dynamically sizing the cluster based on the Flow run parameters, e.g. setting `n_workers`. The callback function's signature should be: `on_execute(parameters: Dict[str, Any], provider_kwargs: Dict[str, Any]) -> None`. The callback function may modify `provider_kwargs` (e.g. `provider_kwargs["n_workers"] = 3`) and any relevant changes will be used when creating the Dask cluster via a Dask Cloud Provider class.
- `on_start (Callable, optional)`: a function callback which will be called before the flow begins to run
- `on_exit (Callable, optional)`: a function callback which will be called after the flow finishes its run
- `metadata (dict, optional)`: extra metadata to be set and serialized on this environment
- `**kwargs (dict, optional)`: additional keyword arguments to pass to boto3 for `register_task_definition` and `run_task`
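
To make the `on_execute` contract concrete, here is a minimal sketch of a callback that sizes the cluster from a flow run parameter. The signature is the one documented above; the parameter name `num_partitions`, the one-worker-per-ten-partitions rule, and the cap of 20 workers are assumptions chosen only for illustration. The last lines simulate the call with plain dicts rather than a real flow run.

```python
from typing import Any, Dict


def on_execute(parameters: Dict[str, Any], provider_kwargs: Dict[str, Any]) -> None:
    """Set n_workers on the provider kwargs based on a flow run parameter."""
    # "num_partitions" is a hypothetical flow parameter.
    num_partitions = int(parameters.get("num_partitions", 1))
    # One worker per 10 partitions, rounded up, clamped to the range 1..20.
    workers = -(-num_partitions // 10)
    provider_kwargs["n_workers"] = max(1, min(workers, 20))


# Simulate the call that would happen before the Dask cluster is created.
provider_kwargs = {"n_workers": 1, "image": "example/image:latest"}
on_execute({"num_partitions": 45}, provider_kwargs)
print(provider_kwargs["n_workers"])
```

Because the callback mutates `provider_kwargs` in place and returns `None`, any key it does not touch (here `image`) is passed through unchanged.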

| Method | Description |
|---|---|
| `prefect.environments.execution.dask.cloud_provider.DaskCloudProviderEnvironment.execute(flow, **kwargs)` | Execute a flow run on a dask-cloudprovider cluster. |

# FargateTaskEnvironment
class `prefect.environments.execution.fargate.fargate_task.FargateTaskEnvironment(launch_type="FARGATE", aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, region_name=None, executor=None, labels=None, on_start=None, on_exit=None, metadata=None, **kwargs)`

FargateTaskEnvironment is an environment which deploys your flow as a Fargate task.

DEPRECATED: Environment-based configuration is deprecated. Please transition to configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
This environment requires AWS credentials and extra boto3 kwargs, which are used to create and run the Fargate task. When providing a custom container definition spec, the first container in the spec must be the container that the flow runner will be executed on.

The following environment variables, required for cloud, do not need to be included; they are automatically added and populated during execution:

- `PREFECT__CLOUD__GRAPHQL`
- `PREFECT__CLOUD__AUTH_TOKEN`
- `PREFECT__CONTEXT__FLOW_RUN_ID`
- `PREFECT__CONTEXT__IMAGE`
- `PREFECT__CLOUD__USE_LOCAL_SECRETS`
- `PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS`
- `PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS`
- `PREFECT__CLOUD__SEND_FLOW_RUN_LOGS`
- `PREFECT__LOGGING__EXTRA_LOGGERS`

Additionally, the following command will be applied to the first container:

`$ /bin/sh -c "python -c 'import prefect; prefect.environments.execution.load_and_run_flow()'"`
All kwargs are accepted that one would normally pass to boto3 for register_task_definition and run_task. For information on the kwargs supported visit the following links:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.register_task_definition
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task
Note: You must provide family and taskDefinition with the same string so they match on run of the task.
The secrets and kwargs that are provided at initialization time of this environment are not serialized and will only ever exist on this object.
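
The note about `family` and `taskDefinition` can be checked before the kwargs are handed to the environment. The sketch below is an illustration under stated assumptions, not part of Prefect: the key names are real `register_task_definition` and `run_task` arguments from boto3, while the task name, image, cluster, subnet ID, and the helper `check_fargate_kwargs` are hypothetical.

```python
from typing import Any, Dict

TASK_NAME = "prefect-flow-task"  # hypothetical; reused so both keys match

fargate_kwargs: Dict[str, Any] = {
    # Arguments consumed by boto3's register_task_definition
    "family": TASK_NAME,
    "containerDefinitions": [
        # The first container is the one the flow runner executes on.
        {"name": "flow-container", "image": "example/image:latest", "essential": True}
    ],
    "requiresCompatibilities": ["FARGATE"],
    "networkMode": "awsvpc",
    "cpu": "256",
    "memory": "512",
    # Arguments consumed by boto3's run_task
    "taskDefinition": TASK_NAME,
    "cluster": "default",
    "networkConfiguration": {
        "awsvpcConfiguration": {
            "subnets": ["subnet-00000000"],  # placeholder subnet ID
            "assignPublicIp": "ENABLED",
        }
    },
}


def check_fargate_kwargs(kwargs: Dict[str, Any]) -> None:
    """Raise ValueError if the kwargs break the constraints described above."""
    if kwargs.get("family") != kwargs.get("taskDefinition"):
        raise ValueError("family and taskDefinition must be the same string")
    if not kwargs.get("containerDefinitions"):
        raise ValueError("at least one container definition is required")


check_fargate_kwargs(fargate_kwargs)
```

A dict like this would then be unpacked into the environment's constructor as `**fargate_kwargs`, alongside the credentials arguments listed under Args.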
Args:

- `launch_type (str, optional)`: either FARGATE or EC2; defaults to FARGATE
- `aws_access_key_id (str, optional)`: AWS access key ID for connecting the boto3 client. Defaults to the value set in the environment variable `AWS_ACCESS_KEY_ID` or `None`
- `aws_secret_access_key (str, optional)`: AWS secret access key for connecting the boto3 client. Defaults to the value set in the environment variable `AWS_SECRET_ACCESS_KEY` or `None`
- `aws_session_token (str, optional)`: AWS session token for connecting the boto3 client. Defaults to the value set in the environment variable `AWS_SESSION_TOKEN` or `None`
- `region_name (str, optional)`: AWS region name for connecting the boto3 client. Defaults to the value set in the environment variable `REGION_NAME` or `None`
- `executor (Executor, optional)`: the executor to run the flow with. If not provided, the default executor will be used.
- `labels (List[str], optional)`: a list of labels, which are arbitrary string identifiers used by Prefect Agents when polling for work
- `on_start (Callable, optional)`: a function callback which will be called before the flow begins to run
- `on_exit (Callable, optional)`: a function callback which will be called after the flow finishes its run
- `metadata (dict, optional)`: extra metadata to be set and serialized on this environment
- `**kwargs (dict, optional)`: additional keyword arguments to pass to boto3 for `register_task_definition` and `run_task`

| Method | Description |
|---|---|
| `prefect.environments.execution.fargate.fargate_task.FargateTaskEnvironment.execute(flow)` | Run the Fargate task that was defined for this flow. |
| `prefect.environments.execution.fargate.fargate_task.FargateTaskEnvironment.setup(flow)` | Register the task definition if it does not already exist. |

# KubernetesJobEnvironment
class `prefect.environments.execution.k8s.job.KubernetesJobEnvironment(job_spec_file=None, unique_job_name=False, executor=None, labels=None, on_start=None, on_exit=None, metadata=None)`

KubernetesJobEnvironment is an environment which deploys your flow as a Kubernetes job. This environment allows (and requires) a custom job YAML spec to be provided.

DEPRECATED: Environment-based configuration is deprecated. Please transition to configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
When providing a custom YAML job spec, the first container in the spec must be the container that the flow runner will be executed on.

The following environment variables, required for cloud, do not need to be included; they are automatically added and populated during execution:

- `PREFECT__CLOUD__GRAPHQL`
- `PREFECT__CLOUD__AUTH_TOKEN`
- `PREFECT__CONTEXT__FLOW_RUN_ID`
- `PREFECT__CONTEXT__NAMESPACE`
- `PREFECT__CONTEXT__IMAGE`
- `PREFECT__CLOUD__USE_LOCAL_SECRETS`
- `PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS`
- `PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS`
- `PREFECT__CLOUD__SEND_FLOW_RUN_LOGS`
- `PREFECT__LOGGING__EXTRA_LOGGERS`

Additionally, the following command will be applied to the first container:

`$ /bin/sh -c "python -c 'import prefect; prefect.environments.execution.load_and_run_flow()'"`
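
Why the first container matters can be seen in a small sketch of the behavior described above. This is an illustration only, not Prefect's source: the job spec is a hypothetical dict in the standard Kubernetes Job layout, and `apply_flow_command` is a made-up helper that overwrites the command of the first container and leaves the others untouched.

```python
from typing import Any, Dict

# The Python one-liner from the command documented above.
FLOW_COMMAND = (
    "python -c 'import prefect; "
    "prefect.environments.execution.load_and_run_flow()'"
)


def apply_flow_command(job_spec: Dict[str, Any]) -> Dict[str, Any]:
    """Point the first container of a Job spec at the flow-run command (in place)."""
    containers = job_spec["spec"]["template"]["spec"]["containers"]
    first = containers[0]
    # Equivalent to: /bin/sh -c "<FLOW_COMMAND>"
    first["command"] = ["/bin/sh", "-c"]
    first["args"] = [FLOW_COMMAND]
    return job_spec


# Hypothetical job spec with a flow container first and a sidecar second.
job_spec = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "prefect-job"},
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {"name": "flow", "image": "example/image:latest"},
                    {"name": "sidecar", "image": "example/sidecar:latest"},
                ],
                "restartPolicy": "Never",
            }
        }
    },
}

apply_flow_command(job_spec)
print(job_spec["spec"]["template"]["spec"]["containers"][0]["command"])
```

If the sidecar were listed first, it would receive the flow-run command instead, which is why the ordering requirement is stated explicitly.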
Args:

- `job_spec_file (str, optional)`: path to a job spec YAML file. This path is only used when the environment is built, so it should refer to a file on the machine used to build the flow.
- `unique_job_name (bool, optional)`: whether to use a unique name for each job created with this environment. Defaults to `False`
- `executor (Executor, optional)`: the executor to run the flow with. If not provided, the default executor will be used.
- `labels (List[str], optional)`: a list of labels, which are arbitrary string identifiers used by Prefect Agents when polling for work
- `on_start (Callable, optional)`: a function callback which will be called before the flow begins to run
- `on_exit (Callable, optional)`: a function callback which will be called after the flow finishes its run
- `metadata (dict, optional)`: extra metadata to be set and serialized on this environment

| Method | Description |
|---|---|
| `prefect.environments.execution.k8s.job.KubernetesJobEnvironment.execute(flow, **kwargs)` | Create a single Kubernetes job that runs the flow. |

# LocalEnvironment
class `prefect.environments.execution.local.LocalEnvironment(executor=None, labels=None, on_start=None, on_exit=None, metadata=None)`

A LocalEnvironment class for executing a flow in the local process.

DEPRECATED: Environment-based configuration is deprecated. Please transition to configuring `flow.run_config` instead of `flow.environment`. See Flow configuration for more info.
Args:

- `executor (Executor, optional)`: the executor to run the flow with. If not provided, the default executor will be used.
- `labels (List[str], optional)`: a list of labels, which are arbitrary string identifiers used by Prefect Agents when polling for work
- `on_start (Callable, optional)`: a function callback which will be called before the flow begins to run
- `on_exit (Callable, optional)`: a function callback which will be called after the flow finishes its run
- `metadata (dict, optional)`: extra metadata to be set and serialized on this environment

| Method | Description |
|---|---|
| `prefect.environments.execution.local.LocalEnvironment.execute(flow)` | Executes the flow in the local process. |

This documentation was auto-generated from commit n/a
on July 1, 2021 at 18:35 UTC