Merge pull request #1135 from sha2fiddy/feature/max-seq-id-shim-fix

fix: narrow `_fix_blob_seq_ids` + add `repair --mode max-seq-id`
This commit is contained in:
Igor Lins e Silva
2026-04-27 03:21:49 -03:00
committed by GitHub
6 changed files with 686 additions and 20 deletions
+66 -6
View File
@@ -341,12 +341,9 @@ def test_fix_blob_seq_ids_converts_blobs_to_integers(tmp_path):
db_path = tmp_path / "chroma.sqlite3"
conn = sqlite3.connect(str(db_path))
conn.execute("CREATE TABLE embeddings (rowid INTEGER PRIMARY KEY, seq_id)")
conn.execute("CREATE TABLE max_seq_id (rowid INTEGER PRIMARY KEY, seq_id)")
# Insert BLOB seq_ids like ChromaDB 0.6.x would
# Insert BLOB seq_id like ChromaDB 0.6.x would
blob_42 = (42).to_bytes(8, byteorder="big")
blob_99 = (99).to_bytes(8, byteorder="big")
conn.execute("INSERT INTO embeddings (seq_id) VALUES (?)", (blob_42,))
conn.execute("INSERT INTO max_seq_id (seq_id) VALUES (?)", (blob_99,))
conn.commit()
conn.close()
@@ -355,8 +352,6 @@ def test_fix_blob_seq_ids_converts_blobs_to_integers(tmp_path):
conn = sqlite3.connect(str(db_path))
row = conn.execute("SELECT seq_id, typeof(seq_id) FROM embeddings").fetchone()
assert row == (42, "integer")
row = conn.execute("SELECT seq_id, typeof(seq_id) FROM max_seq_id").fetchone()
assert row == (99, "integer")
conn.close()
@@ -382,6 +377,71 @@ def test_fix_blob_seq_ids_noop_without_database(tmp_path):
_fix_blob_seq_ids(str(tmp_path)) # should not raise
def test_fix_blob_seq_ids_does_not_touch_max_seq_id(tmp_path):
    """The shim must leave ``max_seq_id`` BLOBs exactly as chromadb 1.5.x wrote them.

    Regression guard for the 2026-04-20 incident: the old shim fed chromadb
    1.5.x's native BLOB (two 0x11 bytes followed by ASCII digits) through
    int.from_bytes(..., 'big') and produced a ~1.23e18 integer that silently
    suppressed every subsequent embeddings_queue write.
    """
    sqlite_file = str(tmp_path / "chroma.sqlite3")
    native_blob = b"\x11\x11502607"

    # Seed both tables, but only max_seq_id carries a row.
    seed = sqlite3.connect(sqlite_file)
    for ddl in (
        "CREATE TABLE embeddings (rowid INTEGER PRIMARY KEY, seq_id)",
        "CREATE TABLE max_seq_id (rowid INTEGER PRIMARY KEY, seq_id)",
    ):
        seed.execute(ddl)
    seed.execute("INSERT INTO max_seq_id (seq_id) VALUES (?)", (native_blob,))
    seed.commit()
    seed.close()

    _fix_blob_seq_ids(str(tmp_path))

    # The stored value must be byte-identical and still typed as a BLOB.
    check = sqlite3.connect(sqlite_file)
    stored = check.execute("SELECT seq_id, typeof(seq_id) FROM max_seq_id").fetchone()
    assert stored == (native_blob, "blob")
    check.close()
def test_fix_blob_seq_ids_skips_sysdb10_prefix_in_embeddings(tmp_path):
    """Defense-in-depth: a sysdb-10 prefixed BLOB in embeddings.seq_id is left untouched."""
    sqlite_file = str(tmp_path / "chroma.sqlite3")
    prefixed_blob = b"\x11\x11502607"

    seed = sqlite3.connect(sqlite_file)
    seed.execute("CREATE TABLE embeddings (rowid INTEGER PRIMARY KEY, seq_id)")
    seed.execute("INSERT INTO embeddings (seq_id) VALUES (?)", (prefixed_blob,))
    seed.commit()
    seed.close()

    _fix_blob_seq_ids(str(tmp_path))

    check = sqlite3.connect(sqlite_file)
    stored = check.execute("SELECT seq_id, typeof(seq_id) FROM embeddings").fetchone()
    # Still a BLOB — not converted to 1.23e18.
    assert stored == (prefixed_blob, "blob")
    check.close()
