# Mapping
Mapping in Prefect can be used to apply the same
task (or tasks) to multiple inputs. To map a task, use its .map method
instead of calling the task itself.
from prefect import task
@task
def add(x, y):
    return x + y
add(1, 2)  # 3
add.map([1, 10], [2, 3])  # [3, 13]
By default all arguments to .map expect iterables that can be mapped over. If
you want to pass in a non-iterable argument to be used by all branches in a
mapped task, you can wrap that argument in unmapped
(docs).
from prefect import unmapped
add.map([1, 10], unmapped(2))  # [3, 12]
In this example we build a flow with 4 stages:
- get_numbersreturns a list of- nnumbers
- incmaps across this list, running once per input
- addmaps across the output of- inc, running once per input.- unmappedis used to pass- 2as a second "unmapped" argument.
- The total list is then passed to compute_sumto calculate a total.
             ┌── inc ── add(2) ──┐
             │                   │
get_numbers ─┼── inc ── add(2) ──┼─ compute_sum
             │                   │
             ⋮                   ⋮
             └── inc ── add(2) ──┘
Note that the number of inputs to a mapped task doesn't need to be known until
runtime. By default this flow will run with 3 mapped branches - try
providing different values for the parameter n to see how it affects
execution.
See the mapping docs for more information.
Registering with Prefect Cloud/Server
This example can be registered in Prefect Cloud or Server by running:
prefect register --json https://docs.prefect.io/examples.json \
    --name 'Example: Mapping' \
    --project 'Prefect Examples'
(to register in a different project, replace 'Prefect Examples' with your project name).
from prefect import Flow, Parameter, task, unmapped
@task
def get_numbers(n):
    return range(1, n + 1)
@task
def inc(x):
    return x + 1
@task
def add(x, y):
    return x + y
@task(log_stdout=True)
def compute_sum(nums):
    total = sum(nums)
    print(f"total = {total}")
    return total
with Flow("Example: Mapping") as flow:
    # The number of branches in the mapped pipeline
    n = Parameter("n", default=3)
    # Generate the initial items to map over
    nums = get_numbers(n)  # [1, 2, 3]
    # Apply `inc` to every item in `nums`
    nums_2 = inc.map(nums)  # [2, 3, 4]
    # Apply `add` to every item in `nums_2`, with `2` as the second argument.
    nums_3 = add.map(nums_2, unmapped(2))  # [4, 5, 6]
    # Compute the sum of all items in `nums_3`
    total = compute_sum(nums_3)  # 15
if __name__ == "__main__":
    flow.run()
Output
$ python examples/mapping.py
INFO | Beginning Flow run for 'Example: Mapping'
INFO | Task 'n': Starting task run...
INFO | Task 'n': Finished task run for task with final state: 'Success'
INFO | Task 'get_numbers': Starting task run...
INFO | Task 'get_numbers': Finished task run for task with final state: 'Success'
INFO | Task 'inc': Starting task run...
INFO | Task 'inc': Finished task run for task with final state: 'Mapped'
INFO | Task 'inc[0]': Starting task run...
INFO | Task 'inc[0]': Finished task run for task with final state: 'Success'
INFO | Task 'inc[1]': Starting task run...
INFO | Task 'inc[1]': Finished task run for task with final state: 'Success'
INFO | Task 'inc[2]': Starting task run...
INFO | Task 'inc[2]': Finished task run for task with final state: 'Success'
INFO | Task 'add': Starting task run...
INFO | Task 'add': Finished task run for task with final state: 'Mapped'
INFO | Task 'add[0]': Starting task run...
INFO | Task 'add[0]': Finished task run for task with final state: 'Success'
INFO | Task 'add[1]': Starting task run...
INFO | Task 'add[1]': Finished task run for task with final state: 'Success'
INFO | Task 'add[2]': Starting task run...
INFO | Task 'add[2]': Finished task run for task with final state: 'Success'
INFO | Task 'compute_sum': Starting task run...
INFO | total = 15
INFO | Task 'compute_sum': Finished task run for task with final state: 'Success'
INFO | Flow run SUCCESS: all reference tasks succeeded
The flow source is available on GitHub here.
This documentation was auto-generated from commit bd9182e 
on July 31, 2024 at 18:02 UTC
 
 