Configuring Cache Invalidation Strategy (TTL, Event-Based, Cache-Aside)
Caching without a well-thought-out invalidation strategy is a source of hard-to-debug bugs caused by stale data. The choice of strategy depends on data-freshness requirements and on the system architecture.
Main Strategies
TTL (Time-To-Live)—data automatically becomes stale after a set interval. Simple to implement, but data can be stale before TTL expires.
Cache-Aside (Lazy Loading)—application first checks cache, on miss loads from DB and writes to cache. The most common strategy.
Write-Through—writes go simultaneously to cache and DB. Data always fresh, but every write passes through cache.
Event-Based Invalidation—when data changes, an event is generated that invalidates corresponding cache keys.
Cache-Aside with TTL
import redis
import json
from functools import wraps
redis_client = redis.Redis(host='redis', decode_responses=True)
def cached(key_template, ttl=300):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cache_key = key_template.format(*args, **kwargs)
cached_val = redis_client.get(cache_key)
if cached_val:
return json.loads(cached_val)
result = func(*args, **kwargs)
redis_client.setex(cache_key, ttl, json.dumps(result))
return result
return wrapper
return decorator
@cached("user:{0}", ttl=600)
def get_user(user_id):
return db.query("SELECT * FROM users WHERE id = %s", user_id)
Invalidation on update:
def update_user(user_id, data):
db.execute("UPDATE users SET ... WHERE id = %s", user_id)
redis_client.delete(f"user:{user_id}")
# Invalidate related keys
redis_client.delete(f"user_posts:{user_id}")
redis_client.delete(f"user_profile_full:{user_id}")
Event-Based Invalidation via Queue
# publisher (on data change)
import pika
def publish_invalidation(entity_type, entity_id, changed_fields=None):
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()
channel.exchange_declare(exchange='cache_invalidation', exchange_type='topic')
message = json.dumps({
'entity': entity_type,
'id': entity_id,
'fields': changed_fields
})
channel.basic_publish(
exchange='cache_invalidation',
routing_key=f'invalidate.{entity_type}',
body=message
)
# subscriber (cache service)
def on_user_changed(channel, method, properties, body):
event = json.loads(body)
patterns_to_invalidate = [
f"user:{event['id']}",
f"user_full:{event['id']}",
]
if 'role' in (event.get('fields') or []):
patterns_to_invalidate.append(f"user_permissions:{event['id']}")
for key in patterns_to_invalidate:
redis_client.delete(key)
Cache Tags (Dependencies)
Tagging allows invalidating groups of related keys by one tag:
// PHP/Laravel: Spatie Response Cache or custom implementation
class TaggedCache
{
public function put(string $key, $value, int $ttl, array $tags = []): void
{
Redis::setex($key, $ttl, serialize($value));
foreach ($tags as $tag) {
Redis::sadd("cache_tag:{$tag}", $key);
Redis::expire("cache_tag:{$tag}", $ttl + 60);
}
}
public function invalidateByTag(string $tag): void
{
$keys = Redis::smembers("cache_tag:{$tag}");
if (!empty($keys)) {
Redis::del($keys);
}
Redis::del("cache_tag:{$tag}");
}
}
// Usage
$cache->put("product:42", $product, 3600, ['product:42', 'category:5', 'brand:3']);
// When category 5 changes—invalidate everything related
$cache->invalidateByTag('category:5');
Stale-While-Revalidate
Pattern: return stale data while revalidating the cache in the background. Mitigates cache stampede (thundering herd) on expiry — note that a cold-cache miss is still fetched synchronously:
import threading
def get_with_stale_revalidate(key, fetch_fn, ttl=300, stale_ttl=60):
data = redis_client.get(key)
if data:
result = json.loads(data)
remaining_ttl = redis_client.ttl(key)
# If TTL is low—start background refresh
if remaining_ttl < stale_ttl:
lock_key = f"revalidate_lock:{key}"
if redis_client.set(lock_key, 1, nx=True, ex=30):
threading.Thread(
target=lambda: _background_refresh(key, fetch_fn, ttl)
).start()
return result
# Cache miss—synchronous fetch
result = fetch_fn()
redis_client.setex(key, ttl, json.dumps(result))
return result
def _background_refresh(key, fetch_fn, ttl):
try:
result = fetch_fn()
redis_client.setex(key, ttl, json.dumps(result))
finally:
redis_client.delete(f"revalidate_lock:{key}")
Cache Stampede Protection via Locks
def get_with_lock(key, fetch_fn, ttl=300):
result = redis_client.get(key)
if result:
return json.loads(result)
lock = redis_client.lock(f"lock:{key}", timeout=10)
if lock.acquire(blocking=True, blocking_timeout=5):
try:
# Double-check after acquiring lock
result = redis_client.get(key)
if result:
return json.loads(result)
data = fetch_fn()
redis_client.setex(key, ttl, json.dumps(data))
return data
finally:
lock.release()
TTL Strategies by Data Type
| Data Type | TTL | Invalidation |
|---|---|---|
| User profile | 10 min | On update |
| Product list | 5 min | On product change |
| App config | 1 hour | On deploy |
| Exchange rates | 30 sec | On event |
| User permissions | 5 min | On role change |
| HTML pages | 1 hour | On publish |
Monitoring Cache Effectiveness
# Redis INFO stats
redis-cli INFO stats | grep -E "keyspace_hits|keyspace_misses"
# keyspace_hits:12847293
# keyspace_misses:234821
# Hit rate = hits / (hits + misses)
# Normal: > 80%
Prometheus metric:
redis_keyspace_hits_total / (redis_keyspace_hits_total + redis_keyspace_misses_total)
Timeline
Developing invalidation strategy with Cache Tags and Event-Based approach—3–5 business days.







