def _pin_hnsw_threads(collection) -> None:
    """Force ``hnsw:num_threads=1`` on an already-existing collection (best effort).

    Collections created by current code receive the setting through
    ``metadata=`` at creation time. Palaces built earlier still use the
    default parallel insert path and can hit the HNSW race tracked in
    #974/#965, so the single-thread setting is re-applied here through
    ``collection.modify(configuration=...)``.

    Per the original author's note, chromadb 1.5.x does not persist the
    modified ``configuration_json["hnsw"]`` across ``PersistentClient``
    reopens, which is why callers invoke this every time a collection is
    opened rather than once.

    Never raises: a chromadb without the update-configuration classes, or a
    failing ``modify`` call, is logged at DEBUG level and otherwise ignored.
    """
    try:
        from chromadb.api.collection_configuration import (
            UpdateCollectionConfiguration,
            UpdateHNSWConfiguration,
        )
    except ImportError:
        # The installed chromadb predates the configuration-update API.
        logger.debug("_pin_hnsw_threads skipped: chromadb too old", exc_info=True)
    else:
        try:
            # Build the update inside the guarded region so constructor
            # failures are swallowed exactly like modify() failures.
            single_threaded = UpdateHNSWConfiguration(num_threads=1)
            update = UpdateCollectionConfiguration(hnsw=single_threaded)
            collection.modify(configuration=update)
        except Exception:
            logger.debug("_pin_hnsw_threads modify failed", exc_info=True)
