# Upgrading Environments to RunConfig

Looking for the latest Prefect 2 release? Prefect 2 and Prefect Cloud 2 have been released for General Availability. See https://docs.prefect.io/ for details.

Prefect 0.14.0 included a new Flow configuration system based on RunConfig objects. This replaces the previous system based on Environment objects, with Environment based configuration being deprecated.

If you never configured flow.environment explicitly on your flow, your upgrade process should be seamless. Your flows will automatically transition to use the new flow.run_config system.

If you did set an Environment explicitly on a flow, you'll want to transition your flows to use an equivalent RunConfig. Below we'll outline a few common environment setups, and their equivalents using run-configs.

Note that while Environment based configuration is deprecated, support for environments will stick around for several versions. Your old flows should continue to run fine, giving you time to figure out a good transition plan.

# LocalEnvironment

LocalEnvironment was the default environment in previous versions of Prefect. It would execute flow runs in the same process started by the agent.

The name was confusing to many users, since LocalEnvironment didn't require using the LocalAgent (it worked with any agent). This also meant that the LocalEnvironment couldn't easily contain any platform-specific configuration.

In contrast, RunConfig objects correspond to a specific agent type (e.g. LocalRun for LocalAgent, KubernetesRun for KubernetesAgent, ...), and contain platform-specific configuration options (e.g. image, ...). The exception to this is UniversalRun, which works with any agent (but only contains configuration for setting labels).

LocalEnvironment also contained an option to configure an Executor for the flow run - this option has been moved to the Flow itself. See Executor for more information.

  • If you configured an Executor on your LocalEnvironment, move that setting to the flow itself.

    # Replace this
    from prefect.executors import DaskExecutor
    from prefect.environments.execution import LocalEnvironment
    flow.environment = LocalEnvironment(executor=DaskExecutor())
    
    # With this
    from prefect.executors import DaskExecutor
    flow.executor = DaskExecutor()
    
  • If you only need to configure flow labels, you should use a UniversalRun.

    # Replace this
    from prefect.environments.execution import LocalEnvironment
    flow.environment = LocalEnvironment(labels=["label-1", "label-2"])
    
    # With this
    from prefect.run_configs import UniversalRun
    flow.run_config = UniversalRun(labels=["label-1", "label-2"])
    
  • If you need to configure platform-specific settings (e.g. an image), you should use the run-config that corresponds to your agent. For example, if using the Kubernetes Agent you'd use a KubernetesRun:

    # Replace this
    from prefect.environments.execution import LocalEnvironment
    flow.environment = LocalEnvironment(metadata={"image": "my-image"})
    
    # With this
    from prefect.run_configs import KubernetesRun
    flow.run_config = KubernetesRun(image="my-image")
    

    The different RunConfig types support different platform-specific settings that you may be interested in - see the RunConfig docs for more info.

# KubernetesJobEnvironment

KubernetesJobEnvironment provided support for configuring details of a Kubernetes Job on a Flow. This has been replaced with KubernetesRun.

Unlike KubernetesJobEnvironment, which only accepted a custom job spec stored as a local file, KubernetesRun accepts custom job specs stored locally, in memory (you can pass in a dict/yaml directly), or stored on a remote filesystem (S3 or GCS). There are also options for common settings (e.g. image, cpu_request/cpu_limit, memory_request/memory_limit, ...). See the examples in the KubernetesRun docs for more information.

  • If you configured an Executor on your KubernetesJobEnvironment, move that setting to the flow itself.

    # Replace this
    from prefect.executors import DaskExecutor
    from prefect.environments.execution import KubernetesJobEnvironment
    flow.environment = KubernetesJobEnvironment(executor=DaskExecutor())
    
    # With this
    from prefect.executors import DaskExecutor
    flow.executor = DaskExecutor()
    
  • If you provided a custom job spec file, you can convert directly by passing in job_template_path to KubernetesRun. Depending on what you're configuring, you may find the other arguments to KubernetesRun a better fit for your needs, see the KubernetesRun docs for more information.

    # Replace this
    from prefect.environments.execution import KubernetesJobEnvironment
    flow.environment = KubernetesJobEnvironment(job_spec_file="path/to/file.yaml")
    
    # With this
    from prefect.run_configs import KubernetesRun
    flow.run_config = KubernetesRun(job_spec_file="path/to/file.yaml")
    

