From 560fdbdc9f377f5fc329e18ffd19a4a68c331cba Mon Sep 17 00:00:00 2001 From: MSL <232237854+milla-jovovich@users.noreply.github.com> Date: Sat, 18 Apr 2026 07:01:01 -0700 Subject: [PATCH 1/6] Fix silent drop of .jsonl files in project miner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit mempalace/miner.py:READABLE_EXTENSIONS contained `.json` but not `.jsonl`. Every jsonl file encountered in a mined directory was silently skipped at miner.py:722: if filepath.suffix.lower() not in READABLE_EXTENSIONS: continue Claude Code transcripts, ChatGPT exports, and every other tool writing line-delimited JSON ship as `.jsonl`. Users running `mempalace mine` against a directory of transcripts saw the command complete with no error and no log line — and their conversations never reached the palace. Silent data loss. Adding `.jsonl` to the whitelist alongside `.json`. jsonl is text line-by-line; the existing chunking pipeline handles it the same way it handles any other text file. Tests: tests/test_miner_jsonl_visibility.py Co-Authored-By: Claude Opus 4.7 (1M context) --- mempalace/miner.py | 1 + tests/test_miner_jsonl_visibility.py | 69 ++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 tests/test_miner_jsonl_visibility.py diff --git a/mempalace/miner.py b/mempalace/miner.py index ed48cf1..f0177fa 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -36,6 +36,7 @@ READABLE_EXTENSIONS = { ".jsx", ".tsx", ".json", + ".jsonl", ".yaml", ".yml", ".html", diff --git a/tests/test_miner_jsonl_visibility.py b/tests/test_miner_jsonl_visibility.py new file mode 100644 index 0000000..6d24670 --- /dev/null +++ b/tests/test_miner_jsonl_visibility.py @@ -0,0 +1,69 @@ +"""TDD: miner.py must not silently drop .jsonl files. + +The project miner (mempalace.miner.scan_project) walks a directory and +keeps only files whose suffix is in READABLE_EXTENSIONS. The whitelist +contains `.json` but NOT `.jsonl`. 
Every ChatGPT export, Claude Code +transcript, or any other jsonl transcript dumped into a project +directory is silently dropped with no user-visible output. + +Two paths to fix this, both tested here: + + 1. READABLE_EXTENSIONS must include `.jsonl` so the file is at least + readable as text (jsonl is line-delimited JSON — each line is + already valid text for embedding). + 2. OR scan_project must surface skipped .jsonl files to the user so + they know to use `--mode convos`. + +We test (1) — include .jsonl in READABLE_EXTENSIONS. This matches how +`.json` is already handled: the miner doesn't care what the structure +is, it chunks the text. + +Written BEFORE the fix. +""" + +import tempfile +from pathlib import Path + +from mempalace.miner import READABLE_EXTENSIONS, scan_project + + +class TestJsonlNotSilentlySkipped: + def test_jsonl_in_readable_extensions(self): + """`.jsonl` must be in the readable-extensions whitelist. + + `.json` is already there (see mempalace/miner.py:30). `.jsonl` + is conceptually the same thing — line-delimited JSON — and all + of Claude Code's transcripts, ChatGPT exports, and similar + tooling writes `.jsonl`. Excluding it silently drops user data. + """ + assert ".jsonl" in READABLE_EXTENSIONS, ( + "mempalace/miner.py:READABLE_EXTENSIONS contains `.json` " + "but NOT `.jsonl`. Every jsonl file in a mined project is " + "silently skipped at miner.py:722 " + "(`if filepath.suffix.lower() not in READABLE_EXTENSIONS: " + "continue`). This causes the 'convos not being saved' bug " + "reported by users — the hook fires `mempalace mine`, the " + "miner walks the directory, skips every .jsonl file, exits " + "cleanly. No warning, no log line, user sees nothing wrong. " + "Add `.jsonl` to READABLE_EXTENSIONS." 
+ ) + + def test_scan_project_picks_up_jsonl_file(self): + """scan_project should find .jsonl files in the target dir.""" + with tempfile.TemporaryDirectory() as tmp: + tmpdir = Path(tmp) + jsonl_path = tmpdir / "transcript.jsonl" + jsonl_path.write_text( + '{"role": "user", "content": "hello"}\n' + '{"role": "assistant", "content": "hi there"}\n' + '{"role": "user", "content": "how do I install this"}\n' + '{"role": "assistant", "content": "pip install mempalace"}\n' + ) + + found = scan_project(str(tmpdir)) + found_names = [p.name for p in found] + assert "transcript.jsonl" in found_names, ( + "scan_project silently dropped transcript.jsonl. " + f"Returned: {found_names}. Users placing transcript " + "exports in a project directory expect them to be mined." + ) From d137d123132fec53cf40e7feff18d2bc443b7a47 Mon Sep 17 00:00:00 2001 From: MSL <232237854+milla-jovovich@users.noreply.github.com> Date: Sat, 18 Apr 2026 07:09:05 -0700 Subject: [PATCH 2/6] Raise MAX_FILE_SIZE cap from 10 MB to 500 MB MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Long Claude Code sessions routinely produce transcripts larger than 10 MB. The previous cap at miner.py:65 silently dropped them at line 732 with `if filepath.stat().st_size > MAX_FILE_SIZE: continue` — same silent-failure pattern as the .jsonl extension bug. The cap exists as a safety rail against pathological binaries, not as a limit on legitimate text. Downstream chunking at 800 chars per drawer means source file size does not affect storage or embedding cost. 500 MB leaves headroom for year-long continuous transcripts while still catching accidental multi-GB binary mines. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- mempalace/miner.py | 6 ++- tests/test_miner_jsonl_visibility.py | 57 +++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/mempalace/miner.py b/mempalace/miner.py index f0177fa..4e809a8 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -63,7 +63,11 @@ SKIP_FILENAMES = { CHUNK_SIZE = 800 # chars per drawer CHUNK_OVERLAP = 100 # overlap between chunks MIN_CHUNK_SIZE = 50 # skip tiny chunks -MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB — skip files larger than this +MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB — skip files larger than this. +# Long Claude Code sessions and large transcript exports routinely exceed +# 10 MB. The cap exists as a defensive rail against pathological binary +# files, not as a limit on legitimate text. Chunking at 800 chars per +# drawer means source size does not affect storage or embedding cost. # ============================================================================= diff --git a/tests/test_miner_jsonl_visibility.py b/tests/test_miner_jsonl_visibility.py index 6d24670..2ef3e18 100644 --- a/tests/test_miner_jsonl_visibility.py +++ b/tests/test_miner_jsonl_visibility.py @@ -23,8 +23,9 @@ Written BEFORE the fix. import tempfile from pathlib import Path +from unittest.mock import patch -from mempalace.miner import READABLE_EXTENSIONS, scan_project +from mempalace.miner import MAX_FILE_SIZE, READABLE_EXTENSIONS, scan_project class TestJsonlNotSilentlySkipped: @@ -67,3 +68,57 @@ class TestJsonlNotSilentlySkipped: f"Returned: {found_names}. Users placing transcript " "exports in a project directory expect them to be mined." ) + + def test_large_jsonl_not_silently_dropped_by_size_cap(self): + """Long sessions produce >10 MB transcripts. They must still mine. + + The legacy cap was 10 MB, which is smaller than a long Claude Code + session's transcript. 
Users hitting the cap lost their entire + conversation to a silent `if size > MAX: continue` at miner.py:732. + Raise the cap well above any realistic transcript size. + """ + # 10 MB cap was silent failure — real Claude Code long sessions + # exceed this. The cap must accommodate them. + assert MAX_FILE_SIZE >= 100 * 1024 * 1024, ( + f"MAX_FILE_SIZE is {MAX_FILE_SIZE} bytes " + f"({MAX_FILE_SIZE / 1024 / 1024:.0f} MB). Long Claude Code " + "sessions produce transcripts larger than 10 MB and get " + "silently dropped. Raise to at least 100 MB — chunking " + "at 800 chars per drawer means source file size doesn't " + "matter for downstream storage." + ) + + def test_scan_project_picks_up_50mb_jsonl(self): + """A 50 MB .jsonl must not be filtered out by the size cap. + + We don't actually write 50 MB (slow test). Instead, we mock + stat().st_size to report a 50 MB file and confirm scan_project + still includes it. + """ + with tempfile.TemporaryDirectory() as tmp: + tmpdir = Path(tmp) + big_jsonl = tmpdir / "big_transcript.jsonl" + # Write a small real file so the existence / extension / text + # checks pass; then mock its reported size. + big_jsonl.write_text('{"role": "user", "content": "hi"}\n') + fake_size = 50 * 1024 * 1024 # 50 MB + + real_stat = Path.stat + + def fake_stat(self, *args, **kwargs): + result = real_stat(self, *args, **kwargs) + if self.name == "big_transcript.jsonl": + class _FakeStat: + st_size = fake_size + st_mode = result.st_mode + return _FakeStat() + return result + + with patch.object(Path, "stat", fake_stat): + found = scan_project(str(tmpdir)) + + found_names = [p.name for p in found] + assert "big_transcript.jsonl" in found_names, ( + f"50 MB .jsonl was dropped by size cap (MAX_FILE_SIZE=" + f"{MAX_FILE_SIZE}). Returned: {found_names}." 
+ ) From 6f33d526811b9428c72845cffa3f3ecf939d24be Mon Sep 17 00:00:00 2001 From: MSL <232237854+milla-jovovich@users.noreply.github.com> Date: Sat, 18 Apr 2026 08:10:43 -0700 Subject: [PATCH 3/6] =?UTF-8?q?Raise=20convo=5Fminer=20MAX=5FFILE=5FSIZE?= =?UTF-8?q?=20cap=2010=20MB=20=E2=86=92=20500=20MB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the miner.py fix in this same branch. convo_miner.py had the exact same 10 MB cap at line 58 that silently dropped long transcripts via continue. Long Claude Code sessions, multi-year ChatGPT exports, and lifetime Slack dumps all exceed 10 MB. Same silent-drop pattern, different file. Raised to 500 MB to match miner.py for consistency; downstream chunking means source file size does not affect storage or embedding cost. Tests: tests/test_convo_miner_size_cap.py (1 test) Co-Authored-By: Claude Opus 4.7 (1M context) --- mempalace/convo_miner.py | 6 +++++- tests/test_convo_miner_size_cap.py | 31 ++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 tests/test_convo_miner_size_cap.py diff --git a/mempalace/convo_miner.py b/mempalace/convo_miner.py index ba98d0e..3e7b5a4 100644 --- a/mempalace/convo_miner.py +++ b/mempalace/convo_miner.py @@ -55,7 +55,11 @@ CONVO_EXTENSIONS = { MIN_CHUNK_SIZE = 30 CHUNK_SIZE = 800 # chars per drawer — align with miner.py -MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB — skip files larger than this +MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB — skip files larger than this. +# Matches miner.py at 500 MB. Long Claude Code sessions, multi-year +# ChatGPT exports, and lifetime Slack dumps routinely exceed 10 MB; the +# cap at that level silently dropped them with `continue`. Source size +# does not affect storage or embedding cost — chunking happens downstream. 
def _register_file(collection, source_file: str, wing: str, agent: str): diff --git a/tests/test_convo_miner_size_cap.py b/tests/test_convo_miner_size_cap.py new file mode 100644 index 0000000..ea0d6d1 --- /dev/null +++ b/tests/test_convo_miner_size_cap.py @@ -0,0 +1,31 @@ +"""TDD: convo_miner.py must not silently drop transcripts larger than 10 MB. + +Mirrors the miner.py fix shipped in the same PR family (see +test_miner_jsonl_visibility.py). Long Claude Code sessions, ChatGPT +exports, and multi-year Slack dumps routinely exceed 10 MB. The cap +silently `continue`s past them at convo_miner.py:~289, same silent-drop +pattern as the project miner's. + +Written BEFORE the fix. +""" + +from mempalace.convo_miner import MAX_FILE_SIZE + + +class TestConvoMinerSizeCap: + def test_max_file_size_accommodates_long_transcripts(self): + """The cap must be well above any realistic transcript. + + Long sessions and lifetime exports exceed 10 MB. The cap exists + as a sanity rail against pathological binaries, not as a limit + on legitimate text — downstream chunking means source size does + not matter for storage or embedding cost. + """ + assert MAX_FILE_SIZE >= 100 * 1024 * 1024, ( + f"convo_miner.MAX_FILE_SIZE is {MAX_FILE_SIZE} bytes " + f"({MAX_FILE_SIZE / 1024 / 1024:.0f} MB). Same silent-drop " + "bug as miner.py's old 10 MB cap — long transcripts get " + "filtered out at convo_miner.py:~289 with `continue`. " + "Raise to at least 100 MB (match miner.py at 500 MB for " + "consistency across both miners)." 
+ ) From fed69935d343aba8f96b222b0f1f23e21e86583d Mon Sep 17 00:00:00 2001 From: MSL <232237854+milla-jovovich@users.noreply.github.com> Date: Sat, 18 Apr 2026 07:51:10 -0700 Subject: [PATCH 4/6] Add tandem sweeper: message-level safety net for dropped transcripts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The primary miners (miner.py, convo_miner.py) operate at file granularity and can drop data for several reasons: size caps, silent OSError on read, dedup false positives, extensions the project miner does not recognize. Even with tonight's hotfixes, any future bug in the file-level path risks silent data loss. The sweeper is a second, cooperating miner that works at MESSAGE granularity: - Parses Claude Code .jsonl line by line, yielding only user/assistant records (filters progress, file-history-snapshot, etc. noise). - For each session_id, queries the palace for max(timestamp) and treats that as the cursor. - Ingests only messages newer than the cursor, as one small drawer per exchange (never hits a size cap — each drawer is 1-5 KB). - Deterministic drawer IDs from session_id + message UUID make reruns idempotent; crash mid-sweep is safe. Tandem coordination is free: if the primary miner committed up to timestamp T, the sweeper resumes from T. If the primary miner missed everything, the sweeper catches it all. Neither duplicates the other. Smoke test on a real Claude Code transcript: 1st run: +39 drawers, 0 already present 2nd run: +0 drawers, 39 already present (perfect idempotence) Opt-in via: mempalace sweep <transcript.jsonl> mempalace sweep <directory> No changes to existing miners. No schema migration. Purely additive. Tests: tests/test_sweeper.py (7 tests covering parsing, tandem coordination, idempotency, resume-from-cursor, metadata correctness).
Co-Authored-By: Claude Opus 4.7 (1M context) --- mempalace/cli.py | 41 +++++++ mempalace/sweeper.py | 263 ++++++++++++++++++++++++++++++++++++++++++ tests/test_sweeper.py | 184 +++++++++++++++++++++++++++++ 3 files changed, 488 insertions(+) create mode 100644 mempalace/sweeper.py create mode 100644 tests/test_sweeper.py diff --git a/mempalace/cli.py b/mempalace/cli.py index 69cd244..f1bc919 100644 --- a/mempalace/cli.py +++ b/mempalace/cli.py @@ -146,6 +146,35 @@ def cmd_mine(args): ) +def cmd_sweep(args): + """Sweep a transcript file or directory for anything the primary + miner missed. Coordinates via max(timestamp) per session_id, so + this is safe to run alongside the file-level miners — neither + duplicates the other's work. + """ + from .sweeper import sweep, sweep_directory + + palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path + target = os.path.expanduser(args.target) + + if os.path.isfile(target): + result = sweep(target, palace_path) + print( + f" Swept {target}: +{result['drawers_added']} drawers, " + f"{result['drawers_skipped']} already present." + ) + elif os.path.isdir(target): + result = sweep_directory(target, palace_path) + print( + f" Swept {result['files_processed']} files from {target}: " + f"+{result['drawers_added']} drawers, " + f"{result['drawers_skipped']} already present." 
+ ) + else: + print(f" ✗ Not a file or directory: {target}", file=sys.stderr) + sys.exit(1) + + def cmd_search(args): from .searcher import search, SearchError @@ -547,6 +576,17 @@ def main(): help="Extraction strategy for convos mode: 'exchange' (default) or 'general' (5 memory types)", ) + # sweep + p_sweep = sub.add_parser( + "sweep", + help="Tandem miner: catch anything the primary miner missed " + "(message-level, timestamp-coordinated, idempotent)", + ) + p_sweep.add_argument( + "target", + help="A .jsonl transcript file, or a directory to scan recursively", + ) + # search p_search = sub.add_parser("search", help="Find anything, exact words") p_search.add_argument("query", help="What to search for") @@ -679,6 +719,7 @@ def main(): "mine": cmd_mine, "split": cmd_split, "search": cmd_search, + "sweep": cmd_sweep, "mcp": cmd_mcp, "compress": cmd_compress, "wake-up": cmd_wakeup, diff --git a/mempalace/sweeper.py b/mempalace/sweeper.py new file mode 100644 index 0000000..a46876b --- /dev/null +++ b/mempalace/sweeper.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python3 +""" +sweeper.py — Tandem miner that guarantees no conversation is silently +dropped. + +Works alongside miner.py / convo_miner.py via timestamp coordination: + + For each session in the transcript dir: + cursor = max(timestamp of drawers with matching session_id, "") + For each user/assistant message in the jsonl with timestamp > cursor: + write one small drawer (message_uuid as deterministic ID) + +Properties: + - Idempotent: rerunning on a fully-mined palace is a no-op. + - Resume-safe: crash mid-sweep → next run picks up from max-timestamp. + - Coordinates with primary miners for free: whichever got further + advances the cursor; the other starts from there next time. + - No size caps: each drawer holds one exchange, ~1-5 KB. 
+ +Usage: + from mempalace.sweeper import sweep + result = sweep("/path/to/session.jsonl", "/path/to/palace") + # result: {"drawers_added": N, "drawers_skipped": M, "cursor": ts} +""" + +from __future__ import annotations + +import json +import sys +from datetime import datetime +from pathlib import Path +from typing import Iterator, Optional + +from .palace import get_collection + + +# ── JSONL parsing ──────────────────────────────────────────────────── + +def _flatten_content(content) -> str: + """Normalize Claude Code's message content to a plain string. + + User messages are strings already; assistant messages are a list of + content blocks like [{"type": "text", "text": "..."}, {"type": + "tool_use", ...}]. We keep text blocks verbatim and describe non-text + blocks as a marker so the drawer carries a faithful record. + """ + if isinstance(content, str): + return content + if isinstance(content, list): + parts = [] + for block in content: + if not isinstance(block, dict): + continue + btype = block.get("type", "") + if btype == "text": + parts.append(block.get("text", "")) + elif btype == "tool_use": + parts.append( + f"[tool_use: {block.get('name', '?')} " + f"input={json.dumps(block.get('input', {}), default=str)[:500]}]" + ) + elif btype == "tool_result": + parts.append( + f"[tool_result: {json.dumps(block.get('content', ''), default=str)[:500]}]" + ) + else: + parts.append(f"[{btype}]") + return "\n".join(p for p in parts if p) + return str(content) + + +def parse_claude_jsonl(path: str) -> Iterator[dict]: + """Yield user/assistant records from a Claude Code .jsonl file. + + Each yield is: + { + "session_id": str, + "uuid": str, # per-message UUID + "timestamp": str, # ISO 8601 + "role": "user" | "assistant", + "content": str, # flattened text + } + + Non-message records (progress, file-history-snapshot, system, + queue-operation, last-prompt) are filtered out. 
Malformed lines are + skipped silently — data quality is the transcript writer's problem, + not ours. + """ + with open(path, "r", encoding="utf-8", errors="replace") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + rtype = record.get("type") + if rtype not in ("user", "assistant"): + continue + msg = record.get("message") or {} + if not isinstance(msg, dict): + continue + role = msg.get("role") + if role not in ("user", "assistant"): + continue + timestamp = record.get("timestamp") + if not timestamp: + continue + uuid = record.get("uuid") + if not uuid: + continue + session_id = record.get("sessionId") or record.get("session_id") + if not session_id: + continue + content = _flatten_content(msg.get("content", "")) + if not content.strip(): + continue + yield { + "session_id": session_id, + "uuid": uuid, + "timestamp": timestamp, + "role": role, + "content": content, + } + + +# ── Cursor resolution ──────────────────────────────────────────────── + +def get_palace_cursor(collection, session_id: str) -> Optional[str]: + """Return the max timestamp of drawers for this session_id, or None. + + ISO-8601 strings compare lexically in the right order, so we don't + need to parse them. Query scans metadatas for the session (ChromaDB + where-filter), then reduces. 
+ """ + try: + data = collection.get( + where={"session_id": session_id}, + include=["metadatas"], + ) + except Exception: + return None + metas = data.get("metadatas") or [] + timestamps = [m.get("timestamp") for m in metas if m and m.get("timestamp")] + if not timestamps: + return None + return max(timestamps) + + +# ── Sweep ──────────────────────────────────────────────────────────── + +def _drawer_id_for_message(session_id: str, message_uuid: str) -> str: + """Deterministic drawer ID so upserts at the same message are no-ops.""" + return f"sweep_{session_id[:12]}_{message_uuid}" + + +def sweep(jsonl_path: str, palace_path: str, + source_label: Optional[str] = None) -> dict: + """Ingest every user/assistant message not already represented. + + For each message in the jsonl: + - If timestamp <= cursor for that session, skip (already saved by + us or by primary miner). + - Else, upsert a drawer with deterministic ID so reruns dedupe. + + Returns a summary dict: {drawers_added, drawers_skipped, cursor_by_session}. 
+ """ + collection = get_collection(palace_path, create=True) + cursors: dict = {} + + drawers_added = 0 + drawers_skipped = 0 + + batch_ids = [] + batch_docs = [] + batch_metas = [] + BATCH_SIZE = 64 + + def _flush(): + nonlocal drawers_added + if not batch_ids: + return + collection.upsert( + ids=batch_ids, + documents=batch_docs, + metadatas=batch_metas, + ) + drawers_added += len(batch_ids) + batch_ids.clear() + batch_docs.clear() + batch_metas.clear() + + for rec in parse_claude_jsonl(jsonl_path): + sid = rec["session_id"] + if sid not in cursors: + cursors[sid] = get_palace_cursor(collection, sid) + + cursor = cursors[sid] + if cursor is not None and rec["timestamp"] <= cursor: + drawers_skipped += 1 + continue + + drawer_id = _drawer_id_for_message(sid, rec["uuid"]) + document = f"{rec['role'].upper()}: {rec['content']}" + metadata = { + "session_id": sid, + "timestamp": rec["timestamp"], + "message_uuid": rec["uuid"], + "role": rec["role"], + "source_file": source_label or jsonl_path, + "filed_at": datetime.now().isoformat(), + "ingest_mode": "sweep", + } + + batch_ids.append(drawer_id) + batch_docs.append(document) + batch_metas.append(metadata) + + if len(batch_ids) >= BATCH_SIZE: + _flush() + + _flush() + + return { + "drawers_added": drawers_added, + "drawers_skipped": drawers_skipped, + "cursor_by_session": cursors, + } + + +def sweep_directory(dir_path: str, palace_path: str) -> dict: + """Sweep every .jsonl file in a directory (recursive). + + Returns aggregated summary across all files. 
+ """ + dir_p = Path(dir_path).expanduser().resolve() + files = sorted(dir_p.rglob("*.jsonl")) + + total_added = 0 + total_skipped = 0 + per_file = [] + + for f in files: + try: + result = sweep(str(f), palace_path, source_label=str(f)) + except Exception as exc: + print(f" ⚠ sweep failed on {f}: {exc}", file=sys.stderr) + continue + total_added += result["drawers_added"] + total_skipped += result["drawers_skipped"] + per_file.append({ + "file": str(f), + "added": result["drawers_added"], + "skipped": result["drawers_skipped"], + }) + + return { + "files_processed": len(per_file), + "drawers_added": total_added, + "drawers_skipped": total_skipped, + "per_file": per_file, + } diff --git a/tests/test_sweeper.py b/tests/test_sweeper.py new file mode 100644 index 0000000..247125a --- /dev/null +++ b/tests/test_sweeper.py @@ -0,0 +1,184 @@ +"""TDD: tandem sweeper that catches what the primary miner missed. + +The primary miner (miner.py / convo_miner.py) runs at file granularity +and can drop data (size caps, silent OSError, dedup false-positives). +The sweeper is a second miner that works at MESSAGE granularity, +using timestamp as the coordination cursor. + +For each session in the transcript directory: + 1. Look up max(timestamp) across all drawers with matching session_id + 2. Stream the jsonl, yielding only user/assistant messages after the cursor + 3. Write one small drawer per message with: + session_id, uuid, timestamp, role, content + 4. Idempotent: re-running sweeps should find nothing new on a complete palace. + +This test file is TDD — written BEFORE mempalace/sweeper.py exists. 
+""" + +import json +import tempfile +from pathlib import Path + +import pytest + + +@pytest.fixture +def mock_claude_jsonl(tmp_path): + """Real Claude Code jsonl shape: user/assistant records among progress noise.""" + path = tmp_path / "session_abc.jsonl" + lines = [ + # Noise: progress event, no message + {"type": "progress", "timestamp": "2026-04-18T10:00:00Z", + "sessionId": "abc", "uuid": "p-1"}, + # User message + {"type": "user", "timestamp": "2026-04-18T10:00:05Z", + "sessionId": "abc", "uuid": "u-1", + "message": {"role": "user", "content": "What's the capital of France?"}}, + # Assistant reply + {"type": "assistant", "timestamp": "2026-04-18T10:00:06Z", + "sessionId": "abc", "uuid": "a-1", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "Paris."}]}}, + # Noise: file-history-snapshot + {"type": "file-history-snapshot", "messageId": "abc-snap"}, + # Second user/assistant exchange + {"type": "user", "timestamp": "2026-04-18T10:01:00Z", + "sessionId": "abc", "uuid": "u-2", + "message": {"role": "user", "content": "And of Germany?"}}, + {"type": "assistant", "timestamp": "2026-04-18T10:01:01Z", + "sessionId": "abc", "uuid": "a-2", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "Berlin."}]}}, + ] + path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") + return path + + +class TestSweeperParsing: + def test_parse_yields_only_user_and_assistant(self, mock_claude_jsonl): + from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) + roles = [r["role"] for r in records] + assert roles == ["user", "assistant", "user", "assistant"], ( + f"Expected 4 user/assistant in order, got {roles}. " + "Noise records (progress, file-history-snapshot) must be " + "filtered out." 
+ ) + + def test_parse_extracts_session_id_and_timestamp(self, mock_claude_jsonl): + from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) + first = records[0] + assert first["session_id"] == "abc" + assert first["timestamp"] == "2026-04-18T10:00:05Z" + assert first["uuid"] == "u-1" + + def test_parse_normalizes_assistant_content_list_to_text(self, mock_claude_jsonl): + from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) + assistant_rec = records[1] + assert assistant_rec["role"] == "assistant" + assert "Paris" in assistant_rec["content"], ( + f"Assistant content blocks must be flattened to text; " + f"got: {assistant_rec['content']!r}" + ) + + +class TestSweeperTandem: + """The sweeper coordinates with other miners via max(timestamp).""" + + def test_sweep_empty_palace_ingests_all_messages(self, mock_claude_jsonl, tmp_path): + from mempalace.sweeper import sweep + palace_path = str(tmp_path / "palace") + result = sweep(str(mock_claude_jsonl), palace_path) + assert result["drawers_added"] == 4, ( + f"Empty palace: all 4 user/assistant messages should ingest. " + f"Got drawers_added={result['drawers_added']}." + ) + + def test_sweep_is_idempotent(self, mock_claude_jsonl, tmp_path): + """Running the sweep twice must not duplicate drawers.""" + from mempalace.sweeper import sweep + palace_path = str(tmp_path / "palace") + first = sweep(str(mock_claude_jsonl), palace_path) + second = sweep(str(mock_claude_jsonl), palace_path) + assert first["drawers_added"] == 4 + assert second["drawers_added"] == 0, ( + f"Second sweep must be a no-op on unchanged data. " + f"Got drawers_added={second['drawers_added']} — " + "cursor logic is broken." 
+ ) + + def test_sweep_resumes_from_cursor(self, tmp_path): + """If half the messages are already in the palace, sweep picks up + only the later half.""" + from mempalace.sweeper import sweep + + jsonl_path = tmp_path / "session.jsonl" + lines = [ + {"type": "user", "timestamp": "2026-04-18T09:00:00Z", + "sessionId": "s1", "uuid": "u1", + "message": {"role": "user", "content": "first"}}, + {"type": "assistant", "timestamp": "2026-04-18T09:00:01Z", + "sessionId": "s1", "uuid": "a1", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "one"}]}}, + ] + jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") + + palace_path = str(tmp_path / "palace") + first = sweep(str(jsonl_path), palace_path) + assert first["drawers_added"] == 2 + + # Append two more exchanges simulating live session growth. + more_lines = [ + {"type": "user", "timestamp": "2026-04-18T09:05:00Z", + "sessionId": "s1", "uuid": "u2", + "message": {"role": "user", "content": "second"}}, + {"type": "assistant", "timestamp": "2026-04-18T09:05:01Z", + "sessionId": "s1", "uuid": "a2", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "two"}]}}, + ] + with open(jsonl_path, "a") as f: + for x in more_lines: + f.write(json.dumps(x) + "\n") + + second = sweep(str(jsonl_path), palace_path) + assert second["drawers_added"] == 2, ( + f"Second sweep should pick up only the 2 new exchanges, " + f"got {second['drawers_added']}. Cursor (max-timestamp) " + "coordination is broken." 
+ ) + + +class TestSweeperDrawerMetadata: + """Each drawer must carry the metadata the tandem-miner coordination + depends on: session_id, timestamp, uuid, role.""" + + def test_drawer_has_session_id_and_timestamp_metadata( + self, mock_claude_jsonl, tmp_path): + from mempalace.sweeper import sweep + from mempalace.palace import get_collection + + palace_path = str(tmp_path / "palace") + sweep(str(mock_claude_jsonl), palace_path) + + col = get_collection(palace_path, create=False) + data = col.get(include=["metadatas"]) + metas = data["metadatas"] + assert metas, "No drawers written" + + for m in metas: + assert m.get("session_id") == "abc", ( + f"Drawer missing session_id metadata: {m}" + ) + assert m.get("timestamp"), ( + f"Drawer missing timestamp metadata: {m}" + ) + assert m.get("message_uuid"), ( + f"Drawer missing message_uuid metadata: {m}" + ) + assert m.get("role") in ("user", "assistant"), ( + f"Drawer missing or wrong role metadata: {m}" + ) From 29ce7c7135583f9cd55df6e4e9c0d9a74648f796 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Sat, 18 Apr 2026 12:58:33 -0300 Subject: [PATCH 5/6] Harden sweeper for production: verbatim tool blocks, full session_id, logged failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four changes on top of the proposal's initial sweeper draft, driven by the CLAUDE.md design principles: 1. Drop the 500-char truncation on tool_use / tool_result content in _flatten_content. The "verbatim always" principle forbids lossy compression of user-adjacent data; a long code-edit diff handed to the assistant must round-trip intact. Unknown block types now also serialize their full payload instead of just a type marker. New test test_parse_preserves_tool_blocks_verbatim covers a 5000-char input. 2. Use the full session_id in drawer IDs (not session_id[:12]). 
Rules out cross-session collisions if a transcript source ever uses non-UUID session identifiers or shared prefixes. 3. Replace silent `except Exception: return None` in get_palace_cursor with a logger.warning — the exact anti-pattern this PR otherwise criticizes in miner.py. The fallback behavior is still safe (deterministic IDs make a missed cursor recover on the next run), but the failure is now discoverable. 4. sweep_directory now collects per-file failures into the result dict and the CLI exits non-zero when any file failed, so a partial-sweep outcome is visible rather than swallowed. Co-Authored-By: MSL <232237854+milla-jovovich@users.noreply.github.com> --- mempalace/cli.py | 9 +- mempalace/sweeper.py | 67 +++++++---- tests/test_miner_jsonl_visibility.py | 2 + tests/test_sweeper.py | 166 +++++++++++++++++++-------- 4 files changed, 174 insertions(+), 70 deletions(-) diff --git a/mempalace/cli.py b/mempalace/cli.py index f1bc919..bec6d2b 100644 --- a/mempalace/cli.py +++ b/mempalace/cli.py @@ -170,6 +170,13 @@ def cmd_sweep(args): f"+{result['drawers_added']} drawers, " f"{result['drawers_skipped']} already present." 
) + failures = result.get("failures") or [] + if failures: + print( + f" ⚠ {len(failures)} file(s) failed to sweep — see stderr / logs for details.", + file=sys.stderr, + ) + sys.exit(2) else: print(f" ✗ Not a file or directory: {target}", file=sys.stderr) sys.exit(1) @@ -580,7 +587,7 @@ def main(): p_sweep = sub.add_parser( "sweep", help="Tandem miner: catch anything the primary miner missed " - "(message-level, timestamp-coordinated, idempotent)", + "(message-level, timestamp-coordinated, idempotent)", ) p_sweep.add_argument( "target", diff --git a/mempalace/sweeper.py b/mempalace/sweeper.py index a46876b..c4d9092 100644 --- a/mempalace/sweeper.py +++ b/mempalace/sweeper.py @@ -26,6 +26,7 @@ Usage: from __future__ import annotations import json +import logging import sys from datetime import datetime from pathlib import Path @@ -33,16 +34,20 @@ from typing import Iterator, Optional from .palace import get_collection +logger = logging.getLogger(__name__) + # ── JSONL parsing ──────────────────────────────────────────────────── + def _flatten_content(content) -> str: """Normalize Claude Code's message content to a plain string. User messages are strings already; assistant messages are a list of content blocks like [{"type": "text", "text": "..."}, {"type": - "tool_use", ...}]. We keep text blocks verbatim and describe non-text - blocks as a marker so the drawer carries a faithful record. + "tool_use", ...}]. All blocks are preserved verbatim — the design + principle is "verbatim always", so tool inputs and results are + serialized in full, never truncated. 
""" if isinstance(content, str): return content @@ -57,14 +62,12 @@ def _flatten_content(content) -> str: elif btype == "tool_use": parts.append( f"[tool_use: {block.get('name', '?')} " - f"input={json.dumps(block.get('input', {}), default=str)[:500]}]" + f"input={json.dumps(block.get('input', {}), default=str)}]" ) elif btype == "tool_result": - parts.append( - f"[tool_result: {json.dumps(block.get('content', ''), default=str)[:500]}]" - ) + parts.append(f"[tool_result: {json.dumps(block.get('content', ''), default=str)}]") else: - parts.append(f"[{btype}]") + parts.append(f"[{btype}: {json.dumps(block, default=str)}]") return "\n".join(p for p in parts if p) return str(content) @@ -127,19 +130,32 @@ def parse_claude_jsonl(path: str) -> Iterator[dict]: # ── Cursor resolution ──────────────────────────────────────────────── + def get_palace_cursor(collection, session_id: str) -> Optional[str]: """Return the max timestamp of drawers for this session_id, or None. ISO-8601 strings compare lexically in the right order, so we don't - need to parse them. Query scans metadatas for the session (ChromaDB - where-filter), then reduces. + need to parse them. Query scans metadatas for the session via the + backend's where-filter, then reduces. + + Backend errors are logged at WARNING and surface as a `None` cursor — + which makes the caller treat the session as empty and ingest every + message. That's intentional: a no-cursor sweep is recovered from on + the next run by deterministic drawer IDs, so a degraded cursor never + causes silent data loss. 
""" try: data = collection.get( where={"session_id": session_id}, include=["metadatas"], ) - except Exception: + except Exception as exc: + logger.warning( + "sweeper: cursor lookup failed for session_id=%s (%s); " + "treating as empty — drawers will be re-upserted idempotently.", + session_id, + exc, + ) return None metas = data.get("metadatas") or [] timestamps = [m.get("timestamp") for m in metas if m and m.get("timestamp")] @@ -150,13 +166,18 @@ def get_palace_cursor(collection, session_id: str) -> Optional[str]: # ── Sweep ──────────────────────────────────────────────────────────── + def _drawer_id_for_message(session_id: str, message_uuid: str) -> str: - """Deterministic drawer ID so upserts at the same message are no-ops.""" - return f"sweep_{session_id[:12]}_{message_uuid}" + """Deterministic drawer ID so upserts at the same message are no-ops. + + Uses the full session_id (not a prefix) to avoid any cross-session + collision risk if a transcript source ever uses non-UUID session + identifiers or shares prefixes across sessions. + """ + return f"sweep_{session_id}_{message_uuid}" -def sweep(jsonl_path: str, palace_path: str, - source_label: Optional[str] = None) -> dict: +def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) -> dict: """Ingest every user/assistant message not already represented. 
For each message in the jsonl: @@ -241,23 +262,29 @@ def sweep_directory(dir_path: str, palace_path: str) -> dict: total_skipped = 0 per_file = [] + failures: list[dict] = [] for f in files: try: result = sweep(str(f), palace_path, source_label=str(f)) except Exception as exc: - print(f" ⚠ sweep failed on {f}: {exc}", file=sys.stderr) + logger.error("sweeper: sweep failed on %s: %s", f, exc) + print(f" \u26a0 sweep failed on {f}: {exc}", file=sys.stderr) + failures.append({"file": str(f), "error": str(exc)}) continue total_added += result["drawers_added"] total_skipped += result["drawers_skipped"] - per_file.append({ - "file": str(f), - "added": result["drawers_added"], - "skipped": result["drawers_skipped"], - }) + per_file.append( + { + "file": str(f), + "added": result["drawers_added"], + "skipped": result["drawers_skipped"], + } + ) return { "files_processed": len(per_file), "drawers_added": total_added, "drawers_skipped": total_skipped, "per_file": per_file, + "failures": failures, } diff --git a/tests/test_miner_jsonl_visibility.py b/tests/test_miner_jsonl_visibility.py index 2ef3e18..9db7fb5 100644 --- a/tests/test_miner_jsonl_visibility.py +++ b/tests/test_miner_jsonl_visibility.py @@ -108,9 +108,11 @@ class TestJsonlNotSilentlySkipped: def fake_stat(self, *args, **kwargs): result = real_stat(self, *args, **kwargs) if self.name == "big_transcript.jsonl": + class _FakeStat: st_size = fake_size st_mode = result.st_mode + return _FakeStat() return result diff --git a/tests/test_sweeper.py b/tests/test_sweeper.py index 247125a..4ac8325 100644 --- a/tests/test_sweeper.py +++ b/tests/test_sweeper.py @@ -16,8 +16,6 @@ This test file is TDD — written BEFORE mempalace/sweeper.py exists. 
""" import json -import tempfile -from pathlib import Path import pytest @@ -28,27 +26,45 @@ def mock_claude_jsonl(tmp_path): path = tmp_path / "session_abc.jsonl" lines = [ # Noise: progress event, no message - {"type": "progress", "timestamp": "2026-04-18T10:00:00Z", - "sessionId": "abc", "uuid": "p-1"}, + { + "type": "progress", + "timestamp": "2026-04-18T10:00:00Z", + "sessionId": "abc", + "uuid": "p-1", + }, # User message - {"type": "user", "timestamp": "2026-04-18T10:00:05Z", - "sessionId": "abc", "uuid": "u-1", - "message": {"role": "user", "content": "What's the capital of France?"}}, + { + "type": "user", + "timestamp": "2026-04-18T10:00:05Z", + "sessionId": "abc", + "uuid": "u-1", + "message": {"role": "user", "content": "What's the capital of France?"}, + }, # Assistant reply - {"type": "assistant", "timestamp": "2026-04-18T10:00:06Z", - "sessionId": "abc", "uuid": "a-1", - "message": {"role": "assistant", - "content": [{"type": "text", "text": "Paris."}]}}, + { + "type": "assistant", + "timestamp": "2026-04-18T10:00:06Z", + "sessionId": "abc", + "uuid": "a-1", + "message": {"role": "assistant", "content": [{"type": "text", "text": "Paris."}]}, + }, # Noise: file-history-snapshot {"type": "file-history-snapshot", "messageId": "abc-snap"}, # Second user/assistant exchange - {"type": "user", "timestamp": "2026-04-18T10:01:00Z", - "sessionId": "abc", "uuid": "u-2", - "message": {"role": "user", "content": "And of Germany?"}}, - {"type": "assistant", "timestamp": "2026-04-18T10:01:01Z", - "sessionId": "abc", "uuid": "a-2", - "message": {"role": "assistant", - "content": [{"type": "text", "text": "Berlin."}]}}, + { + "type": "user", + "timestamp": "2026-04-18T10:01:00Z", + "sessionId": "abc", + "uuid": "u-2", + "message": {"role": "user", "content": "And of Germany?"}, + }, + { + "type": "assistant", + "timestamp": "2026-04-18T10:01:01Z", + "sessionId": "abc", + "uuid": "a-2", + "message": {"role": "assistant", "content": [{"type": "text", "text": 
"Berlin."}]}, + }, ] path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") return path @@ -57,6 +73,7 @@ def mock_claude_jsonl(tmp_path): class TestSweeperParsing: def test_parse_yields_only_user_and_assistant(self, mock_claude_jsonl): from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) roles = [r["role"] for r in records] assert roles == ["user", "assistant", "user", "assistant"], ( @@ -67,6 +84,7 @@ class TestSweeperParsing: def test_parse_extracts_session_id_and_timestamp(self, mock_claude_jsonl): from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) first = records[0] assert first["session_id"] == "abc" @@ -75,12 +93,52 @@ class TestSweeperParsing: def test_parse_normalizes_assistant_content_list_to_text(self, mock_claude_jsonl): from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) assistant_rec = records[1] assert assistant_rec["role"] == "assistant" - assert "Paris" in assistant_rec["content"], ( - f"Assistant content blocks must be flattened to text; " - f"got: {assistant_rec['content']!r}" + assert ( + "Paris" in assistant_rec["content"] + ), f"Assistant content blocks must be flattened to text; got: {assistant_rec['content']!r}" + + def test_parse_preserves_tool_blocks_verbatim(self, tmp_path): + """Per the design principle "verbatim always", tool_use and + tool_result blocks must NOT be truncated. A long tool input + (e.g. a large diff handed to a code-edit tool) must round-trip + in full, otherwise we silently lose user-adjacent data. 
+ """ + import json as _json + + from mempalace.sweeper import parse_claude_jsonl + + big_input = {"diff": "x" * 5000} # well past the old 500-char cap + path = tmp_path / "session_tools.jsonl" + path.write_text( + _json.dumps( + { + "type": "assistant", + "timestamp": "2026-04-18T10:00:00Z", + "sessionId": "tools-1", + "uuid": "a-tool", + "message": { + "role": "assistant", + "content": [ + {"type": "tool_use", "name": "Edit", "input": big_input}, + ], + }, + } + ) + + "\n" + ) + + records = list(parse_claude_jsonl(str(path))) + assert len(records) == 1 + content = records[0]["content"] + # The full 5000-char value must be present — no truncation marker, + # no [:500] slice. Look for the raw string in the serialized form. + assert big_input["diff"] in content, ( + "tool_use input was truncated. The verbatim guarantee requires " + f"the full payload to round-trip. Got len={len(content)}." ) @@ -89,6 +147,7 @@ class TestSweeperTandem: def test_sweep_empty_palace_ingests_all_messages(self, mock_claude_jsonl, tmp_path): from mempalace.sweeper import sweep + palace_path = str(tmp_path / "palace") result = sweep(str(mock_claude_jsonl), palace_path) assert result["drawers_added"] == 4, ( @@ -99,6 +158,7 @@ class TestSweeperTandem: def test_sweep_is_idempotent(self, mock_claude_jsonl, tmp_path): """Running the sweep twice must not duplicate drawers.""" from mempalace.sweeper import sweep + palace_path = str(tmp_path / "palace") first = sweep(str(mock_claude_jsonl), palace_path) second = sweep(str(mock_claude_jsonl), palace_path) @@ -116,13 +176,20 @@ class TestSweeperTandem: jsonl_path = tmp_path / "session.jsonl" lines = [ - {"type": "user", "timestamp": "2026-04-18T09:00:00Z", - "sessionId": "s1", "uuid": "u1", - "message": {"role": "user", "content": "first"}}, - {"type": "assistant", "timestamp": "2026-04-18T09:00:01Z", - "sessionId": "s1", "uuid": "a1", - "message": {"role": "assistant", - "content": [{"type": "text", "text": "one"}]}}, + { + "type": "user", + 
"timestamp": "2026-04-18T09:00:00Z", + "sessionId": "s1", + "uuid": "u1", + "message": {"role": "user", "content": "first"}, + }, + { + "type": "assistant", + "timestamp": "2026-04-18T09:00:01Z", + "sessionId": "s1", + "uuid": "a1", + "message": {"role": "assistant", "content": [{"type": "text", "text": "one"}]}, + }, ] jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") @@ -132,13 +199,20 @@ class TestSweeperTandem: # Append two more exchanges simulating live session growth. more_lines = [ - {"type": "user", "timestamp": "2026-04-18T09:05:00Z", - "sessionId": "s1", "uuid": "u2", - "message": {"role": "user", "content": "second"}}, - {"type": "assistant", "timestamp": "2026-04-18T09:05:01Z", - "sessionId": "s1", "uuid": "a2", - "message": {"role": "assistant", - "content": [{"type": "text", "text": "two"}]}}, + { + "type": "user", + "timestamp": "2026-04-18T09:05:00Z", + "sessionId": "s1", + "uuid": "u2", + "message": {"role": "user", "content": "second"}, + }, + { + "type": "assistant", + "timestamp": "2026-04-18T09:05:01Z", + "sessionId": "s1", + "uuid": "a2", + "message": {"role": "assistant", "content": [{"type": "text", "text": "two"}]}, + }, ] with open(jsonl_path, "a") as f: for x in more_lines: @@ -156,8 +230,7 @@ class TestSweeperDrawerMetadata: """Each drawer must carry the metadata the tandem-miner coordination depends on: session_id, timestamp, uuid, role.""" - def test_drawer_has_session_id_and_timestamp_metadata( - self, mock_claude_jsonl, tmp_path): + def test_drawer_has_session_id_and_timestamp_metadata(self, mock_claude_jsonl, tmp_path): from mempalace.sweeper import sweep from mempalace.palace import get_collection @@ -170,15 +243,10 @@ class TestSweeperDrawerMetadata: assert metas, "No drawers written" for m in metas: - assert m.get("session_id") == "abc", ( - f"Drawer missing session_id metadata: {m}" - ) - assert m.get("timestamp"), ( - f"Drawer missing timestamp metadata: {m}" - ) - assert m.get("message_uuid"), ( - f"Drawer 
missing message_uuid metadata: {m}" - ) - assert m.get("role") in ("user", "assistant"), ( - f"Drawer missing or wrong role metadata: {m}" - ) + assert m.get("session_id") == "abc", f"Drawer missing session_id metadata: {m}" + assert m.get("timestamp"), f"Drawer missing timestamp metadata: {m}" + assert m.get("message_uuid"), f"Drawer missing message_uuid metadata: {m}" + assert m.get("role") in ( + "user", + "assistant", + ), f"Drawer missing or wrong role metadata: {m}" From 4a088ea8e102a08d76fe1dc5595136966bccc087 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Sat, 18 Apr 2026 13:22:18 -0300 Subject: [PATCH 6/6] Address Copilot review: cursor tie-break, honest metrics, accurate comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Six items from the automated review on PR #998: 1. **Cursor tie-break bug (correctness).** The skip condition was `rec.timestamp <= cursor`; if multiple messages share the max timestamp and only some were ingested before a crash, the rest would be lost forever. Changed to `< cursor`, relying on deterministic drawer IDs for safe re-attempt at the boundary. Regression test `test_sweep_recovers_untaken_message_at_cursor_timestamp`. 2. **`drawers_added` counted upserts, not adds.** Added a pre-flight `collection.get(ids=batch)` to distinguish new rows from already-present ones. Return value now carries `drawers_added`, `drawers_already_present`, `drawers_upserted`, and `drawers_skipped` separately. Dict-compatible access (`existing.get("ids")`) keeps it working on both the raw Chroma return and the typed `GetResult`. 3. **`sweep_directory` hid failures in the summary.** `files_processed` used to exclude failed files. Replaced with `files_attempted` (all discovered) + `files_succeeded` (subset that completed); CLI output shows `succeeded/attempted`. 4. 
**Coordination claim was overstated.** The primary miners don't stamp `session_id`/`timestamp` metadata, so the sweeper coordinates only with its own prior runs. Softened docstrings on module and CLI command. Uniform cross-miner metadata is flagged as a follow-up. 5. **MAX_FILE_SIZE comments were misleading.** Said source size "does not affect storage or embedding cost" — true per-drawer, but source size still scales drawer count, embedding work, and memory usage (files are read in full, not streamed). Corrected in both `miner.py` and `convo_miner.py`. 6. Added the tie-break regression test that reproduces the correctness bug from (1). Tests: 970 passed (was 969), ruff + pre-commit clean. Co-Authored-By: MSL <232237854+milla-jovovich@users.noreply.github.com> --- mempalace/cli.py | 24 +++++---- mempalace/convo_miner.py | 7 ++- mempalace/miner.py | 7 ++- mempalace/sweeper.py | 103 ++++++++++++++++++++++++++++++--------- tests/test_sweeper.py | 66 +++++++++++++++++++++++++ 5 files changed, 171 insertions(+), 36 deletions(-) diff --git a/mempalace/cli.py b/mempalace/cli.py index bec6d2b..fc69f24 100644 --- a/mempalace/cli.py +++ b/mempalace/cli.py @@ -147,10 +147,14 @@ def cmd_mine(args): def cmd_sweep(args): - """Sweep a transcript file or directory for anything the primary - miner missed. Coordinates via max(timestamp) per session_id, so - this is safe to run alongside the file-level miners — neither - duplicates the other's work. + """Sweep a transcript file or directory. + + The sweeper deduplicates against its own prior writes via + deterministic drawer IDs + a timestamp cursor. It does NOT currently + coordinate with the file-level miners (miner.py / convo_miner.py) — + those produce char-chunked drawers without compatible message + metadata, so running both miners may store overlapping content under + different IDs. 
""" from .sweeper import sweep, sweep_directory @@ -160,15 +164,17 @@ def cmd_sweep(args): if os.path.isfile(target): result = sweep(target, palace_path) print( - f" Swept {target}: +{result['drawers_added']} drawers, " - f"{result['drawers_skipped']} already present." + f" Swept {target}: +{result['drawers_added']} new, " + f"{result['drawers_already_present']} already present, " + f"{result['drawers_skipped']} skipped (< cursor)." ) elif os.path.isdir(target): result = sweep_directory(target, palace_path) print( - f" Swept {result['files_processed']} files from {target}: " - f"+{result['drawers_added']} drawers, " - f"{result['drawers_skipped']} already present." + f" Swept {result['files_succeeded']}/{result['files_attempted']} " + f"files from {target}: +{result['drawers_added']} new, " + f"{result['drawers_already_present']} already present, " + f"{result['drawers_skipped']} skipped (< cursor)." ) failures = result.get("failures") or [] if failures: diff --git a/mempalace/convo_miner.py b/mempalace/convo_miner.py index 3e7b5a4..02c1797 100644 --- a/mempalace/convo_miner.py +++ b/mempalace/convo_miner.py @@ -58,8 +58,11 @@ CHUNK_SIZE = 800 # chars per drawer — align with miner.py MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB — skip files larger than this. # Matches miner.py at 500 MB. Long Claude Code sessions, multi-year # ChatGPT exports, and lifetime Slack dumps routinely exceed 10 MB; the -# cap at that level silently dropped them with `continue`. Source size -# does not affect storage or embedding cost — chunking happens downstream. +# cap at that level silently dropped them with `continue`. Per-drawer +# size is bounded by CHUNK_SIZE, but larger source files still produce +# more drawers and therefore more embedding/storage work — and content +# is normalized and loaded fully into memory before chunking, so memory +# use also scales with source size. 
def _register_file(collection, source_file: str, wing: str, agent: str): diff --git a/mempalace/miner.py b/mempalace/miner.py index 4e809a8..18e748c 100644 --- a/mempalace/miner.py +++ b/mempalace/miner.py @@ -66,8 +66,11 @@ MIN_CHUNK_SIZE = 50 # skip tiny chunks MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB — skip files larger than this. # Long Claude Code sessions and large transcript exports routinely exceed # 10 MB. The cap exists as a defensive rail against pathological binary -# files, not as a limit on legitimate text. Chunking at 800 chars per -# drawer means source size does not affect storage or embedding cost. +# files, not as a limit on legitimate text. Per-drawer size is bounded +# by CHUNK_SIZE, but larger sources still produce proportionally more +# drawers and therefore more storage, embedding, and processing work — +# and file reads are not streamed (the whole content is loaded into +# memory before chunking), so memory use scales with source size too. # ============================================================================= diff --git a/mempalace/sweeper.py b/mempalace/sweeper.py index c4d9092..ce87153 100644 --- a/mempalace/sweeper.py +++ b/mempalace/sweeper.py @@ -1,26 +1,39 @@ #!/usr/bin/env python3 """ -sweeper.py — Tandem miner that guarantees no conversation is silently -dropped. +sweeper.py — Message-granular miner that catches what the file-level +primary miners dropped. 
-Works alongside miner.py / convo_miner.py via timestamp coordination: +Algorithm, per session: - For each session in the transcript dir: - cursor = max(timestamp of drawers with matching session_id, "") - For each user/assistant message in the jsonl with timestamp > cursor: - write one small drawer (message_uuid as deterministic ID) + cursor = max(timestamp of sweeper-written drawers for this session_id) + For each user/assistant message in the jsonl: + if cursor is not None and message.timestamp < cursor: skip + else: upsert a drawer keyed by (session_id, message_uuid) Properties: - - Idempotent: rerunning on a fully-mined palace is a no-op. - - Resume-safe: crash mid-sweep → next run picks up from max-timestamp. - - Coordinates with primary miners for free: whichever got further - advances the cursor; the other starts from there next time. + + - Idempotent on its own writes: rerunning is a no-op because drawer + IDs are deterministic and existence is pre-checked before counting. + - Resume-safe: a crash mid-sweep is recovered on the next run — the + cursor advances to the last ingested timestamp and re-attempts at + that boundary are de-duped by the deterministic ID. + - Tie-break safe: uses ``< cursor`` (not ``<=``), so if multiple + messages share the max timestamp and only some were ingested, the + rest are still picked up on re-run. - No size caps: each drawer holds one exchange, ~1-5 KB. +Coordination with the primary file-level miners (``miner.py`` / +``convo_miner.py``) is limited: those miners chunk at a fixed char size +and do not currently stamp ``session_id``/``timestamp`` metadata that +the sweeper can key off. In practice the sweeper coordinates with its +own prior runs, and may ingest content that also got chunked into +primary-miner drawers (under different IDs). Follow-up: add uniform +``ingest_mode`` + message metadata to the primary miners so dedup spans +both paths. 
+ +Usage: from mempalace.sweeper import sweep result = sweep("/path/to/session.jsonl", "/path/to/palace") - # result: {"drawers_added": N, "drawers_skipped": M, "cursor": ts} """ from __future__ import annotations @@ -181,33 +194,67 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) """Ingest every user/assistant message not already represented. For each message in the jsonl: - - If timestamp <= cursor for that session, skip (already saved by - us or by primary miner). + - If timestamp < cursor for that session, skip (strictly earlier + than the session's latest stored timestamp — already covered). + - At timestamp == cursor we do NOT skip, because multiple messages + can share the same ISO-8601 timestamp; if only some of them were + ingested before a crash, a `<= cursor` skip would lose the rest + forever. Deterministic drawer IDs make re-attempting at the + cursor boundary safe (existing rows are found via a pre-flight + `get(ids=...)` and counted as "already present", not "added"). - Else, upsert a drawer with deterministic ID so reruns dedupe. - Returns a summary dict: {drawers_added, drawers_skipped, cursor_by_session}. + Returns ``{drawers_added, drawers_already_present, drawers_skipped, + drawers_upserted, cursor_by_session}``: + + * ``drawers_added`` — rows that did not exist before this sweep. + * ``drawers_already_present`` — rows whose deterministic ID was + already in the palace and got rewritten idempotently. + * ``drawers_skipped`` — records skipped by the cursor (strictly + earlier than the session's latest stored timestamp). + * ``drawers_upserted`` — total writes = added + already_present. 
""" collection = get_collection(palace_path, create=True) cursors: dict = {} drawers_added = 0 + drawers_already_present = 0 drawers_skipped = 0 - batch_ids = [] - batch_docs = [] - batch_metas = [] + batch_ids: list[str] = [] + batch_docs: list[str] = [] + batch_metas: list[dict] = [] BATCH_SIZE = 64 def _flush(): - nonlocal drawers_added + nonlocal drawers_added, drawers_already_present if not batch_ids: return + # Pre-flight: which IDs in this batch are already present? + # Upsert is idempotent on data but counts as "added" would lie; + # this pre-query makes the metric honest (Copilot PR 998 review). + try: + existing = collection.get(ids=list(batch_ids), include=[]) + # Chroma returns a dict; typed backends return GetResult — the + # compat shim makes ``.get("ids")`` work on both. + present = set(existing.get("ids") or []) + except Exception as exc: + logger.warning( + "sweeper: existence pre-check failed (%s); " + "counting all batch rows as new (metric may over-count on reruns).", + exc, + ) + present = set() + new_count = sum(1 for rid in batch_ids if rid not in present) + already_count = len(batch_ids) - new_count + collection.upsert( ids=batch_ids, documents=batch_docs, metadatas=batch_metas, ) - drawers_added += len(batch_ids) + drawers_added += new_count + drawers_already_present += already_count batch_ids.clear() batch_docs.clear() batch_metas.clear() @@ -218,7 +265,7 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) cursors[sid] = get_palace_cursor(collection, sid) cursor = cursors[sid] - if cursor is not None and rec["timestamp"] <= cursor: + if cursor is not None and rec["timestamp"] < cursor: drawers_skipped += 1 continue @@ -245,6 +292,8 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) return { "drawers_added": drawers_added, + "drawers_already_present": drawers_already_present, + "drawers_upserted": drawers_added + drawers_already_present, "drawers_skipped": drawers_skipped, 
"cursor_by_session": cursors, } @@ -253,12 +302,16 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) def sweep_directory(dir_path: str, palace_path: str) -> dict: """Sweep every .jsonl file in a directory (recursive). - Returns aggregated summary across all files. + Returns aggregated summary across all files. ``files_attempted`` + includes files that raised, so the count reflects discovery rather + than only successes; ``files_succeeded`` is the subset that + completed without error. """ dir_p = Path(dir_path).expanduser().resolve() files = sorted(dir_p.rglob("*.jsonl")) total_added = 0 + total_already_present = 0 total_skipped = 0 per_file = [] @@ -272,18 +325,22 @@ def sweep_directory(dir_path: str, palace_path: str) -> dict: failures.append({"file": str(f), "error": str(exc)}) continue total_added += result["drawers_added"] + total_already_present += result.get("drawers_already_present", 0) total_skipped += result["drawers_skipped"] per_file.append( { "file": str(f), "added": result["drawers_added"], + "already_present": result.get("drawers_already_present", 0), "skipped": result["drawers_skipped"], } ) return { - "files_processed": len(per_file), + "files_attempted": len(files), + "files_succeeded": len(per_file), "drawers_added": total_added, + "drawers_already_present": total_already_present, "drawers_skipped": total_skipped, "per_file": per_file, "failures": failures, diff --git a/tests/test_sweeper.py b/tests/test_sweeper.py index 4ac8325..983724a 100644 --- a/tests/test_sweeper.py +++ b/tests/test_sweeper.py @@ -225,6 +225,72 @@ class TestSweeperTandem: "coordination is broken." ) + def test_sweep_recovers_untaken_message_at_cursor_timestamp(self, tmp_path): + """Regression for Copilot PR #998 review: with a `<= cursor` skip, + any message sharing the max timestamp but not yet ingested (e.g. + crash mid-batch) would be lost forever. The skip must be `<` and + tie-break via deterministic drawer ID. 
+ + Scenario: three messages share timestamp T. First sweep ingests + two of them and the process dies before the third. Second sweep + must pick up the third — not skip it because cursor == T. + """ + from mempalace.palace import get_collection + from mempalace.sweeper import ( + _drawer_id_for_message, + parse_claude_jsonl, + sweep, + ) + + shared_ts = "2026-04-18T11:00:00Z" + lines = [ + { + "type": "user", + "timestamp": shared_ts, + "sessionId": "s-tie", + "uuid": f"u-{i}", + "message": {"role": "user", "content": f"msg {i}"}, + } + for i in range(3) + ] + jsonl_path = tmp_path / "tied.jsonl" + jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") + + palace_path = str(tmp_path / "palace") + # Simulate a partial ingest: write 2 of 3 directly via the backend + # with the same drawer IDs the sweeper would use. + col = get_collection(palace_path, create=True) + recs = list(parse_claude_jsonl(str(jsonl_path))) + partial_ids = [_drawer_id_for_message(r["session_id"], r["uuid"]) for r in recs[:2]] + col.upsert( + ids=partial_ids, + documents=[f"USER: {r['content']}" for r in recs[:2]], + metadatas=[ + { + "session_id": r["session_id"], + "timestamp": r["timestamp"], + "message_uuid": r["uuid"], + "role": r["role"], + "ingest_mode": "sweep", + } + for r in recs[:2] + ], + ) + + # Now run the sweeper. It must pick up the 3rd message, not skip + # it because cursor == its timestamp. + result = sweep(str(jsonl_path), palace_path) + assert result["drawers_added"] == 1, ( + f"Sweeper lost the untaken message at cursor timestamp. " + f"Expected drawers_added=1 (the 3rd record), got " + f"{result['drawers_added']}. Cursor skip is still `<=` " + "instead of `<`, or tie-break via drawer-id is broken." + ) + assert result["drawers_already_present"] == 2, ( + f"Expected 2 drawers already present (the partial ingest), " + f"got {result['drawers_already_present']}." 
+ ) + class TestSweeperDrawerMetadata: """Each drawer must carry the metadata the tandem-miner coordination