42b940d263
Four defects surfaced by the automated review, fixed with targeted tests: 1. BaseCollection.update() default now validates that documents / metadatas / embeddings lengths match ids, raising ValueError instead of silently misaligning pairs or raising IndexError (base.py). 2. ChromaCollection.query() now rejects the two ambiguous input shapes up front — neither or both of query_texts / query_embeddings, and empty input lists — with clear ValueError messages rather than delegating to chromadb's less-obvious errors (chroma.py). 3. QueryResult.empty() accepts embeddings_requested=True to preserve the outer-query dimension with empty hit lists when the caller asked for embeddings, matching the spec rule that included fields carry the outer shape even when empty (base.py). ChromaCollection.query() threads this through on the empty-result path (chroma.py). 4. ChromaBackend cache-freshness check now matches the semantics from mcp_server._get_client (merged via #757) on three edge cases Copilot called out: (a) invalidate when chroma.sqlite3 disappears while a cached client is held, (b) treat a 0→nonzero stat transition as a change so a cache built when the DB did not yet exist is refreshed, (c) re-stat after PersistentClient constructs the DB lazily so freshness reflects the post-creation state (chroma.py). Tests: 978 passed (up from 970), 8 new tests covering the fixes.
400 lines
14 KiB
Python
400 lines
14 KiB
Python
import sqlite3
|
|
|
|
import chromadb
|
|
import pytest
|
|
|
|
from mempalace.backends import (
|
|
GetResult,
|
|
PalaceRef,
|
|
QueryResult,
|
|
UnsupportedFilterError,
|
|
available_backends,
|
|
get_backend,
|
|
)
|
|
from mempalace.backends.chroma import ChromaBackend, ChromaCollection, _fix_blob_seq_ids
|
|
|
|
|
|
class _FakeCollection:
    """Stand-in for a chromadb.Collection returning raw chroma-shaped dicts.

    Every method call is appended to ``self.calls`` as a ``(name, kwargs)``
    tuple so tests can assert on what was delegated and in which order.
    """

    def __init__(self, query_response=None, get_response=None, count_value=7):
        """Configure the canned responses.

        Args:
            query_response: Dict returned verbatim by ``query()``. ``None``
                selects a default payload with two hits for one query.
            get_response: Dict returned verbatim by ``get()``. ``None``
                selects a default payload with a single record.
            count_value: Value returned by ``count()``.
        """
        self.calls = []
        # Compare against None instead of relying on truthiness so that an
        # explicitly supplied empty dict is honoured rather than being
        # silently swapped for the default payload.
        if query_response is None:
            query_response = {
                "ids": [["a", "b"]],
                "documents": [["da", "db"]],
                "metadatas": [[{"wing": "w1"}, {"wing": "w2"}]],
                "distances": [[0.1, 0.2]],
            }
        if get_response is None:
            get_response = {
                "ids": ["a"],
                "documents": ["da"],
                "metadatas": [{"wing": "w1"}],
            }
        self._query_response = query_response
        self._get_response = get_response
        self._count_value = count_value

    def add(self, **kwargs):
        """Record an ``add`` call; returns nothing."""
        self.calls.append(("add", kwargs))

    def upsert(self, **kwargs):
        """Record an ``upsert`` call; returns nothing."""
        self.calls.append(("upsert", kwargs))

    def update(self, **kwargs):
        """Record an ``update`` call; returns nothing."""
        self.calls.append(("update", kwargs))

    def query(self, **kwargs):
        """Record a ``query`` call and return the canned query response."""
        self.calls.append(("query", kwargs))
        return self._query_response

    def get(self, **kwargs):
        """Record a ``get`` call and return the canned get response."""
        self.calls.append(("get", kwargs))
        return self._get_response

    def delete(self, **kwargs):
        """Record a ``delete`` call; returns nothing."""
        self.calls.append(("delete", kwargs))

    def count(self):
        """Record a ``count`` call (with empty kwargs) and return the canned count."""
        self.calls.append(("count", {}))
        return self._count_value