def test_fix_blob_seq_ids_still_converts_legacy_blobs_in_embeddings(tmp_path):
    """Regression guard: pure big-endian u64 BLOBs still convert for genuine 0.6.x.

    The rows are seeded as legacy / sysdb-10 / legacy, so the skipped row
    sits between two rows that must be converted.
    """
    sqlite_file = str(tmp_path / "chroma.sqlite3")
    prefixed_blob = b"\x11\x11502607"
    seeded_blobs = [(42).to_bytes(8, "big"), prefixed_blob, (7).to_bytes(8, "big")]

    seed = sqlite3.connect(sqlite_file)
    seed.execute("CREATE TABLE embeddings (rowid INTEGER PRIMARY KEY, seq_id)")
    for blob in seeded_blobs:
        seed.execute("INSERT INTO embeddings (seq_id) VALUES (?)", (blob,))
    seed.commit()
    seed.close()

    _fix_blob_seq_ids(str(tmp_path))

    check = sqlite3.connect(sqlite_file)
    converted = check.execute(
        "SELECT seq_id, typeof(seq_id) FROM embeddings ORDER BY rowid"
    ).fetchall()
    assert converted[0] == (42, "integer")
    assert converted[1] == (prefixed_blob, "blob")  # sysdb-10 row left alone
    assert converted[2] == (7, "integer")
    check.close()
def test_fix_blob_seq_ids_writes_marker_after_blob_path(tmp_path):
"""The .blob_seq_ids_migrated marker is written after a successful BLOB → INTEGER conversion."""
from mempalace.backends.chroma import _BLOB_FIX_MARKER
+282
View File
@@ -1,8 +1,10 @@
"""Tests for mempalace.repair — scan, prune, and rebuild HNSW index."""
import contextlib
import os
import sqlite3
from unittest.mock import MagicMock, patch

import pytest

