fix: best-effort HNSW thread-pin retrofit + drop dead attempt-cap constant
Addresses the remaining PR #976 review items after the rebase on develop.
`get_collection(create=False)` previously returned existing collections without re-applying `hnsw:num_threads=1`, so palaces created before the fix kept the unsafe parallel-insert path. Add a `_pin_hnsw_threads()` helper that calls `collection.modify(configuration=UpdateCollectionConfiguration(hnsw=UpdateHNSWConfiguration(num_threads=1)))` best-effort on every `get_collection` call (including the MCP server's `_get_collection`). In chromadb 1.5.x the runtime config does not persist to disk across `PersistentClient` reopens, so the retrofit is re-applied at each process start rather than being a one-shot migration. Fresh palaces keep the metadata-based pin as the primary defense; legacy palaces now also get per-session protection without requiring `mempalace nuke` + re-mine.
After the rebase on develop, `hook_precompact` delegates to `_mine_sync` and no longer emits `decision: block`, so the attempt-cap constant (`MAX_PRECOMPACT_BLOCK_ATTEMPTS`) was orphaned. Grep confirms 0 usages in the repo, so it is removed.
Tests:
- `_pin_hnsw_threads` retrofits a legacy collection (num_threads None -> 1)
- `_pin_hnsw_threads` swallows all errors (never raises)
- `ChromaBackend.get_collection(create=False)` applies the retrofit on a legacy palace
- 62 tests pass (10 backends + 6 palace locks + 46 hooks_cli)
This commit is contained in:
committed by
Igor Lins e Silva
parent
40d7958ca1
commit
8df944a54d
@@ -130,6 +130,37 @@ def quarantine_stale_hnsw(palace_path: str, stale_seconds: float = 3600.0) -> li
|
|||||||
return moved
|
return moved
|
||||||
|
|
||||||
|
|
||||||
|
def _pin_hnsw_threads(collection) -> None:
    """Force ``num_threads=1`` onto the HNSW config of an existing collection.

    This is a best-effort retrofit and never raises. New collections receive
    the pin through ``metadata=`` when they are created; collections from
    older palaces were built without it and therefore still use the parallel
    insert path that can trigger the HNSW race tracked in #974/#965. Calling
    ``collection.modify(configuration=...)`` re-applies the single-thread
    setting in memory for the current process.

    Note: with chromadb 1.5.x the updated ``configuration_json["hnsw"]`` is
    not written back to disk across ``PersistentClient`` reopens, so callers
    must invoke this on every ``get_collection`` call rather than once.

    Args:
        collection: The raw ChromaDB collection object to modify in place.
    """
    # The update-configuration types are imported lazily so that a chromadb
    # release without them degrades to a no-op instead of an import failure.
    try:
        from chromadb.api.collection_configuration import (
            UpdateCollectionConfiguration,
            UpdateHNSWConfiguration,
        )
    except ImportError:
        logger.debug("_pin_hnsw_threads skipped: chromadb too old", exc_info=True)
        return

    # Building the config objects and applying them share one guard: any
    # failure is logged at debug level and otherwise ignored.
    try:
        single_threaded = UpdateHNSWConfiguration(num_threads=1)
        pinned_config = UpdateCollectionConfiguration(hnsw=single_threaded)
        collection.modify(configuration=pinned_config)
    except Exception:
        logger.debug("_pin_hnsw_threads modify failed", exc_info=True)
|
||||||
|
|
||||||
|
|
||||||
def _fix_blob_seq_ids(palace_path: str) -> None:
|
def _fix_blob_seq_ids(palace_path: str) -> None:
|
||||||
"""Fix ChromaDB 0.6.x -> 1.5.x migration bug: BLOB seq_ids -> INTEGER.
|
"""Fix ChromaDB 0.6.x -> 1.5.x migration bug: BLOB seq_ids -> INTEGER.
|
||||||
|
|
||||||
@@ -572,6 +603,7 @@ class ChromaBackend(BaseBackend):
|
|||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
collection = client.get_collection(collection_name, **ef_kwargs)
|
collection = client.get_collection(collection_name, **ef_kwargs)
|
||||||
|
_pin_hnsw_threads(collection)
|
||||||
return ChromaCollection(collection)
|
return ChromaCollection(collection)
|
||||||
|
|
||||||
def close_palace(self, palace) -> None:
|
def close_palace(self, palace) -> None:
|
||||||
|
|||||||
@@ -643,9 +643,6 @@ def hook_session_start(data: dict, harness: str):
|
|||||||
_output({})
|
_output({})
|
||||||
|
|
||||||
|
|
||||||
MAX_PRECOMPACT_BLOCK_ATTEMPTS = 2
|
|
||||||
|
|
||||||
|
|
||||||
def hook_precompact(data: dict, harness: str):
|
def hook_precompact(data: dict, harness: str):
|
||||||
"""Precompact hook: mine transcript synchronously, then allow compaction."""
|
"""Precompact hook: mine transcript synchronously, then allow compaction."""
|
||||||
parsed = _parse_harness_input(data, harness)
|
parsed = _parse_harness_input(data, harness)
|
||||||
|
|||||||
+14
-11
@@ -57,7 +57,7 @@ from .config import ( # noqa: E402
|
|||||||
sanitize_content,
|
sanitize_content,
|
||||||
)
|
)
|
||||||
from .version import __version__ # noqa: E402
|
from .version import __version__ # noqa: E402
|
||||||
from .backends.chroma import ChromaBackend, ChromaCollection # noqa: E402
|
from .backends.chroma import ChromaBackend, ChromaCollection, _pin_hnsw_threads # noqa: E402
|
||||||
from .query_sanitizer import sanitize_query # noqa: E402
|
from .query_sanitizer import sanitize_query # noqa: E402
|
||||||
from .searcher import search_memories # noqa: E402
|
from .searcher import search_memories # noqa: E402
|
||||||
from .palace_graph import ( # noqa: E402
|
from .palace_graph import ( # noqa: E402
|
||||||
@@ -219,20 +219,23 @@ def _get_collection(create=False):
|
|||||||
if create:
|
if create:
|
||||||
# hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor
|
# hnsw:num_threads=1 disables ChromaDB's multi-threaded ParallelFor
|
||||||
# HNSW insert path, which has a race in repairConnectionsForUpdate /
|
# HNSW insert path, which has a race in repairConnectionsForUpdate /
|
||||||
# addPoint (see issues #974, #965). The setting is only honored at
|
# addPoint (see issues #974, #965). Set via metadata on fresh
|
||||||
# collection creation time — pre-existing palaces created before
|
# collections and re-applied via _pin_hnsw_threads() for legacy
|
||||||
# this fix keep the unsafe default; users must `mempalace nuke` +
|
# palaces whose collections were created before this fix (the
|
||||||
# re-mine to get the protection on legacy palaces.
|
# runtime config does not persist cross-process in chromadb 1.5.x,
|
||||||
_collection_cache = ChromaCollection(
|
# so the retrofit runs every time _get_collection (re)populates its cache).
|
||||||
client.get_or_create_collection(
|
raw = client.get_or_create_collection(
|
||||||
_config.collection_name,
|
_config.collection_name,
|
||||||
metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1},
|
metadata={"hnsw:space": "cosine", "hnsw:num_threads": 1},
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
_pin_hnsw_threads(raw)
|
||||||
|
_collection_cache = ChromaCollection(raw)
|
||||||
_metadata_cache = None
|
_metadata_cache = None
|
||||||
_metadata_cache_time = 0
|
_metadata_cache_time = 0
|
||||||
elif _collection_cache is None:
|
elif _collection_cache is None:
|
||||||
_collection_cache = ChromaCollection(client.get_collection(_config.collection_name))
|
raw = client.get_collection(_config.collection_name)
|
||||||
|
_pin_hnsw_threads(raw)
|
||||||
|
_collection_cache = ChromaCollection(raw)
|
||||||
_metadata_cache = None
|
_metadata_cache = None
|
||||||
_metadata_cache_time = 0
|
_metadata_cache_time = 0
|
||||||
return _collection_cache
|
return _collection_cache
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ from mempalace.backends.chroma import (
|
|||||||
ChromaBackend,
|
ChromaBackend,
|
||||||
ChromaCollection,
|
ChromaCollection,
|
||||||
_fix_blob_seq_ids,
|
_fix_blob_seq_ids,
|
||||||
|
_pin_hnsw_threads,
|
||||||
quarantine_stale_hnsw,
|
quarantine_stale_hnsw,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -443,3 +444,56 @@ def test_quarantine_stale_hnsw_skips_already_quarantined(tmp_path):
|
|||||||
moved = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)
|
moved = quarantine_stale_hnsw(str(palace), stale_seconds=3600.0)
|
||||||
assert moved == []
|
assert moved == []
|
||||||
assert drift.exists()
|
assert drift.exists()
|
||||||
|
|
||||||
|
|
||||||
|
# ── _pin_hnsw_threads ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_pin_hnsw_threads_retrofits_legacy_collection(tmp_path):
    """A collection created without ``num_threads`` ends up pinned to 1."""
    legacy_dir = tmp_path / "legacy-palace"
    legacy_dir.mkdir()

    client = chromadb.PersistentClient(path=str(legacy_dir))
    # Legacy shape: only the distance space is set, no hnsw:num_threads.
    legacy_col = client.create_collection(
        "mempalace_drawers",
        metadata={"hnsw:space": "cosine"},
    )

    # Precondition: nothing has pinned the thread count yet.
    hnsw_before = legacy_col.configuration_json.get("hnsw", {})
    assert hnsw_before.get("num_threads") is None

    _pin_hnsw_threads(legacy_col)

    hnsw_after = legacy_col.configuration_json["hnsw"]
    assert hnsw_after["num_threads"] == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_pin_hnsw_threads_swallows_all_errors():
    """The retrofit stays silent even if ``collection.modify`` raises."""

    class _BrokenCollection:
        # Stand-in whose modify() always fails, whatever it is called with.
        def modify(self, *_args, **_kwargs):
            raise RuntimeError("boom")

    broken = _BrokenCollection()
    # Passing means no exception propagates out of the helper.
    _pin_hnsw_threads(broken)
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_collection_applies_retrofit_on_existing_palace(tmp_path):
    """Opening a legacy palace with ``create=False`` pins ``num_threads`` to 1."""
    palace_dir = tmp_path / "palace"
    palace_dir.mkdir()

    # Build a legacy-style palace: the collection metadata carries no
    # hnsw:num_threads entry.
    seed_client = chromadb.PersistentClient(path=str(palace_dir))
    seed_client.create_collection(
        "mempalace_drawers",
        metadata={"hnsw:space": "cosine"},
    )
    # Release our reference so the backend below reopens with a fresh client.
    del seed_client

    wrapped = ChromaBackend().get_collection(
        str(palace_dir),
        collection_name="mempalace_drawers",
        create=False,
    )

    hnsw_config = wrapped._collection.configuration_json["hnsw"]
    assert hnsw_config["num_threads"] == 1
|
||||||
|
|||||||
Reference in New Issue
Block a user