Architecture

From a single FastAPI endpoint to a five-stage pipeline that decides how each Cypher query should be served.

Entry point

Everything starts in main.py. The app exposes three endpoints: POST /query for Cypher requests, GET /health for liveness, and GET /metrics for Prometheus. A FastAPI lifespan hook wires up Redis and Neo4j once at startup, attaches a single GraphCache instance to app.state, and tears them down on exit.

main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI
from redis.asyncio import from_url as redis_from_url

from config import settings              # assumed module path for the Settings instance
from middleware.cache import GraphCache
from middleware.neo4j_client import Neo4jClient
from models import QueryRequest          # assumed module path for the request model

@asynccontextmanager
async def lifespan(app: FastAPI):
    redis = redis_from_url(settings.REDIS_URL, decode_responses=False)
    neo4j_client = Neo4jClient(settings)
    cache = GraphCache(neo4j_client, redis, settings)
    app.state.redis = redis
    app.state.neo4j = neo4j_client
    app.state.cache = cache
    try:
        yield
    finally:
        await neo4j_client.close()
        await redis.close()

app = FastAPI(title=settings.APP_NAME, lifespan=lifespan)

@app.post("/query")
async def query_endpoint(request: QueryRequest) -> dict:
    result, metadata = await app.state.cache.get_or_compute(
        request.query, request.params, session_id=request.session_id,
    )
    return {"result": result, "metadata": metadata}

Notice that decode_responses=False is set on the Redis client. This matters: the single-flight lock in the jitter module compares its UUID lock token as raw bytes, and an earlier bug that compared the token against a decoded string caused 1.2-second tail spikes in benchmark runs.
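The failure mode is easy to reproduce in isolation. A minimal sketch of the bytes-versus-str pitfall (variable names here are illustrative, not the jitter module's API):

```python
import uuid

# The token stored by SET NX travels as bytes on the wire.
token = str(uuid.uuid4()).encode("utf-8")

# What GET returns under each client configuration:
fetched_raw = token                # decode_responses=False -> bytes
fetched_decoded = token.decode()   # decode_responses=True  -> str

# The release path asks "is the stored token still mine?"
assert fetched_raw == token        # bytes == bytes: release proceeds
assert fetched_decoded != token    # str never equals bytes in Python 3, so the
                                   # release is skipped and the lock lives on
                                   # until its TTL expires
```

With the decoded-string comparison, every lock release silently failed, so waiting requests had to sit out the full lock TTL instead of being unblocked promptly.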

Components

The middleware is built from four kinds of objects: a Neo4j async client, a Redis async client, the orchestration class GraphCache, and five pluggable optimizer modules.

Client: FastAPI · POST /query
Orchestrator: GraphCache.get_or_compute
Optimizers: JitterStampede, OverlappingSubqueryCache, FrequencyAwareCache, AdaptivePrefetch, ExternalBFS
Cache: Redis (async)
Backend: Neo4j 5 (Bolt)

The orchestrator

The GraphCache constructor instantiates each optimizer according to its feature flag, or stores None when the flag is off. The pipeline downstream guards every optimizer behind an if optimizer is not None check, so a disabled flag means the corresponding code path is simply skipped.

middleware/cache.py · GraphCache.__init__
class GraphCache:
    def __init__(self, neo4j, redis, settings: Settings):
        self.neo4j = neo4j
        self.redis = redis
        self.settings = settings
        self.optimizers = {
            "jitter": JitterStampede(redis, settings) if settings.JITTER_STAMPEDE else None,
            "frequency": FrequencyAwareCache(redis, settings) if settings.FREQUENCY_AWARE else None,
            "prefetch": AdaptivePrefetch(redis, settings) if settings.ADAPTIVE_PREFETCH else None,
            "overlap": OverlappingSubqueryCache(redis, settings) if settings.OVERLAPPING_SUBQUERIES else None,
            "bfs": ExternalBFS(redis, neo4j, settings) if settings.EXTERNAL_BFS else None,
        }
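The None-guard convention is easiest to see in a self-contained miniature (NoopOptimizer and pipeline are illustrative stand-ins, not middleware classes):

```python
import asyncio

class NoopOptimizer:
    """Stand-in for a real optimizer module such as JitterStampede."""
    async def apply(self, key: str) -> str:
        return f"applied to {key}"

# Feature flags were evaluated once at construction; disabled entries are None.
optimizers = {"jitter": NoopOptimizer(), "bfs": None}

async def pipeline(key: str) -> list[str]:
    applied = []
    for name, opt in optimizers.items():
        if opt is not None:          # the only check on the hot path
            applied.append(await opt.apply(key))
    return applied

print(asyncio.run(pipeline("kgcache:demo")))  # only the "jitter" entry runs
```

The flag check costs nothing per request: it collapsed into a single None test at construction time.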

Request lifecycle

This is the full path a single POST /query takes. Each step is implemented in get_or_compute or one of the helpers it calls.

1. Normalize query and build cache key: normalize_query(query, params) collapses whitespace and SHA-256s a sorted JSON of params. build_cache_key prefixes the result with kgcache:.
2. Probe Redis for the primary key: if a cached payload exists, the request is a hit. The jitter module is consulted for a probabilistic early refresh, the frequency module bumps the access count, and the prefetcher records that this key was consumed.
3. Acquire single-flight lock: on a miss with stampede protection on, jitter.protect() tries a Redis SET NX. If another request already holds the lock, this request polls for either the lock to disappear or the key to materialise (single-flight reuse).
4. External BFS check: if the query matches an unweighted shortest-path pattern and BFS is enabled, the result is computed via NetworkX rather than Neo4j. Disabled in the benchmark matrix.
5. Overlap signature lookup: the overlap module computes canonical and fragment signature keys and probes Redis under each. A hit means a structurally equivalent query was answered earlier and its result is reused.
6. Execute against Neo4j: if everything above missed, neo4j_client.execute_cypher runs the query and the compute latency is timed.
7. Frequency-aware admission and TTL: frequency.should_admit applies the threshold and cost rule; frequency.ttl_for returns the bounded sub-linear TTL. The result is stored under both the primary key and every overlap signature key.
8. Trigger prefetch: the prefetcher updates its session-level Markov successor map and launches up to four background asyncio tasks to warm likely successors and one-hop neighbours.
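Step 1 can be sketched as follows. The real normalize_query and build_cache_key live in the middleware and may differ in detail; whether the Cypher text itself is hashed alongside the params is an assumption here:

```python
import hashlib
import json
import re

def normalize_query(query: str, params: dict) -> str:
    """Collapse whitespace, then SHA-256 the query plus a sorted-JSON params dump."""
    collapsed = re.sub(r"\s+", " ", query).strip()
    canonical = json.dumps(params, sort_keys=True, default=str)
    return hashlib.sha256(f"{collapsed}|{canonical}".encode("utf-8")).hexdigest()

def build_cache_key(query: str, params: dict) -> str:
    return "kgcache:" + normalize_query(query, params)

# Whitespace variants of the same query map to one key.
a = build_cache_key("MATCH (n:Person)\n  RETURN n", {"limit": 10})
b = build_cache_key("MATCH (n:Person) RETURN n", {"limit": 10})
assert a == b and a.startswith("kgcache:")
```

Sorting the JSON keys makes the digest insensitive to param dict ordering, so {"a": 1, "b": 2} and {"b": 2, "a": 1} hit the same cache entry.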

Metadata returned with every response

Every response carries a metadata object that lets the benchmark harness attribute behaviour. It records the cache status (hit, miss, single-flight-reuse, hit-after-lock), the source (redis, neo4j, subquery_cache, external_bfs), the assigned TTL, the lock-wait latency, whether early refresh fired, and which optimizers were applied.

Metadata seed (cache.py · _base_metadata)
return {
    "cache_status": "miss",
    "cache_key": cache_key,
    "ttl_seconds": 0,
    "source": "unknown",
    "optimizations_applied": [
        name for name, optimizer in self.optimizers.items() if optimizer is not None
    ],
    "admitted_to_cache": False,
    "lock_wait_ms": 0.0,
    "prefetch_triggered": False,
    "subquery_reuse": False,
    "bfs_override": False,
    "early_refresh_triggered": False,
    "latency_ms": 0.0,
}
Why expose all this

The benchmark CSV reports per-optimization counters such as subquery_reuse_count, prefetch_hits_total, and stampede_events_total. These let the discussion in Chapter 4 of the thesis attribute every change in throughput or P95 to a specific module rather than guessing.

Payload encoding

Cache payloads are serialised as JSON and optionally compressed with zstd if both ENABLE_ZSTD is on and the payload exceeds CACHE_COMPRESSION_MIN_BYTES. The compressed payloads are tagged with a zstd: prefix so the decoder can detect them.

cache.py · _encode_payload / _decode_payload
import json

try:
    import zstandard as zstd  # optional dependency; the guards below degrade gracefully
except ImportError:
    zstd = None

def _encode_payload(self, result):
    raw = json.dumps(result, default=str).encode("utf-8")
    if (
        self.settings.ENABLE_ZSTD
        and zstd is not None
        and len(raw) >= self.settings.CACHE_COMPRESSION_MIN_BYTES
    ):
        compressor = zstd.ZstdCompressor()
        return b"zstd:" + compressor.compress(raw)  # tag so the decoder can detect compression
    return raw

def _decode_payload(self, payload):
    if isinstance(payload, str):
        payload = payload.encode("utf-8")
    if payload.startswith(b"zstd:") and zstd is not None:
        decompressor = zstd.ZstdDecompressor()
        payload = decompressor.decompress(payload[5:])  # drop the 5-byte b"zstd:" tag
    return json.loads(payload.decode("utf-8"))
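The same encode/decode pair can be exercised standalone. This is a hypothetical mirror of the methods above with a fixed threshold instead of settings, and the zstandard dependency kept optional so the uncompressed path still round-trips when the package is absent:

```python
import json

try:
    import zstandard as zstd  # pip install zstandard; optional here
except ImportError:
    zstd = None

MIN_BYTES = 64  # stand-in for CACHE_COMPRESSION_MIN_BYTES

def encode(result) -> bytes:
    raw = json.dumps(result, default=str).encode("utf-8")
    if zstd is not None and len(raw) >= MIN_BYTES:
        return b"zstd:" + zstd.ZstdCompressor().compress(raw)
    return raw

def decode(payload: bytes):
    if payload.startswith(b"zstd:") and zstd is not None:
        payload = zstd.ZstdDecompressor().decompress(payload[5:])
    return json.loads(payload.decode("utf-8"))

doc = {"rows": list(range(50))}
assert decode(encode(doc)) == doc  # round-trips with or without zstandard
```

A plain JSON payload can never be mistaken for a compressed one, because serialised JSON always starts with a character like { or [, never with the b"zstd:" tag.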