prefect.utilities.notifications.notifications.callback_factory (fn, check) [source] |
Utility for generating state handlers that serve as callbacks, under arbitrary state-based checks.
Args: fn (Callable) : a function with signature fn(obj, state: State) -> None that will be called anytime the associated state-check passes; in general, it is expected that this function will have side effects (e.g., sends an email). The first argument to this function is the Task or Flow it is attached to. check (Callable) : a function with signature check(state: State) -> bool that is used for determining when the callback function should be called Returns: state_handler (Callable) : a state handler function that can be attached to both Tasks and Flows Example:
from prefect import Task, Flow from prefect.utilities.notifications import callback_factory
fn = lambda obj, state: print(state) check = lambda state: state.is_successful() callback = callback_factory(fn, check)
t = Task(state_handlers=[callback]) f = Flow("My Example Flow", tasks=[t], state_handlers=[callback]) f.run()
|
prefect.utilities.notifications.notifications.slack_notifier (tracked_obj, old_state, new_state, ignore_states=None, only_states=None, webhook_secret=None, backend_info=True, proxies=None) [source] |
Slack state change handler; requires having the Prefect slack app installed. Works as a standalone state handler, or can be called from within a custom state handler. This function is curried meaning that it can be called multiple times to partially bind any keyword arguments (see example below).
Args: tracked_obj (Task or Flow) : Task or Flow object the handler is registered with old_state (State) : previous state of tracked object new_state (State) : new state of tracked object ignore_states ([State], optional) : list of State classes to ignore, e.g., [Running, Scheduled] . If new_state is an instance of one of the passed states, no notification will occur. only_states ([State], optional) : similar to ignore_states , but instead only notifies you if the Task / Flow is in a state from the provided list of State classes webhook_secret (str, optional) : the name of the Prefect Secret that stores your slack webhook URL; defaults to "SLACK_WEBHOOK_URL" backend_info (bool, optional) : Whether to supply slack notification with urls pointing to backend pages; defaults to True proxies (dict), optional) : dict with "http" and/or "https" keys, passed to requests.post - for situations where a proxy is required to send requests to the Slack webhook Returns: State : the new_state object that was provided Raises: ValueError : if the slack notification fails for any reason Example:
from prefect import task from prefect.utilities.notifications import slack_notifier
@task(state_handlers=[slack_notifier(ignore_states=[Running])]) def add(x, y): return x + y
|
prefect.utilities.notifications.notifications.gmail_notifier (tracked_obj, old_state, new_state, ignore_states=None, only_states=None) [source] |
Email state change handler - configured to work solely with Gmail; works as a standalone state handler, or can be called from within a custom state handler. This function is curried meaning that it can be called multiple times to partially bind any keyword arguments (see example below).
The username and password Gmail credentials will be taken from your "EMAIL_USERNAME" and "EMAIL_PASSWORD" secrets, respectively; note the username will also serve as the destination email address for the notification.
Args: tracked_obj (Task or Flow) : Task or Flow object the handler is registered with old_state (State) : previous state of tracked object new_state (State) : new state of tracked object ignore_states ([State], optional) : list of State classes to ignore, e.g., [Running, Scheduled] . If new_state is an instance of one of the passed states, no notification will occur. only_states ([State], optional) : similar to ignore_states , but instead only notifies you if the Task / Flow is in a state from the provided list of State classes Returns: State : the new_state object that was provided Raises: ValueError : if the email notification fails for any reason Example:
from prefect import task from prefect.utilities.notifications import gmail_notifier
@task(state_handlers=[gmail_notifier(ignore_states=[Running])]) def add(x, y): return x + y
|
prefect.utilities.notifications.jira_notification.jira_notifier (tracked_obj, old_state, new_state, ignore_states=None, only_states=None, server_URL=None, options=None, assignee="-1") [source] |
Jira Notifier requires a Jira account and API token. The API token can be created at: https://id.atlassian.com/manage/api-tokens The Jira account username ('JIRAUSER'), API token ('JIRATOKEN') should be set as part of a 'JIRASECRETS' object in Prefect Secrets.
An example 'JIRASECRETS' secret configuration looks like:
toml [secrets] JIRASECRETS.JIRATOKEN = "XXXXXXXXX" JIRASECRETS.JIRAUSER = "xxxxx@yyy.com" JIRASECRETS.JIRASERVER = "https://???.atlassian.net"
The server URL can be set as part of the 'JIRASECRETS' object ('JIRASERVER') or passed to the jira notifier state handler as the "server_URL" argument. Jira Notifier works as a standalone state handler, or can be called from within a custom state handler. This function is curried meaning that it can be called multiple times to partially bind any keyword arguments (see example below). Jira Notifier creates a new ticket with the information about the task or flow it is bound to when that task or flow is in a specific state. (For example it will create a ticket to tell you that the flow you set it on is in a failed state.) You should use the options dictionary to set the project name and issue type. You can use the "assignee" argument to assign that ticket to a specific member of your team.
Args: tracked_obj (Task or Flow) : Task or Flow object the handler is registered with old_state (State) : previous state of tracked object new_state (State) : new state of tracked object options (Dictionary) : Must inlucde a 'project' key and an 'issuetype' key (e.g. options = {'project': 'TEST', 'issuetype': {'name': 'task'}}). For jira service desk tickets, the issue type should use the request type id e.g. 'issuetype': {'id': '10010'}. A description can be added using the key 'description'. Custom fields can also be added e.g. 'customfield_10017': 'SDTS/flowdown' ignore_states ([State], optional) : list of State classes to ignore, e.g., [Running, Scheduled] . If new_state is an instance of one of the passed states, no notification will occur. only_states ([State], optional) : similar to ignore_states , but instead only notifies you if the Task / Flow is in a state from the provided list of State classes server_URL (String) : The URL of your atlassian account e.g. "https://test.atlassian.net". Can also be set as a Prefect Secret. assignee : the atlassian username of the person you want to assign the ticket to. Defaults to "automatic" if this is not set. Returns: State : the new_state object that was provided Raises: ValueError : if the jira ticket creation or assignment fails for any reason Example:
from prefect import task from prefect.utilities.jira_notification import jira_notifier
@task(state_handlers=[ jira_notifier( only_states=[Failed], options={'project': 'TEST', 'issuetype': {'name': 'Task'}}, assignee='tester' ) ]) def add(x, y): return x + y
|