merge: develop into fix/1362-repair-sqlite-integrity-preflight (round 2)
#1357 (max_seq_id preflight) merged into develop while this branch was in CI, opening a fresh conflict between the two preflight helpers. mempalace/repair.py: - Kept both: this branch's sqlite_integrity_errors() / print_sqlite_ integrity_abort() AND develop's maybe_repair_poisoned_max_seq_id_ before_rebuild() from #1357. They check for distinct corruption classes and run as separate preflights. tests/test_repair.py: - Kept both this branch's sqlite_integrity_errors test group and develop's max_seq_id preflight test group; non-overlapping coverage. Local: 1623 tests pass, ruff lint+format clean against 0.4.x CI pin.
This commit is contained in:
@@ -102,6 +102,13 @@ _HNSW_BLOAT_GUARD = {
|
|||||||
"hnsw:sync_threshold": 50_000,
|
"hnsw:sync_threshold": 50_000,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Missing index_metadata.pickle is normal only while a segment is still fresh
|
||||||
|
# or effectively empty. Once data_level0.bin has non-trivial payload, a
|
||||||
|
# missing metadata pickle means the segment was interrupted after writing HNSW
|
||||||
|
# data but before writing its metadata. Letting Chroma open that shape can
|
||||||
|
# segfault or hang in native HNSW code.
|
||||||
|
_HNSW_MISSING_METADATA_DATA_FLOOR = 1024
|
||||||
|
|
||||||
|
|
||||||
def _validate_where(where: Optional[dict]) -> None:
|
def _validate_where(where: Optional[dict]) -> None:
|
||||||
"""Scan a where-clause for unknown operators and raise ``UnsupportedFilterError``.
|
"""Scan a where-clause for unknown operators and raise ``UnsupportedFilterError``.
|
||||||
@@ -132,16 +139,13 @@ def _segment_appears_healthy(seg_dir: str) -> bool:
|
|||||||
parsing it. ChromaDB writes that file after a successful HNSW flush;
|
parsing it. ChromaDB writes that file after a successful HNSW flush;
|
||||||
a complete write starts with byte ``0x80`` and ends with byte
|
a complete write starts with byte ``0x80`` and ends with byte
|
||||||
``0x2e`` (the protocol/terminator byte sequence chromadb serializes
|
``0x2e`` (the protocol/terminator byte sequence chromadb serializes
|
||||||
with). If both bytes are present and the file is non-trivially sized,
|
with).
|
||||||
chromadb will load the segment cleanly even when its on-disk mtime
|
|
||||||
trails ``chroma.sqlite3`` — which is the *steady state* under
|
|
||||||
chromadb 1.5.x's async batched flush, not corruption.
|
|
||||||
|
|
||||||
A missing metadata file is treated as "fresh / never-flushed" and
|
Missing metadata is healthy only while the segment still looks fresh or
|
||||||
considered healthy. Renaming an empty dir orphans nothing, and a
|
empty. If ``data_level0.bin`` already has non-trivial payload but
|
||||||
real corruption case manifests as a present-but-malformed file or a
|
``index_metadata.pickle`` is missing, the segment is partially flushed:
|
||||||
chromadb load error caught downstream by palace-daemon's
|
Chroma wrote vector data without the metadata it needs to reopen the
|
||||||
``_auto_repair`` retry path.
|
HNSW reader safely.
|
||||||
|
|
||||||
Deliberately format-sniffs only; never deserializes. Deserialization
|
Deliberately format-sniffs only; never deserializes. Deserialization
|
||||||
can execute arbitrary code, and the byte-sniff is sufficient to
|
can execute arbitrary code, and the byte-sniff is sufficient to
|
||||||
@@ -152,16 +156,26 @@ def _segment_appears_healthy(seg_dir: str) -> bool:
|
|||||||
chromadb writes today; if a future chromadb version emits protocol
|
chromadb writes today; if a future chromadb version emits protocol
|
||||||
0/1 segments, this check would start returning False on healthy
|
0/1 segments, this check would start returning False on healthy
|
||||||
files and quarantine_stale_hnsw would conservatively rename them
|
files and quarantine_stale_hnsw would conservatively rename them
|
||||||
out of the way (lazy rebuild on next open recovers).
|
out of the way.
|
||||||
"""
|
"""
|
||||||
if not _hnsw_payload_appears_sane(seg_dir):
|
if not _hnsw_payload_appears_sane(seg_dir):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
meta_path = os.path.join(seg_dir, "index_metadata.pickle")
|
meta_path = os.path.join(seg_dir, "index_metadata.pickle")
|
||||||
if not os.path.isfile(meta_path):
|
if not os.path.isfile(meta_path):
|
||||||
# No metadata file yet — segment hasn't flushed (fresh / empty).
|
data_path = os.path.join(seg_dir, "data_level0.bin")
|
||||||
# Renaming would orphan nothing; consider healthy.
|
try:
|
||||||
|
if (
|
||||||
|
os.path.isfile(data_path)
|
||||||
|
and os.path.getsize(data_path) > _HNSW_MISSING_METADATA_DATA_FLOOR
|
||||||
|
):
|
||||||
|
return False
|
||||||
|
except OSError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# No metadata and no meaningful vector payload yet: fresh/empty segment.
|
||||||
return True
|
return True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
size = os.path.getsize(meta_path)
|
size = os.path.getsize(meta_path)
|
||||||
# A real chromadb metadata file is at least tens of bytes; a
|
# A real chromadb metadata file is at least tens of bytes; a
|
||||||
|
|||||||
+12
-2
@@ -661,6 +661,7 @@ def cmd_repair(args):
|
|||||||
_extract_drawers,
|
_extract_drawers,
|
||||||
_rebuild_collection_via_temp,
|
_rebuild_collection_via_temp,
|
||||||
check_extraction_safety,
|
check_extraction_safety,
|
||||||
|
maybe_repair_poisoned_max_seq_id_before_rebuild,
|
||||||
)
|
)
|
||||||
|
|
||||||
config = MempalaceConfig()
|
config = MempalaceConfig()
|
||||||
@@ -739,11 +740,20 @@ def cmd_repair(args):
|
|||||||
print(f"\n No palace found at {palace_path}")
|
print(f"\n No palace found at {palace_path}")
|
||||||
return
|
return
|
||||||
if not contains_palace_database(palace_path):
|
if not contains_palace_database(palace_path):
|
||||||
print(f"\n No palace database found at {db_path}")
|
print(f"\n No palace database found at {db_path}")
|
||||||
|
return
|
||||||
|
|
||||||
|
preflight = maybe_repair_poisoned_max_seq_id_before_rebuild(
|
||||||
|
palace_path,
|
||||||
|
backup=getattr(args, "backup", True),
|
||||||
|
dry_run=getattr(args, "dry_run", False),
|
||||||
|
assume_yes=getattr(args, "yes", False),
|
||||||
|
)
|
||||||
|
if preflight is not None:
|
||||||
return
|
return
|
||||||
|
|
||||||
print(f"\n{'=' * 55}")
|
print(f"\n{'=' * 55}")
|
||||||
print(" MemPalace Repair")
|
print(" MemPalace Repair")
|
||||||
print(f"{'=' * 55}\n")
|
print(f"{'=' * 55}\n")
|
||||||
print(f" Palace: {palace_path}")
|
print(f" Palace: {palace_path}")
|
||||||
|
|
||||||
|
|||||||
+60
-1
@@ -551,6 +551,58 @@ def print_sqlite_integrity_abort(palace_path: str, errors: list[str]) -> None:
|
|||||||
print(" 6. Re-run `mempalace repair --yes`.")
|
print(" 6. Re-run `mempalace repair --yes`.")
|
||||||
|
|
||||||
|
|
||||||
|
def maybe_repair_poisoned_max_seq_id_before_rebuild(
|
||||||
|
palace_path: str,
|
||||||
|
*,
|
||||||
|
backup: bool = True,
|
||||||
|
dry_run: bool = False,
|
||||||
|
assume_yes: bool = False,
|
||||||
|
) -> "dict | None":
|
||||||
|
"""Run non-destructive max_seq_id repair before a rebuild if needed.
|
||||||
|
|
||||||
|
A poisoned ``max_seq_id`` row can make Chroma believe it has already
|
||||||
|
consumed every row in ``embeddings_queue``. Writes then report success
|
||||||
|
because they land in the queue, but they never become visible in
|
||||||
|
``embeddings``.
|
||||||
|
|
||||||
|
If this precise corruption is present, do the narrow bookmark repair and
|
||||||
|
stop instead of continuing into the legacy rebuild path. The rebuild path
|
||||||
|
extracts only already-visible embeddings and can discard queued writes.
|
||||||
|
"""
|
||||||
|
|
||||||
|
db_path = os.path.join(palace_path, "chroma.sqlite3")
|
||||||
|
if not os.path.isfile(db_path):
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
poisoned = _detect_poisoned_max_seq_ids(db_path)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if not poisoned:
|
||||||
|
return None
|
||||||
|
|
||||||
|
print("\n Detected poisoned max_seq_id rows before repair rebuild.")
|
||||||
|
print(
|
||||||
|
" This can make writes report success while embeddings_queue grows "
|
||||||
|
"and embeddings stay static."
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
" Running the non-destructive max_seq_id repair instead of rebuilding " "the collection."
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
" Queued writes remain in chroma.sqlite3 for Chroma to drain after "
|
||||||
|
"the bookmark is unpoisoned."
|
||||||
|
)
|
||||||
|
|
||||||
|
return repair_max_seq_id(
|
||||||
|
palace_path,
|
||||||
|
backup=backup,
|
||||||
|
dry_run=dry_run,
|
||||||
|
assume_yes=assume_yes,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def rebuild_index(
|
def rebuild_index(
|
||||||
palace_path=None,
|
palace_path=None,
|
||||||
confirm_truncation_ok: bool = False,
|
confirm_truncation_ok: bool = False,
|
||||||
@@ -579,7 +631,14 @@ def rebuild_index(
|
|||||||
print(f"\n{'=' * 55}")
|
print(f"\n{'=' * 55}")
|
||||||
print(" MemPalace Repair — Index Rebuild")
|
print(" MemPalace Repair — Index Rebuild")
|
||||||
print(f"{'=' * 55}\n")
|
print(f"{'=' * 55}\n")
|
||||||
print(f" Palace: {palace_path}")
|
print(f" Palace: {palace_path}")
|
||||||
|
|
||||||
|
preflight = maybe_repair_poisoned_max_seq_id_before_rebuild(
|
||||||
|
palace_path,
|
||||||
|
assume_yes=True,
|
||||||
|
)
|
||||||
|
if preflight is not None:
|
||||||
|
return
|
||||||
|
|
||||||
backend = ChromaBackend()
|
backend = ChromaBackend()
|
||||||
try:
|
try:
|
||||||
|
|||||||
+51
-3
@@ -18,8 +18,10 @@ from mempalace.backends import (
|
|||||||
from mempalace.backends.chroma import (
|
from mempalace.backends.chroma import (
|
||||||
ChromaBackend,
|
ChromaBackend,
|
||||||
ChromaCollection,
|
ChromaCollection,
|
||||||
|
_HNSW_MISSING_METADATA_DATA_FLOOR,
|
||||||
_fix_blob_seq_ids,
|
_fix_blob_seq_ids,
|
||||||
_pin_hnsw_threads,
|
_pin_hnsw_threads,
|
||||||
|
_segment_appears_healthy,
|
||||||
quarantine_invalid_hnsw_metadata,
|
quarantine_invalid_hnsw_metadata,
|
||||||
quarantine_stale_hnsw,
|
quarantine_stale_hnsw,
|
||||||
)
|
)
|
||||||
@@ -685,9 +687,9 @@ def test_quarantine_stale_hnsw_leaves_healthy_segment_with_drift_alone(tmp_path)
|
|||||||
assert seg.exists()
|
assert seg.exists()
|
||||||
|
|
||||||
|
|
||||||
def test_quarantine_stale_hnsw_leaves_segment_without_metadata_alone(tmp_path):
|
def test_quarantine_stale_hnsw_leaves_empty_segment_without_metadata_alone(tmp_path):
|
||||||
"""Segment with no metadata file is treated as fresh / never-flushed
|
"""Missing metadata is okay only when the segment has no meaningful data yet."""
|
||||||
and not quarantined — renaming an empty dir orphans nothing."""
|
|
||||||
now = 1_700_000_000.0
|
now = 1_700_000_000.0
|
||||||
palace, seg = _make_palace_with_segment(
|
palace, seg = _make_palace_with_segment(
|
||||||
tmp_path,
|
tmp_path,
|
||||||
@@ -695,11 +697,57 @@ def test_quarantine_stale_hnsw_leaves_segment_without_metadata_alone(tmp_path):
|
|||||||
sqlite_mtime=now,
|
sqlite_mtime=now,
|
||||||
meta_bytes=None,
|
meta_bytes=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
moved = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)
|
moved = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)
|
||||||
|
|
||||||
assert moved == []
|
assert moved == []
|
||||||
assert seg.exists()
|
assert seg.exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_segment_without_metadata_but_with_nontrivial_data_is_unhealthy(tmp_path):
|
||||||
|
"""Data without index_metadata.pickle is a partial flush, not a fresh segment."""
|
||||||
|
|
||||||
|
seg = tmp_path / "abcd-1234-5678"
|
||||||
|
seg.mkdir()
|
||||||
|
(seg / "data_level0.bin").write_bytes(b"\0" * (_HNSW_MISSING_METADATA_DATA_FLOOR + 1))
|
||||||
|
|
||||||
|
assert not _segment_appears_healthy(str(seg))
|
||||||
|
|
||||||
|
|
||||||
|
def test_segment_without_metadata_and_tiny_data_is_still_treated_as_fresh(tmp_path):
|
||||||
|
"""Tiny data payloads can occur before metadata has flushed; leave them alone."""
|
||||||
|
|
||||||
|
seg = tmp_path / "abcd-1234-5678"
|
||||||
|
seg.mkdir()
|
||||||
|
(seg / "data_level0.bin").write_bytes(b"\0" * _HNSW_MISSING_METADATA_DATA_FLOOR)
|
||||||
|
|
||||||
|
assert _segment_appears_healthy(str(seg))
|
||||||
|
|
||||||
|
|
||||||
|
def test_quarantine_stale_hnsw_renames_missing_metadata_with_nontrivial_data(tmp_path):
|
||||||
|
"""Regression for #1274: missing pickle + non-trivial data must quarantine."""
|
||||||
|
|
||||||
|
now = 1_700_000_000.0
|
||||||
|
palace, seg = _make_palace_with_segment(
|
||||||
|
tmp_path,
|
||||||
|
hnsw_mtime=now - 7200,
|
||||||
|
sqlite_mtime=now,
|
||||||
|
meta_bytes=None,
|
||||||
|
)
|
||||||
|
(seg / "data_level0.bin").write_bytes(b"\0" * (_HNSW_MISSING_METADATA_DATA_FLOOR + 1))
|
||||||
|
os.utime(seg / "data_level0.bin", (now - 7200, now - 7200))
|
||||||
|
|
||||||
|
moved = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)
|
||||||
|
|
||||||
|
assert len(moved) == 1
|
||||||
|
assert ".drift-" in moved[0]
|
||||||
|
assert not seg.exists()
|
||||||
|
|
||||||
|
drift_dirs = [p for p in palace.iterdir() if ".drift-" in p.name]
|
||||||
|
assert len(drift_dirs) == 1
|
||||||
|
assert (drift_dirs[0] / "data_level0.bin").exists()
|
||||||
|
|
||||||
|
|
||||||
def test_quarantine_stale_hnsw_renames_truncated_metadata(tmp_path):
|
def test_quarantine_stale_hnsw_renames_truncated_metadata(tmp_path):
|
||||||
"""Segment with a truncated (under-floor-size) metadata file is
|
"""Segment with a truncated (under-floor-size) metadata file is
|
||||||
quarantined — shape of a partial-flush during process kill."""
|
quarantined — shape of a partial-flush during process kill."""
|
||||||
|
|||||||
@@ -1153,6 +1153,72 @@ def test_rebuild_index_aborts_on_sqlite_integrity_errors_before_delete_collectio
|
|||||||
mock_shutil.copy2.assert_not_called()
|
mock_shutil.copy2.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_max_seq_id_preflight_preserves_embeddings_queue(tmp_path):
|
||||||
|
"""#1295: default repair preflight must not drop queued writes."""
|
||||||
|
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
seg = _seed_poisoned_max_seq_id(
|
||||||
|
palace,
|
||||||
|
drawers_meta_max=102,
|
||||||
|
closets_meta_max=11,
|
||||||
|
)
|
||||||
|
db_path = os.path.join(palace, "chroma.sqlite3")
|
||||||
|
|
||||||
|
with sqlite3.connect(db_path) as conn:
|
||||||
|
conn.executemany(
|
||||||
|
"INSERT INTO embeddings_queue(seq_id, topic, id) VALUES (?, ?, ?)",
|
||||||
|
[
|
||||||
|
(seq_id, "persistent://default/default/mempalace_drawers", f"queued-{seq_id}")
|
||||||
|
for seq_id in range(103, 123)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
result = repair.maybe_repair_poisoned_max_seq_id_before_rebuild(
|
||||||
|
palace,
|
||||||
|
assume_yes=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert result["segment_repaired"]
|
||||||
|
|
||||||
|
with sqlite3.connect(db_path) as conn:
|
||||||
|
max_seq_rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id"))
|
||||||
|
queue_count = conn.execute("SELECT COUNT(*) FROM embeddings_queue").fetchone()[0]
|
||||||
|
|
||||||
|
assert max_seq_rows[seg["drawers_vec"]] == seg["drawers_meta_max"]
|
||||||
|
assert max_seq_rows[seg["drawers_meta"]] == seg["drawers_meta_max"]
|
||||||
|
assert max_seq_rows[seg["closets_vec"]] == seg["closets_meta_max"]
|
||||||
|
assert max_seq_rows[seg["closets_meta"]] == seg["closets_meta_max"]
|
||||||
|
|
||||||
|
# The old legacy rebuild path can discard queued writes. The preflight
|
||||||
|
# repair must leave them on disk for Chroma to drain after the bookmark is
|
||||||
|
# unpoisoned.
|
||||||
|
assert queue_count == 20
|
||||||
|
|
||||||
|
|
||||||
|
def test_rebuild_index_repairs_poisoned_max_seq_id_before_collection_rebuild(tmp_path, capsys):
|
||||||
|
"""A poisoned bookmark should short-circuit before the legacy rebuild path."""
|
||||||
|
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
_seed_poisoned_max_seq_id(palace)
|
||||||
|
|
||||||
|
with patch("mempalace.repair.ChromaBackend") as mock_backend:
|
||||||
|
repair.rebuild_index(palace)
|
||||||
|
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
backend = mock_backend.return_value
|
||||||
|
|
||||||
|
# repair_max_seq_id may instantiate ChromaBackend to close cached clients
|
||||||
|
# after editing sqlite directly. That is safe. The important thing is that
|
||||||
|
# rebuild_index must not continue into the legacy Chroma collection read /
|
||||||
|
# count / rebuild path after the max_seq_id preflight handles the issue.
|
||||||
|
backend.get_collection.assert_not_called()
|
||||||
|
|
||||||
|
assert "Detected poisoned max_seq_id rows" in out
|
||||||
|
assert "non-destructive max_seq_id repair" in out
|
||||||
|
|
||||||
|
|
||||||
# ── extract_via_sqlite + rebuild_from_sqlite (#1308) ──────────────────
|
# ── extract_via_sqlite + rebuild_from_sqlite (#1308) ──────────────────
|
||||||
#
|
#
|
||||||
# These tests build real chromadb palaces in tmp_path rather than mocking
|
# These tests build real chromadb palaces in tmp_path rather than mocking
|
||||||
|
|||||||
Reference in New Issue
Block a user