Architecture
From a single FastAPI endpoint to a five-stage pipeline that decides how each Cypher query should be served.
Entry point
Everything starts in main.py. The app exposes three endpoints: POST /query for Cypher requests, GET /health for liveness, and GET /metrics for Prometheus. A FastAPI lifespan hook wires up Redis and Neo4j once at startup, attaches a single GraphCache instance to app.state, and tears them down on exit.
from contextlib import asynccontextmanager

from fastapi import FastAPI
from redis.asyncio import from_url as redis_from_url

from middleware.cache import GraphCache
from middleware.neo4j_client import Neo4jClient


@asynccontextmanager
async def lifespan(app: FastAPI):
    redis = redis_from_url(settings.REDIS_URL, decode_responses=False)
    neo4j_client = Neo4jClient(settings)
    cache = GraphCache(neo4j_client, redis, settings)
    app.state.redis = redis
    app.state.neo4j = neo4j_client
    app.state.cache = cache
    try:
        yield
    finally:
        await neo4j_client.close()
        await redis.close()


app = FastAPI(title=settings.APP_NAME, lifespan=lifespan)


@app.post("/query")
async def query_endpoint(request: QueryRequest) -> dict:
    result, metadata = await app.state.cache.get_or_compute(
        request.query, request.params, session_id=request.session_id,
    )
    return {"result": result, "metadata": metadata}
Notice that decode_responses=False is set on the Redis client. This matters: the single-flight lock in the jitter module compares its UUID ownership token as raw bytes, and an earlier bug that compared the token against a decoded string caused 1.2-second tail-latency spikes in benchmark runs.
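To see why the comparison has to stay in bytes, here is a minimal stand-in sketch. An in-memory dict replaces real Redis, and redis_set/redis_get are illustrative helpers, not the project's API; only the bytes-vs-str behaviour mirrors the real client.

```python
import uuid

# In-memory stand-in for Redis: values are always stored as raw bytes.
store = {}

def redis_set(key, value):
    store[key] = value

def redis_get(key, decode_responses=False):
    value = store[key]
    return value.decode() if decode_responses else value

# Lock token held by this request, as bytes (how the client sends it).
token = str(uuid.uuid4()).encode()
redis_set(b"kgcache:lock", token)

# decode_responses=False: bytes == bytes, so the owner check passes.
assert redis_get(b"kgcache:lock") == token

# decode_responses=True: GET returns str, str != bytes, so the owner check
# silently fails -- the lock is never released early and waiters block
# until the lock's TTL expires, which shows up as tail-latency spikes.
assert redis_get(b"kgcache:lock", decode_responses=True) != token
```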
Components
The middleware is built from four kinds of objects: a Neo4j async client, a Redis async client, the orchestration class GraphCache, and five pluggable optimizer modules.
The orchestrator
The constructor of GraphCache instantiates each optimizer based on its feature flag, or sets the entry to None if the flag is off. The pipeline downstream guards every optimizer behind an if optimizer is not None check, so turning a flag off simply means the corresponding code path is skipped.
class GraphCache:
    def __init__(self, neo4j, redis, settings: Settings):
        self.neo4j = neo4j
        self.redis = redis
        self.settings = settings
        self.optimizers = {
            "jitter": JitterStampede(redis, settings) if settings.JITTER_STAMPEDE else None,
            "frequency": FrequencyAwareCache(redis, settings) if settings.FREQUENCY_AWARE else None,
            "prefetch": AdaptivePrefetch(redis, settings) if settings.ADAPTIVE_PREFETCH else None,
            "overlap": OverlappingSubqueryCache(redis, settings) if settings.OVERLAPPING_SUBQUERIES else None,
            "bfs": ExternalBFS(redis, neo4j, settings) if settings.EXTERNAL_BFS else None,
        }
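Downstream, the None-guard pattern looks roughly like this. StubFrequency and its TTL rule are illustrative stand-ins, not the real FrequencyAwareCache interface:

```python
# Stand-in for FrequencyAwareCache; the TTL rule here is illustrative only.
class StubFrequency:
    def ttl_for(self, cost_ms: float) -> int:
        return min(300, max(30, int(cost_ms)))  # bounded TTL

def assign_ttl(optimizers: dict, cost_ms: float, default_ttl: int = 60) -> int:
    frequency = optimizers.get("frequency")
    if frequency is not None:   # flag was on at construction time
        return frequency.ttl_for(cost_ms)
    return default_ttl          # flag off: entry is None, the step is skipped

enabled = {"frequency": StubFrequency()}
disabled = {"frequency": None}
assert assign_ttl(enabled, 120.0) == 120
assert assign_ttl(disabled, 120.0) == 60
```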
Request lifecycle
This is the full path a single POST /query takes. Each step is implemented in get_or_compute or one of the helpers it calls.
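The key-construction step at the start of this path can be sketched as follows. The implementations of normalize_query and build_cache_key here are assumed from the description, not copied from the project:

```python
import hashlib
import json
import re

def normalize_query(query: str, params: dict) -> str:
    # Collapse runs of whitespace so formatting differences don't split the cache.
    collapsed = re.sub(r"\s+", " ", query.strip())
    # Canonical JSON of params: sorted keys make the hash order-independent.
    canonical = json.dumps(params, sort_keys=True, default=str)
    return hashlib.sha256((collapsed + canonical).encode("utf-8")).hexdigest()

def build_cache_key(query: str, params: dict) -> str:
    return "kgcache:" + normalize_query(query, params)

# Whitespace and param ordering do not change the key.
k1 = build_cache_key("MATCH (n)  RETURN n", {"a": 1, "b": 2})
k2 = build_cache_key("MATCH (n) RETURN n", {"b": 2, "a": 1})
assert k1 == k2 and k1.startswith("kgcache:")
```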
1. normalize_query(query, params) collapses whitespace and SHA-256-hashes a sorted JSON encoding of the params. build_cache_key prefixes the result with kgcache:.
2. jitter.protect() attempts a Redis SET NX. If another request already holds the lock, this request polls until either the lock disappears or the key materialises (single-flight reuse).
3. neo4j_client.execute_cypher runs the query, and the compute latency is timed.
4. frequency.should_admit applies the threshold and cost rule; frequency.ttl_for returns the bounded sub-linear TTL.
5. The result is stored under both the primary key and every overlap signature key.
6. Prefetch schedules asyncio tasks to warm likely successors and one-hop neighbours.
Metadata returned with every response
Every response carries a metadata object that lets the benchmark harness attribute behaviour. It records the cache status (hit, miss, single-flight-reuse, hit-after-lock), the source (redis, neo4j, subquery_cache, external_bfs), the assigned TTL, the lock-wait latency, whether early refresh fired, and which optimizers were applied.
return {
    "cache_status": "miss",
    "cache_key": cache_key,
    "ttl_seconds": 0,
    "source": "unknown",
    "optimizations_applied": [
        name for name, optimizer in self.optimizers.items() if optimizer is not None
    ],
    "admitted_to_cache": False,
    "lock_wait_ms": 0.0,
    "prefetch_triggered": False,
    "subquery_reuse": False,
    "bfs_override": False,
    "early_refresh_triggered": False,
    "latency_ms": 0.0,
}
The benchmark CSV reports per-optimization counters such as subquery_reuse_count, prefetch_hits_total, and stampede_events_total. These let the discussion in Chapter 4 of the thesis attribute every change in throughput or P95 to a specific module rather than guessing.
Payload encoding
Cache payloads are serialised as JSON and compressed with zstd when both conditions hold: ENABLE_ZSTD is on and the payload is at least CACHE_COMPRESSION_MIN_BYTES. Compressed payloads are tagged with a zstd: prefix so the decoder can detect them.
def _encode_payload(self, result):
    raw = json.dumps(result, default=str).encode("utf-8")
    if (
        self.settings.ENABLE_ZSTD
        and zstd is not None
        and len(raw) >= self.settings.CACHE_COMPRESSION_MIN_BYTES
    ):
        compressor = zstd.ZstdCompressor()
        return b"zstd:" + compressor.compress(raw)
    return raw

def _decode_payload(self, payload):
    if isinstance(payload, str):
        payload = payload.encode("utf-8")
    if payload.startswith(b"zstd:") and zstd is not None:
        decompressor = zstd.ZstdDecompressor()
        payload = decompressor.decompress(payload[5:])
    return json.loads(payload.decode("utf-8"))
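The tag-and-detect pattern round-trips cleanly. A self-contained sketch of the same design, using zlib from the standard library so it runs without the zstandard package (the tag, threshold, and helper names are illustrative):

```python
import json
import zlib

TAG = b"zlib:"   # stand-in for the real zstd: tag; same detect-on-decode idea
MIN_BYTES = 64   # stand-in for CACHE_COMPRESSION_MIN_BYTES

def encode(result) -> bytes:
    raw = json.dumps(result, default=str).encode("utf-8")
    if len(raw) >= MIN_BYTES:
        return TAG + zlib.compress(raw)  # tag marks the payload as compressed
    return raw                           # small payloads stay plain JSON

def decode(payload: bytes):
    if payload.startswith(TAG):
        payload = zlib.decompress(payload[len(TAG):])
    return json.loads(payload.decode("utf-8"))

small = {"n": 1}
large = {"rows": ["x" * 10] * 20}
assert decode(encode(small)) == small        # below threshold: plain JSON
assert encode(large).startswith(TAG)         # above threshold: tagged
assert decode(encode(large)) == large        # round-trips either way
```

Tagging the ciphertext rather than keeping a side flag means old uncompressed entries and new compressed ones can coexist under the same keys during a rollout.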