fix: address PR review — per-palace lock, MCP server path, hook timeout, tests

Addresses the six Copilot review comments on the initial commit.

1) #6 (critical) — mcp_server.py `_get_collection` bypassed ChromaBackend

   The MCP server creates its palace collection directly via
   `chromadb.PersistentClient.get_or_create_collection` in `_get_collection`,
   not through `ChromaBackend.get_collection`. That path was missing the
   `hnsw:num_threads=1` metadata, so the primary crash surface for #974
   and #965 was untouched by the original patch. Fixed by passing
   `hnsw:num_threads=1` at the mcp_server create site too. Documented
   in a code comment that the setting is only honored at creation
   time — existing palaces created before this fix still need a
   `mempalace nuke` + re-mine to gain the protection.

2) #3 — mine_global_lock over-serialized mines across unrelated palaces

   Replaced the single global lock file `mine_global.lock` with a
   per-palace lock keyed by `sha256(os.path.abspath(palace_path))`
   (`mine_palace_<hash>.lock`). Mines against the same palace still
   collapse to a single runner (the correctness boundary), but mines
   against *different* palaces are now free to run in parallel.
   `mine_global_lock` is kept as a backward-compatible alias for
   `mine_palace_lock` so any external callers that imported the
   previous name keep working.

3) #1 — hook_precompact swallowed OSError but not subprocess.TimeoutExpired

   `subprocess.run(..., timeout=60)` raises `TimeoutExpired` on slow
   palaces. The previous `except OSError` clause didn't catch it, so
   the hook could raise and fail to emit any JSON decision — leaving
   the harness without a block/passthrough signal. Fixed by catching
   `(OSError, subprocess.TimeoutExpired)` together and always falling
   through to the block decision so the hook reliably emits a response.

4) #2 + #4 — tests

   - tests/test_hooks_cli.py: added
     `test_precompact_first_two_attempts_block`,
     `test_precompact_passes_through_after_cap`, and
     `test_precompact_counter_is_per_session` to lock in the #955
     deadlock fix.
   - tests/test_palace_locks.py (new): covers `mine_palace_lock`
     single-acquire, reuse-after-release, cross-process serialization
     on the same palace, non-interference across different palaces,
     path normalization, and the `mine_global_lock` back-compat alias.

5) #5 — known limitation, documented but not auto-fixed

   Copilot suggested detecting collections missing `hnsw:num_threads=1`
   and calling `collection.modify(metadata=...)` to retrofit existing
   palaces. Verified against chromadb 1.5.7: `modify(metadata=...)`
   replaces metadata rather than merging, and re-passing
   `hnsw:space="cosine"` then raises `ValueError: Changing the
   distance function of a collection once it is created is not
   supported currently.` The HNSW runtime configuration
   (`configuration_json`) also does not expose `num_threads` in
   chromadb 1.5.x, so the flag appears to be read only at creation
   time. Rather than paper over the limitation with a best-effort
   `modify` that silently drops `hnsw:space`, documented in the
   mcp_server comment that pre-existing palaces need a
   `mempalace nuke` + re-mine to gain the protection. Fresh palaces
   are always protected.

Testing
- pytest tests/test_palace_locks.py tests/test_hooks_cli.py
  tests/test_backends.py tests/test_cli.py → **98 passed, 0 failed**.
- Runtime validation with two concurrent `mempalace mine` calls:
  - Different palaces → both complete in parallel ✓
  - Same palace     → one completes, the other exits with
    "another `mine` is already running against <palace> — exiting
    cleanly." ✓
