# Concurrency limits Cloud

**Standard Tier Feature:** Setting global concurrency limits is a feature of Prefect Cloud's Standard Tier.

# Flow run limits Cloud

Sometimes, you want to limit the number of flow runs executing simultaneously. For example, you may have an agent on a machine that cannot handle the load of many flow runs.

Prefect Cloud can limit the number of simultaneous flow runs. The limit is based on flow run labels: a flow run can be given as many labels as you wish, and each label can be given a concurrency limit. If a flow run has multiple labels, it will only run if all of its labels have available concurrency. Flow run label concurrency limits are enforced globally across your entire team, and labels without explicit limits are considered to have unlimited concurrency.
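
The rule above can be illustrated with a small, self-contained sketch. This is not Prefect Cloud's implementation; the `can_start` function and the `limits` and `running` dictionaries are hypothetical names used only to show the "every label must have a free slot, unlimited when no limit is set" logic.

```python
from typing import Dict, Iterable


def can_start(
    labels: Iterable[str],
    limits: Dict[str, int],
    running: Dict[str, int],
) -> bool:
    """Return True only if every limited label still has a free slot.

    Hypothetical helper for illustration; not part of the Prefect API.
    """
    for label in labels:
        limit = limits.get(label)  # None means no explicit limit, i.e. unlimited
        if limit is not None and running.get(label, 0) >= limit:
            return False
    return True


# Assumed example data: one limited label that is already full.
limits = {"small-machine": 2}
running = {"small-machine": 2, "gpu": 7}

print(can_start(["gpu"], limits, running))                   # True
print(can_start(["small-machine", "gpu"], limits, running))  # False
```

A flow run labeled only `gpu` starts because that label has no explicit limit, while a run that also carries `small-machine` must wait because that label is at capacity.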

# Labeling your flow runs

Labels can be set on a Flow via its run config:

```python
from prefect import Flow
from prefect.run_configs import UniversalRun

flow = Flow("example-flow")
flow.run_config = UniversalRun(labels=["small-machine"])
```

Labels can also be set per flow run; see the flow run creation documentation for details.

# Setting concurrency limits

# Inspecting concurrency limits

# Task run limits Cloud

There are situations in which you want to prevent too many tasks from running simultaneously. For example, if many tasks across multiple Flows interact with a database that allows at most 10 connections, you want to ensure that no more than 10 of the tasks that connect to this database are running at any given time.

Prefect Cloud has built-in functionality for achieving this; tasks can be "tagged" with as many tags as you wish, and each tag can optionally be provided a concurrency limit. If a task has multiple tags, it will only run if all tags have available concurrency. Tag concurrency limits are enforced globally across your entire team, and tags without explicit limits are considered to have unlimited concurrency.
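
To make the database scenario concrete, the following sketch tracks slots per tag in plain Python. It is only a model of the behavior described above, not how Prefect Cloud stores or enforces limits; the `TagSlots` class and its methods are hypothetical.

```python
from typing import Dict, Iterable


class TagSlots:
    """Hypothetical in-memory model of per-tag concurrency slots."""

    def __init__(self, limits: Dict[str, int]) -> None:
        self.limits = dict(limits)        # tag -> maximum concurrent task runs
        self.in_use: Dict[str, int] = {}  # tag -> task runs currently holding a slot

    def try_acquire(self, tags: Iterable[str]) -> bool:
        """Take one slot on every tag, or none at all if any tag is full."""
        tags = list(tags)
        for tag in tags:
            limit = self.limits.get(tag)  # tags without a limit are unlimited
            if limit is not None and self.in_use.get(tag, 0) >= limit:
                return False
        for tag in tags:
            self.in_use[tag] = self.in_use.get(tag, 0) + 1
        return True

    def release(self, tags: Iterable[str]) -> None:
        """Give back the slots held by a finished task run."""
        for tag in tags:
            self.in_use[tag] -= 1


slots = TagSlots({"database": 10})

# Twelve task runs tagged "database" and "aws" try to start at once.
started = [slots.try_acquire(["database", "aws"]) for _ in range(12)]
print(started.count(True))  # 10

# Once one of them finishes, a waiting task run can take its place.
slots.release(["database", "aws"])
print(slots.try_acquire(["database"]))  # True
```

Only the `database` tag has a limit here, so it alone decides how many task runs start; the `aws` tag is counted but never blocks.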

Note that the ability to alter or update your task tag concurrency limits requires tenant admin level permissions.

# Tagging your tasks

Tagging your tasks is as simple as providing a list of tags to your Task at initialization via the tags keyword argument:

```python
from prefect import task, Task


# While using the task decorator
@task(tags=["database", "aws"])
def my_task():
    pass

# While using a `Task` subclass
class MyTask(Task):
    pass

my_task = MyTask(tags=["webservice"])
```

These tags are then available via the tags attribute on your Task instances. More information about Task settings and initialization keywords can be found in the corresponding API documentation.

# Setting concurrency limits

Once you have tagged your various tasks and registered your Flow(s) to Prefect Cloud, you can set concurrency limits on as few or as many tags as you wish. You can set limits in any of the following three ways.

# Inspecting concurrency limits

If you wish to query for the currently set limit on a tag, or see all of your limits across all of your tags, you can do so in any of the following three ways.

# Execution Behavior

Task tag limits are checked whenever a task run attempts to enter a Running state in Prefect Cloud. If any one of your Task's tags has no concurrency slots available, the task run will instead enter a Queued state. The same Python process that is attempting to run your Task will then try to re-enter a Running state every 30 seconds (this value is configurable via `config.cloud.queue_interval` in Prefect Configuration). Additionally, if that process ever fails, Prefect Cloud will create a new runner every 10 minutes, which will then attempt to rerun your task on the specified queue interval. This process repeats until all requested concurrency slots become available.
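
The retry behavior described above can be pictured as a simple polling loop. The sketch below is an illustration under stated assumptions, not Prefect's runner code: `wait_for_slot` and `try_enter_running` are hypothetical names, and the `sleep` parameter exists only so the example runs without actually waiting.

```python
import time
from typing import Callable, List


def wait_for_slot(
    try_enter_running: Callable[[], bool],
    queue_interval: float = 30.0,  # mirrors the documented 30-second default
    sleep: Callable[[float], None] = time.sleep,
) -> List[str]:
    """Retry until a slot is granted and return the states visited.

    Hypothetical helper for illustration; not part of the Prefect API.
    """
    states: List[str] = []
    while not try_enter_running():
        states.append("Queued")  # no slot available on at least one tag
        sleep(queue_interval)    # wait before asking again
    states.append("Running")
    return states


# Simulate two refused attempts followed by a granted slot,
# recording the waits instead of really sleeping.
attempts = iter([False, False, True])
waited: List[float] = []
states = wait_for_slot(lambda: next(attempts), sleep=waited.append)

print(states)       # ['Queued', 'Queued', 'Running']
print(sum(waited))  # 60.0
```

The sketch covers only the in-process retry; the 10-minute recovery performed by Prefect Cloud when the process fails is outside its scope.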