# Static Dask Cluster on Kubernetes
This recipe is for a flow deployed to Kubernetes using a shared static Dask cluster. This Dask cluster runs on the same Kubernetes cluster that the flow runs on.
Note that for most deployments we recommend using temporary per-flow clusters, rather than a single long-running Dask cluster (although there are use cases for both). See the Dask Executor documentation for more information.
# Kubernetes Manifests
Below we provide an example Kubernetes manifest for deploying a static Dask cluster on Kubernetes. It starts a cluster with 2 workers, with the scheduler listening at `tcp://dask-scheduler:8786`.
For a production deployment you may be interested in using something like the Dask Helm Chart instead; the manifests below are provided only as an example.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dask-scheduler
  labels:
    app: dask-scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dask-scheduler
  template:
    metadata:
      labels:
        app: dask-scheduler
    spec:
      containers:
        - name: dask-scheduler
          image: prefecthq/prefect:latest
          args:
            - dask-scheduler
            - --port
            - "8786"
          ports:
            - containerPort: 8786
---
apiVersion: v1
kind: Service
metadata:
  name: dask-scheduler
spec:
  selector:
    app: dask-scheduler
  ports:
    - port: 8786
      targetPort: 8786
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dask-worker
  labels:
    app: dask-worker
spec:
  replicas: 2
  selector:
    matchLabels:
      app: dask-worker
  template:
    metadata:
      labels:
        app: dask-worker
    spec:
      containers:
        - name: dask-worker
          image: prefecthq/prefect:latest
          args:
            - dask-worker
            - dask-scheduler:8786
            - --no-bokeh
            - --nthreads
            - "4"
```
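Assuming the manifests above are saved to a file (here hypothetically named `dask-cluster.yaml`), you can apply them and check that the cluster came up with `kubectl`:

```shell
# Create the scheduler Deployment, Service, and worker Deployment
kubectl apply -f dask-cluster.yaml

# Verify that the scheduler pod and both worker pods are running
kubectl get pods -l 'app in (dask-scheduler, dask-worker)'

# Confirm the Service exposing the scheduler on port 8786
kubectl get service dask-scheduler
```

The Service gives the scheduler a stable DNS name (`dask-scheduler`) inside the cluster, which is what the flow's executor connects to below.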
**Required dependencies**

When running Dask on Kubernetes you must ensure your image contains the dependencies your flow needs to execute, either by using the flow's Docker storage as the image for Dask or by building a custom image with all the required dependencies. The manifest above uses the `prefecthq/prefect:latest` image for both the Dask scheduler and worker pods, since our flow has no external dependencies beyond Prefect.
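If your flow did have extra dependencies, a custom image might look like the following sketch (the package list is purely illustrative, not part of this recipe):

```dockerfile
# Start from the same Prefect base image used in the manifests above
FROM prefecthq/prefect:latest

# Install any extra libraries your tasks import; the scheduler and
# workers must have the same dependencies as the flow itself.
RUN pip install pandas scikit-learn
```

You would then reference this image in both the scheduler and worker Deployments in place of `prefecthq/prefect:latest`.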
# Flow Source
Here we create a flow configured with:
- A KubernetesRun run config, specifying that it should be run by a Kubernetes Agent.
- A DaskExecutor configured to connect to the static Dask cluster created above.
- A Docker storage, specifying that the flow source should be built and stored in a new Docker image.
```python
from prefect import task, Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker


@task
def get_value():
    return "Example!"


@task
def output_value(value):
    print(value)


with Flow("Static Dask Cluster Example") as flow:
    value = get_value()
    output_value(value)

flow.run_config = KubernetesRun()
flow.executor = DaskExecutor("tcp://dask-scheduler:8786")
flow.storage = Docker(
    registry_url="gcr.io/dev/",
    image_name="dask-k8s-flow",
    image_tag="0.1.0",
)
```
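For the Kubernetes Agent to pick the flow up, it still needs to be registered with the Prefect backend. A minimal sketch, appended to the end of the script above; the project name here is an assumption, not part of this recipe, and the project must already exist:

```python
# Build the Docker storage (pushing the image to gcr.io/dev/) and
# register the flow with the backend under the given project.
flow.register(project_name="dask-k8s")  # hypothetical project name
```

Registration builds and pushes the Docker storage, so run it somewhere with Docker available and push access to the configured registry.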