From beac5d99547121ca04ff069dbeaac7619fc741a8 Mon Sep 17 00:00:00 2001 From: mvalentsev Date: Fri, 24 Apr 2026 12:46:31 +0500 Subject: [PATCH 1/8] refactor(mcp): replace eager _kg with lazy per-path cache (#1136) Swap the module-level KnowledgeGraph singleton for a lazy, per-path cache keyed by the resolved sqlite path. Import no longer creates a sqlite file as a side effect, and MCP servers started with --palace now route KG calls to the correct tenant when MEMPALACE_PALACE_PATH changes between calls, matching the per-call behavior of _get_client() on the ChromaDB side. Default-path behavior is preserved: without --palace at startup, KG stays on DEFAULT_KG_PATH regardless of env var. The "no --palace but env var set" case is #540's scope and is not changed here. --- mempalace/mcp_server.py | 52 +++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index e3e89c6..eae048b 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -46,6 +46,7 @@ import argparse # noqa: E402 (deferred until after stdio protection above) import json # noqa: E402 import logging # noqa: E402 import hashlib # noqa: E402 +import threading # noqa: E402 import time # noqa: E402 from datetime import date, datetime # noqa: E402 from pathlib import Path # noqa: E402 @@ -78,7 +79,7 @@ from .palace_graph import ( # noqa: E402 follow_tunnels, ) -from .knowledge_graph import KnowledgeGraph # noqa: E402 +from .knowledge_graph import KnowledgeGraph, DEFAULT_KG_PATH # noqa: E402 logging.basicConfig(level=logging.INFO, format="%(message)s", stream=sys.stderr) logger = logging.getLogger("mempalace_mcp") @@ -103,12 +104,39 @@ if _args.palace: os.environ["MEMPALACE_PALACE_PATH"] = os.path.abspath(_args.palace) _config = MempalaceConfig() -# Only override KG path when --palace is explicitly provided; otherwise use -# KnowledgeGraph's default (~/.mempalace/knowledge_graph.sqlite3). -if _args.palace: - _kg = KnowledgeGraph(db_path=os.path.join(_config.palace_path, "knowledge_graph.sqlite3")) -else: - _kg = KnowledgeGraph() + +# Lazy per-path KG cache. Import no longer creates the sqlite file as a side +# effect (see issue #1136). The path is resolved on each tool call so that a +# multi-tenant host rotating MEMPALACE_PALACE_PATH between calls routes each +# call to the correct KG file, matching the per-call behavior of _get_client() +# on the ChromaDB side. +_kg_by_path: dict[str, KnowledgeGraph] = {} +_kg_cache_lock = threading.Lock() + +# Whether --palace was given at startup. Controls default-path resolution: +# with the flag, KG follows _config.palace_path per call; without it, KG stays +# on DEFAULT_KG_PATH regardless of env var (issue #540's territory, out of +# scope here). +_palace_flag_given: bool = bool(_args.palace) + + +def _resolve_kg_path() -> str: + if _palace_flag_given: + return os.path.join(_config.palace_path, "knowledge_graph.sqlite3") + return DEFAULT_KG_PATH + + +def _get_kg() -> KnowledgeGraph: + path = os.path.abspath(_resolve_kg_path()) + kg = _kg_by_path.get(path) + if kg is not None: + return kg + with _kg_cache_lock: + kg = _kg_by_path.get(path) + if kg is None: + kg = KnowledgeGraph(db_path=path) + _kg_by_path[path] = kg + return kg _client_cache = None @@ -1063,7 +1091,7 @@ def tool_kg_query(entity: str, as_of: str = None, direction: str = "both"): return {"error": str(e)} if direction not in ("outgoing", "incoming", "both"): return {"error": "direction must be 'outgoing', 'incoming', or 'both'"} - results = _kg.query_entity(entity, as_of=as_of, direction=direction) + results = _get_kg().query_entity(entity, as_of=as_of, direction=direction) return {"entity": entity, "as_of": as_of, "facts": results, "count": len(results)} @@ -1108,7 +1136,7 @@ def tool_kg_add( "source_drawer_id": source_drawer_id, }, ) - triple_id = _kg.add_triple( + triple_id = _get_kg().add_triple( subject, predicate, object, @@ -1147,7 +1175,7 @@ def tool_kg_invalidate(subject: str, predicate: str, object: str, ended: str = N "ended": resolved_ended, }, ) - _kg.invalidate(subject, predicate, object, ended=resolved_ended) + _get_kg().invalidate(subject, predicate, object, ended=resolved_ended) return { "success": True, "fact": f"{subject} → {predicate} → {object}", @@ -1162,13 +1190,13 @@ def tool_kg_timeline(entity: str = None): entity = sanitize_kg_value(entity, "entity") except ValueError as e: return {"error": str(e)} - results = _kg.timeline(entity) + results = _get_kg().timeline(entity) return {"entity": entity or "all", "timeline": results, "count": len(results)} def tool_kg_stats(): """Knowledge graph overview: entities, triples, relationship types.""" - return _kg.stats() + return _get_kg().stats() # ==================== AGENT DIARY ==================== From 9e730098e97c173476b9949a5eca2253e56a08a5 Mon Sep 17 00:00:00 2001 From: mvalentsev Date: Fri, 24 Apr 2026 12:48:00 +0500 Subject: [PATCH 2/8] test(mcp): migrate _kg monkeypatches to _get_kg (#1136) Direct module-attribute patching of _kg is obsolete after the lazy cache refactor. Switch test helpers to patch _get_kg instead so the fixture KG replaces the factory rather than a now-missing singleton. - tests/test_mcp_server.py: _patch_mcp_server helper - tests/benchmarks/test_mcp_bench.py: _patch_mcp_config helper - tests/benchmarks/test_memory_profile.py: inline patch in test_tool_status_repeated_calls --- tests/benchmarks/test_mcp_bench.py | 3 ++- tests/benchmarks/test_memory_profile.py | 3 ++- tests/test_mcp_server.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/benchmarks/test_mcp_bench.py b/tests/benchmarks/test_mcp_bench.py index 4e8330b..42e73ec 100644 --- a/tests/benchmarks/test_mcp_bench.py +++ b/tests/benchmarks/test_mcp_bench.py @@ -40,8 +40,9 @@ def _patch_mcp_config(monkeypatch, palace_path, tmp_path): import mempalace.mcp_server as mcp_mod + kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3")) monkeypatch.setattr(mcp_mod, "_config", cfg) - monkeypatch.setattr(mcp_mod, "_kg", KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))) + monkeypatch.setattr(mcp_mod, "_get_kg", lambda: kg) def _get_rss_mb(): diff --git a/tests/benchmarks/test_memory_profile.py b/tests/benchmarks/test_memory_profile.py index b299b2d..047bfaa 100644 --- a/tests/benchmarks/test_memory_profile.py +++ b/tests/benchmarks/test_memory_profile.py @@ -84,8 +84,9 @@ class TestToolStatusMemoryProfile: cfg = MempalaceConfig(config_dir=str(tmp_path / "cfg")) monkeypatch.setattr(cfg, "_file_config", {"palace_path": palace_path}) + kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3")) monkeypatch.setattr(mcp_mod, "_config", cfg) - monkeypatch.setattr(mcp_mod, "_kg", KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))) + monkeypatch.setattr(mcp_mod, "_get_kg", lambda: kg) from mempalace.mcp_server import tool_status diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index ec9562b..2ab2eb8 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -18,7 +18,7 @@ def _patch_mcp_server(monkeypatch, config, kg): from mempalace import mcp_server monkeypatch.setattr(mcp_server, "_config", config) - monkeypatch.setattr(mcp_server, "_kg", kg) + monkeypatch.setattr(mcp_server, "_get_kg", lambda: kg) def _get_collection(palace_path, create=False): From c69a622a18db0a082bce8729d35c1d6a8d98efdf Mon Sep 17 00:00:00 2001 From: mvalentsev Date: Fri, 24 Apr 2026 12:53:19 +0500 Subject: [PATCH 3/8] test(mcp): add multi-tenant and lazy-init tests for KG (#1136) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TestKGLazyCache covers the scenarios behind the lazy per-path refactor: - test_lazy_init_no_import_side_effect: a fresh subprocess import does not create ~/.mempalace/knowledge_graph.sqlite3 (what closed PR #167 was aiming at). - test_get_kg_returns_same_instance: two _get_kg() calls under the same resolved path return the same object, cache has one entry. - test_get_kg_different_paths_different_instances: rotating env var produces distinct KGs. - test_multi_tenant_env_switch: the exact scenario from #1136 — write under path A, query under path B returns empty, switching back to A sees the fact. - test_cache_thread_safe: 16 threads racing _get_kg() end up with one shared instance and one cache entry. --- tests/test_mcp_server.py | 112 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 2ab2eb8..86d5878 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -8,6 +8,7 @@ via monkeypatch to avoid touching real data. from datetime import datetime import json +import os import sys import pytest @@ -1143,3 +1144,114 @@ class TestCacheInvalidation: for kwargs in captured["get"]: assert "embedding_function" in kwargs assert kwargs["embedding_function"] is not None + + +class TestKGLazyCache: + """Lazy per-path KnowledgeGraph cache (issue #1136).""" + + def test_lazy_init_no_import_side_effect(self, tmp_path): + """Importing mcp_server must not create knowledge_graph.sqlite3. + + Runs in a fresh subprocess with HOME pointed at tmp_path so the + assertion targets a clean filesystem, independent of conftest's + session-level HOME patch. + """ + import subprocess + import sys + + kg_file = tmp_path / ".mempalace" / "knowledge_graph.sqlite3" + result = subprocess.run( + [sys.executable, "-c", "import mempalace.mcp_server"], + env={ + "HOME": str(tmp_path), + "USERPROFILE": str(tmp_path), + "PATH": os.environ.get("PATH", ""), + "PYTHONPATH": os.environ.get("PYTHONPATH", ""), + }, + capture_output=True, + text=True, + timeout=30, + ) + assert result.returncode == 0, f"import failed: {result.stderr}" + assert not kg_file.exists(), f"import created sqlite file at {kg_file} as a side effect" + + def test_get_kg_returns_same_instance(self, tmp_path, monkeypatch): + """Two calls with the same resolved path return the same KG.""" + from mempalace import mcp_server + + monkeypatch.setattr(mcp_server, "_kg_by_path", {}) + monkeypatch.setattr(mcp_server, "_palace_flag_given", True) + monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_path)) + + kg1 = mcp_server._get_kg() + kg2 = mcp_server._get_kg() + assert kg1 is kg2 + assert len(mcp_server._kg_by_path) == 1 + + def test_get_kg_different_paths_different_instances(self, tmp_path, monkeypatch): + """Different palace paths map to different KG instances.""" + from mempalace import mcp_server + + tmp_a = tmp_path / "a" + tmp_b = tmp_path / "b" + tmp_a.mkdir() + tmp_b.mkdir() + + monkeypatch.setattr(mcp_server, "_kg_by_path", {}) + monkeypatch.setattr(mcp_server, "_palace_flag_given", True) + + monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_a)) + kg_a = mcp_server._get_kg() + monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_b)) + kg_b = mcp_server._get_kg() + + assert kg_a is not kg_b + assert len(mcp_server._kg_by_path) == 2 + + def test_multi_tenant_env_switch(self, tmp_path, monkeypatch): + """The issue #1136 acceptance scenario. + + Rotating MEMPALACE_PALACE_PATH between MCP tool calls must route + each call to the correct tenant's KG sqlite file. + """ + from mempalace import mcp_server + + tmp_a = tmp_path / "tenant_a" + tmp_b = tmp_path / "tenant_b" + tmp_a.mkdir() + tmp_b.mkdir() + + monkeypatch.setattr(mcp_server, "_kg_by_path", {}) + monkeypatch.setattr(mcp_server, "_palace_flag_given", True) + + monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_a)) + add_result = mcp_server.tool_kg_add( + subject="alice_secret", + predicate="owns", + object="repo_a", + ) + assert add_result.get("success") is True, add_result + + monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_b)) + query_b = mcp_server.tool_kg_query(entity="alice_secret") + assert query_b.get("count", 0) == 0, f"tenant B leaked tenant A's fact: {query_b}" + + monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_a)) + query_a = mcp_server.tool_kg_query(entity="alice_secret") + assert query_a.get("count", 0) >= 1, f"tenant A lost its own fact: {query_a}" + + def test_cache_thread_safe(self, tmp_path, monkeypatch): + """Concurrent _get_kg() for the same path yields one instance.""" + import concurrent.futures + from mempalace import mcp_server + + monkeypatch.setattr(mcp_server, "_kg_by_path", {}) + monkeypatch.setattr(mcp_server, "_palace_flag_given", True) + monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_path)) + + with concurrent.futures.ThreadPoolExecutor(max_workers=16) as pool: + results = list(pool.map(lambda _: mcp_server._get_kg(), range(16))) + + ids = {id(kg) for kg in results} + assert len(ids) == 1, f"expected 1 unique instance, got {len(ids)}" + assert len(mcp_server._kg_by_path) == 1 From 84f9726a39e65be3f8afba0d14d4183999520c0d Mon Sep 17 00:00:00 2001 From: mvalentsev Date: Fri, 24 Apr 2026 13:03:12 +0500 Subject: [PATCH 4/8] test(mcp): fix Windows subprocess env in KG lazy-init test Passing a stripped env dict without SYSTEMROOT/WINDIR breaks Python bootstrap on Windows (_Py_HashRandomization_Init). Inherit the parent env and strip MEMPAL* vars instead, then override HOME/USERPROFILE to the tmp dir. --- tests/test_mcp_server.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 86d5878..638ac15 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -1160,14 +1160,12 @@ class TestKGLazyCache: import sys kg_file = tmp_path / ".mempalace" / "knowledge_graph.sqlite3" + env = {k: v for k, v in os.environ.items() if not k.startswith("MEMPAL")} + env["HOME"] = str(tmp_path) + env["USERPROFILE"] = str(tmp_path) result = subprocess.run( [sys.executable, "-c", "import mempalace.mcp_server"], - env={ - "HOME": str(tmp_path), - "USERPROFILE": str(tmp_path), - "PATH": os.environ.get("PATH", ""), - "PYTHONPATH": os.environ.get("PYTHONPATH", ""), - }, + env=env, capture_output=True, text=True, timeout=30, From 19f8a4ff682fe5fdad71c6c5d31a45fac6ce2310 Mon Sep 17 00:00:00 2001 From: mvalentsev Date: Fri, 24 Apr 2026 13:07:34 +0500 Subject: [PATCH 5/8] style(mcp): drop issue-tracker comments from KG cache block Inline comments referencing #1136 and #540 add no information the identifiers do not already convey. PR description carries the context; code stays quiet. --- mempalace/mcp_server.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index eae048b..5f74df6 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -105,18 +105,8 @@ if _args.palace: _config = MempalaceConfig() -# Lazy per-path KG cache. Import no longer creates the sqlite file as a side -# effect (see issue #1136). The path is resolved on each tool call so that a -# multi-tenant host rotating MEMPALACE_PALACE_PATH between calls routes each -# call to the correct KG file, matching the per-call behavior of _get_client() -# on the ChromaDB side. _kg_by_path: dict[str, KnowledgeGraph] = {} _kg_cache_lock = threading.Lock() - -# Whether --palace was given at startup. Controls default-path resolution: -# with the flag, KG follows _config.palace_path per call; without it, KG stays -# on DEFAULT_KG_PATH regardless of env var (issue #540's territory, out of -# scope here). _palace_flag_given: bool = bool(_args.palace) From 0a626580513c09bd3a9f7719e53dc3e4810463f5 Mon Sep 17 00:00:00 2001 From: mvalentsev Date: Sat, 2 May 2026 18:00:36 +0500 Subject: [PATCH 6/8] fix(mcp): drain KG cache on tool_reconnect tool_reconnect cleared ChromaDB caches but left _kg_by_path entries intact. After an external replacement of knowledge_graph.sqlite3 the server kept serving the old open sqlite3.Connection, returning stale results. Now iterate _kg_by_path under _kg_cache_lock, call close() best-effort, and clear the dict so the next tool call reopens the KG from disk. Two new tests in TestKGLazyCache verify cache invalidation and that a failing close() does not block the clear. --- mempalace/mcp_server.py | 14 ++++++++++++-- tests/test_mcp_server.py | 42 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 5f74df6..3b3b4e2 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -1418,10 +1418,11 @@ def tool_memories_filed_away(): def tool_reconnect(): - """Force the MCP server to drop the cached ChromaDB collection and reconnect. + """Force the MCP server to drop cached ChromaDB + KnowledgeGraph state. Use after external scripts or CLI commands modify the palace database - directly, which can leave the in-memory HNSW index stale. + or replace ``knowledge_graph.sqlite3`` directly, which can leave the + in-memory HNSW index stale or pin a closed-on-disk SQLite connection. """ global \ _client_cache, \ @@ -1439,6 +1440,15 @@ def tool_reconnect(): # still applies after the reconnect. _vector_disabled = False _vector_disabled_reason = "" + # Drain the per-path KnowledgeGraph cache so a replaced sqlite file is + # reopened on the next tool call rather than served from a stale handle. + with _kg_cache_lock: + for kg in _kg_by_path.values(): + try: + kg.close() + except Exception: + pass + _kg_by_path.clear() try: col = _get_collection() if col is None: diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 638ac15..092b707 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -1253,3 +1253,45 @@ class TestKGLazyCache: ids = {id(kg) for kg in results} assert len(ids) == 1, f"expected 1 unique instance, got {len(ids)}" assert len(mcp_server._kg_by_path) == 1 + + def test_tool_reconnect_drains_kg_cache(self, monkeypatch): + """``tool_reconnect`` must close cached KG instances and clear the dict. + + Without this, an external replacement of ``knowledge_graph.sqlite3`` + leaves the server pinned to a stale ``sqlite3.Connection``. + """ + from mempalace import mcp_server + + class _FakeKG: + def __init__(self): + self.closed = False + + def close(self): + self.closed = True + + fake_a = _FakeKG() + fake_b = _FakeKG() + monkeypatch.setattr(mcp_server, "_kg_by_path", {"/a": fake_a, "/b": fake_b}) + # Bypass real ChromaDB so the test isolates KG-cache behaviour. + monkeypatch.setattr(mcp_server, "_get_collection", lambda: None) + + mcp_server.tool_reconnect() + + assert fake_a.closed is True + assert fake_b.closed is True + assert mcp_server._kg_by_path == {} + + def test_tool_reconnect_swallows_kg_close_errors(self, monkeypatch): + """A failing ``close()`` on one cached KG must not block cache clearing.""" + from mempalace import mcp_server + + class _BoomKG: + def close(self): + raise RuntimeError("boom") + + monkeypatch.setattr(mcp_server, "_kg_by_path", {"/a": _BoomKG()}) + monkeypatch.setattr(mcp_server, "_get_collection", lambda: None) + + mcp_server.tool_reconnect() + + assert mcp_server._kg_by_path == {} From b8816e0fe2fa857efb984e0aff9e52739fb2af34 Mon Sep 17 00:00:00 2001 From: mvalentsev Date: Sun, 3 May 2026 21:43:51 +0500 Subject: [PATCH 7/8] fix(mcp): retry KG handlers once on concurrent close race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Race scenario: a KG tool handler calls _get_kg() and gets a live KnowledgeGraph; another thread fires tool_reconnect() between that return and the handler's kg.add_triple()/kg.query_entity()/etc call. tool_reconnect drains _kg_by_path and closes the underlying sqlite3.Connection; the handler then raises sqlite3.ProgrammingError: 'Cannot operate on a closed database', which surfaces as a -32000 to the MCP client even though the user just asked for a reconnect. New _call_kg(op) helper wraps each handler's kg call in a one-shot retry: catch exactly sqlite3.ProgrammingError, evict the stale entry (only if the cache slot still points at the closed instance — another thread may have already replaced it), and rerun op against a fresh _get_kg(). Beyond one retry give up so a sustained close-stream surfaces clearly instead of looping. All five KG handlers (tool_kg_query, tool_kg_add, tool_kg_invalidate, tool_kg_timeline, tool_kg_stats) now route through _call_kg. Tests pin the contract: * retries with a fresh KG and returns the second result * non-ProgrammingError exceptions propagate without retry * gives up after exactly one retry on sustained close --- mempalace/mcp_server.py | 61 ++++++++++++++++++++++++------- tests/test_mcp_server.py | 77 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 13 deletions(-) diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 3b3b4e2..c67619e 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -46,6 +46,7 @@ import argparse # noqa: E402 (deferred until after stdio protection above) import json # noqa: E402 import logging # noqa: E402 import hashlib # noqa: E402 +import sqlite3 # noqa: E402 import threading # noqa: E402 import time # noqa: E402 from datetime import date, datetime # noqa: E402 @@ -129,6 +130,38 @@ def _get_kg() -> KnowledgeGraph: return kg +def _call_kg(op): + """Run ``op(kg)`` against the cached KG with one-shot retry on close. + + Race we're guarding against: a handler grabs ``kg = _get_kg()`` and is + about to call ``kg.add_triple(...)`` when ``tool_reconnect`` fires on + another thread, drains ``_kg_by_path``, and closes the underlying + sqlite3.Connection. The handler's call then raises + ``sqlite3.ProgrammingError: Cannot operate on a closed database`` and + bubbles up as a -32000 to the MCP client even though the user just + asked for a reconnect. + + Catch that single class of error, evict the stale entry from the + cache (only if it still points at the closed instance — another + thread may have already replaced it), and try once more with a fresh + KG. Beyond one retry give up: a second close means we're losing a + sustained race we won't win in this loop, and a hung loop is worse + than a clear failure surface. + """ + for attempt in range(2): + kg = _get_kg() + try: + return op(kg) + except sqlite3.ProgrammingError: + if attempt == 0: + path = os.path.abspath(_resolve_kg_path()) + with _kg_cache_lock: + if _kg_by_path.get(path) is kg: + _kg_by_path.pop(path, None) + continue + raise + + _client_cache = None _collection_cache = None _palace_db_inode = 0 # inode of chroma.sqlite3 at cache time @@ -1081,7 +1114,7 @@ def tool_kg_query(entity: str, as_of: str = None, direction: str = "both"): return {"error": str(e)} if direction not in ("outgoing", "incoming", "both"): return {"error": "direction must be 'outgoing', 'incoming', or 'both'"} - results = _get_kg().query_entity(entity, as_of=as_of, direction=direction) + results = _call_kg(lambda kg: kg.query_entity(entity, as_of=as_of, direction=direction)) return {"entity": entity, "as_of": as_of, "facts": results, "count": len(results)} @@ -1126,15 +1159,17 @@ def tool_kg_add( "source_drawer_id": source_drawer_id, }, ) - triple_id = _get_kg().add_triple( - subject, - predicate, - object, - valid_from=valid_from, - valid_to=valid_to, - source_closet=source_closet, - source_file=source_file, - source_drawer_id=source_drawer_id, + triple_id = _call_kg( + lambda kg: kg.add_triple( + subject, + predicate, + object, + valid_from=valid_from, + valid_to=valid_to, + source_closet=source_closet, + source_file=source_file, + source_drawer_id=source_drawer_id, + ) ) return {"success": True, "triple_id": triple_id, "fact": f"{subject} → {predicate} → {object}"} @@ -1165,7 +1200,7 @@ def tool_kg_invalidate(subject: str, predicate: str, object: str, ended: str = N "ended": resolved_ended, }, ) - _get_kg().invalidate(subject, predicate, object, ended=resolved_ended) + _call_kg(lambda kg: kg.invalidate(subject, predicate, object, ended=resolved_ended)) return { "success": True, "fact": f"{subject} → {predicate} → {object}", @@ -1180,13 +1215,13 @@ def tool_kg_timeline(entity: str = None): entity = sanitize_kg_value(entity, "entity") except ValueError as e: return {"error": str(e)} - results = _get_kg().timeline(entity) + results = _call_kg(lambda kg: kg.timeline(entity)) return {"entity": entity or "all", "timeline": results, "count": len(results)} def tool_kg_stats(): """Knowledge graph overview: entities, triples, relationship types.""" - return _get_kg().stats() + return _call_kg(lambda kg: kg.stats()) # ==================== AGENT DIARY ==================== diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 092b707..e365afc 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -1295,3 +1295,80 @@ class TestKGLazyCache: mcp_server.tool_reconnect() assert mcp_server._kg_by_path == {} + + def test_call_kg_retries_after_concurrent_close(self, monkeypatch): + """A KG closed mid-handler must trigger a one-shot retry with a fresh + instance — not surface a -32000 to the MCP client.""" + import sqlite3 as _sqlite3 + + from mempalace import mcp_server + + path = "/fake/palace/knowledge_graph.sqlite3" + monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path) + + class _ClosedKG: + def query_entity(self, entity, **kwargs): + raise _sqlite3.ProgrammingError("Cannot operate on a closed database") + + class _FreshKG: + def query_entity(self, entity, **kwargs): + return [{"entity": entity}] + + cache = {os.path.abspath(path): _ClosedKG()} + monkeypatch.setattr(mcp_server, "_kg_by_path", cache) + + # Second _get_kg() call (after the cache eviction) constructs a new + # KG. Patch the constructor so we don't open a real sqlite file. + monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _FreshKG()) + + result = mcp_server._call_kg(lambda kg: kg.query_entity("Alice")) + assert result == [{"entity": "Alice"}] + # The closed instance must be evicted; the fresh one must be cached. + assert isinstance(cache[os.path.abspath(path)], _FreshKG) + + def test_call_kg_does_not_retry_on_other_errors(self, monkeypatch): + """Non-ProgrammingError exceptions must propagate without retry — + we don't want the retry guard masking real bugs.""" + from mempalace import mcp_server + + path = "/fake/palace/knowledge_graph.sqlite3" + monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path) + + calls = {"count": 0} + + class _FailingKG: + def query_entity(self, entity, **kwargs): + calls["count"] += 1 + raise ValueError("bad input") + + monkeypatch.setattr(mcp_server, "_kg_by_path", {os.path.abspath(path): _FailingKG()}) + monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _FailingKG()) + + with pytest.raises(ValueError, match="bad input"): + mcp_server._call_kg(lambda kg: kg.query_entity("Alice")) + assert calls["count"] == 1, "non-ProgrammingError must not trigger retry" + + def test_call_kg_gives_up_after_one_retry(self, monkeypatch): + """If the second attempt also hits a closed DB, give up rather than + loop forever — a sustained close-stream is a different bug.""" + import sqlite3 as _sqlite3 + + from mempalace import mcp_server + + path = "/fake/palace/knowledge_graph.sqlite3" + monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path) + + calls = {"count": 0} + + class _AlwaysClosedKG: + def query_entity(self, entity, **kwargs): + calls["count"] += 1 + raise _sqlite3.ProgrammingError("closed again") + + cache = {} + monkeypatch.setattr(mcp_server, "_kg_by_path", cache) + monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _AlwaysClosedKG()) + + with pytest.raises(_sqlite3.ProgrammingError): + mcp_server._call_kg(lambda kg: kg.query_entity("Alice")) + assert calls["count"] == 2, "expected exactly one retry beyond the initial attempt" From 75ad8ae7819ee9eebdf635cc0c3e969b2f9bc73b Mon Sep 17 00:00:00 2001 From: mvalentsev Date: Sun, 3 May 2026 22:04:22 +0500 Subject: [PATCH 8/8] ci: retrigger linux 3.13 (transient onnx download flake)