# Cache Validators


Cache validators are functions that determine if a task's output cache is still valid, or whether that task should be re-run; they are provided at Task creation via the cache_validator keyword argument (for more information on instantiating Tasks see the Task documentation).

Task caches are created at Task runtime if and only if the cache_for keyword argument is provided to the Task, that specifies how long the output cache will be valid for after its creation. Cache validators come into play when a cached Task is re-run, and are used to determine whether to re-run the Task or use the cache.

Note that all validators take into account cache expiration.

A cache validator returns True if the cache is still valid, and False otherwise.

# Functions

top-level functions:                                                                                                                                                       

prefect.engine.cache_validators.never_use

(state, inputs, parameters)[source]

Never uses the cache.

Args:

  • state (State): a Success state from the last successful Task run that contains the cache
  • inputs (dict): a dict of inputs that were available on the last successful run of the cached Task
  • parameters (dict): a dict of parameters that were available on the last successful run of the cached Task
Returns:
  • boolean specifying whether or not the cache should be used

prefect.engine.cache_validators.duration_only

(state, inputs, parameters)[source]

Validates the cache based only on cache expiration.

Args:

  • state (State): a Success state from the last successful Task run that contains the cache
  • inputs (dict): a dict of inputs that were available on the last successful run of the cached Task
  • parameters (dict): a dict of parameters that were available on the last successful run of the cached Task
Returns:
  • boolean specifying whether or not the cache should be used

prefect.engine.cache_validators.all_inputs

(state, inputs, parameters)[source]

Validates the cache based on cache expiration and all inputs that were provided on the last successful run.

Args:

  • state (State): a Success state from the last successful Task run that contains the cache
  • inputs (dict): a dict of inputs that were available on the last successful run of the cached Task
  • parameters (dict): a dict of parameters that were available on the last successful run of the cached Task
Returns:
  • boolean specifying whether or not the cache should be used

prefect.engine.cache_validators.all_parameters

(state, inputs, parameters)[source]

Validates the cache based on cache expiration and all parameters that were provided on the last successful run.

Args:

  • state (State): a Success state from the last successful Task run that contains the cache
  • inputs (dict): a dict of inputs that were available on the last successful run of the cached Task
  • parameters (dict): a dict of parameters that were available on the last successful run of the cached Task
Returns:
  • boolean specifying whether or not the cache should be used

prefect.engine.cache_validators.partial_parameters_only

(validate_on=None)[source]

Validates the cache based on cache expiration and a subset of parameters (determined by the validate_on keyword) that were provided on the last successful run.

Args:

  • validate_on (list): a list of strings specifying the parameter names to validate against
Returns:
  • Callable: the actual validation function specifying whether or not the cache should be used
Example:
from datetime import timedelta
import pendulum
from prefect import Flow, Parameter, task
from prefect.engine.cache_validators import partial_parameters_only

@task(cache_for=timedelta(days=1),
cache_validator=partial_parameters_only(validate_on=['nrows']))
def daily_db_refresh(nrows, runtime):
pass

with Flow("My Flow") as f:
nrows = Parameter("nrows", default=500)
runtime = Parameter("runtime")
db_state = daily_db_refresh(nrows, runtime)

state1 = f.run(parameters=dict(nrows=1000, runtime=pendulum.now('utc')))

## the second run will use the cache contained within prefect.context.caches
## even though runtime has changed
state2 = f.run(parameters=dict(nrows=1000, runtime=pendulum.now('utc')))
## similarly, providing input state omits running daily_db_refresh even
## without cache arguments in the task decorator
state3 = f.run(parameters=dict(nrows=1000, runtime=pendulum.now('utc')),
task_states={db_state: state1.result[db_state]})

prefect.engine.cache_validators.partial_inputs_only

(validate_on=None)[source]

Validates the cache based on cache expiration and a subset of inputs (determined by the validate_on keyword) that were provided on the last successful run.

Args:

  • validate_on (list): a list of strings specifying the input names to validate against
Returns:
  • Callable: the actual validation function specifying whether or not the cache should be used
Example:
import random
from datetime import timedelta
from prefect import Flow, task
from prefect.engine.cache_validators import partial_inputs_only

@task(cache_for=timedelta(days=1),
cache_validator=partial_inputs_only(validate_on=['x', 'y']))
def add(x, y, as_string=False):
if as_string:
return '{0} + {1}'.format(x, y)
return x + y

@task
def rand_bool():
return random.random() > 0.5

with Flow("My Flow") as f:
ans = add(1, 2, rand_bool())

state1 = f.run()
## the second run will use the cache contained within prefect.context.caches
## even though rand_bool might change
state2 = f.run()
## similarly, providing input state omits running add even
## without cache arguments in the task decorator:
state3 = f.run(task_states={ans: state1.result[ans]})

This documentation was auto-generated from commit bd9182e
on July 31, 2024 at 18:02 UTC