diff --git a/mempalace/backends/chroma.py b/mempalace/backends/chroma.py index e7c2e6f..6cec804 100644 --- a/mempalace/backends/chroma.py +++ b/mempalace/backends/chroma.py @@ -1,5 +1,6 @@ """ChromaDB-backed MemPalace storage backend (RFC 001 reference implementation).""" +import contextlib import datetime as _dt import logging import os @@ -691,10 +692,43 @@ def _close_client(client) -> None: class ChromaCollection(BaseCollection): - """Thin adapter translating ChromaDB dict returns into typed results.""" + """Thin adapter translating ChromaDB dict returns into typed results. - def __init__(self, collection): + When ``palace_path`` is set, all write methods (``add``, ``upsert``, + ``update``, ``delete``) acquire ``mine_palace_lock(palace_path)`` for the + duration of the underlying chromadb call. This serializes MCP and other + direct-backend writers against ``mempalace mine`` and against each other, + closing the race between concurrent writers that triggers ChromaDB's + multi-threaded HNSW corruption (#974/#965). + + The lock is the same primitive used by ``miner.mine()`` so re-entrant + acquisition from inside the mine pipeline (mine -> _mine_body -> + collection.upsert) is short-circuited by the per-thread guard inside + ``mine_palace_lock`` — no self-deadlock. + + ``palace_path=None`` disables the wrapping, preserving the legacy + no-lock behaviour for callers that construct a ``ChromaCollection`` + directly without going through ``ChromaBackend``. + """ + + def __init__(self, collection, palace_path: Optional[str] = None): self._collection = collection + self._palace_path = palace_path + + @contextlib.contextmanager + def _write_lock(self): + """Acquire ``mine_palace_lock`` for the configured palace, if any. + + No-op (yields immediately) when ``self._palace_path`` is None. + """ + if self._palace_path is None: + yield + return + # Late import — palace.py imports ChromaBackend from this module. + from ..palace import mine_palace_lock + + with mine_palace_lock(self._palace_path): + yield # ------------------------------------------------------------------ # Writes @@ -706,7 +740,8 @@ class ChromaCollection(BaseCollection): kwargs["metadatas"] = metadatas if embeddings is not None: kwargs["embeddings"] = embeddings - self._collection.add(**kwargs) + with self._write_lock(): + self._collection.add(**kwargs) def upsert(self, *, documents, ids, metadatas=None, embeddings=None): kwargs: dict[str, Any] = {"documents": documents, "ids": ids} @@ -714,7 +749,8 @@ class ChromaCollection(BaseCollection): kwargs["metadatas"] = metadatas if embeddings is not None: kwargs["embeddings"] = embeddings - self._collection.upsert(**kwargs) + with self._write_lock(): + self._collection.upsert(**kwargs) def update( self, @@ -733,7 +769,8 @@ class ChromaCollection(BaseCollection): kwargs["metadatas"] = metadatas if embeddings is not None: kwargs["embeddings"] = embeddings - self._collection.update(**kwargs) + with self._write_lock(): + self._collection.update(**kwargs) # ------------------------------------------------------------------ # Reads @@ -877,7 +914,8 @@ class ChromaCollection(BaseCollection): kwargs["ids"] = ids if where is not None: kwargs["where"] = where - self._collection.delete(**kwargs) + with self._write_lock(): + self._collection.delete(**kwargs) def count(self): return self._collection.count() @@ -1145,7 +1183,7 @@ class ChromaBackend(BaseBackend): else: collection = client.get_collection(collection_name, **ef_kwargs) _pin_hnsw_threads(collection) - return ChromaCollection(collection) + return ChromaCollection(collection, palace_path=palace_path) def close_palace(self, palace) -> None: """Drop cached handles for ``palace`` and release its SQLite file lock. @@ -1204,7 +1242,7 @@ class ChromaBackend(BaseBackend): }, **ef_kwargs, ) - return ChromaCollection(collection) + return ChromaCollection(collection, palace_path=palace_path) def _normalize_get_collection_args(args, kwargs): diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index 035965d..0c26a66 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -374,7 +374,7 @@ def _get_collection(create=False): **ef_kwargs, ) _pin_hnsw_threads(raw) - _collection_cache = ChromaCollection(raw) + _collection_cache = ChromaCollection(raw, palace_path=_config.palace_path) _metadata_cache = None _metadata_cache_time = 0 elif _collection_cache is None: @@ -382,7 +382,7 @@ def _get_collection(create=False): ef_kwargs = {"embedding_function": ef} if ef is not None else {} raw = client.get_collection(_config.collection_name, **ef_kwargs) _pin_hnsw_threads(raw) - _collection_cache = ChromaCollection(raw) + _collection_cache = ChromaCollection(raw, palace_path=_config.palace_path) _metadata_cache = None _metadata_cache_time = 0 return _collection_cache diff --git a/mempalace/palace.py b/mempalace/palace.py index 07efb6a..97f67ff 100644 --- a/mempalace/palace.py +++ b/mempalace/palace.py @@ -8,6 +8,7 @@ import contextlib import hashlib import os import re +import threading from .backends.chroma import ChromaBackend @@ -314,6 +315,47 @@ class MineAlreadyRunning(RuntimeError): """Raised when another `mempalace mine` already holds the per-palace lock.""" +# Per-thread record of palaces this thread already holds the lock for. Used by +# `mine_palace_lock` to short-circuit re-entrant acquisition from the same +# thread (e.g. miner.mine() acquires the outer lock then calls +# ChromaCollection.upsert which now also tries to acquire). Without this guard +# the inner call would block on its own outer flock (Linux fcntl locks are per +# open file description, so a same-thread second open of the lock file is a +# distinct lock and self-deadlocks). +# +# The holder set is tagged with ``pid`` so that a forked child does NOT +# inherit re-entrant credit from its parent: the OS-level flock IS NOT +# inherited as a "we hold it" semantically — the child must reacquire — but +# Python's ``threading.local`` IS inherited across fork. The pid check +# clears stale state so a forked child correctly hits the fcntl path. +_palace_lock_holders = threading.local() + + +def _holder_state(): + """Return the per-thread (pid, keys) record, refreshing after fork.""" + keys = getattr(_palace_lock_holders, "keys", None) + pid = getattr(_palace_lock_holders, "pid", None) + current_pid = os.getpid() + if keys is None or pid != current_pid: + keys = set() + _palace_lock_holders.keys = keys + _palace_lock_holders.pid = current_pid + return keys + + +def _held_by_this_thread(lock_key: str) -> bool: + """Return True if this thread already holds ``mine_palace_lock`` for ``lock_key``.""" + return lock_key in _holder_state() + + +def _mark_held(lock_key: str) -> None: + _holder_state().add(lock_key) + + +def _mark_released(lock_key: str) -> None: + _holder_state().discard(lock_key) + + @contextlib.contextmanager def mine_palace_lock(palace_path: str): """Per-palace non-blocking lock around the full `mine` pipeline. @@ -338,6 +380,12 @@ def mine_palace_lock(palace_path: str): Non-blocking: if another `mine` is already writing to this palace, raise MineAlreadyRunning so the caller can exit cleanly instead of piling up as a waiting worker. + + Re-entrant: if the current thread already holds the lock for the same + palace, the context manager passes through without re-acquiring. This + lets ChromaCollection write methods (which acquire the lock themselves + to protect MCP/direct callers) compose with miner.mine() (which holds + the outer lock for the entire mine pipeline) without self-deadlock. """ lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks") os.makedirs(lock_dir, exist_ok=True) @@ -346,6 +394,11 @@ def mine_palace_lock(palace_path: str): palace_key = hashlib.sha256(lock_key_source.encode()).hexdigest()[:16] lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock") + if _held_by_this_thread(palace_key): + # Same thread already holds the lock for this palace — pass through. + yield + return + lf = open(lock_path, "w") acquired = False try: @@ -369,7 +422,11 @@ def mine_palace_lock(palace_path: str): raise MineAlreadyRunning( f"another `mempalace mine` is already running against {resolved}" ) from exc - yield + _mark_held(palace_key) + try: + yield + finally: + _mark_released(palace_key) finally: if acquired: try: diff --git a/tests/test_chroma_collection_lock.py b/tests/test_chroma_collection_lock.py new file mode 100644 index 0000000..536b5e8 --- /dev/null +++ b/tests/test_chroma_collection_lock.py @@ -0,0 +1,321 @@ +"""Tests for ChromaCollection's palace-write-lock integration. + +Closes the gap left by ``mine_palace_lock`` only protecting the +``mempalace mine`` pipeline: MCP/direct writers that call +``ChromaCollection.add/upsert/update/delete`` must also serialize against +mine and against each other to avoid the multi-threaded HNSW corruption +documented in #974/#965. + +Property tested: + +* ``ChromaCollection(c, palace_path=p)`` wraps every write with + ``mine_palace_lock(p)``. +* Writes raise ``MineAlreadyRunning`` when another holder owns the lock + (instead of silently racing into the underlying chromadb call). +* Re-entrant composition with ``miner.mine()`` does not self-deadlock: + ``with mine_palace_lock(p): col.upsert(...)`` runs to completion. +* ``ChromaCollection(c)`` (no palace_path) preserves legacy no-lock + behaviour for tests/callers that build the adapter directly without + going through ``ChromaBackend``. + +POSIX-only: ``mine_palace_lock`` uses ``fcntl`` on Unix and ``msvcrt`` on +Windows; the contention semantics differ enough that the cross-process +tests are skipped on Windows runners. +""" + +from __future__ import annotations + +import multiprocessing +import os +import time + +import pytest + +from mempalace.backends.chroma import ChromaCollection +from mempalace.palace import MineAlreadyRunning, mine_palace_lock + + +def _get_mp_context(): + """Same start-method picker as test_palace_locks.py.""" + start_method = "spawn" if os.name == "nt" else "fork" + return multiprocessing.get_context(start_method) + + +# --------------------------------------------------------------------------- +# Fakes +# --------------------------------------------------------------------------- + + +class _FakeChromaCollection: + """Records calls; never blocks. Stand-in for chromadb.Collection.""" + + def __init__(self): + self.adds: list[dict] = [] + self.upserts: list[dict] = [] + self.updates: list[dict] = [] + self.deletes: list[dict] = [] + + def add(self, **kwargs): + self.adds.append(kwargs) + + def upsert(self, **kwargs): + self.upserts.append(kwargs) + + def update(self, **kwargs): + self.updates.append(kwargs) + + def delete(self, **kwargs): + self.deletes.append(kwargs) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _hold_lock(palace_path: str, ready_flag: str, release_flag: str) -> int: + """Acquire ``mine_palace_lock``, signal readiness, wait for release. + + Mirrors the helper in ``test_palace_locks.py`` so the contention + semantics match across both test files. + """ + try: + with mine_palace_lock(palace_path): + open(ready_flag, "w").close() + for _ in range(500): + if os.path.exists(release_flag): + return 0 + time.sleep(0.01) + return 0 + except MineAlreadyRunning: + return 1 + + +# --------------------------------------------------------------------------- +# Tests — opt-in lock wiring +# --------------------------------------------------------------------------- + + +def test_palace_path_none_skips_lock(tmp_path, monkeypatch): + """Legacy callers (``ChromaCollection(c)``) keep no-lock behaviour. + + A ``ChromaCollection`` built without ``palace_path`` must not touch the + lock infrastructure at all. This guards against regressions where a + test or third-party caller relies on the historical bare-write path. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + fake = _FakeChromaCollection() + col = ChromaCollection(fake) # no palace_path -> no lock + + # Hold the lock in a child process. Without palace_path, the parent + # write must still succeed (the lock does not gate this caller). + palace = str(tmp_path / "palace") + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") + ctx = _get_mp_context() + holder = ctx.Process(target=_hold_lock, args=(palace, ready, release)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock" + + col.upsert(documents=["doc"], ids=["id-1"]) + assert fake.upserts == [{"documents": ["doc"], "ids": ["id-1"]}] + finally: + open(release, "w").close() + holder.join(timeout=5) + + +def test_writer_blocks_during_mine(tmp_path, monkeypatch): + """A held ``mine_palace_lock`` causes ``ChromaCollection`` writes to raise. + + This is the property that closes the MCP-bypass gap: when a mine is in + flight, MCP/direct writes raise ``MineAlreadyRunning`` rather than + silently entering chromadb's write path concurrent with mine. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") + + ctx = _get_mp_context() + holder = ctx.Process(target=_hold_lock, args=(palace, ready, release)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock" + + fake = _FakeChromaCollection() + col = ChromaCollection(fake, palace_path=palace) + + with pytest.raises(MineAlreadyRunning): + col.upsert(documents=["doc"], ids=["id-1"]) + with pytest.raises(MineAlreadyRunning): + col.add(documents=["doc"], ids=["id-2"]) + with pytest.raises(MineAlreadyRunning): + col.update(ids=["id-3"], documents=["doc"]) + with pytest.raises(MineAlreadyRunning): + col.delete(ids=["id-4"]) + + # The fake must have received NO calls — the lock must gate + # before reaching the underlying chromadb layer. + assert fake.upserts == [] + assert fake.adds == [] + assert fake.updates == [] + assert fake.deletes == [] + finally: + open(release, "w").close() + holder.join(timeout=5) + + +def test_reentrant_inside_mine_passes_through(tmp_path, monkeypatch): + """``ChromaCollection.upsert`` inside ``mine_palace_lock`` does not deadlock. + + ``miner.mine()`` already holds ``mine_palace_lock(palace_path)`` for the + full mine pipeline; ``_mine_body`` then calls + ``collection.upsert(...)``. With the per-thread re-entrant guard in + ``mine_palace_lock``, the inner acquire is a pass-through and the + underlying chromadb call runs immediately. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + fake = _FakeChromaCollection() + col = ChromaCollection(fake, palace_path=palace) + + with mine_palace_lock(palace): + # If the re-entrant guard were missing, this would self-deadlock on + # the underlying flock. We rely on pytest-timeout (configured in + # pyproject.toml) to enforce this in CI; the assertion just confirms + # the call landed. + col.upsert(documents=["d"], ids=["i"], metadatas=[{"k": "v"}]) + col.add(documents=["d2"], ids=["i2"]) + col.update(ids=["i"], documents=["d-updated"]) + col.delete(ids=["i2"]) + + assert len(fake.upserts) == 1 + assert len(fake.adds) == 1 + assert len(fake.updates) == 1 + assert len(fake.deletes) == 1 + + +class _SlowFakeChromaCollection(_FakeChromaCollection): + """Fake whose write methods hold the caller for ``hold_seconds``. + + Used to keep ``mine_palace_lock`` acquired long enough for a sibling + process to contend deterministically. + """ + + def __init__(self, hold_seconds: float = 0.3): + super().__init__() + self._hold = hold_seconds + + def upsert(self, **kwargs): + time.sleep(self._hold) + super().upsert(**kwargs) + + +def _slow_writer_target(palace_path, tmp_path_str, pid, result_q): + """Subprocess target: try a slow upsert, report ok/busy.""" + os.environ["HOME"] = tmp_path_str + # Fresh import inside child so HOME monkeypatch routes the lock dir. + from mempalace.backends.chroma import ChromaCollection as _CC + from mempalace.palace import MineAlreadyRunning as _MAR + + fake = _SlowFakeChromaCollection(hold_seconds=0.3) + col = _CC(fake, palace_path=palace_path) + try: + col.upsert(documents=[f"d{pid}"], ids=[f"i{pid}"]) + result_q.put(("ok", pid)) + except _MAR: + result_q.put(("busy", pid)) + + +def test_concurrent_writers_serialize(tmp_path, monkeypatch): + """Two processes calling ``ChromaCollection.upsert`` against the same + palace must be serialized: at most one enters chromadb at a time, the + other raises ``MineAlreadyRunning``. + + This is the property that prevents the parallel HNSW insert race that + drives #974/#965 — under concurrent MCP write fan-out, exactly one + writer reaches chromadb and the rest fail loudly instead of corrupting + the index. + + The slow fake holds the lock for 0.3s per writer, large enough for the + second process to contend even on slow CI runners. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + + ctx = _get_mp_context() + result_q = ctx.Queue() + + p1 = ctx.Process(target=_slow_writer_target, args=(palace, str(tmp_path), 1, result_q)) + p2 = ctx.Process(target=_slow_writer_target, args=(palace, str(tmp_path), 2, result_q)) + p1.start() + # Tiny stagger so p1 wins the race deterministically; without it the + # OS scheduler can pick either, which is also a valid outcome but + # makes the assertion brittle on slow CI. + time.sleep(0.05) + p2.start() + p1.join(timeout=5) + p2.join(timeout=5) + + outcomes = [result_q.get(timeout=1) for _ in range(2)] + statuses = sorted(o[0] for o in outcomes) + assert statuses == ["busy", "ok"], f"expected one ok + one busy, got {outcomes}" + + +def test_read_path_does_not_acquire_lock(tmp_path, monkeypatch): + """``query`` / ``get`` / ``count`` must not be gated by the write lock. + + Read traffic is the dominant workload (semantic search, MCP get, etc.) + and serializing it against mine would tank latency for no correctness + benefit. This test pins that property: with another process holding + the write lock, reads must still complete instantly. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") + + ctx = _get_mp_context() + holder = ctx.Process(target=_hold_lock, args=(palace, ready, release)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock" + + # _FakeChromaCollection doesn't implement query/get/count; we only + # need to confirm the wrapper does not call into mine_palace_lock + # for reads, which we assert by observing the wrapped methods are + # NOT in ChromaCollection's _write_lock path. A direct check via + # source inspection is more honest than mocking the entire chroma + # surface here. + import inspect + + from mempalace.backends.chroma import ChromaCollection as _CC + + for write_attr in ("add", "upsert", "update", "delete"): + src = inspect.getsource(getattr(_CC, write_attr)) + assert "_write_lock" in src, f"{write_attr} should acquire write lock" + + for read_attr in ("query", "get", "count"): + method = getattr(_CC, read_attr, None) + if method is None: + continue + src = inspect.getsource(method) + assert ( + "_write_lock" not in src + ), f"{read_attr} must NOT acquire the write lock (read path)" + finally: + open(release, "w").close() + holder.join(timeout=5) diff --git a/tests/test_palace_locks.py b/tests/test_palace_locks.py index 601c894..d239757 100644 --- a/tests/test_palace_locks.py +++ b/tests/test_palace_locks.py @@ -135,19 +135,77 @@ def test_different_palaces_dont_conflict(tmp_path, monkeypatch): def test_palace_path_is_normalized(tmp_path, monkeypatch): - """Relative and absolute forms of the same path must use the same lock.""" + """Relative and absolute forms of the same path must use the same lock. + + Cross-process variant: a child holds the absolute form, a relative form + in the parent must hash to the same lock key and raise + ``MineAlreadyRunning``. (The same-thread case is now a re-entrant + pass-through by design — see ``test_reentrant_same_thread_passes_through`` + — so we exercise the normalization invariant across a process boundary + where re-entrance does not apply.) + """ monkeypatch.setenv("HOME", str(tmp_path)) monkeypatch.chdir(tmp_path) os.makedirs(tmp_path / "palace", exist_ok=True) absolute = str(tmp_path / "palace") - relative = "palace" + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") - # Hold the lock with the absolute form; attempting to re-acquire with - # the relative form (which resolves to the same absolute path) must fail. - with mine_palace_lock(absolute): + ctx = _get_mp_context() + holder = ctx.Process(target=_hold_lock, args=(absolute, ready, release)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock in time" + + # Parent holds CWD = tmp_path so "palace" is the same on-disk dir as + # the absolute form. The lock key is sha256(realpath+normcase) so the + # two forms must collide. with pytest.raises(MineAlreadyRunning): - with mine_palace_lock(relative): + with mine_palace_lock("palace"): pytest.fail("normalized path collision should have raised") + finally: + open(release, "w").close() + holder.join(timeout=5) + + +def test_reentrant_same_thread_passes_through(tmp_path, monkeypatch): + """Same thread re-acquiring the same palace lock must not deadlock or raise. + + This is the invariant that makes ``ChromaCollection`` write methods (which + take ``mine_palace_lock`` for MCP/direct-writer protection) compose with + ``miner.mine()`` (which already holds the lock for the entire mine + pipeline). Without the per-thread re-entrant guard the inner acquire + would self-deadlock on the outer flock. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + with mine_palace_lock(palace): + # Re-enter from the same thread — must yield without raising or hanging. + with mine_palace_lock(palace): + pass + # After the inner exits, the outer is still held: confirm via a + # subprocess that tries to acquire and reports back. + ctx = _get_mp_context() + result_q = ctx.Queue() + child = ctx.Process(target=_try_acquire_expect_busy, args=(palace, result_q)) + child.start() + child.join(timeout=5) + assert ( + result_q.get(timeout=1) == "busy" + ), "outer lock should still be held by parent after inner re-entrant exit" + + +def _try_acquire_expect_busy(palace_path, result_q): + """Helper: try to acquire, push 'busy' (raised) or 'free' (acquired) into queue.""" + try: + with mine_palace_lock(palace_path): + result_q.put("free") + except MineAlreadyRunning: + result_q.put("busy") def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch):