Files
mempalace/tests/test_mcp_server.py
T
2026-05-07 09:10:00 -03:00

1593 lines
63 KiB
Python

"""
test_mcp_server.py — Tests for the MCP server tool handlers and dispatch.
Tests each tool handler directly (unit-level) and the handle_request
dispatch layer (integration-level). Uses isolated palace + KG fixtures
via monkeypatch to avoid touching real data.
"""
from datetime import datetime
import json
import os
import sys
from unittest.mock import MagicMock
import pytest
def _patch_mcp_server(monkeypatch, config, kg):
"""Patch the mcp_server module globals to use test fixtures."""
from mempalace import mcp_server
monkeypatch.setattr(mcp_server, "_config", config)
monkeypatch.setattr(mcp_server, "_get_kg", lambda: kg)
def _get_collection(palace_path, create=False):
"""Helper to get collection from test palace.
Returns (client, collection) so callers can clean up the client
when they are done.
"""
import chromadb
client = chromadb.PersistentClient(path=palace_path)
if create:
return (
client,
client.get_or_create_collection("mempalace_drawers", metadata={"hnsw:space": "cosine"}),
)
return client, client.get_collection("mempalace_drawers")
# ── Protocol Layer ──────────────────────────────────────────────────────
class TestHandleRequest:
def test_initialize(self):
from mempalace.mcp_server import handle_request
resp = handle_request({"method": "initialize", "id": 1, "params": {}})
assert resp["result"]["serverInfo"]["name"] == "mempalace"
assert resp["id"] == 1
def test_initialize_negotiates_client_version(self):
from mempalace.mcp_server import handle_request
resp = handle_request(
{
"method": "initialize",
"id": 1,
"params": {"protocolVersion": "2025-11-25"},
}
)
assert resp["result"]["protocolVersion"] == "2025-11-25"
def test_initialize_negotiates_older_supported_version(self):
from mempalace.mcp_server import handle_request
resp = handle_request(
{
"method": "initialize",
"id": 1,
"params": {"protocolVersion": "2025-03-26"},
}
)
assert resp["result"]["protocolVersion"] == "2025-03-26"
def test_initialize_unknown_version_falls_back_to_latest(self):
from mempalace.mcp_server import handle_request
resp = handle_request(
{
"method": "initialize",
"id": 1,
"params": {"protocolVersion": "9999-12-31"},
}
)
from mempalace.mcp_server import SUPPORTED_PROTOCOL_VERSIONS
assert resp["result"]["protocolVersion"] == SUPPORTED_PROTOCOL_VERSIONS[0]
def test_initialize_missing_version_uses_oldest(self):
from mempalace.mcp_server import handle_request, SUPPORTED_PROTOCOL_VERSIONS
resp = handle_request({"method": "initialize", "id": 1, "params": {}})
assert resp["result"]["protocolVersion"] == SUPPORTED_PROTOCOL_VERSIONS[-1]
def test_notifications_initialized_returns_none(self):
from mempalace.mcp_server import handle_request
resp = handle_request({"method": "notifications/initialized", "id": None, "params": {}})
assert resp is None
def test_ping_returns_empty_result(self):
from mempalace.mcp_server import handle_request
resp = handle_request({"method": "ping", "id": 11, "params": {}})
assert resp["id"] == 11
assert resp["result"] == {}
def test_tools_list(self):
from mempalace.mcp_server import handle_request
resp = handle_request({"method": "tools/list", "id": 2, "params": {}})
tools = resp["result"]["tools"]
names = {t["name"] for t in tools}
assert "mempalace_status" in names
assert "mempalace_search" in names
assert "mempalace_add_drawer" in names
assert "mempalace_kg_add" in names
def test_null_arguments_does_not_hang(self, monkeypatch, config, palace_path, seeded_kg):
"""Sending arguments: null should return a result, not hang (#394)."""
_patch_mcp_server(monkeypatch, config, seeded_kg)
from mempalace.mcp_server import handle_request
_client, _col = _get_collection(palace_path, create=True)
del _client
resp = handle_request(
{
"method": "tools/call",
"id": 10,
"params": {"name": "mempalace_status", "arguments": None},
}
)
assert "error" not in resp
assert resp["result"] is not None
def test_unknown_tool(self):
from mempalace.mcp_server import handle_request
resp = handle_request(
{
"method": "tools/call",
"id": 3,
"params": {"name": "nonexistent_tool", "arguments": {}},
}
)
assert resp["error"]["code"] == -32601
def test_tools_call_missing_params(self):
from mempalace.mcp_server import handle_request
for bad_params in [None, {}, {"arguments": {}}]:
resp = handle_request(
{
"method": "tools/call",
"id": 15,
"params": bad_params,
}
)
assert resp["error"]["code"] == -32602
assert "Invalid params" in resp["error"]["message"]
def test_unknown_method(self):
from mempalace.mcp_server import handle_request
resp = handle_request({"method": "unknown/method", "id": 4, "params": {}})
assert resp["error"]["code"] == -32601
def test_any_notification_returns_none(self):
"""All notifications/* methods should return None (no response)."""
from mempalace.mcp_server import handle_request
for method in [
"notifications/initialized",
"notifications/cancelled",
"notifications/progress",
"notifications/roots/list_changed",
]:
resp = handle_request({"method": method, "params": {}})
assert resp is None, f"{method} should return None"
def test_unknown_method_no_id_returns_none(self):
"""Messages without id (notifications) must never get a response."""
from mempalace.mcp_server import handle_request
resp = handle_request({"method": "unknown/thing", "params": {}})
assert resp is None
def test_malformed_method_none(self):
"""method=None or missing should not crash."""
from mempalace.mcp_server import handle_request
# Explicit None
resp = handle_request({"method": None, "params": {}})
assert resp is None # no id → no response
# Missing method entirely
resp = handle_request({"params": {}})
assert resp is None
# method=None with id → should return error, not crash
resp = handle_request({"method": None, "id": 99, "params": {}})
assert resp["error"]["code"] == -32601
@pytest.mark.parametrize("payload", [None, [], "plain", 42, True])
def test_handle_request_invalid_payload_returns_jsonrpc_error(self, payload):
from mempalace.mcp_server import handle_request
resp = handle_request(payload)
assert resp == {
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32600, "message": "Invalid Request"},
}
def test_tools_call_dispatches(self, monkeypatch, config, palace_path, seeded_kg):
_patch_mcp_server(monkeypatch, config, seeded_kg)
from mempalace.mcp_server import handle_request
# Create a collection so status works
_client, _col = _get_collection(palace_path, create=True)
del _client
resp = handle_request(
{
"method": "tools/call",
"id": 5,
"params": {"name": "mempalace_status", "arguments": {}},
}
)
assert "result" in resp
content = json.loads(resp["result"]["content"][0]["text"])
assert "total_drawers" in content
# ── Read Tools ──────────────────────────────────────────────────────────
class TestReadTools:
def test_status_cold_start_no_collection(self, monkeypatch, config, palace_path, kg):
"""Status on a valid palace with no ChromaDB collection yet (#830).
After `mempalace init`, chroma.sqlite3 exists but the mempalace_drawers
collection has not been created (no mine or add_drawer yet). Status
should return total_drawers: 0, not 'No palace found'.
"""
import chromadb
_patch_mcp_server(monkeypatch, config, kg)
# Create the DB file (init does this) but NOT the collection
client = chromadb.PersistentClient(path=palace_path)
del client
from mempalace.mcp_server import tool_status
result = tool_status()
assert "error" not in result, f"cold-start should not error: {result}"
assert result["total_drawers"] == 0
def test_status_empty_palace(self, monkeypatch, config, palace_path, kg):
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace.mcp_server import tool_status
result = tool_status()
assert result["total_drawers"] == 0
assert result["wings"] == {}
def test_status_with_data(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_status
result = tool_status()
assert result["total_drawers"] == 4
assert "project" in result["wings"]
assert "notes" in result["wings"]
def test_status_handles_none_metadata_without_partial(
self, monkeypatch, config, palace_path, kg
):
"""tool_status must not crash or go partial when the metadata cache
returns a ``None`` entry — palaces can contain drawers with no
metadata (older mining paths, third-party writes). Before the guard,
``m.get("wing")`` raised AttributeError mid-tally and the result
carried ``"error"`` + ``"partial": True`` even though the data was
perfectly fetchable."""
from unittest.mock import patch as _patch
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_status
# Inject a metadata cache where one entry is None
with _patch("mempalace.mcp_server._get_collection") as mock_get_col:
fake_col = type("C", (), {"count": lambda self: 2})()
mock_get_col.return_value = fake_col
with _patch(
"mempalace.mcp_server._get_cached_metadata",
return_value=[{"wing": "proj", "room": "r"}, None],
):
result = tool_status()
# The None-metadata drawer falls under 'unknown/unknown' — no crash,
# no partial flag.
assert "error" not in result
assert result.get("partial") is not True
assert result["total_drawers"] == 2
assert result["wings"].get("proj") == 1
assert result["wings"].get("unknown") == 1
def test_list_wings(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_list_wings
result = tool_list_wings()
assert result["wings"]["project"] == 3
assert result["wings"]["notes"] == 1
def test_list_rooms_all(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_list_rooms
result = tool_list_rooms()
assert "backend" in result["rooms"]
assert "frontend" in result["rooms"]
assert "planning" in result["rooms"]
def test_list_rooms_filtered(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_list_rooms
result = tool_list_rooms(wing="project")
assert "backend" in result["rooms"]
assert "planning" not in result["rooms"]
def test_get_taxonomy(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_get_taxonomy
result = tool_get_taxonomy()
assert result["taxonomy"]["project"]["backend"] == 2
assert result["taxonomy"]["project"]["frontend"] == 1
assert result["taxonomy"]["notes"]["planning"] == 1
def test_no_palace_returns_error(self, monkeypatch, config, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_status
result = tool_status()
assert "error" in result
# ── Search Tool ─────────────────────────────────────────────────────────
class TestSearchTool:
def test_search_basic(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_search
result = tool_search(query="JWT authentication tokens")
assert "results" in result
assert len(result["results"]) > 0
# Top result should be the auth drawer
top = result["results"][0]
assert "JWT" in top["text"] or "authentication" in top["text"].lower()
def test_search_with_wing_filter(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_search
result = tool_search(query="planning", wing="notes")
assert all(r["wing"] == "notes" for r in result["results"])
def test_search_with_room_filter(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_search
result = tool_search(query="database", room="backend")
assert all(r["room"] == "backend" for r in result["results"])
def test_search_min_similarity_backwards_compat(
self, monkeypatch, config, palace_path, seeded_collection, kg
):
"""Old min_similarity param still works via backwards-compat shim."""
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_search
# Old name should work
result = tool_search(query="JWT", min_similarity=1.5)
assert "results" in result
# Old name takes precedence when both provided
result_strict = tool_search(query="JWT", max_distance=999.0, min_similarity=0.01)
result_loose = tool_search(query="JWT", max_distance=0.01, min_similarity=999.0)
assert len(result_strict["results"]) <= len(result_loose["results"])
def test_list_rooms_rejects_invalid_wing(self, monkeypatch, config, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
monkeypatch.setattr(mcp_server, "_get_collection", lambda: pytest.fail())
result = mcp_server.tool_list_rooms(wing="../etc/passwd")
assert "error" in result
def test_search_rejects_invalid_room(self, monkeypatch, config, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
monkeypatch.setattr(mcp_server, "search_memories", lambda: pytest.fail())
result = mcp_server.tool_search(query="JWT", room="../backend")
assert "error" in result
def test_list_drawers_rejects_invalid_wing(self, monkeypatch, config, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
monkeypatch.setattr(mcp_server, "_get_collection", lambda: pytest.fail())
result = mcp_server.tool_list_drawers(wing="../notes")
assert "error" in result
def test_find_tunnels_rejects_invalid_wing(self, monkeypatch, config, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
monkeypatch.setattr(mcp_server, "_get_collection", lambda: pytest.fail())
result = mcp_server.tool_find_tunnels(wing_a="../project")
assert "error" in result
def test_wal_redacts_sensitive_fields(self, monkeypatch, config, kg, tmp_path):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
wal_file = tmp_path / "write_log.jsonl"
monkeypatch.setattr(mcp_server, "_WAL_FILE", wal_file)
mcp_server._wal_log(
"test",
{"content": "secret note", "query": "private search", "safe": "ok"},
)
entry = json.loads(wal_file.read_text().strip())
assert entry["params"]["content"].startswith("[REDACTED")
assert entry["params"]["query"].startswith("[REDACTED")
assert entry["params"]["safe"] == "ok"
# ── Write Tools ─────────────────────────────────────────────────────────
class TestWriteTools:
def test_add_drawer(self, monkeypatch, config, palace_path, kg):
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace.mcp_server import tool_add_drawer
result = tool_add_drawer(
wing="test_wing",
room="test_room",
content="This is a test memory about Python decorators and metaclasses.",
)
assert result["success"] is True
assert result["wing"] == "test_wing"
assert result["room"] == "test_room"
assert result["drawer_id"].startswith("drawer_test_wing_test_room_")
def test_add_drawer_duplicate_detection(self, monkeypatch, config, palace_path, kg):
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace.mcp_server import tool_add_drawer
content = "This is a unique test memory about Rust ownership and borrowing."
result1 = tool_add_drawer(wing="w", room="r", content=content)
assert result1["success"] is True
result2 = tool_add_drawer(wing="w", room="r", content=content)
assert result2["success"] is True
assert result2["reason"] == "already_exists"
def test_add_drawer_fails_when_readback_misses(self, monkeypatch, config, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
class _FakeGetResult:
ids = []
class _FakeCol:
def get(self, **kwargs):
return _FakeGetResult()
def upsert(self, **kwargs):
return None
monkeypatch.setattr(mcp_server, "_get_collection", lambda create=False: _FakeCol())
result = mcp_server.tool_add_drawer("w", "r", "content")
assert result["success"] is False
assert "not readable" in result["error"]
def test_add_drawer_shared_header_no_collision(self, monkeypatch, config, palace_path, kg):
"""Documents sharing a >100-char header must get distinct IDs (full-content hash)."""
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace.mcp_server import tool_add_drawer
header = "# ACME Corp Knowledge Base\n**Project:** Alpha | **Team:** Backend | **Status:** Active\n\n"
doc1 = (
header
+ "Decision: Use PostgreSQL for primary storage. Rationale: ACID compliance required."
)
doc2 = header + "Decision: Use Redis for session caching. Rationale: sub-ms latency needed."
result1 = tool_add_drawer(wing="work", room="decisions", content=doc1)
result2 = tool_add_drawer(wing="work", room="decisions", content=doc2)
assert result1["success"] is True
assert result2["success"] is True
assert result1["drawer_id"] != result2["drawer_id"], (
"Documents with shared header but different content must have distinct drawer IDs"
)
def test_delete_drawer(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_delete_drawer
result = tool_delete_drawer("drawer_proj_backend_aaa")
assert result["success"] is True
assert seeded_collection.count() == 3
def test_delete_drawer_not_found(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_delete_drawer
result = tool_delete_drawer("nonexistent_drawer")
assert result["success"] is False
def test_check_duplicate_handles_none_metadata(self, monkeypatch, config, kg):
"""tool_check_duplicate must tolerate None entries in the result lists
that ChromaDB 1.5.x returns for partially-flushed rows.
Previously ``meta = results["metadatas"][0][i]`` was unguarded and
raised ``AttributeError: 'NoneType' object has no attribute 'get'``
the moment the first matching drawer came back with None metadata —
surfacing to the MCP client as the uninformative
``"Duplicate check failed"`` because the broad ``except Exception``
wrapper swallows the real cause.
"""
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
mock_col = MagicMock()
mock_col.query.return_value = {
"ids": [["d1", "d2"]],
"distances": [[0.05, 0.05]],
"metadatas": [[{"wing": "w", "room": "r"}, None]],
"documents": [["first doc", None]],
}
monkeypatch.setattr(mcp_server, "_get_collection", lambda: mock_col)
result = mcp_server.tool_check_duplicate("any content", threshold=0.5)
# Both entries land in matches (above threshold), None ones rendered
# with sentinel values rather than crashing the whole response.
assert result.get("is_duplicate") is True
assert len(result["matches"]) == 2
# The None-metadata entry falls back to sentinels.
none_entry = result["matches"][1]
assert none_entry["wing"] == "?"
assert none_entry["room"] == "?"
assert none_entry["content"] == ""
def test_check_duplicate(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_check_duplicate
# Exact match text from seeded_collection should be flagged
result = tool_check_duplicate(
"The authentication module uses JWT tokens for session management. "
"Tokens expire after 24 hours. Refresh tokens are stored in HttpOnly cookies.",
threshold=0.5,
)
assert result["is_duplicate"] is True
# Unrelated content should not be flagged
result = tool_check_duplicate(
"Black holes emit Hawking radiation at the event horizon.",
threshold=0.99,
)
assert result["is_duplicate"] is False
def test_get_drawer(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_get_drawer
result = tool_get_drawer("drawer_proj_backend_aaa")
assert result["drawer_id"] == "drawer_proj_backend_aaa"
assert result["wing"] == "project"
assert result["room"] == "backend"
assert "JWT tokens" in result["content"]
def test_get_drawer_not_found(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_get_drawer
result = tool_get_drawer("nonexistent_drawer")
assert "error" in result
def test_get_drawer_does_not_leak_absolute_source_file_path(
self, monkeypatch, config, palace_path, collection, kg
):
"""tool_get_drawer must not expose the absolute filesystem path
that the miners write into ``source_file``. Same threat class as
the palace_path leak in mempalace_status: in nested-agent or
multi-server MCP topologies the client is a separate trust
domain, and the directory layout of the host has no documented
client-side use. Basename is enough for citation."""
_patch_mcp_server(monkeypatch, config, kg)
secret_dir = "/private/home/alice/secret-research/2026"
absolute_source = f"{secret_dir}/notes.md"
collection.add(
ids=["drawer_leak_probe"],
documents=["verbatim drawer body for leak probe"],
metadatas=[
{
"wing": "research",
"room": "notes",
"source_file": absolute_source,
"chunk_index": 0,
"added_by": "miner",
"filed_at": "2026-05-03T00:00:00",
}
],
)
from mempalace.mcp_server import tool_get_drawer
result = tool_get_drawer("drawer_leak_probe")
assert result["drawer_id"] == "drawer_leak_probe"
assert result["metadata"]["source_file"] == "notes.md"
# Defense-in-depth: no field anywhere in the response should
# contain the absolute path or its parent directory.
serialized = json.dumps(result)
assert absolute_source not in serialized
assert secret_dir not in serialized
def test_list_drawers(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_list_drawers
result = tool_list_drawers()
assert result["count"] == 4
assert len(result["drawers"]) == 4
def test_list_drawers_with_wing_filter(
self, monkeypatch, config, palace_path, seeded_collection, kg
):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_list_drawers
result = tool_list_drawers(wing="project")
assert result["count"] == 3
assert all(d["wing"] == "project" for d in result["drawers"])
def test_list_drawers_with_room_filter(
self, monkeypatch, config, palace_path, seeded_collection, kg
):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_list_drawers
result = tool_list_drawers(wing="project", room="backend")
assert result["count"] == 2
assert all(d["room"] == "backend" for d in result["drawers"])
def test_list_drawers_pagination(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_list_drawers
result = tool_list_drawers(limit=2, offset=0)
assert result["count"] == 2
assert result["limit"] == 2
assert result["offset"] == 0
def test_list_drawers_negative_offset_clamped(
self, monkeypatch, config, palace_path, seeded_collection, kg
):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_list_drawers
result = tool_list_drawers(offset=-5)
assert result["offset"] == 0
def test_update_drawer_content(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_update_drawer, tool_get_drawer
result = tool_update_drawer(
"drawer_proj_backend_aaa", content="Updated content about auth."
)
assert result["success"] is True
fetched = tool_get_drawer("drawer_proj_backend_aaa")
assert fetched["content"] == "Updated content about auth."
def test_update_drawer_wing_and_room(
self, monkeypatch, config, palace_path, seeded_collection, kg
):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_update_drawer
result = tool_update_drawer("drawer_proj_backend_aaa", wing="new_wing", room="new_room")
assert result["success"] is True
assert result["wing"] == "new_wing"
assert result["room"] == "new_room"
def test_update_drawer_not_found(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_update_drawer
result = tool_update_drawer("nonexistent_drawer", content="hello")
assert result["success"] is False
def test_update_drawer_noop(self, monkeypatch, config, palace_path, seeded_collection, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_update_drawer
result = tool_update_drawer("drawer_proj_backend_aaa")
assert result["success"] is True
assert result.get("noop") is True
# ── KG Tools ────────────────────────────────────────────────────────────
class TestKGTools:
def test_kg_add(self, monkeypatch, config, palace_path, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_kg_add
result = tool_kg_add(
subject="Alice",
predicate="likes",
object="coffee",
valid_from="2025-01-01",
)
assert result["success"] is True
def test_kg_query(self, monkeypatch, config, palace_path, seeded_kg):
_patch_mcp_server(monkeypatch, config, seeded_kg)
from mempalace.mcp_server import tool_kg_query
result = tool_kg_query(entity="Max")
assert result["count"] > 0
def test_kg_invalidate(self, monkeypatch, config, palace_path, seeded_kg):
_patch_mcp_server(monkeypatch, config, seeded_kg)
from mempalace.mcp_server import tool_kg_invalidate
result = tool_kg_invalidate(
subject="Max",
predicate="does",
object="chess",
ended="2026-03-01",
)
assert result["success"] is True
# Regression #1314: response must echo the actual ended date,
# not silently drop it and return the literal string "today".
assert result["ended"] == "2026-03-01"
def test_kg_add_forwards_valid_to(self, monkeypatch, config, palace_path, kg):
"""Regression #1314 case 1: valid_to must round-trip through kg_add."""
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_kg_add
result = tool_kg_add(
subject="_test_temporal",
predicate="had_value",
object="probe",
valid_from="2026-01-01",
valid_to="2026-04-28",
)
assert result["success"] is True
facts = kg.query_entity("_test_temporal")
assert len(facts) == 1
assert facts[0]["valid_from"] == "2026-01-01"
assert facts[0]["valid_to"] == "2026-04-28"
# An already-ended fact must not be reported as still current.
assert facts[0]["current"] is False
def test_kg_add_forwards_source_provenance(self, monkeypatch, config, palace_path, kg):
"""Regression #1314 case 3: source_file / source_drawer_id reach storage."""
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_kg_add
result = tool_kg_add(
subject="operating-verb",
predicate="candidate",
object="husbandry",
valid_from="2026-04-28",
source_closet="closet-42",
source_file="docs/decisions.md",
source_drawer_id="drawer_abc123",
)
assert result["success"] is True
triple_id = result["triple_id"]
# Read raw row to verify all provenance columns persisted.
with kg._lock:
row = (
kg._conn()
.execute(
"SELECT source_closet, source_file, source_drawer_id FROM triples WHERE id = ?",
(triple_id,),
)
.fetchone()
)
assert row is not None
assert row["source_closet"] == "closet-42"
assert row["source_file"] == "docs/decisions.md"
assert row["source_drawer_id"] == "drawer_abc123"
def test_kg_invalidate_returns_actual_ended_date(
self, monkeypatch, config, palace_path, seeded_kg
):
"""Regression #1314 case 2: response reports the resolved date, not 'today'."""
from datetime import date as _date
_patch_mcp_server(monkeypatch, config, seeded_kg)
from mempalace.mcp_server import tool_kg_invalidate
# Caller-supplied date round-trips into the response.
explicit = tool_kg_invalidate(
subject="Max",
predicate="does",
object="swimming",
ended="2026-04-28",
)
assert explicit["ended"] == "2026-04-28"
# Caller-omitted date resolves to today's ISO date — never the
# literal string "today" the buggy implementation used to return.
implicit = tool_kg_invalidate(
subject="Max",
predicate="loves",
object="Chess",
)
assert implicit["ended"] != "today"
assert implicit["ended"] == _date.today().isoformat()
def test_kg_timeline(self, monkeypatch, config, palace_path, seeded_kg):
_patch_mcp_server(monkeypatch, config, seeded_kg)
from mempalace.mcp_server import tool_kg_timeline
result = tool_kg_timeline(entity="Alice")
assert result["count"] > 0
def test_kg_stats(self, monkeypatch, config, palace_path, seeded_kg):
_patch_mcp_server(monkeypatch, config, seeded_kg)
from mempalace.mcp_server import tool_kg_stats
result = tool_kg_stats()
assert result["entities"] >= 4
# --- Date validation at the MCP boundary (issue #1164) ---
def test_kg_add_rejects_invalid_valid_from(self, monkeypatch, config, palace_path, kg):
_patch_mcp_server(monkeypatch, config, kg)
from mempalace.mcp_server import tool_kg_add
result = tool_kg_add(
subject="Alice",
predicate="likes",
object="coffee",
valid_from="Jan 2025",
)
assert result["success"] is False
assert "valid_from" in result["error"]
assert "ISO-8601" in result["error"]
def test_kg_query_rejects_invalid_as_of(self, monkeypatch, config, palace_path, seeded_kg):
_patch_mcp_server(monkeypatch, config, seeded_kg)
from mempalace.mcp_server import tool_kg_query
result = tool_kg_query(entity="Max", as_of="March 2026")
assert "error" in result
assert "as_of" in result["error"]
def test_kg_invalidate_rejects_invalid_ended(self, monkeypatch, config, palace_path, seeded_kg):
_patch_mcp_server(monkeypatch, config, seeded_kg)
from mempalace.mcp_server import tool_kg_invalidate
result = tool_kg_invalidate(
subject="Max",
predicate="does",
object="chess",
ended="yesterday",
)
assert result["success"] is False
assert "ended" in result["error"]
def test_kg_query_rejects_partial_iso_dates(self, monkeypatch, config, palace_path, seeded_kg):
_patch_mcp_server(monkeypatch, config, seeded_kg)
from mempalace.mcp_server import tool_kg_query
# Partial ISO dates are rejected: KG queries compare TEXT dates
# lexicographically, so "2026-01-01" <= "2026" is False, which
# silently excludes facts. Reject at the boundary — only YYYY-MM-DD
# produces correct results.
for value in ("2026", "2026-03"):
result = tool_kg_query(entity="Max", as_of=value)
assert "error" in result, f"accepted partial date {value!r}: {result}"
# Full ISO-8601 dates still pass.
result = tool_kg_query(entity="Max", as_of="2026-03-15")
assert "error" not in result, f"rejected valid date: {result}"
# ── Diary Tools ─────────────────────────────────────────────────────────
class TestDiaryTools:
def test_diary_write_and_read(self, monkeypatch, config, palace_path, kg):
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace.mcp_server import tool_diary_write, tool_diary_read
w = tool_diary_write(
agent_name="TestAgent",
entry="Today we discussed authentication patterns.",
topic="architecture",
)
assert w["success"] is True
# agent_name is normalized to lowercase on write (#1243).
assert w["agent"] == "testagent"
r = tool_diary_read(agent_name="TestAgent")
assert r["total"] == 1
assert r["entries"][0]["topic"] == "architecture"
assert "authentication" in r["entries"][0]["content"]
def test_diary_read_empty(self, monkeypatch, config, palace_path, kg):
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace.mcp_server import tool_diary_read
r = tool_diary_read(agent_name="Nobody")
assert r["entries"] == []
def test_diary_write_same_second_shared_prefix_no_collision(
self, monkeypatch, config, palace_path, kg
):
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace import mcp_server
class FrozenDateTime:
calls = [
datetime(2026, 4, 13, 22, 15, 30, 123456),
datetime(2026, 4, 13, 22, 15, 30, 123457),
]
fallback = datetime(2026, 4, 13, 22, 15, 30, 123457)
@classmethod
def now(cls):
if cls.calls:
return cls.calls.pop(0)
return cls.fallback
monkeypatch.setattr(mcp_server, "datetime", FrozenDateTime)
from mempalace.mcp_server import tool_diary_read, tool_diary_write
entry1 = "A" * 50 + " entry one"
entry2 = "A" * 50 + " entry two"
result1 = tool_diary_write(agent_name="TestAgent", entry=entry1, topic="status")
result2 = tool_diary_write(agent_name="TestAgent", entry=entry2, topic="status")
assert result1["success"] is True
assert result2["success"] is True
assert result1["entry_id"] != result2["entry_id"]
read_result = tool_diary_read(agent_name="TestAgent")
contents = [entry["content"] for entry in read_result["entries"]]
assert read_result["total"] == 2
assert entry1 in contents
assert entry2 in contents
def test_diary_read_empty_wing_spans_all_wings(self, monkeypatch, config, palace_path, kg):
"""diary_read(wing='') must return entries from every wing this agent
wrote to. Hooks write to project-derived wings (#659); a reader that
silos by default wing would never see those entries."""
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace.mcp_server import tool_diary_read, tool_diary_write
w1 = tool_diary_write(
agent_name="TestAgent",
entry="default-wing entry",
topic="general",
)
w2 = tool_diary_write(
agent_name="TestAgent",
entry="project-wing entry",
topic="general",
wing="wing_someproject",
)
assert w1["success"] and w2["success"]
# Empty wing → return both entries
r = tool_diary_read(agent_name="TestAgent", wing="")
assert r["total"] == 2
contents = {e["content"] for e in r["entries"]}
assert "default-wing entry" in contents
assert "project-wing entry" in contents
# Explicit wing → return only that wing's entries
r_scoped = tool_diary_read(agent_name="TestAgent", wing="wing_someproject")
assert r_scoped["total"] == 1
assert r_scoped["entries"][0]["content"] == "project-wing entry"
def test_diary_read_case_insensitive_agent(self, monkeypatch, config, palace_path, kg):
"""Regression for #1243: diary_read must be case-insensitive over
agent_name. Writing as "Claude" and reading as "claude" (or vice
versa) must surface the same entries — sanitize_name preserved
case, which silently dropped reads when the agent name's casing
differed from the write."""
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace.mcp_server import tool_diary_read, tool_diary_write
# Write as "Claude" → read as "claude" should match.
w1 = tool_diary_write(
agent_name="Claude",
entry="entry written as Claude",
topic="general",
)
assert w1["success"]
r1 = tool_diary_read(agent_name="claude")
assert "entries" in r1, r1
contents1 = {e["content"] for e in r1["entries"]}
assert "entry written as Claude" in contents1
# Write as "CLAUDE" → read as "Claude" should also match the
# same agent. After normalization both writes target the same
# lowercase agent identity, so both entries are returned.
w2 = tool_diary_write(
agent_name="CLAUDE",
entry="entry written as CLAUDE",
topic="general",
)
assert w2["success"]
r2 = tool_diary_read(agent_name="Claude")
contents2 = {e["content"] for e in r2["entries"]}
assert "entry written as Claude" in contents2
assert "entry written as CLAUDE" in contents2
# The stored agent metadata is the lowercase form, and the
# default wing is derived from that lowercase form too.
assert w1["agent"] == "claude"
assert w2["agent"] == "claude"
# ── Cache Invalidation (inode/mtime) ──────────────────────────────────
class TestCacheInvalidation:
"""Tests for _get_collection inode/mtime cache invalidation logic."""
def test_mtime_change_invalidates_cache(self, monkeypatch, config, palace_path, kg):
"""When mtime changes, the cached collection should be replaced."""
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
# Create a real collection so _get_collection succeeds
_client, _col = _get_collection(palace_path, create=True)
del _client
# Prime the cache
col1 = mcp_server._get_collection()
assert col1 is not None
# Simulate an external write changing the mtime
old_mtime = mcp_server._palace_db_mtime
monkeypatch.setattr(mcp_server, "_palace_db_mtime", old_mtime - 10.0)
# _get_collection should detect the mtime drift and reconnect
col2 = mcp_server._get_collection()
assert col2 is not None
def test_inode_change_invalidates_cache(self, monkeypatch, config, palace_path, kg):
"""When inode changes (file replaced), the cached collection should be replaced."""
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
_client, _col = _get_collection(palace_path, create=True)
del _client
# Prime the cache
col1 = mcp_server._get_collection()
assert col1 is not None
# Simulate a rebuild that changes the inode
monkeypatch.setattr(mcp_server, "_palace_db_inode", 99999)
col2 = mcp_server._get_collection()
assert col2 is not None
@pytest.mark.skipif(
sys.platform == "win32",
reason="Windows holds chroma.sqlite3 open while the client is cached, blocking os.remove",
)
def test_missing_db_invalidates_cache(self, monkeypatch, config, palace_path, kg):
"""When chroma.sqlite3 disappears, a cached collection should be invalidated."""
_patch_mcp_server(monkeypatch, config, kg)
import os
from mempalace import mcp_server
_client, _col = _get_collection(palace_path, create=True)
del _client
# Prime the cache
col1 = mcp_server._get_collection()
assert col1 is not None
assert mcp_server._collection_cache is not None
# Delete the DB file to simulate a rebuild in progress
db_file = os.path.join(palace_path, "chroma.sqlite3")
if os.path.isfile(db_file):
os.remove(db_file)
# Cache should be invalidated; _get_collection returns None
# because the backend can't open a missing DB without create=True
mcp_server._get_collection()
# The key assertion: the old cached collection was dropped
assert mcp_server._palace_db_inode == 0
assert mcp_server._palace_db_mtime == 0.0
def test_reconnect_reports_failure_when_no_palace(self, monkeypatch, config, kg):
"""tool_reconnect should report failure when no collection is available."""
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
# Make _get_collection always return None
monkeypatch.setattr(mcp_server, "_get_collection", lambda create=False: None)
result = mcp_server.tool_reconnect()
assert result["success"] is False
assert "No palace found" in result["message"]
assert result["drawers"] == 0
def test_reconnect_reports_success(self, monkeypatch, config, palace_path, kg):
"""tool_reconnect should report success with drawer count."""
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace import mcp_server
result = mcp_server.tool_reconnect()
assert result["success"] is True
assert "Reconnected" in result["message"]
assert isinstance(result["drawers"], int)
def test_reconnect_closes_shared_backend(self, monkeypatch, config, kg):
_patch_mcp_server(monkeypatch, config, kg)
from unittest.mock import MagicMock
from mempalace import mcp_server, palace
close_palace = MagicMock()
monkeypatch.setattr(palace._DEFAULT_BACKEND, "close_palace", close_palace)
class _FakeCol:
def count(self):
return 7
monkeypatch.setattr(mcp_server, "_get_collection", lambda create=False: _FakeCol())
result = mcp_server.tool_reconnect()
assert result["success"] is True
close_palace.assert_called_once_with(config.palace_path)
def test_get_collection_create_true_avoids_get_or_create_on_reopen(
self, monkeypatch, config, palace_path, kg
):
"""Regression for the MCP-server half of #1262.
ChromaDB 1.5.x's Rust bindings SIGSEGV when
``client.get_or_create_collection`` is called with metadata that
differs from the collection's stored metadata. The Stop hook
path (``tool_diary_write`` -> ``_get_collection(create=True)``)
was reaching that codepath on every session-end; #1262 fixed
the equivalent crash class in ``ChromaBackend`` but left this
site untouched. ``_get_collection(create=True)`` must call
``client.get_collection`` first and only fall back to
``client.create_collection`` when the collection does not yet
exist on disk.
"""
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
col1 = mcp_server._get_collection(create=True)
assert col1 is not None
client = mcp_server._client_cache
assert client is not None
# Patch at the class level — chromadb's mtime-change detection
# may rebuild the client between calls, so an instance-level
# spy would not survive.
client_cls = type(client)
calls: list[tuple] = []
def _spy(self, *args, **kwargs):
calls.append((args, kwargs))
raise AssertionError(
"get_or_create_collection must not be called on reopen "
"(SIGSEGV path on metadata mismatch)"
)
monkeypatch.setattr(client_cls, "get_or_create_collection", _spy)
mcp_server._collection_cache = None
col2 = mcp_server._get_collection(create=True)
assert col2 is not None
assert calls == [], f"get_or_create_collection was called: {calls}"
def test_get_collection_passes_embedding_function(self, monkeypatch, config, palace_path, kg):
"""Regression for #1299.
``mcp_server._get_collection`` must pass ``embedding_function=`` into
both ``client.get_collection`` and ``client.create_collection``,
mirroring ``ChromaBackend.get_collection``. Without it, ChromaDB 1.x
falls back to its built-in ``DefaultEmbeddingFunction`` (whose lazy
ONNX provider selection has SIGSEGV'd on python 3.14 + Apple Silicon),
and writers/readers can disagree with the miner about which EF is
bound to the collection. The miner / Stop hook ingest path routes
through ``ChromaBackend.get_collection`` which does this correctly;
the MCP server must match.
"""
_patch_mcp_server(monkeypatch, config, kg)
from mempalace import mcp_server
client = mcp_server._get_client()
client_cls = type(client)
captured: dict[str, list[dict]] = {"get": [], "create": []}
real_get = client_cls.get_collection
real_create = client_cls.create_collection
def _spy_get(self, name, **kwargs):
captured["get"].append(dict(kwargs))
return real_get(self, name, **kwargs)
def _spy_create(self, name, **kwargs):
captured["create"].append(dict(kwargs))
return real_create(self, name, **kwargs)
monkeypatch.setattr(client_cls, "get_collection", _spy_get)
monkeypatch.setattr(client_cls, "create_collection", _spy_create)
mcp_server._collection_cache = None
col = mcp_server._get_collection(create=True)
assert col is not None
all_calls = captured["get"] + captured["create"]
assert all_calls, "expected get_collection or create_collection to be called"
for kwargs in all_calls:
assert (
"embedding_function" in kwargs
), f"missing embedding_function= in chromadb call: {kwargs}"
assert kwargs["embedding_function"] is not None
# Same expectation on the create=False (cache-miss) reopen path.
mcp_server._collection_cache = None
captured["get"].clear()
captured["create"].clear()
col2 = mcp_server._get_collection()
assert col2 is not None
assert captured["get"], "expected get_collection on cache-miss reopen"
for kwargs in captured["get"]:
assert "embedding_function" in kwargs
assert kwargs["embedding_function"] is not None
def test_get_collection_retries_once_on_exception(self, monkeypatch, config, palace_path, kg):
"""Regression: a transient failure inside _get_collection must trigger
one retry after clearing the client/collection caches, not silently
return None.
Before this fix, a stale chromadb handle (e.g. the rust bindings
invalidating after an out-of-band write) would raise inside the
single ``try`` block, get swallowed by ``except Exception: return
None``, and every subsequent tool call would hit the same poisoned
cache returning None. The retry forces ``_get_client()`` to rebuild
the client (which re-runs ``quarantine_stale_hnsw`` per #1322), so
the second attempt heals the common stale-handle case.
"""
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace import mcp_server
# Force a cold cache so the first call goes through the open path.
mcp_server._client_cache = None
mcp_server._collection_cache = None
real_get_client = mcp_server._get_client
attempts = {"count": 0}
def flaky_get_client():
attempts["count"] += 1
if attempts["count"] == 1:
raise RuntimeError("simulated transient chromadb failure")
return real_get_client()
monkeypatch.setattr(mcp_server, "_get_client", flaky_get_client)
col = mcp_server._get_collection()
# Both attempts ran and the second succeeded.
assert attempts["count"] == 2
assert col is not None
def test_get_collection_returns_none_after_two_failures(
self, monkeypatch, config, palace_path, kg
):
"""If both attempts fail, return None (matches the prior contract for
permanent failures — only the transient case is now self-healing)."""
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace import mcp_server
mcp_server._client_cache = None
mcp_server._collection_cache = None
attempts = {"count": 0}
def always_fails():
attempts["count"] += 1
raise RuntimeError("permanent chromadb failure")
monkeypatch.setattr(mcp_server, "_get_client", always_fails)
col = mcp_server._get_collection()
assert attempts["count"] == 2
assert col is None
class TestKGLazyCache:
"""Lazy per-path KnowledgeGraph cache (issue #1136)."""
def test_lazy_init_no_import_side_effect(self, tmp_path):
"""Importing mcp_server must not create knowledge_graph.sqlite3.
Runs in a fresh subprocess with HOME pointed at tmp_path so the
assertion targets a clean filesystem, independent of conftest's
session-level HOME patch.
"""
import subprocess
import sys
kg_file = tmp_path / ".mempalace" / "knowledge_graph.sqlite3"
env = {k: v for k, v in os.environ.items() if not k.startswith("MEMPAL")}
env["HOME"] = str(tmp_path)
env["USERPROFILE"] = str(tmp_path)
result = subprocess.run(
[sys.executable, "-c", "import mempalace.mcp_server"],
env=env,
capture_output=True,
text=True,
timeout=30,
)
assert result.returncode == 0, f"import failed: {result.stderr}"
assert not kg_file.exists(), f"import created sqlite file at {kg_file} as a side effect"
def test_get_kg_returns_same_instance(self, tmp_path, monkeypatch):
"""Two calls with the same resolved path return the same KG."""
from mempalace import mcp_server
monkeypatch.setattr(mcp_server, "_kg_by_path", {})
monkeypatch.setattr(mcp_server, "_palace_flag_given", True)
monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_path))
kg1 = mcp_server._get_kg()
kg2 = mcp_server._get_kg()
assert kg1 is kg2
assert len(mcp_server._kg_by_path) == 1
def test_get_kg_different_paths_different_instances(self, tmp_path, monkeypatch):
"""Different palace paths map to different KG instances."""
from mempalace import mcp_server
tmp_a = tmp_path / "a"
tmp_b = tmp_path / "b"
tmp_a.mkdir()
tmp_b.mkdir()
monkeypatch.setattr(mcp_server, "_kg_by_path", {})
monkeypatch.setattr(mcp_server, "_palace_flag_given", True)
monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_a))
kg_a = mcp_server._get_kg()
monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_b))
kg_b = mcp_server._get_kg()
assert kg_a is not kg_b
assert len(mcp_server._kg_by_path) == 2
def test_multi_tenant_env_switch(self, tmp_path, monkeypatch):
"""The issue #1136 acceptance scenario.
Rotating MEMPALACE_PALACE_PATH between MCP tool calls must route
each call to the correct tenant's KG sqlite file.
"""
from mempalace import mcp_server
tmp_a = tmp_path / "tenant_a"
tmp_b = tmp_path / "tenant_b"
tmp_a.mkdir()
tmp_b.mkdir()
monkeypatch.setattr(mcp_server, "_kg_by_path", {})
monkeypatch.setattr(mcp_server, "_palace_flag_given", True)
monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_a))
add_result = mcp_server.tool_kg_add(
subject="alice_secret",
predicate="owns",
object="repo_a",
)
assert add_result.get("success") is True, add_result
monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_b))
query_b = mcp_server.tool_kg_query(entity="alice_secret")
assert query_b.get("count", 0) == 0, f"tenant B leaked tenant A's fact: {query_b}"
monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_a))
query_a = mcp_server.tool_kg_query(entity="alice_secret")
assert query_a.get("count", 0) >= 1, f"tenant A lost its own fact: {query_a}"
def test_cache_thread_safe(self, tmp_path, monkeypatch):
"""Concurrent _get_kg() for the same path yields one instance."""
import concurrent.futures
from mempalace import mcp_server
monkeypatch.setattr(mcp_server, "_kg_by_path", {})
monkeypatch.setattr(mcp_server, "_palace_flag_given", True)
monkeypatch.setenv("MEMPALACE_PALACE_PATH", str(tmp_path))
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as pool:
results = list(pool.map(lambda _: mcp_server._get_kg(), range(16)))
ids = {id(kg) for kg in results}
assert len(ids) == 1, f"expected 1 unique instance, got {len(ids)}"
assert len(mcp_server._kg_by_path) == 1
def test_tool_reconnect_drains_kg_cache(self, monkeypatch):
"""``tool_reconnect`` must close cached KG instances and clear the dict.
Without this, an external replacement of ``knowledge_graph.sqlite3``
leaves the server pinned to a stale ``sqlite3.Connection``.
"""
from mempalace import mcp_server
class _FakeKG:
def __init__(self):
self.closed = False
def close(self):
self.closed = True
fake_a = _FakeKG()
fake_b = _FakeKG()
monkeypatch.setattr(mcp_server, "_kg_by_path", {"/a": fake_a, "/b": fake_b})
# Bypass real ChromaDB so the test isolates KG-cache behaviour.
monkeypatch.setattr(mcp_server, "_get_collection", lambda: None)
mcp_server.tool_reconnect()
assert fake_a.closed is True
assert fake_b.closed is True
assert mcp_server._kg_by_path == {}
def test_tool_reconnect_swallows_kg_close_errors(self, monkeypatch):
"""A failing ``close()`` on one cached KG must not block cache clearing."""
from mempalace import mcp_server
class _BoomKG:
def close(self):
raise RuntimeError("boom")
monkeypatch.setattr(mcp_server, "_kg_by_path", {"/a": _BoomKG()})
monkeypatch.setattr(mcp_server, "_get_collection", lambda: None)
mcp_server.tool_reconnect()
assert mcp_server._kg_by_path == {}
def test_call_kg_retries_after_concurrent_close(self, monkeypatch):
"""A KG closed mid-handler must trigger a one-shot retry with a fresh
instance — not surface a -32000 to the MCP client."""
import sqlite3 as _sqlite3
from mempalace import mcp_server
path = "/fake/palace/knowledge_graph.sqlite3"
monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path)
class _ClosedKG:
def query_entity(self, entity, **kwargs):
raise _sqlite3.ProgrammingError("Cannot operate on a closed database")
class _FreshKG:
def query_entity(self, entity, **kwargs):
return [{"entity": entity}]
cache = {os.path.abspath(path): _ClosedKG()}
monkeypatch.setattr(mcp_server, "_kg_by_path", cache)
# Second _get_kg() call (after the cache eviction) constructs a new
# KG. Patch the constructor so we don't open a real sqlite file.
monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _FreshKG())
result = mcp_server._call_kg(lambda kg: kg.query_entity("Alice"))
assert result == [{"entity": "Alice"}]
# The closed instance must be evicted; the fresh one must be cached.
assert isinstance(cache[os.path.abspath(path)], _FreshKG)
def test_call_kg_does_not_retry_on_other_errors(self, monkeypatch):
"""Non-ProgrammingError exceptions must propagate without retry —
we don't want the retry guard masking real bugs."""
from mempalace import mcp_server
path = "/fake/palace/knowledge_graph.sqlite3"
monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path)
calls = {"count": 0}
class _FailingKG:
def query_entity(self, entity, **kwargs):
calls["count"] += 1
raise ValueError("bad input")
monkeypatch.setattr(mcp_server, "_kg_by_path", {os.path.abspath(path): _FailingKG()})
monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _FailingKG())
with pytest.raises(ValueError, match="bad input"):
mcp_server._call_kg(lambda kg: kg.query_entity("Alice"))
assert calls["count"] == 1, "non-ProgrammingError must not trigger retry"
def test_call_kg_gives_up_after_one_retry(self, monkeypatch):
"""If the second attempt also hits a closed DB, give up rather than
loop forever — a sustained close-stream is a different bug."""
import sqlite3 as _sqlite3
from mempalace import mcp_server
path = "/fake/palace/knowledge_graph.sqlite3"
monkeypatch.setattr(mcp_server, "_resolve_kg_path", lambda: path)
calls = {"count": 0}
class _AlwaysClosedKG:
def query_entity(self, entity, **kwargs):
calls["count"] += 1
raise _sqlite3.ProgrammingError("closed again")
cache = {}
monkeypatch.setattr(mcp_server, "_kg_by_path", cache)
monkeypatch.setattr(mcp_server, "KnowledgeGraph", lambda **_: _AlwaysClosedKG())
with pytest.raises(_sqlite3.ProgrammingError):
mcp_server._call_kg(lambda kg: kg.query_entity("Alice"))
assert calls["count"] == 2, "expected exactly one retry beyond the initial attempt"