# Storage Options

WARNING

Flows configured with environments are no longer supported. We recommend users transition to using RunConfig instead. See the Flow Configuration and Upgrading Environments to RunConfig documentation for more information.

See Storage for current Flow definition storage capabilities.

Prefect includes a variety of Storage options for saving flows.

As of Prefect version 0.9.0 every storage option except for Docker and GitHub will automatically have a result handler attached that will write results to the corresponding platform. For example, this means that if you register a flow with the Prefect API using the S3 storage option then the flow's results will also be written to the same S3 bucket through the use of the S3 Result.

Version 0.12.0 introduced a new way to store flows using the various cloud storage options (S3, GCS, and Azure) and then, in turn, run them using agents that orchestrate containerized environments. For more information see below.

Version 0.12.5 introduced script-based storage for all storage options. For more information see the Using script based flow storage idiom.

# Local

Local Storage is the default Storage option for all flows. This stores the flow as bytes in the local filesystem, which means it can only be run by a local agent running on the same machine.

from prefect import Flow
from prefect.storage import Local

flow = Flow("local-flow", storage=Local())

flow.storage.build()

The flow is now available under ~/.prefect/flows/local-flow.prefect.

Automatic Labels

Flows registered with this storage option will automatically be labeled with the hostname of the machine from which they were registered. This prevents agents running on other machines from attempting to run this flow.
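
If the hostname label is not wanted (for example, because the flow file is synced to several machines), it can be disabled when building the storage object. A minimal sketch using the add_default_labels option that is also discussed later in this document:

from prefect import Flow
from prefect.storage import Local

# Skip the automatic hostname label so that any local agent may pick up this flow.
flow = Flow("local-flow", storage=Local(add_default_labels=False))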

Flow Results

In more recent releases of Prefect Core, your flow will default to using a LocalResult for persisting any task results in the same file location.
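
If you want task results written somewhere other than the default location, you can attach a LocalResult explicitly. A minimal sketch; the directory path is purely illustrative:

from prefect import Flow
from prefect.engine.results import LocalResult
from prefect.storage import Local

# Persist task results to an explicit directory instead of the default location.
flow = Flow(
    "local-flow",
    storage=Local(),
    result=LocalResult(dir="/path/to/results"),  # hypothetical path
)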

# Azure Blob Storage

Azure Storage is a storage option that uploads flows to an Azure Blob container.

from prefect import Flow
from prefect.storage import Azure

flow = Flow("azure-flow", storage=Azure(container="<my-container>", connection_string_secret="<my-connection-string>"))

flow.storage.build()

The flow is now available in the container under azure-flow/slugified-current-timestamp.

Flow Results

In more recent releases of Core your flow will default to using an AzureResult for persisting any task results in the same Azure container.

Azure Credentials

Azure Storage uses an Azure connection string to authenticate when uploading (build) or downloading flows, so make sure to provide a valid connection string for your Azure account. If the connection string is not passed as connection_string_secret, it can be set as a Prefect secret or as the AZURE_STORAGE_CONNECTION_STRING environment variable in the run configuration.
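
For example, rather than creating a secret you could expose the connection string to the flow run's environment through a run config. A minimal sketch, with a placeholder connection string:

from prefect import Flow
from prefect.run_configs import UniversalRun
from prefect.storage import Azure

flow = Flow("azure-flow", storage=Azure(container="<my-container>"))

# Make the connection string available to the flow run's environment so the
# flow can be downloaded from the Azure Blob container at runtime.
flow.run_config = UniversalRun(
    env={"AZURE_STORAGE_CONNECTION_STRING": "<my-connection-string>"}
)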

# AWS S3

S3 Storage is a storage option that uploads flows to an AWS S3 bucket.

from prefect import Flow
from prefect.storage import S3

flow = Flow("s3-flow", storage=S3(bucket="<my-bucket>"))

flow.storage.build()

The flow is now available in the bucket under s3-flow/slugified-current-timestamp.

Flow Results

In more recent releases of Core your flow will default to using an S3Result for persisting any task results in the same S3 bucket.

AWS Credentials

S3 Storage uses AWS credentials the same way as boto3, which means proper AWS credentials must be configured at both upload (build) time and download (local agent) time.
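
If the agent environment has no ambient AWS credentials, one option is to attach Prefect's default AWS_CREDENTIALS secret to the storage object, as also shown in the containerized-environments section below. A minimal sketch:

from prefect import Flow
from prefect.storage import S3

# Resolve the AWS_CREDENTIALS Prefect secret at runtime instead of relying
# solely on the ambient boto3 credential chain.
flow = Flow("s3-flow", storage=S3(bucket="<my-bucket>", secrets=["AWS_CREDENTIALS"]))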

# Google Cloud Storage

GCS Storage is a storage option that uploads flows to a Google Cloud Storage bucket.

from prefect import Flow
from prefect.storage import GCS

flow = Flow("gcs-flow", storage=GCS(bucket="<my-bucket>"))

flow.storage.build()

The flow is now available in the bucket under gcs-flow/slugified-current-timestamp.

Flow Results

In more recent releases of Core your flow will default to using a GCSResult for persisting any task results in the same GCS location.

Google Cloud Credentials

GCS Storage uses Google Cloud credentials the same way as the standard google.cloud library, which means the proper Google Application Credentials must be configured at both upload (build) time and download (local agent) time.
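
Similarly, you can attach Prefect's default GCP_CREDENTIALS secret to the storage object so the flow can be pulled without relying on ambient application default credentials. A minimal sketch:

from prefect import Flow
from prefect.storage import GCS

# Resolve the GCP_CREDENTIALS Prefect secret (a service account key) at runtime.
flow = Flow("gcs-flow", storage=GCS(bucket="<my-bucket>", secrets=["GCP_CREDENTIALS"]))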

# GitHub

GitHub Storage is a storage option that reads flows from a GitHub repository as .py files at runtime.

For a detailed look at how to use GitHub storage, visit the Using script based storage idiom.

GitHub Credentials

GitHub storage uses a personal access token for authenticating with repositories.
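
A minimal sketch of GitHub storage, assuming the flow lives at flows/my_flow.py in a repository named my-org/my-repo and the token is stored in a Prefect secret named GITHUB_ACCESS_TOKEN:

from prefect import Flow
from prefect.storage import GitHub

flow = Flow(
    "github-flow",
    storage=GitHub(
        repo="my-org/my-repo",            # "organization/repository" name
        path="flows/my_flow.py",          # path to the flow file in the repo
        access_token_secret="GITHUB_ACCESS_TOKEN",
    ),
)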

# GitLab

GitLab Storage is a storage option that reads flows from a GitLab repository as .py files at runtime.

Much of the GitHub example in the script based storage documentation applies to GitLab as well.

GitLab Credentials

GitLab storage uses a personal access token for authenticating with repositories.

GitLab Server

GitLab server users can point the host argument to their personal GitLab instance.
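
A minimal sketch of GitLab storage, assuming a project at my-group/my-repo, a flow file at flows/my_flow.py, and a GITLAB_ACCESS_TOKEN secret; the host argument is only needed for self-hosted instances:

from prefect import Flow
from prefect.storage import GitLab

flow = Flow(
    "gitlab-flow",
    storage=GitLab(
        repo="my-group/my-repo",              # project ID or "namespace/name"
        path="flows/my_flow.py",              # path to the flow file in the repo
        access_token_secret="GITLAB_ACCESS_TOKEN",
        host="https://gitlab.example.com",    # only needed for self-hosted GitLab
    ),
)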

# Bitbucket

Bitbucket Storage is a storage option that reads flows from a Bitbucket repository as .py files at runtime.

Much of the GitHub example in the script based storage documentation applies to Bitbucket as well.

Bitbucket Credentials

Bitbucket storage uses a personal access token for authenticating with repositories.

Bitbucket Server

Bitbucket server users can point the host argument to their personal or organization Bitbucket instance.

Bitbucket Projects

Unlike GitHub or GitLab, Bitbucket organizes repositories in Projects and each repo must be associated with a Project. Bitbucket storage requires a project argument pointing to the correct project name.
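
A minimal sketch of Bitbucket storage, assuming a repository under a project named MYPROJECT, a flow file at flows/my_flow.py, a BITBUCKET_ACCESS_TOKEN secret, and a self-hosted Bitbucket Server instance:

from prefect import Flow
from prefect.storage import Bitbucket

flow = Flow(
    "bitbucket-flow",
    storage=Bitbucket(
        project="MYPROJECT",                    # Bitbucket project the repo belongs to
        repo="my-repo",
        path="flows/my_flow.py",                # path to the flow file in the repo
        access_token_secret="BITBUCKET_ACCESS_TOKEN",
        host="https://bitbucket.example.com",   # Bitbucket Server users only
    ),
)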

# CodeCommit

CodeCommit Storage is a storage option that reads flows from an AWS CodeCommit repository as .py files at runtime.

AWS Credentials

CodeCommit uses AWS credentials the same way as boto3, which means proper AWS credentials must be configured at both upload (build) time and download (local agent) time.
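
A minimal sketch of CodeCommit storage, assuming a repository named my-repo with the flow at flows/my_flow.py; the AWS credentials are resolved by boto3 as described above:

from prefect import Flow
from prefect.storage import CodeCommit

flow = Flow(
    "codecommit-flow",
    storage=CodeCommit(
        repo="my-repo",             # CodeCommit repository name
        path="flows/my_flow.py",    # path to the flow file in the repo
        commit="main",              # branch name or commit ID
    ),
)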

# Docker

Docker Storage is a storage option that puts flows inside of a Docker image and pushes them to a container registry. This method of Storage has deployment compatibility with the Docker Agent, Kubernetes Agent, and Fargate Agent.

from prefect import Flow
from prefect.storage import Docker

flow = Flow("gcs-flow", storage=Docker(registry_url="<my-registry.io>", image_name="my_flow"))

flow.storage.build()

The flow is now available in the container registry under my-registry.io/my_flow:slugified-current-timestamp. Note that each type of container registry uses a different format for image naming (e.g. DockerHub vs GCR).

If you do not specify a registry_url for your Docker Storage then Prefect will not attempt to push the image to a container registry; instead, the image will live only on your local machine. This is useful when using the Docker Agent, because the agent will not need to pull the image since it already exists locally.
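
For example, a local-only image for use with the Docker Agent might look like the following sketch; python_dependencies is optional and simply illustrates baking extra packages into the image:

from prefect import Flow
from prefect.storage import Docker

# No registry_url: the image is built locally and never pushed, so a Docker
# Agent on this machine can run the flow without pulling the image.
flow = Flow(
    "docker-flow",
    storage=Docker(
        image_name="my_flow",
        python_dependencies=["pandas"],  # extra packages baked into the image
    ),
)

flow.storage.build()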

Container Registry Credentials

Docker Storage uses the Docker SDK for Python to build the image and push it to a registry. Make sure you have the Docker daemon running locally and that you are configured to push to your desired container registry. Additionally, make sure that whichever platform's agent deploys the container also has permission to pull from that same registry.

# Webhook

Webhook Storage is a storage option that stores and retrieves flows with HTTP requests. This type of storage can be used with any type of agent, and is intended to be a flexible way to integrate Prefect with your existing ecosystem, including your own file storage services.

For example, the following code could be used to store flows in Dropbox.

import json

from prefect import Flow
from prefect.storage import Webhook

flow = Flow(
    "dropbox-flow",
    storage=Webhook(
        build_request_kwargs={
            "url": "https://content.dropboxapi.com/2/files/upload",
            "headers": {
                "Content-Type": "application/octet-stream",
                "Dropbox-API-Arg": json.dumps(
                    {
                        "path": "/Apps/prefect-test-app/dropbox-flow.flow",
                        "mode": "overwrite",
                        "autorename": False,
                        "strict_conflict": True,
                    }
                ),
                "Authorization": "Bearer ${DBOX_OAUTH2_TOKEN}"
            },
        },
        build_request_http_method="POST",
        get_flow_request_kwargs={
            "url": "https://content.dropboxapi.com/2/files/download",
            "headers": {
                "Accept": "application/octet-stream",
                "Dropbox-API-Arg": json.dumps(
                    {"path": "/Apps/prefect-test-app/dropbox-flow.flow"}
                ),
                "Authorization": "Bearer ${DBOX_OAUTH2_TOKEN}"
            },
        },
        get_flow_request_http_method="POST",
    )
)

flow.storage.build()

Template strings in ${} are used to reference sensitive information. Given ${SOME_TOKEN}, this storage object will first look for an environment variable SOME_TOKEN and then fall back to a Prefect secret named SOME_TOKEN. Because this resolution happens at runtime, your sensitive information is never stored in the storage object and is never sent to Prefect Cloud.

# Non-Docker Storage for Containerized Environments

Prefect allows for flows to be stored in cloud storage services and executed in containerized environments. This has the added benefit of rapidly deploying new versions of flows without having to rebuild images each time. To enable this functionality add an image name to the flow's Environment metadata.

from prefect import Flow
from prefect.environments import LocalEnvironment
from prefect.storage import S3

flow = Flow("example")

# set flow storage
flow.storage = S3(bucket="my-flows")

# set flow environment
flow.environment = LocalEnvironment(metadata={"image": "repo/name:tag"})

This example flow can now be run using an agent that orchestrates containerized environments. When the flow is run, the image set in the environment's metadata will be used, and inside that container the flow will be retrieved from the storage object (S3 in this example).

# starting a kubernetes agent that will pull flows stored in S3
prefect agent kubernetes start -l s3-flow-storage

Default Labels

Cloud storage options automatically attach a storage-specific default label (such as s3-flow-storage in the example above), and an agent must be started with a matching label to pick up those flows. The addition of these default labels can be disabled by passing add_default_labels=False to the flow's storage option; if this is set, agents no longer need to set the matching labels. For more information on labels visit the documentation.

# Authentication for using Cloud Storage with Containerized Environments

One thing to keep in mind when using cloud storage options in conjunction with containerized environments is authentication. Since the flow is retrieved from inside a container, that container must be able to authenticate with whichever cloud storage the flow uses. This means that the container needs the proper credentials at runtime.

Prefect has a few default secrets which can be used for off-the-shelf authentication. Using the above snippet as an example, you can create an AWS_CREDENTIALS Prefect secret that will automatically be used to pull the flow from S3 storage at runtime, without having to configure authentication in the image directly.

flow.storage = S3(bucket="my-flows", secrets=["AWS_CREDENTIALS"])

flow.environment = LocalEnvironment(metadata={"image": "prefecthq/prefect"})

Dependencies

It is important to make sure that the image set in the environment's metadata contains the dependencies required to use the storage option. For example, using S3 storage requires Prefect's aws dependencies.

These are generally packaged into custom-built images, or you can use the prefecthq/prefect image, which contains all of Prefect's orchestration extras.