|
|
|
|
|
|
def test_chroma_collection_returns_typed_query_result():
    """query() wraps the fake's default payload in a QueryResult, field by field."""
    wrapped = ChromaCollection(_FakeCollection())

    outcome = wrapped.query(query_texts=["q"])

    assert isinstance(outcome, QueryResult)
    expected_fields = {
        "ids": [["a", "b"]],
        "documents": [["da", "db"]],
        "metadatas": [[{"wing": "w1"}, {"wing": "w2"}]],
        "distances": [[0.1, 0.2]],
    }
    for field_name, expected in expected_fields.items():
        assert getattr(outcome, field_name) == expected
    # The fake's payload carries no embeddings key, so none are surfaced.
    assert outcome.embeddings is None
|
|
|
|
|
|
def test_chroma_collection_returns_typed_get_result():
    """get() wraps the fake's default payload in a GetResult with flat lists."""
    wrapped = ChromaCollection(_FakeCollection())

    outcome = wrapped.get(where={"wing": "w1"})

    assert isinstance(outcome, GetResult)
    observed = (outcome.ids, outcome.documents, outcome.metadatas)
    assert observed == (["a"], ["da"], [{"wing": "w1"}])
|
|
|
|
|
|
def test_query_result_empty_preserves_outer_dimension():
    """QueryResult.empty(num_queries=2) yields one empty hit list per query."""
    blank = QueryResult.empty(num_queries=2)
    one_empty_list_per_query = [[], []]

    assert blank.ids == one_empty_list_per_query
    assert blank.documents == one_empty_list_per_query
    assert blank.distances == one_empty_list_per_query
    # Embeddings were not requested here, so the field stays None.
    assert blank.embeddings is None
|
|
|
|
|
|
def test_typed_results_support_dict_compat_access():
    """Typed results still answer dict-style access (transitional compat shim).

    Retained until callers migrate to attribute access.
    """
    record = GetResult(ids=["a"], documents=["da"], metadatas=[{"w": 1}])

    # Subscript and .get() both read fields by name.
    assert record["ids"] == ["a"]
    assert record.get("documents") == ["da"]

    # Unknown keys fall back to the supplied default.
    assert record.get("missing", "default") == "default"

    # Membership tests distinguish known fields from unknown keys.
    assert "ids" in record
    assert "missing" not in record
|
|
|
|
|
|
def test_chroma_collection_query_empty_result_preserves_outer_shape():
    """A completely empty raw response still yields one empty list per query text."""
    raw_empty = {"ids": [], "documents": [], "metadatas": [], "distances": []}
    wrapped = ChromaCollection(_FakeCollection(query_response=raw_empty))

    outcome = wrapped.query(query_texts=["q1", "q2"])

    # Two query texts were sent, so each field has two (empty) inner lists.
    for field_name in ("ids", "documents", "distances"):
        assert getattr(outcome, field_name) == [[], []]
|
|
|
|
|
|
def test_chroma_collection_rejects_unknown_where_operator():
    """A where filter using the ``$regex`` operator raises UnsupportedFilterError."""
    wrapped = ChromaCollection(_FakeCollection())
    bad_filter = {"$regex": "foo"}

    with pytest.raises(UnsupportedFilterError):
        wrapped.query(query_texts=["q"], where=bad_filter)
|
|
|
|
|
|
def test_chroma_collection_delegates_writes():
    """add / upsert / delete / count each reach the wrapped collection, in call order."""
    spy = _FakeCollection()
    wrapped = ChromaCollection(spy)

    wrapped.add(documents=["d"], ids=["1"], metadatas=[{"wing": "w"}])
    wrapped.upsert(documents=["u"], ids=["2"], metadatas=[{"room": "r"}])
    wrapped.delete(ids=["1"])
    # 7 is the fake's default count_value.
    assert wrapped.count() == 7

    recorded_names = [name for name, _kwargs in spy.calls]
    assert recorded_names == ["add", "upsert", "delete", "count"]
|
|
|
|
|
|
def test_registry_exposes_chroma_by_default():
    """The registry lists "chroma" and resolves it to a ChromaBackend instance."""
    assert "chroma" in available_backends()

    resolved = get_backend("chroma")
    assert isinstance(resolved, ChromaBackend)
|
|
|
|
|
|
def test_registry_unknown_backend_raises():
    """Looking up a backend name that is not registered raises KeyError."""
    unknown_name = "no-such-backend-exists"
    with pytest.raises(KeyError):
        get_backend(unknown_name)
