# Using the functional pipe operator

Looking for the latest Prefect 2 release? Prefect 2 and Prefect Cloud 2 have been released for General Availability. See https://docs.prefect.io/ for details.

# Background and Motivation

Functional pipes are a common idiom in several languages. For example, you may be used to piping commands in bash:

echo "foo" | rev
# oof

You might also have noticed that Pandas offers a pipe method , and it does so for many of the same reasons we will discuss below.

In Prefect, you may find yourself calling a long series of tasks, and you might want a way to improve the readability of your code. For example, let's say we have defined five tasks, named task_1 through task_5, which each take a single keyword argument called keyword. We then want to call them sequentially, using the output from the previous task as the input to the next task.

One way to do this is to nest the task function calls:

res = task_5(task_4(task_3(task_2(task_1(arg, keyword=1), keyword=2), keyword=3), keyword=4), keyword=5)

Unfortunately this code is quite confusing. In particular, it reads from inside out, which is confusing because it neither reads left-to-right nor right-to-left.

Another way you might do this is by assigning the output from each task to an intermediate variable:

res_1 = task_1(arg)
res_2 = task_2(res_1)
res_3 = task_3(res_2)
res_4 = task_4(res_3)
res_5 = task_5(res_4)

This is much clearer. It now reads left-to-right, and then up-to-down, which is very easy to follow for many readers. However, we now have introduced four intermediate variables. This is annoying because we:

  • Have to come up with concise names for each output variable, even though there might not be a logical name for each output
  • Have to come up with names that don't clash with existing variable names
  • Have to make sure we don't use the wrong variable as an input to any task
  • Have polluted the namespace with variables we aren't going to actually use more than once

Functional piping offers an alternative that doesn't suffer the issues of either these approaches.

# Piping in Prefect

Prefect offers two types of "pipe":

  • |, the pipe operator, which sets task dependencies, but does not pass arguments between tasks
  • .pipe(), the Task pipe method, which sets task dependencies and passes arguments between tasks

Since this first operator is very unusual to use, we will focus instead on the pipe method. At its core, task_a.pipe(task_b) is exactly equivalent to task_b(task_a), or :

var = task_a()
task_b(var)

However, the advantage of .pipe should become evident when we increase the number of tasks. Returning to our example above, we can rewrite it using .pipe as follows:

task_1(arg, keyword=1).pipe(task_2, keyword=2).pipe(task_3, keyword=3)

Notice how this reads strictly left-to-right, and doesn't require us to invent intermediate variable names.

To make this more readable as the number of tasks in the pipeline increases, you may consider the "call chain" formatting style:

(
    task_1(arg, keyword=1)
    .pipe(task_2, keyword=2)
    .pipe(task_3, keyword=3)
    .pipe(task_4, keyword=4)
    .pipe(task_5, keyword=5)
)

Not only is this easy to follow, but it's fully supported by the Black code formatter , which means you don't need to worry about manually formatting your pipeline.

# Primary Argument Position

When you use .pipe(), the left hand side (the "primary" argument) will be passed in as the first positional argument to the right hand side. Consider the following example:

from prefect import task, Flow


@task()
def task_a(a):
    return 10


@task()
def task_b(a, b, c):
    return a + b * c


with Flow("my flow") as flow:
    task_a().pipe(task_b, b=2, c=5)

In this case, the output of task_a() (10), will be passed in as the a parameter for task_b. If you want it to be passed in as a keyword argument, or at a different position, consider rewriting your task function, or writing a wrapper function. In the below example, the output of task_a() will instead be passed in as the b parameter:

def func_b(a, b, c):
    return a + b * c


@task()
def task_b(b, **kwargs):
    return func_b(b=b, **kwargs)


with Flow("my flow") as flow:
    task_a().pipe(task_b)

# Secondary Argument Positions

Note that so far we have only used keyword arguments as the "secondary" arguments when we use .pipe(). This is because .pipe() does not support positional secondary arguments. Therefore, the following will always fail:

task_a().pipe(task_b, 1, 2, 3)

This isn't actually a limitation however, because most arguments can be expressed as keyword arguments. Only positional-only arguments and *args cannot. These are not supported by Prefect.