"""
test_mcp_server.py — Tests for the MCP server tool handlers and dispatch.

Two levels are covered:

* unit-level: each ``tool_*`` handler is called directly;
* integration-level: requests go through ``handle_request``.

The palace and knowledge-graph fixtures are isolated and injected via
monkeypatch so that no real data is touched.
"""

import json


def _patch_mcp_server(monkeypatch, config, kg):
    """Swap the ``_config`` and ``_kg`` globals of ``mcp_server`` for fixtures."""
    from mempalace import mcp_server

    for attr_name, replacement in (("_config", config), ("_kg", kg)):
        monkeypatch.setattr(mcp_server, attr_name, replacement)


def _get_collection(palace_path, create=False):
    """Open the ``mempalace_drawers`` collection of the test palace.

    With ``create`` truthy the collection is created when missing;
    otherwise it is only looked up.

    Returns ``(client, collection)`` so that the caller can dispose of the
    client once it is no longer needed.
    """
    import chromadb

    client = chromadb.PersistentClient(path=palace_path)
    opener = client.get_or_create_collection if create else client.get_collection
    return client, opener("mempalace_drawers")


# ── Protocol Layer ──────────────────────────────────────────────────────


class TestHandleRequest:
    """Dispatch-level checks for ``handle_request``."""

    def test_initialize(self):
        """``initialize`` reports the server name and echoes the request id."""
        from mempalace.mcp_server import handle_request

        request = {"method": "initialize", "id": 1, "params": {}}
        reply = handle_request(request)

        assert reply["result"]["serverInfo"]["name"] == "mempalace"
        assert reply["id"] == 1

    def test_initialize_negotiates_client_version(self):
        """Requesting ``2025-11-25`` yields that same protocol version."""
        from mempalace.mcp_server import handle_request

        request = {
            "method": "initialize",
            "id": 1,
            "params": {"protocolVersion": "2025-11-25"},
        }
        reply = handle_request(request)

        assert reply["result"]["protocolVersion"] == "2025-11-25"

    def test_initialize_negotiates_older_supported_version(self):
        """Requesting ``2025-03-26`` yields that same protocol version."""
        from mempalace.mcp_server import handle_request

        request = {
            "method": "initialize",
            "id": 1,
            "params": {"protocolVersion": "2025-03-26"},
        }
        reply = handle_request(request)

        assert reply["result"]["protocolVersion"] == "2025-03-26"

    def test_initialize_unknown_version_falls_back_to_latest(self):
        """An unrecognised version yields the first supported version."""
        from mempalace.mcp_server import SUPPORTED_PROTOCOL_VERSIONS, handle_request

        request = {
            "method": "initialize",
            "id": 1,
            "params": {"protocolVersion": "9999-12-31"},
        }
        reply = handle_request(request)

        newest = SUPPORTED_PROTOCOL_VERSIONS[0]
        assert reply["result"]["protocolVersion"] == newest

    def test_initialize_missing_version_uses_oldest(self):
        """With no requested version the last supported version is returned."""
        from mempalace.mcp_server import SUPPORTED_PROTOCOL_VERSIONS, handle_request

        request = {"method": "initialize", "id": 1, "params": {}}
        reply = handle_request(request)

        oldest = SUPPORTED_PROTOCOL_VERSIONS[-1]
        assert reply["result"]["protocolVersion"] == oldest

    def test_notifications_initialized_returns_none(self):
        """``notifications/initialized`` produces no response object."""
        from mempalace.mcp_server import handle_request

        request = {"method": "notifications/initialized", "id": None, "params": {}}

        assert handle_request(request) is None

    def test_ping_returns_empty_result(self):
        """``ping`` echoes the id and carries an empty result."""
        from mempalace.mcp_server import handle_request

        reply = handle_request({"method": "ping", "id": 11, "params": {}})

        assert reply["id"] == 11
        assert reply["result"] == {}

    def test_tools_list(self):
        """``tools/list`` advertises the core tool names."""
        from mempalace.mcp_server import handle_request

        reply = handle_request({"method": "tools/list", "id": 2, "params": {}})
        names = {tool["name"] for tool in reply["result"]["tools"]}

        assert "mempalace_status" in names
        assert "mempalace_search" in names
        assert "mempalace_add_drawer" in names
        assert "mempalace_kg_add" in names

    def test_null_arguments_does_not_hang(self, monkeypatch, config, palace_path, seeded_kg):
        """A ``tools/call`` with ``arguments: null`` must yield a result (#394)."""
        _patch_mcp_server(monkeypatch, config, seeded_kg)
        from mempalace.mcp_server import handle_request

        client, _collection = _get_collection(palace_path, create=True)
        del client

        request = {
            "method": "tools/call",
            "id": 10,
            "params": {"name": "mempalace_status", "arguments": None},
        }
        reply = handle_request(request)

        assert "error" not in reply
        assert reply["result"] is not None

    def test_unknown_tool(self):
        """Calling a tool that does not exist returns error code -32601."""
        from mempalace.mcp_server import handle_request

        request = {
            "method": "tools/call",
            "id": 3,
            "params": {"name": "nonexistent_tool", "arguments": {}},
        }
        reply = handle_request(request)

        assert reply["error"]["code"] == -32601

    def test_unknown_method(self):
        """An unknown method with an id returns error code -32601."""
        from mempalace.mcp_server import handle_request

        reply = handle_request({"method": "unknown/method", "id": 4, "params": {}})

        assert reply["error"]["code"] == -32601

    def test_any_notification_returns_none(self):
        """Every ``notifications/*`` method is answered with ``None``."""
        from mempalace.mcp_server import handle_request

        notification_methods = [
            "notifications/initialized",
            "notifications/cancelled",
            "notifications/progress",
            "notifications/roots/list_changed",
        ]
        for method in notification_methods:
            reply = handle_request({"method": method, "params": {}})
            assert reply is None, f"{method} should return None"

    def test_unknown_method_no_id_returns_none(self):
        """A message without an id never gets a response, even if unknown."""
        from mempalace.mcp_server import handle_request

        reply = handle_request({"method": "unknown/thing", "params": {}})

        assert reply is None

    def test_malformed_method_none(self):
        """A ``None`` or absent method must be handled without crashing."""
        from mempalace.mcp_server import handle_request

        # Without an id there is nothing to answer: first an explicit
        # method=None, then a request with no method key at all.
        for id_less_request in ({"method": None, "params": {}}, {"params": {}}):
            assert handle_request(id_less_request) is None

        # With an id, method=None must come back as an error, not a crash.
        reply = handle_request({"method": None, "id": 99, "params": {}})
        assert reply["error"]["code"] == -32601

    def test_tools_call_dispatches(self, monkeypatch, config, palace_path, seeded_kg):
        """``tools/call`` for ``mempalace_status`` returns JSON text content."""
        _patch_mcp_server(monkeypatch, config, seeded_kg)
        from mempalace.mcp_server import handle_request

        # The status tool needs an existing collection to report on.
        client, _collection = _get_collection(palace_path, create=True)
        del client

        request = {
            "method": "tools/call",
            "id": 5,
            "params": {"name": "mempalace_status", "arguments": {}},
        }
        reply = handle_request(request)

        assert "result" in reply
        payload = json.loads(reply["result"]["content"][0]["text"])
        assert "total_drawers" in payload


