From 1263c3c91ed39d9f9abc8b0f0d5a875b2b1d6794 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Mon, 13 Apr 2026 18:20:11 -0300 Subject: [PATCH] merge: full hardened stack + rewrite fact_checker around actual KG API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Merges the full hardened stack (up through #791 drawer-grep) and turns fact_checker from "dead code hidden behind bare except" into an actually-working offline contradiction detector with tests. ## Dead paths the PR body advertised but the code never executed Both buried by a single outer ``except Exception: pass``: * ``kg.query(subject)`` — ``KnowledgeGraph`` has no ``query()`` method; it has ``query_entity()``. The attribute error was silently swallowed and the entire KG branch always returned ``[]``. Now using ``kg.query_entity(subject, direction="outgoing")`` with proper handling of the ``predicate``/``object``/``current``/``valid_to`` fields the real API returns. * ``KnowledgeGraph(palace_path=palace_path)`` — the constructor's only kwarg is ``db_path``. Passing ``palace_path`` raised TypeError, silently swallowed. Now computing the db_path correctly from ``/knowledge_graph.sqlite3``, matching the convention the MCP server already uses. ## Contradiction logic rewritten The previous ``if kg_pred in claim and fact.object not in claim`` only fired when text used the SAME predicate word as the KG fact — the exact opposite of the stated use case ("Bob is Alice's brother" when KG says husband" would NOT have fired). Replaced with a proper parse → lookup → compare pipeline: * ``_extract_claims`` parses two surface forms ("X is Y's Z" and "X's Z is Y") into ``(subject, predicate, object)`` triples. * ``_check_kg_contradictions`` pulls the subject's outgoing facts and flags two classes: - ``relationship_mismatch`` when a current KG fact matches the same ``(subject, object)`` pair but with a different predicate. - ``stale_fact`` when the exact triple exists but is ``valid_to``-closed in the past. * Stale-fact detection is now implemented (the PR body claimed it; the old code silently didn't implement it). ## Performance fix — O(n²) → O(mentioned × n) ``_check_entity_confusion`` previously computed Levenshtein for every pair of registered names on every ``check_text`` call. For 1,000 registered names that's ~500K edit-distance calls per hook invocation. Now we first identify which registry names actually appear in the text (single regex scan), then only compute edit distance between mentioned and unmentioned names. Pinned by a test that asserts <200ms on a 500- name registry with zero mentions. Also: when *both* similar names are mentioned in the text, we no longer flag them — the user clearly knows they're different people. ## Shared entity-registry loader ``mempalace/miner.py`` already had an mtime-cached loader for ``~/.mempalace/known_entities.json``. fact_checker had a duplicate implementation that leaked file handles and ignored caching. Extended miner's cache to expose both the flat set (``_load_known_entities``) and the raw category dict (``_load_known_entities_raw``); fact_checker now imports the latter. No more double disk reads, no more handle leak. ## Tests — 24 cases in tests/test_fact_checker.py All three detection paths + both dead-code regressions: * ``test_kg_init_uses_db_path_not_palace_path_kwarg`` — pins the correct KG constructor signature so the ``palace_path=`` bug can't come back. * ``test_relationship_mismatch_detected`` — the headline example from the PR body now actually fires. * ``test_stale_fact_detected`` — valid_to-closed triple is flagged. * ``test_current_fact_same_triple_is_not_flagged`` — no false positive on a still-valid match. * ``test_performance_bounded_by_mentioned_names`` — 500-name registry, zero mentions, <200ms. Regression for the O(n²) blowup. * ``test_no_false_positive_when_both_names_mentioned`` — Mila and Milla in the same text is fine. * Plus claim extraction, flatten_names shapes, CLI exit code, empty text handling, missing-palace graceful fallback, registry-dict shape support. 785/785 suite pass. ruff + format clean on CI-pinned 0.4.x. --- mempalace/fact_checker.py | 376 ++++++++++++++++++++++++++----------- mempalace/mcp_server.py | 30 ++- mempalace/miner.py | 52 +++-- tests/test_fact_checker.py | 288 ++++++++++++++++++++++++++++ 4 files changed, 620 insertions(+), 126 deletions(-) create mode 100644 tests/test_fact_checker.py diff --git a/mempalace/fact_checker.py b/mempalace/fact_checker.py index 281f117..50e8842 100644 --- a/mempalace/fact_checker.py +++ b/mempalace/fact_checker.py @@ -1,152 +1,304 @@ """ fact_checker.py — Verify text against known facts in the palace. -Checks AI responses, diary entries, and new content against the -entity registry and knowledge graph for contradictions. Catches: - - Wrong names (similar but different entities) - - Wrong relationships (calling someone the wrong role) - - Stale facts (things that changed — KG has valid_from/valid_to) +Checks AI responses, diary entries, and new content against the entity +registry and knowledge graph for three classes of issue: -Uses the entity_registry and knowledge_graph — no hardcoded facts. + * similar_name — text mentions a name that's one/two edits + away from *another* registered name, raising + the possibility of a typo or mix-up. + * relationship_mismatch — text asserts a role between two entities + (e.g. "Bob is Alice's brother") while the KG + records a *different* current role for the + same subject/object pair. + * stale_fact — text asserts a fact that the KG marks closed + (``valid_to`` in the past). + +Purely offline. Inputs: entity_registry JSON + KG SQLite. No network. Usage: from mempalace.fact_checker import check_text issues = check_text("Bob is Alice's brother", palace_path) - # → [{"type": "relationship_mismatch", "detail": "KG says Bob is Alice's husband"}] # CLI - python -m mempalace.fact_checker "Bob is Alice's brother" --palace ~/.mempalace/palace + python -m mempalace.fact_checker "Bob is Alice's brother" \\ + --palace ~/.mempalace/palace """ +from __future__ import annotations + import os import re -from pathlib import Path +from datetime import datetime, timezone + +# Share miner's mtime-cached registry loader so we don't double-read +# ~/.mempalace/known_entities.json on every check_text call. +from .miner import _load_known_entities_raw -def check_text(text, palace_path=None, config=None): - """Check text for contradictions against known facts. +# Narrow detection patterns — parse "X is Y's Z" and "X's Z is Y". +# Names are captured greedily as word sequences (letters + optional +# capitalized follow-ons) so simple multi-token names still work. +# Relationship words are constrained to sane lengths to avoid matching +# arbitrary filler. +_RELATIONSHIP_PATTERNS = [ + # "Bob is Alice's brother" → subject=Bob, possessor=Alice, role=brother + re.compile(r"\b([A-Z][\w-]+)\s+is\s+([A-Z][\w-]+)'s\s+([a-z]{3,20})\b"), + # "Alice's brother is Bob" → possessor=Alice, role=brother, subject=Bob + re.compile(r"\b([A-Z][\w-]+)'s\s+([a-z]{3,20})\s+is\s+([A-Z][\w-]+)\b"), +] - Returns list of issues found. Empty list = no contradictions. + +def check_text(text: str, palace_path: str = None, config=None) -> list: + """Return a list of issues detected in ``text``. + + Empty list means "no contradictions found" — absence of evidence, not + evidence of absence. The detector is deliberately conservative: + every issue is anchored to a specific KG fact or registry entry. """ if config is None: from .config import MempalaceConfig + config = MempalaceConfig() if palace_path is None: palace_path = config.palace_path - issues = [] + if not text: + return [] - # Load known entities - entity_names = _load_known_entities() + issues: list = [] + entity_names_raw = _load_known_entities_raw() - # Check entity name confusion (similar names that might be mixed up) - issues.extend(_check_entity_confusion(text, entity_names)) - - # Check against knowledge graph facts - issues.extend(_check_kg_facts(text, palace_path)) + issues.extend(_check_entity_confusion(text, entity_names_raw)) + issues.extend(_check_kg_contradictions(text, palace_path)) return issues -def _load_known_entities(): - """Load entity names from the registry.""" - import json - registry_path = os.path.expanduser("~/.mempalace/known_entities.json") - if not os.path.exists(registry_path): - return {} - try: - return json.loads(open(registry_path).read()) - except Exception: - return {} +# ── entity-name confusion ──────────────────────────────────────────── -def _check_entity_confusion(text, entity_names): - """Check if text confuses similar entity names.""" - issues = [] - all_names = set() - for cat in entity_names.values(): +def _flatten_names(entity_names_raw: dict) -> set: + """Flatten a ``{category: [names]}`` or ``{category: {name: meta}}`` + registry into a set of names.""" + flat: set = set() + for cat in entity_names_raw.values(): if isinstance(cat, list): - all_names.update(cat) + flat.update(str(n) for n in cat if n) elif isinstance(cat, dict): - all_names.update(cat.keys()) + flat.update(str(k) for k in cat.keys() if k) + return flat - # Find names mentioned in text - mentioned = set() + +def _check_entity_confusion(text: str, entity_names_raw: dict) -> list: + """Flag names mentioned in the text that are edit-distance ≤ 2 from + a *different* registered name — a common typo / mix-up pattern. + + Performance note: the original O(n²) pairwise scan over the full + registry is gone. We first identify which names actually appear in + the text, then only compute edit distance between *mentioned* names + and the rest of the registry. This makes the cost O(m × n) where m + is the handful of names in the text, not the full registry. + """ + all_names = _flatten_names(entity_names_raw) + if not all_names: + return [] + + # Which names from the registry actually appear in the text? + mentioned: list = [] for name in all_names: - if re.search(r'\b' + re.escape(name) + r'\b', text, re.IGNORECASE): - mentioned.add(name) + if re.search(r"\b" + re.escape(name) + r"\b", text, re.IGNORECASE): + mentioned.append(name) + if not mentioned: + return [] - # Check for names that are very similar but different (edit distance 1-2) - name_list = sorted(all_names) - for i, name_a in enumerate(name_list): - for name_b in name_list[i + 1:]: - if _edit_distance(name_a.lower(), name_b.lower()) <= 2: - if name_a in mentioned or name_b in mentioned: - if name_a in text and name_b not in text: - issues.append({ - "type": "similar_name", - "detail": f"'{name_a}' mentioned — did you mean '{name_b}'? (similar names in registry)", - "names": [name_a, name_b], - }) + issues: list = [] + seen_pairs: set = set() + for name_a in mentioned: + a_lower = name_a.lower() + for name_b in all_names: + if name_b == name_a: + continue + # Dedupe by unordered pair so we don't double-report. + pair_key = tuple(sorted((name_a.lower(), name_b.lower()))) + if pair_key in seen_pairs: + continue + # Only flag when name_b is a *different* registry entry that + # was NOT mentioned — otherwise both names in the text is + # just the user writing about two people. + if name_b in mentioned: + seen_pairs.add(pair_key) + continue + distance = _edit_distance(a_lower, name_b.lower()) + if 0 < distance <= 2: + issues.append( + { + "type": "similar_name", + "detail": ( + f"'{name_a}' mentioned — did you mean " + f"'{name_b}'? (edit distance {distance})" + ), + "names": [name_a, name_b], + "distance": distance, + } + ) + seen_pairs.add(pair_key) return issues -def _check_kg_facts(text, palace_path): - """Check text against knowledge graph for contradictions.""" - issues = [] +# ── KG contradictions ──────────────────────────────────────────────── + + +def _extract_claims(text: str) -> list: + """Yield structured (subject, predicate, object) claims from ``text``. + + The two supported surface forms are "X is Y's Z" and "X's Z is Y", + both of which resolve to the triple ``(X, Z, Y)`` — ``X`` has role + ``Z`` with respect to ``Y``. Matches are case-preserving for the + entity names (KG lookup is case-insensitive on normalized IDs). + """ + claims: list = [] + for pat in _RELATIONSHIP_PATTERNS: + for match in pat.finditer(text): + groups = match.groups() + if pat is _RELATIONSHIP_PATTERNS[0]: + subject, possessor, role = groups[0], groups[1], groups[2] + else: + possessor, role, subject = groups[0], groups[1], groups[2] + claims.append( + { + "subject": subject, + "predicate": role.lower(), + "object": possessor, + "span": match.group(0), + } + ) + return claims + + +def _check_kg_contradictions(text: str, palace_path: str) -> list: + """Compare each claim in ``text`` against the KG. + + For every claim ``(subject, predicate, object)`` parsed from the + text, look up the subject's current KG triples: + + * ``relationship_mismatch`` fires when the KG records a fact about + the same ``(subject, object)`` pair but with a *different* + predicate — e.g. text says "brother" but KG says "husband". + * ``stale_fact`` fires when the KG has the exact ``(subject, + predicate, object)`` triple but its ``valid_to`` is in the past, + meaning the claim is no longer current. + """ + claims = _extract_claims(text) + if not claims: + return [] + try: from .knowledge_graph import KnowledgeGraph - kg = KnowledgeGraph(palace_path=palace_path) - # Extract relationship claims from text - # Pattern: "X is Y's Z" or "X's Z is Y" - patterns = [ - (r"(\w+)\s+is\s+(\w+)'s\s+(\w+)", "subject", "possessor", "role"), - (r"(\w+)'s\s+(\w+)\s+is\s+(\w+)", "possessor", "role", "subject"), - ] - - for pattern, *roles in patterns: - for match in re.finditer(pattern, text, re.IGNORECASE): - groups = match.groups() - subject = groups[0] - # Query KG for this entity - try: - facts = kg.query(subject) - if facts: - for fact in facts: - # Check if the claim contradicts a known fact - if fact.get("valid_to") is None: # current fact - kg_pred = fact.get("predicate", "").lower() - claim = match.group(0).lower() - if kg_pred in claim and fact.get("object", "").lower() not in claim: - issues.append({ - "type": "relationship_mismatch", - "detail": f"Text says '{match.group(0)}' but KG says: {subject} {kg_pred} {fact.get('object')}", - "entity": subject, - }) - except Exception: - pass + # KG lives alongside the palace collection; mcp_server uses the + # same convention (see _kg init). Pass ``db_path`` — the previous + # code passed a nonexistent ``palace_path`` kwarg which raised + # TypeError, silently swallowed by the outer except and rendered + # the entire KG-check path dead. + kg = KnowledgeGraph(db_path=os.path.join(palace_path, "knowledge_graph.sqlite3")) except Exception: - pass # KG not available — skip + # KG unavailable (brand-new palace, corrupted DB, etc.) — skip. + return [] + + issues: list = [] + for claim in claims: + subject = claim["subject"] + claim_pred = claim["predicate"] + claim_obj = claim["object"] + try: + facts = kg.query_entity(subject, direction="outgoing") + except Exception: + continue + if not facts: + continue + + current_facts = [f for f in facts if f.get("current")] + + # Mismatch: KG fact about same (subject, object) pair but different predicate. + for fact in current_facts: + if not _objects_match(fact.get("object"), claim_obj): + continue + kg_pred = (fact.get("predicate") or "").lower() + if kg_pred and kg_pred != claim_pred: + issues.append( + { + "type": "relationship_mismatch", + "detail": ( + f"Text says '{claim['span']}' but KG records " + f"{subject} {kg_pred} {fact.get('object')}" + ), + "entity": subject, + "claim": { + "predicate": claim_pred, + "object": claim_obj, + }, + "kg_fact": { + "predicate": kg_pred, + "object": fact.get("object"), + }, + } + ) + + # Stale fact: exact match on (subject, predicate, object) but KG + # closed the window in the past. + now_iso = datetime.now(timezone.utc).date().isoformat() + for fact in facts: + if fact.get("current"): + continue + kg_pred = (fact.get("predicate") or "").lower() + if kg_pred != claim_pred: + continue + if not _objects_match(fact.get("object"), claim_obj): + continue + valid_to = fact.get("valid_to") + if valid_to and str(valid_to) < now_iso: + issues.append( + { + "type": "stale_fact", + "detail": ( + f"Text says '{claim['span']}' but KG marks " + f"this fact closed on {valid_to}" + ), + "entity": subject, + "valid_to": valid_to, + } + ) return issues -def _edit_distance(s1, s2): - """Simple Levenshtein distance.""" +def _objects_match(kg_obj, claim_obj: str) -> bool: + if kg_obj is None or not claim_obj: + return False + return str(kg_obj).strip().lower() == claim_obj.strip().lower() + + +# ── Levenshtein helper (tight iterative version) ───────────────────── + + +def _edit_distance(s1: str, s2: str) -> int: + """Levenshtein distance. O(len(s1) * len(s2)) time, O(len(s2)) space.""" if len(s1) < len(s2): - return _edit_distance(s2, s1) - if len(s2) == 0: + s1, s2 = s2, s1 + if not s2: return len(s1) prev = list(range(len(s2) + 1)) for i, c1 in enumerate(s1): curr = [i + 1] for j, c2 in enumerate(s2): - curr.append(min( - prev[j + 1] + 1, - curr[j] + 1, - prev[j] + (0 if c1 == c2 else 1), - )) + curr.append( + min( + prev[j + 1] + 1, + curr[j] + 1, + prev[j] + (0 if c1 == c2 else 1), + ) + ) prev = curr return prev[-1] @@ -154,24 +306,30 @@ def _edit_distance(s1, s2): if __name__ == "__main__": import argparse import json + import sys - parser = argparse.ArgumentParser(description="Check text against known facts") - parser.add_argument("text", nargs="?", help="Text to check") - parser.add_argument("--palace", default=os.path.expanduser("~/.mempalace/palace")) - parser.add_argument("--stdin", action="store_true", help="Read from stdin") + parser = argparse.ArgumentParser( + description="Check text against known facts in the MemPalace palace.", + epilog="Exits 0 when no issues found, 1 when one or more issues detected.", + ) + parser.add_argument("text", nargs="?", help="Text to check (or use --stdin).") + parser.add_argument( + "--palace", + default=os.path.expanduser("~/.mempalace/palace"), + help="Path to the palace directory.", + ) + parser.add_argument("--stdin", action="store_true", help="Read text from stdin.") args = parser.parse_args() if args.stdin: - import sys - text = sys.stdin.read() + text_in = sys.stdin.read() elif args.text: - text = args.text + text_in = args.text else: - print("Provide text as argument or use --stdin") - exit(1) + parser.error("Provide text as argument or use --stdin.") - issues = check_text(text, palace_path=args.palace) - if issues: - print(json.dumps(issues, indent=2)) - else: - print("No contradictions found.") + found = check_text(text_in, palace_path=args.palace) + if found: + print(json.dumps(found, indent=2)) + sys.exit(1) + print("No contradictions found.") diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 08226a9..31be8a4 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -35,7 +35,15 @@ from .version import __version__ import chromadb from .query_sanitizer import sanitize_query from .searcher import search_memories -from .palace_graph import traverse, find_tunnels, graph_stats, create_tunnel, list_tunnels, delete_tunnel, follow_tunnels +from .palace_graph import ( + traverse, + find_tunnels, + graph_stats, + create_tunnel, + list_tunnels, + delete_tunnel, + follow_tunnels, +) from .knowledge_graph import KnowledgeGraph @@ -519,7 +527,10 @@ def tool_create_tunnel( except ValueError as e: return {"error": str(e)} return create_tunnel( - source_wing, source_room, target_wing, target_room, + source_wing, + source_room, + target_wing, + target_room, label=label, source_drawer_id=source_drawer_id, target_drawer_id=target_drawer_id, @@ -1251,8 +1262,14 @@ TOOLS = { "target_wing": {"type": "string", "description": "Wing of the target"}, "target_room": {"type": "string", "description": "Room in the target wing"}, "label": {"type": "string", "description": "Description of the connection"}, - "source_drawer_id": {"type": "string", "description": "Optional specific drawer ID"}, - "target_drawer_id": {"type": "string", "description": "Optional specific drawer ID"}, + "source_drawer_id": { + "type": "string", + "description": "Optional specific drawer ID", + }, + "target_drawer_id": { + "type": "string", + "description": "Optional specific drawer ID", + }, }, "required": ["source_wing", "source_room", "target_wing", "target_room"], }, @@ -1263,7 +1280,10 @@ TOOLS = { "input_schema": { "type": "object", "properties": { - "wing": {"type": "string", "description": "Filter tunnels by wing (shows tunnels where wing is source or target)"}, + "wing": { + "type": "string", + "description": "Filter tunnels by wing (shows tunnels where wing is source or target)", + }, }, }, "handler": tool_list_tunnels, diff --git a/mempalace/miner.py b/mempalace/miner.py index 04bcf61..3d8e29e 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -379,17 +379,17 @@ def chunk_text(content: str, source_file: str) -> list: _ENTITY_REGISTRY_PATH = os.path.join(os.path.expanduser("~"), ".mempalace", "known_entities.json") -_ENTITY_REGISTRY_CACHE: dict = {"mtime": None, "names": frozenset()} +_ENTITY_REGISTRY_CACHE: dict = {"mtime": None, "names": frozenset(), "raw": {}} _ENTITY_EXTRACT_WINDOW = 5000 # chars of content scanned for capitalized words _ENTITY_METADATA_LIMIT = 25 # max entities packed into the metadata field -def _load_known_entities() -> frozenset: - """Load (and cache) the user's known-entity registry by mtime. - - Reads ``~/.mempalace/known_entities.json``. The registry is shaped as - ``{"category": ["Name1", "Name2", ...], ...}``. Cached across calls - in the same process; invalidated when the file's mtime changes. +def _refresh_known_entities_cache() -> None: + """Reload ``~/.mempalace/known_entities.json`` into the module cache if + its mtime changed since the last read. Shared by ``_load_known_entities`` + (flat set) and ``_load_known_entities_raw`` (category dict), so callers + can pick whichever shape they need without duplicating the mtime-gated + disk read. """ try: mtime = os.path.getmtime(_ENTITY_REGISTRY_PATH) @@ -397,28 +397,56 @@ def _load_known_entities() -> frozenset: if _ENTITY_REGISTRY_CACHE["mtime"] is not None: _ENTITY_REGISTRY_CACHE["mtime"] = None _ENTITY_REGISTRY_CACHE["names"] = frozenset() - return _ENTITY_REGISTRY_CACHE["names"] + _ENTITY_REGISTRY_CACHE["raw"] = {} + return if _ENTITY_REGISTRY_CACHE["mtime"] == mtime: - return _ENTITY_REGISTRY_CACHE["names"] + return names: set = set() + raw: dict = {} try: import json with open(_ENTITY_REGISTRY_PATH, "r", encoding="utf-8") as f: data = json.load(f) - for cat in data.values(): - if isinstance(cat, list): - names.update(str(n) for n in cat if n) + if isinstance(data, dict): + raw = data + for cat in data.values(): + if isinstance(cat, list): + names.update(str(n) for n in cat if n) + elif isinstance(cat, dict): + names.update(str(k) for k in cat.keys() if k) except Exception: names = set() + raw = {} _ENTITY_REGISTRY_CACHE["mtime"] = mtime _ENTITY_REGISTRY_CACHE["names"] = frozenset(names) + _ENTITY_REGISTRY_CACHE["raw"] = raw + + +def _load_known_entities() -> frozenset: + """Flat set of every known entity name (across all categories). + + Cached by mtime; invalidated when the registry file changes. + """ + _refresh_known_entities_cache() return _ENTITY_REGISTRY_CACHE["names"] +def _load_known_entities_raw() -> dict: + """Full category-dict view of the registry, shape + ``{"category": ["Name1", ...], ...}``. Cached by mtime. + + Consumed by modules (e.g., fact_checker) that need to reason about + categories rather than a flat name set. Never returns a mutable + reference to the cache — callers get a shallow copy. + """ + _refresh_known_entities_cache() + return dict(_ENTITY_REGISTRY_CACHE["raw"]) + + def _extract_entities_for_metadata(content: str) -> str: """Extract entity names from content for metadata tagging. diff --git a/tests/test_fact_checker.py b/tests/test_fact_checker.py new file mode 100644 index 0000000..5b34a40 --- /dev/null +++ b/tests/test_fact_checker.py @@ -0,0 +1,288 @@ +""" +test_fact_checker.py — Regression + integration tests for fact_checker. + +Covers every detection path + the three bugs the original PR silently +hid behind ``except Exception: pass``: + + * ``kg.query()`` doesn't exist — code must use ``query_entity``. + * ``KnowledgeGraph(palace_path=...)`` is not a valid kwarg — code + must pass ``db_path``. + * O(n²) edit-distance over the full registry — must filter to names + actually mentioned in the text. + +Also pins the three feature contracts: + * similar_name — "Mila" vs "Milla" in a registry with both. + * relationship_mismatch — "Bob is Alice's brother" vs KG "husband". + * stale_fact — claim matches a triple whose valid_to is in the past. +""" + +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + +import pytest + +from mempalace.fact_checker import ( + _check_entity_confusion, + _edit_distance, + _extract_claims, + _flatten_names, + check_text, +) +from mempalace.knowledge_graph import KnowledgeGraph + + +# ── claim extraction ───────────────────────────────────────────────── + + +class TestExtractClaims: + def test_parses_x_is_ys_z(self): + claims = _extract_claims("Bob is Alice's brother") + assert len(claims) == 1 + assert claims[0] == { + "subject": "Bob", + "predicate": "brother", + "object": "Alice", + "span": "Bob is Alice's brother", + } + + def test_parses_xs_z_is_y(self): + claims = _extract_claims("Alice's brother is Bob") + assert len(claims) == 1 + assert claims[0]["subject"] == "Bob" + assert claims[0]["predicate"] == "brother" + assert claims[0]["object"] == "Alice" + + def test_ignores_sentences_without_possessive_role(self): + assert _extract_claims("Bob drove to the store today") == [] + assert _extract_claims("Just some prose without relationships") == [] + + def test_multiple_claims_in_one_text(self): + claims = _extract_claims("Bob is Alice's brother. Carol is Dave's sister.") + subjects = {c["subject"] for c in claims} + assert subjects == {"Bob", "Carol"} + + +# ── entity confusion ───────────────────────────────────────────────── + + +class TestEntityConfusion: + def test_flags_near_name_when_only_one_mentioned(self): + registry = {"people": ["Milla", "Mila"]} + issues = _check_entity_confusion("I spoke with Mila today.", registry) + # "Mila" mentioned, "Milla" not — registry has both at edit-distance 1, + # flag the possible confusion. + assert len(issues) == 1 + assert issues[0]["type"] == "similar_name" + assert set(issues[0]["names"]) == {"Mila", "Milla"} + assert issues[0]["distance"] == 1 + + def test_no_false_positive_when_both_names_mentioned(self): + """Regression: a text discussing both Mila and Milla is fine — + the user clearly knows they're different. Don't nag.""" + registry = {"people": ["Milla", "Mila"]} + issues = _check_entity_confusion("Mila and Milla met for lunch.", registry) + assert issues == [] + + def test_no_issues_when_registry_empty(self): + assert _check_entity_confusion("Bob said hi", {}) == [] + assert _check_entity_confusion("Bob said hi", {"people": []}) == [] + + def test_no_issues_when_no_mentioned_names(self): + registry = {"people": ["Zelda", "Link", "Sheik"]} + assert _check_entity_confusion("nothing relevant here", registry) == [] + + def test_registry_dict_shape_is_supported(self): + # Some registries store {"people": {"Alice": {...meta}}}; we still + # need to surface the keys as candidate names. + registry = {"people": {"Milla": {"role": "creator"}, "Mila": {}}} + issues = _check_entity_confusion("I messaged Mila yesterday", registry) + assert any("Milla" in (i["names"] or []) for i in issues) + + +class TestEditDistance: + def test_basic_distances(self): + assert _edit_distance("kitten", "sitting") == 3 + assert _edit_distance("mila", "milla") == 1 + assert _edit_distance("abc", "abc") == 0 + + def test_empty_strings(self): + assert _edit_distance("", "") == 0 + assert _edit_distance("abc", "") == 3 + assert _edit_distance("", "abc") == 3 + + def test_performance_bounded_by_mentioned_names(self): + """Regression: an earlier implementation did O(n²) pairwise + edit-distance over every registry entry on every check_text call. + With 100 names and zero mentions, the call must return in a blink + because no edit-distance comparison should even start.""" + import time + + # 500 random names, none of which appear in the text. + registry = {"people": [f"Zelda{i:03d}" for i in range(500)]} + text = "completely irrelevant prose with no registered names at all" + + start = time.perf_counter() + issues = _check_entity_confusion(text, registry) + elapsed = time.perf_counter() - start + + assert issues == [] + # Even an unoptimized implementation should beat this by orders + # of magnitude once we've filtered to mentioned names (which is + # 0 here) — if it's still doing O(n²), we'll blow past. + assert elapsed < 0.2, f"entity confusion took {elapsed:.3f}s on empty mentions" + + +# ── _flatten_names helper ──────────────────────────────────────────── + + +class TestFlattenNames: + def test_handles_list_categories(self): + assert _flatten_names({"people": ["Ada", "Bob"]}) == {"Ada", "Bob"} + + def test_handles_dict_categories(self): + assert _flatten_names({"people": {"Ada": {}, "Bob": {}}}) == {"Ada", "Bob"} + + def test_skips_falsy_entries(self): + assert _flatten_names({"people": ["Ada", "", None, "Bob"]}) == {"Ada", "Bob"} + + +# ── KG integration (uses a real tmp SQLite palace) ─────────────────── + + +@pytest.fixture +def palace_with_kg(tmp_path): + """Palace directory with a real KG pre-seeded with a few triples. + + The KG file lives at ``/knowledge_graph.sqlite3`` — same + convention used by the MCP server. Fact-checker must find it via + that path, not via a bogus ``palace_path`` kwarg. + """ + palace = tmp_path / "palace" + palace.mkdir() + db = str(palace / "knowledge_graph.sqlite3") + kg = KnowledgeGraph(db_path=db) + yield palace, kg + + +class TestKGContradictions: + def test_kg_init_uses_db_path_not_palace_path_kwarg(self): + """Regression: the original code passed ``palace_path=`` to a + constructor whose only kwarg is ``db_path``. That raised + TypeError — silently swallowed — and the KG path became dead + code. This test pins the correct call signature.""" + # Simply construct via the correct signature; raising means the + # KG constructor has changed in a way that fact_checker must too. + kg = KnowledgeGraph(db_path=":memory:") + # query_entity must exist (this is the method fact_checker calls). + assert callable(getattr(kg, "query_entity", None)) + # The API that fact_checker used to call does NOT exist. + assert not hasattr(kg, "query") + + def test_relationship_mismatch_detected(self, palace_with_kg): + """The feature's headline example: text says brother, KG says husband.""" + palace, kg = palace_with_kg + kg.add_triple("Bob", "husband_of", "Alice", valid_from="2020-01-01") + + issues = check_text("Bob is Alice's husband_of", str(palace)) + # Exact-predicate + same object → no mismatch. + assert all(i["type"] != "relationship_mismatch" for i in issues) + + issues = check_text("Bob is Alice's brother", str(palace)) + mismatches = [i for i in issues if i["type"] == "relationship_mismatch"] + assert mismatches, "should flag text/KG mismatch for same (subject, object)" + m = mismatches[0] + assert m["entity"] == "Bob" + assert m["claim"]["predicate"] == "brother" + assert m["kg_fact"]["predicate"] == "husband_of" + + def test_no_false_positive_when_kg_has_no_facts_about_subject(self, palace_with_kg): + palace, _ = palace_with_kg + # KG is empty → no mismatch should fire. + assert check_text("Bob is Alice's brother", str(palace)) == [] + + def test_stale_fact_detected(self, palace_with_kg): + palace, kg = palace_with_kg + # An old relationship that was superseded in 2023. Using a + # possessive-shape claim so the narrow claim-extraction regex + # actually reaches the stale-fact branch. + kg.add_triple( + "Bob", + "brother", + "Alice", + valid_from="2010-01-01", + valid_to="2023-06-01", + ) + issues = check_text("Bob is Alice's brother", str(palace)) + stale = [i for i in issues if i["type"] == "stale_fact"] + assert stale, "should flag closed-window fact as stale" + assert stale[0]["entity"] == "Bob" + assert stale[0]["valid_to"].startswith("2023") + + def test_current_fact_same_triple_is_not_flagged(self, palace_with_kg): + palace, kg = palace_with_kg + kg.add_triple("Bob", "brother", "Alice", valid_from="2010-01-01") + issues = check_text("Bob is Alice's brother", str(palace)) + assert issues == [] + + def test_missing_palace_does_not_crash(self, tmp_path): + """Brand-new palace (no KG file yet) — check_text must return [] + rather than raising or hanging.""" + nonexistent = str(tmp_path / "never_created") + assert check_text("Bob is Alice's brother", nonexistent) == [] + + +# ── end-to-end check_text contract ─────────────────────────────────── + + +class TestCheckTextContract: + def test_empty_text_returns_empty_list(self, tmp_path): + assert check_text("", str(tmp_path / "palace")) == [] + + def test_registry_confusion_path_isolated_from_kg(self, tmp_path, monkeypatch): + """If the registry file is present but the KG is missing, the + similar-name path must still fire. Prior implementations had + such entangled state that one failure killed both paths.""" + # Bypass the real registry by pointing cache at a temp file. + registry = tmp_path / "known_entities.json" + registry.write_text(json.dumps({"people": ["Milla", "Mila"]})) + from mempalace import miner + + monkeypatch.setattr(miner, "_ENTITY_REGISTRY_PATH", str(registry)) + miner._ENTITY_REGISTRY_CACHE.update({"mtime": None, "names": frozenset(), "raw": {}}) + + issues = check_text("Chatted with Mila.", str(tmp_path / "nonexistent_palace")) + assert any(i["type"] == "similar_name" for i in issues) + + +# ── CLI ────────────────────────────────────────────────────────────── + + +class TestCLI: + def test_exits_nonzero_when_issues_found(self, tmp_path, monkeypatch, capsys): + """The CLI exit code is how shell scripts / hooks know to act — + pin it explicitly.""" + registry = tmp_path / "known_entities.json" + registry.write_text(json.dumps({"people": ["Milla", "Mila"]})) + from mempalace import fact_checker, miner + + monkeypatch.setattr(miner, "_ENTITY_REGISTRY_PATH", str(registry)) + miner._ENTITY_REGISTRY_CACHE.update({"mtime": None, "names": frozenset(), "raw": {}}) + + # Simulate argv: "Mila said hi" + monkeypatch.setattr( + "sys.argv", + ["fact_checker", "Mila said hi", "--palace", str(tmp_path / "palace")], + ) + with pytest.raises(SystemExit) as excinfo: + # Re-exec the __main__ block via runpy. + import runpy + + runpy.run_module("mempalace.fact_checker", run_name="__main__") + # Issues found → exit code 1. + assert excinfo.value.code == 1 + out = capsys.readouterr().out + assert "similar_name" in out + # Silence unused import warning. + _ = (MagicMock, patch, fact_checker)