Merge pull request #1287 from messelink/fix/hnsw-divergence-scales-with-sync-threshold
fix(repair): scale HNSW divergence floor with hnsw:sync_threshold
This commit is contained in:
@@ -372,20 +372,71 @@ def _hnsw_element_count(palace_path: str, segment_id: str) -> Optional[int]:
|
||||
|
||||
|
||||
# Divergence threshold: chromadb's HNSW flushes asynchronously, so HNSW
|
||||
# typically lags sqlite by up to ``sync_threshold`` (default 1000) records
|
||||
# under active write load — that's the *brute-force batch* that hasn't
|
||||
# been compacted into HNSW yet, plus the un-persisted tail beyond the
|
||||
# last sync. Two synchronization windows worth (2 × sync_threshold = 2000)
|
||||
# is a safe steady-state ceiling; anything past that is real divergence,
|
||||
# not flush-lag.
|
||||
# typically lags sqlite by up to ``sync_threshold`` records under active
|
||||
# write load — that's the *brute-force batch* that hasn't been compacted
|
||||
# into HNSW yet, plus the un-persisted tail beyond the last sync. Two
|
||||
# synchronization windows worth (2 × sync_threshold) is a safe steady-
|
||||
# state ceiling; anything past that is real divergence, not flush-lag.
|
||||
#
|
||||
# The #1222 case was 176 613 missing out of 192 997 (91% gone) — orders
|
||||
# of magnitude past 2000. A typical post-mine palace shows a few hundred
|
||||
# to ~1000 missing, well under threshold.
|
||||
_HNSW_DIVERGENCE_ABSOLUTE = 2000
|
||||
# The threshold floor scales with whatever ``hnsw:sync_threshold`` the
|
||||
# collection was created with (read via :func:`_read_sync_threshold`).
|
||||
# ``_HNSW_DIVERGENCE_FALLBACK_FLOOR`` is the floor used when we can't
|
||||
# read the collection metadata (older palaces missing the row, sqlite
|
||||
# unreadable). 2000 = 2 × chromadb's default sync_threshold of 1000.
|
||||
#
|
||||
# Why dynamic: PR #1191 set ``hnsw:sync_threshold = 50_000`` to prevent
|
||||
# index bloat, which means flush-lag can grow up to 50K naturally. A
|
||||
# fixed 2000 floor would flag every actively-written palace as DIVERGED
|
||||
# the moment its queue exceeded 10% of sqlite_count, even though chromadb
|
||||
# is behaving correctly. The floor must scale with sync_threshold to
|
||||
# distinguish real corruption (#1222 was 176 613 missing of 192 997 —
|
||||
# orders of magnitude past 2 × any reasonable sync_threshold) from
|
||||
# expected steady-state lag.
|
||||
_HNSW_DIVERGENCE_FALLBACK_FLOOR = 2000
|
||||
_HNSW_DIVERGENCE_FRACTION = 0.10
|
||||
|
||||
|
||||
def _read_sync_threshold(palace_path: str, collection_name: str) -> int:
    """Return the ``hnsw:sync_threshold`` for a collection, or 1000 default.

    The configured sync_threshold drives chromadb's HNSW flush cadence —
    larger values mean fewer, bigger flushes (less index-bloat risk per
    PR #1191) but also larger steady-state lag between
    ``index_metadata.pickle`` and the live sqlite count. The divergence
    probe scales its tolerance to ``2 × sync_threshold`` so that lag is
    not mistaken for corruption.

    Args:
        palace_path: Directory that contains ``chroma.sqlite3``.
        collection_name: Name of the collection whose metadata row is read.

    Returns:
        The stored ``int_value`` of the ``hnsw:sync_threshold`` metadata
        row. Falls back to 1000 (chromadb's own default) when the database
        file is missing, the collection has no explicit setting, or the
        read fails for any reason — matches what older mempalace palaces
        were created with before PR #1191.
    """
    from pathlib import Path

    default_threshold = 1000  # chromadb's own default sync_threshold

    db_path = os.path.join(palace_path, "chroma.sqlite3")
    if not os.path.isfile(db_path):
        return default_threshold

    try:
        # Build the read-only URI via pathlib so characters that are
        # meaningful inside a URI ('#', '?', '%', spaces) and Windows drive
        # paths are encoded correctly. Plain string interpolation would let
        # a '#' in the palace path truncate the filename and drop the
        # ``mode=ro`` query, making the probe read the wrong database.
        db_uri = Path(os.path.abspath(db_path)).as_uri() + "?mode=ro"
        conn = sqlite3.connect(db_uri, uri=True)
        try:
            row = conn.execute(
                """
                SELECT cm.int_value
                FROM collection_metadata cm
                JOIN collections c ON cm.collection_id = c.id
                WHERE c.name = ? AND cm.key = 'hnsw:sync_threshold'
                """,
                (collection_name,),
            ).fetchone()
        finally:
            conn.close()

        if row and row[0] is not None:
            return int(row[0])
        return default_threshold
    except Exception:
        # Best-effort probe: any failure (schema drift, locked or corrupt
        # database, non-integer value) degrades to the default threshold.
        logger.debug("_read_sync_threshold failed", exc_info=True)
        return default_threshold
|
||||
|
||||
|
||||
def hnsw_capacity_status(palace_path: str, collection_name: str = "mempalace_drawers") -> dict:
|
||||
"""Compare sqlite embedding count against HNSW element count.
|
||||
|
||||
@@ -431,13 +482,18 @@ def hnsw_capacity_status(palace_path: str, collection_name: str = "mempalace_dra
|
||||
hnsw_count = _hnsw_element_count(palace_path, seg_id)
|
||||
out["hnsw_count"] = hnsw_count
|
||||
|
||||
sync_threshold = _read_sync_threshold(palace_path, collection_name)
|
||||
# Two synchronization windows worth — see comment above
|
||||
# _HNSW_DIVERGENCE_FALLBACK_FLOOR for the rationale.
|
||||
divergence_floor = max(_HNSW_DIVERGENCE_FALLBACK_FLOOR, 2 * sync_threshold)
|
||||
|
||||
if hnsw_count is None:
|
||||
# No pickle yet — segment hasn't persisted metadata. Could be
|
||||
# fresh-but-unflushed (normal) or interrupted-mid-flush (bad).
|
||||
# We can't distinguish without the pickle, so only flag
|
||||
# divergence when sqlite holds clearly more than two flush
|
||||
# windows worth — same threshold as the with-pickle path.
|
||||
if sqlite_count > _HNSW_DIVERGENCE_ABSOLUTE:
|
||||
if sqlite_count > divergence_floor:
|
||||
out["status"] = "diverged"
|
||||
out["diverged"] = True
|
||||
out["divergence"] = sqlite_count
|
||||
@@ -452,7 +508,7 @@ def hnsw_capacity_status(palace_path: str, collection_name: str = "mempalace_dra
|
||||
|
||||
divergence = sqlite_count - hnsw_count
|
||||
out["divergence"] = divergence
|
||||
threshold = max(_HNSW_DIVERGENCE_ABSOLUTE, int(sqlite_count * _HNSW_DIVERGENCE_FRACTION))
|
||||
threshold = max(divergence_floor, int(sqlite_count * _HNSW_DIVERGENCE_FRACTION))
|
||||
if divergence > threshold:
|
||||
out["status"] = "diverged"
|
||||
out["diverged"] = True
|
||||
|
||||
+101
-4
@@ -28,13 +28,23 @@ COLLECTION = "mempalace_drawers"
|
||||
# ── Fixtures ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _seed_chroma_db(palace: str, sqlite_count: int, segment_id: str) -> None:
|
||||
def _seed_chroma_db(
|
||||
palace: str,
|
||||
sqlite_count: int,
|
||||
segment_id: str,
|
||||
sync_threshold: int | None = None,
|
||||
) -> None:
|
||||
"""Create a minimal chroma.sqlite3 with one collection + VECTOR segment.
|
||||
|
||||
Mirrors the columns the probe queries: ``segments``, ``collections``,
|
||||
``embeddings``, ``embedding_metadata``. Schema matches chromadb
|
||||
1.5.x; column types are kept loose because we read with COUNT(*) and
|
||||
SELECT key, *_value rather than driver-specific casts.
|
||||
``collection_metadata``, ``embeddings``, ``embedding_metadata``.
|
||||
Schema matches chromadb 1.5.x; column types are kept loose because
|
||||
we read with COUNT(*) and SELECT key, *_value rather than driver-
|
||||
specific casts.
|
||||
|
||||
When ``sync_threshold`` is supplied, an ``hnsw:sync_threshold`` row
|
||||
is added to ``collection_metadata`` so the divergence floor scales
|
||||
accordingly. Omit to model an older palace that pre-dates PR #1191.
|
||||
"""
|
||||
db_path = os.path.join(palace, "chroma.sqlite3")
|
||||
conn = sqlite3.connect(db_path)
|
||||
@@ -45,6 +55,15 @@ def _seed_chroma_db(palace: str, sqlite_count: int, segment_id: str) -> None:
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL
|
||||
);
|
||||
CREATE TABLE collection_metadata (
|
||||
collection_id TEXT REFERENCES collections(id) ON DELETE CASCADE,
|
||||
key TEXT NOT NULL,
|
||||
str_value TEXT,
|
||||
int_value INTEGER,
|
||||
float_value REAL,
|
||||
bool_value INTEGER,
|
||||
PRIMARY KEY (collection_id, key)
|
||||
);
|
||||
CREATE TABLE segments (
|
||||
id TEXT PRIMARY KEY,
|
||||
collection TEXT NOT NULL,
|
||||
@@ -73,6 +92,12 @@ def _seed_chroma_db(palace: str, sqlite_count: int, segment_id: str) -> None:
|
||||
col_id = "col-test"
|
||||
meta_seg = "seg-meta"
|
||||
conn.execute("INSERT INTO collections (id, name) VALUES (?, ?)", (col_id, COLLECTION))
|
||||
if sync_threshold is not None:
|
||||
conn.execute(
|
||||
"""INSERT INTO collection_metadata (collection_id, key, int_value)
|
||||
VALUES (?, 'hnsw:sync_threshold', ?)""",
|
||||
(col_id, sync_threshold),
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO segments (id, collection, scope) VALUES (?, ?, 'VECTOR')",
|
||||
(segment_id, col_id),
|
||||
@@ -229,6 +254,78 @@ def test_capacity_status_quiet_for_empty_palace(tmp_path):
|
||||
assert info["status"] == "unknown"
|
||||
|
||||
|
||||
# ── Divergence threshold scales with hnsw:sync_threshold ───────────────
|
||||
|
||||
|
||||
def test_capacity_status_tolerates_lag_under_large_sync_threshold(tmp_path):
    """One flush window of lag must not be reported as divergence.

    Regression for the PR #1191 / PR #1227 conflict: a collection seeded
    with ``sync_threshold=50_000`` may legitimately trail sqlite by tens
    of thousands of rows between flushes. With 100K rows in sqlite and
    50K in the pickle, the gap sits inside the ``2 × sync_threshold``
    tolerance, so the probe has to report a healthy index.
    """
    palace = str(tmp_path)
    segment = "seg-bloat-guard"

    _seed_chroma_db(
        palace,
        sqlite_count=100_000,
        segment_id=segment,
        sync_threshold=50_000,
    )
    _write_pickle(palace, segment, hnsw_count=50_000)

    status = hnsw_capacity_status(palace, COLLECTION)

    # A 50K gap equals a single flush window; the floor is 2 × 50K = 100K.
    assert status["diverged"] is False, status["message"]
    assert status["status"] == "ok"
    assert status["divergence"] == 50_000
|
||||
|
||||
|
||||
def test_capacity_status_still_flags_real_corruption_under_large_sync(tmp_path):
    """A #1222-shaped gap is still reported with a large sync_threshold.

    sqlite holds 200K rows while the pickle reports only 16,384 elements —
    the shape of the original #1222 report. Even with the floor raised by
    ``sync_threshold=50_000`` the probe must flag this as diverged.
    """
    palace = str(tmp_path)
    segment = "seg-1222-with-bloat-guard"

    _seed_chroma_db(
        palace,
        sqlite_count=200_000,
        segment_id=segment,
        sync_threshold=50_000,
    )
    _write_pickle(palace, segment, hnsw_count=16_384)

    status = hnsw_capacity_status(palace, COLLECTION)

    # 200,000 - 16,384 = 183,616 missing: beyond both the 2 × 50K = 100K
    # floor and the 10% fraction of 200K (20K).
    assert status["diverged"] is True
    assert status["status"] == "diverged"
    assert status["divergence"] == 183_616
|
||||
|
||||
|
||||
def test_capacity_status_default_threshold_when_no_sync_metadata(tmp_path):
    """Without an ``hnsw:sync_threshold`` row the floor stays at 2000.

    Models a palace created before PR #1191: no sync_threshold is seeded,
    so the probe falls back to chromadb's default of 1000 and therefore a
    floor of 2000 — the same behavior as before the fix.
    """
    palace = str(tmp_path)
    segment = "seg-legacy"

    # sync_threshold deliberately omitted, so collection_metadata is empty.
    _seed_chroma_db(palace, sqlite_count=10_000, segment_id=segment)
    _write_pickle(palace, segment, hnsw_count=7_500)

    status = hnsw_capacity_status(palace, COLLECTION)

    # Gap of 2,500 exceeds max(2000 floor, 10% of 10K = 1000) → diverged.
    assert status["diverged"] is True
    assert status["divergence"] == 2_500
|
||||
|
||||
|
||||
def test_unflushed_path_also_uses_dynamic_floor(tmp_path):
    """The no-pickle branch honors the sync_threshold-scaled floor.

    30K rows under ``sync_threshold=50_000`` with no pickle written: the
    collection has not reached its first flush. Against the old fixed
    2000 floor this was flagged as diverged; against the dynamic 100K
    floor it must not be.
    """
    palace = str(tmp_path)
    segment = "seg-preflush-large"

    _seed_chroma_db(
        palace,
        sqlite_count=30_000,
        segment_id=segment,
        sync_threshold=50_000,
    )

    status = hnsw_capacity_status(palace, COLLECTION)

    assert status["hnsw_count"] is None
    assert status["diverged"] is False, status["message"]
|
||||
|
||||
|
||||
# ── BM25-only sqlite fallback ─────────────────────────────────────────
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user