# ── Read Tools ──────────────────────────────────────────────────────────


class TestReadTools:
    """Direct calls to the read-only tool handlers."""

    def test_status_empty_palace(self, monkeypatch, config, palace_path, kg):
        """A freshly created collection reports zero drawers and no wings."""
        _patch_mcp_server(monkeypatch, config, kg)
        client, _collection = _get_collection(palace_path, create=True)
        del client
        from mempalace.mcp_server import tool_status

        status = tool_status()

        assert status["total_drawers"] == 0
        assert status["wings"] == {}

    def test_status_with_data(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """The seeded collection reports four drawers across both wings."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_status

        status = tool_status()

        assert status["total_drawers"] == 4
        assert "project" in status["wings"]
        assert "notes" in status["wings"]

    def test_list_wings(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Wing counts match the seeded data (project=3, notes=1)."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_wings

        wings = tool_list_wings()["wings"]

        assert wings["project"] == 3
        assert wings["notes"] == 1

    def test_list_rooms_all(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Without a filter, rooms of every wing are listed."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_rooms

        rooms = tool_list_rooms()["rooms"]

        assert "backend" in rooms
        assert "frontend" in rooms
        assert "planning" in rooms

    def test_list_rooms_filtered(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Filtering by wing hides rooms that belong to other wings."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_rooms

        rooms = tool_list_rooms(wing="project")["rooms"]

        assert "backend" in rooms
        assert "planning" not in rooms

    def test_get_taxonomy(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """The taxonomy nests per-room counts under each wing."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_get_taxonomy

        taxonomy = tool_get_taxonomy()["taxonomy"]

        assert taxonomy["project"]["backend"] == 2
        assert taxonomy["project"]["frontend"] == 1
        assert taxonomy["notes"]["planning"] == 1

    def test_no_palace_returns_error(self, monkeypatch, config, kg):
        """With no collection created, status returns an error entry."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_status

        status = tool_status()

        assert "error" in status


# ── Search Tool ─────────────────────────────────────────────────────────


class TestSearchTool:
    """Direct calls to ``tool_search``."""

    def test_search_basic(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """A JWT query returns hits, with the auth drawer ranked first."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_search

        found = tool_search(query="JWT authentication tokens")

        assert "results" in found
        assert len(found["results"]) > 0
        # The best match is expected to be the auth drawer.
        best = found["results"][0]
        assert "JWT" in best["text"] or "authentication" in best["text"].lower()

    def test_search_with_wing_filter(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """Every hit of a wing-filtered search belongs to that wing."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_search

        found = tool_search(query="planning", wing="notes")

        for hit in found["results"]:
            assert hit["wing"] == "notes"

    def test_search_with_room_filter(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """Every hit of a room-filtered search belongs to that room."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_search

        found = tool_search(query="database", room="backend")

        for hit in found["results"]:
            assert hit["room"] == "backend"

    def test_search_min_similarity_backwards_compat(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """The legacy ``min_similarity`` parameter is still accepted."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_search

        # The legacy keyword alone must be accepted.
        legacy_only = tool_search(query="JWT", min_similarity=1.5)
        assert "results" in legacy_only

        # When both keywords are given, the legacy one is expected to win,
        # so a tiny min_similarity can never return more than a huge one.
        strict = tool_search(query="JWT", max_distance=999.0, min_similarity=0.01)
        loose = tool_search(query="JWT", max_distance=0.01, min_similarity=999.0)
        assert len(strict["results"]) <= len(loose["results"])


# ── Write Tools ─────────────────────────────────────────────────────────


class TestWriteTools:
    """Direct calls to the drawer create/read/update/delete handlers."""

    def test_add_drawer(self, monkeypatch, config, palace_path, kg):
        """Adding a drawer succeeds and returns a wing/room-prefixed id."""
        _patch_mcp_server(monkeypatch, config, kg)
        client, _collection = _get_collection(palace_path, create=True)
        del client
        from mempalace.mcp_server import tool_add_drawer

        added = tool_add_drawer(
            wing="test_wing",
            room="test_room",
            content="This is a test memory about Python decorators and metaclasses.",
        )

        assert added["success"] is True
        assert added["wing"] == "test_wing"
        assert added["room"] == "test_room"
        assert added["drawer_id"].startswith("drawer_test_wing_test_room_")

    def test_add_drawer_duplicate_detection(self, monkeypatch, config, palace_path, kg):
        """Re-adding identical content reports ``already_exists``."""
        _patch_mcp_server(monkeypatch, config, kg)
        client, _collection = _get_collection(palace_path, create=True)
        del client
        from mempalace.mcp_server import tool_add_drawer

        content = "This is a unique test memory about Rust ownership and borrowing."

        first = tool_add_drawer(wing="w", room="r", content=content)
        assert first["success"] is True

        second = tool_add_drawer(wing="w", room="r", content=content)
        assert second["success"] is True
        assert second["reason"] == "already_exists"

    def test_delete_drawer(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Deleting a seeded drawer leaves three drawers in the collection."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_delete_drawer

        deleted = tool_delete_drawer("drawer_proj_backend_aaa")

        assert deleted["success"] is True
        assert seeded_collection.count() == 3

    def test_delete_drawer_not_found(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """Deleting an unknown drawer id reports failure."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_delete_drawer

        deleted = tool_delete_drawer("nonexistent_drawer")

        assert deleted["success"] is False

    def test_check_duplicate(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Seeded text is flagged as a duplicate; unrelated text is not."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_check_duplicate

        # Text copied verbatim from the seeded collection must be flagged.
        verdict = tool_check_duplicate(
            "The authentication module uses JWT tokens for session management. "
            "Tokens expire after 24 hours. Refresh tokens are stored in HttpOnly cookies.",
            threshold=0.5,
        )
        assert verdict["is_duplicate"] is True

        # Content on an unrelated topic must not be flagged.
        verdict = tool_check_duplicate(
            "Black holes emit Hawking radiation at the event horizon.",
            threshold=0.99,
        )
        assert verdict["is_duplicate"] is False

    def test_get_drawer(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Fetching a seeded drawer returns its id, wing, room and content."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_get_drawer

        drawer = tool_get_drawer("drawer_proj_backend_aaa")

        assert drawer["drawer_id"] == "drawer_proj_backend_aaa"
        assert drawer["wing"] == "project"
        assert drawer["room"] == "backend"
        assert "JWT tokens" in drawer["content"]

    def test_get_drawer_not_found(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Fetching an unknown drawer id returns an error entry."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_get_drawer

        drawer = tool_get_drawer("nonexistent_drawer")

        assert "error" in drawer

    def test_list_drawers(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """Listing without filters returns all four seeded drawers."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_drawers

        listing = tool_list_drawers()

        assert listing["count"] == 4
        assert len(listing["drawers"]) == 4

    def test_list_drawers_with_wing_filter(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """A wing filter returns only the three drawers of that wing."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_drawers

        listing = tool_list_drawers(wing="project")

        assert listing["count"] == 3
        for drawer in listing["drawers"]:
            assert drawer["wing"] == "project"

    def test_list_drawers_with_room_filter(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """A wing+room filter returns only the two drawers of that room."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_drawers

        listing = tool_list_drawers(wing="project", room="backend")

        assert listing["count"] == 2
        for drawer in listing["drawers"]:
            assert drawer["room"] == "backend"

    def test_list_drawers_pagination(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """``limit``/``offset`` are honoured and echoed back in the reply."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_drawers

        page = tool_list_drawers(limit=2, offset=0)

        assert page["count"] == 2
        assert page["limit"] == 2
        assert page["offset"] == 0

    def test_list_drawers_negative_offset_clamped(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """A negative offset is reported back as offset 0."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_list_drawers

        page = tool_list_drawers(offset=-5)

        assert page["offset"] == 0

    def test_update_drawer_content(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """Updated content is what a subsequent fetch returns."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_update_drawer, tool_get_drawer

        updated = tool_update_drawer(
            "drawer_proj_backend_aaa", content="Updated content about auth."
        )
        assert updated["success"] is True

        refetched = tool_get_drawer("drawer_proj_backend_aaa")
        assert refetched["content"] == "Updated content about auth."

    def test_update_drawer_wing_and_room(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """Moving a drawer reports the new wing and room."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_update_drawer

        updated = tool_update_drawer(
            "drawer_proj_backend_aaa", wing="new_wing", room="new_room"
        )

        assert updated["success"] is True
        assert updated["wing"] == "new_wing"
        assert updated["room"] == "new_room"

    def test_update_drawer_not_found(
        self, monkeypatch, config, palace_path, seeded_collection, kg
    ):
        """Updating an unknown drawer id reports failure."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_update_drawer

        updated = tool_update_drawer("nonexistent_drawer", content="hello")

        assert updated["success"] is False

    def test_update_drawer_noop(self, monkeypatch, config, palace_path, seeded_collection, kg):
        """An update with no changes succeeds and is flagged as a no-op."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_update_drawer

        updated = tool_update_drawer("drawer_proj_backend_aaa")

        assert updated["success"] is True
        assert updated.get("noop") is True


# ── KG Tools ────────────────────────────────────────────────────────────


class TestKGTools:
    """Direct calls to the knowledge-graph tool handlers."""

    def test_kg_add(self, monkeypatch, config, palace_path, kg):
        """Adding a triple to the KG fixture reports success."""
        _patch_mcp_server(monkeypatch, config, kg)
        from mempalace.mcp_server import tool_kg_add

        added = tool_kg_add(
            subject="Alice",
            predicate="likes",
            object="coffee",
            valid_from="2025-01-01",
        )

        assert added["success"] is True

    def test_kg_query(self, monkeypatch, config, palace_path, seeded_kg):
        """Querying the seeded KG for ``Max`` finds at least one entry."""
        _patch_mcp_server(monkeypatch, config, seeded_kg)
        from mempalace.mcp_server import tool_kg_query

        answer = tool_kg_query(entity="Max")

        assert answer["count"] > 0

    def test_kg_invalidate(self, monkeypatch, config, palace_path, seeded_kg):
        """Invalidating the ``Max does chess`` triple reports success."""
        _patch_mcp_server(monkeypatch, config, seeded_kg)
        from mempalace.mcp_server import tool_kg_invalidate

        invalidated = tool_kg_invalidate(
            subject="Max",
            predicate="does",
            object="chess",
            ended="2026-03-01",
        )

        assert invalidated["success"] is True

    def test_kg_timeline(self, monkeypatch, config, palace_path, seeded_kg):
        """The timeline for ``Alice`` in the seeded KG is non-empty."""
        _patch_mcp_server(monkeypatch, config, seeded_kg)
        from mempalace.mcp_server import tool_kg_timeline

        timeline = tool_kg_timeline(entity="Alice")

        assert timeline["count"] > 0

    def test_kg_stats(self, monkeypatch, config, palace_path, seeded_kg):
        """Stats for the seeded KG report at least four entities."""
        _patch_mcp_server(monkeypatch, config, seeded_kg)
        from mempalace.mcp_server import tool_kg_stats

        stats = tool_kg_stats()

        assert stats["entities"] >= 4


# ── Diary Tools ─────────────────────────────────────────────────────────


class TestDiaryTools:
    """Direct calls to the agent-diary tool handlers."""

    def test_diary_write_and_read(self, monkeypatch, config, palace_path, kg):
        """An entry written for an agent is returned when reading it back."""
        _patch_mcp_server(monkeypatch, config, kg)
        client, _collection = _get_collection(palace_path, create=True)
        del client
        from mempalace.mcp_server import tool_diary_write, tool_diary_read

        written = tool_diary_write(
            agent_name="TestAgent",
            entry="Today we discussed authentication patterns.",
            topic="architecture",
        )
        assert written["success"] is True
        assert written["agent"] == "TestAgent"

        diary = tool_diary_read(agent_name="TestAgent")
        assert diary["total"] == 1
        first_entry = diary["entries"][0]
        assert first_entry["topic"] == "architecture"
        assert "authentication" in first_entry["content"]

    def test_diary_read_empty(self, monkeypatch, config, palace_path, kg):
        """Reading the diary of an agent with no entries yields an empty list."""
        _patch_mcp_server(monkeypatch, config, kg)
        client, _collection = _get_collection(palace_path, create=True)
        del client
        from mempalace.mcp_server import tool_diary_read

        diary = tool_diary_read(agent_name="Nobody")

        assert diary["entries"] == []