from mempalace import repair
@@ -374,3 +376,283 @@ def test_rebuild_index_proceeds_with_override(mock_backend_cls, mock_shutil, tmp
mock_backend.delete_collection.assert_called_once()
mock_backend.create_collection.assert_called_once()
mock_new_col.upsert.assert_called()
# ── repair_max_seq_id ─────────────────────────────────────────────────


# Realistic poisoned value from the 2026-04-20 incident — the sysdb-10
# b'\x11\x11' + 6 ASCII digit format being misread as big-endian u64.
# This exact number is int.from_bytes(b"\x11\x11502607", "big").
_POISON_VAL = 1_229_822_654_365_970_487


def _seed_poisoned_max_seq_id(
    palace_path: str,
    *,
    drawers_meta_max: int = 502607,
    closets_meta_max: int = 501418,
    drawers_vec_poison: int = _POISON_VAL,
    drawers_meta_poison: int = _POISON_VAL + 1,
    closets_vec_poison: int = _POISON_VAL + 2,
    closets_meta_poison: int = _POISON_VAL + 3,
) -> dict:
    """Build a minimal palace whose ``max_seq_id`` rows are all poisoned.

    Creates ``chroma.sqlite3`` under *palace_path* (creating the directory if
    needed) with two collections, "drawers" and "closets", each owning one
    VECTOR and one METADATA segment. Every segment gets a poisoned
    ``max_seq_id`` row. Only the METADATA segments get ``embeddings`` rows:
    about five evenly spaced seq_ids plus one row at the collection maximum.

    Args:
        palace_path: Directory that will contain the database.
        drawers_meta_max: Highest clean seq_id seeded for the drawers collection.
        closets_meta_max: Highest clean seq_id seeded for the closets collection.
        drawers_vec_poison: Poisoned value for the drawers VECTOR segment.
        drawers_meta_poison: Poisoned value for the drawers METADATA segment.
        closets_vec_poison: Poisoned value for the closets VECTOR segment.
        closets_meta_poison: Poisoned value for the closets METADATA segment.

    Returns:
        A dict with the four segment ids (``drawers_vec``, ``drawers_meta``,
        ``closets_vec``, ``closets_meta``), the two expected clean maxima
        (``drawers_meta_max``, ``closets_meta_max``) and ``poisoned_values``,
        a mapping of segment id to the poisoned value written for it.
    """
    os.makedirs(palace_path, exist_ok=True)
    db_path = os.path.join(palace_path, "chroma.sqlite3")

    drawers_coll = "coll-drawers-0000-1111-2222-333344445555"
    closets_coll = "coll-closets-0000-1111-2222-333344445555"
    drawers_vec = "seg-drawers-vec-0000-1111-2222-333344445555"
    drawers_meta = "seg-drawers-meta-0000-1111-2222-33334444555"
    closets_vec = "seg-closets-vec-0000-1111-2222-333344445555"
    closets_meta = "seg-closets-meta-0000-1111-2222-33334444555"

    poisoned_values = {
        drawers_vec: drawers_vec_poison,
        drawers_meta: drawers_meta_poison,
        closets_vec: closets_vec_poison,
        closets_meta: closets_meta_poison,
    }

    # Populate embeddings so the collection-MAX heuristic has data to work with.
    # drawers METADATA owns the max at drawers_meta_max; closets likewise.
    # Insert order (drawers first, then closets) is kept so AUTOINCREMENT ids
    # come out the same as with row-by-row inserts.
    embedding_rows = []
    for segment_id, prefix, top in (
        (drawers_meta, "d", drawers_meta_max),
        (closets_meta, "c", closets_meta_max),
    ):
        step = max(top // 5, 1)  # never 0, even for tiny maxima
        embedding_rows.extend(
            (segment_id, f"{prefix}-{i}", i) for i in range(1, top + 1, step)
        )
        embedding_rows.append((segment_id, f"{prefix}-max", top))

    conn = sqlite3.connect(db_path)
    try:
        conn.executescript(
            """
            CREATE TABLE segments(
                id TEXT PRIMARY KEY, type TEXT, scope TEXT, collection TEXT
            );
            CREATE TABLE max_seq_id(segment_id TEXT PRIMARY KEY, seq_id);
            CREATE TABLE embeddings(
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                segment_id TEXT,
                embedding_id TEXT,
                seq_id
            );
            CREATE TABLE embeddings_queue(seq_id INTEGER PRIMARY KEY, topic TEXT, id TEXT);
            CREATE TABLE collection_metadata(collection_id TEXT, key TEXT, str_value TEXT);
            """
        )
        conn.executemany(
            "INSERT INTO segments VALUES (?, ?, ?, ?)",
            [
                (drawers_vec, "urn:vector", "VECTOR", drawers_coll),
                (drawers_meta, "urn:metadata", "METADATA", drawers_coll),
                (closets_vec, "urn:vector", "VECTOR", closets_coll),
                (closets_meta, "urn:metadata", "METADATA", closets_coll),
            ],
        )
        conn.executemany(
            "INSERT INTO max_seq_id(segment_id, seq_id) VALUES (?, ?)",
            list(poisoned_values.items()),
        )
        conn.executemany(
            "INSERT INTO embeddings(segment_id, embedding_id, seq_id) VALUES (?, ?, ?)",
            embedding_rows,
        )
        conn.commit()
    finally:
        # Always release the file handle, even when seeding fails part-way.
        conn.close()

    return {
        "drawers_vec": drawers_vec,
        "drawers_meta": drawers_meta,
        "closets_vec": closets_vec,
        "closets_meta": closets_meta,
        "drawers_meta_max": drawers_meta_max,
        "closets_meta_max": closets_meta_max,
        "poisoned_values": poisoned_values,
    }
def test_max_seq_id_detects_poison_rows(tmp_path):
    """The detector reports all four poisoned segments and skips a clean row."""
    palace = str(tmp_path / "palace")
    seg = _seed_poisoned_max_seq_id(palace)
    db_path = os.path.join(palace, "chroma.sqlite3")

    # Add one clean row to confirm the threshold actually filters.
    # closing() is required: sqlite3's own context manager only ends the
    # transaction and would leave the connection (and file handle) open.
    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        conn.execute(
            "INSERT INTO segments VALUES ('seg-clean', 'urn:vector', 'VECTOR', 'coll-clean')"
        )
        conn.execute("INSERT INTO max_seq_id VALUES ('seg-clean', 1234)")
        conn.commit()

    found = repair._detect_poisoned_max_seq_ids(db_path)
    ids = {sid for sid, _ in found}
    assert ids == {
        seg["drawers_vec"],
        seg["drawers_meta"],
        seg["closets_vec"],
        seg["closets_meta"],
    }
    for _, val in found:
        assert val > repair.MAX_SEQ_ID_SANITY_THRESHOLD
    assert "seg-clean" not in ids
def test_max_seq_id_heuristic_uses_collection_max(tmp_path):
    """A dry run plans each segment's new value as its collection's max seq_id."""
    palace = str(tmp_path / "palace")
    seg = _seed_poisoned_max_seq_id(palace)

    planned = repair.repair_max_seq_id(palace, dry_run=True)["after"]

    # Both segments of a collection (VECTOR + METADATA) get that collection's max.
    expectations = (
        ("drawers_vec", "drawers_meta_max"),
        ("drawers_meta", "drawers_meta_max"),
        ("closets_vec", "closets_meta_max"),
        ("closets_meta", "closets_meta_max"),
    )
    for segment_key, max_key in expectations:
        assert planned[seg[segment_key]] == seg[max_key]
def test_max_seq_id_from_sidecar_exact_restore(tmp_path):
    """With ``from_sidecar`` the repair writes the sidecar's values verbatim."""
    palace = str(tmp_path / "palace")
    seg = _seed_poisoned_max_seq_id(palace)

    # Craft a sidecar with known clean values that differ from the heuristic's
    # collection-max, so we can prove the sidecar path is preferred.
    sidecar_path = str(tmp_path / "chroma.sqlite3.sidecar")
    clean = {
        seg["drawers_vec"]: 499001,
        seg["drawers_meta"]: 499002,
        seg["closets_vec"]: 498001,
        seg["closets_meta"]: 498002,
    }
    # closing() is required: sqlite3's own context manager does not close the
    # connection, which would keep the sidecar open while the repair reads it.
    with contextlib.closing(sqlite3.connect(sidecar_path)) as conn:
        conn.execute("CREATE TABLE max_seq_id(segment_id TEXT PRIMARY KEY, seq_id INTEGER)")
        conn.executemany(
            "INSERT INTO max_seq_id VALUES (?, ?)",
            list(clean.items()),
        )
        conn.commit()

    result = repair.repair_max_seq_id(palace, from_sidecar=sidecar_path, assume_yes=True)
    assert result["segment_repaired"]

    db_path = os.path.join(palace, "chroma.sqlite3")
    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall())
    for sid, val in clean.items():
        assert rows[sid] == val
def test_max_seq_id_dry_run_no_mutation(tmp_path):
    """A dry run reports nothing repaired, changes no row and writes no backup."""
    palace = str(tmp_path / "palace")
    seg = _seed_poisoned_max_seq_id(palace)
    db_path = os.path.join(palace, "chroma.sqlite3")

    # closing() is required: sqlite3's own context manager does not close the
    # connection, so the snapshot handle would stay open during the repair call.
    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        before = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall())

    result = repair.repair_max_seq_id(palace, dry_run=True)
    assert result["dry_run"] is True
    assert result["segment_repaired"] == []

    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        after = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall())
    assert before == after

    # Nothing dropped into the palace dir either (no backup on dry-run).
    assert not any(fn.startswith("chroma.sqlite3.max-seq-id-backup-") for fn in os.listdir(palace))
    assert seg["drawers_vec"] in before  # sanity
