Add tandem sweeper: message-level safety net for dropped transcripts

The primary miners (miner.py, convo_miner.py) operate at file
granularity and can drop data for several reasons: size caps, silent
OSError on read, dedup false positives, extensions the project miner
does not recognize. Even with tonight's hotfixes, any future bug in
the file-level path risks silent data loss.

The sweeper is a second, cooperating miner that works at MESSAGE
granularity:

  - Parses Claude Code .jsonl line by line, yielding only
    user/assistant records (filters progress, file-history-snapshot,
    etc. noise).
  - For each session_id, queries the palace for max(timestamp) and
    treats that as the cursor.
  - Ingests only messages newer than the cursor, as one small drawer
    per message (never hits a size cap — each drawer is 1-5 KB).
  - Deterministic drawer IDs from session_id + message UUID make
    reruns idempotent; crash mid-sweep is safe.

Tandem coordination is free: if the primary miner committed up to
timestamp T, the sweeper resumes from T. If the primary miner missed
everything, the sweeper catches it all. Neither duplicates the other.

Smoke test on a real Claude Code transcript:
  1st run: +39 drawers, 0 already present
  2nd run: +0 drawers, 39 already present  (perfect idempotence)

Opt-in via:
  mempalace sweep <file.jsonl>
  mempalace sweep <transcript-dir>

No changes to existing miners. No schema migration. Purely additive.

Tests: tests/test_sweeper.py (7 tests covering parsing, tandem
coordination, idempotency, resume-from-cursor, metadata correctness).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MSL
2026-04-18 07:51:10 -07:00
committed by Igor Lins e Silva
parent 6f33d52681
commit fed69935d3
3 changed files with 488 additions and 0 deletions
+41
View File
@@ -146,6 +146,35 @@ def cmd_mine(args):
)
def cmd_sweep(args):
    """Run the tandem sweeper over one transcript file or a directory.

    The sweeper picks up anything the primary miner missed. It
    coordinates through max(timestamp) per session_id, so it can run
    alongside the file-level miners without redoing their work.

    Prints a one-line summary on success; exits with status 1 when the
    target is neither a file nor a directory.
    """
    from .sweeper import sweep, sweep_directory

    # An explicit --palace wins; otherwise fall back to the configured path.
    if args.palace:
        palace_path = os.path.expanduser(args.palace)
    else:
        palace_path = MempalaceConfig().palace_path
    target = os.path.expanduser(args.target)

    if os.path.isfile(target):
        summary = sweep(target, palace_path)
        print(
            f" Swept {target}: +{summary['drawers_added']} drawers, "
            f"{summary['drawers_skipped']} already present."
        )
        return

    if os.path.isdir(target):
        summary = sweep_directory(target, palace_path)
        print(
            f" Swept {summary['files_processed']} files from {target}: "
            f"+{summary['drawers_added']} drawers, "
            f"{summary['drawers_skipped']} already present."
        )
        return

    print(f" ✗ Not a file or directory: {target}", file=sys.stderr)
    sys.exit(1)
