Merge pull request #976 from felipetruman/fix/hnsw-race-and-fanout
fix: HNSW graph corruption, PreCompact deadlock, mine fan-out (closes #974, #965, #955)
This commit is contained in:
@@ -130,6 +130,35 @@ def quarantine_stale_hnsw(palace_path: str, stale_seconds: float = 3600.0) -> li
|
|||||||
return moved
|
return moved
|
||||||
|
|
||||||
|
|
||||||
|
def _pin_hnsw_threads(collection) -> None:
|
||||||
|
"""Best-effort retrofit: pin ``hnsw:num_threads=1`` on an existing collection.
|
||||||
|
|
||||||
|
Fresh collections set this via ``metadata=`` at creation. Legacy palaces
|
||||||
|
built before that change keep the default (parallel insert) and can hit
|
||||||
|
the HNSW race described in #974/#965. ChromaDB's
|
||||||
|
``collection.modify(configuration=...)`` lets us re-apply ``num_threads=1``
|
||||||
|
in memory at load time so every new process is protected.
|
||||||
|
|
||||||
|
Note: in chromadb 1.5.x the modified ``configuration_json["hnsw"]`` does
|
||||||
|
not persist to disk across ``PersistentClient`` reopens, so this must
|
||||||
|
run on every ``get_collection`` call, not just once.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from chromadb.api.collection_configuration import (
|
||||||
|
UpdateCollectionConfiguration,
|
||||||
|
UpdateHNSWConfiguration,
|
||||||
|
)
|
||||||
|
except ImportError:
|
||||||
|
logger.debug("_pin_hnsw_threads skipped: chromadb too old", exc_info=True)
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
collection.modify(
|
||||||
|
configuration=UpdateCollectionConfiguration(hnsw=UpdateHNSWConfiguration(num_threads=1))
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.debug("_pin_hnsw_threads modify failed", exc_info=True)
|
||||||
|
|
||||||
|
|
||||||
def _fix_blob_seq_ids(palace_path: str) -> None:
|
def _fix_blob_seq_ids(palace_path: str) -> None:
|
||||||
"""Fix ChromaDB 0.6.x -> 1.5.x migration bug: BLOB seq_ids -> INTEGER.
|
"""Fix ChromaDB 0.6.x -> 1.5.x migration bug: BLOB seq_ids -> INTEGER.
|
||||||
|
|
||||||
@@ -566,10 +595,13 @@ class ChromaBackend(BaseBackend):
|
|||||||
|
|
||||||
if create:
|
if create:
|
||||||
collection = client.get_or_create_collection(
|
collection = client.get_or_create_collection(
|
||||||
collection_name, metadata={"hnsw:space": hnsw_space}, **ef_kwargs
|
collection_name,
|
||||||
|
metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1},
|
||||||
|
**ef_kwargs,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
collection = client.get_collection(collection_name, **ef_kwargs)
|
collection = client.get_collection(collection_name, **ef_kwargs)
|
||||||
|
_pin_hnsw_threads(collection)
|
||||||
return ChromaCollection(collection)
|
return ChromaCollection(collection)
|
||||||
|
|
||||||
def close_palace(self, palace) -> None:
|
def close_palace(self, palace) -> None:
|
||||||
@@ -613,7 +645,9 @@ class ChromaBackend(BaseBackend):
|
|||||||
ef = self._resolve_embedding_function()
|
ef = self._resolve_embedding_function()
|
||||||
ef_kwargs = {"embedding_function": ef} if ef is not None else {}
|
ef_kwargs = {"embedding_function": ef} if ef is not None else {}
|
||||||
collection = self._client(palace_path).create_collection(
|
collection = self._client(palace_path).create_collection(
|
||||||
collection_name, metadata={"hnsw:space": hnsw_space}, **ef_kwargs
|
collection_name,
|
||||||
|
metadata={"hnsw:space": hnsw_space, "hnsw:num_threads": 1},
|
||||||
|
**ef_kwargs,
|
||||||
)
|
)
|
||||||
return ChromaCollection(collection)
|
return ChromaCollection(collection)
|
||||||
|
|
||||||
|
|||||||
+16
-6
@@ -57,7 +57,7 @@ from .config import ( # noqa: E402
|
|||||||
sanitize_content,
|
sanitize_content,
|
||||||
)
|
)
|
||||||
from .version import __version__ # noqa: E402
|
from .version import __version__ # noqa: E402
|
||||||
from .backends.chroma import ChromaBackend, ChromaCollection # noqa: E402
|
from .backends.chroma import ChromaBackend, ChromaCollection, _pin_hnsw_threads # noqa: E402
|
||||||
from .query_sanitizer import sanitize_query # noqa: E402
|
from .query_sanitizer import sanitize_query # noqa: E402
|
||||||
from .searcher import search_memories # noqa: E402
|
from .searcher import search_memories # noqa: E402
|
||||||
from .palace_graph import ( # noqa: E402
|
from .palace_graph import ( # noqa: E402
|
||||||
@@ -217,15 +217,25 @@ def _get_collection(create=False):
|
|||||||
try:
|
try:
|
||||||
client = _get_client()
|
client = _get_client()
|
||||||
if create:
|
if create:
|
||||||
_collection_cache = ChromaCollection(
|
# hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor
|
||||||
client.get_or_create_collection(
|
# HNSW insert path, which has a race in repairConnectionsForUpdate /
|
||||||
_config.collection_name, metadata={"hnsw:space": "cosine"}
|
# addPoint (see issues #974, #965). Set via metadata on fresh
|
||||||
)
|
# collections and re-applied via _pin_hnsw_threads() for legacy
|
||||||
|
# palaces whose collections were created before this fix (the
|
||||||
|
# runtime config does not persist cross-process in chromadb 1.5.x,
|
||||||
|
# so the retrofit runs every time _get_collection opens a cache).
|
||||||
|
raw = client.get_or_create_collection(
|
||||||
|
_config.collection_name,
|
||||||
|
metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1},
|
||||||
)
|
)
|
||||||
|
_pin_hnsw_threads(raw)
|
||||||
|
_collection_cache = ChromaCollection(raw)
|
||||||
_metadata_cache = None
|
_metadata_cache = None
|
||||||
_metadata_cache_time = 0
|
_metadata_cache_time = 0
|
||||||
elif _collection_cache is None:
|
elif _collection_cache is None:
|
||||||
_collection_cache = ChromaCollection(client.get_collection(_config.collection_name))
|
raw = client.get_collection(_config.collection_name)
|
||||||
|
_pin_hnsw_threads(raw)
|
||||||
|
_collection_cache = ChromaCollection(raw)
|
||||||
_metadata_cache = None
|
_metadata_cache = None
|
||||||
_metadata_cache_time = 0
|
_metadata_cache_time = 0
|
||||||
return _collection_cache
|
return _collection_cache
|
||||||
|
|||||||
@@ -20,11 +20,13 @@ from typing import Optional
|
|||||||
from .palace import (
|
from .palace import (
|
||||||
NORMALIZE_VERSION,
|
NORMALIZE_VERSION,
|
||||||
SKIP_DIRS,
|
SKIP_DIRS,
|
||||||
|
MineAlreadyRunning,
|
||||||
build_closet_lines,
|
build_closet_lines,
|
||||||
file_already_mined,
|
file_already_mined,
|
||||||
get_closets_collection,
|
get_closets_collection,
|
||||||
get_collection,
|
get_collection,
|
||||||
mine_lock,
|
mine_lock,
|
||||||
|
mine_palace_lock,
|
||||||
purge_file_closets,
|
purge_file_closets,
|
||||||
upsert_closet_lines,
|
upsert_closet_lines,
|
||||||
)
|
)
|
||||||
@@ -993,6 +995,52 @@ def mine(
|
|||||||
``mine`` walks the tree itself just like before.
|
``mine`` walks the tree itself just like before.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if dry_run:
|
||||||
|
return _mine_impl(
|
||||||
|
project_dir,
|
||||||
|
palace_path,
|
||||||
|
wing_override=wing_override,
|
||||||
|
agent=agent,
|
||||||
|
limit=limit,
|
||||||
|
dry_run=dry_run,
|
||||||
|
respect_gitignore=respect_gitignore,
|
||||||
|
include_ignored=include_ignored,
|
||||||
|
files=files,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with mine_palace_lock(palace_path):
|
||||||
|
return _mine_impl(
|
||||||
|
project_dir,
|
||||||
|
palace_path,
|
||||||
|
wing_override=wing_override,
|
||||||
|
agent=agent,
|
||||||
|
limit=limit,
|
||||||
|
dry_run=dry_run,
|
||||||
|
respect_gitignore=respect_gitignore,
|
||||||
|
include_ignored=include_ignored,
|
||||||
|
files=files,
|
||||||
|
)
|
||||||
|
except MineAlreadyRunning:
|
||||||
|
print(
|
||||||
|
f"mempalace: another `mine` is already running against "
|
||||||
|
f"{palace_path} — exiting cleanly.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def _mine_impl(
|
||||||
|
project_dir: str,
|
||||||
|
palace_path: str,
|
||||||
|
wing_override: str = None,
|
||||||
|
agent: str = "mempalace",
|
||||||
|
limit: int = 0,
|
||||||
|
dry_run: bool = False,
|
||||||
|
respect_gitignore: bool = True,
|
||||||
|
include_ignored: list = None,
|
||||||
|
files: list = None,
|
||||||
|
):
|
||||||
project_path = Path(project_dir).expanduser().resolve()
|
project_path = Path(project_dir).expanduser().resolve()
|
||||||
config = load_config(project_dir)
|
config = load_config(project_dir)
|
||||||
|
|
||||||
|
|||||||
@@ -310,6 +310,88 @@ def mine_lock(source_file: str):
|
|||||||
lf.close()
|
lf.close()
|
||||||
|
|
||||||
|
|
||||||
|
class MineAlreadyRunning(RuntimeError):
|
||||||
|
"""Raised when another `mempalace mine` already holds the per-palace lock."""
|
||||||
|
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def mine_palace_lock(palace_path: str):
|
||||||
|
"""Per-palace non-blocking lock around the full `mine` pipeline.
|
||||||
|
|
||||||
|
The per-file `mine_lock` only protects delete+insert interleave for a
|
||||||
|
single source; it does not prevent N copies of `mempalace mine <dir>`
|
||||||
|
from being spawned concurrently by hooks. When that happens, each copy
|
||||||
|
drives ChromaDB HNSW inserts in parallel against the same palace,
|
||||||
|
which (combined with chromadb's multi-threaded ParallelFor) can
|
||||||
|
corrupt the HNSW graph and produce sparse link_lists.bin blowups.
|
||||||
|
|
||||||
|
The lock file is keyed by sha256(palace_path) so mines against
|
||||||
|
*different* palaces can still run in parallel — we only serialize
|
||||||
|
writes into the same palace, which is the correctness boundary.
|
||||||
|
|
||||||
|
The key is derived from a fully normalized form of the path:
|
||||||
|
`realpath` resolves symlinks and `..` segments, and `normcase` folds
|
||||||
|
case on Windows (which has a case-insensitive filesystem). Without
|
||||||
|
normcase, `C:\\Palace` and `c:\\palace` would hash to different keys
|
||||||
|
on Windows and let two concurrent mines touch the same on-disk palace.
|
||||||
|
|
||||||
|
Non-blocking: if another `mine` is already writing to this palace,
|
||||||
|
raise MineAlreadyRunning so the caller can exit cleanly instead of
|
||||||
|
piling up as a waiting worker.
|
||||||
|
"""
|
||||||
|
lock_dir = os.path.join(os.path.expanduser("~"), ".mempalace", "locks")
|
||||||
|
os.makedirs(lock_dir, exist_ok=True)
|
||||||
|
resolved = os.path.realpath(os.path.expanduser(palace_path))
|
||||||
|
lock_key_source = os.path.normcase(resolved)
|
||||||
|
palace_key = hashlib.sha256(lock_key_source.encode()).hexdigest()[:16]
|
||||||
|
lock_path = os.path.join(lock_dir, f"mine_palace_{palace_key}.lock")
|
||||||
|
|
||||||
|
lf = open(lock_path, "w")
|
||||||
|
acquired = False
|
||||||
|
try:
|
||||||
|
if os.name == "nt":
|
||||||
|
import msvcrt
|
||||||
|
|
||||||
|
try:
|
||||||
|
msvcrt.locking(lf.fileno(), msvcrt.LK_NBLCK, 1)
|
||||||
|
acquired = True
|
||||||
|
except OSError as exc:
|
||||||
|
raise MineAlreadyRunning(
|
||||||
|
f"another `mempalace mine` is already running against {resolved}"
|
||||||
|
) from exc
|
||||||
|
else:
|
||||||
|
import fcntl
|
||||||
|
|
||||||
|
try:
|
||||||
|
fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
|
acquired = True
|
||||||
|
except BlockingIOError as exc:
|
||||||
|
raise MineAlreadyRunning(
|
||||||
|
f"another `mempalace mine` is already running against {resolved}"
|
||||||
|
) from exc
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
if acquired:
|
||||||
|
try:
|
||||||
|
if os.name == "nt":
|
||||||
|
import msvcrt
|
||||||
|
|
||||||
|
msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1)
|
||||||
|
else:
|
||||||
|
import fcntl
|
||||||
|
|
||||||
|
fcntl.flock(lf, fcntl.LOCK_UN)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
lf.close()
|
||||||
|
|
||||||
|
|
||||||
|
# Backward-compatible alias (previous patch iteration used a single global
|
||||||
|
# lock). Kept so third-party callers that imported it continue to work; new
|
||||||
|
# code should use `mine_palace_lock(palace_path)` for per-palace scoping.
|
||||||
|
mine_global_lock = mine_palace_lock
|
||||||
|
|
||||||
|
|
||||||
def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool:
|
def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool:
|
||||||
"""Check if a file has already been filed in the palace.
|
"""Check if a file has already been filed in the palace.
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ from mempalace.backends.chroma import (
|
|||||||
ChromaBackend,
|
ChromaBackend,
|
||||||
ChromaCollection,
|
ChromaCollection,
|
||||||
_fix_blob_seq_ids,
|
_fix_blob_seq_ids,
|
||||||
|
_pin_hnsw_threads,
|
||||||
quarantine_stale_hnsw,
|
quarantine_stale_hnsw,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -443,3 +444,52 @@ def test_quarantine_stale_hnsw_skips_already_quarantined(tmp_path):
|
|||||||
moved = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)
|
moved = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)
|
||||||
assert moved == []
|
assert moved == []
|
||||||
assert drift.exists()
|
assert drift.exists()
|
||||||
|
|
||||||
|
|
||||||
|
# ── _pin_hnsw_threads ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_pin_hnsw_threads_retrofits_legacy_collection(tmp_path):
|
||||||
|
"""Legacy collections (created without num_threads) get the retrofit applied."""
|
||||||
|
palace_path = tmp_path / "legacy-palace"
|
||||||
|
palace_path.mkdir()
|
||||||
|
|
||||||
|
client = chromadb.PersistentClient(path=str(palace_path))
|
||||||
|
col = client.create_collection(
|
||||||
|
"mempalace_drawers",
|
||||||
|
metadata={"hnsw:space": "cosine"}, # no num_threads — legacy
|
||||||
|
)
|
||||||
|
assert col.configuration_json.get("hnsw", {}).get("num_threads") is None
|
||||||
|
|
||||||
|
_pin_hnsw_threads(col)
|
||||||
|
|
||||||
|
assert col.configuration_json["hnsw"]["num_threads"] == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_pin_hnsw_threads_swallows_all_errors():
|
||||||
|
"""Retrofit never raises even when collection.modify explodes."""
|
||||||
|
|
||||||
|
class _ExplodingCollection:
|
||||||
|
def modify(self, *args, **kwargs):
|
||||||
|
raise RuntimeError("boom")
|
||||||
|
|
||||||
|
_pin_hnsw_threads(_ExplodingCollection()) # must not raise
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_collection_applies_retrofit_on_existing_palace(tmp_path):
|
||||||
|
"""ChromaBackend.get_collection(create=False) applies the retrofit."""
|
||||||
|
palace_path = tmp_path / "palace"
|
||||||
|
palace_path.mkdir()
|
||||||
|
|
||||||
|
# Simulate a legacy palace: create collection without num_threads
|
||||||
|
bootstrap_client = chromadb.PersistentClient(path=str(palace_path))
|
||||||
|
bootstrap_client.create_collection("mempalace_drawers", metadata={"hnsw:space": "cosine"})
|
||||||
|
del bootstrap_client # drop reference so a fresh client reopens cleanly
|
||||||
|
|
||||||
|
wrapper = ChromaBackend().get_collection(
|
||||||
|
str(palace_path),
|
||||||
|
collection_name="mempalace_drawers",
|
||||||
|
create=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert wrapper._collection.configuration_json["hnsw"]["num_threads"] == 1
|
||||||
|
|||||||
@@ -0,0 +1,158 @@
|
|||||||
|
"""Tests for mine_palace_lock — the per-palace non-blocking mine guard.
|
||||||
|
|
||||||
|
Covers the fix for the runaway mine fan-out described alongside issues
|
||||||
|
#974 and #965: if N copies of `mempalace mine` are spawned concurrently
|
||||||
|
against the same palace, they must collapse to a single runner rather
|
||||||
|
than queue as waiters that will drive parallel HNSW inserts. Mines
|
||||||
|
against *different* palaces must still be free to run in parallel.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import multiprocessing
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from mempalace.palace import (
|
||||||
|
MineAlreadyRunning,
|
||||||
|
mine_global_lock,
|
||||||
|
mine_palace_lock,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_mp_context():
|
||||||
|
"""Pick a start method that works on every CI runner.
|
||||||
|
|
||||||
|
`fork` is cheaper (no re-import) but is unavailable on Windows, so we fall
|
||||||
|
back to `spawn` there. `spawn` inherits ``os.environ`` (including the
|
||||||
|
monkeypatched ``HOME``) and re-imports the ``mempalace`` package in the
|
||||||
|
child, which is sufficient for the lock-file semantics exercised here.
|
||||||
|
"""
|
||||||
|
start_method = "spawn" if os.name == "nt" else "fork"
|
||||||
|
return multiprocessing.get_context(start_method)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _hold_lock(palace_path: str, ready_flag: str, release_flag: str) -> int:
|
||||||
|
"""Acquire mine_palace_lock, signal readiness, wait for release flag.
|
||||||
|
|
||||||
|
Returns 0 if we acquired the lock, 1 if MineAlreadyRunning was raised.
|
||||||
|
Runs in a child process for true cross-process locking semantics.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
with mine_palace_lock(palace_path):
|
||||||
|
# Tell the parent we hold the lock
|
||||||
|
open(ready_flag, "w").close()
|
||||||
|
# Wait until parent tells us to release
|
||||||
|
for _ in range(500):
|
||||||
|
if os.path.exists(release_flag):
|
||||||
|
return 0
|
||||||
|
time.sleep(0.01)
|
||||||
|
return 0
|
||||||
|
except MineAlreadyRunning:
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Tests
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_single_acquire_succeeds(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
with mine_palace_lock(str(tmp_path / "palace")):
|
||||||
|
pass # should not raise
|
||||||
|
|
||||||
|
|
||||||
|
def test_lock_reusable_after_release(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
with mine_palace_lock(palace):
|
||||||
|
pass
|
||||||
|
# Re-acquire must succeed now that the previous holder released
|
||||||
|
with mine_palace_lock(palace):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def test_same_palace_serializes_across_processes(tmp_path, monkeypatch):
|
||||||
|
"""Two processes contending for the same palace: second must be rejected."""
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
ready = str(tmp_path / "ready")
|
||||||
|
release = str(tmp_path / "release")
|
||||||
|
|
||||||
|
ctx = _get_mp_context()
|
||||||
|
holder = ctx.Process(target=_hold_lock, args=(palace, ready, release))
|
||||||
|
holder.start()
|
||||||
|
try:
|
||||||
|
# Wait for the holder to acquire
|
||||||
|
for _ in range(500):
|
||||||
|
if os.path.exists(ready):
|
||||||
|
break
|
||||||
|
time.sleep(0.01)
|
||||||
|
assert os.path.exists(ready), "holder failed to acquire lock in time"
|
||||||
|
|
||||||
|
# From the parent, we must not be able to acquire the same palace lock
|
||||||
|
with pytest.raises(MineAlreadyRunning):
|
||||||
|
with mine_palace_lock(palace):
|
||||||
|
pytest.fail("second acquire of same palace should have raised")
|
||||||
|
finally:
|
||||||
|
open(release, "w").close()
|
||||||
|
holder.join(timeout=5)
|
||||||
|
assert holder.exitcode == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_different_palaces_dont_conflict(tmp_path, monkeypatch):
|
||||||
|
"""Mines against different palaces must NOT block each other."""
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
palace_a = str(tmp_path / "palace_a")
|
||||||
|
palace_b = str(tmp_path / "palace_b")
|
||||||
|
ready = str(tmp_path / "ready_a")
|
||||||
|
release = str(tmp_path / "release_a")
|
||||||
|
|
||||||
|
ctx = _get_mp_context()
|
||||||
|
holder = ctx.Process(target=_hold_lock, args=(palace_a, ready, release))
|
||||||
|
holder.start()
|
||||||
|
try:
|
||||||
|
for _ in range(500):
|
||||||
|
if os.path.exists(ready):
|
||||||
|
break
|
||||||
|
time.sleep(0.01)
|
||||||
|
assert os.path.exists(ready), "holder failed to acquire lock in time"
|
||||||
|
|
||||||
|
# Different palace — must succeed even while palace_a is held
|
||||||
|
with mine_palace_lock(palace_b):
|
||||||
|
pass # no exception expected
|
||||||
|
finally:
|
||||||
|
open(release, "w").close()
|
||||||
|
holder.join(timeout=5)
|
||||||
|
|
||||||
|
|
||||||
|
def test_palace_path_is_normalized(tmp_path, monkeypatch):
|
||||||
|
"""Relative and absolute forms of the same path must use the same lock."""
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
monkeypatch.chdir(tmp_path)
|
||||||
|
os.makedirs(tmp_path / "palace", exist_ok=True)
|
||||||
|
absolute = str(tmp_path / "palace")
|
||||||
|
relative = "palace"
|
||||||
|
|
||||||
|
# Hold the lock with the absolute form; attempting to re-acquire with
|
||||||
|
# the relative form (which resolves to the same absolute path) must fail.
|
||||||
|
with mine_palace_lock(absolute):
|
||||||
|
with pytest.raises(MineAlreadyRunning):
|
||||||
|
with mine_palace_lock(relative):
|
||||||
|
pytest.fail("normalized path collision should have raised")
|
||||||
|
|
||||||
|
|
||||||
|
def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch):
|
||||||
|
"""Old callers of `mine_global_lock` should still work."""
|
||||||
|
monkeypatch.setenv("HOME", str(tmp_path))
|
||||||
|
assert mine_global_lock is mine_palace_lock
|
||||||
|
with mine_global_lock(str(tmp_path / "palace")):
|
||||||
|
pass # the alias accepts the same palace_path argument
|
||||||
Reference in New Issue
Block a user