# FargateTaskEnvironment

FargateTaskEnvironment provided support for configuring details of a Fargate Task on a Flow. This has been replaced with ECSRun.

ECSRun has similar config options as FargateTaskEnvironment, but also exposes raw configuration options for both registering and running ECS tasks. There are also options for common settings (e.g. image, cpu, memory, ...). See the examples in the ECSRun docs for more information.

Note that use of ECSRun requires running an ECS Agent, not the deprecated Fargate Agent.

  • If you configured an Executor on your FargateTaskEnvironment, move that setting to the flow itself.

    # Replace this
    from prefect.executors import DaskExecutor
    flow.environment = FargateTaskEnvironment(executor=DaskExecutor())
    
    # With this
    from prefect.executors import DaskExecutor
    flow.executor = DaskExecutor()
    
  • ECSRun (coupled with the ECS Agent) supports configuring ECS Tasks at a finer level than before. If you previously configured custom task definitions on the Fargate Agent, you may be better served by specifying these options via ECSRun objects instead. See the ECSRun API docs for a complete list of available options.

# DaskKubernetesEnvironment

DaskKubernetesEnvironment provided support for running Prefect on Kubernetes with Dask. With the 0.14.0 release, this would be replaced with

We split this configuration into two parts to make it easier to mix-and-match and enable deeper customization. One downside is that the configuration is a bit more verbose, but hopefully still relatively straightforward.

# Replace this
from prefect.environments.execution import DaskKubernetesEnvironment
flow.environment = DaskKubernetesEnvironment(
    min_workers=5, max_workers=10,
)

# With this
import prefect
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from dask_kubernetes import KubeCluster, make_pod_spec
# Configuration for the flow-runner pod goes here
flow.run_config = KubernetesRun()
# Configuration for the Dask cluster goes here
# - `cluster_class` takes any callable to create a new cluster. We define our
#    own helper function to forward the image name to be the same one used to
#    deploy the main flow-runner pod.
# - `adapt_kwargs` takes a dict of kwargs to pass to `cluster.adapt`. Here
#    we enable adaptive scaling between 5 and 10 workers.
flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
    adapt_kwargs={"minimum": 5, "maximum": 10},
)

With the above, you should have full customization of all Kubernetes objects created to execute the flow run. For more information on the various options, please see:

# DaskCloudProviderEnvironment

DaskCloudProviderEnvironment provided support for running Prefect on various cloud providers (originally just AWS ECS) with Dask using dask-cloudprovider. With the 0.14.0 release, this might be replaced with

We split this configuration into two parts to make it easier to mix-and-match and enable deeper customization. One downside is that the configuration is a bit more verbose, but hopefully still relatively straightforward.

# Replace this
from prefect.environments.execution import DaskCloudProviderEnvironment
flow.environment = DaskCloudProviderEnvironment(
    adaptive_min_workers=5, adaptive_max_workers=10,
)

# With this
import prefect
from prefect.executors import DaskExecutor
from prefect.run_configs import ECSRun
from dask_cloudprovider.aws import FargateCluster
# Configuration for the flow-runner task goes here
flow.run_config = ECSRun()
# Configuration for the Dask cluster goes here
# - `cluster_class` takes any callable to create a new cluster. We define our
#    own helper function to forward the image name to be the same one used to
#    deploy the main flow-runner task. If you statically know the image name,
#    you could skip the lambda and pass in:
#    
#    DaskExecutor(cluster_class=FargateCluster, cluster_kwargs={"image": "my-image"})
#    
# - `adapt_kwargs` takes a dict of kwargs to pass to `cluster.adapt`. Here
#    we enable adaptive scaling between 5 and 10 workers.
flow.executor = DaskExecutor(
    cluster_class=lambda: FargateCluster(image=prefect.context.image),
    adapt_kwargs={"minimum": 5, "maximum": 10},
)

With the above, you should have full customization of all AWS objects created to execute the flow run. For more information on the various options, please see: