# Executors
Prefect Executors are responsible for running tasks in a flow. During execution of a flow run, a flow's executor will be initialized, used to execute all tasks in the flow, then shutdown.
Currently, the available executor options are:
LocalExecutor
: the default, no frills executor. All tasks are executed in a single thread, parallelism is not supported.LocalDaskExecutor
: an executor that runs ondask
primitives with a using either threads or processes.DaskExecutor
: the most feature-rich of the executors, this executor runs ondask.distributed
and has support for distributed execution.
Which executor you choose depends on the performance requirements and characteristics of your Flow. See the executors docs for more information.
# Executor
Base Executor class that all other executors inherit from.
# LocalExecutor
An executor that runs all functions synchronously and immediately in the main thread. To be used mainly for debugging purposes.
# LocalDaskExecutor
An executor that runs all functions locally using dask
and a configurable dask scheduler. Args:
scheduler (str)
: The local dask scheduler to use; common options are "threads", "processes", and "synchronous". Defaults to "threads".**kwargs (Any)
: Additional keyword arguments to pass to dask config
# DaskExecutor
class
prefect.executors.dask.DaskExecutor
(address=None, cluster_class=None, cluster_kwargs=None, adapt_kwargs=None, client_kwargs=None, debug=None, performance_report_path=None, disable_cancellation_event=False, watch_worker_status=None)[source]An executor that runs all functions using the dask.distributed
scheduler. By default a temporary distributed.LocalCluster
is created (and subsequently torn down) within the start()
contextmanager. To use a different cluster class (e.g. dask_kubernetes.KubeCluster
), you can specify cluster_class
/cluster_kwargs
. Alternatively, if you already have a dask cluster running, you can provide the address of the scheduler via the address
kwarg. Note that if you have tasks with tags of the form "dask-resource:KEY=NUM"
they will be parsed and passed as Worker Resources of the form {"KEY": float(NUM)}
to the Dask Scheduler. Args:
address (string, optional)
: address of a currently running dask scheduler; if one is not provided, a temporary cluster will be created inexecutor.start()
. Defaults toNone
.cluster_class (string or callable, optional)
: the cluster class to use when creating a temporary dask cluster. Can be either the full class name (e.g."distributed.LocalCluster"
), or the class itself.cluster_kwargs (dict, optional)
: addtional kwargs to pass to thecluster_class
when creating a temporary dask cluster.adapt_kwargs (dict, optional)
: additional kwargs to pass tocluster.adapt
when creating a temporary dask cluster. Note that adaptive scaling is only enabled ifadapt_kwargs
are provided.client_kwargs (dict, optional)
: additional kwargs to use when creating adask.distributed.Client
.debug (bool, optional)
: When running with a local cluster, settingdebug=True
will increase dask's logging level, providing potentially useful debug info. Defaults to thedebug
value in your Prefect configuration.performance_report_path (str, optional)
: An optional path for the dask performance report.disable_cancellation_event (bool, optional)
: By default, Prefect uses a Dask event to allow for better cancellation of task runs. Sometimes this can cause strain on the scheduler as each task needs to retrieve a client to check the status of the cancellation event. If set toFalse
, we will skip this check.watch_worker_status (bool, optional)
: By default, Prefect subscribes to Dask worker events and logs when a worker is added or removed. This provides a hook for users to extend behavior on worker changes. This setting isNone
by default and will be enabled unlessadapt_kwargs
is set, in which case it will be disabled. Adaptive clusters often require this feature to be disabled as they use the worker status events for scaling and only one subscriber is allowed. If you set the value toTrue
orFalse
, it will be respected regardless of the value ofadapt_kwargs
. Examples: Using a temporary local dask cluster:
executor = DaskExecutor()
``` Using a temporary cluster running elsewhere. Any Dask cluster class should work, here we use [dask-cloudprovider](https://cloudprovider.dask.org):
```python
executor = DaskExecutor(
cluster_class="dask_cloudprovider.FargateCluster",
cluster_kwargs={
"image": "prefecthq/prefect:latest",
"n_workers": 5,
...
},
)
``` Connecting to an existing dask cluster
```python
executor = DaskExecutor(address="192.0.2.255:8786")
```</li></ul>
---
<br>
<p class="auto-gen">This documentation was auto-generated from commit <a href='https://github.com/PrefectHQ/prefect/commit/bd9182edb27a85c7726a177a55152c121af5471b'>bd9182e</a> </br>on July 31, 2024 at 18:02 UTC</p>