Jitter and Stampede Protection

Single-flight locks and probabilistic XFetch refresh, plus a topology-sensitive variant that scales the refresh window with node degree.

▶ Original contribution

The classical XFetch rule treats every key the same. The topology-sensitive variant scales the early-refresh window by (1 + ln d) where d is the source node's degree. On the SSCA-inspired workload this lifts throughput from 255 → 305 qps and cuts P95 from 13.5 → 9.4 ms compared to plain XFetch.

Two jobs in one module

This file does two related things:

  1. Stampede protection. When many requests miss the same key simultaneously, only one of them should reach Neo4j; the rest should reuse its result. Implemented with a Redis SET NX single-flight lock.
  2. Probabilistic early refresh. When a request hits a cached entry, decide probabilistically whether to fire a background refresh that will pre-populate the cache before its scheduled expiry, so that the actual expiry does not produce a synchronised stampede.
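The single-flight idea in job 1 can be demonstrated without Redis at all. The sketch below is a hypothetical in-memory analogue (the real module uses a Redis SET NX lock so it works across processes): concurrent misses on the same key share one future, so only the first caller runs the expensive computation.

```python
import asyncio

class SingleFlight:
    """In-memory analogue of the Redis SET NX lock: the first caller per key
    computes, concurrent callers await the same future and reuse its result."""

    def __init__(self):
        self._inflight = {}  # key -> asyncio.Future

    async def get_or_compute(self, key, compute):
        fut = self._inflight.get(key)
        if fut is not None:
            return await fut  # someone else is already computing: reuse
        fut = asyncio.get_running_loop().create_future()
        self._inflight[key] = fut
        try:
            result = await compute()
        except Exception as exc:
            fut.set_exception(exc)
            raise
        else:
            fut.set_result(result)
            return result
        finally:
            del self._inflight[key]

calls = 0

async def expensive_query():
    global calls
    calls += 1
    await asyncio.sleep(0.01)  # stand-in for a Neo4j round trip
    return "rows"

async def main():
    sf = SingleFlight()
    # Ten simultaneous misses on the same key: only one reaches the backend.
    return await asyncio.gather(*(sf.get_or_compute("person:42", expensive_query)
                                  for _ in range(10)))

results = asyncio.run(main())
```

The in-process version is simpler because a future doubles as both the lock and the result channel; the Redis version needs the polling loop described below since a value, not a future, is what gets shared.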

The single-flight lock

The protect async context manager is the entry point. On a cache miss the orchestrator opens this block, and inside it tries to acquire the lock.

middleware/optimizations/jitter_stampede.py (Python)
@asynccontextmanager
async def protect(self, key, *, degree=1, volatility=None,
                  avg_latency_ms=50.0, get_topology_phi=None, early_refresh=False):
    # Random token proves ownership of the lock at release time.
    token = str(uuid.uuid4()).encode("utf-8")
    lock_key = f"{key}:lock"
    # Jitter the lock TTL so locks taken at the same instant do not all
    # expire at the same instant.
    jitter = random.randint(
        self.settings.STAMPEDE_JITTER_MIN_MS,
        self.settings.STAMPEDE_JITTER_MAX_MS,
    )
    timeout_ms = self.settings.STAMPEDE_LOCK_TIMEOUT_MS + jitter
    # SET with NX + PX: acquire the lock atomically, with a millisecond TTL.
    acquired = await self.redis.set(lock_key, token, px=timeout_ms, nx=True)

    if acquired:
        context = StampedeContext(has_lock=True, lock_wait_ms=0.0, early_refresh=early_refresh)
        try:
            yield context
        finally:
            # Release only if we still own the lock: the token check guards
            # against deleting a lock that expired and was re-acquired.
            current = await self.redis.get(lock_key)
            if current == token:
                await self.redis.delete(lock_key)
        return

Three details matter here: the lock is acquired atomically with a single SET NX PX call; the lock TTL carries random jitter so that locks taken at the same instant do not all expire together; and the release path deletes the lock only when it still holds the original token, so a slow holder cannot free a lock that has already expired and been re-acquired by another request.

Losers wait, then reuse

If a request fails to acquire the lock, it polls. If the cache key materialises while it is polling, it grabs the value (single-flight reuse). If the wait times out without the value materialising, it gives up and falls through to recompute, which is rare in practice.

protect() — losing path (Python)
# This request lost the race for the lock: record the stampede event and
# poll for the winner's result instead of recomputing.
STAMPEDE_EVENTS_TOTAL.inc()
waited_ms = 0.0
poll_interval = self.settings.STAMPEDE_WAIT_POLL_MS / 1000.0  # seconds, for asyncio.sleep
while waited_ms < self.settings.STAMPEDE_WAIT_TIMEOUT_MS:
    await asyncio.sleep(poll_interval)
    waited_ms += self.settings.STAMPEDE_WAIT_POLL_MS
    cached = await self.redis.get(key)
    if cached is not None:
        # The winner has populated the cache: reuse its result.
        SINGLE_FLIGHT_HITS.inc()
        yield StampedeContext(
            has_lock=False, lock_wait_ms=waited_ms,
            reused_result=cached, early_refresh=early_refresh,
        )
        return

Probabilistic early refresh (XFetch)

For each cache hit, the orchestrator calls should_trigger_early_refresh. The rule is the classical XFetch: trigger an early refresh if

t_remaining  ≤  −β · δ · ln(u),   u ∼ U(0, 1]

where δ is the expected recomputation latency (provided per-request as avg_latency_ms) and β controls how aggressively early refresh is triggered. Because the rule fires probabilistically, multiple in-flight readers will, in expectation, regenerate the entry well before it expires, and only one of them needs to win the single-flight lock.

should_trigger_early_refresh() (Python)
def should_trigger_early_refresh(self, t_remaining_ms, avg_latency_ms, *, degree=1,
                                  volatility=None, get_topology_phi=None) -> bool:
    # Already expired (or about to): refresh unconditionally.
    if t_remaining_ms <= 0:
        return True
    phi = self._compute_phi(degree=degree, volatility=volatility,
                            get_topology_phi=get_topology_phi)
    # Clamp u away from 0 so log() is always defined; u ~ U(0, 1].
    random_value = max(random.random(), 1e-9)
    threshold = -phi * avg_latency_ms * math.log(random_value)
    return t_remaining_ms <= threshold
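The rule above has a closed form worth sanity-checking: for u ∼ U(0, 1], the probability of firing at a given remaining TTL t is P(u ≤ e^(−t/(β·δ))) = e^(−t/(β·δ)). The sketch below is a standalone Monte Carlo check of that claim against the same inequality (phi here plays the role of the precomputed β):

```python
import math
import random

def fires(t_remaining_ms, avg_latency_ms, phi, rng):
    # Same inequality as should_trigger_early_refresh, phi precomputed.
    u = max(rng.random(), 1e-9)
    return t_remaining_ms <= -phi * avg_latency_ms * math.log(u)

# With phi = 1 and delta = 50 ms, P(fire) at t ms remaining is exp(-t / 50).
rng = random.Random(0)
phi, delta, trials = 1.0, 50.0, 200_000
estimates = {}
for t in (10, 50, 200):
    estimates[t] = sum(fires(t, delta, phi, rng) for _ in range(trials)) / trials
```

So a key with 200 ms of TTL left refreshes early on only about 2% of hits, while one with 10 ms left refreshes on about 82% of them: refresh pressure concentrates exactly where expiry is imminent.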

Topology-sensitive β

The default XFetch rule treats every key identically. On a graph workload that is wasteful: hub keys are queried far more often than leaf keys, so they should refresh earlier. The _compute_phi function chooses between two modes via the TSPR_REFRESH_MODE setting.

_compute_phi() (Python)
def _compute_phi(self, *, degree=1, volatility=None, get_topology_phi=None) -> float:
    # A caller-supplied override takes precedence over both modes.
    if get_topology_phi is not None:
        return get_topology_phi()
    sigma = volatility if volatility is not None else self.volatility_default
    if self.refresh_mode == "plain":
        return self.beta_base * sigma  # classical XFetch: constant beta
    # topology_sensitive: scale beta logarithmically with node degree
    return self.beta_base * (1 + math.log(max(degree, 1))) * sigma

The two formulas:

Mode 1 · plain

β = β0 · σ

Constant β scaled by the per-key volatility σ. Equivalent to the original XFetch.

Mode 2 · topology_sensitive · Default · Novel

β = β0 · (1 + ln d) · σ

Logarithmic in node degree. A degree-1 node sees β = β0, while a degree-100 hub sees roughly 5.6× that.

| Source degree d | (1 + ln d) | Effective β multiplier |
|-----------------|------------|------------------------|
| 1 (leaf)        | 1.00       | 1.0×                   |
| 10              | 3.30       | 3.3×                   |
| 50              | 4.91       | 4.9×                   |
| 100 (hub)       | 5.61       | 5.6×                   |
| 500 (super)     | 7.21       | 7.2×                   |
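The multiplier column is just (1 + ln d) evaluated at each degree; a two-line helper reproduces it, which is also a convenient way to eyeball the effect of β0 changes before a benchmark run:

```python
import math

def beta_multiplier(degree: int) -> float:
    # (1 + ln d): the topology-sensitive factor applied on top of beta0 * sigma.
    # max(degree, 1) mirrors _compute_phi's guard against degree <= 0.
    return 1 + math.log(max(degree, 1))

multipliers = {d: round(beta_multiplier(d), 2) for d in (1, 10, 50, 100, 500)}
```

The logarithm is what keeps hubs from dominating: a 500× jump in degree buys only a ~7× jump in refresh aggressiveness.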

Where does degree come from?

The benchmark harness pre-fetches per-person degrees from Neo4j and injects them as degree in the request params (see annotate_query_degrees in run_benchmark.py). The orchestrator pulls them out in _refresh_inputs:

cache.py · _refresh_inputs (Python)
def _refresh_inputs(self, params):
    # Pull the XFetch inputs out of the request params, with safe defaults
    # for requests that do not carry topology annotations.
    return {
        "degree": int(params.get("degree", 1)),
        "volatility": float(params["volatility"]) if params.get("volatility") is not None else None,
        "avg_latency_ms": float(params.get("estimated_neo4j_ms", 50.0)),
    }

If a request does not provide a degree, the rule degrades gracefully: (1 + ln 1) = 1, so topology mode collapses to plain XFetch behaviour for that request.
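This degradation property is easy to verify. The sketch below mirrors the _compute_phi arithmetic as a standalone function (the names and signature are illustrative, not the module's API): at degree 1 the topology-sensitive formula collapses to the plain one, and only higher degrees pull the two apart.

```python
import math

def compute_phi(beta_base: float, sigma: float, degree: int, mode: str) -> float:
    # Standalone mirror of _compute_phi's two branches (override hook omitted).
    if mode == "plain":
        return beta_base * sigma
    return beta_base * (1 + math.log(max(degree, 1))) * sigma

# A request that omits degree falls back to degree=1, where ln 1 = 0:
phi_plain     = compute_phi(0.5, 1.0, 1, "plain")
phi_topo_leaf = compute_phi(0.5, 1.0, 1, "topology_sensitive")
phi_topo_hub  = compute_phi(0.5, 1.0, 100, "topology_sensitive")
```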

Background refresh

When the rule fires, the orchestrator launches the refresh as an asyncio.create_task so it does not block the request that triggered it. The background task re-acquires protect() with early_refresh=True, which means even if many concurrent requests trigger a refresh, only one of them will actually recompute.

cache.py · _maybe_trigger_early_refresh / _refresh_cached_entry (Python)
async def _maybe_trigger_early_refresh(self, cache_key, query, params, session_id) -> bool:
    jitter = self.optimizers["jitter"]
    if jitter is None or not hasattr(self.redis, "pttl"):
        return False
    # Remaining TTL in milliseconds; negative means the key is gone or has no expiry.
    ttl_remaining_ms = await self.redis.pttl(cache_key)
    if ttl_remaining_ms is None or ttl_remaining_ms < 0:
        return False
    refresh_args = self._refresh_inputs(params)
    if not jitter.should_trigger_early_refresh(ttl_remaining_ms, **refresh_args):
        return False
    # Fire-and-forget: the refresh must not block the request that triggered it.
    asyncio.create_task(self._refresh_cached_entry(
        cache_key=cache_key, query=query, params=params,
        session_id=session_id, refresh_args=refresh_args,
    ))
    return True

Why this matters in practice

On the SSCA-inspired workload the degree distribution is highly skewed (R-MAT generators produce hub-heavy graphs by construction), so the (1 + ln d) factor has a real signal to amplify. On the LDBC Person/KNOWS subset the degree distribution is more uniform and the headroom is smaller, but the topology-sensitive variant is still marginally ahead of plain XFetch on both throughput and P95.

▶ Limit case

If avg_latency_ms is overestimated, β·δ becomes large, and early refresh fires too often — wasting Neo4j capacity. If it is underestimated, refresh barely fires and synchronised expiries return. The default of 50 ms with a per-query override (estimated_neo4j_ms in the benchmark queries) is the operationally safe choice.
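The sensitivity is easy to quantify with the closed form of the firing probability, e^(−t/(β·δ)). The sketch below (function name and numbers are illustrative) compares an accurate δ = 50 ms against a 4× overestimate at 500 ms of remaining TTL:

```python
import math

def fire_probability(t_remaining_ms: float, avg_latency_ms: float, phi: float = 1.0) -> float:
    # P(fire) at a given remaining TTL: exp(-t / (phi * delta)),
    # derived from the XFetch rule with u ~ U(0, 1].
    return math.exp(-t_remaining_ms / (phi * avg_latency_ms))

# 500 ms of TTL left: an accurate delta = 50 ms almost never fires this early,
# while a 4x overestimate (delta = 200 ms) fires orders of magnitude more often.
p_accurate      = fire_probability(500, 50)
p_overestimated = fire_probability(500, 200)
```

Because δ sits inside an exponent, the penalty for misestimation is multiplicative and severe in both directions, which is why a conservative default plus a per-query override beats trying to learn a single global value.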