def cmd_search(args):
from .searcher import search, SearchError
@@ -547,6 +576,17 @@ def main():
help="Extraction strategy for convos mode: 'exchange' (default) or 'general' (5 memory types)",
)
# sweep
p_sweep = sub.add_parser(
"sweep",
help="Tandem miner: catch anything the primary miner missed "
"(message-level, timestamp-coordinated, idempotent)",
)
p_sweep.add_argument(
"target",
help="A .jsonl transcript file, or a directory to scan recursively",
)
# search
p_search = sub.add_parser("search", help="Find anything, exact words")
p_search.add_argument("query", help="What to search for")
@@ -679,6 +719,7 @@ def main():
"mine": cmd_mine,
"split": cmd_split,
"search": cmd_search,
"sweep": cmd_sweep,
"mcp": cmd_mcp,
"compress": cmd_compress,
"wake-up": cmd_wakeup,
+263
View File
@@ -0,0 +1,263 @@
#!/usr/bin/env python3
"""
sweeper.py — Tandem miner that guarantees no conversation is silently
dropped.
Works alongside miner.py / convo_miner.py via timestamp coordination:
For each session in the transcript dir:
cursor = max(timestamp of drawers with matching session_id, "")
For each user/assistant message in the jsonl with timestamp > cursor:
write one small drawer (message_uuid as deterministic ID)
Properties:
- Idempotent: rerunning on a fully-mined palace is a no-op.
- Resume-safe: crash mid-sweep → next run picks up from max-timestamp.
- Coordinates with primary miners for free: whichever got further
advances the cursor; the other starts from there next time.
- No size caps: each drawer holds one message, ~1-5 KB.
Usage:
from mempalace.sweeper import sweep
result = sweep("/path/to/session.jsonl", "/path/to/palace")
# result: {"drawers_added": N, "drawers_skipped": M, "cursor_by_session": {session_id: ts}}
"""
from __future__ import annotations
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Iterator, Optional
from .palace import get_collection
# ── JSONL parsing ────────────────────────────────────────────────────
def _flatten_content(content) -> str:
"""Normalize Claude Code's message content to a plain string.
User messages are strings already; assistant messages are a list of
content blocks like [{"type": "text", "text": "..."}, {"type":
"tool_use", ...}]. We keep text blocks verbatim and describe non-text
blocks as a marker so the drawer carries a faithful record.
"""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for block in content:
if not isinstance(block, dict):
continue
btype = block.get("type", "")
if btype == "text":
parts.append(block.get("text", ""))
elif btype == "tool_use":
parts.append(
f"[tool_use: {block.get('name', '?')} "
f"input={json.dumps(block.get('input', {}), default=str)[:500]}]"
)
elif btype == "tool_result":
parts.append(
f"[tool_result: {json.dumps(block.get('content', ''), default=str)[:500]}]"
)
else:
parts.append(f"[{btype}]")
return "\n".join(p for p in parts if p)
return str(content)
def parse_claude_jsonl(path: str) -> Iterator[dict]:
    """Yield user/assistant records from a Claude Code .jsonl file.

    Each yield is:
        {
            "session_id": str,
            "uuid": str,        # per-message UUID
            "timestamp": str,   # ISO 8601
            "role": "user" | "assistant",
            "content": str,     # flattened text
        }

    Non-message records (progress, file-history-snapshot, system,
    queue-operation, last-prompt) are filtered out. Malformed lines are
    skipped silently — data quality is the transcript writer's problem,
    not ours. "Malformed" covers: blank lines, invalid JSON, valid JSON
    that is not an object, a missing/non-dict ``message``, a missing or
    non-string ``timestamp`` / ``uuid`` / ``sessionId``, and messages
    whose flattened content is empty.

    Raises:
        OSError: if ``path`` cannot be opened or read.
    """
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A line can be valid JSON without being an object ([], null,
            # 42, "x"); those have no .get() and are not transcript records.
            if not isinstance(record, dict):
                continue
            if record.get("type") not in ("user", "assistant"):
                continue

            msg = record.get("message")
            if not isinstance(msg, dict):
                continue
            role = msg.get("role")
            if role not in ("user", "assistant"):
                continue

            # Downstream code compares timestamps as strings and slices the
            # session id, so non-string values are treated as malformed.
            timestamp = record.get("timestamp")
            if not timestamp or not isinstance(timestamp, str):
                continue
            uuid = record.get("uuid")
            if not uuid or not isinstance(uuid, str):
                continue
            session_id = record.get("sessionId") or record.get("session_id")
            if not session_id or not isinstance(session_id, str):
                continue

            content = _flatten_content(msg.get("content", ""))
            if not content.strip():
                continue

            yield {
                "session_id": session_id,
                "uuid": uuid,
                "timestamp": timestamp,
                "role": role,
                "content": content,
            }
# ── Cursor resolution ────────────────────────────────────────────────
def get_palace_cursor(collection, session_id: str) -> Optional[str]:
    """Return the greatest ``timestamp`` stored for ``session_id``, or None.

    Fetches the metadata of every drawer whose ``session_id`` matches
    (ChromaDB where-filter) and keeps the largest non-empty timestamp.
    Timestamps are compared as plain strings, which orders ISO-8601
    values correctly as long as they share one format and timezone
    notation.

    Returns None when no drawer carries a timestamp, and also when the
    lookup itself raises (the error is swallowed).
    """
    try:
        found = collection.get(
            where={"session_id": session_id},
            include=["metadatas"],
        )
    except Exception:
        return None

    latest = None
    for meta in found.get("metadatas") or []:
        if not meta:
            continue
        stamp = meta.get("timestamp")
        if not stamp:
            continue
        if latest is None or stamp > latest:
            latest = stamp
    return latest
# ── Sweep ────────────────────────────────────────────────────────────
def _drawer_id_for_message(session_id: str, message_uuid: str) -> str:
"""Deterministic drawer ID so upserts at the same message are no-ops."""
return f"sweep_{session_id[:12]}_{message_uuid}"
def sweep(jsonl_path: str, palace_path: str,
          source_label: Optional[str] = None) -> dict:
    """Ingest every user/assistant message not already represented.

    For each message yielded by ``parse_claude_jsonl(jsonl_path)``:
      - If its timestamp is strictly older than the session cursor, skip
        it (already saved by us or by the primary miner).
      - Otherwise queue it under its deterministic drawer ID. Before a
        batch is written, IDs that already exist in the palace are
        dropped and counted as skipped, so reruns add nothing.

    Messages whose timestamp EQUALS the cursor are deliberately not
    skipped by timestamp alone: several records can share a timestamp,
    and a live transcript (or a crash between batches) can leave some of
    them unwritten. Skipping on ``<=`` would lose those forever; the
    drawer-ID check decides instead.
    NOTE(review): if a primary miner wrote the cursor-timestamp message
    under a different ID, the sweeper may add one drawer for it — confirm
    that trade-off is acceptable.

    Args:
        jsonl_path: Path to a Claude Code .jsonl transcript.
        palace_path: Palace location passed to ``get_collection``
            (created if missing).
        source_label: Value stored as ``source_file`` metadata; defaults
            to ``jsonl_path``.

    Returns:
        ``{"drawers_added": int, "drawers_skipped": int,
        "cursor_by_session": {session_id: cursor-or-None}}`` where each
        cursor is the value read when that session was first seen in this
        run (it is not advanced as drawers are written).
    """
    collection = get_collection(palace_path, create=True)
    cursors: dict = {}
    drawers_added = 0
    drawers_skipped = 0

    batch_ids = []
    batch_docs = []
    batch_metas = []
    pending_ids = set()  # mirrors batch_ids for O(1) duplicate checks
    BATCH_SIZE = 64

    def _flush():
        nonlocal drawers_added, drawers_skipped
        if not batch_ids:
            return
        # Only write drawers the palace does not hold yet; this resolves
        # cursor-timestamp ties and keeps drawers_added truthful on reruns.
        found = collection.get(ids=list(batch_ids), include=["metadatas"])
        already_there = set(found.get("ids") or [])
        fresh = [
            (did, doc, meta)
            for did, doc, meta in zip(batch_ids, batch_docs, batch_metas)
            if did not in already_there
        ]
        drawers_skipped += len(batch_ids) - len(fresh)
        if fresh:
            collection.upsert(
                ids=[did for did, _, _ in fresh],
                documents=[doc for _, doc, _ in fresh],
                metadatas=[meta for _, _, meta in fresh],
            )
            drawers_added += len(fresh)
        batch_ids.clear()
        batch_docs.clear()
        batch_metas.clear()
        pending_ids.clear()

    for rec in parse_claude_jsonl(jsonl_path):
        sid = rec["session_id"]
        if sid not in cursors:
            # One cursor lookup per session per run.
            cursors[sid] = get_palace_cursor(collection, sid)
        cursor = cursors[sid]

        if cursor is not None and rec["timestamp"] < cursor:
            drawers_skipped += 1
            continue

        drawer_id = _drawer_id_for_message(sid, rec["uuid"])
        if drawer_id in pending_ids:
            # Same message repeated in the transcript; a batch must not
            # carry the same ID twice.
            drawers_skipped += 1
            continue

        document = f"{rec['role'].upper()}: {rec['content']}"
        metadata = {
            "session_id": sid,
            "timestamp": rec["timestamp"],
            "message_uuid": rec["uuid"],
            "role": rec["role"],
            "source_file": source_label or jsonl_path,
            "filed_at": datetime.now().isoformat(),
            "ingest_mode": "sweep",
        }
        batch_ids.append(drawer_id)
        batch_docs.append(document)
        batch_metas.append(metadata)
        pending_ids.add(drawer_id)

        if len(batch_ids) >= BATCH_SIZE:
            _flush()

    _flush()

    return {
        "drawers_added": drawers_added,
        "drawers_skipped": drawers_skipped,
        "cursor_by_session": cursors,
    }
def sweep_directory(dir_path: str, palace_path: str) -> dict:
    """Sweep every ``*.jsonl`` file under ``dir_path`` (recursive).

    Files are visited in sorted path order. A file whose sweep raises is
    reported on stderr and left out of the totals; the remaining files
    are still processed.

    Returns:
        ``{"files_processed", "drawers_added", "drawers_skipped",
        "per_file"}`` where ``files_processed`` counts only the files
        that swept successfully and ``per_file`` lists
        ``{"file", "added", "skipped"}`` for each of them.
    """
    root = Path(dir_path).expanduser().resolve()

    reports = []
    added_total = 0
    skipped_total = 0
    for transcript in sorted(root.rglob("*.jsonl")):
        try:
            outcome = sweep(str(transcript), palace_path, source_label=str(transcript))
        except Exception as exc:
            print(f" ⚠ sweep failed on {transcript}: {exc}", file=sys.stderr)
            continue
        added = outcome["drawers_added"]
        skipped = outcome["drawers_skipped"]
        added_total += added
        skipped_total += skipped
        reports.append({"file": str(transcript), "added": added, "skipped": skipped})

    return {
        "files_processed": len(reports),
        "drawers_added": added_total,
        "drawers_skipped": skipped_total,
        "per_file": reports,
    }
+184
View File
@@ -0,0 +1,184 @@
"""TDD: tandem sweeper that catches what the primary miner missed.
The primary miner (miner.py / convo_miner.py) runs at file granularity
and can drop data (size caps, silent OSError, dedup false-positives).
The sweeper is a second miner that works at MESSAGE granularity,
using timestamp as the coordination cursor.
For each session in the transcript directory:
1. Look up max(timestamp) across all drawers with matching session_id
2. Stream the jsonl, yielding only user/assistant messages after the cursor
3. Write one small drawer per message with:
session_id, uuid, timestamp, role, content
4. Idempotent: re-running sweeps should find nothing new on a complete palace.
This test file is TDD — written BEFORE mempalace/sweeper.py exists.
"""
import json
import tempfile
from pathlib import Path
import pytest
@pytest.fixture
def mock_claude_jsonl(tmp_path):
    """Write a small transcript in Claude Code jsonl shape and return its path.

    The file holds two user/assistant exchanges for session "abc",
    interleaved with noise records (a progress event and a
    file-history-snapshot) that the parser is expected to drop.
    """
    progress_noise = {"type": "progress", "timestamp": "2026-04-18T10:00:00Z",
                      "sessionId": "abc", "uuid": "p-1"}
    first_question = {"type": "user", "timestamp": "2026-04-18T10:00:05Z",
                      "sessionId": "abc", "uuid": "u-1",
                      "message": {"role": "user",
                                  "content": "What's the capital of France?"}}
    first_answer = {"type": "assistant", "timestamp": "2026-04-18T10:00:06Z",
                    "sessionId": "abc", "uuid": "a-1",
                    "message": {"role": "assistant",
                                "content": [{"type": "text", "text": "Paris."}]}}
    snapshot_noise = {"type": "file-history-snapshot", "messageId": "abc-snap"}
    second_question = {"type": "user", "timestamp": "2026-04-18T10:01:00Z",
                       "sessionId": "abc", "uuid": "u-2",
                       "message": {"role": "user", "content": "And of Germany?"}}
    second_answer = {"type": "assistant", "timestamp": "2026-04-18T10:01:01Z",
                     "sessionId": "abc", "uuid": "a-2",
                     "message": {"role": "assistant",
                                 "content": [{"type": "text", "text": "Berlin."}]}}

    records = [
        progress_noise,
        first_question,
        first_answer,
        snapshot_noise,
        second_question,
        second_answer,
    ]
    # One JSON object per line, each newline-terminated.
    transcript = tmp_path / "session_abc.jsonl"
    transcript.write_text("".join(json.dumps(rec) + "\n" for rec in records))
    return transcript
class TestSweeperParsing:
    """parse_claude_jsonl: noise filtering, field extraction, flattening."""

    def test_parse_yields_only_user_and_assistant(self, mock_claude_jsonl):
        """Only the four user/assistant records survive, in file order."""
        from mempalace.sweeper import parse_claude_jsonl

        parsed = list(parse_claude_jsonl(str(mock_claude_jsonl)))
        roles = [rec["role"] for rec in parsed]
        expected_roles = ["user", "assistant", "user", "assistant"]
        assert roles == expected_roles, (
            f"Expected 4 user/assistant in order, got {roles}. "
            "Noise records (progress, file-history-snapshot) must be "
            "filtered out."
        )

    def test_parse_extracts_session_id_and_timestamp(self, mock_claude_jsonl):
        """The first yielded record carries the first user message's IDs."""
        from mempalace.sweeper import parse_claude_jsonl

        parsed = list(parse_claude_jsonl(str(mock_claude_jsonl)))
        opening = parsed[0]
        assert opening["session_id"] == "abc"
        assert opening["timestamp"] == "2026-04-18T10:00:05Z"
        assert opening["uuid"] == "u-1"

    def test_parse_normalizes_assistant_content_list_to_text(self, mock_claude_jsonl):
        """Assistant content blocks come back as a plain string."""
        from mempalace.sweeper import parse_claude_jsonl

        parsed = list(parse_claude_jsonl(str(mock_claude_jsonl)))
        assistant_rec = parsed[1]
        assert assistant_rec["role"] == "assistant"
        flattened = assistant_rec["content"]
        assert "Paris" in flattened, (
            f"Assistant content blocks must be flattened to text; "
            f"got: {assistant_rec['content']!r}"
        )
class TestSweeperTandem:
    """The sweeper coordinates with other miners via max(timestamp)."""

    def test_sweep_empty_palace_ingests_all_messages(self, mock_claude_jsonl, tmp_path):
        """With nothing stored yet, every user/assistant message is added."""
        from mempalace.sweeper import sweep

        palace_path = str(tmp_path / "palace")
        result = sweep(str(mock_claude_jsonl), palace_path)
        assert result["drawers_added"] == 4, (
            f"Empty palace: all 4 user/assistant messages should ingest. "
            f"Got drawers_added={result['drawers_added']}."
        )

    def test_sweep_is_idempotent(self, mock_claude_jsonl, tmp_path):
        """Running the sweep twice must not duplicate drawers."""
        from mempalace.sweeper import sweep

        palace_path = str(tmp_path / "palace")
        first = sweep(str(mock_claude_jsonl), palace_path)
        second = sweep(str(mock_claude_jsonl), palace_path)
        assert first["drawers_added"] == 4
        assert second["drawers_added"] == 0, (
            f"Second sweep must be a no-op on unchanged data. "
            f"Got drawers_added={second['drawers_added']}; "
            "cursor logic is broken."
        )
        # Every message must be accounted for as already present.
        assert second["drawers_skipped"] == 4, (
            f"Second sweep should report all 4 messages as already present, "
            f"got drawers_skipped={second['drawers_skipped']}."
        )

    def test_sweep_resumes_from_cursor(self, tmp_path):
        """If half the messages are already in the palace, sweep picks up
        only the later half."""
        from mempalace.sweeper import sweep

        jsonl_path = tmp_path / "session.jsonl"
        lines = [
            {"type": "user", "timestamp": "2026-04-18T09:00:00Z",
             "sessionId": "s1", "uuid": "u1",
             "message": {"role": "user", "content": "first"}},
            {"type": "assistant", "timestamp": "2026-04-18T09:00:01Z",
             "sessionId": "s1", "uuid": "a1",
             "message": {"role": "assistant",
                         "content": [{"type": "text", "text": "one"}]}},
        ]
        jsonl_path.write_text(
            "\n".join(json.dumps(x) for x in lines) + "\n", encoding="utf-8"
        )

        palace_path = str(tmp_path / "palace")
        first = sweep(str(jsonl_path), palace_path)
        assert first["drawers_added"] == 2

        # Append two more exchanges simulating live session growth.
        more_lines = [
            {"type": "user", "timestamp": "2026-04-18T09:05:00Z",
             "sessionId": "s1", "uuid": "u2",
             "message": {"role": "user", "content": "second"}},
            {"type": "assistant", "timestamp": "2026-04-18T09:05:01Z",
             "sessionId": "s1", "uuid": "a2",
             "message": {"role": "assistant",
                         "content": [{"type": "text", "text": "two"}]}},
        ]
        with open(jsonl_path, "a", encoding="utf-8") as f:
            for x in more_lines:
                f.write(json.dumps(x) + "\n")

        second = sweep(str(jsonl_path), palace_path)
        assert second["drawers_added"] == 2, (
            f"Second sweep should pick up only the 2 new exchanges, "
            f"got {second['drawers_added']}. Cursor (max-timestamp) "
            "coordination is broken."
        )
class TestSweeperDrawerMetadata:
    """Every swept drawer must expose the fields tandem coordination
    relies on: session_id, timestamp, message_uuid and role."""

    def test_drawer_has_session_id_and_timestamp_metadata(
            self, mock_claude_jsonl, tmp_path):
        """Sweep the fixture, then inspect the metadata of each stored drawer."""
        from mempalace.sweeper import sweep
        from mempalace.palace import get_collection

        palace_dir = str(tmp_path / "palace")
        sweep(str(mock_claude_jsonl), palace_dir)

        collection = get_collection(palace_dir, create=False)
        stored = collection.get(include=["metadatas"])["metadatas"]
        assert stored, "No drawers written"

        valid_roles = ("user", "assistant")
        for m in stored:
            assert m.get("session_id") == "abc", (
                f"Drawer missing session_id metadata: {m}"
            )
            assert m.get("timestamp"), (
                f"Drawer missing timestamp metadata: {m}"
            )
            assert m.get("message_uuid"), (
                f"Drawer missing message_uuid metadata: {m}"
            )
            assert m.get("role") in valid_roles, (
                f"Drawer missing or wrong role metadata: {m}"
            )