diff --git a/mempalace/convo_miner.py b/mempalace/convo_miner.py index f24fa69..6a021ec 100644 --- a/mempalace/convo_miner.py +++ b/mempalace/convo_miner.py @@ -272,6 +272,47 @@ def scan_convos(convo_dir: str) -> list: # ============================================================================= +def _file_chunks_locked(collection, source_file, chunks, wing, room, agent, extract_mode): + """Acquire the per-file lock, double-check mined status, and upsert chunks. + + Returns (drawers_added, room_counts_delta, skipped). + """ + room_counts_delta: dict = defaultdict(int) + drawers_added = 0 + with mine_lock(source_file): + # Re-check after lock — another agent may have just finished this file + if file_already_mined(collection, source_file): + return 0, room_counts_delta, True + + for chunk in chunks: + chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room + if extract_mode == "general": + room_counts_delta[chunk_room] += 1 + drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}" + try: + collection.upsert( + documents=[chunk["content"]], + ids=[drawer_id], + metadatas=[ + { + "wing": wing, + "room": chunk_room, + "source_file": source_file, + "chunk_index": chunk["chunk_index"], + "added_by": agent, + "filed_at": datetime.now().isoformat(), + "ingest_mode": "convos", + "extract_mode": extract_mode, + } + ], + ) + drawers_added += 1 + except Exception as e: + if "already exists" not in str(e).lower(): + raise + return drawers_added, room_counts_delta, False + + def mine_convos( convo_dir: str, palace_path: str, @@ -376,39 +417,14 @@ def mine_convos( room_counts[room] += 1 # File each chunk — lock to prevent concurrent agents duplicating - drawers_added = 0 - with mine_lock(source_file): - # Re-check after lock — another agent may have just finished this file - if file_already_mined(collection, source_file): - files_skipped += 1 - continue - - for chunk in chunks: - chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room - if extract_mode == "general": - room_counts[chunk_room] += 1 - drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}" - try: - collection.upsert( - documents=[chunk["content"]], - ids=[drawer_id], - metadatas=[ - { - "wing": wing, - "room": chunk_room, - "source_file": source_file, - "chunk_index": chunk["chunk_index"], - "added_by": agent, - "filed_at": datetime.now().isoformat(), - "ingest_mode": "convos", - "extract_mode": extract_mode, - } - ], - ) - drawers_added += 1 - except Exception as e: - if "already exists" not in str(e).lower(): - raise + drawers_added, room_delta, skipped = _file_chunks_locked( + collection, source_file, chunks, wing, room, agent, extract_mode + ) + if skipped: + files_skipped += 1 + continue + for r, n in room_delta.items(): + room_counts[r] += n total_drawers += drawers_added print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}")