# Notifications and Callback Tools


# Functions

top-level functions:                                                                                                                                                       

prefect.utilities.notifications.notifications.callback_factory

(fn, check)[source]

Utility for generating state handlers that serve as callbacks, under arbitrary state-based checks.

Args:

  • fn (Callable): a function with signature fn(obj, state: State) -> None that will be called anytime the associated state-check passes; in general, it is expected that this function will have side effects (e.g., sends an email). The first argument to this function is the Task or Flow it is attached to.
  • check (Callable): a function with signature check(state: State) -> bool that is used for determining when the callback function should be called
Returns:
  • state_handler (Callable): a state handler function that can be attached to both Tasks and Flows
Example:
    from prefect import Task, Flow
from prefect.utilities.notifications import callback_factory

fn = lambda obj, state: print(state)
check = lambda state: state.is_successful()
callback = callback_factory(fn, check)

t = Task(state_handlers=[callback])
f = Flow("My Example Flow", tasks=[t], state_handlers=[callback])
f.run()
# prints:
# Success("Task run succeeded.")
# Success("All reference tasks succeeded.")


prefect.utilities.notifications.notifications.slack_notifier

(tracked_obj, old_state, new_state, ignore_states=None, only_states=None, webhook_secret=None, backend_info=True, proxies=None)[source]

Slack state change handler; requires having the Prefect slack app installed. Works as a standalone state handler, or can be called from within a custom state handler. This function is curried meaning that it can be called multiple times to partially bind any keyword arguments (see example below).

Args:

  • tracked_obj (Task or Flow): Task or Flow object the handler is registered with
  • old_state (State): previous state of tracked object
  • new_state (State): new state of tracked object
  • ignore_states ([State], optional): list of State classes to ignore, e.g., [Running, Scheduled]. If new_state is an instance of one of the passed states, no notification will occur.
  • only_states ([State], optional): similar to ignore_states, but instead only notifies you if the Task / Flow is in a state from the provided list of State classes
  • webhook_secret (str, optional): the name of the Prefect Secret that stores your slack webhook URL; defaults to "SLACK_WEBHOOK_URL"
  • backend_info (bool, optional): Whether to supply slack notification with urls pointing to backend pages; defaults to True
  • proxies (dict), optional): dict with "http" and/or "https" keys, passed to requests.post - for situations where a proxy is required to send requests to the Slack webhook
Returns:
  • State: the new_state object that was provided
Raises:
  • ValueError: if the slack notification fails for any reason
Example:
    from prefect import task
from prefect.utilities.notifications import slack_notifier

@task(state_handlers=[slack_notifier(ignore_states=[Running])]) # uses currying
def add(x, y):
return x + y


prefect.utilities.notifications.notifications.gmail_notifier

(tracked_obj, old_state, new_state, ignore_states=None, only_states=None)[source]

Email state change handler - configured to work solely with Gmail; works as a standalone state handler, or can be called from within a custom state handler. This function is curried meaning that it can be called multiple times to partially bind any keyword arguments (see example below).

The username and password Gmail credentials will be taken from your "EMAIL_USERNAME" and "EMAIL_PASSWORD" secrets, respectively; note the username will also serve as the destination email address for the notification.

Args:

  • tracked_obj (Task or Flow): Task or Flow object the handler is registered with
  • old_state (State): previous state of tracked object
  • new_state (State): new state of tracked object
  • ignore_states ([State], optional): list of State classes to ignore, e.g., [Running, Scheduled]. If new_state is an instance of one of the passed states, no notification will occur.
  • only_states ([State], optional): similar to ignore_states, but instead only notifies you if the Task / Flow is in a state from the provided list of State classes
Returns:
  • State: the new_state object that was provided
Raises:
  • ValueError: if the email notification fails for any reason
Example:
    from prefect import task
from prefect.utilities.notifications import gmail_notifier

@task(state_handlers=[gmail_notifier(ignore_states=[Running])]) # uses currying
def add(x, y):
return x + y


prefect.utilities.notifications.jira_notification.jira_notifier

(tracked_obj, old_state, new_state, ignore_states=None, only_states=None, server_URL=None, options=None, assignee="-1")[source]

