From 1dc20e307b8294f6e0917d2abe433929287b7c21 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Mon, 13 Apr 2026 19:08:57 -0300 Subject: [PATCH] test: verify mine_lock via disjoint critical-section intervals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous revision used multiprocessing but still relied on timing ("second process waited at least N seconds") which flakes on CI where spawn overhead eats into the hold window. Linux CI observed the second process report a 0.088s wait — below the 0.1s threshold — even though the lock behavior was correct; spawn was just slow enough that the first process had nearly finished holding when the second got past its own spawn. Switch to effect-based verification: each worker logs its [enter_time, exit_time] inside the critical section, and the test asserts the two intervals are disjoint after sorting. A broken lock would produce overlapping intervals regardless of spawn latency; a working lock cannot. Also removed the mp.Queue since we no longer pass timing data back. --- tests/test_closets.py | 78 +++++++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/tests/test_closets.py b/tests/test_closets.py index fba4cc8..976086d 100644 --- a/tests/test_closets.py +++ b/tests/test_closets.py @@ -64,16 +64,22 @@ from mempalace.searcher import ( # ── mine_lock ──────────────────────────────────────────────────────────── -def _lock_worker(target: str, name: str, hold_seconds: float, queue) -> None: - """Module-level worker for multiprocessing spawn; must be pickle-able.""" +def _lock_worker(target: str, name: str, hold_seconds: float, log_path: str) -> None: + """Worker for multiprocessing-spawn concurrency test. Writes its + critical-section enter/exit timestamps to ``log_path`` so the test + can verify the sections did not overlap in time.""" + import time as _time + from mempalace.palace import mine_lock as _mine_lock - start = time.time() with _mine_lock(target): - elapsed = time.time() - start - queue.put((name, elapsed)) - if hold_seconds > 0: - time.sleep(hold_seconds) + t_enter = _time.time() + _time.sleep(hold_seconds) + t_exit = _time.time() + # Append atomically so concurrent writers don't stomp each other. + with open(log_path, "a") as f: + f.write(f"{name} {t_enter} {t_exit}\n") + f.flush() class TestMineLock: @@ -91,35 +97,51 @@ class TestMineLock: def test_lock_blocks_concurrent_access(self, tmp_path): """The lock's contract is inter-*process* (multi-agent), not inter-thread. Use multiprocessing so the test reflects the real - use case and is portable: on macOS/BSD, ``fcntl.flock`` is - per-process, so two threads in one process would both acquire — - a threading-based test would flake there even when the lock is - behaving correctly for its intended users.""" + use case and is portable: on macOS/BSD ``fcntl.flock`` is + per-process, so two threads would both acquire — a thread-based + test would flake there even when the lock is correct. + + Verify mutual exclusion by the effect the critical section + actually has — each worker records its enter/exit timestamps + under the lock, and the test asserts the two intervals do not + overlap. This is robust to spawn-overhead timing, unlike + "second worker waited at least N seconds" which flakes when CI + spawn latency eats into the hold window. + """ target = str(tmp_path / "concurrent_lock.txt") - # Use multiprocessing so each worker has its own process. - # Use "spawn" to stay consistent across platforms (macOS defaults - # to spawn on 3.8+; Linux defaults to fork). Both work here. + log_path = str(tmp_path / "critical_section.log") + # Spawn so the same code path runs on every OS (macOS 3.8+ and + # Windows already default to spawn; Linux is fork by default). ctx = multiprocessing.get_context("spawn") - queue = ctx.Queue() - p1 = ctx.Process(target=_lock_worker, args=(target, "a", 0.3, queue)) - p2 = ctx.Process(target=_lock_worker, args=(target, "b", 0.0, queue)) + # Each worker holds the lock for HOLD seconds. With real mutual + # exclusion, the two [enter, exit] intervals must be disjoint. + HOLD = 0.3 + p1 = ctx.Process(target=_lock_worker, args=(target, "a", HOLD, log_path)) + p2 = ctx.Process(target=_lock_worker, args=(target, "b", HOLD, log_path)) p1.start() - time.sleep(0.2) # ensure p1 acquires first p2.start() - p1.join(timeout=10) - p2.join(timeout=10) + p1.join(timeout=30) + p2.join(timeout=30) - results = [] - while not queue.empty(): - results.append(queue.get()) - assert len(results) == 2, f"both workers should report, got {results}" + assert p1.exitcode == 0, f"p1 exited non-zero: {p1.exitcode}" + assert p2.exitcode == 0, f"p2 exited non-zero: {p2.exitcode}" - # The second worker must have waited until p1 released the lock. - wait_times = sorted(r[1] for r in results) + # Parse the log: " ". + intervals = [] + with open(log_path) as f: + for line in f: + parts = line.strip().split() + if len(parts) == 3: + intervals.append((parts[0], float(parts[1]), float(parts[2]))) + assert len(intervals) == 2, f"expected two critical sections, got {intervals}" + + # Sort by entry time and verify the second entry is after the first exit. + intervals.sort(key=lambda iv: iv[1]) + (_, enter_a, exit_a), (_, enter_b, exit_b) = intervals assert ( - wait_times[1] > 0.1 - ), f"second process should block on mine_lock, waited only {wait_times[1]:.3f}s" + enter_a < exit_a <= enter_b < exit_b + ), f"critical sections overlapped — lock failed to serialize: {intervals}" # ── build_closet_lines ─────────────────────────────────────────────────