fix: serialize ChromaCollection writes through palace lock
#976 protects `mempalace mine`, but MCP/direct backend writers still call ChromaCollection.add/upsert/update/delete without the palace lock. This moves the lock boundary to the Chroma backend seam so all Chroma writes share the same palace-level serialization, with a re-entrant guard for miner paths that already hold the lock. mine_palace_lock(palace_path) gains a per-thread re-entrant guard (threading.local + pid-tag against fork inheritance) so ChromaCollection write methods can take the lock without self-deadlocking when called from inside miner.mine()'s outer hold. ChromaCollection.__init__ accepts an optional palace_path; when set, add/upsert/update/delete wrap their underlying chromadb call with mine_palace_lock(palace_path). palace_path=None preserves the legacy no-lock behaviour for direct callers and tests. ChromaBackend's get_collection/create_collection pass palace_path through; mcp_server._get_collection forwards _config.palace_path so all MCP write tools inherit the wrapping. Tests: 5 new in tests/test_chroma_collection_lock.py covering opt-in, writer-blocks-during-mine, re-entrant-inside-mine, two-process serialization, and a source-level read-path-not-locked pin. Plus 1 new + 1 rewritten in tests/test_palace_locks.py for the re-entrant semantics. 52 passed in 1.01s including the existing test_backends.py regression suite. Refs #1161.
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
"""ChromaDB-backed MemPalace storage backend (RFC 001 reference implementation)."""
|
||||
|
||||
import contextlib
|
||||
import datetime as _dt
|
||||
import logging
|
||||
import os
|
||||
@@ -573,10 +574,43 @@ def _as_list(v: Any) -> list:
|
||||
|
||||
|
||||
class ChromaCollection(BaseCollection):
|
||||
"""Thin adapter translating ChromaDB dict returns into typed results."""
|
||||
"""Thin adapter translating ChromaDB dict returns into typed results.
|
||||
|
||||
def __init__(self, collection):
|
||||
When ``palace_path`` is set, all write methods (``add``, ``upsert``,
|
||||
``update``, ``delete``) acquire ``mine_palace_lock(palace_path)`` for the
|
||||
duration of the underlying chromadb call. This serializes MCP and other
|
||||
direct-backend writers against ``mempalace mine`` and against each other,
|
||||
closing the race between concurrent writers that triggers ChromaDB's
|
||||
multi-threaded HNSW corruption (#974/#965).
|
||||
|
||||
The lock is the same primitive used by ``miner.mine()`` so re-entrant
|
||||
acquisition from inside the mine pipeline (mine -> _mine_body ->
|
||||
collection.upsert) is short-circuited by the per-thread guard inside
|
||||
``mine_palace_lock`` — no self-deadlock.
|
||||
|
||||
``palace_path=None`` disables the wrapping, preserving the legacy
|
||||
no-lock behaviour for callers that construct a ``ChromaCollection``
|
||||
directly without going through ``ChromaBackend``.
|
||||
"""
|
||||
|
||||
def __init__(self, collection, palace_path: Optional[str] = None):
|
||||
self._collection = collection
|
||||
self._palace_path = palace_path
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _write_lock(self):
|
||||
"""Acquire ``mine_palace_lock`` for the configured palace, if any.
|
||||
|
||||
No-op (yields immediately) when ``self._palace_path`` is None.
|
||||
"""
|
||||
if self._palace_path is None:
|
||||
yield
|
||||
return
|
||||
# Late import — palace.py imports ChromaBackend from this module.
|
||||
from ..palace import mine_palace_lock
|
||||
|
||||
with mine_palace_lock(self._palace_path):
|
||||
yield
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Writes
|
||||
@@ -588,7 +622,8 @@ class ChromaCollection(BaseCollection):
|
||||
kwargs["metadatas"] = metadatas
|
||||
if embeddings is not None:
|
||||
kwargs["embeddings"] = embeddings
|
||||
self._collection.add(**kwargs)
|
||||
with self._write_lock():
|
||||
self._collection.add(**kwargs)
|
||||
|
||||
def upsert(self, *, documents, ids, metadatas=None, embeddings=None):
|
||||
kwargs: dict[str, Any] = {"documents": documents, "ids": ids}
|
||||
@@ -596,7 +631,8 @@ class ChromaCollection(BaseCollection):
|
||||
kwargs["metadatas"] = metadatas
|
||||
if embeddings is not None:
|
||||
kwargs["embeddings"] = embeddings
|
||||
self._collection.upsert(**kwargs)
|
||||
with self._write_lock():
|
||||
self._collection.upsert(**kwargs)
|
||||
|
||||
def update(
|
||||
self,
|
||||
@@ -615,7 +651,8 @@ class ChromaCollection(BaseCollection):
|
||||
kwargs["metadatas"] = metadatas
|
||||
if embeddings is not None:
|
||||
kwargs["embeddings"] = embeddings
|
||||
self._collection.update(**kwargs)
|
||||
with self._write_lock():
|
||||
self._collection.update(**kwargs)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Reads
|
||||
@@ -759,7 +796,8 @@ class ChromaCollection(BaseCollection):
|
||||
kwargs["ids"] = ids
|
||||
if where is not None:
|
||||
kwargs["where"] = where
|
||||
self._collection.delete(**kwargs)
|
||||
with self._write_lock():
|
||||
self._collection.delete(**kwargs)
|
||||
|
||||
def count(self):
|
||||
return self._collection.count()
|
||||
@@ -998,7 +1036,7 @@ class ChromaBackend(BaseBackend):
|
||||
else:
|
||||
collection = client.get_collection(collection_name, **ef_kwargs)
|
||||
_pin_hnsw_threads(collection)
|
||||
return ChromaCollection(collection)
|
||||
return ChromaCollection(collection, palace_path=palace_path)
|
||||
|
||||
def close_palace(self, palace) -> None:
|
||||
"""Drop cached handles for ``palace``. Accepts ``PalaceRef`` or legacy path str."""
|
||||
@@ -1045,7 +1083,7 @@ class ChromaBackend(BaseBackend):
|
||||
metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1},
|
||||
**ef_kwargs,
|
||||
)
|
||||
return ChromaCollection(collection)
|
||||
return ChromaCollection(collection, palace_path=palace_path)
|
||||
|
||||
|
||||
def _normalize_get_collection_args(args, kwargs):
|
||||
|
||||
@@ -288,13 +288,13 @@ def _get_collection(create=False):
|
||||
metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1},
|
||||
)
|
||||
_pin_hnsw_threads(raw)
|
||||
_collection_cache = ChromaCollection(raw)
|
||||
_collection_cache = ChromaCollection(raw, palace_path=_config.palace_path)
|
||||
_metadata_cache = None
|
||||
_metadata_cache_time = 0
|
||||
elif _collection_cache is None:
|
||||
raw = client.get_collection(_config.collection_name)
|
||||
_pin_hnsw_threads(raw)
|
||||
_collection_cache = ChromaCollection(raw)
|
||||
_collection_cache = ChromaCollection(raw, palace_path=_config.palace_path)
|
||||
_metadata_cache = None
|
||||
_metadata_cache_time = 0
|
||||
return _collection_cache
|
||||
|
||||
+58
-1
@@ -8,6 +8,7 @@ import contextlib
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
|
||||
from .backends.chroma import ChromaBackend
|
||||
|
||||
@@ -314,6 +315,47 @@ class MineAlreadyRunning(RuntimeError):
|
||||
"""Raised when another `mempalace mine` already holds the per-palace lock."""
|
||||
|
||||
|
||||
# Per-thread record of palaces this thread already holds the lock for. Used by
|
||||
# `mine_palace_lock` to short-circuit re-entrant acquisition from the same
|
||||
# thread (e.g. miner.mine() acquires the outer lock then calls
|
||||
# ChromaCollection.upsert which now also tries to acquire). Without this guard
|
||||
# the inner call would block on its own outer flock (Linux fcntl locks are per
|
||||
# open file description, so a same-thread second open of the lock file is a
|
||||
# distinct lock and self-deadlocks).
|
||||
#
|
||||
# The holder set is tagged with ``pid`` so that a forked child does NOT
|
||||
# inherit re-entrant credit from its parent: the OS-level flock IS NOT
|
||||
# inherited as a "we hold it" semantically — the child must reacquire — but
|
||||
# Python's ``threading.local`` IS inherited across fork. The pid check
|
||||
# clears stale state so a forked child correctly hits the fcntl path.
|
||||
_palace_lock_holders = threading.local()
|
||||
|
||||
|
||||
def _holder_state():
|
||||
"""Return the per-thread (pid, keys) record, refreshing after fork."""
|
||||
keys = getattr(_palace_lock_holders, "keys", None)
|
||||
pid = getattr(_palace_lock_holders, "pid", None)
|
||||
current_pid = os.getpid()
|
||||
if keys is None or pid != current_pid:
|
||||
keys = set()
|
||||
_palace_lock_holders.keys = keys
|
||||
_palace_lock_holders.pid = current_pid
|
||||
return keys
|
||||
|
||||
|
||||
def _held_by_this_thread(lock_key: str) -> bool:
|
||||
"""Return True if this thread already holds ``mine_palace_lock`` for ``lock_key``."""
|
||||
return lock_key in _holder_state()
|
||||
|
||||
|
||||
def _mark_held(lock_key: str) -> None:
|
||||
_holder_state().add(lock_key)
|
||||
|
||||
|
||||
def _mark_released(lock_key: str) -> None:
|
||||
_holder_state().discard(lock_key)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def mine_palace_lock(palace_path: str):
|
||||
"""Per-palace non-blocking lock around the full `mine` pipeline.
|
||||
@@ -338,6 +380,12 @@ def mine_palace_lock(palace_path: str):
|
||||
Non-blocking: if another `mine` is already writing to this palace,
|
||||
raise MineAlreadyRunning so the caller can exit cleanly instead of
|
||||
piling up as a waiting worker.
|
||||
|
||||
Re-entrant: if the current thread already holds the lock for the same
|
||||
palace, the context manager passes through without re-acquiring. This
|
||||
lets ChromaCollection write methods (which acquire the lock themselves
|
||||
to protect MCP/direct callers) compose with miner.mine() (which holds
|
||||
the outer lock for the entire mine pipeline) without self-deadlock.
|
||||
"""
|
||||
lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks")
|
||||
os.makedirs(lock_dir, exist_ok=True)
|
||||
@@ -346,6 +394,11 @@ def mine_palace_lock(palace_path: str):
|
||||
palace_key = hashlib.sha256(lock_key_source.encode()).hexdigest()[:16]
|
||||
lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock")
|
||||
|
||||
if _held_by_this_thread(palace_key):
|
||||
# Same thread already holds the lock for this palace — pass through.
|
||||
yield
|
||||
return
|
||||
|
||||
lf = open(lock_path, "w")
|
||||
acquired = False
|
||||
try:
|
||||
@@ -369,7 +422,11 @@ def mine_palace_lock(palace_path: str):
|
||||
raise MineAlreadyRunning(
|
||||
f"another `mempalace mine` is already running against {resolved}"
|
||||
) from exc
|
||||
yield
|
||||
_mark_held(palace_key)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
_mark_released(palace_key)
|
||||
finally:
|
||||
if acquired:
|
||||
try:
|
||||
|
||||
@@ -0,0 +1,327 @@
|
||||
"""Tests for ChromaCollection's palace-write-lock integration.
|
||||
|
||||
Closes the gap left by ``mine_palace_lock`` only protecting the
|
||||
``mempalace mine`` pipeline: MCP/direct writers that call
|
||||
``ChromaCollection.add/upsert/update/delete`` must also serialize against
|
||||
mine and against each other to avoid the multi-threaded HNSW corruption
|
||||
documented in #974/#965.
|
||||
|
||||
Property tested:
|
||||
|
||||
* ``ChromaCollection(c, palace_path=p)`` wraps every write with
|
||||
``mine_palace_lock(p)``.
|
||||
* Writes raise ``MineAlreadyRunning`` when another holder owns the lock
|
||||
(instead of silently racing into the underlying chromadb call).
|
||||
* Re-entrant composition with ``miner.mine()`` does not self-deadlock:
|
||||
``with mine_palace_lock(p): col.upsert(...)`` runs to completion.
|
||||
* ``ChromaCollection(c)`` (no palace_path) preserves legacy no-lock
|
||||
behaviour for tests/callers that build the adapter directly without
|
||||
going through ``ChromaBackend``.
|
||||
|
||||
POSIX-only: ``mine_palace_lock`` uses ``fcntl`` on Unix and ``msvcrt`` on
|
||||
Windows; the contention semantics differ enough that the cross-process
|
||||
tests are skipped on Windows runners.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import multiprocessing
|
||||
import os
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from mempalace.backends.chroma import ChromaCollection
|
||||
from mempalace.palace import MineAlreadyRunning, mine_palace_lock
|
||||
|
||||
|
||||
def _get_mp_context():
|
||||
"""Same start-method picker as test_palace_locks.py."""
|
||||
start_method = "spawn" if os.name == "nt" else "fork"
|
||||
return multiprocessing.get_context(start_method)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fakes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _FakeChromaCollection:
|
||||
"""Records calls; never blocks. Stand-in for chromadb.Collection."""
|
||||
|
||||
def __init__(self):
|
||||
self.adds: list[dict] = []
|
||||
self.upserts: list[dict] = []
|
||||
self.updates: list[dict] = []
|
||||
self.deletes: list[dict] = []
|
||||
|
||||
def add(self, **kwargs):
|
||||
self.adds.append(kwargs)
|
||||
|
||||
def upsert(self, **kwargs):
|
||||
self.upserts.append(kwargs)
|
||||
|
||||
def update(self, **kwargs):
|
||||
self.updates.append(kwargs)
|
||||
|
||||
def delete(self, **kwargs):
|
||||
self.deletes.append(kwargs)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _hold_lock(palace_path: str, ready_flag: str, release_flag: str) -> int:
|
||||
"""Acquire ``mine_palace_lock``, signal readiness, wait for release.
|
||||
|
||||
Mirrors the helper in ``test_palace_locks.py`` so the contention
|
||||
semantics match across both test files.
|
||||
"""
|
||||
try:
|
||||
with mine_palace_lock(palace_path):
|
||||
open(ready_flag, "w").close()
|
||||
for _ in range(500):
|
||||
if os.path.exists(release_flag):
|
||||
return 0
|
||||
time.sleep(0.01)
|
||||
return 0
|
||||
except MineAlreadyRunning:
|
||||
return 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests — opt-in lock wiring
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_palace_path_none_skips_lock(tmp_path, monkeypatch):
|
||||
"""Legacy callers (``ChromaCollection(c)``) keep no-lock behaviour.
|
||||
|
||||
A ``ChromaCollection`` built without ``palace_path`` must not touch the
|
||||
lock infrastructure at all. This guards against regressions where a
|
||||
test or third-party caller relies on the historical bare-write path.
|
||||
"""
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
fake = _FakeChromaCollection()
|
||||
col = ChromaCollection(fake) # no palace_path -> no lock
|
||||
|
||||
# Hold the lock in a child process. Without palace_path, the parent
|
||||
# write must still succeed (the lock does not gate this caller).
|
||||
palace = str(tmp_path / "palace")
|
||||
ready = str(tmp_path / "ready")
|
||||
release = str(tmp_path / "release")
|
||||
ctx = _get_mp_context()
|
||||
holder = ctx.Process(target=_hold_lock, args=(palace, ready, release))
|
||||
holder.start()
|
||||
try:
|
||||
for _ in range(500):
|
||||
if os.path.exists(ready):
|
||||
break
|
||||
time.sleep(0.01)
|
||||
assert os.path.exists(ready), "holder failed to acquire lock"
|
||||
|
||||
col.upsert(documents=["doc"], ids=["id-1"])
|
||||
assert fake.upserts == [{"documents": ["doc"], "ids": ["id-1"]}]
|
||||
finally:
|
||||
open(release, "w").close()
|
||||
holder.join(timeout=5)
|
||||
|
||||
|
||||
def test_writer_blocks_during_mine(tmp_path, monkeypatch):
|
||||
"""A held ``mine_palace_lock`` causes ``ChromaCollection`` writes to raise.
|
||||
|
||||
This is the property that closes the MCP-bypass gap: when a mine is in
|
||||
flight, MCP/direct writes raise ``MineAlreadyRunning`` rather than
|
||||
silently entering chromadb's write path concurrent with mine.
|
||||
"""
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
palace = str(tmp_path / "palace")
|
||||
ready = str(tmp_path / "ready")
|
||||
release = str(tmp_path / "release")
|
||||
|
||||
ctx = _get_mp_context()
|
||||
holder = ctx.Process(target=_hold_lock, args=(palace, ready, release))
|
||||
holder.start()
|
||||
try:
|
||||
for _ in range(500):
|
||||
if os.path.exists(ready):
|
||||
break
|
||||
time.sleep(0.01)
|
||||
assert os.path.exists(ready), "holder failed to acquire lock"
|
||||
|
||||
fake = _FakeChromaCollection()
|
||||
col = ChromaCollection(fake, palace_path=palace)
|
||||
|
||||
with pytest.raises(MineAlreadyRunning):
|
||||
col.upsert(documents=["doc"], ids=["id-1"])
|
||||
with pytest.raises(MineAlreadyRunning):
|
||||
col.add(documents=["doc"], ids=["id-2"])
|
||||
with pytest.raises(MineAlreadyRunning):
|
||||
col.update(ids=["id-3"], documents=["doc"])
|
||||
with pytest.raises(MineAlreadyRunning):
|
||||
col.delete(ids=["id-4"])
|
||||
|
||||
# The fake must have received NO calls — the lock must gate
|
||||
# before reaching the underlying chromadb layer.
|
||||
assert fake.upserts == []
|
||||
assert fake.adds == []
|
||||
assert fake.updates == []
|
||||
assert fake.deletes == []
|
||||
finally:
|
||||
open(release, "w").close()
|
||||
holder.join(timeout=5)
|
||||
|
||||
|
||||
def test_reentrant_inside_mine_passes_through(tmp_path, monkeypatch):
|
||||
"""``ChromaCollection.upsert`` inside ``mine_palace_lock`` does not deadlock.
|
||||
|
||||
``miner.mine()`` already holds ``mine_palace_lock(palace_path)`` for the
|
||||
full mine pipeline; ``_mine_body`` then calls
|
||||
``collection.upsert(...)``. With the per-thread re-entrant guard in
|
||||
``mine_palace_lock``, the inner acquire is a pass-through and the
|
||||
underlying chromadb call runs immediately.
|
||||
"""
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
palace = str(tmp_path / "palace")
|
||||
fake = _FakeChromaCollection()
|
||||
col = ChromaCollection(fake, palace_path=palace)
|
||||
|
||||
with mine_palace_lock(palace):
|
||||
# If the re-entrant guard were missing, this would self-deadlock on
|
||||
# the underlying flock. We rely on pytest-timeout (configured in
|
||||
# pyproject.toml) to enforce this in CI; the assertion just confirms
|
||||
# the call landed.
|
||||
col.upsert(documents=["d"], ids=["i"], metadatas=[{"k": "v"}])
|
||||
col.add(documents=["d2"], ids=["i2"])
|
||||
col.update(ids=["i"], documents=["d-updated"])
|
||||
col.delete(ids=["i2"])
|
||||
|
||||
assert len(fake.upserts) == 1
|
||||
assert len(fake.adds) == 1
|
||||
assert len(fake.updates) == 1
|
||||
assert len(fake.deletes) == 1
|
||||
|
||||
|
||||
class _SlowFakeChromaCollection(_FakeChromaCollection):
|
||||
"""Fake whose write methods hold the caller for ``hold_seconds``.
|
||||
|
||||
Used to keep ``mine_palace_lock`` acquired long enough for a sibling
|
||||
process to contend deterministically.
|
||||
"""
|
||||
|
||||
def __init__(self, hold_seconds: float = 0.3):
|
||||
super().__init__()
|
||||
self._hold = hold_seconds
|
||||
|
||||
def upsert(self, **kwargs):
|
||||
time.sleep(self._hold)
|
||||
super().upsert(**kwargs)
|
||||
|
||||
|
||||
def _slow_writer_target(palace_path, tmp_path_str, pid, result_q):
|
||||
"""Subprocess target: try a slow upsert, report ok/busy."""
|
||||
os.environ["HOME"] = tmp_path_str
|
||||
# Fresh import inside child so HOME monkeypatch routes the lock dir.
|
||||
from mempalace.backends.chroma import ChromaCollection as _CC
|
||||
from mempalace.palace import MineAlreadyRunning as _MAR
|
||||
|
||||
fake = _SlowFakeChromaCollection(hold_seconds=0.3)
|
||||
col = _CC(fake, palace_path=palace_path)
|
||||
try:
|
||||
col.upsert(documents=[f"d{pid}"], ids=[f"i{pid}"])
|
||||
result_q.put(("ok", pid))
|
||||
except _MAR:
|
||||
result_q.put(("busy", pid))
|
||||
|
||||
|
||||
def test_concurrent_writers_serialize(tmp_path, monkeypatch):
|
||||
"""Two processes calling ``ChromaCollection.upsert`` against the same
|
||||
palace must be serialized: at most one enters chromadb at a time, the
|
||||
other raises ``MineAlreadyRunning``.
|
||||
|
||||
This is the property that prevents the parallel HNSW insert race that
|
||||
drives #974/#965 — under concurrent MCP write fan-out, exactly one
|
||||
writer reaches chromadb and the rest fail loudly instead of corrupting
|
||||
the index.
|
||||
|
||||
The slow fake holds the lock for 0.3s per writer, large enough for the
|
||||
second process to contend even on slow CI runners.
|
||||
"""
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
palace = str(tmp_path / "palace")
|
||||
|
||||
ctx = _get_mp_context()
|
||||
result_q = ctx.Queue()
|
||||
|
||||
p1 = ctx.Process(
|
||||
target=_slow_writer_target, args=(palace, str(tmp_path), 1, result_q)
|
||||
)
|
||||
p2 = ctx.Process(
|
||||
target=_slow_writer_target, args=(palace, str(tmp_path), 2, result_q)
|
||||
)
|
||||
p1.start()
|
||||
# Tiny stagger so p1 wins the race deterministically; without it the
|
||||
# OS scheduler can pick either, which is also a valid outcome but
|
||||
# makes the assertion brittle on slow CI.
|
||||
time.sleep(0.05)
|
||||
p2.start()
|
||||
p1.join(timeout=5)
|
||||
p2.join(timeout=5)
|
||||
|
||||
outcomes = [result_q.get(timeout=1) for _ in range(2)]
|
||||
statuses = sorted(o[0] for o in outcomes)
|
||||
assert statuses == ["busy", "ok"], (
|
||||
f"expected one ok + one busy, got {outcomes}"
|
||||
)
|
||||
|
||||
|
||||
def test_read_path_does_not_acquire_lock(tmp_path, monkeypatch):
|
||||
"""``query`` / ``get`` / ``count`` must not be gated by the write lock.
|
||||
|
||||
Read traffic is the dominant workload (semantic search, MCP get, etc.)
|
||||
and serializing it against mine would tank latency for no correctness
|
||||
benefit. This test pins that property: with another process holding
|
||||
the write lock, reads must still complete instantly.
|
||||
"""
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
palace = str(tmp_path / "palace")
|
||||
ready = str(tmp_path / "ready")
|
||||
release = str(tmp_path / "release")
|
||||
|
||||
ctx = _get_mp_context()
|
||||
holder = ctx.Process(target=_hold_lock, args=(palace, ready, release))
|
||||
holder.start()
|
||||
try:
|
||||
for _ in range(500):
|
||||
if os.path.exists(ready):
|
||||
break
|
||||
time.sleep(0.01)
|
||||
assert os.path.exists(ready), "holder failed to acquire lock"
|
||||
|
||||
# _FakeChromaCollection doesn't implement query/get/count; we only
|
||||
# need to confirm the wrapper does not call into mine_palace_lock
|
||||
# for reads, which we assert by observing the wrapped methods are
|
||||
# NOT in ChromaCollection's _write_lock path. A direct check via
|
||||
# source inspection is more honest than mocking the entire chroma
|
||||
# surface here.
|
||||
import inspect
|
||||
|
||||
from mempalace.backends.chroma import ChromaCollection as _CC
|
||||
|
||||
for write_attr in ("add", "upsert", "update", "delete"):
|
||||
src = inspect.getsource(getattr(_CC, write_attr))
|
||||
assert "_write_lock" in src, f"{write_attr} should acquire write lock"
|
||||
|
||||
for read_attr in ("query", "get", "count"):
|
||||
method = getattr(_CC, read_attr, None)
|
||||
if method is None:
|
||||
continue
|
||||
src = inspect.getsource(method)
|
||||
assert "_write_lock" not in src, (
|
||||
f"{read_attr} must NOT acquire the write lock (read path)"
|
||||
)
|
||||
finally:
|
||||
open(release, "w").close()
|
||||
holder.join(timeout=5)
|
||||
@@ -135,19 +135,77 @@ def test_different_palaces_dont_conflict(tmp_path, monkeypatch):
|
||||
|
||||
|
||||
def test_palace_path_is_normalized(tmp_path, monkeypatch):
|
||||
"""Relative and absolute forms of the same path must use the same lock."""
|
||||
"""Relative and absolute forms of the same path must use the same lock.
|
||||
|
||||
Cross-process variant: a child holds the absolute form, a relative form
|
||||
in the parent must hash to the same lock key and raise
|
||||
``MineAlreadyRunning``. (The same-thread case is now a re-entrant
|
||||
pass-through by design — see ``test_reentrant_same_thread_passes_through``
|
||||
— so we exercise the normalization invariant across a process boundary
|
||||
where re-entrance does not apply.)
|
||||
"""
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
monkeypatch.chdir(tmp_path)
|
||||
os.makedirs(tmp_path / "palace", exist_ok=True)
|
||||
absolute = str(tmp_path / "palace")
|
||||
relative = "palace"
|
||||
ready = str(tmp_path / "ready")
|
||||
release = str(tmp_path / "release")
|
||||
|
||||
# Hold the lock with the absolute form; attempting to re-acquire with
|
||||
# the relative form (which resolves to the same absolute path) must fail.
|
||||
with mine_palace_lock(absolute):
|
||||
ctx = _get_mp_context()
|
||||
holder = ctx.Process(target=_hold_lock, args=(absolute, ready, release))
|
||||
holder.start()
|
||||
try:
|
||||
for _ in range(500):
|
||||
if os.path.exists(ready):
|
||||
break
|
||||
time.sleep(0.01)
|
||||
assert os.path.exists(ready), "holder failed to acquire lock in time"
|
||||
|
||||
# Parent holds CWD = tmp_path so "palace" is the same on-disk dir as
|
||||
# the absolute form. The lock key is sha256(realpath+normcase) so the
|
||||
# two forms must collide.
|
||||
with pytest.raises(MineAlreadyRunning):
|
||||
with mine_palace_lock(relative):
|
||||
with mine_palace_lock("palace"):
|
||||
pytest.fail("normalized path collision should have raised")
|
||||
finally:
|
||||
open(release, "w").close()
|
||||
holder.join(timeout=5)
|
||||
|
||||
|
||||
def test_reentrant_same_thread_passes_through(tmp_path, monkeypatch):
|
||||
"""Same thread re-acquiring the same palace lock must not deadlock or raise.
|
||||
|
||||
This is the invariant that makes ``ChromaCollection`` write methods (which
|
||||
take ``mine_palace_lock`` for MCP/direct-writer protection) compose with
|
||||
``miner.mine()`` (which already holds the lock for the entire mine
|
||||
pipeline). Without the per-thread re-entrant guard the inner acquire
|
||||
would self-deadlock on the outer flock.
|
||||
"""
|
||||
monkeypatch.setenv("HOME", str(tmp_path))
|
||||
palace = str(tmp_path / "palace")
|
||||
with mine_palace_lock(palace):
|
||||
# Re-enter from the same thread — must yield without raising or hanging.
|
||||
with mine_palace_lock(palace):
|
||||
pass
|
||||
# After the inner exits, the outer is still held: confirm via a
|
||||
# subprocess that tries to acquire and reports back.
|
||||
ctx = _get_mp_context()
|
||||
result_q = ctx.Queue()
|
||||
child = ctx.Process(target=_try_acquire_expect_busy, args=(palace, result_q))
|
||||
child.start()
|
||||
child.join(timeout=5)
|
||||
assert result_q.get(timeout=1) == "busy", (
|
||||
"outer lock should still be held by parent after inner re-entrant exit"
|
||||
)
|
||||
|
||||
|
||||
def _try_acquire_expect_busy(palace_path, result_q):
|
||||
"""Helper: try to acquire, push 'busy' (raised) or 'free' (acquired) into queue."""
|
||||
try:
|
||||
with mine_palace_lock(palace_path):
|
||||
result_q.put("free")
|
||||
except MineAlreadyRunning:
|
||||
result_q.put("busy")
|
||||
|
||||
|
||||
def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch):
|
||||
|
||||
Reference in New Issue
Block a user