def cmd_sweep(args):
    """Sweep a transcript file or directory.

    The sweeper deduplicates against its own prior writes via
    deterministic drawer IDs + a timestamp cursor. It does NOT currently
    coordinate with the file-level miners (miner.py / convo_miner.py) —
    those produce char-chunked drawers without compatible message
    metadata, so running both miners may store overlapping content under
    different IDs.

    Exit codes:
        1 — ``args.target`` is neither a file nor a directory.
        2 — the sweep itself failed (the single file raised, or at least
            one file in a directory sweep failed).
    """
    from .sweeper import sweep, sweep_directory

    palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path
    target = os.path.expanduser(args.target)

    if os.path.isfile(target):
        # Mirror the directory branch: report the failure on stderr and
        # exit 2 instead of letting a backend/I-O error escape as a raw
        # traceback with a different exit code.
        try:
            result = sweep(target, palace_path)
        except Exception as exc:
            print(f" ✗ Sweep failed on {target}: {exc}", file=sys.stderr)
            sys.exit(2)
        print(
            f" Swept {target}: +{result['drawers_added']} new, "
            f"{result['drawers_already_present']} already present, "
            f"{result['drawers_skipped']} skipped (< cursor)."
        )
    elif os.path.isdir(target):
        result = sweep_directory(target, palace_path)
        print(
            f" Swept {result['files_succeeded']}/{result['files_attempted']} "
            f"files from {target}: +{result['drawers_added']} new, "
            f"{result['drawers_already_present']} already present, "
            f"{result['drawers_skipped']} skipped (< cursor)."
        )
        # Per-file errors were already printed by sweep_directory; this
        # is the summary line plus a non-zero exit for scripts/hooks.
        failures = result.get("failures") or []
        if failures:
            print(
                f" ⚠ {len(failures)} file(s) failed to sweep — see stderr / logs for details.",
                file=sys.stderr,
            )
            sys.exit(2)
    else:
        print(f" ✗ Not a file or directory: {target}", file=sys.stderr)
        sys.exit(1)
logger = logging.getLogger(__name__)


# ── Timestamp helpers ────────────────────────────────────────────────


def _parse_timestamp(timestamp) -> Optional[datetime]:
    """Parse an ISO-8601 string into a timezone-aware ``datetime``.

    Returns ``None`` when ``timestamp`` is not a string or cannot be
    parsed. A trailing ``Z`` is rewritten to ``+00:00`` because
    ``datetime.fromisoformat`` only accepts it on Python 3.11+. Naive
    values are treated as UTC so that every parsed value is mutually
    comparable.
    """
    # Local import: the file-level import only brings in ``datetime``.
    from datetime import timezone

    if not isinstance(timestamp, str):
        return None
    text = timestamp.strip()
    if text.endswith(("Z", "z")):
        text = f"{text[:-1]}+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _is_before_cursor(timestamp: str, cursor: Optional[str]) -> bool:
    """Return True when ``timestamp`` is strictly earlier than ``cursor``.

    Comparison is chronological when both values parse as ISO-8601.
    Plain string comparison is only correct when every timestamp shares
    one format: ``"…:05Z"`` sorts *after* ``"…:05.500Z"`` lexically even
    though it is earlier. If either side cannot be parsed we fall back
    to the lexical comparison.
    """
    if cursor is None:
        return False
    message_time = _parse_timestamp(timestamp)
    cursor_time = _parse_timestamp(cursor)
    if message_time is not None and cursor_time is not None:
        return message_time < cursor_time
    return timestamp < cursor


# ── JSONL parsing ────────────────────────────────────────────────────


def _json_verbatim(value) -> str:
    """Serialize ``value`` to JSON without escaping non-ASCII text.

    ``ensure_ascii=False`` keeps payloads readable/searchable as written
    (``café`` rather than ``caf\\u00e9``); ``default=str`` stringifies
    anything that is not natively JSON-serializable.
    """
    return json.dumps(value, default=str, ensure_ascii=False)


