Address Copilot review: cursor tie-break, honest metrics, accurate comments

Six items from the automated review on PR #998:

1. **Cursor tie-break bug (correctness).** The skip condition was
   `rec.timestamp <= cursor`; if multiple messages share the max
   timestamp and only some were ingested before a crash, the rest
   would be lost forever. Changed to `< cursor`, relying on
   deterministic drawer IDs for safe re-attempt at the boundary.
   Regression test
   `test_sweep_recovers_untaken_message_at_cursor_timestamp`.

2. **`drawers_added` counted upserts, not adds.** Added a pre-flight
   `collection.get(ids=batch)` to distinguish new rows from already-
   present ones. Return value now carries `drawers_added`,
   `drawers_already_present`, `drawers_upserted`, and `drawers_skipped`
   separately. Dict-compatible access (`existing.get("ids")`) keeps it
   working on both the raw Chroma return and the typed `GetResult`.

3. **`sweep_directory` hid failures in the summary.** `files_processed`
   used to exclude failed files. Replaced with `files_attempted` (all
   discovered) + `files_succeeded` (subset that completed); CLI output
   shows `succeeded/attempted`.

4. **Coordination claim was overstated.** The primary miners don't
   stamp `session_id`/`timestamp` metadata, so the sweeper coordinates
   only with its own prior runs. Softened docstrings on module and CLI
   command. Uniform cross-miner metadata is flagged as a follow-up.

5. **MAX_FILE_SIZE comments were misleading.** Said source size "does
   not affect storage or embedding cost" — true per-drawer, but source
   size still scales drawer count, embedding work, and memory usage
   (files are read in full, not streamed). Corrected in both
   `miner.py` and `convo_miner.py`.

6. Added the tie-break regression test that reproduces the correctness
   bug from (1).
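
The tie-break failure in (1) can be reproduced without Chroma. The sketch below is illustrative only: `sweep_once`, the `strict` flag, and the plain-dict store are hypothetical stand-ins, not the sweeper's actual API. It assumes ISO-8601 UTC strings that compare correctly as text.

```python
# Hypothetical in-memory model of the cursor skip; not the real sweeper code.
def sweep_once(messages, store, strict=True):
    """Upsert messages not skipped by the cursor; return how many were new."""
    # Cursor = newest timestamp already stored (None for an empty store).
    cursor = max((ts for ts, _ in store.values()), default=None)
    added = 0
    for msg_id, ts, text in messages:
        if cursor is not None:
            # strict=True models the fixed `< cursor`; False models `<= cursor`.
            skip = ts < cursor if strict else ts <= cursor
            if skip:
                continue
        if msg_id not in store:
            added += 1
        store[msg_id] = (ts, text)  # deterministic ID makes re-writes harmless
    return added


T = "2026-04-18T11:00:00Z"
messages = [(f"u-{i}", T, f"msg {i}") for i in range(3)]


def partial_store():
    # Simulate a crash after two of the three tied messages were ingested.
    return {msg_id: (ts, text) for msg_id, ts, text in messages[:2]}


lossy = partial_store()
lossy_added = sweep_once(messages, lossy, strict=False)  # old behaviour

safe = partial_store()
safe_added = sweep_once(messages, safe, strict=True)  # fixed behaviour
```

With `<=` the third message is never written (`lossy` keeps two entries); with `<` the two existing rows are re-written in place and only the missing one counts as new.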

Tests: 970 passed (was 969), ruff + pre-commit clean.
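
The counting change in (2) reduces to checking which IDs exist before the upsert. A minimal sketch, assuming a plain dict in place of the Chroma collection; `upsert_with_counts` is a hypothetical helper, not a function in this commit:

```python
# Hypothetical helper: a dict stands in for the collection.
def upsert_with_counts(store: dict, batch: dict) -> dict:
    """Upsert `batch` into `store`, reporting new vs. already-present IDs."""
    # Pre-flight: record which IDs exist *before* writing anything.
    present = {rid for rid in batch if rid in store}
    store.update(batch)  # the upsert itself is idempotent on data
    return {
        "drawers_added": len(batch) - len(present),
        "drawers_already_present": len(present),
        "drawers_upserted": len(batch),
    }


store: dict = {}
first = upsert_with_counts(store, {"a": "x", "b": "y"})
second = upsert_with_counts(store, {"b": "y", "c": "z"})
```

On the second call only `c` is new, so `drawers_added` is 1 while `drawers_upserted` stays at 2; counting `len(batch)` as "added" would have reported 2.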

Co-Authored-By: MSL <232237854+milla-jovovich@users.noreply.github.com>
Igor Lins e Silva
2026-04-18 13:22:18 -03:00
parent 29ce7c7135
commit 4a088ea8e1
5 changed files with 171 additions and 36 deletions
+15 -9
@@ -147,10 +147,14 @@ def cmd_mine(args):
def cmd_sweep(args):
"""Sweep a transcript file or directory for anything the primary
miner missed. Coordinates via max(timestamp) per session_id, so
this is safe to run alongside the file-level miners — neither
duplicates the other's work.
"""Sweep a transcript file or directory.
The sweeper deduplicates against its own prior writes via
deterministic drawer IDs + a timestamp cursor. It does NOT currently
coordinate with the file-level miners (miner.py / convo_miner.py) —
those produce char-chunked drawers without compatible message
metadata, so running both miners may store overlapping content under
different IDs.
"""
from .sweeper import sweep, sweep_directory
@@ -160,15 +164,17 @@ def cmd_sweep(args):
if os.path.isfile(target):
result = sweep(target, palace_path)
print(
f" Swept {target}: +{result['drawers_added']} drawers, "
f"{result['drawers_skipped']} already present."
f" Swept {target}: +{result['drawers_added']} new, "
f"{result['drawers_already_present']} already present, "
f"{result['drawers_skipped']} skipped (< cursor)."
)
elif os.path.isdir(target):
result = sweep_directory(target, palace_path)
print(
f" Swept {result['files_processed']} files from {target}: "
f"+{result['drawers_added']} drawers, "
f"{result['drawers_skipped']} already present."
f" Swept {result['files_succeeded']}/{result['files_attempted']} "
f"files from {target}: +{result['drawers_added']} new, "
f"{result['drawers_already_present']} already present, "
f"{result['drawers_skipped']} skipped (< cursor)."
)
failures = result.get("failures") or []
if failures:
+5 -2
@@ -58,8 +58,11 @@ CHUNK_SIZE = 800 # chars per drawer — align with miner.py
MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB — skip files larger than this.
# Matches miner.py at 500 MB. Long Claude Code sessions, multi-year
# ChatGPT exports, and lifetime Slack dumps routinely exceed 10 MB; the
# cap at that level silently dropped them with `continue`. Source size
# does not affect storage or embedding cost — chunking happens downstream.
# cap at that level silently dropped them with `continue`. Per-drawer
# size is bounded by CHUNK_SIZE, but larger source files still produce
# more drawers and therefore more embedding/storage work — and content
# is normalized and loaded fully into memory before chunking, so memory
# use also scales with source size.
def _register_file(collection, source_file: str, wing: str, agent: str):
+5 -2
@@ -66,8 +66,11 @@ MIN_CHUNK_SIZE = 50 # skip tiny chunks
MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB — skip files larger than this.
# Long Claude Code sessions and large transcript exports routinely exceed
# 10 MB. The cap exists as a defensive rail against pathological binary
# files, not as a limit on legitimate text. Chunking at 800 chars per
# drawer means source size does not affect storage or embedding cost.
# files, not as a limit on legitimate text. Per-drawer size is bounded
# by CHUNK_SIZE, but larger sources still produce proportionally more
# drawers and therefore more storage, embedding, and processing work —
# and file reads are not streamed (the whole content is loaded into
# memory before chunking), so memory use scales with source size too.
# =============================================================================
+80 -23
@@ -1,26 +1,39 @@
#!/usr/bin/env python3
"""
sweeper.py — Tandem miner that guarantees no conversation is silently
dropped.
sweeper.py — Message-granular miner that catches what the file-level
primary miners dropped.
Works alongside miner.py / convo_miner.py via timestamp coordination:
Algorithm, per session:
For each session in the transcript dir:
cursor = max(timestamp of drawers with matching session_id, "")
For each user/assistant message in the jsonl with timestamp > cursor:
write one small drawer (message_uuid as deterministic ID)
cursor = max(timestamp of sweeper-written drawers for this session_id)
For each user/assistant message in the jsonl:
if cursor is not None and message.timestamp < cursor: skip
else: upsert a drawer keyed by (session_id, message_uuid)
Properties:
- Idempotent: rerunning on a fully-mined palace is a no-op.
- Resume-safe: crash mid-sweep → next run picks up from max-timestamp.
- Coordinates with primary miners for free: whichever got further
advances the cursor; the other starts from there next time.
- Idempotent on its own writes: rerunning is a no-op because drawer
IDs are deterministic and existence is pre-checked before counting.
- Resume-safe: a crash mid-sweep is recovered on the next run — the
cursor advances to the last ingested timestamp and re-attempts at
that boundary are de-duped by the deterministic ID.
- Tie-break safe: uses ``< cursor`` (not ``<=``), so if multiple
messages share the max timestamp and only some were ingested, the
rest are still picked up on re-run.
- No size caps: each drawer holds one exchange, ~1-5 KB.
Coordination with the primary file-level miners (``miner.py`` /
``convo_miner.py``) is limited: those miners chunk at a fixed char size
and do not currently stamp ``session_id``/``timestamp`` metadata that
the sweeper can key off. In practice the sweeper coordinates with its
own prior runs, and may ingest content that also got chunked into
primary-miner drawers (under different IDs). Follow-up: add uniform
``ingest_mode`` + message metadata to the primary miners so dedup spans
both paths.
Usage:
from mempalace.sweeper import sweep
result = sweep("/path/to/session.jsonl", "/path/to/palace")
# result: {"drawers_added": N, "drawers_skipped": M, "cursor": ts}
"""
from __future__ import annotations
@@ -181,33 +194,67 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None)
"""Ingest every user/assistant message not already represented.
For each message in the jsonl:
- If timestamp <= cursor for that session, skip (already saved by
us or by primary miner).
- If timestamp < cursor for that session, skip (strictly earlier
than the newest timestamp already stored, so treated as covered).
- At timestamp == cursor we do NOT skip, because multiple messages
can share the same ISO-8601 timestamp; if only some of them were
ingested before a crash, a `<= cursor` skip would lose the rest
forever. Deterministic drawer IDs make re-attempting at the
cursor boundary safe (existing rows are found via a pre-flight
`get(ids=...)` and counted as "already present", not "added").
- Else, upsert a drawer with deterministic ID so reruns dedupe.
Returns a summary dict: {drawers_added, drawers_skipped, cursor_by_session}.
Returns ``{drawers_added, drawers_already_present, drawers_skipped,
drawers_upserted, cursor_by_session}``:
* ``drawers_added`` — rows that did not exist before this sweep.
* ``drawers_already_present`` — rows whose deterministic ID was
already in the palace and got rewritten idempotently.
* ``drawers_skipped`` — records skipped by the cursor (strictly
earlier than the newest timestamp already stored for the session).
* ``drawers_upserted`` — total writes = added + already_present.
"""
collection = get_collection(palace_path, create=True)
cursors: dict = {}
drawers_added = 0
drawers_already_present = 0
drawers_skipped = 0
batch_ids = []
batch_docs = []
batch_metas = []
batch_ids: list[str] = []
batch_docs: list[str] = []
batch_metas: list[dict] = []
BATCH_SIZE = 64
def _flush():
nonlocal drawers_added
nonlocal drawers_added, drawers_already_present
if not batch_ids:
return
# Pre-flight: which IDs in this batch are already present?
# Upsert is idempotent on data, but counting every upserted row as
# "added" would lie; this pre-query makes the metric honest
# (Copilot PR 998 review).
try:
existing = collection.get(ids=list(batch_ids), include=[])
# Chroma returns a dict; typed backends return GetResult — the
# compat shim makes ``.get("ids")`` work on both.
present = set(existing.get("ids") or [])
except Exception as exc:
logger.warning(
"sweeper: existence pre-check failed (%s); "
"counting all batch rows as new (metric may over-count on reruns).",
exc,
)
present = set()
new_count = sum(1 for rid in batch_ids if rid not in present)
already_count = len(batch_ids) - new_count
collection.upsert(
ids=batch_ids,
documents=batch_docs,
metadatas=batch_metas,
)
drawers_added += len(batch_ids)
drawers_added += new_count
drawers_already_present += already_count
batch_ids.clear()
batch_docs.clear()
batch_metas.clear()
@@ -218,7 +265,7 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None)
cursors[sid] = get_palace_cursor(collection, sid)
cursor = cursors[sid]
if cursor is not None and rec["timestamp"] <= cursor:
if cursor is not None and rec["timestamp"] < cursor:
drawers_skipped += 1
continue
@@ -245,6 +292,8 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None)
return {
"drawers_added": drawers_added,
"drawers_already_present": drawers_already_present,
"drawers_upserted": drawers_added + drawers_already_present,
"drawers_skipped": drawers_skipped,
"cursor_by_session": cursors,
}
@@ -253,12 +302,16 @@ def sweep(jsonl_path: str, palace_path: str, source_label: Optional[str] = None)
def sweep_directory(dir_path: str, palace_path: str) -> dict:
"""Sweep every .jsonl file in a directory (recursive).
Returns aggregated summary across all files.
Returns aggregated summary across all files. ``files_attempted``
includes files that raised, so the count reflects discovery rather
than only successes; ``files_succeeded`` is the subset that
completed without error.
"""
dir_p = Path(dir_path).expanduser().resolve()
files = sorted(dir_p.rglob("*.jsonl"))
total_added = 0
total_already_present = 0
total_skipped = 0
per_file = []
@@ -272,18 +325,22 @@ def sweep_directory(dir_path: str, palace_path: str) -> dict:
failures.append({"file": str(f), "error": str(exc)})
continue
total_added += result["drawers_added"]
total_already_present += result.get("drawers_already_present", 0)
total_skipped += result["drawers_skipped"]
per_file.append(
{
"file": str(f),
"added": result["drawers_added"],
"already_present": result.get("drawers_already_present", 0),
"skipped": result["drawers_skipped"],
}
)
return {
"files_processed": len(per_file),
"files_attempted": len(files),
"files_succeeded": len(per_file),
"drawers_added": total_added,
"drawers_already_present": total_already_present,
"drawers_skipped": total_skipped,
"per_file": per_file,
"failures": failures,
+66
@@ -225,6 +225,72 @@ class TestSweeperTandem:
"coordination is broken."
)
def test_sweep_recovers_untaken_message_at_cursor_timestamp(self, tmp_path):
"""Regression for Copilot PR #998 review: with a `<= cursor` skip,
any message sharing the max timestamp but not yet ingested (e.g.
crash mid-batch) would be lost forever. The skip must be `<` and
tie-break via deterministic drawer ID.
Scenario: three messages share timestamp T. First sweep ingests
two of them and the process dies before the third. Second sweep
must pick up the third — not skip it because cursor == T.
"""
from mempalace.palace import get_collection
from mempalace.sweeper import (
_drawer_id_for_message,
parse_claude_jsonl,
sweep,
)
shared_ts = "2026-04-18T11:00:00Z"
lines = [
{
"type": "user",
"timestamp": shared_ts,
"sessionId": "s-tie",
"uuid": f"u-{i}",
"message": {"role": "user", "content": f"msg {i}"},
}
for i in range(3)
]
jsonl_path = tmp_path / "tied.jsonl"
jsonl_path.write_text("\n".join(json.dumps(x) for x in lines) + "\n")
palace_path = str(tmp_path / "palace")
# Simulate a partial ingest: write 2 of 3 directly via the backend
# with the same drawer IDs the sweeper would use.
col = get_collection(palace_path, create=True)
recs = list(parse_claude_jsonl(str(jsonl_path)))
partial_ids = [_drawer_id_for_message(r["session_id"], r["uuid"]) for r in recs[:2]]
col.upsert(
ids=partial_ids,
documents=[f"USER: {r['content']}" for r in recs[:2]],
metadatas=[
{
"session_id": r["session_id"],
"timestamp": r["timestamp"],
"message_uuid": r["uuid"],
"role": r["role"],
"ingest_mode": "sweep",
}
for r in recs[:2]
],
)
# Now run the sweeper. It must pick up the 3rd message, not skip
# it because cursor == its timestamp.
result = sweep(str(jsonl_path), palace_path)
assert result["drawers_added"] == 1, (
f"Sweeper lost the untaken message at cursor timestamp. "
f"Expected drawers_added=1 (the 3rd record), got "
f"{result['drawers_added']}. Cursor skip is still `<=` "
"instead of `<`, or tie-break via drawer-id is broken."
)
assert result["drawers_already_present"] == 2, (
f"Expected 2 drawers already present (the partial ingest), "
f"got {result['drawers_already_present']}."
)
class TestSweeperDrawerMetadata:
"""Each drawer must carry the metadata the tandem-miner coordination