From 0ff4121404457bc45595553252e3a88af4fb38d8 Mon Sep 17 00:00:00 2001 From: mvalentsev Date: Sat, 9 May 2026 04:01:34 +0500 Subject: [PATCH] fix(sync): symmetric source_file resolve + perf optimizations CI fix: `_classify_drawer` now resolves `source_file` symmetric to `project_roots` (which `_normalize_project_dirs` and `_auto_detect_project_roots` already `.resolve()`). Without this, on platforms where the temp directory is a symlink (macOS `/var/folders` -> `/private/var/folders`, Windows 8.3 short-name normalization), every drawer mis-bucketed as `out_of_scope` and survived prune. Perf: - `_resolve_project_root`: early-return on first match (sorted-desc precondition). - `_normalize_project_dirs`: sort `(-len(str(p)), str(p))` desc for early-return + deterministic tie-break on equal-length paths. - `_auto_detect_project_roots`: `seen_sources` dedupe so a 200-chunk file costs one disk walk, not 200. - `sync_palace` main loop: per-file classification cache; registry sentinels (`_reg_*`, `room=_registry`, `ingest_mode=registry`) routed to "kept" before cache lookup so a sentinel sharing a `source_file` with a pruned drawer cannot inherit a stale "gitignored" verdict. - Closet purge: collapse O(N) per-file purge into one `where={"source_file": {"$in": [...]}}` get + one bulk delete. Tests (5 new in `TestSyncPalace`, 38 total): - `test_symlinked_project_root_resolves`: pins symmetric resolve via real `os.symlink` (skipped on Windows). - `test_classification_cache_avoids_redundant_disk_hits`: monkeypatch counter on `_classify_drawer` asserts `call_count == 1` for 5 chunks sharing one source_file. - `test_closet_batch_purge_single_call`: wraps closets collection with `CallCountingCol` (forwards `.get`/`.delete`); asserts `delete_calls == 1` and `get_calls == 1`; expected `removed_closets` derived from `report["by_source"]` to stay robust to fixture changes. - `test_registry_check_runs_before_cache_lookup`: a regular drawer caches "gitignored" first; a sentinel with the same source_file must still be kept. - `test_normalize_project_dirs_sort_stable_on_equal_length`: pins the alphabetical secondary key when paths share length. --- mempalace/sync.py | 73 ++++++++---- tests/test_sync.py | 292 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 328 insertions(+), 37 deletions(-) diff --git a/mempalace/sync.py b/mempalace/sync.py index 8b4c2d9..788be85 100644 --- a/mempalace/sync.py +++ b/mempalace/sync.py @@ -22,7 +22,6 @@ from .palace import ( get_closets_collection, get_collection, mine_palace_lock, - purge_file_closets, ) @@ -44,16 +43,18 @@ class SyncReport(TypedDict): def _resolve_project_root(source_file: Path, project_roots: list) -> Optional[Path]: - """Return the longest project_root that source_file lives under.""" - best: Optional[Path] = None + """Return the longest project_root that source_file lives under. + + Assumes ``project_roots`` is sorted by path-length descending so the + first match is the longest (deepest) prefix. + """ for root in project_roots: try: source_file.relative_to(root) + return root except ValueError: continue - if best is None or len(str(root)) > len(str(best)): - best = root - return best + return None def _ancestor_matchers(source_file: Path, root: Path, matcher_cache: dict) -> list: @@ -102,6 +103,7 @@ def _classify_drawer( Returns one of: kept, gitignored, missing, no_source, out_of_scope. """ + # Defensive: main loop filters registry rows; this guards direct callers. if _is_registry_row(meta, drawer_id): return "kept" @@ -112,6 +114,7 @@ def _classify_drawer( src = Path(source_file) if not src.is_absolute(): return "no_source" + src = src.resolve(strict=False) root = _resolve_project_root(src, project_roots) if root is None: @@ -154,12 +157,17 @@ def _auto_detect_project_roots(col, wing: Optional[str]) -> list: a `.git` directory or a `.gitignore` file. The deepest such ancestor wins, so nested-but-still-tracked subprojects are honoured. `Path.parents` iterates deepest-first, so the first hit IS deepest. + + Dedupes on ``source_file`` string so a 200-chunk file costs one disk + walk, not 200. """ - roots = set() + roots: set = set() + seen_sources: set = set() for _, meta in _iter_drawer_metadata(col, wing): source_file = (meta or {}).get("source_file") - if not source_file: + if not source_file or source_file in seen_sources: continue + seen_sources.add(source_file) src = Path(source_file) if not src.is_absolute(): continue @@ -167,13 +175,13 @@ def _auto_detect_project_roots(col, wing: Optional[str]) -> list: if (parent / ".git").exists() or (parent / ".gitignore").is_file(): roots.add(parent.resolve(strict=False)) break - # Sort by depth (deepest first) with secondary lexicographic key for - # deterministic order when two roots share string length. return sorted(roots, key=lambda p: (-len(str(p)), str(p))) def _normalize_project_dirs(project_dirs) -> list: - return [Path(p).resolve(strict=False) for p in project_dirs] + """Resolve and sort project dirs so deepest-prefix wins on first match.""" + resolved = [Path(p).resolve(strict=False) for p in project_dirs] + return sorted(resolved, key=lambda p: (-len(str(p)), str(p))) def _delete_in_batches(col, ids: list, batch_size: int, wal_log: Optional[Callable]): @@ -246,17 +254,30 @@ def sync_palace( roots = _auto_detect_project_roots(col, wing) matcher_cache: dict = {} + # Same source_file → same verdict holds because mine_palace_lock + # blocks concurrent writers and the loop is synchronous. + classification_cache: dict = {} for drawer_id, meta in _iter_drawer_metadata(col, wing): counts["scanned"] += 1 - bucket = _classify_drawer(meta or {}, matcher_cache, roots, drawer_id) + meta = meta or {} + source_file = meta.get("source_file") + + if _is_registry_row(meta, drawer_id): + bucket = "kept" + elif source_file and source_file in classification_cache: + bucket = classification_cache[source_file] + else: + bucket = _classify_drawer(meta, matcher_cache, roots, drawer_id) + if source_file: + classification_cache[source_file] = bucket + counts[bucket] += 1 if bucket in ("gitignored", "missing"): removable_ids.append(drawer_id) - src = (meta or {}).get("source_file") - if src: - removable_sources.add(src) - by_source[src] += 1 + if source_file: + removable_sources.add(source_file) + by_source[source_file] += 1 report: SyncReport = { **counts, @@ -278,15 +299,17 @@ def sync_palace( logger.warning("Closet purge skipped (collection unavailable): %s", exc) closets_removed = 0 - if closets_col is not None: - for source_file in removable_sources: - before = ( - closets_col.get(where={"source_file": source_file}, include=[]).get("ids") or [] - ) - if not before: - continue - purge_file_closets(closets_col, source_file) - closets_removed += len(before) + if closets_col is not None and removable_sources: + closet_ids = ( + closets_col.get( + where={"source_file": {"$in": list(removable_sources)}}, + include=[], + ).get("ids") + or [] + ) + if closet_ids: + closets_col.delete(ids=closet_ids) + closets_removed = len(closet_ids) report["removed_closets"] = closets_removed return report diff --git a/tests/test_sync.py b/tests/test_sync.py index f18261e..38cbf80 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -441,9 +441,9 @@ class TestSyncPalace: # Allow-list — params must be exactly the documented audit shape so # any future leak (source_file, content, ID lists, etc.) trips a # test failure rather than slipping through a deny-list. - assert set(params.keys()) <= { - "first_id" - }, f"WAL params drifted from the audit allow-list: {params.keys()}" + assert set(params.keys()) <= {"first_id"}, ( + f"WAL params drifted from the audit allow-list: {params.keys()}" + ) def test_registry_sentinels_preserved_on_apply(self, tmp_dir, palace_path): """F2 regression: convo miner `_reg_*` sentinels must survive sync apply. @@ -564,9 +564,9 @@ class TestSyncPalace: inner_resolved = inner.resolve(strict=False) outer_resolved = outer.resolve(strict=False) assert inner_resolved in roots, f"expected inner in roots, got {roots}" - assert ( - outer_resolved not in roots - ), f"deepest should win exclusively: roots={roots}, outer leaked" + assert outer_resolved not in roots, ( + f"deepest should win exclusively: roots={roots}, outer leaked" + ) def test_apply_with_empty_project_dirs_raises(self, palace_path): """Round-2 P1: `project_dirs=[]` (empty list) with apply must raise, @@ -605,9 +605,9 @@ class TestSyncPalace: project_dirs=[synced_world["repo_path"]], dry_run=False, ) - assert any( - "Closet purge skipped" in record.getMessage() for record in caplog.records - ), f"expected closet-skip warning, got: {[r.getMessage() for r in caplog.records]}" + assert any("Closet purge skipped" in record.getMessage() for record in caplog.records), ( + f"expected closet-skip warning, got: {[r.getMessage() for r in caplog.records]}" + ) def test_metadata_cache_cleared_on_exception(self, monkeypatch, config, synced_world, kg): """F9 regression: tool_sync's try/finally must clear `_metadata_cache` @@ -648,9 +648,9 @@ class TestSyncPalace: assert result.get("success") is False assert "simulated" in result.get("error", "") - assert ( - mcp_server._metadata_cache is None - ), "F9: cache must be cleared even when sync_palace raises" + assert mcp_server._metadata_cache is None, ( + "F9: cache must be cleared even when sync_palace raises" + ) def test_sync_report_keys_stable(self, synced_world): """Regression: SyncReport schema must not silently drop a field.""" @@ -897,6 +897,274 @@ class TestSyncPalace: dry_run=True, ) + @pytest.mark.skipif(os.name == "nt", reason="os.symlink needs admin on Windows") + def test_symlinked_project_root_resolves(self, tmp_dir, palace_path): + """source_file may be written through a symlinked tmp directory + (real macOS behaviour: /var/folders/... is a symlink to + /private/var/folders/...). project_dirs goes through .resolve() + which follows the symlink. Without matching .resolve() on the + source side, _resolve_project_root would mis-bucket every drawer + as out_of_scope. This test pins symmetric resolution. + """ + from mempalace.sync import sync_palace + + real_root = Path(tmp_dir) / "real" + (real_root / "build").mkdir(parents=True) + (real_root / ".gitignore").write_text("build/\n") + (real_root / "build" / "x.py").write_text("# ignored\n") + + link_root = Path(tmp_dir) / "link" + os.symlink(str(real_root), str(link_root)) + + client = chromadb.PersistentClient(path=palace_path) + col = client.get_or_create_collection( + "mempalace_drawers", metadata={"hnsw:space": "cosine"} + ) + col.add( + ids=["d_via_link"], + documents=["x"], + embeddings=[[1.0, 0.0, 0.0]], + metadatas=[ + { + "wing": "demo", + "room": "build", + "source_file": str(link_root / "build" / "x.py"), + "chunk_index": 0, + "added_by": "miner", + "filed_at": "2026-05-09T00:00:00", + } + ], + ) + del client + + report = sync_palace( + palace_path=palace_path, + project_dirs=[str(real_root)], + wing="demo", + dry_run=True, + ) + assert report["gitignored"] == 1, ( + f"symmetric resolve broken: drawer mis-bucketed; report={report}" + ) + assert report["out_of_scope"] == 0 + + def test_classification_cache_avoids_redundant_disk_hits( + self, tmp_dir, palace_path, monkeypatch + ): + """Per-file classification cache: N chunks of the same source_file + cost one _classify_drawer invocation, not N. Verifies the perf + optimisation actually short-circuits without changing behaviour. + """ + from mempalace import sync as sync_mod + from mempalace.sync import sync_palace + + repo_path = Path(tmp_dir) / "repo" + (repo_path / "build").mkdir(parents=True) + (repo_path / ".gitignore").write_text("build/\n") + (repo_path / "build" / "shared.py").write_text("# ignored\n") + + client = chromadb.PersistentClient(path=palace_path) + col = client.get_or_create_collection( + "mempalace_drawers", metadata={"hnsw:space": "cosine"} + ) + col.add( + ids=[f"d_chunk_{i}" for i in range(5)], + documents=[f"chunk{i}" for i in range(5)], + embeddings=[[float(i + 1), 0.0, 0.0] for i in range(5)], + metadatas=[ + { + "wing": "demo", + "room": "build", + "source_file": str(repo_path / "build" / "shared.py"), + "chunk_index": i, + "added_by": "miner", + "filed_at": "2026-05-09T00:00:00", + } + for i in range(5) + ], + ) + del client + + call_count = {"n": 0} + real_classify = sync_mod._classify_drawer + + def counting_classify(*args, **kwargs): + call_count["n"] += 1 + return real_classify(*args, **kwargs) + + monkeypatch.setattr(sync_mod, "_classify_drawer", counting_classify) + + report = sync_palace( + palace_path=palace_path, + project_dirs=[str(repo_path)], + wing="demo", + dry_run=True, + ) + assert report["scanned"] == 5 + assert report["gitignored"] == 5 + assert call_count["n"] == 1, ( + f"cache miss: expected 1 _classify_drawer call (4 cache hits), got {call_count['n']}" + ) + + def test_closet_batch_purge_single_call(self, synced_world, monkeypatch): + """Batched $in closet purge: one delete() call across all removable + source files, not N. Wraps the real collection so chromadb still + does the work; only the call count is intercepted. + """ + from mempalace import sync as sync_mod + + repo_path = Path(synced_world["repo_path"]) + palace_path = synced_world["palace_path"] + + client = chromadb.PersistentClient(path=palace_path) + closets_col = client.get_or_create_collection( + "mempalace_closets", metadata={"hnsw:space": "cosine"} + ) + closets_col.add( + ids=["c1", "c2", "c3"], + documents=["c1", "c2", "c3"], + embeddings=[[1.0, 0.0, 0.0], [2.0, 0.0, 0.0], [3.0, 0.0, 0.0]], + metadatas=[ + {"source_file": str(repo_path / "build" / "ignored.py")}, + {"source_file": str(repo_path / "app.log")}, + {"source_file": str(repo_path / "deleted.py")}, + ], + ) + del client + + class CallCountingCol: + def __init__(self, real): + self._real = real + self.delete_calls = 0 + self.get_calls = 0 + + def get(self, *args, **kwargs): + self.get_calls += 1 + return self._real.get(*args, **kwargs) + + def delete(self, *args, **kwargs): + self.delete_calls += 1 + return self._real.delete(*args, **kwargs) + + captured: dict = {} + real_get_closets = sync_mod.get_closets_collection + + def wrapped_get_closets(p, create=False): + real = real_get_closets(p, create=create) + wrapper = CallCountingCol(real) + captured["wrapper"] = wrapper + return wrapper + + monkeypatch.setattr(sync_mod, "get_closets_collection", wrapped_get_closets) + + from mempalace.sync import sync_palace + + report = sync_palace( + palace_path=palace_path, + project_dirs=[synced_world["repo_path"]], + dry_run=False, + ) + + seeded_sources = { + str(repo_path / "build" / "ignored.py"), + str(repo_path / "app.log"), + str(repo_path / "deleted.py"), + } + expected = len(seeded_sources & set(report["by_source"].keys())) + assert report["removed_closets"] == expected, ( + f"removed_closets ({report['removed_closets']}) != |seeded ∩ removable| ({expected})" + ) + assert "wrapper" in captured, "get_closets_collection patch not invoked" + assert captured["wrapper"].delete_calls == 1, ( + f"expected one batch delete call, got {captured['wrapper'].delete_calls}" + ) + assert captured["wrapper"].get_calls == 1, ( + f"expected one batch get call, got {captured['wrapper'].get_calls}" + ) + + def test_registry_check_runs_before_cache_lookup(self, tmp_dir, palace_path): + """A non-registry drawer with the same source_file must NOT poison + the bucket of a subsequent _reg_* drawer via the classification + cache. Order matters for chromadb iteration: seed the regular + drawer FIRST so it caches `gitignored`, then a registry sentinel + with the same source_file. Without the registry-bypass at the + top of the main loop, the cache lookup would route the sentinel + to gitignored and delete it. + """ + from mempalace.sync import sync_palace + + repo_path = Path(tmp_dir) / "repo" + (repo_path / "build").mkdir(parents=True) + (repo_path / ".gitignore").write_text("build/\n") + (repo_path / "build" / "shared.py").write_text("# ignored\n") + + client = chromadb.PersistentClient(path=palace_path) + col = client.get_or_create_collection( + "mempalace_drawers", metadata={"hnsw:space": "cosine"} + ) + shared_source = str(repo_path / "build" / "shared.py") + col.add( + ids=["a_regular", "_reg_zzz_sentinel"], + documents=["regular chunk", "registry sentinel"], + embeddings=[[1.0, 0.0, 0.0], [2.0, 0.0, 0.0]], + metadatas=[ + { + "wing": "demo", + "room": "build", + "source_file": shared_source, + "chunk_index": 0, + "added_by": "miner", + "filed_at": "2026-05-09T00:00:00", + }, + { + "wing": "demo", + "room": "_registry", + "source_file": shared_source, + "chunk_index": 0, + "ingest_mode": "registry", + "added_by": "convo_miner", + "filed_at": "2026-05-09T00:00:00", + }, + ], + ) + del client + + report = sync_palace( + palace_path=palace_path, + project_dirs=[str(repo_path)], + wing="demo", + dry_run=False, + ) + assert report["gitignored"] == 1 + assert report["kept"] == 1 + assert report["removed_drawers"] == 1 + + client, col = _open_drawers(palace_path) + try: + survivors = _drawer_ids(col) + finally: + del client + assert "a_regular" not in survivors + assert "_reg_zzz_sentinel" in survivors, ( + "registry sentinel was incorrectly pruned via cached non-registry verdict" + ) + + def test_normalize_project_dirs_sort_stable_on_equal_length(self): + """`_normalize_project_dirs` must sort by `(-len, str)` so equal-length + roots are alphabetically deterministic; otherwise overlapping nested + scope choice depends on argv order. + """ + from mempalace.sync import _normalize_project_dirs + + result = _normalize_project_dirs(["/tmp/zzz", "/tmp/aaa"]) + names = [p.name for p in result] + assert names == ["aaa", "zzz"], f"equal-length sort not deterministic: got {names}" + + # Different lengths: deepest first. + deep = _normalize_project_dirs(["/tmp/short", "/tmp/much/deeper/path"]) + assert str(deep[0]).endswith("path") + assert str(deep[1]).endswith("short") + class TestSyncMcpTool: """T2: `mempalace_sync` MCP entry point must keep apply polarity stable."""