|
|
|
|
|
|
def test_resolve_backend_priority_order(tmp_path):
    """Backend resolution precedence: explicit > config > env > default."""
    from mempalace.backends import resolve_backend_for_palace as resolve

    # An explicit kwarg beats every other source.
    chosen = resolve(explicit="pg", config_value="lance")
    assert chosen == "pg"

    # Without an explicit value, the config value beats env / default.
    chosen = resolve(config_value="lance", env_value="qdrant")
    assert chosen == "lance"

    # Without explicit or config, the env value beats the default.
    chosen = resolve(env_value="qdrant", default="chroma")
    assert chosen == "qdrant"

    # With nothing supplied the result is "chroma".
    assert resolve() == "chroma"
|
|
|
|
|
|
def test_chroma_detect_matches_palace_with_chroma_sqlite(tmp_path):
    """detect() is True for a directory holding chroma.sqlite3 and False for its parent."""
    marker = tmp_path / "chroma.sqlite3"
    marker.write_bytes(b"")  # an empty file is enough for detection

    palace_dir = str(tmp_path)
    parent_dir = str(tmp_path.parent)

    assert ChromaBackend.detect(palace_dir) is True
    assert ChromaBackend.detect(parent_dir) is False
|
|
|
|
|
|
def test_query_rejects_missing_input():
    """query() with neither query_texts nor query_embeddings raises ValueError."""
    wrapped = ChromaCollection(_FakeCollection())

    with pytest.raises(ValueError):
        wrapped.query()
|
|
|
|
|
|
def test_query_rejects_both_texts_and_embeddings():
    """Supplying query_texts and query_embeddings together raises ValueError."""
    wrapped = ChromaCollection(_FakeCollection())
    ambiguous_input = {"query_texts": ["q"], "query_embeddings": [[0.1, 0.2]]}

    with pytest.raises(ValueError):
        wrapped.query(**ambiguous_input)
|
|
|
|
|
|
def test_query_rejects_empty_input_list():
    """An empty query_texts list raises ValueError."""
    wrapped = ChromaCollection(_FakeCollection())
    no_texts = []

    with pytest.raises(ValueError):
        wrapped.query(query_texts=no_texts)
|
|
|
|
|
|
def test_query_empty_preserves_embeddings_outer_shape_when_requested():
    """On an empty result, embeddings keep the outer shape only when included."""
    raw_empty = {"ids": [], "documents": [], "metadatas": [], "distances": []}
    wrapped = ChromaCollection(_FakeCollection(query_response=raw_empty))
    texts = ["q1", "q2"]

    # "embeddings" is in include: one empty list per query text.
    with_embeddings = wrapped.query(query_texts=texts, include=["documents", "embeddings"])
    assert with_embeddings.embeddings == [[], []]

    # "embeddings" is not in include: the field stays None.
    without_embeddings = wrapped.query(query_texts=texts, include=["documents"])
    assert without_embeddings.embeddings is None
|
|
|
|
|
|
def test_base_collection_update_default_validates_list_lengths(tmp_path):
    """BaseCollection.update() raises ValueError when documents is shorter than ids."""
    from mempalace.backends.base import BaseCollection

    backend = ChromaBackend()
    palace_path = tmp_path / "palace"
    collection = backend.get_collection(
        palace=PalaceRef(id=str(palace_path), local_path=str(palace_path)),
        collection_name="mempalace_drawers",
        create=True,
    )

    # Seed the two records OUTSIDE the raises block. If setup ran inside it,
    # a ValueError raised while seeding could satisfy the assertion without
    # update() ever being exercised.
    collection._collection.add(
        documents=["a", "b"],
        ids=["1", "2"],
        metadatas=[{"k": 1}, {"k": 2}],
    )

    # Mismatched documents length → clear ValueError, not silent merge.
    with pytest.raises(ValueError, match="documents length"):
        BaseCollection.update(
            collection,
            ids=["1", "2"],
            documents=["only-one"],
        )