def test_max_seq_id_segment_filter(tmp_path):
    """Passing ``segment`` repairs only that segment; the rest stay poisoned."""
    palace = str(tmp_path / "palace")
    seg = _seed_poisoned_max_seq_id(palace)

    result = repair.repair_max_seq_id(palace, segment=seg["drawers_meta"], assume_yes=True)
    assert result["segment_repaired"] == [seg["drawers_meta"]]

    db_path = os.path.join(palace, "chroma.sqlite3")
    # closing() is required: sqlite3's own context manager only ends the
    # transaction and would leave the connection open.
    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall())

    # Filtered segment is fixed; the other three remain poisoned.
    assert rows[seg["drawers_meta"]] == seg["drawers_meta_max"]
    for other in (seg["drawers_vec"], seg["closets_vec"], seg["closets_meta"]):
        assert rows[other] > repair.MAX_SEQ_ID_SANITY_THRESHOLD
def test_max_seq_id_no_poison_is_noop(tmp_path):
    """With no poisoned row the repair changes nothing and creates no backup."""
    palace = str(tmp_path / "palace")
    os.makedirs(palace)
    db_path = os.path.join(palace, "chroma.sqlite3")

    # closing() is required: sqlite3's own context manager does not close the
    # connection, so the seeding handle would stay open while the repair runs.
    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        conn.executescript(
            """
            CREATE TABLE segments(
                id TEXT PRIMARY KEY, type TEXT, scope TEXT, collection TEXT
            );
            CREATE TABLE max_seq_id(segment_id TEXT PRIMARY KEY, seq_id);
            CREATE TABLE embeddings(
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                segment_id TEXT, embedding_id TEXT, seq_id
            );
            INSERT INTO segments VALUES ('s1', 'urn:vector', 'VECTOR', 'coll');
            INSERT INTO max_seq_id VALUES ('s1', 12345);
            """
        )
        conn.commit()

    result = repair.repair_max_seq_id(palace, assume_yes=True)
    assert result["segment_repaired"] == []
    assert result["backup"] is None

    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall())
    assert rows == {"s1": 12345}
