# PIN 15: Skip state as Finished + Control Flow Conditional
Date: February 24, 2020
Author: Alex Goodman
# Status
Declined (see bottom of page for reason)
# Context
The ControlFlow Tasks allow for users to branch conditionally within the execution graph, skipping some tasks and allowing execution of others. However, back-to-back conditions have proven to be a problem under the existing implementation. Specifically, the problem appears to be with the semantics with the “Skip” Prefect State: it is considered to be fundamentally a “Success” state. That is, it is not easy (if impossible) to determine why a task was skipped, and if ignoring that Skipped state is OK. This means that using skip_on_upstream_skip
is not a good enough mechanism to implement back-to-back conditions and hints that more functionality is needed. Additionally, the current semeantics imply that a Skipped Task was never run, however, it is possible for a Task to manually raise the SKIP
signal mid-execution. Overall, the above points hint that a base class of Success
for the Skipped
state is probably not accurate.
Furthermore, indicating when a trigger condition is not met can also be misleading. By raising the TRIGGERFAIL, a Failed
state propagates through downstream tasks, which could be misconstrued as these downstream tasks failing, when in fact they have semantically been skipped. Take the following flow as an example:
@prefect.task()
def a(): pass
@prefect.task(trigger=prefect.triggers.any_failed)
def b(): pass
@prefect.task()
def c(): pass
with prefect.Flow("My Flow") as flow:
flow.chain(a, b, c)
Running this flow will result in A
completing successfully, followed by B
and C
having never been executed yet be in TriggerFailed
states. However, consider adding an any_failed
trigger to C
:
@prefect.task()
def a(): pass
@prefect.task(trigger=prefect.triggers.any_failed)
def b(): pass
@prefect.task(trigger=prefect.triggers.any_failed)
def c(): pass
with prefect.Flow("My Flow") as flow:
flow.chain(a, b, c)
This will result in B
in a TriggerFailed
state followed by C
completing successfully, which is arguably surprising behavior.
Ideally, a trigger should be the gatekeeper for Task exection: either attempt to run the current Task (or not) based on upstream states. Or said another way, attempt to run the current Task or Skip it --this does not imply a Failed
(or Success
ful) state being determined yet, as that should be the result of the Task itself.
# Proposal
This PIN proposes to enhance conditionally executing Tasks within Flows. Ultimately, the proposal is:
- Change the underlying base State of “Skip” from “Success” to “Finished”. This is more semantically correct: the task has neither completed successfully or in error, it has only completed. If it was not in a completed state it could be misconstrued with a pending-like state, however, we have already determined that no further action is necessary thus it has completed.
Given that it is no longer necessary to stop the continued execution now that Skip is no longer a “Success” state, there are further implied changes:
Any triggers that raise
TRIGGERFAILED
signal should now raiseSKIP
instead.Remove
skip_on_upstream_skip
as a Task argument (this can be handled by a user electing to use the newno_failed
trigger, introduced later).Remove the
TaskRunner.check_upstream_skipped
state logic.
Once these changes are made there is an opportunity to add additional functionality:
Add a new
no_failed
trigger, which passes if there are no failed states upstream.Introduce a
conditional
object that acts similar toswitch
, supporting only a single case (theTrue
case). This will be atask.run
wrapper instead of adding an additional CompareValue task. Any task can be a condition, a truthy value will be determined by the result of the given task, and if determined to be “False” thenFAIL
is raised.Example usage (within a flow context):
conditional(is_b_enabled, b_task)
instead ofswitch(is_b_enabled, {True: b_task})
Add the ability to combine conditions: add
any_condition
andall_conditions
reducing functions toprefect.tasks.control_flow.conditional
. This would add an additional task to the flow, with upstreams being the individual conditions, and the new task emitting a “True” result or raising aFAIL
.Example usage (within a flow context):
conditional(all_conditions(is_a_enabled, another_task), a_task)
# Consequences
Back-to-back switch
usage would be possible (since skip_on_upstream_skip
is no longer the mechanism used to determine if a task should respond to upstream Skip).
# Actions
Release the changes in two batches:
First release (breaking changes):
a. Add deprecation warnings for uses of
TRIGGERFAIL
signal,TriggerFailed
state, andskip_on_upstream_skip
Task option usageb. Implement proposal items 1-7
Second release (breaking changes):
a. Remove functionality for
TRIGGERFAIL
signal,TriggerFailed
state,skip_on_upstream_skip
# Reason for declining
The motivations for this PIN are sound however they require the designing of a new API in order to support. This proposal focuses on using triggers as the place for conditional logic however that fails to keep in tune with the original purpose of triggers. Changing the main state function of triggers to skip instead of failure when not satisfied will cause even simple flows to not behave properly as originally defined by the paradigm of the Prefect engine.
Say there existed a flow containing the edge Task_1 -> Task_2
. Under this proposal if Task_1
were to enter a Failed
state then the default all_successful
trigger of Task_2
would raise a SKIP
signal instead of a TriggerFailed
. Since Task_2
is a reference task for this flow the final state will be Success
even though the intendened state is for the flow to enter Failed
.
There is no doubt room for a better API which encapsulates the ideas outlined here, perhaps something that lives next to triggers on tasks called conditions. However, this PIN is not the official proposal for that functionality and is being declined.