merge: develop into fix/1274-missing-hnsw-metadata-gate

Two conflicts, both because #1339 (bloated link payloads) merged into
develop after this branch was authored:

- mempalace/backends/chroma.py: _segment_appears_healthy now stacks
  three checks — bloated-link from #1339 (top), missing-metadata-with-
  data-floor from this branch (middle), pickle format sniff (bottom).
  All three are complementary; #1339 catches structural payload
  corruption, this branch catches pickle truncation, the original
  catches pickle protocol-byte corruption.

- tests/test_backends.py: kept both new imports (_segment_appears_healthy
  from this branch, quarantine_invalid_hnsw_metadata from #1285).

Local: 1618 tests pass, ruff lint+format clean against 0.4.x CI pin.
This commit is contained in:
Igor Lins e Silva
2026-05-07 10:35:19 -03:00
49 changed files with 5338 additions and 363 deletions
+346 -1
View File
@@ -1,4 +1,6 @@
import os
import pickle
import shutil
import sqlite3
from pathlib import Path
@@ -20,6 +22,7 @@ from mempalace.backends.chroma import (
_fix_blob_seq_ids,
_pin_hnsw_threads,
_segment_appears_healthy,
quarantine_invalid_hnsw_metadata,
quarantine_stale_hnsw,
)
@@ -208,6 +211,52 @@ def test_query_empty_preserves_embeddings_outer_shape_when_requested():
assert not_requested.embeddings is None
def test_chroma_close_palace_releases_sqlite_lock_for_reopen(tmp_path):
"""close_palace must release chromadb's rust-side SQLite file lock so
a fresh PersistentClient on the same path after shutil.rmtree can
write without hitting SQLITE_READONLY_DBMOVED."""
backend = ChromaBackend()
palace_path = tmp_path / "palace-a"
ref = PalaceRef(id=str(palace_path), local_path=str(palace_path))
col = backend.get_collection(palace=ref, collection_name="mempalace_drawers", create=True)
col.upsert(documents=["hello"], ids=["a"], metadatas=[{"k": "v"}])
backend.close_palace(ref)
shutil.rmtree(palace_path)
col = backend.get_collection(palace=ref, collection_name="mempalace_drawers", create=True)
col.upsert(documents=["world"], ids=["b"], metadatas=[{"k": "v2"}])
assert col.count() == 1
def test_chroma_close_releases_all_cached_clients(tmp_path):
"""close() must release every cached client's SQLite file lock so any
of their palace paths can be reopened by a fresh backend in the same
process."""
backend = ChromaBackend()
palace_a = tmp_path / "palace-a"
palace_b = tmp_path / "palace-b"
ref_a = PalaceRef(id=str(palace_a), local_path=str(palace_a))
ref_b = PalaceRef(id=str(palace_b), local_path=str(palace_b))
for ref in (ref_a, ref_b):
backend.get_collection(palace=ref, collection_name="mempalace_drawers", create=True).upsert(
documents=["x"], ids=["x"], metadatas=[{"k": "v"}]
)
backend.close()
for path in (palace_a, palace_b):
shutil.rmtree(path)
ref = PalaceRef(id=str(path), local_path=str(path))
fresh = ChromaBackend()
col = fresh.get_collection(palace=ref, collection_name="mempalace_drawers", create=True)
col.upsert(documents=["y"], ids=["y"], metadatas=[{"k": "v2"}])
assert col.count() == 1
fresh.close()
def test_chroma_cache_invalidates_when_db_file_missing(tmp_path):
"""A palace rebuild that removes chroma.sqlite3 must drop the stale cache.
@@ -756,7 +805,10 @@ def test_make_client_quarantines_only_on_first_call_per_palace(tmp_path, monkeyp
"""Quarantine fires on first ``make_client()`` for a palace, then is
skipped on subsequent calls — prevents runtime thrash where a daemon's
own steady writes bump ``chroma.sqlite3`` faster than HNSW flushes,
making the mtime heuristic falsely trigger every reconnect."""
making the mtime heuristic falsely trigger every reconnect.
Invalid metadata quarantine shares the same cold-start gate here; the
more aggressive refresh path lives in ``_client()``."""
from mempalace.backends.chroma import ChromaBackend
palace_path = str(tmp_path / "palace")
@@ -783,6 +835,34 @@ def test_make_client_quarantines_only_on_first_call_per_palace(tmp_path, monkeyp
], "quarantine_stale_hnsw should fire once per palace per process, not on every reconnect"
def test_make_client_gates_invalid_metadata_on_first_call(tmp_path, monkeypatch):
"""Invalid metadata quarantine is gated on the first make_client() call."""
from mempalace.backends.chroma import ChromaBackend
palace_path = str(tmp_path / "palace")
os.makedirs(palace_path, exist_ok=True)
(Path(palace_path) / "chroma.sqlite3").write_text("")
monkeypatch.setattr(ChromaBackend, "_quarantined_paths", set())
calls: list[str] = []
def _invalid(path, *args, **kwargs):
calls.append(path)
return []
def _stale(path, stale_seconds=300.0):
return []
monkeypatch.setattr("mempalace.backends.chroma.quarantine_invalid_hnsw_metadata", _invalid)
monkeypatch.setattr("mempalace.backends.chroma.quarantine_stale_hnsw", _stale)
ChromaBackend.make_client(palace_path)
ChromaBackend.make_client(palace_path)
assert calls == [palace_path]
def test_make_client_quarantines_each_palace_independently(tmp_path, monkeypatch):
"""Two distinct palaces each get one quarantine attempt — the gate is
keyed by palace path, not global."""
@@ -920,3 +1000,268 @@ def test_get_collection_applies_retrofit_on_existing_palace(tmp_path):
)
assert wrapper._collection.configuration_json["hnsw"]["num_threads"] == 1
def test_quarantine_invalid_hnsw_metadata_renames_missing_dimensionality(tmp_path):
palace = tmp_path / "palace"
palace.mkdir()
seg = palace / "abcd-1234-5678"
seg.mkdir()
with open(seg / "index_metadata.pickle", "wb") as f:
pickle.dump({"dimensionality": None, "id_to_label": {"a": 1}}, f)
moved = quarantine_invalid_hnsw_metadata(str(palace))
assert len(moved) == 1
assert ".corrupt-" in moved[0]
assert not seg.exists()
def test_quarantine_invalid_hnsw_metadata_allows_uninitialized_segment(tmp_path):
palace = tmp_path / "palace"
palace.mkdir()
seg = palace / "abcd-1234-5678"
seg.mkdir()
with open(seg / "index_metadata.pickle", "wb") as f:
pickle.dump({"dimensionality": None, "id_to_label": {}}, f)
moved = quarantine_invalid_hnsw_metadata(str(palace))
assert moved == []
assert seg.exists()
def test_quarantine_invalid_hnsw_metadata_rejects_non_dict_id_to_label(tmp_path):
palace = tmp_path / "palace"
palace.mkdir()
seg = palace / "abcd-1234-5678"
seg.mkdir()
with open(seg / "index_metadata.pickle", "wb") as f:
pickle.dump({"dimensionality": 8, "id_to_label": ["a", "b"]}, f)
moved = quarantine_invalid_hnsw_metadata(str(palace))
assert len(moved) == 1
assert ".corrupt-" in moved[0]
assert not seg.exists()
def test_quarantine_invalid_hnsw_metadata_rejects_non_schema_payload(tmp_path):
palace = tmp_path / "palace"
palace.mkdir()
seg = palace / "abcd-1234-5678"
seg.mkdir()
with open(seg / "index_metadata.pickle", "wb") as f:
pickle.dump(["not", "a", "metadata", "object"], f)
moved = quarantine_invalid_hnsw_metadata(str(palace))
assert len(moved) == 1
assert ".corrupt-" in moved[0]
assert not seg.exists()
def _dangerous_pickle_payload_executed():
raise AssertionError("unsafe pickle payload executed")
class _DangerousPickle:
def __reduce__(self):
return (_dangerous_pickle_payload_executed, ())
def test_quarantine_invalid_hnsw_metadata_rejects_unsafe_pickle(tmp_path):
palace = tmp_path / "palace"
palace.mkdir()
seg = palace / "abcd-1234-5678"
seg.mkdir()
with open(seg / "index_metadata.pickle", "wb") as f:
pickle.dump(_DangerousPickle(), f)
moved = quarantine_invalid_hnsw_metadata(str(palace))
assert len(moved) == 1
assert ".corrupt-" in moved[0]
assert not seg.exists()
def test_quarantine_invalid_hnsw_metadata_skips_transient_read_errors(tmp_path, monkeypatch):
palace = tmp_path / "palace"
palace.mkdir()
seg = palace / "abcd-1234-5678"
seg.mkdir()
meta = seg / "index_metadata.pickle"
meta.write_bytes(b"partial")
monkeypatch.setattr(
"mempalace.backends.chroma._SafePersistentDataUnpickler.load",
lambda path: (_ for _ in ()).throw(EOFError("flush in progress")),
)
moved = quarantine_invalid_hnsw_metadata(str(palace))
assert moved == []
assert seg.exists()
def test_quarantine_invalid_hnsw_metadata_skips_truncated_pickle(tmp_path, monkeypatch):
palace = tmp_path / "palace"
palace.mkdir()
seg = palace / "abcd-1234-5678"
seg.mkdir()
meta = seg / "index_metadata.pickle"
meta.write_bytes(b"partial")
monkeypatch.setattr(
"mempalace.backends.chroma._SafePersistentDataUnpickler.load",
lambda path: (_ for _ in ()).throw(pickle.UnpicklingError("pickle data was truncated")),
)
moved = quarantine_invalid_hnsw_metadata(str(palace))
assert moved == []
assert seg.exists()
def test_chroma_backend_preflights_metadata_before_persistent_client(tmp_path, monkeypatch):
palace = tmp_path / "palace"
palace.mkdir()
calls = []
def _record(name):
def inner(path, *args, **kwargs):
calls.append((name, path))
return [] if name != "blob" else None
return inner
monkeypatch.setattr("mempalace.backends.chroma._fix_blob_seq_ids", _record("blob"))
monkeypatch.setattr(
"mempalace.backends.chroma.quarantine_invalid_hnsw_metadata", _record("invalid")
)
monkeypatch.setattr("mempalace.backends.chroma.quarantine_stale_hnsw", _record("stale"))
class DummyClient:
pass
monkeypatch.setattr(
"mempalace.backends.chroma.chromadb.PersistentClient", lambda path: DummyClient()
)
backend = ChromaBackend()
backend._client(str(palace))
assert calls == [
("blob", str(palace)),
("invalid", str(palace)),
("stale", str(palace)),
]
def test_chroma_backend_stale_quarantine_is_cold_start_only_on_refresh(tmp_path, monkeypatch):
palace = tmp_path / "palace"
palace.mkdir()
(palace / "chroma.sqlite3").write_text("")
calls = []
def _record(name):
def inner(path, *args, **kwargs):
calls.append((name, path))
return [] if name != "blob" else None
return inner
monkeypatch.setattr(ChromaBackend, "_quarantined_paths", set())
monkeypatch.setattr("mempalace.backends.chroma._fix_blob_seq_ids", _record("blob"))
monkeypatch.setattr(
"mempalace.backends.chroma.quarantine_invalid_hnsw_metadata", _record("invalid")
)
monkeypatch.setattr("mempalace.backends.chroma.quarantine_stale_hnsw", _record("stale"))
class DummyClient:
pass
monkeypatch.setattr(
"mempalace.backends.chroma.chromadb.PersistentClient", lambda path: DummyClient()
)
backend = ChromaBackend()
stats = iter([(1, 1.0), (1, 1.0), (1, 2.0), (1, 2.0)])
monkeypatch.setattr(backend, "_db_stat", lambda path: next(stats))
backend._client(str(palace))
backend._client(str(palace))
assert calls == [
("blob", str(palace)),
("invalid", str(palace)),
("stale", str(palace)),
("blob", str(palace)),
]
def test_chroma_backend_requarantines_after_inode_replacement(tmp_path, monkeypatch):
palace = tmp_path / "palace"
palace.mkdir()
(palace / "chroma.sqlite3").write_text("")
calls = []
def _record(name):
def inner(path, *args, **kwargs):
calls.append((name, path))
return [] if name != "blob" else None
return inner
monkeypatch.setattr(ChromaBackend, "_quarantined_paths", set())
monkeypatch.setattr("mempalace.backends.chroma._fix_blob_seq_ids", _record("blob"))
monkeypatch.setattr(
"mempalace.backends.chroma.quarantine_invalid_hnsw_metadata", _record("invalid")
)
monkeypatch.setattr("mempalace.backends.chroma.quarantine_stale_hnsw", _record("stale"))
class DummyClient:
pass
monkeypatch.setattr(
"mempalace.backends.chroma.chromadb.PersistentClient", lambda path: DummyClient()
)
backend = ChromaBackend()
stats = iter([(1, 1.0), (1, 1.0), (2, 2.0), (2, 2.0)])
monkeypatch.setattr(backend, "_db_stat", lambda path: next(stats))
backend._client(str(palace))
backend._client(str(palace))
assert calls == [
("blob", str(palace)),
("invalid", str(palace)),
("stale", str(palace)),
("blob", str(palace)),
("invalid", str(palace)),
("stale", str(palace)),
]
def test_palace_get_collection_uses_configured_collection_name(monkeypatch):
from mempalace import palace
captured = {}
def fake_get_collection(palace_path, collection_name=None, create=False):
captured["palace_path"] = palace_path
captured["collection_name"] = collection_name
captured["create"] = create
return object()
monkeypatch.setattr(palace._DEFAULT_BACKEND, "get_collection", fake_get_collection)
monkeypatch.setattr("mempalace.config.get_configured_collection_name", lambda: "custom_drawers")
palace.get_collection("/palace", create=False)
assert captured == {
"palace_path": "/palace",
"collection_name": "custom_drawers",
"create": False,
}