Jitter and Stampede Protection
Single-flight locks and probabilistic XFetch refresh, plus a topology-sensitive variant that scales the refresh window with node degree.
The classical XFetch rule treats every key the same. The topology-sensitive variant scales the early-refresh window by (1 + ln d) where d is the source node's degree. On the SSCA-inspired workload this lifts throughput from 255 → 305 qps and cuts P95 from 13.5 → 9.4 ms compared to plain XFetch.
Two jobs in one module
This file does two related things:
- Stampede protection. When many requests miss the same key simultaneously, only one of them should reach Neo4j; the rest should reuse its result. Implemented with a Redis `SET NX` single-flight lock.
- Probabilistic early refresh. When a request hits a cached entry, decide probabilistically whether to fire a background refresh that will pre-populate the cache before its scheduled expiry, so that the actual expiry does not produce a synchronised stampede.
The single-flight lock
The protect async context manager is the entry point. On a cache miss the orchestrator opens this block, and inside it tries to acquire the lock.
```python
@asynccontextmanager
async def protect(self, key, *, degree=1, volatility=None,
                  avg_latency_ms=50.0, get_topology_phi=None, early_refresh=False):
    token = str(uuid.uuid4()).encode("utf-8")
    lock_key = f"{key}:lock"
    jitter = random.randint(
        self.settings.STAMPEDE_JITTER_MIN_MS,
        self.settings.STAMPEDE_JITTER_MAX_MS,
    )
    timeout_ms = self.settings.STAMPEDE_LOCK_TIMEOUT_MS + jitter
    acquired = await self.redis.set(lock_key, token, px=timeout_ms, nx=True)
    if acquired:
        context = StampedeContext(has_lock=True, lock_wait_ms=0.0, early_refresh=early_refresh)
        try:
            yield context
        finally:
            current = await self.redis.get(lock_key)
            if current == token:
                await self.redis.delete(lock_key)
        return
```
Three details matter here:
- UUID token in bytes. The lock is released with a check-and-delete: only the holder of the token may delete the key. Both sides of the comparison live in bytes, because the Redis client is initialised with `decode_responses=False`. A previous bug compared bytes against str and produced 1.2-second tail spikes when the lock was never released.
- PX timeout with jitter. The lock has a hard expiry built into Redis itself, so a crashed holder cannot wedge the system. The expiry is randomised within a small range so that two concurrent holders that both crashed at exactly the same moment will not retry in lockstep.
- NX flag. The lock is acquired only if the key does not exist; this is what gives single-flight semantics.
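One refinement worth noting: the GET-then-DELETE release above leaves a small window in which the lock can expire and be re-acquired by another holder between the two calls. The standard remedy from the Redis documentation is to make the compare-and-delete atomic with a Lua script. A sketch of that pattern; the `FakeRedis` stub is illustrative, not part of the module:

```python
import asyncio

# Lua script run atomically inside Redis: delete the lock key only if it
# still holds our token, so compare and delete cannot be interleaved.
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""

async def release(redis, lock_key, token):
    # redis-py exposes EVAL as eval(script, numkeys, *keys_and_args)
    return bool(await redis.eval(RELEASE_SCRIPT, 1, lock_key, token))

# In-memory stand-in so the semantics can be exercised without a server.
class FakeRedis:
    def __init__(self):
        self.store = {}

    async def eval(self, script, numkeys, key, token):
        if self.store.get(key) == token:
            del self.store[key]
            return 1
        return 0

fake = FakeRedis()
fake.store["person:42:lock"] = b"tok-a"
stale = asyncio.run(release(fake, "person:42:lock", b"tok-b"))  # not our token
owned = asyncio.run(release(fake, "person:42:lock", b"tok-a"))  # our token
```

With the script, a stale holder's release is a no-op rather than a deletion of somebody else's lock.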
Losers wait, then reuse
If a request fails to acquire the lock, it polls. If the cache key materialises while it is polling, it grabs the value (single-flight reuse). If the wait times out without the value materialising, it gives up and falls through to recompute, which is rare in practice.
```python
STAMPEDE_EVENTS_TOTAL.inc()
waited_ms = 0.0
poll_interval = self.settings.STAMPEDE_WAIT_POLL_MS / 1000.0
while waited_ms < self.settings.STAMPEDE_WAIT_TIMEOUT_MS:
    await asyncio.sleep(poll_interval)
    waited_ms += self.settings.STAMPEDE_WAIT_POLL_MS
    cached = await self.redis.get(key)
    if cached is not None:
        SINGLE_FLIGHT_HITS.inc()
        yield StampedeContext(
            has_lock=False, lock_wait_ms=waited_ms,
            reused_result=cached, early_refresh=early_refresh,
        )
        return
```
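To see the winner and loser paths interact end to end, here is a minimal self-contained simulation of the single-flight pattern, with a plain dict standing in for Redis and a sleep standing in for the Neo4j query (all names here are illustrative, not from the module):

```python
import asyncio

fake_cache: dict = {}   # stands in for Redis; "<key>:lock" entries are locks
compute_count = 0

async def fetch(key):
    """Single-flight: the first caller computes; concurrent callers poll
    the cache and reuse the winner's result."""
    global compute_count
    lock_key = f"{key}:lock"
    if lock_key not in fake_cache:        # SET NX analogue: acquire the lock
        fake_cache[lock_key] = "token"
        try:
            await asyncio.sleep(0.01)     # simulate a slow Neo4j query
            compute_count += 1
            fake_cache[key] = f"value-for-{key}"
            return fake_cache[key]
        finally:
            del fake_cache[lock_key]
    for _ in range(100):                  # loser path: poll, then reuse
        await asyncio.sleep(0.005)
        if key in fake_cache:
            return fake_cache[key]
    return "recomputed-after-timeout"     # rare fall-through on timeout

async def main():
    return await asyncio.gather(*(fetch("person:42") for _ in range(8)))

results = asyncio.run(main())
```

Eight concurrent requests produce exactly one computation; the other seven reuse the cached value.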
Probabilistic early refresh (XFetch)
For each cache hit, the orchestrator calls should_trigger_early_refresh. The rule is the classical XFetch: trigger an early refresh if

t_remaining ≤ −β · δ · ln(U),  with U drawn uniformly from (0, 1),

where δ is the expected recomputation latency (provided per-request as avg_latency_ms) and β controls how aggressively early refresh is triggered. Because the rule fires probabilistically, in expectation some in-flight reader regenerates the entry before it expires, and only one of them needs to win the single-flight lock.
```python
def should_trigger_early_refresh(self, t_remaining_ms, avg_latency_ms, *, degree=1,
                                 volatility=None, get_topology_phi=None) -> bool:
    if t_remaining_ms <= 0:
        return True
    phi = self._compute_phi(degree=degree, volatility=volatility,
                            get_topology_phi=get_topology_phi)
    random_value = max(random.random(), 1e-9)
    threshold = -phi * avg_latency_ms * math.log(random_value)
    return t_remaining_ms <= threshold
```
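The rule has a clean closed form worth internalising: t ≤ −β·δ·ln(U) is equivalent to U ≤ exp(−t/(β·δ)), so the probability that a given hit fires a refresh with t ms of TTL remaining is exactly exp(−t/(β·δ)), decaying exponentially as expiry gets further away. A quick self-contained check, assuming β = 1 and the default δ = 50 ms (function and variable names here are illustrative):

```python
import math
import random

def should_trigger(t_remaining_ms, avg_latency_ms, beta=1.0):
    # Classical XFetch: fire if t_remaining <= -beta * delta * ln(U)
    u = max(random.random(), 1e-9)
    return t_remaining_ms <= -beta * avg_latency_ms * math.log(u)

random.seed(0)
beta, delta, trials = 1.0, 50.0, 200_000
rows = []
for t in (25.0, 50.0, 100.0):
    simulated = sum(should_trigger(t, delta, beta) for _ in range(trials)) / trials
    analytic = math.exp(-t / (beta * delta))
    rows.append((t, simulated, analytic))
    print(f"t={t:5.1f} ms  simulated={simulated:.3f}  analytic={analytic:.3f}")
```

At t = β·δ the fire probability is 1/e ≈ 0.37 per hit, which is why hot keys (many hits per second) almost always refresh in time while cold keys rarely pay the cost.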
Topology-sensitive β
The default XFetch rule treats every key identically. On a graph workload that is wasteful: hub keys are queried far more often than leaf keys, so they should refresh earlier. The _compute_phi function chooses between two modes via the TSPR_REFRESH_MODE setting.
```python
def _compute_phi(self, *, degree=1, volatility=None, get_topology_phi=None) -> float:
    if get_topology_phi is not None:
        return get_topology_phi()
    sigma = volatility if volatility is not None else self.volatility_default
    if self.refresh_mode == "plain":
        return self.beta_base * sigma
    return self.beta_base * (1 + math.log(max(degree, 1))) * sigma
```
The two formulas:
- plain: β = β0 · σ. Constant β scaled by per-key volatility; equivalent to the original XFetch.
- topology_sensitive: β = β0 · (1 + ln d) · σ. Logarithmic in node degree: a degree-1 node sees β = β0, while a degree-100 hub sees roughly 5.6× that.
| Source degree d | (1 + ln d) | Effective β multiplier |
|---|---|---|
| 1 (leaf) | 1.00 | 1.0× |
| 10 | 3.30 | 3.3× |
| 50 | 4.91 | 4.9× |
| 100 (hub) | 5.61 | 5.6× |
| 500 (super) | 7.21 | 7.2× |
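The multiplier column can be reproduced in a couple of lines:

```python
import math

# Reproduce the (1 + ln d) multiplier column from the table above.
multipliers = {d: 1 + math.log(d) for d in (1, 10, 50, 100, 500)}
for d, m in multipliers.items():
    print(f"d={d:3d}  (1 + ln d) = {m:.2f}")
```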
Where does degree come from?
The benchmark harness pre-fetches per-person degrees from Neo4j and injects them as degree in the request params (see annotate_query_degrees in run_benchmark.py). The orchestrator pulls them out in _refresh_inputs:
```python
def _refresh_inputs(self, params):
    return {
        "degree": int(params.get("degree", 1)),
        "volatility": float(params["volatility"]) if params.get("volatility") is not None else None,
        "avg_latency_ms": float(params.get("estimated_neo4j_ms", 50.0)),
    }
```
If a request does not provide a degree, the rule degrades gracefully: (1 + ln 1) = 1, so topology mode collapses to plain XFetch behaviour for that request.
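The collapse can be verified directly with a standalone restatement of the formula (the defaults β0 = 1.0, σ = 1.0 and the function signature are illustrative, not the module's actual API):

```python
import math

def compute_phi(degree=1, sigma=1.0, beta_base=1.0, mode="topology_sensitive"):
    # Same two-mode formula as _compute_phi, restated without the class.
    if mode == "plain":
        return beta_base * sigma
    return beta_base * (1 + math.log(max(degree, 1))) * sigma

# A missing degree defaults to 1, so topology mode equals plain mode:
fallback = compute_phi(degree=1, mode="topology_sensitive")
plain = compute_phi(mode="plain")
```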
Background refresh
When the rule fires, the orchestrator launches the refresh as an asyncio.create_task so it does not block the request that triggered it. The background task re-enters protect() with early_refresh=True, which means that even if many concurrent requests trigger a refresh, only one of them will actually recompute.
```python
async def _maybe_trigger_early_refresh(self, cache_key, query, params, session_id) -> bool:
    jitter = self.optimizers["jitter"]
    if jitter is None or not hasattr(self.redis, "pttl"):
        return False
    ttl_remaining_ms = await self.redis.pttl(cache_key)
    if ttl_remaining_ms is None or ttl_remaining_ms < 0:
        return False
    refresh_args = self._refresh_inputs(params)
    if not jitter.should_trigger_early_refresh(ttl_remaining_ms, **refresh_args):
        return False
    asyncio.create_task(self._refresh_cached_entry(
        cache_key=cache_key, query=query, params=params,
        session_id=session_id, refresh_args=refresh_args,
    ))
    return True
```
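One caveat with fire-and-forget create_task: the event loop keeps only a weak reference to the task, so a refresh task that nothing else references can be garbage-collected before it finishes. The asyncio documentation recommends holding a strong reference until the task completes; a sketch of that pattern (names are illustrative):

```python
import asyncio

background_tasks = set()   # strong references until each task completes

def fire_and_forget(coro):
    """create_task without awaiting, but keep the task referenced so the
    garbage collector cannot drop it before it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task

async def main():
    async def refresh():
        await asyncio.sleep(0.01)
        return "refreshed"

    task = fire_and_forget(refresh())
    assert task in background_tasks   # referenced while running
    result = await task
    await asyncio.sleep(0)            # let the done callback run
    return result

result = asyncio.run(main())
```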
Why this matters in practice
On the SSCA-inspired workload the degree distribution is highly skewed (R-MAT generators produce hub-heavy graphs by construction), so the (1 + ln d) factor has a real signal to amplify. On the LDBC Person/KNOWS subset the degree distribution is more uniform and the headroom is smaller, but the topology-sensitive variant is still marginally ahead of plain XFetch on both throughput and P95.
If avg_latency_ms is overestimated, β·δ becomes large, and early refresh fires too often — wasting Neo4j capacity. If it is underestimated, refresh barely fires and synchronised expiries return. The default of 50 ms with a per-query override (estimated_neo4j_ms in the benchmark queries) is the operationally safe choice.
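This sensitivity can be made precise: since E[−ln U] = 1 for U uniform on (0, 1), the expected early-refresh window is exactly β·δ, so a 2× error in avg_latency_ms shifts the average refresh point by 2× in either direction. A quick numerical check (β = 1; function name and trial count are illustrative):

```python
import math
import random

random.seed(1)

def mean_window_ms(beta, delta_ms, trials=200_000):
    # Average of -beta * delta * ln(U); analytically equals beta * delta.
    return sum(-beta * delta_ms * math.log(max(random.random(), 1e-9))
               for _ in range(trials)) / trials

calibrated = mean_window_ms(1.0, 50.0)   # well-estimated delta: ~50 ms window
doubled = mean_window_ms(1.0, 100.0)     # 2x overestimate: ~100 ms window
```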