# Storage


The Prefect Storage interface encapsulates logic for storing flows. Each storage unit is able to store multiple flows (with the constraint of name uniqueness within a given unit).

# Storage

class prefect.storage.base.Storage(result=None, secrets=None, labels=None, add_default_labels=None, stored_as_script=False)

Base interface for Storage objects. All kwargs present in this base class are valid on storage subclasses.

Args:

  • result (Result, optional): a default result to use for all flows which utilize this storage class
  • secrets (List[str], optional): a list of Prefect Secrets which will be used to populate prefect.context for each flow run. Used primarily for providing authentication credentials.
  • labels (List[str], optional): a list of labels to associate with this Storage.
  • add_default_labels (bool): If True, adds the storage-specific default label (if applicable) to the storage labels. Defaults to the value specified in the configuration at flows.defaults.storage.add_default_labels.
  • stored_as_script (bool, optional): boolean for specifying if the flow has been stored as a .py file. Defaults to False

Methods:

prefect.storage.base.Storage.add_flow(flow)

Method for adding a new flow to this Storage object.

Args:

  • flow (Flow): a Prefect Flow to add
Returns:
  • str: the location of the newly added flow in this Storage object

prefect.storage.base.Storage.build()

Build the Storage object.

Returns:

  • Storage: a Storage object that contains information about how and where each flow is stored
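The add_flow/build contract can be illustrated without a real backend. The sketch below is hypothetical and does not use Prefect. FakeFlow and InMemoryStorage are stand-ins, and the memory://<name> location format is an assumption. It mirrors only the documented behavior: add_flow returns a location string, flow names must be unique within one storage unit, and build returns a Storage object.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FakeFlow:
    """Hypothetical stand-in for prefect.Flow; only the name matters here."""
    name: str


@dataclass
class InMemoryStorage:
    """Hypothetical storage unit that keeps flows in a dict instead of a real backend."""
    labels: List[str] = field(default_factory=list)
    secrets: List[str] = field(default_factory=list)
    stored_as_script: bool = False
    flows: Dict[str, str] = field(default_factory=dict)  # flow name -> location
    _store: Dict[str, FakeFlow] = field(default_factory=dict, repr=False)

    def add_flow(self, flow: FakeFlow) -> str:
        # Flow names must be unique within a single storage unit.
        if flow.name in self.flows:
            raise ValueError(f'Name conflict: flow "{flow.name}" is already in this storage.')
        location = f"memory://{flow.name}"  # assumed location format
        self.flows[flow.name] = location
        self._store[location] = flow
        return location

    def build(self) -> "InMemoryStorage":
        # Nothing to upload for an in-memory store; return the object that
        # records where each flow lives.
        return self
```

A real storage class would persist its flows somewhere durable during build() (for example, by uploading them). This sketch simply returns the same object.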



# Azure

class prefect.storage.azure.Azure(container, connection_string_secret=None, blob_name=None, overwrite=False, stored_as_script=False, **kwargs)

Azure Blob storage class. This class represents the Storage interface for Flows stored as bytes in an Azure container.

This storage class optionally takes a blob_name, which will be the name of the Flow object when stored in Azure. If blob_name is not provided, the Flow upload name will take the form slugified-flow-name/slugified-current-timestamp.

Args:

  • container (str): the name of the Azure Blob Container to store the Flow
  • connection_string_secret (str, optional): the name of a Prefect secret that contains an Azure connection string for communicating with Blob storage. If not provided, the value of the AZURE_STORAGE_CONNECTION_STRING environment variable will be used.
  • blob_name (str, optional): a unique key to use for uploading this Flow to Azure. This is only useful when storing a single Flow using this storage object.
  • overwrite (bool, optional): if set, an existing blob with the same name will be overwritten. By default, an error will be thrown if the blob already exists.
  • stored_as_script (bool, optional): boolean for specifying if the flow has been stored as a .py file. Defaults to False
  • **kwargs (Any, optional): any additional Storage initialization options
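When blob_name is omitted, the upload name takes the form slugified-flow-name/slugified-current-timestamp. The sketch below approximates that naming rule. The regex-based slugify is a simplified stand-in, not the slugify routine Prefect actually uses, and the exact timestamp format is an assumption.

```python
import re
from datetime import datetime, timezone
from typing import Optional


def slugify(value: str) -> str:
    # Lowercase, collapse runs of non-alphanumerics into one hyphen, trim edge hyphens.
    return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")


def default_blob_name(
    flow_name: str, blob_name: Optional[str] = None, now: Optional[datetime] = None
) -> str:
    """Return the explicit blob_name, or slugified-flow-name/slugified-timestamp."""
    if blob_name is not None:
        return blob_name
    now = now or datetime.now(timezone.utc)
    return f"{slugify(flow_name)}/{slugify(now.isoformat())}"
```

Because the generated name embeds a timestamp, each registration produces a new blob. An explicit blob_name (combined with overwrite=True) is the way to reuse one location.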



# Bitbucket

class prefect.storage.bitbucket.Bitbucket(project, repo, workspace=None, host=None, path=None, ref=None, access_token_secret=None, cloud_username_secret=None, cloud_app_password_secret=None, **kwargs)

Bitbucket storage class. This class represents the Storage interface for Flows stored in .py files in a Bitbucket repository. It supports both Bitbucket Server and Bitbucket Cloud.

This class represents a mapping of flow name to file paths contained in the git repo, meaning that all flow files should be pushed independently. A typical workflow using this storage type might look like the following:

  • Compose a flow .py file whose flow uses Bitbucket storage:
```python
from prefect import Flow
from prefect.storage import Bitbucket

flow = Flow("my-flow")
flow.storage = Bitbucket(
    project="my.project", repo="my.repo", path="/flows/flow.py", ref="my-branch"
)
```

  • Push this flow.py file to the my.repo repository under /flows/flow.py in the "my.project" project.

  • Call prefect register -f flow.py to register this flow with Bitbucket storage.

Args:

  • project (str): project that the repository will be in. Not equivalent to a GitHub project; required value for all Bitbucket repositories.
  • repo (str): the repository name, with complete taxonomy.
  • workspace (str, optional): the workspace name. Bitbucket Cloud only.
  • host (str, optional): the server host. Bitbucket Server only.
  • path (str, optional): a path pointing to a flow file in the repo
  • ref (str, optional): a commit SHA-1 value, branch name, or tag. If not specified, defaults to the master branch for the repo.
  • access_token_secret (str, optional): the name of a Prefect secret that contains a Bitbucket access token to use when loading flows from this storage. Bitbucket Server only.
  • cloud_username_secret (str, optional): the name of a Prefect secret that contains a Bitbucket username to use when loading flows from this storage. Bitbucket Cloud only.
  • cloud_app_password_secret (str, optional): the name of a Prefect secret that contains a Bitbucket app password, from the account associated with the cloud_username_secret, to use when loading flows from this storage. Bitbucket Cloud only.
  • **kwargs (Any, optional): any additional Storage initialization options
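Several arguments above apply only to Bitbucket Cloud (workspace, cloud_username_secret, cloud_app_password_secret) or only to Bitbucket Server (host, access_token_secret). The hypothetical helper below is not part of Prefect. It shows one way to catch a configuration that mixes the two before you register a flow.

```python
from typing import Optional

# Option groups taken from the Args list above.
CLOUD_ONLY = ("workspace", "cloud_username_secret", "cloud_app_password_secret")
SERVER_ONLY = ("host", "access_token_secret")


def bitbucket_flavor(**options: Optional[str]) -> str:
    """Classify Bitbucket kwargs as 'cloud', 'server', or 'unspecified' (hypothetical check)."""
    cloud = [name for name in CLOUD_ONLY if options.get(name) is not None]
    server = [name for name in SERVER_ONLY if options.get(name) is not None]
    if cloud and server:
        raise ValueError(
            f"Cloud-only options {cloud} cannot be combined with Server-only options {server}."
        )
    if cloud:
        return "cloud"
    if server:
        return "server"
    return "unspecified"
```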



# CodeCommit

class prefect.storage.codecommit.CodeCommit(repo, path=None, commit=None, client_options=None, **kwargs)

CodeCommit storage class. This class represents the Storage interface for Flows stored in .py files in a CodeCommit repository.

This class represents a mapping of flow name to file paths contained in the git repo, meaning that all flow files should be pushed independently. A typical workflow using this storage type might look like the following:

  • Compose a flow .py file whose flow uses CodeCommit storage:
```python
from prefect import Flow
from prefect.storage import CodeCommit
flow = Flow("my-flow")
flow.storage = CodeCommit(repo="my/repo", path="/flows/flow.py")
```

  • Push this flow.py file to the my/repo repository under /flows/flow.py.

  • Call prefect register -f flow.py to register this flow with CodeCommit storage.

Args:

  • repo (str): the name of a CodeCommit repository to store this Flow
  • path (str, optional): a path pointing to a flow file in the repo
  • commit (str, optional): fully qualified reference that identifies the commit that contains the file. For example, you can specify a full commit ID, a tag, a branch name, or a reference such as refs/heads/master. If none is provided, the head commit is used.
  • client_options (dict, optional): Additional options for the boto3 client.
  • **kwargs (Any, optional): any additional Storage initialization options
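The sketch below shows one plausible way repo, path, and commit map onto AWS, assuming the class reads the file with boto3's CodeCommit get_file call. The helper name get_file_kwargs is hypothetical. The actual boto3 call is left in comments because it needs boto3 and AWS credentials.

```python
from typing import Dict, Optional


def get_file_kwargs(repo: str, path: str, commit: Optional[str] = None) -> Dict[str, str]:
    """Build keyword arguments for boto3's CodeCommit client.get_file()."""
    kwargs = {"repositoryName": repo, "filePath": path}
    if commit is not None:
        # Any fully qualified reference: full commit ID, tag, branch name, or refs/heads/master.
        kwargs["commitSpecifier"] = commit
    # Without commitSpecifier, the head commit is used.
    return kwargs


# With boto3 installed and AWS credentials configured, the call would look like:
#   import boto3
#   client = boto3.client("codecommit", **(client_options or {}))
#   source = client.get_file(**get_file_kwargs("my-repo", "flows/flow.py"))["fileContent"]
```

This is also where client_options fits: it is passed through to the boto3 client constructor (for example, region_name).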



# Docker

class prefect.storage.docker.Docker(registry_url=None, base_image=None, dockerfile=None, dockerignore=None, python_dependencies=None, image_name=None, image_tag=None, env_vars=None, files=None, prefect_version=None, local_image=False, ignore_healthchecks=False, base_url=None, tls_config=False, build_kwargs=None, prefect_directory="/opt/prefect", path=None, stored_as_script=False, extra_dockerfile_commands=None, **kwargs)

Docker storage provides a mechanism for storing Prefect flows in Docker images and optionally pushing them to a registry.

A user specifies a registry_url, base_image and other optional dependencies (e.g., python_dependencies) and build() will create a temporary Dockerfile that is used to build the image.

Note that the base_image must be able to run pip install. Registry behavior with respect to image names can also differ between providers. For example, Google's GCR registry allows registry URLs of the form gcr.io/my-registry/subdir/my-image-name, whereas DockerHub requires the registry URL to be separate from the image name.

Custom modules can be packaged during the build by attaching the files and setting PYTHONPATH to the location of those files. Alternatively, when using a custom base image, the modules can be installed into that image before this build.

```python
from prefect.storage import Docker

storage = Docker(
    files={
        # absolute source path -> destination path in the image
        "/Users/me/code/mod1.py": "/modules/mod1.py",
        "/Users/me/code/mod2.py": "/modules/mod2.py",
    },
    env_vars={
        # append the /modules directory (where the files above are copied) to PYTHONPATH
        "PYTHONPATH": "$PYTHONPATH:/modules",
    },
)
```

Args:

  • registry_url (str, optional): URL of a registry to push the image to; image will not be pushed if not provided
  • base_image (str, optional): the base image to use when building this image (e.g. python:3.7). Defaults to the prefecthq/prefect image matching your Python version and the Prefect core library version used at runtime.
  • dockerfile (str, optional): a path to a Dockerfile to use in building this storage; note that, if provided, your present working directory will be used as the build context
  • python_dependencies (List[str], optional): list of pip installable dependencies for the image
  • image_name (str, optional): name of the image to use when building, populated with a UUID after build
  • image_tag (str, optional): tag of the image to use when building, populated with a UUID after build
  • env_vars (dict, optional): a dictionary of environment variables to use when building
  • files (dict, optional): a dictionary of files or directories to copy into the image when building. Takes the format of {'src': 'dest'}
  • dockerignore (str, optional): an optional path to a dockerignore file. When used with the files argument, the specified dockerignore will be included in the build context
  • prefect_version (str, optional): an optional branch, tag, or commit specifying the version of prefect you want installed into the container; defaults to the version you are currently using or "master" if your version is ahead of the latest tag
  • local_image (bool, optional): an optional flag whether or not to use a local docker image, if True then a pull will not be attempted
  • ignore_healthchecks (bool, optional): if True, the Docker healthchecks are not added to the Dockerfile. If False (default), healthchecks are included.
  • base_url (str, optional): the URL of a Docker daemon to use for Docker-related functionality. Defaults to the DOCKER_HOST environment variable if not set
  • tls_config (Union[bool, docker.tls.TLSConfig], optional): a TLS configuration to pass to the Docker client; see the Docker SDK for Python (docker-py) documentation
  • build_kwargs (dict, optional): additional keyword arguments to pass to Docker's build step; see the Docker SDK for Python (docker-py) documentation
  • prefect_directory (str, optional): Path to the directory where prefect configuration/flows should be stored inside the Docker image. Defaults to /opt/prefect.
  • path (str, optional): a direct path to the location of the flow file in the Docker image if stored_as_script=True.
  • stored_as_script (bool, optional): boolean for specifying if the flow has been stored as a .py file. Defaults to False
  • extra_dockerfile_commands (list[str], optional): list of Docker build commands that are injected at the end of the generated Dockerfile (before the health checks). Defaults to None
  • **kwargs (Any, optional): any additional Storage initialization options
Raises:
  • ValueError: if both base_image and dockerfile are provided
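Here is a heavily simplified, hypothetical renderer that shows how the build options above might end up in the temporary Dockerfile. It is not Prefect's Dockerfile template. The placeholder base image, the COPY-by-basename convention, and the ENV quoting are all assumptions. It does reproduce the documented ValueError when both base_image and dockerfile are given, and it appends extra_dockerfile_commands last.

```python
import json
import os
import shlex
from typing import Dict, List, Optional

# Placeholder only: the real default is a prefecthq/prefect image matching your versions.
PLACEHOLDER_BASE_IMAGE = "python:3.9-slim"


def render_dockerfile(
    base_image: Optional[str] = None,
    dockerfile: Optional[str] = None,
    python_dependencies: Optional[List[str]] = None,
    files: Optional[Dict[str, str]] = None,
    env_vars: Optional[Dict[str, str]] = None,
    extra_dockerfile_commands: Optional[List[str]] = None,
) -> str:
    """Render a simplified, illustrative Dockerfile from Docker-storage-style options."""
    # Documented constraint: base_image and dockerfile are mutually exclusive.
    # (This sketch does not read a user-supplied dockerfile; it only enforces the check.)
    if base_image is not None and dockerfile is not None:
        raise ValueError("Only one of `base_image` and `dockerfile` can be used.")
    lines = [f"FROM {base_image or PLACEHOLDER_BASE_IMAGE}"]
    if python_dependencies:
        # Quote each requirement so specifiers like "pandas>=1.0" are not read as shell redirects.
        lines.append("RUN pip install " + " ".join(shlex.quote(d) for d in python_dependencies))
    for src, dest in (files or {}).items():
        # Assumes each source file was first copied into the build context by basename.
        lines.append(f"COPY {os.path.basename(src)} {dest}")
    for key, value in (env_vars or {}).items():
        lines.append(f"ENV {key}={json.dumps(value)}")
    # Extra commands go at the end (the real class places them before its health checks).
    lines.extend(extra_dockerfile_commands or [])
    return "\n".join(lines) + "\n"
```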



# GCS

class prefect.storage.gcs.GCS(bucket, key=None, project=None, stored_as_script=False, local_script_path=None, **kwargs)

GoogleCloudStorage storage class. This class represents the Storage interface for Flows stored as bytes in a GCS bucket. To authenticate with Google Cloud, you need to ensure that your Prefect Agent has the proper credentials available (see https://cloud.google.com/docs/authentication/production for all the authentication options).

This storage class optionally takes a key which will be the name of the Flow object when stored in GCS. If this key is not provided the Flow upload name will take the form slugified-flow-name/slugified-current-timestamp.

Args:

  • bucket (str): the name of the GCS Bucket to store the Flow
  • key (str, optional): a unique key to use for uploading this Flow to GCS. This is only useful when storing a single Flow using this storage object.
  • project (str, optional): the google project where any GCS API requests are billed to; if not provided, the project will be inferred from your Google Cloud credentials.
  • stored_as_script (bool, optional): boolean for specifying if the flow has been stored as a .py file. Defaults to False
  • local_script_path (str, optional): the path to a local script to upload when stored_as_script is set to True. If not set, the value of local_script_path from prefect.context is used. If neither is set, the script will not be uploaded and users should manually place the script file at the desired key location in the GCS bucket.
  • **kwargs (Any, optional): any additional Storage initialization options
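The fallback order for local_script_path can be summarized in a few lines. In this sketch, resolve_local_script_path is a hypothetical helper, and the context is modeled as a plain mapping rather than prefect.context.

```python
from typing import Mapping, Optional


def resolve_local_script_path(
    local_script_path: Optional[str], context: Mapping[str, str]
) -> Optional[str]:
    """Pick the script to upload: the explicit argument, then the context value, else None."""
    if local_script_path is not None:
        return local_script_path
    # None means nothing is uploaded; the script must already sit at `key` in the bucket.
    return context.get("local_script_path")
```

The S3 class below documents the same fallback for its own local_script_path argument.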



# Git

class prefect.storage.git.Git(flow_path, repo=None, repo_host="github.com", flow_name=None, git_token_secret_name=None, git_token_username=None, git_clone_url_secret_name=None, branch_name=None, tag=None, commit=None, clone_depth=1, use_ssh=False, format_access_token=True, **kwargs)

Git storage class. This class represents the Storage interface for Flows stored in .py files in a git repository.

This class represents a mapping of flow name to file paths contained in the git repo, meaning that all flow files should be pushed independently.

A typical workflow using this storage type might look like the following:

  • Compose a flow .py file whose flow uses Git storage:
```python
from prefect import Flow
from prefect.storage import Git
flow = Flow("my-flow")
flow.storage = Git(repo="my/repo", flow_path="/flows/flow.py", repo_host="github.com")
```

  • Push this flow.py file to the my/repo repository under /flows/flow.py.

  • Call prefect register -f flow.py to register this flow with Git storage.

Args:

  • flow_path (str): A file path pointing to a .py file containing a flow
  • repo (str, optional): The name of a git repository to store this Flow. If not provided, the repo must be set using a secret. See git_clone_url_secret_name.
  • repo_host (str, optional): The site hosting the repo. Defaults to 'github.com'
  • flow_name (str, optional): A specific name of a flow to extract from a file. If not set then the first flow object retrieved from file will be returned.
  • git_token_secret_name (str, optional): The name of the Prefect Secret containing an access token for the repo. Defaults to None
  • git_token_username (str, optional): the username associated with the git access token. If not provided, it defaults to the repo owner.
  • git_clone_url_secret_name (str, optional): the name of the Prefect Secret specifying the exact git url to clone, if provided it will override repo, repo_host, git_token_secret_name, git_token_username, use_ssh, and format_access_token parameters
  • branch_name (str, optional): branch name. If branch_name, tag, and commit are all unspecified, the latest commit on the repo's default branch will be used.
  • tag (str, optional): tag name. If branch_name, tag, and commit are all unspecified, the latest commit on the repo's default branch will be used.
  • commit (str, optional): a commit SHA-1 value. If branch_name, tag, and commit are all unspecified, the latest commit on the repo's default branch will be used.
  • clone_depth (int): the number of history revisions in cloning, defaults to 1
  • use_ssh (bool): if True, cloning will use SSH. SSH keys must be correctly configured in the environment for this to work.
  • format_access_token (bool): if True, the class will attempt to format access tokens for common git hosting sites
  • **kwargs (Any, optional): any additional Storage initialization options
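The branch_name/tag/commit fallback described above can be sketched as follows. Both helpers are hypothetical. Rejecting more than one ref is an assumption, because the docs only describe what happens when none is given. The clone command itself uses standard git flags.

```python
from typing import List, Optional, Tuple


def resolve_git_ref(
    branch_name: Optional[str] = None, tag: Optional[str] = None, commit: Optional[str] = None
) -> Tuple[str, Optional[str]]:
    """Return (kind, value) for the ref to check out; ("default_branch", None) if none given."""
    candidates = (("branch_name", branch_name), ("tag", tag), ("commit", commit))
    given = [(kind, value) for kind, value in candidates if value is not None]
    if len(given) > 1:
        # Assumption: treat several refs as ambiguous instead of guessing a precedence.
        raise ValueError(f"Specify at most one of branch_name, tag, commit; got {[k for k, _ in given]}")
    return given[0] if given else ("default_branch", None)


def clone_command(url: str, clone_depth: int = 1, **ref: Optional[str]) -> List[str]:
    """Build a shallow `git clone` argv for the resolved ref."""
    kind, value = resolve_git_ref(**ref)
    cmd = ["git", "clone", "--depth", str(clone_depth)]
    if kind in ("branch_name", "tag"):
        # `git clone --branch` accepts either a branch name or a tag name.
        cmd += ["--branch", value]
    # For a commit, a separate fetch/checkout of that SHA is needed after cloning (not shown).
    cmd.append(url)
    return cmd
```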



# GitHub

class prefect.storage.github.GitHub(repo, path, ref=None, access_token_secret=None, base_url=None, **kwargs)

GitHub storage class. This class represents the Storage interface for Flows stored in .py files in a GitHub repository.

This class represents a mapping of flow name to file paths contained in the git repo, meaning that all flow files should be pushed independently. A typical workflow using this storage type might look like the following:

  • Compose a flow .py file whose flow uses GitHub storage:
```python
from prefect import Flow
from prefect.storage import GitHub
flow = Flow("my-flow")
flow.storage = GitHub(repo="my/repo", path="/flows/flow.py")
```

  • Push this flow.py file to the my/repo repository under /flows/flow.py.

  • Call prefect register -f flow.py to register this flow with GitHub storage.

Args:

  • repo (str): the name of a GitHub repository to store this Flow
  • path (str): a path pointing to a flow file in the repo
  • ref (str, optional): a commit SHA-1 value, tag, or branch name. If not specified, defaults to the default branch for the repo.
  • access_token_secret (str, optional): The name of a Prefect secret that contains a GitHub access token to use when loading flows from this storage.
  • base_url (str, optional): the GitHub REST API URL for the repo. If not specified, https://api.github.com is used.
  • **kwargs (Any, optional): any additional Storage initialization options
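The file that a GitHub storage object points to corresponds to GitHub's repository contents REST endpoint, rooted at base_url. The helper below, contents_url, is a hypothetical illustration of how repo, path, ref, and base_url combine into that endpoint. It makes no network calls, and the GitHub Enterprise host used in the test is an example value.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def contents_url(
    repo: str, path: str, ref: Optional[str] = None, base_url: str = "https://api.github.com"
) -> str:
    """Build GET {base_url}/repos/{owner}/{repo}/contents/{path}[?ref=...]."""
    url = f"{base_url.rstrip('/')}/repos/{repo}/contents/{quote(path.lstrip('/'))}"
    if ref is not None:
        # ref may be a commit SHA, tag, or branch; omitting it means the default branch.
        url += "?" + urlencode({"ref": ref})
    return url
```

For GitHub Enterprise Server, base_url usually has the form https://<host>/api/v3.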



# GitLab

class prefect.storage.gitlab.GitLab(repo, host=None, path=None, ref=None, access_token_secret=None, **kwargs)

GitLab storage class. This class represents the Storage interface for Flows stored in .py files in a GitLab repository.

This class represents a mapping of flow name to file paths contained in the git repo, meaning that all flow files should be pushed independently. A typical workflow using this storage type might look like the following:

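  • Compose a flow .py file whose flow uses GitLab storage:
```python
from prefect import Flow
from prefect.storage import GitLab

flow = Flow("my-flow")
# repo can also be a numeric project ID, e.g. repo="123456"
flow.storage = GitLab(repo="my/repo", path="/flows/flow.py", ref="my-branch")
```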

  • Push this flow.py file to the my/repo repository under /flows/flow.py.

  • Call prefect register -f flow.py to register this flow with GitLab storage.

Args:

  • repo (str): the project path (i.e., 'namespace/project') or ID
  • host (str, optional): if using a GitLab server, the server host. If not specified, defaults to GitLab Cloud.
  • path (str, optional): a path pointing to a flow file in the repo
  • ref (str, optional): a commit SHA-1 value or branch name
  • access_token_secret (str, optional): The name of a Prefect secret that contains a GitLab access token to use when loading flows from this storage.
  • **kwargs (Any, optional): any additional Storage initialization options
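Because repo may be either a 'namespace/project' path or a numeric ID, it has to be URL-encoded when used with GitLab's REST API. The hypothetical raw_file_url helper below sketches the URL for GitLab's raw repository file endpoint under that assumption. It is not Prefect's code and makes no request.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def raw_file_url(repo: str, path: str, ref: Optional[str] = None, host: Optional[str] = None) -> str:
    """URL of GitLab's 'get raw file' REST endpoint for a flow file."""
    base = (host or "https://gitlab.com").rstrip("/")
    # The project path and the file path are fully URL-encoded ("/" -> "%2F");
    # a numeric project ID such as "123456" passes through unchanged.
    project = quote(repo, safe="")
    file_path = quote(path.lstrip("/"), safe="")
    url = f"{base}/api/v4/projects/{project}/repository/files/{file_path}/raw"
    return url + ("?" + urlencode({"ref": ref}) if ref is not None else "")
```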



# Local

class prefect.storage.local.Local(directory=None, validate=True, path=None, stored_as_script=False, **kwargs)

Local storage class. This class represents the Storage interface for Flows stored as bytes in the local filesystem.

Note that if you register a Flow with Prefect Cloud using this storage, your flow will automatically be labeled with your machine's hostname. This ensures that only agents that are known to be running on the same filesystem can run your flow.

Args:

  • directory (str, optional): the directory the flows will be stored in; defaults to ~/.prefect/flows. If it doesn't already exist, it will be created for you.
  • validate (bool, optional): a boolean specifying whether to validate the provided directory path; if True, the directory will be converted to an absolute path and created. Defaults to True
  • path (str, optional): a direct path to the location of the flow file if stored_as_script=True, otherwise this path will be used when storing the serialized, pickled flow. If stored_as_script=True, the direct path may be a file path (such as 'path/to/myflow.py') or a direct python path (such as 'myrepo.mymodule.myflow')
  • stored_as_script (bool, optional): boolean for specifying if the flow has been stored as a .py file. Defaults to False
  • **kwargs (Any, optional): any additional Storage initialization options
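The validate=True behavior (convert the directory to an absolute path and create it) can be sketched with the standard library. prepare_directory is a hypothetical helper. Expanding a leading ~ is an added assumption so that the default ~/.prefect/flows resolves to the user's home directory.

```python
import os
from typing import Optional


def prepare_directory(directory: Optional[str] = None, validate: bool = True) -> str:
    """Return the flow directory, making it absolute and creating it when validate=True."""
    directory = directory or os.path.join("~", ".prefect", "flows")
    if validate:
        # Assumption: expand "~" first so the default lands in the user's home directory.
        directory = os.path.abspath(os.path.expanduser(directory))
        os.makedirs(directory, exist_ok=True)
    return directory
```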



# Module

class prefect.storage.module.Module(module, **kwargs)

A Prefect Storage class for referencing flows that can be imported from a python module.

Args:

  • module (str): The module to import the flow from.
  • **kwargs (Any, optional): any additional Storage options.
Example:

Suppose you have a python module myproject.flows that contains all your Prefect flows. If this module is installed and available in your execution environment you can use Module storage to reference and load the flows.

```python
from prefect import Flow
from prefect.storage import Module

flow = Flow("module storage example")
flow.storage = Module("myproject.flows")

# Tip: you can use `__name__` to automatically reference the current module
flow.storage = Module(__name__)
```
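Conceptually, loading from Module storage means importing the module and picking out the flow object by name. The sketch below shows that idea with importlib. load_flow_from_module is hypothetical, and it takes the flow class as a parameter so that it runs without Prefect installed.

```python
import importlib
from typing import Any


def load_flow_from_module(module: str, flow_name: str, flow_type: type) -> Any:
    """Import `module` and return the attribute that is a `flow_type` named `flow_name`."""
    mod = importlib.import_module(module)
    for obj in vars(mod).values():
        if isinstance(obj, flow_type) and getattr(obj, "name", None) == flow_name:
            return obj
    raise ValueError(f"No flow named {flow_name!r} found in module {module!r}")
```

With Prefect installed, flow_type would be prefect.Flow and module would be something like "myproject.flows".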



# S3

class prefect.storage.s3.S3(bucket, key=None, stored_as_script=False, local_script_path=None, client_options=None, upload_options=None, **kwargs)

S3 storage class. This class represents the Storage interface for Flows stored as bytes in an S3 bucket.

This storage class optionally takes a key which will be the name of the Flow object when stored in S3. If this key is not provided the Flow upload name will take the form slugified-flow-name/slugified-current-timestamp.

Args:

  • bucket (str): the name of the S3 Bucket to store Flows
  • key (str, optional): a unique key to use for uploading a Flow to S3. This is only useful when storing a single Flow using this storage object.
  • stored_as_script (bool, optional): boolean for specifying if the flow has been stored as a .py file. Defaults to False
  • local_script_path (str, optional): the path to a local script to upload when stored_as_script is set to True. If not set, the value of local_script_path from prefect.context is used. If neither is set, the script will not be uploaded and users should manually place the script file at the desired key location in the S3 bucket.
  • client_options (dict, optional): Additional options for the boto3 client.
  • upload_options (dict, optional): additional options passed as the ExtraArgs argument of the S3 client's upload_file() and upload_fileobj() methods.
  • **kwargs (Any, optional): any additional Storage initialization options
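upload_options is forwarded as the ExtraArgs argument of boto3's upload calls. The sketch below shows that hand-off with a hypothetical recording client standing in for a real boto3 S3 client, so it runs without AWS access. upload_flow_bytes is likewise a hypothetical name.

```python
import io
from typing import Any, Dict, List, Optional


class RecordingClient:
    """Hypothetical stand-in for a boto3 S3 client; records upload_fileobj calls."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def upload_fileobj(self, Fileobj, Bucket, Key, ExtraArgs=None):
        self.calls.append({"Bucket": Bucket, "Key": Key, "ExtraArgs": ExtraArgs, "Body": Fileobj.read()})


def upload_flow_bytes(
    client: Any, bucket: str, key: str, data: bytes, upload_options: Optional[Dict[str, Any]] = None
) -> None:
    """Upload serialized flow bytes, forwarding upload_options unchanged as ExtraArgs."""
    client.upload_fileobj(io.BytesIO(data), Bucket=bucket, Key=key, ExtraArgs=upload_options)
```

The same call works against boto3.client("s3"). There, ExtraArgs keys must be ones boto3 accepts for uploads, such as ServerSideEncryption or Metadata.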



# Webhook

class prefect.storage.webhook.Webhook(build_request_kwargs, build_request_http_method, get_flow_request_kwargs, get_flow_request_http_method, stored_as_script=False, flow_script_path=None, **kwargs)

Webhook storage class. This class represents the Storage interface for Flows stored and retrieved with HTTP requests.

This storage class takes in keyword arguments which describe how to create the requests. These arguments' values can contain template strings which will be filled in dynamically from environment variables or Prefect secrets.

Args:

  • build_request_kwargs (dict): Dictionary of keyword arguments to the function from requests used to store the flow. Do not supply "data" to this argument, as it will be overwritten with the flow's content when .build() is run.
  • build_request_http_method (str): HTTP method identifying the type of request to execute when storing the flow. For example, "POST" for requests.post().
  • get_flow_request_kwargs (dict): Dictionary of keyword arguments to the function from requests used to retrieve the flow.
  • get_flow_request_http_method (str): HTTP method identifying the type of request to execute when retrieving the flow. For example, "GET" for requests.get().
  • stored_as_script (bool, optional): boolean for specifying if the flow has been stored as a .py file. Defaults to False.
  • flow_script_path (str, optional): path to a local .py file that defines the flow. You must pass a value to this argument if stored_as_script is True. This script's content will be read into a string and attached to the request in build() as UTF-8 encoded binary data. Similarly, .get_flow() expects that the script's contents will be returned as binary data. This path will not be sent to Prefect Cloud and is only needed when running .build().
  • **kwargs (Any, optional): any additional Storage initialization options

Including Sensitive Data

It is common for requests used with this storage to need access to sensitive information.

For example:

  • auth tokens passed in headers such as X-Api-Key or Authorization
  • auth information passed in the URL as query parameters

Webhook storage supports including such sensitive information through templating. Any of the string values passed to build_request_kwargs or get_flow_request_kwargs can include template strings like ${SOME_VARIABLE}. When .build() or .get_flow() is run, such values are replaced with the value of the matching environment variable or, when no matching environment variable is found, the matching Prefect Secret.

So, for example, to get an API key from an environment variable, you can do the following:

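```python
from prefect.storage import Webhook

storage = Webhook(
    build_request_kwargs={
        "url": "some-service/upload",
        "headers": {
            "Content-Type": "application/octet-stream",
            "X-Api-Key": "${MY_COOL_ENV_VARIABLE}",
        },
    },
    build_request_http_method="POST",
    get_flow_request_kwargs={
        "url": "some-service/download",
        "headers": {"Accept": "application/octet-stream"},
    },
    get_flow_request_http_method="GET",
)
```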

You can also take advantage of this templating when only part of a string needs to be replaced.

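```python
from prefect.storage import Webhook

storage = Webhook(
    build_request_kwargs={
        "url": "some-service/upload",
        "headers": {"Content-Type": "application/octet-stream"},
    },
    build_request_http_method="POST",
    get_flow_request_kwargs={
        "url": "some-service/download",
        "headers": {
            "Accept": "application/octet-stream",
            "Authorization": "Bearer ${MY_COOL_ENV_VARIABLE}",
        },
    },
    get_flow_request_http_method="GET",
)
```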
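The ${...} substitution described above (environment variable first, then Prefect Secret) can be approximated with Python's string.Template. This is a sketch, not Prefect's implementation. Secrets are modeled as a plain dict. Also note that string.Template expands bare $NAME placeholders and requires $$ for a literal dollar sign, which Prefect's own templating may not do.

```python
import os
from collections import ChainMap
from string import Template
from typing import Any, Mapping


def render_templates(value: Any, secrets: Mapping[str, str]) -> Any:
    """Recursively fill ${NAME} placeholders: environment variables first, then `secrets`."""
    if isinstance(value, str):
        # ChainMap looks in os.environ first and falls back to the secrets mapping;
        # a name found in neither raises KeyError.
        return Template(value).substitute(ChainMap(os.environ, dict(secrets)))
    if isinstance(value, dict):
        return {key: render_templates(item, secrets) for key, item in value.items()}
    if isinstance(value, list):
        return [render_templates(item, secrets) for item in value]
    return value
```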



This documentation was auto-generated from commit bd9182e
on July 31, 2024 at 18:02 UTC