@@ -566,10 +595,13 @@ class ChromaBackend(BaseBackend): if create: collection = client.get_or_create_collection( - collection_name, metadata={"hnsw:space": hnsw_space}, **ef_kwargs + collection_name, + metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1}, + **ef_kwargs, ) else: collection = client.get_collection(collection_name, **ef_kwargs) + _pin_hnsw_threads(collection) return ChromaCollection(collection) def close_palace(self, palace) -> None: @@ -613,7 +645,9 @@ class ChromaBackend(BaseBackend): ef = self._resolve_embedding_function() ef_kwargs = {"embedding_function": ef} if ef is not None else {} collection = self._client(palace_path).create_collection( - collection_name, metadata={"hnsw:space": hnsw_space}, **ef_kwargs + collection_name, + metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1}, + **ef_kwargs, ) return ChromaCollection(collection) diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 2650e30..485bbe5 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -57,7 +57,7 @@ from .config import ( # noqa: E402 sanitize_content, ) from .version import __version__ # noqa: E402 -from .backends.chroma import ChromaBackend, ChromaCollection # noqa: E402 +from .backends.chroma import ChromaBackend, ChromaCollection, _pin_hnsw_threads # noqa: E402 from .query_sanitizer import sanitize_query # noqa: E402 from .searcher import search_memories # noqa: E402 from .palace_graph import ( # noqa: E402 @@ -217,15 +217,25 @@ def _get_collection(create=False): try: client = _get_client() if create: - _collection_cache = ChromaCollection( - client.get_or_create_collection( - _config.collection_name, metadata={"hnsw:space": "cosine"} - ) + # hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor + # HNSW insert path, which has a race in repairConnectionsForUpdate / + # addPoint (see issues #974, #965). 
Set via metadata on fresh + # collections and re-applied via _pin_hnsw_threads() for legacy + # palaces whose collections were created before this fix (the + # runtime config does not persist cross-process in chromadb 1.5.x, + # so the retrofit runs every time _get_collection opens a cache). + raw = client.get_or_create_collection( + _config.collection_name, + metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1}, ) + _pin_hnsw_threads(raw) + _collection_cache = ChromaCollection(raw) _metadata_cache = None _metadata_cache_time = 0 elif _collection_cache is None: - _collection_cache = ChromaCollection(client.get_collection(_config.collection_name)) + raw = client.get_collection(_config.collection_name) + _pin_hnsw_threads(raw) + _collection_cache = ChromaCollection(raw) _metadata_cache = None _metadata_cache_time = 0 return _collection_cache diff --git a/mempalace/miner.py b/mempalace/miner.py index b593797..2d610ea 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -20,11 +20,13 @@ from typing import Optional from .palace import ( NORMALIZE_VERSION, SKIP_DIRS, + MineAlreadyRunning, build_closet_lines, file_already_mined, get_closets_collection, get_collection, mine_lock, + mine_palace_lock, purge_file_closets, upsert_closet_lines, ) @@ -993,6 +995,52 @@ def mine( ``mine`` walks the tree itself just like before. 
class MineAlreadyRunning(RuntimeError):
    """Raised when another `mempalace mine` already holds the per-palace lock."""


def _acquire_palace_lock(lock_file, resolved: str) -> None:
    """Take a non-blocking exclusive lock on *lock_file* or raise MineAlreadyRunning."""
    busy = f"another `mempalace mine` is already running against {resolved}"
    if os.name == "nt":
        import msvcrt

        try:
            # LK_NBLCK fails immediately instead of retrying when the byte
            # range is already locked.
            msvcrt.locking(lock_file.fileno(), msvcrt.LK_NBLCK, 1)
        except OSError as exc:
            raise MineAlreadyRunning(busy) from exc
    else:
        import fcntl

        try:
            fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError as exc:
            raise MineAlreadyRunning(busy) from exc


def _release_palace_lock(lock_file) -> None:
    """Explicitly drop the lock taken by ``_acquire_palace_lock`` (best effort)."""
    try:
        if os.name == "nt":
            import msvcrt

            msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
        else:
            import fcntl

            fcntl.flock(lock_file, fcntl.LOCK_UN)
    except OSError:
        # Deliberately ignored: the caller closes the file right after this,
        # which is expected to release the lock as well.
        pass


@contextlib.contextmanager
def mine_palace_lock(palace_path: str):
    """Per-palace non-blocking lock around the full `mine` pipeline.

    The per-file `mine_lock` only protects delete+insert interleave for a
    single source; it does not prevent N copies of `mempalace mine <dir>`
    from being spawned concurrently by hooks. When that happens, each copy
    drives ChromaDB HNSW inserts in parallel against the same palace,
    which (combined with chromadb's multi-threaded ParallelFor) can
    corrupt the HNSW graph and produce sparse link_lists.bin blowups.

    The lock file lives in ``~/.mempalace/locks`` and is keyed by a
    truncated sha256 of the palace path, so mines against *different*
    palaces can still run in parallel; only writes into the same palace
    are serialized.

    The key is derived from a normalized form of the path: ``realpath``
    resolves symlinks and ``..`` segments, and ``normcase`` folds case on
    Windows, so ``C:\\Palace`` and ``c:\\palace`` share one lock.

    Args:
        palace_path: Path of the palace being mined; ``~`` is expanded.

    Raises:
        MineAlreadyRunning: Another holder already has the lock for this
            palace. The lock is non-blocking so callers can exit cleanly
            instead of queueing up as waiting workers.
    """
    lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks")
    os.makedirs(lock_dir, exist_ok=True)

    resolved = os.path.realpath(os.path.expanduser(palace_path))
    # os.fsencode rather than str.encode(): a path that carries undecodable
    # bytes (surrogate escapes) would make a plain UTF-8 encode raise
    # UnicodeEncodeError before the lock is even attempted. For ordinary
    # paths on a UTF-8 filesystem encoding the resulting key is unchanged.
    key_bytes = os.fsencode(os.path.normcase(resolved))
    palace_key = hashlib.sha256(key_bytes).hexdigest()[:16]
    lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock")

    # The with-block guarantees the handle is closed on every exit path,
    # including a failed acquire.
    with open(lock_path, "w") as lock_file:
        _acquire_palace_lock(lock_file, resolved)
        try:
            yield
        finally:
            _release_palace_lock(lock_file)
def test_pin_hnsw_threads_swallows_all_errors():
    """A collection whose ``modify`` blows up must not make the retrofit raise."""

    class _BrokenCollection:
        # Stand-in exposing only the one method the retrofit calls.
        def modify(self, *args, **kwargs):
            raise RuntimeError("boom")

    broken = _BrokenCollection()
    # Returning normally is the whole assertion: any exception fails the test.
    _pin_hnsw_threads(broken)
palace: create collection without num_threads + bootstrap_client = chromadb.PersistentClient(path=str(palace_path)) + bootstrap_client.create_collection("mempalace_drawers", metadata={"hnsw:space": "cosine"}) + del bootstrap_client # drop reference so a fresh client reopens cleanly + + wrapper = ChromaBackend().get_collection( + str(palace_path), + collection_name="mempalace_drawers", + create=False, + ) + + assert wrapper._collection.configuration_json["hnsw"]["num_threads"] == 1 diff --git a/tests/test_palace_locks.py b/tests/test_palace_locks.py new file mode 100644 index 0000000..601c894 --- /dev/null +++ b/tests/test_palace_locks.py @@ -0,0 +1,158 @@ +"""Tests for mine_palace_lock — the per-palace non-blocking mine guard. + +Covers the fix for the runaway mine fan-out described alongside issues +#974 and #965: if N copies of `mempalace mine` are spawned concurrently +against the same palace, they must collapse to a single runner rather +than queue as waiters that will drive parallel HNSW inserts. Mines +against *different* palaces must still be free to run in parallel. +""" + +from __future__ import annotations + +import multiprocessing +import os +import time + +import pytest + +from mempalace.palace import ( + MineAlreadyRunning, + mine_global_lock, + mine_palace_lock, +) + + +def _get_mp_context(): + """Pick a start method that works on every CI runner. + + `fork` is cheaper (no re-import) but is unavailable on Windows, so we fall + back to `spawn` there. `spawn` inherits ``os.environ`` (including the + monkeypatched ``HOME``) and re-imports the ``mempalace`` package in the + child, which is sufficient for the lock-file semantics exercised here. 
+ """ + start_method = "spawn" if os.name == "nt" else "fork" + return multiprocessing.get_context(start_method) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _hold_lock(palace_path: str, ready_flag: str, release_flag: str) -> int: + """Acquire mine_palace_lock, signal readiness, wait for release flag. + + Returns 0 if we acquired the lock, 1 if MineAlreadyRunning was raised. + Runs in a child process for true cross-process locking semantics. + """ + try: + with mine_palace_lock(palace_path): + # Tell the parent we hold the lock + open(ready_flag, "w").close() + # Wait until parent tells us to release + for _ in range(500): + if os.path.exists(release_flag): + return 0 + time.sleep(0.01) + return 0 + except MineAlreadyRunning: + return 1 + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_single_acquire_succeeds(tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + with mine_palace_lock(str(tmp_path / "palace")): + pass # should not raise + + +def test_lock_reusable_after_release(tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + with mine_palace_lock(palace): + pass + # Re-acquire must succeed now that the previous holder released + with mine_palace_lock(palace): + pass + + +def test_same_palace_serializes_across_processes(tmp_path, monkeypatch): + """Two processes contending for the same palace: second must be rejected.""" + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") + + ctx = _get_mp_context() + holder = ctx.Process(target=_hold_lock, args=(palace, ready, release)) + holder.start() + try: + # Wait for the holder to acquire 
def test_different_palaces_dont_conflict(tmp_path, monkeypatch):
    """Mines against different palaces must NOT block each other.

    A child process holds the lock for ``palace_a`` while the parent takes
    the lock for ``palace_b``; the second acquire has to succeed.
    """
    monkeypatch.setenv("HOME", str(tmp_path))
    palace_a = str(tmp_path / "palace_a")
    palace_b = str(tmp_path / "palace_b")
    ready = str(tmp_path / "ready_a")
    release = str(tmp_path / "release_a")

    ctx = _get_mp_context()
    holder = ctx.Process(target=_hold_lock, args=(palace_a, ready, release))
    holder.start()
    try:
        # Poll (up to ~5s) for the flag file the child creates once it
        # holds the lock.
        for _ in range(500):
            if os.path.exists(ready):
                break
            time.sleep(0.01)
        assert os.path.exists(ready), "holder failed to acquire lock in time"

        # Different palace: must succeed even while palace_a is held.
        with mine_palace_lock(palace_b):
            pass  # no exception expected
    finally:
        # Always let the child go, even when an assertion above failed.
        with open(release, "w"):
            pass
        holder.join(timeout=5)
        if holder.is_alive():
            # Do not leak a stuck child into the rest of the test session.
            holder.terminate()
            holder.join()

    # Checked outside ``finally`` so it cannot mask a failure raised above;
    # mirrors the exit-status check in the same-palace test.
    assert holder.exitcode == 0, "holder process did not exit cleanly"
def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch):
    """The legacy ``mine_global_lock`` name must remain usable.

    It has to be the very same object as ``mine_palace_lock`` and accept a
    palace path in the same way.
    """
    palace = str(tmp_path / "palace")
    # Point HOME at the temp dir before the lock is entered, since the lock
    # directory is derived from the home directory.
    monkeypatch.setenv("HOME", str(tmp_path))

    assert mine_global_lock is mine_palace_lock

    legacy_lock = mine_global_lock(palace)
    with legacy_lock:
        pass  # the alias accepts the same palace_path argument