diff --git a/mempalace/cli.py b/mempalace/cli.py index 6a531e7..964fa84 100644 --- a/mempalace/cli.py +++ b/mempalace/cli.py @@ -500,31 +500,41 @@ def cmd_mine(args): llm_provider=None, ) - if args.mode == "convos": - from .convo_miner import mine_convos + from .palace import MineAlreadyRunning - mine_convos( - convo_dir=args.dir, - palace_path=palace_path, - wing=args.wing, - agent=args.agent, - limit=args.limit, - dry_run=args.dry_run, - extract_mode=args.extract, - ) - else: - from .miner import mine + try: + if args.mode == "convos": + from .convo_miner import mine_convos - mine( - project_dir=args.dir, - palace_path=palace_path, - wing_override=args.wing, - agent=args.agent, - limit=args.limit, - dry_run=args.dry_run, - respect_gitignore=not args.no_gitignore, - include_ignored=include_ignored, - ) + mine_convos( + convo_dir=args.dir, + palace_path=palace_path, + wing=args.wing, + agent=args.agent, + limit=args.limit, + dry_run=args.dry_run, + extract_mode=args.extract, + ) + else: + from .miner import mine + + mine( + project_dir=args.dir, + palace_path=palace_path, + wing_override=args.wing, + agent=args.agent, + limit=args.limit, + dry_run=args.dry_run, + respect_gitignore=not args.no_gitignore, + include_ignored=include_ignored, + ) + except MineAlreadyRunning as exc: + # A live MCP server or another mine is already writing to this + # palace. Surface the holder identity so the operator knows what + # to wait for (or stop), and exit non-zero so wrappers like + # nohup / scripts can detect the contention. + print(f"mempalace: {exc}", file=sys.stderr) + sys.exit(1) def cmd_sweep(args): diff --git a/mempalace/miner.py b/mempalace/miner.py index 6aeddd4..09cc517 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -21,7 +21,6 @@ from typing import Optional from .palace import ( NORMALIZE_VERSION, SKIP_DIRS, - MineAlreadyRunning, build_closet_lines, file_already_mined, get_closets_collection, @@ -1035,26 +1034,21 @@ def mine( files=files, ) - try: - with mine_palace_lock(palace_path): - return _mine_impl( - project_dir, - palace_path, - wing_override=wing_override, - agent=agent, - limit=limit, - dry_run=dry_run, - respect_gitignore=respect_gitignore, - include_ignored=include_ignored, - files=files, - ) - except MineAlreadyRunning: - print( - f"mempalace: another `mine` is already running against " - f"{palace_path} — exiting cleanly.", - file=sys.stderr, + # MineAlreadyRunning propagates so the CLI can render a clear holder-aware + # message and exit non-zero. In-process callers (tests, library users) that + # expect to coexist with another writer should handle the exception. + with mine_palace_lock(palace_path): + return _mine_impl( + project_dir, + palace_path, + wing_override=wing_override, + agent=agent, + limit=limit, + dry_run=dry_run, + respect_gitignore=respect_gitignore, + include_ignored=include_ignored, + files=files, ) - return def _mine_impl( diff --git a/mempalace/palace.py b/mempalace/palace.py index dee5c8f..7ed315c 100644 --- a/mempalace/palace.py +++ b/mempalace/palace.py @@ -9,6 +9,7 @@ import hashlib import logging import os import re +import sys import threading from typing import Optional @@ -364,6 +365,53 @@ def _mark_released(lock_key: str) -> None: _holder_state().discard(lock_key) +def _format_lock_holder(content: str) -> str: + """Render a lock-file body as 'PID N (cmdline)' for diagnostic messages.""" + parts = content.split(maxsplit=1) + if not parts or not parts[0].isdigit(): + return "another writer (identity not recorded)" + pid = parts[0] + if len(parts) > 1 and parts[1].strip(): + return f"PID {pid} ({parts[1].strip()})" + return f"PID {pid}" + + +# Byte 0 of the lock file is reserved as the OS lock sentinel. +# Holder identity is written from byte 1 onward so contenders can read +# the identity without colliding with byte 0 (Windows msvcrt.locking +# blocks both reads and writes on the locked byte). +_LOCK_SENTINEL_BYTES = 1 + + +def _read_lock_holder(lock_file) -> str: + """Read the prior holder's identity from the lock-file body, best-effort.""" + try: + lock_file.seek(_LOCK_SENTINEL_BYTES) + content = lock_file.read().strip() + except OSError: + return "another writer (identity not recorded)" + if not content: + return "another writer (identity not recorded)" + return _format_lock_holder(content) + + +def _write_lock_holder(lock_file) -> None: + """Record this process's identity in the lock-file body. Best-effort. + + Writes from byte 1 onward; byte 0 is the lock sentinel and must not + be touched after acquire (truncating it on Windows can interact + badly with the active byte-range lock). + """ + try: + ident = f"{os.getpid()} {' '.join(sys.argv[:3])}".strip() + lock_file.seek(_LOCK_SENTINEL_BYTES) + lock_file.truncate(_LOCK_SENTINEL_BYTES + len(ident.encode("utf-8"))) + lock_file.write(ident) + lock_file.flush() + except OSError: + pass + + @contextlib.contextmanager def mine_palace_lock(palace_path: str): """Per-palace non-blocking lock around the full `mine` pipeline. @@ -407,9 +455,27 @@ def mine_palace_lock(palace_path: str): yield return - lf = open(lock_path, "w") + # Ensure the file exists, then open r+ so we can both read the prior + # holder's identity (for failure diagnostics) and write our own. "w" + # truncates and erases the prior holder. "a+" puts the position at EOF, + # which on Windows breaks ``msvcrt.locking`` (it locks 1 byte at the + # *current* position, so two contenders end up locking different bytes + # and silently both acquire — observed as Windows-CI lock test + # failures during #1264 development). + if not os.path.exists(lock_path): + # Touch atomically: O_CREAT|O_EXCL would fail if a concurrent + # contender just created it, which is fine — we proceed to open. + try: + fd = os.open(lock_path, os.O_CREAT | os.O_WRONLY, 0o600) + os.close(fd) + except FileExistsError: + pass + lf = open(lock_path, "r+") acquired = False try: + # Lock byte 0 explicitly. msvcrt.locking is byte-position dependent; + # fcntl.flock is whole-file but the seek is harmless there. + lf.seek(0) if os.name == "nt": import msvcrt @@ -417,8 +483,10 @@ def mine_palace_lock(palace_path: str): msvcrt.locking(lf.fileno(), msvcrt.LK_NBLCK, 1) acquired = True except OSError as exc: + holder = _read_lock_holder(lf) raise MineAlreadyRunning( - f"another `mempalace mine` is already running against {resolved}" + f"palace {resolved} is held by {holder}; " + "wait for it to finish or stop the holder before retrying" ) from exc else: import fcntl @@ -427,9 +495,13 @@ def mine_palace_lock(palace_path: str): fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB) acquired = True except BlockingIOError as exc: + holder = _read_lock_holder(lf) raise MineAlreadyRunning( - f"another `mempalace mine` is already running against {resolved}" + f"palace {resolved} is held by {holder}; " + "wait for it to finish or stop the holder before retrying" ) from exc + # Record our own identity for any later contender's diagnostic message. + _write_lock_holder(lf) _mark_held(palace_key) try: yield @@ -441,6 +513,8 @@ def mine_palace_lock(palace_path: str): if os.name == "nt": import msvcrt + # Match the lock region: byte 0. + lf.seek(0) msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1) else: import fcntl diff --git a/tests/test_cli.py b/tests/test_cli.py index fa5680d..547286d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -555,6 +555,45 @@ def test_cmd_mine_include_ignored_comma_split(mock_config_cls): assert call_kwargs["include_ignored"] == ["a.txt", "b.txt", "c.txt"] +@patch("mempalace.cli.MempalaceConfig") +def test_cmd_mine_exits_nonzero_on_lock_holder(mock_config_cls, capsys): + """Regression #1264: lock contention must exit non-zero with a clear message. + + Before this fix the CLI silently returned 0 when another writer held + the palace lock — operators using nohup/scripts had no way to detect + the contention. The new behavior raises MineAlreadyRunning out of + miner.mine() and cmd_mine catches it, printing the holder identity + to stderr and exiting non-zero. + """ + from mempalace.palace import MineAlreadyRunning + + mock_config_cls.return_value.palace_path = "/fake/palace" + args = argparse.Namespace( + dir="/src", + palace=None, + mode="projects", + wing=None, + agent="mempalace", + limit=0, + dry_run=False, + no_gitignore=False, + include_ignored=[], + extract="exchange", + ) + with patch( + "mempalace.miner.mine", + side_effect=MineAlreadyRunning( + "palace /fake/palace is held by PID 12345 (mempalace mcp_server); wait for it to finish" + ), + ): + with pytest.raises(SystemExit) as excinfo: + cmd_mine(args) + assert excinfo.value.code == 1 + captured = capsys.readouterr() + assert "PID 12345" in captured.err + assert "mcp_server" in captured.err + + # ── cmd_wakeup ───────────────────────────────────────────────────────── diff --git a/tests/test_palace_locks.py b/tests/test_palace_locks.py index d239757..2e9f82f 100644 --- a/tests/test_palace_locks.py +++ b/tests/test_palace_locks.py @@ -208,6 +208,93 @@ def _try_acquire_expect_busy(palace_path, result_q): result_q.put("busy") +def _hold_lock_send_pid(palace_path: str, ready_flag: str, release_flag: str, pid_q) -> None: + """Acquire the lock, push our PID + cmdline through the queue, then wait.""" + import sys as _sys + + try: + with mine_palace_lock(palace_path): + pid_q.put((os.getpid(), list(_sys.argv[:3]))) + open(ready_flag, "w").close() + for _ in range(500): + if os.path.exists(release_flag): + return + time.sleep(0.01) + except MineAlreadyRunning: + pid_q.put(("error", "raised")) + + +def test_lock_failure_message_names_holder(tmp_path, monkeypatch): + """Regression #1264: failed acquire must identify the holder by PID. + + Before this fix, a `mempalace mine` colliding with another writer + (mine, MCP server, anything taking mine_palace_lock) saw a generic + "another `mempalace mine` is already running" message and exited + silently. The operator had no signal of which process to wait for + or stop. The new message includes ``PID N`` so the holder can be + identified directly. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + palace = str(tmp_path / "palace") + ready = str(tmp_path / "ready") + release = str(tmp_path / "release") + + ctx = _get_mp_context() + pid_q = ctx.Queue() + holder = ctx.Process(target=_hold_lock_send_pid, args=(palace, ready, release, pid_q)) + holder.start() + try: + for _ in range(500): + if os.path.exists(ready): + break + time.sleep(0.01) + assert os.path.exists(ready), "holder failed to acquire lock in time" + holder_pid, _holder_argv = pid_q.get(timeout=2) + + with pytest.raises(MineAlreadyRunning) as excinfo: + with mine_palace_lock(palace): + pytest.fail("second acquire of same palace should have raised") + + msg = str(excinfo.value) + assert ( + f"PID {holder_pid}" in msg + ), f"lock-failure message must name the holder PID; got: {msg!r}" + finally: + open(release, "w").close() + holder.join(timeout=5) + + +def test_lock_holder_identity_persists_across_release(tmp_path, monkeypatch): + """The holder line is overwritten by each new acquirer, not appended. + + Without explicit truncate the lock file would accumulate lines across + runs and grow without bound. Verify that re-acquire keeps the body + bounded. + """ + # ``os.path.expanduser("~")`` reads HOME on POSIX but USERPROFILE on + # Windows; setting both makes the ``~/.mempalace/locks`` lookup land + # under ``tmp_path`` regardless of platform. + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("USERPROFILE", str(tmp_path)) + palace = str(tmp_path / "palace") + for _ in range(5): + with mine_palace_lock(palace): + pass + + # Locate the lock file. The key derivation is internal but we can find + # it by scanning the mempalace locks dir for mine_palace_*.lock entries. + lock_dir = tmp_path / ".mempalace" / "locks" + lock_files = list(lock_dir.glob("mine_palace_*.lock")) + assert lock_files, "expected the palace lock file to exist after acquire/release" + # Read as bytes so the byte-0 sentinel (\x00) is preserved without + # decode quirks; the bound is on the file size, not its line count. + body = lock_files[0].read_bytes() + # Body is byte-0 sentinel + identity (no trailing accumulation). + # Identity is ``f"{pid} {sys.argv[:3]}"``; cap at a generous bound that + # still rules out unbounded growth across the 5 re-acquires. + assert len(body) < 1024, f"lock body must not grow across re-acquires; got {len(body)} bytes" + + def test_mine_global_lock_is_alias_for_back_compat(tmp_path, monkeypatch): """Old callers of `mine_global_lock` should still work.""" monkeypatch.setenv("HOME", str(tmp_path))