def _flatten_content(content) -> str:
    """Normalize Claude Code's message content to a plain string.

    User messages are strings already; assistant messages are a list of
    content blocks like [{"type": "text", "text": "..."}, {"type":
    "tool_use", ...}]. All blocks are preserved verbatim — the design
    principle is "verbatim always", so tool inputs and results are
    serialized in full, never truncated.

    ``None`` flattens to the empty string (so the caller drops the
    record) instead of the literal text ``"None"``. Bare strings inside
    a content list are kept as-is; other non-dict items are ignored.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
                continue
            if not isinstance(block, dict):
                continue
            btype = block.get("type", "")
            if btype == "text":
                text = block.get("text")
                if text is not None:
                    # Guard against a non-string "text" breaking the join below.
                    parts.append(text if isinstance(text, str) else str(text))
            elif btype == "tool_use":
                parts.append(
                    f"[tool_use: {block.get('name', '?')} "
                    f"input={_json_verbatim(block.get('input', {}))}]"
                )
            elif btype == "tool_result":
                parts.append(f"[tool_result: {_json_verbatim(block.get('content', ''))}]")
            else:
                parts.append(f"[{btype}: {_json_verbatim(block)}]")
        return "\n".join(p for p in parts if p)
    return str(content)


def parse_claude_jsonl(path: str) -> Iterator[dict]:
    """Yield user/assistant records from a Claude Code .jsonl file.

    Each yield is:
        {
            "session_id": str,
            "uuid": str,        # per-message UUID
            "timestamp": str,   # ISO 8601
            "role": "user" | "assistant",
            "content": str,     # flattened text
        }

    Non-message records (progress, file-history-snapshot, system,
    queue-operation, last-prompt) are filtered out. Malformed lines are
    skipped silently — data quality is the transcript writer's problem,
    not ours. "Malformed" includes lines that are valid JSON but not an
    object (e.g. ``[1, 2]`` or ``null``) and records whose timestamp is
    not a string, since the cursor logic compares timestamps as text.
    """
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except (ValueError, RecursionError):
                # JSONDecodeError is a ValueError; RecursionError covers
                # pathologically nested input.
                continue
            if not isinstance(record, dict):
                continue
            if record.get("type") not in ("user", "assistant"):
                continue
            msg = record.get("message") or {}
            if not isinstance(msg, dict):
                continue
            role = msg.get("role")
            if role not in ("user", "assistant"):
                continue
            timestamp = record.get("timestamp")
            if not timestamp or not isinstance(timestamp, str):
                continue
            uuid = record.get("uuid")
            if not uuid:
                continue
            session_id = record.get("sessionId") or record.get("session_id")
            if not session_id:
                continue
            content = _flatten_content(msg.get("content", ""))
            if not content.strip():
                continue
            yield {
                "session_id": session_id,
                "uuid": uuid,
                "timestamp": timestamp,
                "role": role,
                "content": content,
            }


# ── Cursor resolution ────────────────────────────────────────────────


def get_palace_cursor(collection, session_id: str) -> Optional[str]:
    """Return the latest timestamp of drawers for this session_id, or None.

    The query fetches metadatas for the session via the backend's
    where-filter, then reduces. "Latest" is chronological: timestamps
    are parsed as ISO-8601, so mixed precision or UTC offsets are
    ordered correctly. Only when no stored timestamp can be parsed does
    the reduction fall back to the lexical maximum. Non-string
    timestamp metadata is ignored. The returned value is always one of
    the stored strings, unmodified.

    Backend errors are logged at WARNING and surface as a `None` cursor —
    which makes the caller treat the session as empty and ingest every
    message. That's intentional: a no-cursor sweep is recovered from on
    the next run by deterministic drawer IDs, so a degraded cursor never
    causes silent data loss.
    """
    try:
        data = collection.get(
            where={"session_id": session_id},
            include=["metadatas"],
        )
    except Exception as exc:
        logger.warning(
            "sweeper: cursor lookup failed for session_id=%s (%s); "
            "treating as empty — drawers will be re-upserted idempotently.",
            session_id,
            exc,
        )
        return None

    latest_time: Optional[datetime] = None
    latest_raw: Optional[str] = None
    lexical_max: Optional[str] = None
    for meta in data.get("metadatas") or []:
        if not meta:
            continue
        raw = meta.get("timestamp")
        if not raw or not isinstance(raw, str):
            continue
        if lexical_max is None or raw > lexical_max:
            lexical_max = raw
        parsed = _parse_timestamp(raw)
        if parsed is not None and (latest_time is None or parsed > latest_time):
            latest_time, latest_raw = parsed, raw
    return latest_raw if latest_raw is not None else lexical_max


# ── Sweep ────────────────────────────────────────────────────────────


def _drawer_id_for_message(session_id: str, message_uuid: str) -> str:
    """Deterministic drawer ID so upserts at the same message are no-ops.

    Uses the full session_id (not a prefix) to avoid any cross-session
    collision risk if a transcript source ever uses non-UUID session
    identifiers or shares prefixes across sessions.
    """
    return f"sweep_{session_id}_{message_uuid}"


def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None) -> dict:
    """Ingest every user/assistant message not already represented.

    For each message in the jsonl:
      - If timestamp < cursor for that session, skip (strictly earlier
        than anything already in the palace — already covered).
      - At timestamp == cursor we do NOT skip, because multiple messages
        can share the same ISO-8601 timestamp; if only some of them were
        ingested before a crash, a `<= cursor` skip would lose the rest
        forever. Deterministic drawer IDs make re-attempting at the
        cursor boundary safe (existing rows are found via a pre-flight
        `get(ids=...)` and counted as "already present", not "added").
      - Else, upsert a drawer with deterministic ID so reruns dedupe.

    Records that resolve to the same drawer ID within one batch are
    collapsed (last one wins) so a repeated message UUID neither puts
    duplicate IDs into a single upsert call nor gets counted twice.

    Args:
        jsonl_path: Transcript file to read.
        palace_path: Palace location handed to ``get_collection``.
        source_label: Value stored as ``source_file`` metadata; defaults
            to ``jsonl_path``.

    Returns ``{drawers_added, drawers_already_present, drawers_skipped,
    drawers_upserted, cursor_by_session}``:

      * ``drawers_added`` — rows that did not exist before this sweep.
      * ``drawers_already_present`` — rows whose deterministic ID was
        already in the palace and got rewritten idempotently.
      * ``drawers_skipped`` — records skipped by the cursor (strictly
        earlier than what's already stored).
      * ``drawers_upserted`` — total writes = added + already_present.
      * ``cursor_by_session`` — the cursor resolved for each session_id
        seen (``None`` when the session had no stored drawers).
    """
    collection = get_collection(palace_path, create=True)
    cursors: dict = {}

    drawers_added = 0
    drawers_already_present = 0
    drawers_skipped = 0

    # drawer_id -> (document, metadata); dict keys keep IDs unique per flush.
    batch: dict[str, tuple[str, dict]] = {}
    BATCH_SIZE = 64

    def _flush():
        nonlocal drawers_added, drawers_already_present
        if not batch:
            return
        ids = list(batch)
        # Pre-flight: which IDs in this batch are already present?
        # Upsert is idempotent on data, but counting every write as
        # "added" would lie; this pre-query keeps the metric honest.
        try:
            existing = collection.get(ids=ids, include=[])
            # Chroma returns a dict; typed backends return GetResult — the
            # compat shim makes ``.get("ids")`` work on both.
            present = set(existing.get("ids") or [])
        except Exception as exc:
            logger.warning(
                "sweeper: existence pre-check failed (%s); "
                "counting all batch rows as new (metric may over-count on reruns).",
                exc,
            )
            present = set()
        new_count = sum(1 for rid in ids if rid not in present)

        collection.upsert(
            ids=ids,
            documents=[batch[rid][0] for rid in ids],
            metadatas=[batch[rid][1] for rid in ids],
        )
        drawers_added += new_count
        drawers_already_present += len(ids) - new_count
        batch.clear()

    for rec in parse_claude_jsonl(jsonl_path):
        sid = rec["session_id"]
        if sid not in cursors:
            # Resolve once per session, before this run writes anything for it.
            cursors[sid] = get_palace_cursor(collection, sid)

        if _is_before_cursor(rec["timestamp"], cursors[sid]):
            drawers_skipped += 1
            continue

        drawer_id = _drawer_id_for_message(sid, rec["uuid"])
        document = f"{rec['role'].upper()}: {rec['content']}"
        metadata = {
            "session_id": sid,
            "timestamp": rec["timestamp"],
            "message_uuid": rec["uuid"],
            "role": rec["role"],
            "source_file": source_label or jsonl_path,
            "filed_at": datetime.now().isoformat(),
            "ingest_mode": "sweep",
        }
        batch[drawer_id] = (document, metadata)

        if len(batch) >= BATCH_SIZE:
            _flush()

    _flush()

    return {
        "drawers_added": drawers_added,
        "drawers_already_present": drawers_already_present,
        "drawers_upserted": drawers_added + drawers_already_present,
        "drawers_skipped": drawers_skipped,
        "cursor_by_session": cursors,
    }


def sweep_directory(dir_path: str, palace_path: str) -> dict:
    """Sweep every .jsonl file in a directory (recursive).

    Returns aggregated summary across all files. ``files_attempted``
    includes files that raised, so the count reflects discovery rather
    than only successes; ``files_succeeded`` is the subset that
    completed without error. Each failure is logged (with traceback),
    echoed to stderr, and recorded in ``failures`` as
    ``{"file", "error"}``; the remaining files are still swept.
    """
    dir_p = Path(dir_path).expanduser().resolve()
    # rglob matches directories too; a directory named "x.jsonl" is not
    # a transcript and must not be reported as a failed file.
    files = sorted(p for p in dir_p.rglob("*.jsonl") if p.is_file())

    total_added = 0
    total_already_present = 0
    total_skipped = 0
    per_file = []

    failures: list[dict] = []
    for f in files:
        try:
            result = sweep(str(f), palace_path, source_label=str(f))
        except Exception as exc:
            # Best-effort by design: one bad transcript must not abort
            # the rest of the directory.
            logger.error("sweeper: sweep failed on %s: %s", f, exc, exc_info=True)
            print(f" \u26a0 sweep failed on {f}: {exc}", file=sys.stderr)
            failures.append({"file": str(f), "error": str(exc)})
            continue
        total_added += result["drawers_added"]
        total_already_present += result["drawers_already_present"]
        total_skipped += result["drawers_skipped"]
        per_file.append(
            {
                "file": str(f),
                "added": result["drawers_added"],
                "already_present": result["drawers_already_present"],
                "skipped": result["drawers_skipped"],
            }
        )

    return {
        "files_attempted": len(files),
        "files_succeeded": len(per_file),
        "drawers_added": total_added,
        "drawers_already_present": total_already_present,
        "drawers_skipped": total_skipped,
        "per_file": per_file,
        "failures": failures,
    }
class TestJsonlNotSilentlySkipped:
    """Regression tests: the project miner must not silently drop `.jsonl`
    transcripts, whether by extension filtering or by the file-size cap."""

    def test_jsonl_in_readable_extensions(self):
        """`.jsonl` must be in the readable-extensions whitelist.

        `.json` is already there (see mempalace/miner.py:30). `.jsonl`
        is conceptually the same thing — line-delimited JSON — and
        Claude Code's transcripts, ChatGPT exports, and similar tooling
        all write `.jsonl`. Excluding it silently drops user data.
        """
        assert ".jsonl" in READABLE_EXTENSIONS, (
            "mempalace/miner.py:READABLE_EXTENSIONS contains `.json` "
            "but NOT `.jsonl`. Every jsonl file in a mined project is "
            "silently skipped at miner.py:722 "
            "(`if filepath.suffix.lower() not in READABLE_EXTENSIONS: "
            "continue`). This causes the 'convos not being saved' bug "
            "reported by users — the hook fires `mempalace mine`, the "
            "miner walks the directory, skips every .jsonl file, exits "
            "cleanly. No warning, no log line, user sees nothing wrong. "
            "Add `.jsonl` to READABLE_EXTENSIONS."
        )

    def test_scan_project_picks_up_jsonl_file(self):
        """scan_project should find .jsonl files in the target dir."""
        with tempfile.TemporaryDirectory() as tmp:
            tmpdir = Path(tmp)
            # A small but realistic transcript: one JSON object per line.
            jsonl_path = tmpdir / "transcript.jsonl"
            jsonl_path.write_text(
                '{"role": "user", "content": "hello"}\n'
                '{"role": "assistant", "content": "hi there"}\n'
                '{"role": "user", "content": "how do I install this"}\n'
                '{"role": "assistant", "content": "pip install mempalace"}\n'
            )

            found = scan_project(str(tmpdir))
            found_names = [p.name for p in found]
            assert "transcript.jsonl" in found_names, (
                "scan_project silently dropped transcript.jsonl. "
                f"Returned: {found_names}. Users placing transcript "
                "exports in a project directory expect them to be mined."
            )

    def test_large_jsonl_not_silently_dropped_by_size_cap(self):
        """Long sessions produce >10 MB transcripts. They must still mine.

        The legacy cap was 10 MB, which is smaller than a long Claude Code
        session's transcript. Users hitting the cap lost their entire
        conversation to a silent `if size > MAX: continue` at miner.py:732.
        Raise the cap well above any realistic transcript size.
        """
        # 10 MB cap was silent failure — real Claude Code long sessions
        # exceed this. The cap must accommodate them.
        assert MAX_FILE_SIZE >= 100 * 1024 * 1024, (
            f"MAX_FILE_SIZE is {MAX_FILE_SIZE} bytes "
            f"({MAX_FILE_SIZE / 1024 / 1024:.0f} MB). Long Claude Code "
            "sessions produce transcripts larger than 10 MB and get "
            "silently dropped. Raise to at least 100 MB — chunking "
            "at 800 chars per drawer means source file size doesn't "
            "matter for downstream storage."
        )

    def test_scan_project_picks_up_50mb_jsonl(self):
        """A 50 MB .jsonl must not be filtered out by the size cap.

        We don't actually write 50 MB (slow test). Instead, we mock
        stat().st_size to report a 50 MB file and confirm scan_project
        still includes it.
        """
        with tempfile.TemporaryDirectory() as tmp:
            tmpdir = Path(tmp)
            big_jsonl = tmpdir / "big_transcript.jsonl"
            # Write a small real file so the existence / extension / text
            # checks pass; then mock its reported size.
            big_jsonl.write_text('{"role": "user", "content": "hi"}\n')
            fake_size = 50 * 1024 * 1024  # 50 MB

            # Keep a handle on the unpatched method so every other path
            # still gets its real stat result.
            real_stat = Path.stat

            def fake_stat(self, *args, **kwargs):
                result = real_stat(self, *args, **kwargs)
                if self.name == "big_transcript.jsonl":

                    # Minimal stand-in exposing only st_size and st_mode
                    # (the real mode is carried over from the actual file).
                    class _FakeStat:
                        st_size = fake_size
                        st_mode = result.st_mode

                    return _FakeStat()
                return result

            with patch.object(Path, "stat", fake_stat):
                found = scan_project(str(tmpdir))

            found_names = [p.name for p in found]
            assert "big_transcript.jsonl" in found_names, (
                f"50 MB .jsonl was dropped by size cap (MAX_FILE_SIZE="
                f"{MAX_FILE_SIZE}). Returned: {found_names}."
            )
@pytest.fixture
def mock_claude_jsonl(tmp_path):
    """Real Claude Code jsonl shape: user/assistant records among progress noise.

    Writes six records for session "abc" — four user/assistant messages
    and two noise records — and returns the path to the file.
    """
    path = tmp_path / "session_abc.jsonl"
    lines = [
        # Noise: progress event, no message
        {
            "type": "progress",
            "timestamp": "2026-04-18T10:00:00Z",
            "sessionId": "abc",
            "uuid": "p-1",
        },
        # User message
        {
            "type": "user",
            "timestamp": "2026-04-18T10:00:05Z",
            "sessionId": "abc",
            "uuid": "u-1",
            "message": {"role": "user", "content": "What's the capital of France?"},
        },
        # Assistant reply
        {
            "type": "assistant",
            "timestamp": "2026-04-18T10:00:06Z",
            "sessionId": "abc",
            "uuid": "a-1",
            "message": {"role": "assistant", "content": [{"type": "text", "text": "Paris."}]},
        },
        # Noise: file-history-snapshot
        {"type": "file-history-snapshot", "messageId": "abc-snap"},
        # Second user/assistant exchange
        {
            "type": "user",
            "timestamp": "2026-04-18T10:01:00Z",
            "sessionId": "abc",
            "uuid": "u-2",
            "message": {"role": "user", "content": "And of Germany?"},
        },
        {
            "type": "assistant",
            "timestamp": "2026-04-18T10:01:01Z",
            "sessionId": "abc",
            "uuid": "a-2",
            "message": {"role": "assistant", "content": [{"type": "text", "text": "Berlin."}]},
        },
    ]
    path.write_text("\n".join(json.dumps(x) for x in lines) + "\n")
    return path


class TestSweeperParsing:
    """parse_claude_jsonl: record filtering, field extraction, content flattening."""

    def test_parse_yields_only_user_and_assistant(self, mock_claude_jsonl):
        """Noise records are dropped and message order is preserved."""
        from mempalace.sweeper import parse_claude_jsonl

        records = list(parse_claude_jsonl(str(mock_claude_jsonl)))
        roles = [r["role"] for r in records]
        assert roles == ["user", "assistant", "user", "assistant"], (
            f"Expected 4 user/assistant in order, got {roles}. "
            "Noise records (progress, file-history-snapshot) must be "
            "filtered out."
        )

    def test_parse_extracts_session_id_and_timestamp(self, mock_claude_jsonl):
        """The first yielded record carries the first user message's identifiers."""
        from mempalace.sweeper import parse_claude_jsonl

        records = list(parse_claude_jsonl(str(mock_claude_jsonl)))
        first = records[0]
        assert first["session_id"] == "abc"
        assert first["timestamp"] == "2026-04-18T10:00:05Z"
        assert first["uuid"] == "u-1"

    def test_parse_normalizes_assistant_content_list_to_text(self, mock_claude_jsonl):
        """Assistant content given as a list of blocks comes back as text."""
        from mempalace.sweeper import parse_claude_jsonl

        records = list(parse_claude_jsonl(str(mock_claude_jsonl)))
        assistant_rec = records[1]
        assert assistant_rec["role"] == "assistant"
        assert (
            "Paris" in assistant_rec["content"]
        ), f"Assistant content blocks must be flattened to text; got: {assistant_rec['content']!r}"

    def test_parse_preserves_tool_blocks_verbatim(self, tmp_path):
        """Per the design principle "verbatim always", tool_use and
        tool_result blocks must NOT be truncated. A long tool input
        (e.g. a large diff handed to a code-edit tool) must round-trip
        in full, otherwise we silently lose user-adjacent data.
        """
        import json as _json

        from mempalace.sweeper import parse_claude_jsonl

        big_input = {"diff": "x" * 5000}  # well past the old 500-char cap
        path = tmp_path / "session_tools.jsonl"
        path.write_text(
            _json.dumps(
                {
                    "type": "assistant",
                    "timestamp": "2026-04-18T10:00:00Z",
                    "sessionId": "tools-1",
                    "uuid": "a-tool",
                    "message": {
                        "role": "assistant",
                        "content": [
                            {"type": "tool_use", "name": "Edit", "input": big_input},
                        ],
                    },
                }
            )
            + "\n"
        )

        records = list(parse_claude_jsonl(str(path)))
        assert len(records) == 1
        content = records[0]["content"]
        # The full 5000-char value must be present — no truncation marker,
        # no [:500] slice. Look for the raw string in the serialized form.
        assert big_input["diff"] in content, (
            "tool_use input was truncated. The verbatim guarantee requires "
            f"the full payload to round-trip. Got len={len(content)}."
        )


class TestSweeperTandem:
    """The sweeper resumes from max(timestamp) of the drawers already stored
    for a session. These tests exercise that cursor against the sweeper's
    own prior writes: empty palace, rerun, resume after growth, and the
    equal-timestamp boundary."""

    def test_sweep_empty_palace_ingests_all_messages(self, mock_claude_jsonl, tmp_path):
        """With nothing stored, every user/assistant message is added."""
        from mempalace.sweeper import sweep

        palace_path = str(tmp_path / "palace")
        result = sweep(str(mock_claude_jsonl), palace_path)
        assert result["drawers_added"] == 4, (
            f"Empty palace: all 4 user/assistant messages should ingest. "
            f"Got drawers_added={result['drawers_added']}."
        )

    def test_sweep_is_idempotent(self, mock_claude_jsonl, tmp_path):
        """Running the sweep twice must not duplicate drawers."""
        from mempalace.sweeper import sweep

        palace_path = str(tmp_path / "palace")
        first = sweep(str(mock_claude_jsonl), palace_path)
        second = sweep(str(mock_claude_jsonl), palace_path)
        assert first["drawers_added"] == 4
        assert second["drawers_added"] == 0, (
            f"Second sweep must be a no-op on unchanged data. "
            f"Got drawers_added={second['drawers_added']} — "
            "cursor logic is broken."
        )

    def test_sweep_resumes_from_cursor(self, tmp_path):
        """If half the messages are already in the palace, sweep picks up
        only the later half."""
        from mempalace.sweeper import sweep

        jsonl_path = tmp_path / "session.jsonl"
        lines = [
            {
                "type": "user",
                "timestamp": "2026-04-18T09:00:00Z",
                "sessionId": "s1",
                "uuid": "u1",
                "message": {"role": "user", "content": "first"},
            },
            {
                "type": "assistant",
                "timestamp": "2026-04-18T09:00:01Z",
                "sessionId": "s1",
                "uuid": "a1",
                "message": {"role": "assistant", "content": [{"type": "text", "text": "one"}]},
            },
        ]
        jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n")

        palace_path = str(tmp_path / "palace")
        first = sweep(str(jsonl_path), palace_path)
        assert first["drawers_added"] == 2

        # Append two more exchanges simulating live session growth.
        more_lines = [
            {
                "type": "user",
                "timestamp": "2026-04-18T09:05:00Z",
                "sessionId": "s1",
                "uuid": "u2",
                "message": {"role": "user", "content": "second"},
            },
            {
                "type": "assistant",
                "timestamp": "2026-04-18T09:05:01Z",
                "sessionId": "s1",
                "uuid": "a2",
                "message": {"role": "assistant", "content": [{"type": "text", "text": "two"}]},
            },
        ]
        with open(jsonl_path, "a") as f:
            for x in more_lines:
                f.write(json.dumps(x) + "\n")

        second = sweep(str(jsonl_path), palace_path)
        assert second["drawers_added"] == 2, (
            f"Second sweep should pick up only the 2 new exchanges, "
            f"got {second['drawers_added']}. Cursor (max-timestamp) "
            "coordination is broken."
        )

    def test_sweep_recovers_untaken_message_at_cursor_timestamp(self, tmp_path):
        """Regression for Copilot PR #998 review: with a `<= cursor` skip,
        any message sharing the max timestamp but not yet ingested (e.g.
        crash mid-batch) would be lost forever. The skip must be `<` and
        tie-break via deterministic drawer ID.

        Scenario: three messages share timestamp T. First sweep ingests
        two of them and the process dies before the third. Second sweep
        must pick up the third — not skip it because cursor == T.
        """
        from mempalace.palace import get_collection
        from mempalace.sweeper import (
            _drawer_id_for_message,
            parse_claude_jsonl,
            sweep,
        )

        shared_ts = "2026-04-18T11:00:00Z"
        lines = [
            {
                "type": "user",
                "timestamp": shared_ts,
                "sessionId": "s-tie",
                "uuid": f"u-{i}",
                "message": {"role": "user", "content": f"msg {i}"},
            }
            for i in range(3)
        ]
        jsonl_path = tmp_path / "tied.jsonl"
        jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n")

        palace_path = str(tmp_path / "palace")
        # Simulate a partial ingest: write 2 of 3 directly via the backend
        # with the same drawer IDs the sweeper would use.
        col = get_collection(palace_path, create=True)
        recs = list(parse_claude_jsonl(str(jsonl_path)))
        partial_ids = [_drawer_id_for_message(r["session_id"], r["uuid"]) for r in recs[:2]]
        col.upsert(
            ids=partial_ids,
            documents=[f"USER: {r['content']}" for r in recs[:2]],
            metadatas=[
                {
                    "session_id": r["session_id"],
                    "timestamp": r["timestamp"],
                    "message_uuid": r["uuid"],
                    "role": r["role"],
                    "ingest_mode": "sweep",
                }
                for r in recs[:2]
            ],
        )

        # Now run the sweeper. It must pick up the 3rd message, not skip
        # it because cursor == its timestamp.
        result = sweep(str(jsonl_path), palace_path)
        assert result["drawers_added"] == 1, (
            f"Sweeper lost the untaken message at cursor timestamp. "
            f"Expected drawers_added=1 (the 3rd record), got "
            f"{result['drawers_added']}. Cursor skip is still `<=` "
            "instead of `<`, or tie-break via drawer-id is broken."
        )
        assert result["drawers_already_present"] == 2, (
            f"Expected 2 drawers already present (the partial ingest), "
            f"got {result['drawers_already_present']}."
        )


class TestSweeperDrawerMetadata:
    """Each drawer must carry the metadata the tandem-miner coordination
    depends on: session_id, timestamp, uuid, role."""

    def test_drawer_has_session_id_and_timestamp_metadata(self, mock_claude_jsonl, tmp_path):
        """Every drawer written by a sweep exposes the four coordination fields."""
        from mempalace.sweeper import sweep
        from mempalace.palace import get_collection

        palace_path = str(tmp_path / "palace")
        sweep(str(mock_claude_jsonl), palace_path)

        # Read back everything the sweep stored and inspect metadata only.
        col = get_collection(palace_path, create=False)
        data = col.get(include=["metadatas"])
        metas = data["metadatas"]
        assert metas, "No drawers written"

        for m in metas:
            assert m.get("session_id") == "abc", f"Drawer missing session_id metadata: {m}"
            assert m.get("timestamp"), f"Drawer missing timestamp metadata: {m}"
            assert m.get("message_uuid"), f"Drawer missing message_uuid metadata: {m}"
            assert m.get("role") in (
                "user",
                "assistant",
            ), f"Drawer missing or wrong role metadata: {m}"