# Static Dask Cluster on Kubernetes
This recipe is for a flow deployed to Kubernetes using a shared static Dask cluster. This Dask cluster runs on the same Kubernetes cluster that the flow runs on.
Note that for most deployments we recommend using temporary per-flow clusters, rather than a single long-running Dask cluster (although there are use cases for both). See the Dask Executor documentation for more information.
# Kubernetes Manifests
Below we provide an example Kubernetes manifest for deploying a static Dask cluster. It starts a cluster with 2 workers, with the scheduler listening at `tcp://dask-scheduler:8786`.
For a production deployment you may be interested in using something like the Dask Helm Chart instead - the manifests below are only provided as an example.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dask-scheduler
  labels:
    app: dask-scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dask-scheduler
  template:
    metadata:
      labels:
        app: dask-scheduler
    spec:
      containers:
        - name: dask-scheduler
          image: prefecthq/prefect:latest
          args:
            - dask-scheduler
            - --port
            - "8786"
          ports:
            - containerPort: 8786
---
apiVersion: v1
kind: Service
metadata:
  name: dask-scheduler
spec:
  selector:
    app: dask-scheduler
  ports:
    - port: 8786
      targetPort: 8786
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dask-worker
  labels:
    app: dask-worker
spec:
  replicas: 2
  selector:
    matchLabels:
      app: dask-worker
  template:
    metadata:
      labels:
        app: dask-worker
    spec:
      containers:
        - name: dask-worker
          image: prefecthq/prefect:latest
          args:
            - dask-worker
            - dask-scheduler:8786
            - --no-bokeh
            - --nthreads
            - "4"
```
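Once the manifests are applied, it can be useful to confirm that the scheduler Service is reachable from other pods before pointing a flow at it. A minimal sketch using only the Python standard library (the `scheduler_reachable` helper is our own illustration, not part of Prefect or Dask):

```python
import socket


def scheduler_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to the Dask scheduler succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Inside the Kubernetes cluster, the Service above resolves as
# "dask-scheduler"; from a pod this should return True once the
# scheduler Deployment is running:
# scheduler_reachable("dask-scheduler", 8786)
```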
When running Dask on Kubernetes you must ensure your image contains the
dependencies your flow needs to execute, either by using the flow's Docker
storage as the image for Dask or by building a custom image with all the
required dependencies. The manifest above uses the `prefecthq/prefect:latest`
image for both the Dask scheduler and worker pods, since our flow has no external
dependencies beyond Prefect.
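If your flow does have external dependencies, one option is to point the worker Deployment at the image built by the flow's Docker storage. A hypothetical fragment (the image reference here matches the storage configuration later in this recipe; substitute your own registry and tag):

```yaml
# Hypothetical change to the dask-worker Deployment's container spec,
# reusing the flow's Docker storage image so workers share the flow's
# dependencies.
containers:
  - name: dask-worker
    image: gcr.io/dev/dask-k8s-flow:0.1.0
    args: [dask-worker, dask-scheduler:8786, --no-bokeh, --nthreads, "4"]
```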
# Flow Source
Here we create a flow configured with:
- A `KubernetesRun` run config, specifying that it should be run by a Kubernetes Agent.
- A `DaskExecutor` configured to connect to the static Dask cluster created above.
- `Docker` storage, specifying that the flow source should be built into and stored in a new Docker image.
```python
from prefect import task, Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker


@task
def get_value():
    return "Example!"


@task
def output_value(value):
    print(value)


with Flow("Static Dask Cluster Example") as flow:
    value = get_value()
    output_value(value)

flow.run_config = KubernetesRun()
flow.executor = DaskExecutor("tcp://dask-scheduler:8786")
flow.storage = Docker(
    registry_url="gcr.io/dev/",
    image_name="dask-k8s-flow",
    image_tag="0.1.0",
)
```
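As a side note on the storage configuration above: the registry URL, image name, and tag together identify where the built flow image is pushed. A plain string sketch of how those pieces combine into a full image reference (our own illustration, not Prefect's internal logic):

```python
def image_reference(registry_url: str, image_name: str, image_tag: str) -> str:
    """Join registry URL, image name, and tag into a Docker image reference."""
    # Ensure exactly one "/" between the registry and the image name.
    registry = registry_url.rstrip("/")
    return f"{registry}/{image_name}:{image_tag}"


print(image_reference("gcr.io/dev/", "dask-k8s-flow", "0.1.0"))
# gcr.io/dev/dask-k8s-flow:0.1.0
```

This is the image reference you would also use when pointing the Dask worker pods at the flow's storage image.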