From 99b820cb42d9488e6c08dea2186d87f6b3621ef5 Mon Sep 17 00:00:00 2001 From: Felipe Truman Date: Fri, 17 Apr 2026 16:03:07 -0300 Subject: [PATCH] =?UTF-8?q?fix:=20address=20PR=20review=20=E2=80=94=20per-?= =?UTF-8?q?palace=20lock,=20MCP=20server=20path,=20hook=20timeout,=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the six Copilot review comments on the initial commit. 1) #6 (critical) — mcp_server.py `_get_collection` bypassed ChromaBackend The MCP server creates its palace collection directly via `chromadb.PersistentClient.get_or_create_collection` in `_get_collection`, not through `ChromaBackend.get_collection`. That path was missing the `hnsw:num_threads=1` metadata, so the primary crash surface for #974 and #965 was untouched by the original patch. Fixed by passing `hnsw:num_threads=1` at the mcp_server create site too. Documented in a code comment that the setting is only honored at creation time — existing palaces created before this fix still need a `mempalace nuke` + re-mine to gain the protection. 2) #3 — mine_global_lock over-serialized mines across unrelated palaces Replaced the single global lock file `mine_global.lock` with a per-palace lock keyed by the first 16 hex characters of `sha256(os.path.abspath(os.path.expanduser(palace_path)))` (`mine_palace_{palace_key}.lock`). Mines against the same palace still collapse to a single runner (the correctness boundary), but mines against *different* palaces are now free to run in parallel. `mine_global_lock` is kept as an import-compatible alias for `mine_palace_lock` so any external callers that imported the previous name keep importing cleanly; note that the alias takes the same required `palace_path` argument, so zero-argument `mine_global_lock()` calls must be updated to pass a palace path. 3) #1 — hook_precompact swallowed OSError but not subprocess.TimeoutExpired `subprocess.run(..., timeout=60)` raises `TimeoutExpired` on slow palaces. The previous `except OSError` clause didn't catch it, so the hook could raise and fail to emit any JSON decision — leaving the harness without a block/passthrough signal. 
Fixed by catching `(OSError, subprocess.TimeoutExpired)` together and always falling through to the block decision so the hook reliably emits a response. 4) #2 + #4 — tests - tests/test_hooks_cli.py: added `test_precompact_first_two_attempts_block`, `test_precompact_passes_through_after_cap`, and `test_precompact_counter_is_per_session` to lock in the #955 deadlock fix. - tests/test_palace_locks.py (rewritten): covers `mine_palace_lock` single-acquire, reuse-after-release, cross-process serialization on the same palace, non-interference across different palaces, path normalization, and the `mine_global_lock` alias. 5) #5 — known limitation, documented but not auto-fixed Copilot suggested detecting collections missing `hnsw:num_threads=1` and calling `collection.modify(metadata=...)` to retrofit existing palaces. Verified against chromadb 1.5.7: `modify(metadata=...)` replaces metadata rather than merging, and re-passing `hnsw:space="cosine"` then raises `ValueError: Changing the distance function of a collection once it is created is not supported currently.` The HNSW runtime configuration (`configuration_json`) also does not expose `num_threads` in chromadb 1.5.x, so the flag appears to be read only at creation time. Rather than paper over the limitation with a best-effort `modify` that silently drops `hnsw:space`, this commit documents in the mcp_server comment that pre-existing palaces need a `mempalace nuke` + re-mine to gain the protection. Fresh palaces are always protected. Testing - pytest tests/test_palace_locks.py tests/test_hooks_cli.py tests/test_backends.py tests/test_cli.py → **98 passed, 0 failed**. - Runtime validation with two concurrent `mempalace mine` calls: - Different palaces → both complete in parallel ✓ - Same palace → one completes, the other exits with "another `mine` is already running against {palace_path} — exiting cleanly." 
✓ --- mempalace/mcp_server.py | 9 +- mempalace/miner.py | 7 +- mempalace/palace.py | 34 +++++--- tests/test_hooks_cli.py | 80 +++++++++++++++++ tests/test_palace_locks.py | 173 ++++++++++++++++++++++++++++--------- 5 files changed, 245 insertions(+), 58 deletions(-) diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 2650e30..beb870d 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -217,9 +217,16 @@ def _get_collection(create=False): try: client = _get_client() if create: + # hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor + # HNSW insert path, which has a race in repairConnectionsForUpdate / + # addPoint (see issues #974, #965). The setting is only honored at + # collection creation time — pre-existing palaces created before + # this fix keep the unsafe default; users must `mempalace nuke` + + # re-mine to get the protection on legacy palaces. _collection_cache = ChromaCollection( client.get_or_create_collection( - _config.collection_name, metadata={"hnsw:space": "cosine"} + _config.collection_name, + metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1}, ) ) _metadata_cache = None diff --git a/mempalace/miner.py b/mempalace/miner.py index 2fde777..a4219ae 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -25,8 +25,8 @@ from .palace import ( file_already_mined, get_closets_collection, get_collection, - mine_global_lock, mine_lock, + mine_palace_lock, purge_file_closets, upsert_closet_lines, ) @@ -1008,7 +1008,7 @@ def mine( ) try: - with mine_global_lock(): + with mine_palace_lock(palace_path): return _mine_impl( project_dir, palace_path, @@ -1021,7 +1021,8 @@ def mine( ) except MineAlreadyRunning: print( - "mempalace: another `mine` is already running — exiting cleanly.", + f"mempalace: another `mine` is already running against " + f"{palace_path} — exiting cleanly.", file=sys.stderr, ) return diff --git a/mempalace/palace.py b/mempalace/palace.py index 76a037e..917c76d 100644 --- 
a/mempalace/palace.py +++ b/mempalace/palace.py @@ -311,27 +311,33 @@ def mine_lock(source_file: str): class MineAlreadyRunning(RuntimeError): - """Raised when another `mempalace mine` process already holds the global lock.""" + """Raised when another `mempalace mine` already holds the per-palace lock.""" @contextlib.contextmanager -def mine_global_lock(): - """Process-wide non-blocking lock around the full `mine` pipeline. +def mine_palace_lock(palace_path: str): + """Per-palace non-blocking lock around the full `mine` pipeline. The per-file `mine_lock` only protects delete+insert interleave for a single source; it does not prevent N copies of `mempalace mine ` from being spawned concurrently by hooks. When that happens, each copy - drives ChromaDB HNSW inserts in parallel, which (combined with - chromadb's multi-threaded ParallelFor) can corrupt the HNSW graph and - produce sparse link_lists.bin blowups. + drives ChromaDB HNSW inserts in parallel against the same palace, + which (combined with chromadb's multi-threaded ParallelFor) can + corrupt the HNSW graph and produce sparse link_lists.bin blowups. - This lock is non-blocking: if another `mine` is already running, we + The lock file is keyed by sha256(palace_path) so mines against + *different* palaces can still run in parallel — we only serialize + writes into the same palace, which is the correctness boundary. + + Non-blocking: if another `mine` is already writing to this palace, raise MineAlreadyRunning so the caller can exit cleanly instead of - piling up waiting workers. + piling up as a waiting worker. 
""" lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks") os.makedirs(lock_dir, exist_ok=True) - lock_path = os.path.join(lock_dir, "mine_global.lock") + resolved = os.path.abspath(os.path.expanduser(palace_path)) + palace_key = hashlib.sha256(resolved.encode()).hexdigest()[:16] + lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock") lf = open(lock_path, "w") acquired = False @@ -344,7 +350,7 @@ def mine_global_lock(): acquired = True except OSError as exc: raise MineAlreadyRunning( - "another `mempalace mine` is already running" + f"another `mempalace mine` is already running against {resolved}" ) from exc else: import fcntl @@ -354,7 +360,7 @@ def mine_global_lock(): acquired = True except BlockingIOError as exc: raise MineAlreadyRunning( - "another `mempalace mine` is already running" + f"another `mempalace mine` is already running against {resolved}" ) from exc yield finally: @@ -373,6 +379,12 @@ def mine_global_lock(): lf.close() +# Backward-compatible alias (previous patch iteration used a single global +# lock). Kept so third-party callers that imported it continue to work; new +# code should use `mine_palace_lock(palace_path)` for per-palace scoping. +mine_global_lock = mine_palace_lock + + def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool: """Check if a file has already been filed in the palace. 
diff --git a/tests/test_hooks_cli.py b/tests/test_hooks_cli.py index c9a0022..a406164 100644 --- a/tests/test_hooks_cli.py +++ b/tests/test_hooks_cli.py @@ -9,6 +9,7 @@ from unittest.mock import MagicMock, patch import pytest from mempalace.hooks_cli import ( + MAX_PRECOMPACT_BLOCK_ATTEMPTS, SAVE_INTERVAL, _count_human_messages, _extract_recent_messages, @@ -59,6 +60,85 @@ def test_sanitize_empty_returns_unknown(): assert _sanitize_session_id("!!!") == "unknown" +# --- hook_precompact attempt cap (regression for #955 deadlock fix) --- + + +def _call_precompact(session_id: str) -> dict: + """Invoke hook_precompact with a deterministic session_id, capture stdout. + + Returns the parsed JSON decision emitted by the hook. + """ + stdout = io.StringIO() + with contextlib.redirect_stdout(stdout): + hook_precompact({"session_id": session_id}, "claude-code") + raw = stdout.getvalue().strip() + return json.loads(raw) if raw else {} + + +def test_precompact_first_two_attempts_block(tmp_path, monkeypatch): + """First MAX_PRECOMPACT_BLOCK_ATTEMPTS calls must block with a reason.""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.delenv("MEMPAL_DIR", raising=False) + + import mempalace.hooks_cli as hooks_cli + monkeypatch.setattr( + hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False + ) + + sid = "test-session-block" + for i in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): + decision = _call_precompact(sid) + assert decision.get("decision") == "block", ( + f"attempt {i + 1}/{MAX_PRECOMPACT_BLOCK_ATTEMPTS}: expected block, " + f"got {decision}" + ) + assert decision.get("reason") == PRECOMPACT_BLOCK_REASON + + +def test_precompact_passes_through_after_cap(tmp_path, monkeypatch): + """After the cap is reached, the hook must stop blocking (fix for #955).""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.delenv("MEMPAL_DIR", raising=False) + + import mempalace.hooks_cli as hooks_cli + monkeypatch.setattr( + hooks_cli, "STATE_DIR", tmp_path / "hook_state", 
raising=False + ) + + sid = "test-session-passthrough" + for _ in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): + _call_precompact(sid) # exhaust the budget + + # Next call must pass through (empty JSON decision) + decision = _call_precompact(sid) + assert decision == {}, ( + f"after {MAX_PRECOMPACT_BLOCK_ATTEMPTS} attempts, hook must pass " + f"through to avoid deadlock; got {decision}" + ) + + +def test_precompact_counter_is_per_session(tmp_path, monkeypatch): + """A fresh session_id must get a fresh attempt budget.""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.delenv("MEMPAL_DIR", raising=False) + + import mempalace.hooks_cli as hooks_cli + monkeypatch.setattr( + hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False + ) + + sid_a = "session-a" + sid_b = "session-b" + + # Exhaust session A + for _ in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS): + _call_precompact(sid_a) + assert _call_precompact(sid_a) == {} # A is done blocking + + # Session B must still block on its first call — isolation between sessions + assert _call_precompact(sid_b).get("decision") == "block" + + # --- _count_human_messages --- diff --git a/tests/test_palace_locks.py b/tests/test_palace_locks.py index 5b94a5d..a7596b9 100644 --- a/tests/test_palace_locks.py +++ b/tests/test_palace_locks.py @@ -1,59 +1,146 @@ -"""Tests for mine_global_lock: non-blocking cross-process lock semantics.""" -import threading +"""Tests for mine_palace_lock — the per-palace non-blocking mine guard. + +Covers the fix for the runaway mine fan-out described alongside issues +#974 and #965: if N copies of `mempalace mine` are spawned concurrently +against the same palace, they must collapse to a single runner rather +than queue as waiters that will drive parallel HNSW inserts. Mines +against *different* palaces must still be free to run in parallel. 
+""" + +from __future__ import annotations + +import multiprocessing +import os +import time import pytest -from mempalace.palace import MineAlreadyRunning, mine_global_lock +from mempalace.palace import ( + MineAlreadyRunning, + mine_global_lock, + mine_palace_lock, +) -def test_mine_global_lock_acquired(tmp_path, monkeypatch): - """Lock is acquired and released without error.""" +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _hold_lock(palace_path: str, ready_flag: str, release_flag: str) -> int: + """Acquire mine_palace_lock, signal readiness, wait for release flag. + + Returns 0 if we acquired the lock, 1 if MineAlreadyRunning was raised. + Runs in a child process for true cross-process locking semantics. + """ + try: + with mine_palace_lock(palace_path): + # Tell the parent we hold the lock + open(ready_flag, "w").close() + # Wait until parent tells us to release + for _ in range(500): + if os.path.exists(release_flag): + return 0 + time.sleep(0.01) + return 0 + except MineAlreadyRunning: + return 1 + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_single_acquire_succeeds(tmp_path, monkeypatch): monkeypatch.setenv("HOME", str(tmp_path)) - with mine_global_lock(): + with mine_palace_lock(str(tmp_path / "palace")): pass # should not raise -def test_mine_global_lock_second_acquire_raises(tmp_path, monkeypatch): - """Concurrent second acquire raises MineAlreadyRunning.""" +def test_lock_reusable_after_release(tmp_path, monkeypatch): monkeypatch.setenv("HOME", str(tmp_path)) - results: list[str] = [] - - with mine_global_lock(): - # While this lock is held, spawn a thread that tries to acquire. 
- def try_acquire(): - try: - with mine_global_lock(): - results.append("acquired") - except MineAlreadyRunning: - results.append("blocked") - - t = threading.Thread(target=try_acquire) - t.start() - t.join(timeout=5) - - assert results == ["blocked"] - - -def test_mine_global_lock_reusable_after_release(tmp_path, monkeypatch): - """Lock can be re-acquired after the context manager exits.""" - monkeypatch.setenv("HOME", str(tmp_path)) - - with mine_global_lock(): - pass # first acquire + release - - # Second acquire must succeed; MineAlreadyRunning would propagate as failure. - with mine_global_lock(): + palace = str(tmp_path / "palace") + with mine_palace_lock(palace): + pass + # Re-acquire must succeed now that the previous holder released + with mine_palace_lock(palace): pass -def test_mine_global_lock_exception_still_releases(tmp_path, monkeypatch): - """Lock is released even when the body raises.""" +def test_same_palace_serializes_across_processes(tmp_path, monkeypatch): + """Two processes contending for the same palace: second must be rejected.""" monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") - with pytest.raises(ValueError): - with mine_global_lock(): - raise ValueError("boom") + ctx = multiprocessing.get_context("fork") + holder = ctx.Process(target=_hold_lock, args=(palace, ready, release)) + holder.start() + try: + # Wait for the holder to acquire + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock in time" - # Must be acquirable again after the exception. 
- with mine_global_lock(): - pass + # From the parent, we must not be able to acquire the same palace lock + with pytest.raises(MineAlreadyRunning): + with mine_palace_lock(palace): + pytest.fail("second acquire of same palace should have raised") + finally: + open(release, "w").close() + holder.join(timeout=5) + assert holder.exitcode == 0 + + +def test_different_palaces_dont_conflict(tmp_path, monkeypatch): + """Mines against different palaces must NOT block each other.""" + monkeypatch.setenv("HOME", str(tmp_path)) + palace_a = str(tmp_path / "palace_a") + palace_b = str(tmp_path / "palace_b") + ready = str(tmp_path / "ready_a") + release = str(tmp_path / "release_a") + + ctx = multiprocessing.get_context("fork") + holder = ctx.Process(target=_hold_lock, args=(palace_a, ready, release)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock in time" + + # Different palace — must succeed even while palace_a is held + with mine_palace_lock(palace_b): + pass # no exception expected + finally: + open(release, "w").close() + holder.join(timeout=5) + + +def test_palace_path_is_normalized(tmp_path, monkeypatch): + """Relative and absolute forms of the same path must use the same lock.""" + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.chdir(tmp_path) + os.makedirs(tmp_path / "palace", exist_ok=True) + absolute = str(tmp_path / "palace") + relative = "palace" + + # Hold the lock with the absolute form; attempting to re-acquire with + # the relative form (which resolves to the same absolute path) must fail. 
+ with mine_palace_lock(absolute): + with pytest.raises(MineAlreadyRunning): + with mine_palace_lock(relative): + pytest.fail("normalized path collision should have raised") + + +def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch): + """Old callers of `mine_global_lock` should still work.""" + monkeypatch.setenv("HOME", str(tmp_path)) + assert mine_global_lock is mine_palace_lock + with mine_global_lock(str(tmp_path / "palace")): + pass # the alias accepts the same palace_path argument