feat(normalize): auto-rebuild stale drawers via NORMALIZE_VERSION schema gate
Without this, the strip_noise improvement only helps new mines. Every user who had already mined Claude Code JSONL sessions would keep their noise-polluted drawers forever, because convo_miner's file_already_mined skip short-circuits before re-processing. Adds a versioned schema gate so upgrades propagate silently: - palace.NORMALIZE_VERSION=2 — bumped when the normalization pipeline changes shape (this PR's strip_noise is the v1→v2 bump). - file_already_mined now returns False if the stored normalize_version is missing or less than current, triggering a rebuild on next mine. - Both miners stamp drawers with the current normalize_version. - convo_miner now purges stale drawers before inserting fresh chunks (mirrors miner.py's existing delete+insert), extracted into _file_convo_chunks helper to keep mine_convos under ruff's C901 limit. User experience: upgrade mempalace, run `mempalace mine` as usual, old noisy drawers get silently replaced with clean ones. No erase needed, no "you need to rebuild" changelog footgun. Tests: - test_file_already_mined_returns_false_for_stale_normalize_version — pins the version gate contract for missing/v1/current. - test_add_drawer_stamps_normalize_version — fresh project-miner drawers carry the field. - test_mine_convos_rebuilds_stale_drawers_after_schema_bump — end-to-end proof that a pre-v2 palace gets silently cleaned on next mine, with orphan drawers purged and NOT skipped. Existing test_file_already_mined_check_mtime updated to include the new field; all other tests unaffected.
This commit is contained in:
+54
-29
@@ -16,7 +16,7 @@ from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
from .normalize import normalize
|
||||
from .palace import SKIP_DIRS, get_collection, file_already_mined
|
||||
from .palace import NORMALIZE_VERSION, SKIP_DIRS, file_already_mined, get_collection
|
||||
|
||||
|
||||
# File types that might contain conversations
|
||||
@@ -51,6 +51,7 @@ def _register_file(collection, source_file: str, wing: str, agent: str):
|
||||
"added_by": agent,
|
||||
"filed_at": datetime.now().isoformat(),
|
||||
"ingest_mode": "registry",
|
||||
"normalize_version": NORMALIZE_VERSION,
|
||||
}
|
||||
],
|
||||
)
|
||||
@@ -272,6 +273,52 @@ def scan_convos(convo_dir: str) -> list:
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def _file_convo_chunks(collection, source_file, chunks, wing, room, agent, extract_mode):
|
||||
"""Purge stale drawers for ``source_file`` then upsert fresh chunks.
|
||||
|
||||
Returns (drawers_added, room_counts_delta).
|
||||
"""
|
||||
# Purge stale drawers first. When the normalize schema bumps,
|
||||
# file_already_mined() returns False for pre-v2 drawers and we land
|
||||
# here — clean them out so the source doesn't end up with a mix of
|
||||
# old-noise and new-clean drawers.
|
||||
try:
|
||||
collection.delete(where={"source_file": source_file})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
room_counts_delta: dict = defaultdict(int)
|
||||
drawers_added = 0
|
||||
for chunk in chunks:
|
||||
chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
|
||||
if extract_mode == "general":
|
||||
room_counts_delta[chunk_room] += 1
|
||||
drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
|
||||
try:
|
||||
collection.upsert(
|
||||
documents=[chunk["content"]],
|
||||
ids=[drawer_id],
|
||||
metadatas=[
|
||||
{
|
||||
"wing": wing,
|
||||
"room": chunk_room,
|
||||
"source_file": source_file,
|
||||
"chunk_index": chunk["chunk_index"],
|
||||
"added_by": agent,
|
||||
"filed_at": datetime.now().isoformat(),
|
||||
"ingest_mode": "convos",
|
||||
"extract_mode": extract_mode,
|
||||
"normalize_version": NORMALIZE_VERSION,
|
||||
}
|
||||
],
|
||||
)
|
||||
drawers_added += 1
|
||||
except Exception as e:
|
||||
if "already exists" not in str(e).lower():
|
||||
raise
|
||||
return drawers_added, room_counts_delta
|
||||
|
||||
|
||||
def mine_convos(
|
||||
convo_dir: str,
|
||||
palace_path: str,
|
||||
@@ -375,34 +422,12 @@ def mine_convos(
|
||||
if extract_mode != "general":
|
||||
room_counts[room] += 1
|
||||
|
||||
# File each chunk
|
||||
drawers_added = 0
|
||||
for chunk in chunks:
|
||||
chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
|
||||
if extract_mode == "general":
|
||||
room_counts[chunk_room] += 1
|
||||
drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
|
||||
try:
|
||||
collection.upsert(
|
||||
documents=[chunk["content"]],
|
||||
ids=[drawer_id],
|
||||
metadatas=[
|
||||
{
|
||||
"wing": wing,
|
||||
"room": chunk_room,
|
||||
"source_file": source_file,
|
||||
"chunk_index": chunk["chunk_index"],
|
||||
"added_by": agent,
|
||||
"filed_at": datetime.now().isoformat(),
|
||||
"ingest_mode": "convos",
|
||||
"extract_mode": extract_mode,
|
||||
}
|
||||
],
|
||||
)
|
||||
drawers_added += 1
|
||||
except Exception as e:
|
||||
if "already exists" not in str(e).lower():
|
||||
raise
|
||||
# Purge stale drawers + file fresh chunks.
|
||||
drawers_added, room_delta = _file_convo_chunks(
|
||||
collection, source_file, chunks, wing, room, agent, extract_mode
|
||||
)
|
||||
for r, n in room_delta.items():
|
||||
room_counts[r] += n
|
||||
|
||||
total_drawers += drawers_added
|
||||
print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}")
|
||||
|
||||
Reference in New Issue
Block a user