merge: develop + harden entity metadata, BM25, and diary ingest for production

Merges develop (closet hardening #826, strip_noise #785, lock #784) and
replaces every sub-feature in this PR with a correct, tested
implementation. Shippable now.

## 1. Real Okapi-BM25 (searcher.py)

The prior `_bm25_score()` hardcoded `idf = log(2.0)` for every term — it
was really a scaled TF, not BM25, and couldn't tell a discriminative
term from a generic one. Replaced with `_bm25_scores(query, documents)`
that computes proper IDF over the provided candidate corpus using the
Lucene smoothed formula `log((N - df + 0.5) / (df + 0.5) + 1)`. Well-
defined for re-ranking vector-retrieval candidates — IDF there measures
how discriminative each term is *within the candidate set*, exactly the
signal we want.

`_hybrid_rank` also fixed:
- Vector normalization is now absolute `max(0, 1 - dist)`, not
  `1 - dist/max_dist` — adding/removing a candidate no longer reshuffles
  the others.
- BM25 is min-max normalized within candidates (bounded [0, 1]).
- Closet path now re-ranks too (was previously returning closet-order
  hits without hybrid scoring).
- `_hybrid_score` internal field stripped from output; `bm25_score`
  exposed for debugging.

## 2. Entity metadata (miner.py)

- Reuses `_ENTITY_STOPLIST` from palace.py so sentence-starters like
  "When", "After", "The" no longer land as entities (regression test
  covers this).
- Known-entity registry is cached at module level, keyed by the
  registry file's mtime — no more disk read per drawer.
- File handle now uses a context manager.
- Truncates the entity LIST (to 25) before joining — never splits a
  name in the middle.

## 3. Diary ingest (diary_ingest.py)

- State file now lives at `~/.mempalace/state/diary_ingest_<hash>.json`,
  keyed by (palace_path, diary_dir). No more pollution of the user's
  content directory.
- Drawer IDs now hash `(wing, date_str)` — a user with personal + work
  diaries on the same day no longer silently clobbers.
- Each day's upsert runs inside `mine_lock(source_file)` so concurrent
  ingest from two terminals can't race.
- `force=True` now calls `purge_file_closets` before rebuild so
  leftover numbered closets from a longer prior day don't orphan.

## 4. Tests (tests/test_closets.py)

Merged this PR's MineLock/Entity/BM25/Diary tests with develop's
hardened Build/Upsert/Purge/Rebuild/SearchClosetFirst tests. Added
specific regression tests for every fix above:
- entity stoplist applies (no "When/After/The")
- entity list capped before join (no partial tokens)
- registry cached by mtime (mock-verified zero re-reads)
- BM25 IDF downweights terms present in every doc (real BM25 evidence)
- hybrid rank absolute normalization stable against outliers
- diary state file outside user's diary dir
- diary wing-prefixed IDs prevent cross-wing date collisions

35/35 closet tests pass; full suite 743/743. ruff + format clean under
CI-pinned 0.4.x.
This commit is contained in:
Igor Lins e Silva
2026-04-13 17:37:45 -03:00
17 changed files with 1623 additions and 403 deletions
+505 -101
View File
@@ -1,130 +1,341 @@
"""Tests for the closet layer, mine_lock, entity metadata, BM25 hybrid search,
and diary ingest.
"""
test_closets.py — Tests for the closet (searchable index) layer and the
features that ride on top of it: mine_lock serialization, entity metadata,
hybrid BM25+vector search, and diary ingest.
Content derived from Milla's omnibus test file; trimmed to only the features
present in this PR stack (#784 lock, #788 closets, this PR's entity/BM25/diary).
Strip-noise tests live with #785; tunnel tests live with the tunnels PR.
Coverage map:
* mine_lock — acquire/release, blocks concurrent acquisition.
* build_closet_lines — pointer-line shape, header pickup, entity stoplist
(regression for "When/After/The"), real-name survival, fallback line.
* upsert_closet_lines — pure overwrite (regression for the append bug),
char-limit packing without splitting a line.
* purge_file_closets — scoped to source_file.
* Project-miner end-to-end rebuild — re-mining with fewer topics fully
purges leftover numbered closets from a larger prior run.
* _extract_drawer_ids_from_closet — pointer parsing + dedup.
* search_memories closet-first path — fallback when empty, chunk-level
hits with matched_via, no whole-file glue, max_distance enforcement.
* Entity metadata — extracted, stoplist applied, registry cached by mtime.
* Real BM25 — real IDF over candidate corpus, hybrid rerank.
* Diary ingest — drawers + closets created, incremental skips, state
file lives outside the diary dir, wing-prefixed drawer IDs prevent
cross-diary collisions, force=True purges leftover closets.
"""
import json
import os
import tempfile
import threading
import time
import yaml
from mempalace.miner import (
_extract_entities_for_metadata,
_load_known_entities,
mine,
)
from mempalace.palace import (
CLOSET_CHAR_LIMIT,
build_closet_lines,
get_closets_collection,
get_collection,
mine_lock,
purge_file_closets,
upsert_closet_lines,
)
from mempalace.miner import _extract_entities_for_metadata
from mempalace.searcher import _bm25_score, _hybrid_rank
from mempalace.searcher import (
_bm25_scores,
_extract_drawer_ids_from_closet,
_hybrid_rank,
search_memories,
)
# ── mine_lock ────────────────────────────────────────────────────────────
class TestMineLock:
def test_lock_acquires_and_releases(self):
with mine_lock("/tmp/test_lock_file.txt"):
def test_lock_acquires_and_releases(self, tmp_path):
target = str(tmp_path / "lock_target.txt")
with mine_lock(target):
lock_dir = os.path.expanduser("~/.mempalace/locks")
assert os.path.isdir(lock_dir)
# Re-acquire after release should succeed instantly.
start = time.time()
with mine_lock(target):
pass
assert time.time() - start < 1.0
def test_lock_blocks_concurrent_access(self):
def test_lock_blocks_concurrent_access(self, tmp_path):
target = str(tmp_path / "concurrent_lock.txt")
results = []
def worker(name):
start = time.time()
with mine_lock("/tmp/same_file_lock_test.txt"):
with mine_lock(target):
results.append((name, time.time() - start))
time.sleep(0.2)
t1 = threading.Thread(target=worker, args=("a",))
t2 = threading.Thread(target=worker, args=("b",))
t1.start()
time.sleep(0.05)
time.sleep(0.05) # ensure t1 acquires first
t2.start()
t1.join()
t2.join()
# Second thread should have waited
wait_times = sorted(results, key=lambda x: x[1])
assert wait_times[1][1] > 0.1, "Second thread should block"
# The second worker must have waited at least most of t1's hold time.
wait_times = sorted(r[1] for r in results)
assert (
wait_times[1] > 0.1
), f"second thread should block on mine_lock, waited only {wait_times[1]:.3f}s"
# ── closet lines ─────────────────────────────────────────────────────────
# ── build_closet_lines ─────────────────────────────────────────────────
class TestBuildClosetLines:
def test_returns_list_of_lines(self):
lines = build_closet_lines(
"/tmp/test.py", ["drawer_001"], "We built the auth system", "code", "general"
def test_emits_pointer_line_shape(self):
content = (
"# Auth rewrite\n\n"
"Decided we need to migrate to passkeys. "
"Built the prototype with WebAuthn. "
"Reviewed the API surface."
)
assert isinstance(lines, list)
assert len(lines) >= 1
def test_each_line_has_pointer(self):
lines = build_closet_lines(
"/tmp/test.py",
["drawer_001", "drawer_002"],
"We built the auth system and tested the login flow",
"code",
"general",
"/proj/auth.md",
["drawer_proj_backend_aaa", "drawer_proj_backend_bbb"],
content,
wing="proj",
room="backend",
)
assert lines, "should always emit at least one line"
for line in lines:
assert "" in line, f"Line missing pointer: {line}"
assert "" in line, f"line missing pointer arrow: {line!r}"
parts = line.split("|")
assert len(parts) == 3, f"expected topic|entities|→refs, got {line!r}"
assert parts[2].startswith("")
def test_fallback_when_no_topics(self):
lines = build_closet_lines(
"/tmp/test.py", ["drawer_001"], "short text", "wing", "room"
def test_extracts_section_headers_as_topics(self):
content = "# First Header\nbody\n## Second Header\nmore body"
lines = build_closet_lines("/x.md", ["d1"], content, "w", "r")
joined = "\n".join(lines).lower()
assert "first header" in joined
assert "second header" in joined
def test_entity_stoplist_filters_sentence_starters(self):
# "When", "After", "The" repeat 3+ times — old code would index them
# as entities. Stoplist drops them.
content = (
"When the pipeline ran, the result was good. "
"When the user logged in, the token was issued. "
"After the migration, the latency dropped. "
"After the rollback, the latency rose. "
"The new flow is stable. The audit cleared."
)
assert len(lines) >= 1
assert "" in lines[0]
lines = build_closet_lines("/x.md", ["d1"], content, "w", "r")
entity_segments = [line.split("|")[1] for line in lines]
for seg in entity_segments:
tokens = set(seg.split(";")) if seg else set()
assert "When" not in tokens
assert "After" not in tokens
assert "The" not in tokens
def test_real_proper_nouns_survive_stoplist(self):
content = (
"Igor reviewed the diff. Milla wrote the spec. "
"Igor pushed the fix. Milla approved the PR. "
"Igor and Milla shipped together."
)
lines = build_closet_lines("/x.md", ["d1"], content, "w", "r")
joined_entities = ";".join(line.split("|")[1] for line in lines)
assert "Igor" in joined_entities
assert "Milla" in joined_entities
def test_emits_fallback_line_when_nothing_extractable(self):
content = "lorem ipsum dolor sit amet consectetur adipiscing elit"
lines = build_closet_lines("/x/notes.txt", ["d1"], content, "wing", "room")
assert len(lines) == 1
assert "wing/room/notes" in lines[0]
assert "→d1" in lines[0]
def test_pointer_references_first_three_drawers(self):
ids = [f"drawer_{i}" for i in range(10)]
lines = build_closet_lines("/x.md", ids, "# A\n# B", "w", "r")
assert all("→drawer_0,drawer_1,drawer_2" in line for line in lines)
# ── upsert_closet_lines ─────────────────────────────────────────────────
# ── upsert_closet_lines ───────────────────────────────────────────────
class TestUpsertClosetLines:
def test_writes_closets(self):
with tempfile.TemporaryDirectory() as tmpdir:
col = get_closets_collection(tmpdir)
lines = [
"topic one|Entity1|→drawer_001",
"topic two|Entity2|→drawer_002",
]
n = upsert_closet_lines(col, "test_closet", lines, {"wing": "test"})
assert n >= 1
assert col.count() >= 1
def test_overwrites_existing_closet_does_not_append(self, palace_path):
col = get_closets_collection(palace_path)
base = "closet_test_room_abc"
meta = {"wing": "test", "room": "room", "source_file": "/x.md"}
def test_never_splits_mid_topic(self):
with tempfile.TemporaryDirectory() as tmpdir:
col = get_closets_collection(tmpdir)
# Create lines that together exceed CLOSET_CHAR_LIMIT
lines = [f"topic_{i}|{'x' * 200}|→drawer_{i}" for i in range(20)]
n = upsert_closet_lines(col, "test_closet", lines, {"wing": "test"})
assert n >= 2, "Should create multiple closets"
upsert_closet_lines(col, base, ["alpha|;|→d1", "beta|;|→d2", "gamma|;|→d3"], meta)
first = col.get(ids=[f"{base}_01"])
assert "alpha" in first["documents"][0]
# Verify each closet has complete lines
all_data = col.get(include=["documents"])
for doc in all_data["documents"]:
for line in doc.strip().split("\n"):
assert "" in line, f"Split topic found: {line}"
# Second mine — entirely different lines. Must replace, not append.
upsert_closet_lines(col, base, ["delta|;|→d4", "epsilon|;|→d5"], meta)
second = col.get(ids=[f"{base}_01"])
doc = second["documents"][0]
assert "delta" in doc
assert "epsilon" in doc
assert "alpha" not in doc, "old closet line leaked into rebuild"
assert "beta" not in doc
def test_respects_char_limit(self):
with tempfile.TemporaryDirectory() as tmpdir:
col = get_closets_collection(tmpdir)
lines = [f"topic_{i}|entities|→drawer_{i}" for i in range(50)]
upsert_closet_lines(col, "test_closet", lines, {"wing": "test"})
def test_packs_into_multiple_closets_without_splitting_lines(self, palace_path):
col = get_closets_collection(palace_path)
base = "closet_pack_room_def"
meta = {"wing": "test", "room": "room", "source_file": "/y.md"}
all_data = col.get(include=["documents"])
for doc in all_data["documents"]:
assert len(doc) <= CLOSET_CHAR_LIMIT + 100 # small buffer for existing content
line = "x" * 600 # well under CLOSET_CHAR_LIMIT
n_written = upsert_closet_lines(col, base, [line, line, line, line], meta)
# 4 lines @ 601 chars each = 2404 — should pack into 2 closets
assert n_written == 2
for i in range(1, n_written + 1):
doc = col.get(ids=[f"{base}_{i:02d}"])["documents"][0]
for chunk in doc.split("\n"):
assert len(chunk) == 600, f"line was truncated in closet {i}"
assert len(doc) <= CLOSET_CHAR_LIMIT
# ── entity metadata ──────────────────────────────────────────────────────
# ── purge_file_closets ────────────────────────────────────────────────
class TestPurgeFileClosets:
def test_deletes_only_the_targeted_source(self, palace_path):
col = get_closets_collection(palace_path)
col.upsert(
ids=["closet_a_01", "closet_b_01"],
documents=["a|;|→d1", "b|;|→d2"],
metadatas=[
{"source_file": "/keep.md", "wing": "w", "room": "r"},
{"source_file": "/drop.md", "wing": "w", "room": "r"},
],
)
purge_file_closets(col, "/drop.md")
remaining_ids = set(col.get()["ids"])
assert "closet_a_01" in remaining_ids
assert "closet_b_01" not in remaining_ids
# ── project miner: closet rebuild end-to-end ──────────────────────────
class TestMinerClosetRebuild:
def test_remine_replaces_closets_completely(self, tmp_path):
project = tmp_path / "proj"
project.mkdir()
(project / "mempalace.yaml").write_text(
yaml.dump({"wing": "proj", "rooms": [{"name": "general", "description": "x"}]})
)
target = project / "doc.md"
# First mine — long content produces multiple numbered closets.
first_topics = "\n\n".join(f"# Topic {i}\n" + ("filler text " * 30) for i in range(15))
target.write_text(first_topics)
palace = tmp_path / "palace"
mine(str(project), str(palace), wing_override="proj", agent="test")
col = get_closets_collection(str(palace))
first_pass = col.get(where={"source_file": str(target)})
assert first_pass["ids"], "first mine should have written closets"
first_ids = set(first_pass["ids"])
assert any("topic 0" in (d or "").lower() for d in first_pass["documents"])
# Touch mtime + shrink content so the rebuild produces fewer closets.
target.write_text("# Only Topic Now\n" + ("short body " * 5))
new_mtime = os.path.getmtime(target) + 60
os.utime(target, (new_mtime, new_mtime))
time.sleep(0.01)
mine(str(project), str(palace), wing_override="proj", agent="test")
col = get_closets_collection(str(palace))
second_pass = col.get(where={"source_file": str(target)})
second_docs = "\n".join(second_pass["documents"]).lower()
assert "only topic now" in second_docs
for i in range(15):
assert (
f"topic {i}\n" not in second_docs
), f"stale 'Topic {i}' from first mine survived the rebuild"
# Numbered closets that existed only in the larger first run must be gone.
leftover = first_ids - set(second_pass["ids"])
for stale_id in leftover:
assert not col.get(ids=[stale_id])[
"ids"
], f"orphan closet {stale_id} from larger first run survived purge"
# ── _extract_drawer_ids_from_closet ───────────────────────────────────
class TestExtractDrawerIds:
def test_parses_single_pointer(self):
assert _extract_drawer_ids_from_closet("topic|;|→drawer_x") == ["drawer_x"]
def test_parses_multiple_pointers_per_line(self):
line = "topic|ent|→drawer_a,drawer_b,drawer_c"
assert _extract_drawer_ids_from_closet(line) == ["drawer_a", "drawer_b", "drawer_c"]
def test_dedupes_across_lines(self):
doc = "one|;|→drawer_a,drawer_b\ntwo|;|→drawer_b,drawer_c"
assert _extract_drawer_ids_from_closet(doc) == ["drawer_a", "drawer_b", "drawer_c"]
def test_empty_doc_returns_empty(self):
assert _extract_drawer_ids_from_closet("") == []
assert _extract_drawer_ids_from_closet("no arrows here") == []
# ── search_memories closet-first path ────────────────────────────────
class TestSearchMemoriesClosetFirst:
def test_falls_back_to_direct_when_no_closets(self, palace_path, seeded_collection):
result = search_memories("JWT authentication", palace_path)
assert result["results"], "should still find drawer hits via fallback"
for hit in result["results"]:
assert hit.get("matched_via") == "drawer"
def test_closet_first_returns_chunk_level_hits(self, palace_path, seeded_collection):
closets = get_closets_collection(palace_path)
closets.upsert(
ids=["closet_proj_backend_aaa_01"],
documents=["JWT auth tokens|;|→drawer_proj_backend_aaa"],
metadatas=[{"wing": "project", "room": "backend", "source_file": "auth.py"}],
)
result = search_memories("JWT authentication", palace_path)
assert result["results"], "closet-first search should hydrate the drawer"
top = result["results"][0]
assert top["matched_via"] == "closet"
assert "JWT" in top["text"]
# Chunk-level — must NOT glue every drawer in the file together.
assert "Database migrations" not in top["text"]
assert "→drawer_proj_backend_aaa" in top["closet_preview"]
def test_max_distance_filters_closet_hits(self, palace_path, seeded_collection):
closets = get_closets_collection(palace_path)
closets.upsert(
ids=["closet_proj_backend_aaa_01"],
documents=["JWT auth tokens|;|→drawer_proj_backend_aaa"],
metadatas=[{"wing": "project", "room": "backend", "source_file": "auth.py"}],
)
result = search_memories(
"completely unrelated query about quantum gardening",
palace_path,
max_distance=0.001,
)
for hit in result["results"]:
assert hit["distance"] <= 0.001
# ── entity metadata ──────────────────────────────────────────────────
class TestEntityMetadata:
@@ -136,66 +347,259 @@ class TestEntityMetadata:
def test_empty_for_no_entities(self):
text = "this is all lowercase with no proper nouns at all"
entities = _extract_entities_for_metadata(text)
assert entities == ""
assert _extract_entities_for_metadata(text) == ""
def test_semicolon_separated(self):
text = "Alice and Bob met Charlie. Alice said hello. Bob agreed. Charlie laughed."
entities = _extract_entities_for_metadata(text)
assert ";" in entities
def test_stoplist_filters_sentence_starters(self):
# Same regression as the closet entity test — "When/After/The" must
# not become entities just because they're capitalized 2+ times.
text = (
"When the build broke, the team paged. "
"When the fix landed, the alarm cleared. "
"After the rollback, the queue drained. "
"After the deploy, the latency normalized."
)
entities = _extract_entities_for_metadata(text)
tokens = set(entities.split(";")) if entities else set()
assert "When" not in tokens
assert "After" not in tokens
assert "The" not in tokens
# ── BM25 hybrid search ──────────────────────────────────────────────────
def test_capped_list_never_truncates_a_name(self):
# 30 distinct repeated proper nouns — extraction should cap the list
# before joining so a name never gets cut in half.
# Use morphologically distinct stems so the [A-Z][a-z]+ regex sees
# each as its own token.
names = [
"Anna",
"Brian",
"Carol",
"David",
"Elena",
"Frank",
"Grace",
"Harold",
"Iris",
"Julian",
"Kira",
"Liam",
"Maya",
"Noah",
"Oscar",
"Penny",
"Quinn",
"Rosa",
"Sergei",
"Tara",
"Umar",
"Vera",
"Walter",
"Xander",
"Yvonne",
"Zachary",
"Amelia",
"Boris",
"Clara",
"Dmitri",
]
text = " ".join(f"{n} met {n}." for n in names)
entities = _extract_entities_for_metadata(text)
extracted = [n for n in entities.split(";") if n]
assert extracted, "should have extracted some entities"
for name in extracted:
assert name in names, f"truncation produced a partial token: {name!r}"
def test_known_registry_is_cached_by_mtime(self, monkeypatch, tmp_path):
# Point the registry at a temp file we control, exercise the cache.
registry = tmp_path / "known_entities.json"
registry.write_text(json.dumps({"people": ["Zelda"]}))
from mempalace import miner
monkeypatch.setattr(miner, "_ENTITY_REGISTRY_PATH", str(registry))
miner._ENTITY_REGISTRY_CACHE["mtime"] = None
miner._ENTITY_REGISTRY_CACHE["names"] = frozenset()
first = _load_known_entities()
assert "Zelda" in first
# Second call without changing mtime: must reuse cache, not re-read.
read_count = {"n": 0}
original_open = open
def counting_open(path, *a, **kw):
if str(path) == str(registry):
read_count["n"] += 1
return original_open(path, *a, **kw)
monkeypatch.setattr("builtins.open", counting_open)
_load_known_entities()
assert read_count["n"] == 0, "registry should not be re-read when mtime unchanged"
# Bump mtime → cache must invalidate.
new_mtime = os.path.getmtime(registry) + 5
os.utime(registry, (new_mtime, new_mtime))
registry.write_text(json.dumps({"people": ["Zelda", "Link"]}))
os.utime(registry, (new_mtime, new_mtime))
names = _load_known_entities()
assert "Link" in names
# ── BM25 hybrid search (real IDF over candidate corpus) ──────────────
class TestBM25:
def test_bm25_score_positive_for_match(self):
score = _bm25_score("database migration", "We migrated the database to Postgres")
assert score > 0
def test_scores_positive_for_matching_doc(self):
scores = _bm25_scores(
"database migration",
["We migrated the database to Postgres.", "unrelated cookery tips"],
)
assert scores[0] > 0
assert scores[1] == 0.0
def test_bm25_score_zero_for_no_match(self):
score = _bm25_score("quantum physics", "We built a web application in React")
assert score == 0.0
def test_scores_zero_when_no_overlap(self):
scores = _bm25_scores("quantum physics", ["We built a web app in React"])
assert scores == [0.0]
def test_hybrid_rank_reorders(self):
def test_idf_downweights_terms_present_in_every_doc(self):
# "database" appears in every candidate → low IDF → low contribution.
# "vacuum" is unique to one → high IDF → that doc dominates.
scores = _bm25_scores(
"database vacuum",
[
"database backup nightly schedule",
"database vacuum scheduled weekly",
"database failover plan",
],
)
assert scores[1] == max(scores), "doc with the rare query term should win on IDF"
def test_empty_inputs_return_zeros(self):
assert _bm25_scores("", ["hello world"]) == [0.0]
assert _bm25_scores("query here", []) == []
assert _bm25_scores("query", [""]) == [0.0]
def test_hybrid_rank_promotes_keyword_match(self):
results = [
{"text": "database schema design for Postgres", "distance": 0.5},
{"text": "unrelated topic about cooking", "distance": 0.3},
]
ranked = _hybrid_rank(results, "database Postgres schema")
# The database result should rank higher despite worse vector distance
# The keyword-rich result outranks the closer-vector but irrelevant one.
assert "database" in ranked[0]["text"]
# bm25_score field is exposed for debugging.
assert "bm25_score" in ranked[0]
# No internal scoring leak.
assert "_hybrid_score" not in ranked[0]
def test_hybrid_rank_absolute_normalization(self):
# Adding a much-worse result to the candidate set must NOT reshuffle
# the top two — proves we're using absolute (1 - dist) and not
# dist / max_dist normalization.
base = [
{"text": "alpha alpha alpha", "distance": 0.1},
{"text": "beta beta beta", "distance": 0.4},
]
ranked_short = _hybrid_rank([dict(r) for r in base], "alpha")
with_outlier = base + [{"text": "gamma gamma gamma", "distance": 1.9}]
ranked_long = _hybrid_rank([dict(r) for r in with_outlier], "alpha")
assert ranked_short[0]["text"] == ranked_long[0]["text"]
assert ranked_short[1]["text"] == ranked_long[1]["text"]
# ── diary ingest ─────────────────────────────────────────────────────────
# ── diary ingest ─────────────────────────────────────────────────────
class TestDiaryIngest:
def test_ingest_creates_drawers_and_closets(self):
with tempfile.TemporaryDirectory() as palace_dir:
diary_dir = tempfile.mkdtemp()
# Write a test diary
with open(os.path.join(diary_dir, "2026-04-13.md"), "w") as f:
f.write("# 2026-04-13\n\n## 10:00 PDT — Test\n\nBuilt the auth system.\n")
def test_ingest_creates_drawers_and_closets(self, tmp_path):
diary_dir = tmp_path / "diaries"
diary_dir.mkdir()
(diary_dir / "2026-04-13.md").write_text(
"# 2026-04-13\n\n## 10:00 PDT — Test\n\nBuilt the auth system.\n"
)
palace_dir = tmp_path / "palace"
from mempalace.diary_ingest import ingest_diaries
from mempalace.diary_ingest import ingest_diaries
result = ingest_diaries(diary_dir, palace_dir, force=True)
assert result["days_updated"] >= 1
result = ingest_diaries(str(diary_dir), str(palace_dir), force=True)
assert result["days_updated"] >= 1
assert get_collection(str(palace_dir)).count() >= 1
# Check drawer exists
drawers = get_collection(palace_dir)
count = drawers.count()
assert count >= 1
def test_ingest_skips_unchanged_on_second_run(self, tmp_path):
diary_dir = tmp_path / "diaries"
diary_dir.mkdir()
(diary_dir / "2026-04-13.md").write_text(
"# 2026-04-13\n\n## 10:00 — Test\n\nContent here that's long enough.\n"
)
palace_dir = tmp_path / "palace"
def test_ingest_skips_unchanged(self):
with tempfile.TemporaryDirectory() as palace_dir:
diary_dir = tempfile.mkdtemp()
with open(os.path.join(diary_dir, "2026-04-13.md"), "w") as f:
f.write("# 2026-04-13\n\n## 10:00 — Test\n\nContent.\n")
from mempalace.diary_ingest import ingest_diaries
from mempalace.diary_ingest import ingest_diaries
ingest_diaries(str(diary_dir), str(palace_dir), force=True)
result = ingest_diaries(str(diary_dir), str(palace_dir))
assert result["days_updated"] == 0
ingest_diaries(diary_dir, palace_dir, force=True)
result = ingest_diaries(diary_dir, palace_dir) # second run, no force
assert result["days_updated"] == 0
def test_state_file_lives_outside_diary_dir(self, tmp_path):
# Regression: the original implementation wrote
# ``.diary_ingest_state.json`` *inside* the user's diary directory,
# polluting their content folder. State must live under
# ``~/.mempalace/state/`` instead.
diary_dir = tmp_path / "diaries"
diary_dir.mkdir()
(diary_dir / "2026-04-13.md").write_text(
"# 2026-04-13\n\n## 10:00 — Test\n\nBody content here long enough.\n"
)
palace_dir = tmp_path / "palace"
from mempalace.diary_ingest import _state_file_for, ingest_diaries
ingest_diaries(str(diary_dir), str(palace_dir), force=True)
# No state file inside the user's diary dir.
for entry in diary_dir.iterdir():
assert (
"diary_ingest" not in entry.name
), f"state file leaked into user diary dir: {entry}"
# State file does exist under ~/.mempalace/state/.
state_path = _state_file_for(str(palace_dir), diary_dir.resolve())
assert state_path.exists()
assert "/.mempalace/state/" in str(state_path)
def test_wing_prefixed_drawer_id_prevents_cross_diary_collision(self, tmp_path):
# Regression: the original implementation used
# ``drawer_diary_{date_str}`` regardless of wing — two diaries with
# the same date in different wings would clobber each other.
date_md = "# 2026-04-13\n\n## 10:00 — entry\n\nThis is the day's content.\n"
# Two separate diary dirs, ingested into the same palace under
# different wings. Each must produce a distinct drawer.
personal_dir = tmp_path / "personal"
personal_dir.mkdir()
(personal_dir / "2026-04-13.md").write_text(date_md + "Personal-only marker.\n")
work_dir = tmp_path / "work"
work_dir.mkdir()
(work_dir / "2026-04-13.md").write_text(date_md + "Work-only marker.\n")
palace_dir = tmp_path / "palace"
from mempalace.diary_ingest import _diary_drawer_id, ingest_diaries
ingest_diaries(str(personal_dir), str(palace_dir), wing="personal", force=True)
ingest_diaries(str(work_dir), str(palace_dir), wing="work", force=True)
col = get_collection(str(palace_dir))
personal_id = _diary_drawer_id("personal", "2026-04-13")
work_id = _diary_drawer_id("work", "2026-04-13")
assert personal_id != work_id
personal = col.get(ids=[personal_id])
work = col.get(ids=[work_id])
assert personal["ids"] == [personal_id]
assert work["ids"] == [work_id]
assert "Personal-only marker." in personal["documents"][0]
assert "Work-only marker." in work["documents"][0]
+83
View File
@@ -75,3 +75,86 @@ def test_mine_convos_does_not_reprocess_empty_chunk_files(capsys):
assert "Files skipped (already filed): 1" in out2
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
def test_mine_convos_rebuilds_stale_drawers_after_schema_bump(capsys):
"""When stored drawers have an older normalize_version, the next mine
silently purges them and refiles — no manual erase required.
This is what makes the strip_noise upgrade apply to existing corpora:
users just run `mempalace mine` again and old noise-filled drawers get
replaced with clean ones."""
from mempalace.palace import NORMALIZE_VERSION
tmpdir = tempfile.mkdtemp()
try:
convo_path = Path(tmpdir) / "chat.txt"
convo_path.write_text(
"> What is memory?\nMemory is persistence.\n\n"
"> Why does it matter?\nIt enables continuity.\n\n"
"> How do we build it?\nWith structured storage.\n"
)
palace_path = os.path.join(tmpdir, "palace")
# First mine — stamps drawers with NORMALIZE_VERSION
mine_convos(tmpdir, palace_path, wing="test")
capsys.readouterr()
client = chromadb.PersistentClient(path=palace_path)
col = client.get_collection("mempalace_drawers")
resolved = str(Path(tmpdir).resolve() / "chat.txt")
first_pass = col.get(where={"source_file": resolved})
first_ids = set(first_pass["ids"])
assert first_ids, "first mine should produce drawers"
for meta in first_pass["metadatas"]:
assert meta.get("normalize_version") == NORMALIZE_VERSION
# Simulate pre-v2 drawers: rewrite metadata to an older version,
# and replace content with "noise" so we can see it get cleaned up.
stale_metas = []
for meta in first_pass["metadatas"]:
stale = dict(meta)
stale["normalize_version"] = 1
stale_metas.append(stale)
col.update(
ids=list(first_pass["ids"]),
documents=["STALE NOISE"] * len(first_pass["ids"]),
metadatas=stale_metas,
)
# Add an extra orphan drawer that should also be purged.
col.add(
ids=["orphan_drawer"],
documents=["OLD ORPHAN"],
metadatas=[
{
"wing": "test",
"room": "default",
"source_file": resolved,
"chunk_index": 999,
"normalize_version": 1,
}
],
)
del col, client
# Second mine — version gate should trigger rebuild
mine_convos(tmpdir, palace_path, wing="test")
out = capsys.readouterr().out
assert (
"Files skipped (already filed): 0" in out
), "stale drawers should force a rebuild, not a skip"
client = chromadb.PersistentClient(path=palace_path)
col = client.get_collection("mempalace_drawers")
rebuilt = col.get(where={"source_file": resolved})
# Orphan is gone
assert "orphan_drawer" not in rebuilt["ids"]
# No stale content survived
assert all("STALE NOISE" not in d for d in rebuilt["documents"])
assert all("OLD ORPHAN" not in d for d in rebuilt["documents"])
# All rebuilt drawers carry the current version
for meta in rebuilt["metadatas"]:
assert meta.get("normalize_version") == NORMALIZE_VERSION
del col, client
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
+43
View File
@@ -6,6 +6,7 @@ dispatch layer (integration-level). Uses isolated palace + KG fixtures
via monkeypatch to avoid touching real data.
"""
from datetime import datetime
import json
import sys
@@ -643,6 +644,48 @@ class TestDiaryTools:
r = tool_diary_read(agent_name="Nobody")
assert r["entries"] == []
def test_diary_write_same_second_shared_prefix_no_collision(
self, monkeypatch, config, palace_path, kg
):
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace import mcp_server
class FrozenDateTime:
calls = [
datetime(2026, 4, 13, 22, 15, 30, 123456),
datetime(2026, 4, 13, 22, 15, 30, 123457),
]
fallback = datetime(2026, 4, 13, 22, 15, 30, 123457)
@classmethod
def now(cls):
if cls.calls:
return cls.calls.pop(0)
return cls.fallback
monkeypatch.setattr(mcp_server, "datetime", FrozenDateTime)
from mempalace.mcp_server import tool_diary_read, tool_diary_write
entry1 = "A" * 50 + " entry one"
entry2 = "A" * 50 + " entry two"
result1 = tool_diary_write(agent_name="TestAgent", entry=entry1, topic="status")
result2 = tool_diary_write(agent_name="TestAgent", entry=entry2, topic="status")
assert result1["success"] is True
assert result2["success"] is True
assert result1["entry_id"] != result2["entry_id"]
read_result = tool_diary_read(agent_name="TestAgent")
contents = [entry["content"] for entry in read_result["entries"]]
assert read_result["total"] == 2
assert entry1 in contents
assert entry2 in contents
# ── Cache Invalidation (inode/mtime) ──────────────────────────────────
+90 -4
View File
@@ -7,7 +7,7 @@ import chromadb
import yaml
from mempalace.miner import mine, scan_project, status
from mempalace.palace import file_already_mined
from mempalace.palace import NORMALIZE_VERSION, file_already_mined
def write_file(path: Path, content: str):
@@ -227,11 +227,17 @@ def test_file_already_mined_check_mtime():
assert file_already_mined(col, test_file) is False
assert file_already_mined(col, test_file, check_mtime=True) is False
# Add it with mtime
# Add it with mtime + current normalize_version
col.add(
ids=["d1"],
documents=["hello world"],
metadatas=[{"source_file": test_file, "source_mtime": str(mtime)}],
metadatas=[
{
"source_file": test_file,
"source_mtime": str(mtime),
"normalize_version": NORMALIZE_VERSION,
}
],
)
# Already mined (no mtime check)
@@ -253,7 +259,12 @@ def test_file_already_mined_check_mtime():
col.add(
ids=["d2"],
documents=["other"],
metadatas=[{"source_file": "/fake/no_mtime.txt"}],
metadatas=[
{
"source_file": "/fake/no_mtime.txt",
"normalize_version": NORMALIZE_VERSION,
}
],
)
assert file_already_mined(col, "/fake/no_mtime.txt", check_mtime=True) is False
finally:
@@ -296,3 +307,78 @@ def test_status_missing_palace_does_not_create_empty_collection(tmp_path, capsys
out = capsys.readouterr().out
assert "No palace found" in out
assert not palace_path.exists()
# ── normalize_version schema gate ───────────────────────────────────────
#
# When the normalization pipeline changes shape (e.g., strip_noise lands),
# `NORMALIZE_VERSION` is bumped so pre-existing drawers can be silently
# rebuilt on the next mine. These tests pin that contract.
def test_file_already_mined_returns_false_for_stale_normalize_version():
"""Pre-v2 drawers (no field, or older integer) must not short-circuit."""
tmpdir = tempfile.mkdtemp()
try:
palace_path = os.path.join(tmpdir, "palace")
os.makedirs(palace_path)
client = chromadb.PersistentClient(path=palace_path)
col = client.get_or_create_collection("mempalace_drawers")
# Pre-v2 drawer: no normalize_version field at all
col.add(
ids=["d_old"],
documents=["old"],
metadatas=[{"source_file": "/fake/old.jsonl"}],
)
assert file_already_mined(col, "/fake/old.jsonl") is False
# Explicitly older version
col.add(
ids=["d_v1"],
documents=["v1"],
metadatas=[{"source_file": "/fake/v1.jsonl", "normalize_version": 1}],
)
assert file_already_mined(col, "/fake/v1.jsonl") is False
# Current version — short-circuits
col.add(
ids=["d_current"],
documents=["cur"],
metadatas=[
{
"source_file": "/fake/current.jsonl",
"normalize_version": NORMALIZE_VERSION,
}
],
)
assert file_already_mined(col, "/fake/current.jsonl") is True
finally:
del col, client
shutil.rmtree(tmpdir, ignore_errors=True)
def test_add_drawer_stamps_normalize_version(tmp_path):
"""Fresh drawers carry the current schema version so future upgrades work."""
from mempalace.miner import add_drawer
palace_path = tmp_path / "palace"
palace_path.mkdir()
client = chromadb.PersistentClient(path=str(palace_path))
col = client.get_or_create_collection("mempalace_drawers")
try:
added = add_drawer(
collection=col,
wing="test",
room="notes",
content="hello",
source_file=str(tmp_path / "src.md"),
chunk_index=0,
agent="unit",
)
assert added is True
stored = col.get(limit=1)
meta = stored["metadatas"][0]
assert meta["normalize_version"] == NORMALIZE_VERSION
finally:
del col, client
+146
View File
@@ -13,6 +13,7 @@ from mempalace.normalize import (
_try_normalize_json,
_try_slack_json,
normalize,
strip_noise,
)
@@ -1048,3 +1049,148 @@ def test_normalize_rejects_large_file():
assert False, "Should have raised IOError"
except IOError as e:
assert "too large" in str(e).lower()
# ── strip_noise() — verbatim-safety boundary tests ─────────────────────
#
# The "Verbatim always" design principle requires that we never delete
# user-authored text. These tests pin down the boundary between system
# noise (which we strip) and user prose that happens to mention the same
# strings (which must survive untouched).
class TestStripNoisePreservesUserContent:
"""User prose that mentions noise strings inline must be preserved."""
def test_user_discusses_stop_hook_in_prose(self):
# Regression: original regex with IGNORECASE + `.*\n?` ate the second
# sentence from real user commentary.
text = (
"> User:\n"
"> Our CI has a stop hook that rejects merges after 5pm. "
"Ran 2 stop hooks last week.\n"
"> Assistant:\n"
"> Got it."
)
assert strip_noise(text) == text.strip()
def test_user_mentions_system_reminder_inline(self):
# Inline <system-reminder> tags inside user prose (e.g. documenting
# Claude Code behavior) must not be stripped.
text = (
"> User:\n"
"> Here is what Claude Code emits: "
"<system-reminder>Auto-save reminder...</system-reminder>"
" — I want to ignore it."
)
assert strip_noise(text) == text.strip()
def test_ctrl_o_hint_in_prose_preserved(self):
# Regression: original `.*\(ctrl\+o to expand\).*\n?` nuked the whole
# line whenever a user documented the TUI shortcut.
text = (
"> User:\n"
"> In the TUI you hit (ctrl+o to expand) to see more. "
"That is the shortcut I want to document."
)
assert strip_noise(text) == text.strip()
def test_current_time_inline_in_prose(self):
text = "> User:\n> At CURRENT TIME: the meeting starts, not before."
assert strip_noise(text) == text.strip()
def test_plus_n_lines_marker_inline(self):
text = "> User:\n> The log showed … +50 lines of stack trace, useful."
assert strip_noise(text) == text.strip()
def test_dangling_open_tag_does_not_span_messages(self):
# THE span-eating bug: a stray unclosed <system-reminder> in one
# message must NOT merge with a closing tag in another message and
# silently delete everything in between.
text = (
"> User 1: normal content <system-reminder>A\n"
"> Assistant: reply\n"
"> User 2: more content</system-reminder> tail"
)
out = strip_noise(text)
assert "Assistant: reply" in out
assert "User 2: more content" in out
assert "User 1: normal content" in out
class TestStripNoiseRemovesSystemChrome:
"""System-injected noise with standalone/line-anchored shape must be stripped."""
def test_strips_line_anchored_system_reminder_block(self):
text = (
"> User:\n"
"<system-reminder>\n"
"Auto-save reminder...\n"
"</system-reminder>\n"
"> Real message."
)
out = strip_noise(text)
assert "system-reminder" not in out
assert "Auto-save reminder" not in out
assert "Real message." in out
def test_strips_system_reminder_with_blockquote_prefix(self):
# _messages_to_transcript prefixes lines with "> ", so the line
# anchor must also accept that shape.
text = "> User:\n" "> <system-reminder>Injected noise</system-reminder>\n" "> Real message."
out = strip_noise(text)
assert "Injected noise" not in out
assert "Real message." in out
def test_strips_standalone_ran_hook_line(self):
text = "Ran 2 Stop hook\n> User: real content"
out = strip_noise(text)
assert "Ran 2 Stop hook" not in out
assert "real content" in out
def test_strips_known_hook_names(self):
for hook in ("Stop", "PreCompact", "PreToolUse", "PostToolUse", "UserPromptSubmit"):
text = f"Ran 1 {hook} hook\n> User: content"
assert hook not in strip_noise(text)
def test_strips_current_time_standalone(self):
text = "CURRENT TIME: 2026-04-13 10:00 UTC\n> User: Hello"
out = strip_noise(text)
assert "CURRENT TIME" not in out
assert "Hello" in out
def test_strips_collapsed_lines_marker(self):
text = "… +42 lines\n> User: Hello"
out = strip_noise(text)
assert "+42 lines" not in out
assert "Hello" in out
def test_strips_token_count_ctrl_o_chrome(self):
# Claude Code's actual collapsed-output chrome: "[N tokens] (ctrl+o to expand)"
text = "> Assistant: some output [5 tokens] (ctrl+o to expand)\n> User: ok"
out = strip_noise(text)
assert "(ctrl+o to expand)" not in out
assert "[5 tokens]" not in out
assert "some output" in out
def test_strips_each_known_noise_tag(self):
for tag in (
"system-reminder",
"command-message",
"command-name",
"task-notification",
"user-prompt-submit-hook",
"hook_output",
):
text = f"> User:\n<{tag}>junk</{tag}>\n> Real."
out = strip_noise(text)
assert tag not in out, f"{tag} leaked into output"
assert "Real." in out
def test_collapses_excessive_blank_lines(self):
text = "line one\n\n\n\n\n\nline two"
out = strip_noise(text)
assert "line one" in out
assert "line two" in out
# Should collapse to no more than 3 newlines
assert "\n\n\n\n" not in out