# Next Steps
In this tutorial we showed how Prefect can be used to improve the behavior of your workflow. However, there is a lot more to Prefect that we didn't cover!
# Logging within your Tasks
Within our tutorial Tasks
are displaying information via Python's built in print()
function:
@task
def load_reference_data(ref_data):
print("saving reference data...")
db = aclib.Database()
db.update_reference_data(ref_data)
However, there is already a Logger
object made available to you within Tasks
:
@task
def load_reference_data(ref_data):
logger = prefect.context.get("logger")
logger.info("saving reference data...")
db = aclib.Database()
db.update_reference_data(ref_data)
Read More About Logging
# Storing Task Results
In this particular Aircraft ETL example we did not have an explicit need to keep intermediate results at each step along the way. However, Prefect provides a Result
abstraction that enables users to persist Results
returned from each Task
to a storage of choice:
from prefect.engine.results import LocalResult
result = LocalResult(dir="./my-results")
with Flow("Aircraft-ETL", result=result) as flow:
...
flow.run()
Now any results generated by individual Tasks
are written out to the ./my-results
directory, relative to where the Flow was executed from. Prefect provides many supported Results to persist results to common storage options such as S3, Google Cloud Storage, Azure Storage, and more.
Read More About Results
# Advanced Control Structures
In this tutorial we could have gone one step further and asked for fetching all aircraft data within X radius of multiple Y airports. In this case Mapping
would be the easiest way to tackle this, calling our extract_live_data
Task for each desired airport.
More about Control Structures
# Ready-made Task Library
Within our tutorial Tasks
were always user-provided functions, however, Prefect provides a library of ready-made Tasks
. Here's an example Flow
that uses ShellTasks
to run arbitrary commands:
from prefect import Flow
from prefect.tasks.shell import ShellTask
shell = ShellTask()
with Flow("Simple Pipeline") as flow:
flow.chain(
shell(command='pip install -r requirements.txt'),
shell(command='black --check .'),
shell(command='pytest .'),
)
flow.run()
The Task library includes integrations with Kubernetes, GitHub, Slack, Docker, AWS, GCP, and more!
← Scaling Out Tasks →