Jira Notifier requires a Jira account and API token. The API token can be created at: https://id.atlassian.com/manage/api-tokens The Jira account username ('JIRAUSER'), API token ('JIRATOKEN') should be set as part of a 'JIRASECRETS' object in Prefect Secrets.

An example 'JIRASECRETS' secret configuration looks like:


toml
[secrets]
JIRASECRETS.JIRATOKEN = "XXXXXXXXX"
JIRASECRETS.JIRAUSER = "xxxxx@yyy.com"
JIRASECRETS.JIRASERVER = "https://???.atlassian.net"



The server URL can be set as part of the 'JIRASECRETS' object ('JIRASERVER') or passed to the jira notifier state handler as the "server_URL" argument. Jira Notifier works as a standalone state handler, or can be called from within a custom state handler. This function is curried meaning that it can be called multiple times to partially bind any keyword arguments (see example below). Jira Notifier creates a new ticket with the information about the task or flow it is bound to when that task or flow is in a specific state. (For example it will create a ticket to tell you that the flow you set it on is in a failed state.) You should use the options dictionary to set the project name and issue type. You can use the "assignee" argument to assign that ticket to a specific member of your team.

Args:
  • tracked_obj (Task or Flow): Task or Flow object the handler is registered with
  • old_state (State): previous state of tracked object
  • new_state (State): new state of tracked object
  • options (Dictionary): Must inlucde a 'project' key and an 'issuetype' key (e.g. options = {'project': 'TEST', 'issuetype': {'name': 'task'}}). For jira service desk tickets, the issue type should use the request type id e.g. 'issuetype': {'id': '10010'}. A description can be added using the key 'description'. Custom fields can also be added e.g. 'customfield_10017': 'SDTS/flowdown'
  • ignore_states ([State], optional): list of State classes to ignore, e.g., [Running, Scheduled]. If new_state is an instance of one of the passed states, no notification will occur.
  • only_states ([State], optional): similar to ignore_states, but instead only notifies you if the Task / Flow is in a state from the provided list of State classes
  • server_URL (String): The URL of your atlassian account e.g. "https://test.atlassian.net". Can also be set as a Prefect Secret.
  • assignee: the atlassian username of the person you want to assign the ticket to. Defaults to "automatic" if this is not set.
Returns:
  • State: the new_state object that was provided
Raises:
  • ValueError: if the jira ticket creation or assignment fails for any reason
Example:
    from prefect import task
from prefect.utilities.jira_notification import jira_notifier

@task(state_handlers=[
jira_notifier(
only_states=[Failed],
options={'project': 'TEST', 'issuetype': {'name': 'Task'}},
assignee='tester'
)
])
def add(x, y):
return x + y


prefect.utilities.notifications.notifications.snowflake_logger

(tracked_obj, old_state, new_state, ignore_states=None, only_states=None, snowflake_secret=None, snowflake_log_table_name=None, test_env=False)[source]

Snowflake state change handler/logger; requires having the Prefect Snowflake app installed. Works as a standalone state handler, or can be called from within a custom state handler. This function is curried meaning that it can be called multiple times to partially bind any keyword arguments (see example below). Args:

  • tracked_obj (Task or Flow): Task or Flow object the handler is registered with.
  • old_state (State): Previous state of tracked object.
  • new_state (State): New state of tracked object.
  • ignore_states ([State], optional): List of State classes to ignore, e.g., [Running, Scheduled]. If new_state is an instance of one of the passed states, no notification will occur.
  • only_states ([State], optional): Similar to ignore_states, but instead only notifies you if the Task / Flow is in a state from the provided list of State classes
  • snowflake_secret (str, optional): The name of the Prefect Secret that stores your Snowflake credentials; defaults to "SNOWFLAKE_CREDS".
  • snowflake_log_table_name (str, optional): The fully qualified Snowflake log table name e.g. DB.SCHEMA.TABLE.
  • test_env (bool): Only used for testing and defaults to False. Example:
        from prefect import task
    from prefect.utilities.notifications import snowflake_logger
    @task(state_handlers=[snowflake_logger(ignore_states=[Running])]) # uses currying
    def add(x, y):
    return x + y


This documentation was auto-generated from commit bd9182e
on July 31, 2024 at 18:02 UTC