def test_max_seq_id_backup_created(tmp_path):
    """A real repair returns a backup file that still holds the poisoned values."""
    palace = str(tmp_path / "palace")
    seg = _seed_poisoned_max_seq_id(palace)

    result = repair.repair_max_seq_id(palace, assume_yes=True)
    assert result["backup"] is not None
    assert os.path.isfile(result["backup"])

    # closing() is required: sqlite3's own context manager only ends the
    # transaction and would leave the backup file's connection open.
    with contextlib.closing(sqlite3.connect(result["backup"])) as conn:
        rows = dict(conn.execute("SELECT segment_id, seq_id FROM max_seq_id").fetchall())

    # Backup preserves the poisoned values from before the repair.
    assert rows[seg["drawers_vec"]] == seg["poisoned_values"][seg["drawers_vec"]]
    assert rows[seg["drawers_meta"]] == seg["poisoned_values"][seg["drawers_meta"]]
def test_max_seq_id_rollback_on_verification_failure(tmp_path, monkeypatch):
    """A failed post-repair verification raises, and a backup file stays on disk."""
    palace = str(tmp_path / "palace")
    _seed_poisoned_max_seq_id(palace)

    genuine_detect = repair._detect_poisoned_max_seq_ids
    invocations = []

    def flaky_detect(*args, **kwargs):
        invocations.append(args)
        if len(invocations) > 1:
            # Any later call (post-repair verification) claims poison still exists.
            return [("seg-fake-still-poisoned", repair.MAX_SEQ_ID_SANITY_THRESHOLD + 1)]
        # First call (pre-repair) returns the real set so the repair proceeds.
        return genuine_detect(*args, **kwargs)

    monkeypatch.setattr(repair, "_detect_poisoned_max_seq_ids", flaky_detect)

    with pytest.raises(repair.MaxSeqIdVerificationError):
        repair.repair_max_seq_id(palace, assume_yes=True)

    # A backup file is still present — caller can roll back from it.
    backups = [entry for entry in os.listdir(palace) if "max-seq-id-backup-" in entry]
    assert backups