# Concurrency limits
Standard Tier Feature
Setting global concurrency limits is a feature of Prefect Cloud's Standard Tier.
# Flow run limits
Sometimes, you want to limit the number of flow runs executing simultaneously. For example, you may have an agent on a machine that cannot handle the load of many flow runs.
Prefect Cloud provides functionality to limit the number of simultaneous flow runs. This limit is based on flow run labels. Flow runs can be given as many labels as you wish, and each label can be given a concurrency limit. If a flow run has multiple labels, it will only run if all of its labels have available concurrency. Flow run label concurrency limits are enforced globally across your entire team, and labels without explicit limits are considered to have unlimited concurrency.
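The rule above can be modeled in a few lines of plain Python. This is only an illustrative sketch of the semantics, not Prefect Cloud's implementation; `blocked_labels`, `limits`, and `running` are hypothetical names introduced for the example.

```python
from typing import Dict, Iterable, List


def blocked_labels(
    labels: Iterable[str],
    limits: Dict[str, int],
    running: Dict[str, int],
) -> List[str]:
    """Return the labels that currently have no free concurrency slot.

    An empty result means a flow run carrying these labels may start.
    Labels missing from `limits` are treated as unlimited.
    """
    return [
        label
        for label in labels
        if label in limits and running.get(label, 0) >= limits[label]
    ]


# Hypothetical state: "small-machine" allows 2 concurrent runs and 2 are active.
limits = {"small-machine": 2}
running = {"small-machine": 2, "gpu": 7}

print(blocked_labels(["small-machine", "gpu"], limits, running))  # ['small-machine']
print(blocked_labels(["gpu"], limits, running))  # []
```

A run labeled with both `small-machine` and `gpu` is held back by the single saturated label, while `gpu` alone is unrestricted because it has no explicit limit.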
# Labeling your flow runs
Labels can be set on a `Flow` via the run config:
```python
from prefect import Flow
from prefect.run_configs import UniversalRun

flow = Flow("example-flow")
flow.run_config = UniversalRun(labels=["small-machine"])
```
Labels can also be set per flow run; see the flow run creation documentation for details.
# Setting concurrency limits
# Inspecting concurrency limits
# Task run limits
Sometimes you need to actively prevent too many tasks from running simultaneously. For example, if many tasks across multiple flows interact with a database that allows at most 10 connections, no more than 10 of the tasks that connect to this database should be running at any given time.
Prefect Cloud has built-in functionality for achieving this; tasks can be "tagged" with as many tags as you wish, and each tag can optionally be provided a concurrency limit. If a task has multiple tags, it will only run if all tags have available concurrency. Tag concurrency limits are enforced globally across your entire team, and tags without explicit limits are considered to have unlimited concurrency.
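To make the database example concrete, here is a toy in-memory model of tag limits. It is a sketch of the behavior described above, not the Prefect Cloud implementation; the `TagLimiter` class and its methods are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable


class TagLimiter:
    """Toy in-memory model of tag concurrency limits (not the Prefect API)."""

    def __init__(self, limits: Dict[str, int]) -> None:
        self.limits = dict(limits)
        self.in_use: Counter = Counter()

    def try_acquire(self, tags: Iterable[str]) -> bool:
        """Take one slot on every tag, or none at all if any tag is full."""
        tags = list(tags)
        for tag in tags:
            limit = self.limits.get(tag)  # None means the tag is unlimited
            if limit is not None and self.in_use[tag] >= limit:
                return False
        self.in_use.update(tags)
        return True

    def release(self, tags: Iterable[str]) -> None:
        """Give back the slots taken by a finished task run."""
        self.in_use.subtract(list(tags))


limiter = TagLimiter({"database": 10})

# Eleven task runs tagged "database" try to start; only ten get a slot.
started = [limiter.try_acquire(["database", "aws"]) for _ in range(11)]
print(started.count(True))  # 10

# Once one run finishes, a slot frees up again.
limiter.release(["database", "aws"])
print(limiter.try_acquire(["database"]))  # True
```

The `aws` tag never blocks anything here because it has no explicit limit.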
Note that the ability to alter or update your task tag concurrency limits requires tenant admin level permissions.
# Tagging your tasks
Tagging your tasks is as simple as providing a list of tags to your `Task` at initialization via the `tags` keyword argument:
```python
from prefect import task, Task

# While using the task decorator
@task(tags=["database", "aws"])
def my_task():
    pass

# While using a `Task` subclass
class MyTask(Task):
    pass

my_task = MyTask(tags=["webservice"])
```
These tags are then available via the `tags` attribute on your `Task` instances. More information about `Task` settings and initialization keywords can be found in the corresponding API documentation.
# Setting concurrency limits
Once you have tagged your various tasks and registered your Flow(s) to Prefect Cloud, you can set concurrency limits on as few or as many tags as you wish. You can set limits in any of the following three ways.
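As one illustration, limits can be applied from Python. The helper below is a minimal sketch that assumes a client object exposing `update_task_tag_limit(tag, limit)`, as the Prefect 1.x `prefect.Client` is understood to do; verify the method against the Client API documentation for your installed version. The helper name `apply_tag_limits` and the example tags are hypothetical.

```python
from typing import Any, Dict


def apply_tag_limits(client: Any, limits: Dict[str, int]) -> None:
    """Set a concurrency limit for each tag via the given client.

    `client` is assumed to behave like a Prefect 1.x `prefect.Client`,
    i.e. to provide `update_task_tag_limit(tag, limit)`.
    """
    for tag, limit in limits.items():
        client.update_task_tag_limit(tag, limit)


# Usage against Prefect Cloud (requires an authenticated client with
# tenant admin permissions):
#
#     from prefect import Client
#     apply_tag_limits(Client(), {"database": 10, "webservice": 3})
```

Taking the client as a parameter keeps the helper easy to exercise with a stand-in object before pointing it at a real tenant.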
# Inspecting concurrency limits
If you wish to query for the currently set limit on a tag, or see all of your limits across all of your tags, you can do so in any of the following three ways.
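For instance, limits can be read back from Python. This sketch assumes a client object exposing `get_task_tag_limit(tag)`, as the Prefect 1.x `prefect.Client` is understood to do, and assumes it returns `None` when no limit is set; both points should be checked against the Client API documentation. The helper name `read_tag_limits` is hypothetical.

```python
from typing import Any, Dict, Iterable, Optional


def read_tag_limits(client: Any, tags: Iterable[str]) -> Dict[str, Optional[int]]:
    """Look up the current limit for each tag.

    `client` is assumed to provide `get_task_tag_limit(tag)`; a value of
    `None` is taken to mean that no limit is set for the tag.
    """
    return {tag: client.get_task_tag_limit(tag) for tag in tags}


# Usage against Prefect Cloud (requires an authenticated client):
#
#     from prefect import Client
#     print(read_tag_limits(Client(), ["database", "aws"]))
```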
# Execution Behavior
Task tag limits are checked whenever a task run attempts to enter a `Running` state in Prefect Cloud. If no concurrency slots are available for any one of your Task's tags, the Task will instead enter a `Queued` state. The same Python process that is attempting to run your Task will then attempt to re-enter a `Running` state every 30 seconds (this value is configurable via `config.cloud.queue_interval` in Prefect Configuration). Additionally, if that process ever fails, Prefect Cloud will create a new runner every 10 minutes, which will then attempt to rerun your task on the specified queue interval. This process will repeat until all requested concurrency slots become available.
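The in-process retry behavior can be sketched as a simple polling loop. This is a simplified model, not Prefect's runner code: `wait_for_running`, `try_enter_running`, and `max_attempts` are hypothetical names, the attempt cap exists only to keep the example bounded, and the 10-minute replacement of a failed runner is not modeled.

```python
import time
from typing import Callable, Optional


def wait_for_running(
    try_enter_running: Callable[[], bool],
    queue_interval: float = 30.0,
    max_attempts: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Retry until `try_enter_running` succeeds; return the attempts used.

    Each failed attempt stands for the task run being placed in `Queued`.
    Raises TimeoutError if `max_attempts` is reached first.
    """
    attempts = 0
    while True:
        attempts += 1
        if try_enter_running():
            return attempts
        if max_attempts is not None and attempts >= max_attempts:
            raise TimeoutError(f"no slot after {attempts} attempts")
        sleep(queue_interval)


# Simulated responses: two refusals, then a free slot.
responses = iter([False, False, True])
waits = []  # records the sleeps instead of actually waiting
print(wait_for_running(lambda: next(responses), sleep=waits.append))  # 3
print(waits)  # [30.0, 30.0]
```

Injecting `sleep` makes the loop testable without real delays; with the default `time.sleep`, each refusal costs one full queue interval.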