|
|
|
|
|
|
def test_chroma_cache_invalidates_when_db_file_missing(tmp_path):
    """Deleting chroma.sqlite3 under a cached client forces a fresh client."""
    backend = ChromaBackend()
    palace_dir = tmp_path / "palace"
    cache_key = str(palace_dir)

    backend.get_collection(
        palace=PalaceRef(id=cache_key, local_path=cache_key),
        collection_name="mempalace_drawers",
        create=True,
    )
    assert cache_key in backend._clients
    stale_client = backend._clients[cache_key]
    freshness_before = backend._freshness[cache_key]
    # The DB file exists once the collection has been created, so the recorded
    # freshness must differ from the (0, 0.0) "no DB" marker.
    assert freshness_before != (0, 0.0)

    # Simulate a rebuild mid-flight by removing the DB file. Reusing the cached
    # client silently would leave the in-memory HNSW index wrong.
    db_file = palace_dir / "chroma.sqlite3"
    db_file.unlink()

    replacement = backend._client(cache_key)

    # The cache entry was replaced rather than reused, and freshness was reset
    # to (0, 0.0) to reflect the "no DB on disk yet" state.
    assert replacement is not stale_client
    assert backend._freshness[cache_key] == (0, 0.0)
|
|
|
|
|
|
def test_chroma_cache_picks_up_db_created_after_first_open(tmp_path):
    """A cache entry recorded with freshness (0, 0.0) is rebuilt once the DB exists."""
    import chromadb as _chromadb

    backend = ChromaBackend()
    palace_dir = tmp_path / "palace"
    palace_dir.mkdir()
    cache_key = str(palace_dir)

    # Pre-populate both caches as though an earlier _client() call had opened
    # the palace before chroma.sqlite3 existed; (0, 0.0) marks "DB absent at
    # cache time".
    placeholder = object()
    backend._clients[cache_key] = placeholder
    backend._freshness[cache_key] = (0, 0.0)

    # Create the DB file with a real chromadb call so that _fix_blob_seq_ids
    # and PersistentClient succeed on the refresh below.
    seeding_client = _chromadb.PersistentClient(path=cache_key)
    seeding_client.get_or_create_collection("seed")
    assert (palace_dir / "chroma.sqlite3").is_file()

    # The next _client() call has to notice the 0 → nonzero transition.
    rebuilt = backend._client(cache_key)
    assert rebuilt is not placeholder
    assert backend._freshness[cache_key] != (0, 0.0)
|
|
|
|
|
|
def test_base_collection_update_default_rejects_mismatched_lengths(tmp_path):
    """The ABC default update() raises ValueError rather than silently misaligning.

    Covers both a short ``documents`` list and a short ``metadatas`` list.
    """
    from mempalace.backends.base import BaseCollection

    palace_dir = str(tmp_path / "palace")
    collection = ChromaBackend().get_collection(
        palace=PalaceRef(id=palace_dir, local_path=palace_dir),
        collection_name="mempalace_drawers",
        create=True,
    )
    # Seed two records so the ids passed to update() below exist.
    collection.add(
        documents=["a", "b"],
        ids=["1", "2"],
        metadatas=[{"k": 1}, {"k": 2}],
    )
    two_ids = ["1", "2"]

    # One document for two ids.
    with pytest.raises(ValueError, match="documents length"):
        BaseCollection.update(collection, ids=two_ids, documents=["only-one"])

    # One metadata dict for two ids.
    with pytest.raises(ValueError, match="metadatas length"):
        BaseCollection.update(collection, ids=two_ids, metadatas=[{"k": 9}])
|
|
|
|
|
|
def test_chroma_backend_accepts_palace_ref_kwarg(tmp_path):
    """get_collection() accepts a PalaceRef via the ``palace`` keyword."""
    palace_dir = tmp_path / "palace"
    ref = PalaceRef(id=str(palace_dir), local_path=str(palace_dir))

    opened = ChromaBackend().get_collection(
        palace=ref,
        collection_name="mempalace_drawers",
        create=True,
    )

    # create=True leaves the palace directory on disk.
    assert palace_dir.is_dir()
    assert isinstance(opened, ChromaCollection)
|
|
|
|
|
|
def test_chroma_backend_create_false_raises_without_creating_directory(tmp_path):
    """create=False on a missing palace raises FileNotFoundError and creates nothing."""
    absent_dir = tmp_path / "missing-palace"
    backend = ChromaBackend()

    with pytest.raises(FileNotFoundError):
        backend.get_collection(
            str(absent_dir),
            collection_name="mempalace_drawers",
            create=False,
        )

    # The failed lookup must not have created the directory as a side effect.
    assert not absent_dir.exists()
