# PIN 14: Event-Driven Flow Execution via Listeners
Date: December 14, 2019
Author: Chris White
# Status
Paused; supersedes PIN 8.
# Context
Many use cases require event-driven Flows; for example, we might want to run an ETL-style Flow to process individual files each time one lands in an S3 bucket. Historically, we have recommended users sign up for a Scheduler account and use Prefect Cloud's API to trigger parametrized runs (e.g., using AWS Lambda). This suffers from a few drawbacks:
- it doesn't allow users to explicitly specify that this Flow depends on an external event
- it doesn't allow users to configure that event as a first-class hook within Prefect
- it requires the Flow be run through Prefect Cloud
This PIN outlines a first-class implementation of event-driven workflows that will allow users to configure an EventClock
that listens for events of a specified type, and passes a JSON representation of each event as a Parmeter to the Flow. This will allow for:
- a maintained library of event-driven hooks
- the ability to configure event-driven flows which also run on a schedule
- an implementation that allows both Core and Cloud users to run event-driven Flows
- the ability to toggle whether a Flow is "listening" through Prefect instead of an external service
# Proposal
The heart of the current proposal is to introduce a new EventClock
(see PIN 10 for a description of the clock model) which polls to find events matching its specification. The proposal is similar in spirit to PIN 8, with a somewhat simpler implementation now that Clocks have been implemented. In addition to a new type of Clock, this proposal outlines a new listen()
method on FlowRunner
classes, which will become responsible for waiting for either a scheduled time or an event to take place, and will ultimately replace a large piece of the scheduling logic within flow.run()
.
# Core
In Core, we introduce two new APIs:
- an
EventClock
which has an unimplementednext
method, responsible for polling for events and collecting them in a work queue - an
EventParameter
(or a new keyword argument toParameter
s) that allows users to designate a Parameter as the JSON event payload, with a default value of eitherNone
ordict()
Subclasses of this EventClock
can then be used to construct an appropriate Schedule
for the Flow (possibly with other time-driven clocks!), and flow.run
will essentially work with minimal change. The only additional piece of information that needs to be conveyed is the value for the designated event parameter, and this can be achieved through one additional call (maybe flow.retrieve_event()
which then extracts the event from the EventClock
) and then placing the result into Prefect context.
In addition, a new listen
method on FlowRunner
classes will be responsible for calling the schedule and configuring context for each run. Much of the current scheduling logic in flow.run()
should be replaceable by this new method on the FlowRunner
class.
# Cloud
There are a few proposed changes that will also require changes to Prefect Cloud:
- any subclass of the
EventClock
will be deserialized as the baseEventClock
class, which returns an empty list fornext
- users will be responsible for starting and stopping the listening by creating / cancelling a given Flow run
- once created, an appropriate agent will pick up the flow run and submit it for "execution" (in this case, submit it to listen); we might need to introduce a new pre-listening state so that the Agent knows this is a listener and not a normal flow run
- During this execution, the Flow run will enter a
Listening
state andCloudFlowRunner.listen()
will be used to poll for events; anytime an event is found, it will be used to call the Cloud API and create a new parametrized flow run with the appropriate event parameter payload - while polling, or through a standard heartbeat thread, if a Flow run is found to be in a
Listening
state but hasn't sent a heartbeat, the Lazarus process will revive it - to prevent duplicate processing runs, we will also need to use some form of "event ID" as the idempotency key for the run
# Consequences
First and foremost, this will allow us to create and maintain a curated list of event-based hooks for Prefect Flows. Additionally, if we enforce a "null"-like default vaule for event parameters, scheduled batch and cleanup jobs can still occur while the flow is listening for events. Moreover, both Core and Cloud users will be able to benefit from the functionality as proposed above.