Merge pull request #1160 from mvalentsev/fix/mcp-kg-lazy-per-path-cache

fix(mcp): lazy per-path KnowledgeGraph cache (#1136)
This commit is contained in:
Igor Lins e Silva
2026-05-06 01:33:47 -03:00
committed by GitHub
4 changed files with 319 additions and 25 deletions
+85 -22
View File
@@ -46,6 +46,8 @@ import argparse # noqa: E402 (deferred until after stdio protection above)
import json # noqa: E402
import logging # noqa: E402
import hashlib # noqa: E402
import sqlite3 # noqa: E402
import threading # noqa: E402
import time # noqa: E402
from datetime import date, datetime # noqa: E402
from pathlib import Path # noqa: E402
@@ -79,7 +81,7 @@ from .palace_graph import ( # noqa: E402
follow_tunnels,
)
from .knowledge_graph import KnowledgeGraph # noqa: E402
from .knowledge_graph import KnowledgeGraph, DEFAULT_KG_PATH # noqa: E402
logging.basicConfig(level=logging.INFO, format="%(message)s", stream=sys.stderr)
logger = logging.getLogger("mempalace_mcp")
@@ -104,12 +106,61 @@ if _args.palace:
os.environ["MEMPALACE_PALACE_PATH"] = os.path.abspath(_args.palace)
_config = MempalaceConfig()
# Only override KG path when --palace is explicitly provided; otherwise use
# KnowledgeGraph's default (~/.mempalace/knowledge_graph.sqlite3).
if _args.palace:
_kg = KnowledgeGraph(db_path=os.path.join(_config.palace_path, "knowledge_graph.sqlite3"))
else:
_kg = KnowledgeGraph()
_kg_by_path: dict[str, KnowledgeGraph] = {}
_kg_cache_lock = threading.Lock()
_palace_flag_given: bool = bool(_args.palace)
def _resolve_kg_path() -> str:
if _palace_flag_given:
return os.path.join(_config.palace_path, "knowledge_graph.sqlite3")
return DEFAULT_KG_PATH
def _get_kg() -> KnowledgeGraph:
path = os.path.abspath(_resolve_kg_path())
kg = _kg_by_path.get(path)
if kg is not None:
return kg
with _kg_cache_lock:
kg = _kg_by_path.get(path)
if kg is None:
kg = KnowledgeGraph(db_path=path)
_kg_by_path[path] = kg
return kg
def _call_kg(op):
"""Run ``op(kg)`` against the cached KG with one-shot retry on close.
Race we're guarding against: a handler grabs ``kg = _get_kg()`` and is
about to call ``kg.add_triple(...)`` when ``tool_reconnect`` fires on
another thread, drains ``_kg_by_path``, and closes the underlying
sqlite3.Connection. The handler's call then raises
``sqlite3.ProgrammingError: Cannot operate on a closed database`` and
bubbles up as a -32000 to the MCP client even though the user just
asked for a reconnect.
Catch that single class of error, evict the stale entry from the
cache (only if it still points at the closed instance — another
thread may have already replaced it), and try once more with a fresh
KG. Beyond one retry give up: a second close means we're losing a
sustained race we won't win in this loop, and a hung loop is worse
than a clear failure surface.
"""
for attempt in range(2):
kg = _get_kg()
try:
return op(kg)
except sqlite3.ProgrammingError:
if attempt == 0:
path = os.path.abspath(_resolve_kg_path())
with _kg_cache_lock:
if _kg_by_path.get(path) is kg:
_kg_by_path.pop(path, None)
continue
raise
_client_cache = None
@@ -1065,7 +1116,7 @@ def tool_kg_query(entity: str, as_of: str = None, direction: str = "both"):
return {"error": str(e)}
if direction not in ("outgoing", "incoming", "both"):
return {"error": "direction must be 'outgoing', 'incoming', or 'both'"}
results = _kg.query_entity(entity, as_of=as_of, direction=direction)
results = _call_kg(lambda kg: kg.query_entity(entity, as_of=as_of, direction=direction))
return {"entity": entity, "as_of": as_of, "facts": results, "count": len(results)}
@@ -1111,15 +1162,17 @@ def tool_kg_add(
"source_drawer_id": source_drawer_id,
},
)
triple_id = _kg.add_triple(
subject,
predicate,
object,
valid_from=valid_from,
valid_to=valid_to,
source_closet=source_closet,
source_file=source_file,
source_drawer_id=source_drawer_id,
triple_id = _call_kg(
lambda kg: kg.add_triple(
subject,
predicate,
object,
valid_from=valid_from,
valid_to=valid_to,
source_closet=source_closet,
source_file=source_file,
source_drawer_id=source_drawer_id,
)
)
return {"success": True, "triple_id": triple_id, "fact": f"{subject}{predicate}{object}"}
@@ -1151,7 +1204,7 @@ def tool_kg_invalidate(subject: str, predicate: str, object: str, ended: str = N
"ended": resolved_ended,
},
)
_kg.invalidate(subject, predicate, object, ended=resolved_ended)
_call_kg(lambda kg: kg.invalidate(subject, predicate, object, ended=resolved_ended))
return {
"success": True,
"fact": f"{subject}{predicate}{object}",
@@ -1166,13 +1219,13 @@ def tool_kg_timeline(entity: str = None):
entity = sanitize_kg_value(entity, "entity")
except ValueError as e:
return {"error": str(e)}
results = _kg.timeline(entity)
results = _call_kg(lambda kg: kg.timeline(entity))
return {"entity": entity or "all", "timeline": results, "count": len(results)}
def tool_kg_stats():
"""Knowledge graph overview: entities, triples, relationship types."""
return _kg.stats()
return _call_kg(lambda kg: kg.stats())
# ==================== AGENT DIARY ====================
@@ -1404,10 +1457,11 @@ def tool_memories_filed_away():
def tool_reconnect():
"""Force the MCP server to drop the cached ChromaDB collection and reconnect.
"""Force the MCP server to drop cached ChromaDB + KnowledgeGraph state.
Use after external scripts or CLI commands modify the palace database
directly, which can leave the in-memory HNSW index stale.
or replace ``knowledge_graph.sqlite3`` directly, which can leave the
in-memory HNSW index stale or pin a closed-on-disk SQLite connection.
"""
global \
_client_cache, \
@@ -1425,6 +1479,15 @@ def tool_reconnect():
# still applies after the reconnect.
_vector_disabled = False
_vector_disabled_reason = ""
# Drain the per-path KnowledgeGraph cache so a replaced sqlite file is
# reopened on the next tool call rather than served from a stale handle.
with _kg_cache_lock:
for kg in _kg_by_path.values():
try:
kg.close()
except Exception:
pass
_kg_by_path.clear()
try:
col = _get_collection()
if col is None: