diff --git a/mempalace/cli.py b/mempalace/cli.py index 69cd244..f1bc919 100644 --- a/mempalace/cli.py +++ b/mempalace/cli.py @@ -146,6 +146,35 @@ def cmd_mine(args): ) +def cmd_sweep(args): + """Sweep a transcript file or directory for anything the primary + miner missed. Coordinates via max(timestamp) per session_id, so + this is safe to run alongside the file-level miners — neither + duplicates the other's work. + """ + from .sweeper import sweep, sweep_directory + + palace_path = os.path.expanduser(args.palace) if args.palace else MempalaceConfig().palace_path + target = os.path.expanduser(args.target) + + if os.path.isfile(target): + result = sweep(target, palace_path) + print( + f" Swept {target}: +{result['drawers_added']} drawers, " + f"{result['drawers_skipped']} already present." + ) + elif os.path.isdir(target): + result = sweep_directory(target, palace_path) + print( + f" Swept {result['files_processed']} files from {target}: " + f"+{result['drawers_added']} drawers, " + f"{result['drawers_skipped']} already present." + ) + else: + print(f" ✗ Not a file or directory: {target}", file=sys.stderr) + sys.exit(1) + + def cmd_search(args): from .searcher import search, SearchError @@ -547,6 +576,17 @@ def main(): help="Extraction strategy for convos mode: 'exchange' (default) or 'general' (5 memory types)", ) + # sweep + p_sweep = sub.add_parser( + "sweep", + help="Tandem miner: catch anything the primary miner missed " + "(message-level, timestamp-coordinated, idempotent)", + ) + p_sweep.add_argument( + "target", + help="A .jsonl transcript file, or a directory to scan recursively", + ) + # search p_search = sub.add_parser("search", help="Find anything, exact words") p_search.add_argument("query", help="What to search for") @@ -679,6 +719,7 @@ def main(): "mine": cmd_mine, "split": cmd_split, "search": cmd_search, + "sweep": cmd_sweep, "mcp": cmd_mcp, "compress": cmd_compress, "wake-up": cmd_wakeup, diff --git a/mempalace/sweeper.py b/mempalace/sweeper.py new file mode 100644 index 0000000..a46876b --- /dev/null +++ b/mempalace/sweeper.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python3 +""" +sweeper.py — Tandem miner that guarantees no conversation is silently +dropped. + +Works alongside miner.py / convo_miner.py via timestamp coordination: + + For each session in the transcript dir: + cursor = max(timestamp of drawers with matching session_id, "") + For each user/assistant message in the jsonl with timestamp > cursor: + write one small drawer (message_uuid as deterministic ID) + +Properties: + - Idempotent: rerunning on a fully-mined palace is a no-op. + - Resume-safe: crash mid-sweep → next run picks up from max-timestamp. + - Coordinates with primary miners for free: whichever got further + advances the cursor; the other starts from there next time. + - No size caps: each drawer holds one exchange, ~1-5 KB. + +Usage: + from mempalace.sweeper import sweep + result = sweep("/path/to/session.jsonl", "/path/to/palace") + # result: {"drawers_added": N, "drawers_skipped": M, "cursor": ts} +""" + +from __future__ import annotations + +import json +import sys +from datetime import datetime +from pathlib import Path +from typing import Iterator, Optional + +from .palace import get_collection + + +# ── JSONL parsing ──────────────────────────────────────────────────── + +def _flatten_content(content) -> str: + """Normalize Claude Code's message content to a plain string. + + User messages are strings already; assistant messages are a list of + content blocks like [{"type": "text", "text": "..."}, {"type": + "tool_use", ...}]. We keep text blocks verbatim and describe non-text + blocks as a marker so the drawer carries a faithful record. + """ + if isinstance(content, str): + return content + if isinstance(content, list): + parts = [] + for block in content: + if not isinstance(block, dict): + continue + btype = block.get("type", "") + if btype == "text": + parts.append(block.get("text", "")) + elif btype == "tool_use": + parts.append( + f"[tool_use: {block.get('name', '?')} " + f"input={json.dumps(block.get('input', {}), default=str)[:500]}]" + ) + elif btype == "tool_result": + parts.append( + f"[tool_result: {json.dumps(block.get('content', ''), default=str)[:500]}]" + ) + else: + parts.append(f"[{btype}]") + return "\n".join(p for p in parts if p) + return str(content) + + +def parse_claude_jsonl(path: str) -> Iterator[dict]: + """Yield user/assistant records from a Claude Code .jsonl file. + + Each yield is: + { + "session_id": str, + "uuid": str, # per-message UUID + "timestamp": str, # ISO 8601 + "role": "user" | "assistant", + "content": str, # flattened text + } + + Non-message records (progress, file-history-snapshot, system, + queue-operation, last-prompt) are filtered out. Malformed lines are + skipped silently — data quality is the transcript writer's problem, + not ours. + """ + with open(path, "r", encoding="utf-8", errors="replace") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + rtype = record.get("type") + if rtype not in ("user", "assistant"): + continue + msg = record.get("message") or {} + if not isinstance(msg, dict): + continue + role = msg.get("role") + if role not in ("user", "assistant"): + continue + timestamp = record.get("timestamp") + if not timestamp: + continue + uuid = record.get("uuid") + if not uuid: + continue + session_id = record.get("sessionId") or record.get("session_id") + if not session_id: + continue + content = _flatten_content(msg.get("content", "")) + if not content.strip(): + continue + yield { + "session_id": session_id, + "uuid": uuid, + "timestamp": timestamp, + "role": role, + "content": content, + } + + +# ── Cursor resolution ──────────────────────────────────────────────── + +def get_palace_cursor(collection, session_id: str) -> Optional[str]: + """Return the max timestamp of drawers for this session_id, or None. + + ISO-8601 strings compare lexically in the right order, so we don't + need to parse them. Query scans metadatas for the session (ChromaDB + where-filter), then reduces. + """ + try: + data = collection.get( + where={"session_id": session_id}, + include=["metadatas"], + ) + except Exception: + return None + metas = data.get("metadatas") or [] + timestamps = [m.get("timestamp") for m in metas if m and m.get("timestamp")] + if not timestamps: + return None + return max(timestamps) + + +# ── Sweep ──────────────────────────────────────────────────────────── + +def _drawer_id_for_message(session_id: str, message_uuid: str) -> str: + """Deterministic drawer ID so upserts at the same message are no-ops.""" + return f"sweep_{session_id[:12]}_{message_uuid}" + + +def sweep(jsonl_path: str, palace_path: str, + source_label: Optional[str] = None) -> dict: + """Ingest every user/assistant message not already represented. + + For each message in the jsonl: + - If timestamp <= cursor for that session, skip (already saved by + us or by primary miner). + - Else, upsert a drawer with deterministic ID so reruns dedupe. + + Returns a summary dict: {drawers_added, drawers_skipped, cursor_by_session}. + """ + collection = get_collection(palace_path, create=True) + cursors: dict = {} + + drawers_added = 0 + drawers_skipped = 0 + + batch_ids = [] + batch_docs = [] + batch_metas = [] + BATCH_SIZE = 64 + + def _flush(): + nonlocal drawers_added + if not batch_ids: + return + collection.upsert( + ids=batch_ids, + documents=batch_docs, + metadatas=batch_metas, + ) + drawers_added += len(batch_ids) + batch_ids.clear() + batch_docs.clear() + batch_metas.clear() + + for rec in parse_claude_jsonl(jsonl_path): + sid = rec["session_id"] + if sid not in cursors: + cursors[sid] = get_palace_cursor(collection, sid) + + cursor = cursors[sid] + if cursor is not None and rec["timestamp"] <= cursor: + drawers_skipped += 1 + continue + + drawer_id = _drawer_id_for_message(sid, rec["uuid"]) + document = f"{rec['role'].upper()}: {rec['content']}" + metadata = { + "session_id": sid, + "timestamp": rec["timestamp"], + "message_uuid": rec["uuid"], + "role": rec["role"], + "source_file": source_label or jsonl_path, + "filed_at": datetime.now().isoformat(), + "ingest_mode": "sweep", + } + + batch_ids.append(drawer_id) + batch_docs.append(document) + batch_metas.append(metadata) + + if len(batch_ids) >= BATCH_SIZE: + _flush() + + _flush() + + return { + "drawers_added": drawers_added, + "drawers_skipped": drawers_skipped, + "cursor_by_session": cursors, + } + + +def sweep_directory(dir_path: str, palace_path: str) -> dict: + """Sweep every .jsonl file in a directory (recursive). + + Returns aggregated summary across all files. + """ + dir_p = Path(dir_path).expanduser().resolve() + files = sorted(dir_p.rglob("*.jsonl")) + + total_added = 0 + total_skipped = 0 + per_file = [] + + for f in files: + try: + result = sweep(str(f), palace_path, source_label=str(f)) + except Exception as exc: + print(f" ⚠ sweep failed on {f}: {exc}", file=sys.stderr) + continue + total_added += result["drawers_added"] + total_skipped += result["drawers_skipped"] + per_file.append({ + "file": str(f), + "added": result["drawers_added"], + "skipped": result["drawers_skipped"], + }) + + return { + "files_processed": len(per_file), + "drawers_added": total_added, + "drawers_skipped": total_skipped, + "per_file": per_file, + } diff --git a/tests/test_sweeper.py b/tests/test_sweeper.py new file mode 100644 index 0000000..247125a --- /dev/null +++ b/tests/test_sweeper.py @@ -0,0 +1,184 @@ +"""TDD: tandem sweeper that catches what the primary miner missed. + +The primary miner (miner.py / convo_miner.py) runs at file granularity +and can drop data (size caps, silent OSError, dedup false-positives). +The sweeper is a second miner that works at MESSAGE granularity, +using timestamp as the coordination cursor. + +For each session in the transcript directory: + 1. Look up max(timestamp) across all drawers with matching session_id + 2. Stream the jsonl, yielding only user/assistant messages after the cursor + 3. Write one small drawer per message with: + session_id, uuid, timestamp, role, content + 4. Idempotent: re-running sweeps should find nothing new on a complete palace. + +This test file is TDD — written BEFORE mempalace/sweeper.py exists. +""" + +import json +import tempfile +from pathlib import Path + +import pytest + + +@pytest.fixture +def mock_claude_jsonl(tmp_path): + """Real Claude Code jsonl shape: user/assistant records among progress noise.""" + path = tmp_path / "session_abc.jsonl" + lines = [ + # Noise: progress event, no message + {"type": "progress", "timestamp": "2026-04-18T10:00:00Z", + "sessionId": "abc", "uuid": "p-1"}, + # User message + {"type": "user", "timestamp": "2026-04-18T10:00:05Z", + "sessionId": "abc", "uuid": "u-1", + "message": {"role": "user", "content": "What's the capital of France?"}}, + # Assistant reply + {"type": "assistant", "timestamp": "2026-04-18T10:00:06Z", + "sessionId": "abc", "uuid": "a-1", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "Paris."}]}}, + # Noise: file-history-snapshot + {"type": "file-history-snapshot", "messageId": "abc-snap"}, + # Second user/assistant exchange + {"type": "user", "timestamp": "2026-04-18T10:01:00Z", + "sessionId": "abc", "uuid": "u-2", + "message": {"role": "user", "content": "And of Germany?"}}, + {"type": "assistant", "timestamp": "2026-04-18T10:01:01Z", + "sessionId": "abc", "uuid": "a-2", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "Berlin."}]}}, + ] + path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") + return path + + +class TestSweeperParsing: + def test_parse_yields_only_user_and_assistant(self, mock_claude_jsonl): + from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) + roles = [r["role"] for r in records] + assert roles == ["user", "assistant", "user", "assistant"], ( + f"Expected 4 user/assistant in order, got {roles}. " + "Noise records (progress, file-history-snapshot) must be " + "filtered out." + ) + + def test_parse_extracts_session_id_and_timestamp(self, mock_claude_jsonl): + from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) + first = records[0] + assert first["session_id"] == "abc" + assert first["timestamp"] == "2026-04-18T10:00:05Z" + assert first["uuid"] == "u-1" + + def test_parse_normalizes_assistant_content_list_to_text(self, mock_claude_jsonl): + from mempalace.sweeper import parse_claude_jsonl + records = list(parse_claude_jsonl(str(mock_claude_jsonl))) + assistant_rec = records[1] + assert assistant_rec["role"] == "assistant" + assert "Paris" in assistant_rec["content"], ( + f"Assistant content blocks must be flattened to text; " + f"got: {assistant_rec['content']!r}" + ) + + +class TestSweeperTandem: + """The sweeper coordinates with other miners via max(timestamp).""" + + def test_sweep_empty_palace_ingests_all_messages(self, mock_claude_jsonl, tmp_path): + from mempalace.sweeper import sweep + palace_path = str(tmp_path / "palace") + result = sweep(str(mock_claude_jsonl), palace_path) + assert result["drawers_added"] == 4, ( + f"Empty palace: all 4 user/assistant messages should ingest. " + f"Got drawers_added={result['drawers_added']}." + ) + + def test_sweep_is_idempotent(self, mock_claude_jsonl, tmp_path): + """Running the sweep twice must not duplicate drawers.""" + from mempalace.sweeper import sweep + palace_path = str(tmp_path / "palace") + first = sweep(str(mock_claude_jsonl), palace_path) + second = sweep(str(mock_claude_jsonl), palace_path) + assert first["drawers_added"] == 4 + assert second["drawers_added"] == 0, ( + f"Second sweep must be a no-op on unchanged data. " + f"Got drawers_added={second['drawers_added']} — " + "cursor logic is broken." + ) + + def test_sweep_resumes_from_cursor(self, tmp_path): + """If half the messages are already in the palace, sweep picks up + only the later half.""" + from mempalace.sweeper import sweep + + jsonl_path = tmp_path / "session.jsonl" + lines = [ + {"type": "user", "timestamp": "2026-04-18T09:00:00Z", + "sessionId": "s1", "uuid": "u1", + "message": {"role": "user", "content": "first"}}, + {"type": "assistant", "timestamp": "2026-04-18T09:00:01Z", + "sessionId": "s1", "uuid": "a1", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "one"}]}}, + ] + jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n") + + palace_path = str(tmp_path / "palace") + first = sweep(str(jsonl_path), palace_path) + assert first["drawers_added"] == 2 + + # Append two more exchanges simulating live session growth. + more_lines = [ + {"type": "user", "timestamp": "2026-04-18T09:05:00Z", + "sessionId": "s1", "uuid": "u2", + "message": {"role": "user", "content": "second"}}, + {"type": "assistant", "timestamp": "2026-04-18T09:05:01Z", + "sessionId": "s1", "uuid": "a2", + "message": {"role": "assistant", + "content": [{"type": "text", "text": "two"}]}}, + ] + with open(jsonl_path, "a") as f: + for x in more_lines: + f.write(json.dumps(x) + "\n") + + second = sweep(str(jsonl_path), palace_path) + assert second["drawers_added"] == 2, ( + f"Second sweep should pick up only the 2 new exchanges, " + f"got {second['drawers_added']}. Cursor (max-timestamp) " + "coordination is broken." + ) + + +class TestSweeperDrawerMetadata: + """Each drawer must carry the metadata the tandem-miner coordination + depends on: session_id, timestamp, uuid, role.""" + + def test_drawer_has_session_id_and_timestamp_metadata( + self, mock_claude_jsonl, tmp_path): + from mempalace.sweeper import sweep + from mempalace.palace import get_collection + + palace_path = str(tmp_path / "palace") + sweep(str(mock_claude_jsonl), palace_path) + + col = get_collection(palace_path, create=False) + data = col.get(include=["metadatas"]) + metas = data["metadatas"] + assert metas, "No drawers written" + + for m in metas: + assert m.get("session_id") == "abc", ( + f"Drawer missing session_id metadata: {m}" + ) + assert m.get("timestamp"), ( + f"Drawer missing timestamp metadata: {m}" + ) + assert m.get("message_uuid"), ( + f"Drawer missing message_uuid metadata: {m}" + ) + assert m.get("role") in ("user", "assistant"), ( + f"Drawer missing or wrong role metadata: {m}" + )