|
|
|
|
|
|
def test_chroma_backend_create_true_creates_directory_and_collection(tmp_path):
    """create=True creates the palace directory and a collection chromadb can reopen."""
    palace_dir = tmp_path / "palace"
    backend = ChromaBackend()

    opened = backend.get_collection(
        str(palace_dir),
        collection_name="mempalace_drawers",
        create=True,
    )

    assert palace_dir.is_dir()
    assert isinstance(opened, ChromaCollection)

    # Reopen with a plain chromadb client; get_collection() is left unasserted
    # because the lookup itself is the check.
    independent_client = chromadb.PersistentClient(path=str(palace_dir))
    independent_client.get_collection("mempalace_drawers")
|
|
|
|
|
|
def test_chroma_backend_creates_collection_with_cosine_distance(tmp_path):
    """A collection created by the backend has metadata ``hnsw:space`` == "cosine"."""
    palace_dir = str(tmp_path / "palace")

    ChromaBackend().get_collection(
        palace_dir,
        collection_name="mempalace_drawers",
        create=True,
    )

    # Read the metadata back through a plain chromadb client.
    reopened = chromadb.PersistentClient(path=palace_dir).get_collection("mempalace_drawers")
    distance_space = reopened.metadata.get("hnsw:space")
    assert distance_space == "cosine"
|
|
|
|
|
|
def test_fix_blob_seq_ids_converts_blobs_to_integers(tmp_path):
    """Simulate a ChromaDB 0.6.x database with BLOB seq_ids and verify repair."""
    from contextlib import closing

    db_path = tmp_path / "chroma.sqlite3"

    # closing() guarantees the handle is released even if a statement or an
    # assertion raises; a sqlite3 connection used directly as a context
    # manager only manages the transaction, it does not close.
    with closing(sqlite3.connect(str(db_path))) as conn:
        conn.execute("CREATE TABLE embeddings (rowid INTEGER PRIMARY KEY, seq_id)")
        conn.execute("CREATE TABLE max_seq_id (rowid INTEGER PRIMARY KEY, seq_id)")
        # Insert BLOB seq_ids like ChromaDB 0.6.x would: 8-byte big-endian ints.
        blob_42 = (42).to_bytes(8, byteorder="big")
        blob_99 = (99).to_bytes(8, byteorder="big")
        conn.execute("INSERT INTO embeddings (seq_id) VALUES (?)", (blob_42,))
        conn.execute("INSERT INTO max_seq_id (seq_id) VALUES (?)", (blob_99,))
        conn.commit()

    _fix_blob_seq_ids(str(tmp_path))

    with closing(sqlite3.connect(str(db_path))) as conn:
        row = conn.execute("SELECT seq_id, typeof(seq_id) FROM embeddings").fetchone()
        assert row == (42, "integer")
        row = conn.execute("SELECT seq_id, typeof(seq_id) FROM max_seq_id").fetchone()
        assert row == (99, "integer")
|
|
|
|
|
|
def test_fix_blob_seq_ids_noop_without_blobs(tmp_path):
    """No error when seq_ids are already integers; the stored value is unchanged."""
    from contextlib import closing

    db_path = tmp_path / "chroma.sqlite3"

    # closing() releases the handle even when a statement or assertion raises,
    # so a failing run does not leave the database file open.
    with closing(sqlite3.connect(str(db_path))) as conn:
        conn.execute("CREATE TABLE embeddings (rowid INTEGER PRIMARY KEY, seq_id INTEGER)")
        conn.execute("INSERT INTO embeddings (seq_id) VALUES (42)")
        conn.commit()

    _fix_blob_seq_ids(str(tmp_path))

    with closing(sqlite3.connect(str(db_path))) as conn:
        row = conn.execute("SELECT seq_id, typeof(seq_id) FROM embeddings").fetchone()
        assert row == (42, "integer")
|
|
|
|
|
|
def test_fix_blob_seq_ids_noop_without_database(tmp_path):
    """No error when palace has no chroma.sqlite3."""
    empty_palace = str(tmp_path)
    # Completing without an exception is the whole assertion.
    _fix_blob_seq_ids(empty_palace)
|