# Executors

Prefect Executors are responsible for running tasks in a flow. During execution of a flow run, a flow's executor will be initialized, used to execute all tasks in the flow, then shutdown.

Currently, the available executor options are:

  • LocalExecutor: the default, no frills executor. All tasks are executed in a single thread, parallelism is not supported.
  • LocalDaskExecutor: an executor that runs on dask primitives with a using either threads or processes.
  • DaskExecutor: the most feature-rich of the executors, this executor runs on dask.distributed and has support for distributed execution.

Which executor you choose depends on the performance requirements and characteristics of your Flow. See the executors docs for more information.

# Executor




Base Executor class that all other executors inherit from.

# LocalExecutor




An executor that runs all functions synchronously and immediately in the main thread. To be used mainly for debugging purposes.

# LocalDaskExecutor



(scheduler="threads", **kwargs)[source]

An executor that runs all functions locally using dask and a configurable dask scheduler. Args:

  • scheduler (str): The local dask scheduler to use; common options are "threads", "processes", and "synchronous". Defaults to "threads".
  • **kwargs (Any): Additional keyword arguments to pass to dask config

# DaskExecutor



(address=None, cluster_class=None, cluster_kwargs=None, adapt_kwargs=None, client_kwargs=None, debug=None, performance_report_path=None, disable_cancellation_event=False, watch_worker_status=None)[source]

An executor that runs all functions using the dask.distributed scheduler. By default a temporary distributed.LocalCluster is created (and subsequently torn down) within the start() contextmanager. To use a different cluster class (e.g. dask_kubernetes.KubeCluster), you can specify cluster_class/cluster_kwargs. Alternatively, if you already have a dask cluster running, you can provide the address of the scheduler via the address kwarg. Note that if you have tasks with tags of the form "dask-resource:KEY=NUM" they will be parsed and passed as Worker Resources of the form {"KEY": float(NUM)} to the Dask Scheduler. Args:

  • address (string, optional): address of a currently running dask scheduler; if one is not provided, a temporary cluster will be created in executor.start(). Defaults to None.
  • cluster_class (string or callable, optional): the cluster class to use when creating a temporary dask cluster. Can be either the full class name (e.g. "distributed.LocalCluster"), or the class itself.
  • cluster_kwargs (dict, optional): addtional kwargs to pass to the cluster_class when creating a temporary dask cluster.
  • adapt_kwargs (dict, optional): additional kwargs to pass to cluster.adapt when creating a temporary dask cluster. Note that adaptive scaling is only enabled if adapt_kwargs are provided.
  • client_kwargs (dict, optional): additional kwargs to use when creating a dask.distributed.Client.
  • debug (bool, optional): When running with a local cluster, setting debug=True will increase dask's logging level, providing potentially useful debug info. Defaults to the debug value in your Prefect configuration.
  • performance_report_path (str, optional): An optional path for the dask performance report.
  • disable_cancellation_event (bool, optional): By default, Prefect uses a Dask event to allow for better cancellation of task runs. Sometimes this can cause strain on the scheduler as each task needs to retrieve a client to check the status of the cancellation event. If set to False, we will skip this check.
  • watch_worker_status (bool, optional): By default, Prefect subscribes to Dask worker events and logs when a worker is added or removed. This provides a hook for users to extend behavior on worker changes. This setting is None by default and will be enabled unless adapt_kwargs is set, in which case it will be disabled. Adaptive clusters often require this feature to be disabled as they use the worker status events for scaling and only one subscriber is allowed. If you set the value to True or False, it will be respected regardless of the value of adapt_kwargs. Examples: Using a temporary local dask cluster:

executor = DaskExecutor()

``` Using a temporary cluster running elsewhere. Any Dask cluster class should work, here we use [dask-cloudprovider](https://cloudprovider.dask.org): 
executor = DaskExecutor(
        "image": "prefecthq/prefect:latest",
        "n_workers": 5,

``` Connecting to an existing dask cluster 
executor = DaskExecutor(address="")



<p class="auto-gen">This documentation was auto-generated from commit <a href='https://github.com/PrefectHQ/prefect/commit/ffa9a6c38ec55db7b153f57e7a14469272c344fb'>ffa9a6c</a> </br>on February 1, 2023 at 18:44 UTC</p>