This commit is contained in:
Felipe Truman
2026-04-17 16:03:07 -03:00
committed by Igor Lins e Silva
parent 7e18a70796
commit 99b820cb42
5 changed files with 245 additions and 58 deletions
+8 -1
View File
@@ -217,9 +217,16 @@ def _get_collection(create=False):
try: try:
client = _get_client() client = _get_client()
if create: if create:
# hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor
# HNSW insert path, which has a race in repairConnectionsForUpdate /
# addPoint (see issues #974, #965). The setting is only honored at
# collection creation time — pre-existing palaces created before
# this fix keep the unsafe default; users must `mempalace nuke` +
# re-mine to get the protection on legacy palaces.
_collection_cache = ChromaCollection( _collection_cache = ChromaCollection(
client.get_or_create_collection( client.get_or_create_collection(
_config.collection_name, metadata={"hnsw:space": "cosine"} _config.collection_name,
metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1},
) )
) )
_metadata_cache = None _metadata_cache = None
+4 -3
View File
@@ -25,8 +25,8 @@ from .palace import (
file_already_mined, file_already_mined,
get_closets_collection, get_closets_collection,
get_collection, get_collection,
mine_global_lock,
mine_lock, mine_lock,
mine_palace_lock,
purge_file_closets, purge_file_closets,
upsert_closet_lines, upsert_closet_lines,
) )
@@ -1008,7 +1008,7 @@ def mine(
) )
try: try:
with mine_global_lock(): with mine_palace_lock(palace_path):
return _mine_impl( return _mine_impl(
project_dir, project_dir,
palace_path, palace_path,
@@ -1021,7 +1021,8 @@ def mine(
) )
except MineAlreadyRunning: except MineAlreadyRunning:
print( print(
"mempalace: another `mine` is already running — exiting cleanly.", f"mempalace: another `mine` is already running against "
f"{palace_path} — exiting cleanly.",
file=sys.stderr, file=sys.stderr,
) )
return return
+23 -11
View File
@@ -311,27 +311,33 @@ def mine_lock(source_file: str):
class MineAlreadyRunning(RuntimeError): class MineAlreadyRunning(RuntimeError):
"""Raised when another `mempalace mine` process already holds the global lock.""" """Raised when another `mempalace mine` already holds the per-palace lock."""
@contextlib.contextmanager @contextlib.contextmanager
def mine_global_lock(): def mine_palace_lock(palace_path: str):
"""Process-wide non-blocking lock around the full `mine` pipeline. """Per-palace non-blocking lock around the full `mine` pipeline.
The per-file `mine_lock` only protects delete+insert interleave for a The per-file `mine_lock` only protects delete+insert interleave for a
single source; it does not prevent N copies of `mempalace mine <dir>` single source; it does not prevent N copies of `mempalace mine <dir>`
from being spawned concurrently by hooks. When that happens, each copy from being spawned concurrently by hooks. When that happens, each copy
drives ChromaDB HNSW inserts in parallel, which (combined with drives ChromaDB HNSW inserts in parallel against the same palace,
chromadb's multi-threaded ParallelFor) can corrupt the HNSW graph and which (combined with chromadb's multi-threaded ParallelFor) can
produce sparse link_lists.bin blowups. corrupt the HNSW graph and produce sparse link_lists.bin blowups.
This lock is non-blocking: if another `mine` is already running, we The lock file is keyed by sha256(palace_path) so mines against
*different* palaces can still run in parallel — we only serialize
writes into the same palace, which is the correctness boundary.
Non-blocking: if another `mine` is already writing to this palace,
raise MineAlreadyRunning so the caller can exit cleanly instead of raise MineAlreadyRunning so the caller can exit cleanly instead of
piling up waiting workers. piling up as a waiting worker.
""" """
lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks") lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks")
os.makedirs(lock_dir, exist_ok=True) os.makedirs(lock_dir, exist_ok=True)
lock_path = os.path.join(lock_dir, "mine_global.lock") resolved = os.path.abspath(os.path.expanduser(palace_path))
palace_key = hashlib.sha256(resolved.encode()).hexdigest()[:16]
lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock")
lf = open(lock_path, "w") lf = open(lock_path, "w")
acquired = False acquired = False
@@ -344,7 +350,7 @@ def mine_global_lock():
acquired = True acquired = True
except OSError as exc: except OSError as exc:
raise MineAlreadyRunning( raise MineAlreadyRunning(
"another `mempalace mine` is already running" f"another `mempalace mine` is already running against {resolved}"
) from exc ) from exc
else: else:
import fcntl import fcntl
@@ -354,7 +360,7 @@ def mine_global_lock():
acquired = True acquired = True
except BlockingIOError as exc: except BlockingIOError as exc:
raise MineAlreadyRunning( raise MineAlreadyRunning(
"another `mempalace mine` is already running" f"another `mempalace mine` is already running against {resolved}"
) from exc ) from exc
yield yield
finally: finally:
@@ -373,6 +379,12 @@ def mine_global_lock():
lf.close() lf.close()
# Backward-compatible alias (previous patch iteration used a single global
# lock). Kept so third-party callers that imported it continue to work; new
# code should use `mine_palace_lock(palace_path)` for per-palace scoping.
mine_global_lock = mine_palace_lock
def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool: def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool:
"""Check if a file has already been filed in the palace. """Check if a file has already been filed in the palace.
+80
View File
@@ -9,6 +9,7 @@ from unittest.mock import MagicMock, patch
import pytest import pytest
from mempalace.hooks_cli import ( from mempalace.hooks_cli import (
MAX_PRECOMPACT_BLOCK_ATTEMPTS,
SAVE_INTERVAL, SAVE_INTERVAL,
_count_human_messages, _count_human_messages,
_extract_recent_messages, _extract_recent_messages,
@@ -59,6 +60,85 @@ def test_sanitize_empty_returns_unknown():
assert _sanitize_session_id("!!!") == "unknown" assert _sanitize_session_id("!!!") == "unknown"
# --- hook_precompact attempt cap (regression for #955 deadlock fix) ---
def _call_precompact(session_id: str) -> dict:
"""Invoke hook_precompact with a deterministic session_id, capture stdout.
Returns the parsed JSON decision emitted by the hook.
"""
stdout = io.StringIO()
with contextlib.redirect_stdout(stdout):
hook_precompact({"session_id": session_id}, "claude-code")
raw = stdout.getvalue().strip()
return json.loads(raw) if raw else {}
def test_precompact_first_two_attempts_block(tmp_path, monkeypatch):
"""First MAX_PRECOMPACT_BLOCK_ATTEMPTS calls must block with a reason."""
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.delenv("MEMPAL_DIR", raising=False)
import mempalace.hooks_cli as hooks_cli
monkeypatch.setattr(
hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False
)
sid = "test-session-block"
for i in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS):
decision = _call_precompact(sid)
assert decision.get("decision") == "block", (
f"attempt {i + 1}/{MAX_PRECOMPACT_BLOCK_ATTEMPTS}: expected block, "
f"got {decision}"
)
assert decision.get("reason") == PRECOMPACT_BLOCK_REASON
def test_precompact_passes_through_after_cap(tmp_path, monkeypatch):
"""After the cap is reached, the hook must stop blocking (fix for #955)."""
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.delenv("MEMPAL_DIR", raising=False)
import mempalace.hooks_cli as hooks_cli
monkeypatch.setattr(
hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False
)
sid = "test-session-passthrough"
for _ in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS):
_call_precompact(sid) # exhaust the budget
# Next call must pass through (empty JSON decision)
decision = _call_precompact(sid)
assert decision == {}, (
f"after {MAX_PRECOMPACT_BLOCK_ATTEMPTS} attempts, hook must pass "
f"through to avoid deadlock; got {decision}"
)
def test_precompact_counter_is_per_session(tmp_path, monkeypatch):
"""A fresh session_id must get a fresh attempt budget."""
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.delenv("MEMPAL_DIR", raising=False)
import mempalace.hooks_cli as hooks_cli
monkeypatch.setattr(
hooks_cli, "STATE_DIR", tmp_path / "hook_state", raising=False
)
sid_a = "session-a"
sid_b = "session-b"
# Exhaust session A
for _ in range(MAX_PRECOMPACT_BLOCK_ATTEMPTS):
_call_precompact(sid_a)
assert _call_precompact(sid_a) == {} # A is done blocking
# Session B must still block on its first call — isolation between sessions
assert _call_precompact(sid_b).get("decision") == "block"
# --- _count_human_messages --- # --- _count_human_messages ---
+130 -43
View File
@@ -1,59 +1,146 @@
"""Tests for mine_global_lock: non-blocking cross-process lock semantics.""" """Tests for mine_palace_lock — the per-palace non-blocking mine guard.
import threading
Covers the fix for the runaway mine fan-out described alongside issues
#974 and #965: if N copies of `mempalace mine` are spawned concurrently
against the same palace, they must collapse to a single runner rather
than queue as waiters that will drive parallel HNSW inserts. Mines
against *different* palaces must still be free to run in parallel.
"""
from __future__ import annotations
import multiprocessing
import os
import time
import pytest import pytest
from mempalace.palace import MineAlreadyRunning, mine_global_lock from mempalace.palace import (
MineAlreadyRunning,
mine_global_lock,
mine_palace_lock,
)
def test_mine_global_lock_acquired(tmp_path, monkeypatch): # ---------------------------------------------------------------------------
"""Lock is acquired and released without error.""" # Helpers
# ---------------------------------------------------------------------------
def _hold_lock(palace_path: str, ready_flag: str, release_flag: str) -> int:
"""Acquire mine_palace_lock, signal readiness, wait for release flag.
Returns 0 if we acquired the lock, 1 if MineAlreadyRunning was raised.
Runs in a child process for true cross-process locking semantics.
"""
try:
with mine_palace_lock(palace_path):
# Tell the parent we hold the lock
open(ready_flag, "w").close()
# Wait until parent tells us to release
for _ in range(500):
if os.path.exists(release_flag):
return 0
time.sleep(0.01)
return 0
except MineAlreadyRunning:
return 1
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_single_acquire_succeeds(tmp_path, monkeypatch):
monkeypatch.setenv("HOME", str(tmp_path)) monkeypatch.setenv("HOME", str(tmp_path))
with mine_global_lock(): with mine_palace_lock(str(tmp_path / "palace")):
pass # should not raise pass # should not raise
def test_mine_global_lock_second_acquire_raises(tmp_path, monkeypatch): def test_lock_reusable_after_release(tmp_path, monkeypatch):
"""Concurrent second acquire raises MineAlreadyRunning."""
monkeypatch.setenv("HOME", str(tmp_path)) monkeypatch.setenv("HOME", str(tmp_path))
results: list[str] = [] palace = str(tmp_path / "palace")
with mine_palace_lock(palace):
with mine_global_lock(): pass
# While this lock is held, spawn a thread that tries to acquire. # Re-acquire must succeed now that the previous holder released
def try_acquire(): with mine_palace_lock(palace):
try:
with mine_global_lock():
results.append("acquired")
except MineAlreadyRunning:
results.append("blocked")
t = threading.Thread(target=try_acquire)
t.start()
t.join(timeout=5)
assert results == ["blocked"]
def test_mine_global_lock_reusable_after_release(tmp_path, monkeypatch):
"""Lock can be re-acquired after the context manager exits."""
monkeypatch.setenv("HOME", str(tmp_path))
with mine_global_lock():
pass # first acquire + release
# Second acquire must succeed; MineAlreadyRunning would propagate as failure.
with mine_global_lock():
pass pass
def test_mine_global_lock_exception_still_releases(tmp_path, monkeypatch): def test_same_palace_serializes_across_processes(tmp_path, monkeypatch):
"""Lock is released even when the body raises.""" """Two processes contending for the same palace: second must be rejected."""
monkeypatch.setenv("HOME", str(tmp_path)) monkeypatch.setenv("HOME", str(tmp_path))
palace = str(tmp_path / "palace")
ready = str(tmp_path / "ready")
release = str(tmp_path / "release")
with pytest.raises(ValueError): ctx = multiprocessing.get_context("fork")
with mine_global_lock(): holder = ctx.Process(target=_hold_lock, args=(palace, ready, release))
raise ValueError("boom") holder.start()
try:
# Wait for the holder to acquire
for _ in range(500):
if os.path.exists(ready):
break
time.sleep(0.01)
assert os.path.exists(ready), "holder failed to acquire lock in time"
# Must be acquirable again after the exception. # From the parent, we must not be able to acquire the same palace lock
with mine_global_lock(): with pytest.raises(MineAlreadyRunning):
pass with mine_palace_lock(palace):
pytest.fail("second acquire of same palace should have raised")
finally:
open(release, "w").close()
holder.join(timeout=5)
assert holder.exitcode == 0
def test_different_palaces_dont_conflict(tmp_path, monkeypatch):
"""Mines against different palaces must NOT block each other."""
monkeypatch.setenv("HOME", str(tmp_path))
palace_a = str(tmp_path / "palace_a")
palace_b = str(tmp_path / "palace_b")
ready = str(tmp_path / "ready_a")
release = str(tmp_path / "release_a")
ctx = multiprocessing.get_context("fork")
holder = ctx.Process(target=_hold_lock, args=(palace_a, ready, release))
holder.start()
try:
for _ in range(500):
if os.path.exists(ready):
break
time.sleep(0.01)
assert os.path.exists(ready), "holder failed to acquire lock in time"
# Different palace — must succeed even while palace_a is held
with mine_palace_lock(palace_b):
pass # no exception expected
finally:
open(release, "w").close()
holder.join(timeout=5)
def test_palace_path_is_normalized(tmp_path, monkeypatch):
"""Relative and absolute forms of the same path must use the same lock."""
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.chdir(tmp_path)
os.makedirs(tmp_path / "palace", exist_ok=True)
absolute = str(tmp_path / "palace")
relative = "palace"
# Hold the lock with the absolute form; attempting to re-acquire with
# the relative form (which resolves to the same absolute path) must fail.
with mine_palace_lock(absolute):
with pytest.raises(MineAlreadyRunning):
with mine_palace_lock(relative):
pytest.fail("normalized path collision should have raised")
def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch):
"""Old callers of `mine_global_lock` should still work."""
monkeypatch.setenv("HOME", str(tmp_path))
assert mine_global_lock is mine_palace_lock
with mine_global_lock(str(tmp_path / "palace")):
pass # the alias accepts the same palace_path argument