From 7e18a707963a0c96653bfdb04fd56eff56964549 Mon Sep 17 00:00:00 2001 From: Felipe Truman Date: Fri, 17 Apr 2026 17:14:22 -0300 Subject: [PATCH] fix: resolve hooks_cli.py merge conflict + add mine_global_lock tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Resolve UU conflict in hooks_cli.py: take develop/HEAD approach (mine synchronously via _mine_sync, then pass through unconditionally). _mine_sync already catches subprocess.TimeoutExpired — fixes Copilot #1. - Add tests/test_palace_locks.py: 4 tests covering mine_global_lock non-blocking semantics (acquire, second-acquire raises MineAlreadyRunning, reusable after release, release on exception) — fixes Copilot #4. Co-Authored-By: Claude Sonnet 4.6 --- mempalace/backends/chroma.py | 8 +++-- mempalace/hooks_cli.py | 3 ++ mempalace/miner.py | 44 +++++++++++++++++++++++++ mempalace/palace.py | 63 ++++++++++++++++++++++++++++++++++++ tests/test_palace_locks.py | 59 +++++++++++++++++++++++++++++++++ 5 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 tests/test_palace_locks.py diff --git a/mempalace/backends/chroma.py b/mempalace/backends/chroma.py index 14ae9cd..c36e2c7 100644 --- a/mempalace/backends/chroma.py +++ b/mempalace/backends/chroma.py @@ -566,7 +566,9 @@ class ChromaBackend(BaseBackend): if create: collection = client.get_or_create_collection( - collection_name, metadata={"hnsw:space": hnsw_space}, **ef_kwargs + collection_name, + metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1}, + **ef_kwargs, ) else: collection = client.get_collection(collection_name, **ef_kwargs) @@ -613,7 +615,9 @@ class ChromaBackend(BaseBackend): ef = self._resolve_embedding_function() ef_kwargs = {"embedding_function": ef} if ef is not None else {} collection = self._client(palace_path).create_collection( - collection_name, metadata={"hnsw:space": hnsw_space}, **ef_kwargs + collection_name, + metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1}, + **ef_kwargs, ) return ChromaCollection(collection) diff --git a/mempalace/hooks_cli.py b/mempalace/hooks_cli.py index 01eca3f..bdcc97d 100644 --- a/mempalace/hooks_cli.py +++ b/mempalace/hooks_cli.py @@ -643,6 +643,9 @@ def hook_session_start(data: dict, harness: str): _output({}) +MAX_PRECOMPACT_BLOCK_ATTEMPTS = 2 + + def hook_precompact(data: dict, harness: str): """Precompact hook: mine transcript synchronously, then allow compaction.""" parsed = _parse_harness_input(data, harness) diff --git a/mempalace/miner.py b/mempalace/miner.py index b593797..2fde777 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -20,10 +20,12 @@ from typing import Optional from .palace import ( NORMALIZE_VERSION, SKIP_DIRS, + MineAlreadyRunning, build_closet_lines, file_already_mined, get_closets_collection, get_collection, + mine_global_lock, mine_lock, purge_file_closets, upsert_closet_lines, @@ -993,6 +995,48 @@ def mine( ``mine`` walks the tree itself just like before. """ + if dry_run: + return _mine_impl( + project_dir, + palace_path, + wing_override=wing_override, + agent=agent, + limit=limit, + dry_run=dry_run, + respect_gitignore=respect_gitignore, + include_ignored=include_ignored, + ) + + try: + with mine_global_lock(): + return _mine_impl( + project_dir, + palace_path, + wing_override=wing_override, + agent=agent, + limit=limit, + dry_run=dry_run, + respect_gitignore=respect_gitignore, + include_ignored=include_ignored, + ) + except MineAlreadyRunning: + print( + "mempalace: another `mine` is already running — exiting cleanly.", + file=sys.stderr, + ) + return + + +def _mine_impl( + project_dir: str, + palace_path: str, + wing_override: str = None, + agent: str = "mempalace", + limit: int = 0, + dry_run: bool = False, + respect_gitignore: bool = True, + include_ignored: list = None, +): project_path = Path(project_dir).expanduser().resolve() config = load_config(project_dir) diff --git a/mempalace/palace.py b/mempalace/palace.py index a2a4a8e..76a037e 100644 --- a/mempalace/palace.py +++ b/mempalace/palace.py @@ -310,6 +310,69 @@ def mine_lock(source_file: str): lf.close() +class MineAlreadyRunning(RuntimeError): + """Raised when another `mempalace mine` process already holds the global lock.""" + + +@contextlib.contextmanager +def mine_global_lock(): + """Process-wide non-blocking lock around the full `mine` pipeline. + + The per-file `mine_lock` only protects delete+insert interleave for a + single source; it does not prevent N copies of `mempalace mine ` + from being spawned concurrently by hooks. When that happens, each copy + drives ChromaDB HNSW inserts in parallel, which (combined with + chromadb's multi-threaded ParallelFor) can corrupt the HNSW graph and + produce sparse link_lists.bin blowups. + + This lock is non-blocking: if another `mine` is already running, we + raise MineAlreadyRunning so the caller can exit cleanly instead of + piling up waiting workers. + """ + lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks") + os.makedirs(lock_dir, exist_ok=True) + lock_path = os.path.join(lock_dir, "mine_global.lock") + + lf = open(lock_path, "w") + acquired = False + try: + if os.name == "nt": + import msvcrt + + try: + msvcrt.locking(lf.fileno(), msvcrt.LK_NBLCK, 1) + acquired = True + except OSError as exc: + raise MineAlreadyRunning( + "another `mempalace mine` is already running" + ) from exc + else: + import fcntl + + try: + fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB) + acquired = True + except BlockingIOError as exc: + raise MineAlreadyRunning( + "another `mempalace mine` is already running" + ) from exc + yield + finally: + if acquired: + try: + if os.name == "nt": + import msvcrt + + msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1) + else: + import fcntl + + fcntl.flock(lf, fcntl.LOCK_UN) + except Exception: + pass + lf.close() + + def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool: """Check if a file has already been filed in the palace. diff --git a/tests/test_palace_locks.py b/tests/test_palace_locks.py new file mode 100644 index 0000000..5b94a5d --- /dev/null +++ b/tests/test_palace_locks.py @@ -0,0 +1,59 @@ +"""Tests for mine_global_lock: non-blocking cross-process lock semantics.""" +import threading + +import pytest + +from mempalace.palace import MineAlreadyRunning, mine_global_lock + + +def test_mine_global_lock_acquired(tmp_path, monkeypatch): + """Lock is acquired and released without error.""" + monkeypatch.setenv("HOME", str(tmp_path)) + with mine_global_lock(): + pass # should not raise + + +def test_mine_global_lock_second_acquire_raises(tmp_path, monkeypatch): + """Concurrent second acquire raises MineAlreadyRunning.""" + monkeypatch.setenv("HOME", str(tmp_path)) + results: list[str] = [] + + with mine_global_lock(): + # While this lock is held, spawn a thread that tries to acquire. + def try_acquire(): + try: + with mine_global_lock(): + results.append("acquired") + except MineAlreadyRunning: + results.append("blocked") + + t = threading.Thread(target=try_acquire) + t.start() + t.join(timeout=5) + + assert results == ["blocked"] + + +def test_mine_global_lock_reusable_after_release(tmp_path, monkeypatch): + """Lock can be re-acquired after the context manager exits.""" + monkeypatch.setenv("HOME", str(tmp_path)) + + with mine_global_lock(): + pass # first acquire + release + + # Second acquire must succeed; MineAlreadyRunning would propagate as failure. + with mine_global_lock(): + pass + + +def test_mine_global_lock_exception_still_releases(tmp_path, monkeypatch): + """Lock is released even when the body raises.""" + monkeypatch.setenv("HOME", str(tmp_path)) + + with pytest.raises(ValueError): + with mine_global_lock(): + raise ValueError("boom") + + # Must be acquirable again after the exception. + with mine_global_lock(): + pass