diff --git a/tests/test_closets.py b/tests/test_closets.py index 458f767..fba4cc8 100644 --- a/tests/test_closets.py +++ b/tests/test_closets.py @@ -24,6 +24,7 @@ Coverage map: """ import json +import multiprocessing import os import tempfile import threading @@ -63,6 +64,18 @@ from mempalace.searcher import ( # ── mine_lock ──────────────────────────────────────────────────────────── +def _lock_worker(target: str, name: str, hold_seconds: float, queue) -> None: + """Module-level worker for multiprocessing spawn; must be pickle-able.""" + from mempalace.palace import mine_lock as _mine_lock + + start = time.time() + with _mine_lock(target): + elapsed = time.time() - start + queue.put((name, elapsed)) + if hold_seconds > 0: + time.sleep(hold_seconds) + + class TestMineLock: def test_lock_acquires_and_releases(self, tmp_path): target = str(tmp_path / "lock_target.txt") @@ -76,28 +89,37 @@ class TestMineLock: assert time.time() - start < 1.0 def test_lock_blocks_concurrent_access(self, tmp_path): + """The lock's contract is inter-*process* (multi-agent), not + inter-thread. Use multiprocessing so the test reflects the real + use case and is portable: on macOS/BSD, ``fcntl.flock`` is + per-process, so two threads in one process would both acquire — + a threading-based test would flake there even when the lock is + behaving correctly for its intended users.""" target = str(tmp_path / "concurrent_lock.txt") + # Use multiprocessing so each worker has its own process. + # Use "spawn" to stay consistent across platforms (macOS defaults + # to spawn on 3.8+; Linux defaults to fork). Both work here. + ctx = multiprocessing.get_context("spawn") + queue = ctx.Queue() + + p1 = ctx.Process(target=_lock_worker, args=(target, "a", 0.3, queue)) + p2 = ctx.Process(target=_lock_worker, args=(target, "b", 0.0, queue)) + p1.start() + time.sleep(0.2) # ensure p1 acquires first + p2.start() + p1.join(timeout=10) + p2.join(timeout=10) + results = [] + while not queue.empty(): + results.append(queue.get()) + assert len(results) == 2, f"both workers should report, got {results}" - def worker(name): - start = time.time() - with mine_lock(target): - results.append((name, time.time() - start)) - time.sleep(0.2) - - t1 = threading.Thread(target=worker, args=("a",)) - t2 = threading.Thread(target=worker, args=("b",)) - t1.start() - time.sleep(0.05) # ensure t1 acquires first - t2.start() - t1.join() - t2.join() - - # The second worker must have waited at least most of t1's hold time. + # The second worker must have waited until p1 released the lock. wait_times = sorted(r[1] for r in results) assert ( wait_times[1] > 0.1 - ), f"second thread should block on mine_lock, waited only {wait_times[1]:.3f}s" + ), f"second process should block on mine_lock, waited only {wait_times[1]:.3f}s" # ── build_closet_lines ─────────────────────────────────────────────────