Adaptive Prefetching
A first-order Markov model over per-session query patterns, plus a free one-hop neighbour expansion. Small mechanism, real gains on traversal-heavy workloads.
Overview
Graph exploration is sequential. A user clicks a node, looks at neighbours, expands one of them, drills further. If the system can predict the next click, it can preload that result while the user is still looking at the previous one. This module does that with a deliberately simple successor model and a small fan-out so it cannot saturate Neo4j on bad predictions.
In-memory state
Three structures sit in process memory (not Redis, by design — these are tiny and per-instance).
from collections import OrderedDict, defaultdict

class AdaptivePrefetch:
    def __init__(self, redis, settings: Settings):
        self.redis = redis
        self.settings = settings
        # Last 5 normalised patterns per session, in LRU order.
        self.session_patterns: OrderedDict[str, list[str]] = OrderedDict()
        # successor_map[A][B]: times pattern B immediately followed pattern A.
        self.successor_map: defaultdict[str, dict[str, int]] = defaultdict(dict)
        # Cache keys prefetched by this instance but not yet read.
        self.prefetched_keys: set[str] = set()
- session_patterns — an LRU dictionary keyed by session_id. Each value is a list of the last 5 normalised query patterns observed for that session. Capped at PREFETCH_MAX_PATTERNS sessions (default 256), with eviction in least-recently-seen order.
- successor_map — a sparse two-level dictionary that counts pattern transitions. successor_map[A][B] is the number of times pattern B was observed immediately after pattern A across all sessions. This is effectively a frequency-weighted first-order Markov chain.
- prefetched_keys — the set of cache keys this instance has actively prefetched but not yet observed being read. Drains via note_consumed_key (hit) or flush_waste (eviction).
Observing a request
On every POST /query, the orchestrator passes the query and session id to observe. The function normalises the query, appends it to the session's pattern list, and bumps the successor count for the previous pattern.
def observe(self, session_id, query, params) -> str:
    pattern = normalize_query(query, params)
    if not session_id:
        return pattern
    existing = self.session_patterns.get(session_id, [])
    if existing:
        # Count the transition previous -> pattern in the Markov model.
        previous = existing[-1]
        current_score = self.successor_map[previous].get(pattern, 0)
        self.successor_map[previous][pattern] = current_score + 1
    existing.append(pattern)
    # Keep only the last 5 patterns and mark this session as most recent.
    self.session_patterns[session_id] = existing[-5:]
    self.session_patterns.move_to_end(session_id)
    # Evict least-recently-seen sessions beyond the cap.
    while len(self.session_patterns) > self.settings.PREFETCH_MAX_PATTERNS:
        self.session_patterns.popitem(last=False)
    return pattern
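normalize_query is referenced throughout but not shown in this section. A minimal sketch of what such a normaliser might do, assuming inline literals are replaced with placeholders and parameter names (not values) are folded into the key; the regexes are illustrative, not the module's actual rules:

import re

def normalize_query(query: str, params: dict) -> str:
    # Collapse whitespace so formatting differences do not split patterns.
    pattern = re.sub(r"\s+", " ", query.strip())
    # Replace inline literals so queries differing only in values match.
    pattern = re.sub(r"'[^']*'", "?", pattern)
    pattern = re.sub(r"\b\d+\b", "?", pattern)
    # Fold parameter names (not values) into the key.
    return f"{pattern}|{','.join(sorted(params))}" if params else pattern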
Ranking successors
Given a pattern, likely_successors returns the top-K most-frequent next patterns. K is PREFETCH_MAX_QUERIES, default 4.
def likely_successors(self, pattern: str) -> list[str]:
ranked = sorted(
self.successor_map.get(pattern, {}).items(),
key=lambda item: item[1],
reverse=True,
)
return [candidate for candidate, _ in ranked[: self.settings.PREFETCH_MAX_QUERIES]]
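A quick illustration with hypothetical counts, where prefetcher is an AdaptivePrefetch instance:

# Hypothetical state: pattern "A" has been followed by "B" seven times,
# "C" three times and "D" once.
prefetcher.successor_map["A"] = {"B": 7, "C": 3, "D": 1}

# With PREFETCH_MAX_QUERIES = 4 there is room for all three candidates,
# ranked most frequent first.
assert prefetcher.likely_successors("A") == ["B", "C", "D"]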
Triggering prefetch
After Neo4j returns a result, the orchestrator hands it to trigger_prefetch along with overlap candidates and an executor closure. The function builds three kinds of prefetch candidates:
- Top-K successors learned from the Markov model.
- Every overlap signature key that the current query produced — these are very cheap to warm because they hash to the same Redis slot.
- A neighbour-expansion injection for the first PREFETCH_DEPTH node ids found in the result rows.
It then launches up to K of these as background asyncio tasks. Each task re-enters the same GraphCache pipeline with prefetch=True, so the prefetched result still goes through admission and TTL.
async def trigger_prefetch(self, session_id, query, params, result, overlap_candidates, executor):
    pattern = self.observe(session_id, query, params)
    candidates = []
    # 1. Top-K successors from the Markov model (no learned params yet).
    for successor in self.likely_successors(pattern):
        candidates.append((successor, {}))
    # 2. Overlap signature keys produced by the current query.
    candidates.extend((cq, cp) for _, cq, cp in overlap_candidates)
    # 3. One-hop neighbour expansion for the first PREFETCH_DEPTH node ids.
    node_ids = self._extract_node_ids(result)
    for node_id in node_ids[: self.settings.PREFETCH_DEPTH]:
        candidates.append((
            "MATCH (n)-[]->(m) WHERE n.id = $node_id "
            "RETURN m.id AS neighbor_id LIMIT 25",
            {"node_id": node_id},
        ))
    launched = 0
    for candidate_query, candidate_params in candidates:
        if launched >= self.settings.PREFETCH_MAX_QUERIES:
            break
        PREFETCH_ATTEMPTS.inc()
        # Fire-and-forget: the task re-enters the cache pipeline with prefetch=True.
        asyncio.create_task(self._run_prefetch(candidate_query, candidate_params, executor))
        launched += 1
    return {"prefetch_triggered": launched > 0, "prefetch_candidates": launched}
The neighbour-expansion injection
This is the simplest heuristic in the module, and the only one that is not learned: if the result contains node ids, prefetch their immediate one-hop neighbours. The query is generic and label-free, so it works regardless of the original query's labels. It is also cheap: LIMIT 25 bounds the per-prefetch cost, and it only fires for the first PREFETCH_DEPTH ids (default 1).
Aggressive prefetching can saturate Neo4j worse than the workload it tries to help. Three caps stop that: (1) PREFETCH_MAX_QUERIES caps the per-request fan-out, (2) PREFETCH_DEPTH caps the neighbour injection, and (3) the prefetcher only fires once per request, not recursively (the recursive case is gated by the prefetch=True flag in _compute_and_store, which short-circuits the prefetch step).
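Concretely, the gate can be a single branch inside _compute_and_store. A sketch only, with hypothetical surrounding names, since that method lives in the cache module and is not shown here:

async def _compute_and_store(self, query, params, session_id, *, prefetch=False):
    result = await self._execute_on_neo4j(query, params)  # hypothetical helper
    # The gate: a query issued with prefetch=True never triggers another
    # round of prefetching, so the fan-out is bounded per user request.
    if not prefetch:
        await self.prefetcher.trigger_prefetch(
            session_id, query, params, result,
            self._overlap_candidates(result),  # hypothetical helper
            self._executor,
        )
    return result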
Hit and waste accounting
Every prefetched key is tracked in the prefetched_keys set. When the orchestrator later sees a cache hit for one of these keys it calls note_consumed_key, which increments PREFETCH_HITS and removes the key. Keys that are never consumed accumulate as waste, which the operator can drain through flush_waste to update PREFETCH_WASTE.
async def note_consumed_key(self, cache_key):
if cache_key in self.prefetched_keys:
PREFETCH_HITS.inc()
self.prefetched_keys.discard(cache_key)
def flush_waste(self) -> None:
waste = len(self.prefetched_keys)
if waste:
PREFETCH_WASTE.inc(waste)
self.prefetched_keys.clear()
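PREFETCH_ATTEMPTS, PREFETCH_HITS and PREFETCH_WASTE are assumed to be ordinary prometheus_client counters defined at module scope; the metric names below are illustrative:

from prometheus_client import Counter

PREFETCH_ATTEMPTS = Counter("prefetch_attempts_total", "Prefetch tasks launched")
PREFETCH_HITS = Counter("prefetch_hits_total", "Prefetched keys later read from cache")
PREFETCH_WASTE = Counter("prefetch_waste_total", "Prefetched keys evicted unread")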
Extracting node ids from a result
This is the sketchiest part of the module: it walks the result rows and pulls out anything that looks like a node id. Two cases are handled — a value that is a dict with an id field, and a value that is itself an integer.
def _extract_node_ids(self, result) -> list[int]:
    node_ids = []
    for row in result:
        for value in row.values():
            # Case 1: a node rendered as a dict with an integer id field.
            if isinstance(value, dict) and "id" in value and isinstance(value["id"], int):
                node_ids.append(value["id"])
            # Case 2: a bare integer, optimistically treated as a node id.
            elif isinstance(value, int):
                node_ids.append(value)
    return node_ids
This intentionally errs on the permissive side: any integer in the result might be a node id. Worst case the prefetched query returns nothing useful, which costs one bounded Neo4j round-trip and a Redis write. That cost is what the PREFETCH_WASTE counter measures.
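A concrete illustration of that permissiveness, using a made-up result row (prefetcher is an AdaptivePrefetch instance):

rows = [{"n": {"id": 42, "name": "alice"}, "degree": 7}]
# 42 is a genuine node id; 7 is a degree count, but it is swept up too.
# The bogus prefetch for node 7 returns nothing useful and, if its key
# is never read, ends up in the PREFETCH_WASTE count.
assert prefetcher._extract_node_ids(rows) == [42, 7]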
Behaviour on the two benchmarks
From the per-run CSV in this repository:
| Workload | baseline | isolated_adaptive_prefetch | change |
|---|---|---|---|
| LDBC SF1 · qps | 56.77 | ~58 | small + |
| LDBC SF1 · P95 | 242.40 ms | ~240 ms | flat |
| SSCA · qps | 48.76 | ~202 | +4× |
| SSCA · P95 | 167.22 ms | ~16 ms | −90% |
The asymmetry is by construction: the SSCA-inspired workload sweeps a fixed kernel set in order, so its sessions are highly predictable and the Markov model wins big. The LDBC interactive mix is broader, so the model has less signal to work with.