test: cover embedding device fallback and bounded upserts

Agent-Logs-Url: https://github.com/MemPalace/mempalace/sessions/3213a67a-6871-4bb2-9ae0-23fa11001a22

Co-authored-by: igorls <4753812+igorls@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-04-24 23:06:50 +00:00
committed by GitHub
parent a4868a3589
commit fbd0904799
7 changed files with 268 additions and 57 deletions
+26
View File
@@ -20,6 +20,32 @@ def test_config_from_file():
assert cfg.palace_path == "/custom/palace"
def test_embedding_device_defaults_to_auto():
    """With an empty config dir and no env override, the device is ``"auto"``."""
    from unittest import mock

    # patch.dict snapshots os.environ and restores it on exit, so dropping an
    # ambient MEMPALACE_EMBEDDING_DEVICE here cannot leak out of the test.
    # TemporaryDirectory removes the scratch config dir that mkdtemp() left behind.
    with mock.patch.dict(os.environ), tempfile.TemporaryDirectory() as config_dir:
        os.environ.pop("MEMPALACE_EMBEDDING_DEVICE", None)
        cfg = MempalaceConfig(config_dir=config_dir)
        assert cfg.embedding_device == "auto"
def test_embedding_device_from_config_is_normalized():
    """A padded, upper-case ``embedding_device`` in config.json reads back as ``"cuda"``."""
    from unittest import mock

    # The environment variable overrides the file value, so it must be absent
    # for this test to exercise the config-file path. patch.dict restores the
    # original environment on exit; TemporaryDirectory cleans up the config dir.
    with mock.patch.dict(os.environ), tempfile.TemporaryDirectory() as tmpdir:
        os.environ.pop("MEMPALACE_EMBEDDING_DEVICE", None)
        with open(os.path.join(tmpdir, "config.json"), "w") as f:
            json.dump({"embedding_device": " CUDA "}, f)
        cfg = MempalaceConfig(config_dir=tmpdir)
        assert cfg.embedding_device == "cuda"
def test_embedding_device_env_overrides_config():
    """MEMPALACE_EMBEDDING_DEVICE wins over config.json and is normalized."""
    from unittest import mock

    with tempfile.TemporaryDirectory() as tmpdir:
        with open(os.path.join(tmpdir, "config.json"), "w") as f:
            json.dump({"embedding_device": "cpu"}, f)
        # patch.dict puts back whatever value the variable had before the test,
        # whereas an unconditional ``del`` in ``finally`` would discard it.
        with mock.patch.dict(os.environ, {"MEMPALACE_EMBEDDING_DEVICE": " CoreML "}):
            cfg = MempalaceConfig(config_dir=tmpdir)
            assert cfg.embedding_device == "coreml"
def test_env_override():
raw = "/env/palace"
os.environ["MEMPALACE_PALACE_PATH"] = raw
+36
View File
@@ -1,6 +1,7 @@
"""Unit tests for convo_miner pure functions (no chromadb needed)."""
from mempalace.convo_miner import (
_file_chunks_locked,
chunk_exchanges,
detect_convo_room,
scan_convos,
@@ -111,3 +112,38 @@ class TestScanConvos:
def test_scan_empty_dir(self, tmp_path):
    """Scanning an empty directory returns an empty list."""
    assert scan_convos(str(tmp_path)) == []
class TestFileChunksLocked:
    """Tests for ``_file_chunks_locked`` using a recording fake collection."""

    def test_uses_bounded_upsert_batches(self, monkeypatch):
        """Five chunks with a batch size of 2 are upserted as batches of 2, 2 and 1."""
        import contextlib

        import mempalace.convo_miner as convo_miner

        class FakeCol:
            """Collection stand-in that records the size of each upsert call."""

            def __init__(self):
                self.batch_sizes = []

            def delete(self, *args, **kwargs):
                return None

            def upsert(self, documents, ids, metadatas):
                self.batch_sizes.append(len(documents))

        def never_mined(collection, source_file):
            return False

        def no_lock(source_file):
            return contextlib.nullcontext()

        def fixed_hall(content):
            return "conversations"

        replacements = (
            ("DRAWER_UPSERT_BATCH_SIZE", 2),
            ("file_already_mined", never_mined),
            ("mine_lock", no_lock),
            ("_detect_hall_cached", fixed_hall),
        )
        for attr_name, replacement in replacements:
            monkeypatch.setattr(convo_miner, attr_name, replacement)

        col = FakeCol()
        chunks = [{"content": f"chunk {i} " * 20, "chunk_index": i} for i in range(5)]

        outcome = _file_chunks_locked(
            col, "chat.txt", chunks, "wing", "general", "agent", "exchange"
        )
        drawers, room_counts, skipped = outcome

        assert drawers == 5
        assert dict(room_counts) == {}
        assert skipped is False
        assert col.batch_sizes == [2, 2, 1]
+101
View File
@@ -0,0 +1,101 @@
import builtins
import pytest
import mempalace.embedding as embedding
@pytest.fixture(autouse=True)
def clear_embedding_state():
    """Empty the embedding module's ``_EF_CACHE`` and ``_WARNED`` around every test."""

    def _reset():
        embedding._EF_CACHE.clear()
        embedding._WARNED.clear()

    _reset()
    yield
    _reset()
def test_auto_picks_cuda(monkeypatch):
    """``auto`` resolves to CUDA (with CPU after it) when CUDA is available."""

    def cuda_and_cpu():
        return ["CUDAExecutionProvider", "CPUExecutionProvider"]

    monkeypatch.setattr("onnxruntime.get_available_providers", cuda_and_cpu)

    resolved = embedding._resolve_providers("auto")
    expected = (["CUDAExecutionProvider", "CPUExecutionProvider"], "cuda")
    assert resolved == expected
def test_auto_falls_to_cpu(monkeypatch):
    """``auto`` resolves to CPU when it is the only available provider."""

    def cpu_only():
        return ["CPUExecutionProvider"]

    monkeypatch.setattr("onnxruntime.get_available_providers", cpu_only)

    resolved = embedding._resolve_providers("auto")
    assert resolved == (["CPUExecutionProvider"], "cpu")
def test_cuda_missing_warns_with_gpu_extra(monkeypatch, caplog):
    """Requesting ``cuda`` without the provider yields CPU and logs ``mempalace[gpu]``."""

    def cpu_only():
        return ["CPUExecutionProvider"]

    monkeypatch.setattr("onnxruntime.get_available_providers", cpu_only)

    resolved = embedding._resolve_providers("cuda")
    assert resolved == (["CPUExecutionProvider"], "cpu")

    logged = caplog.text
    assert "mempalace[gpu]" in logged
def test_coreml_missing_warns_with_coreml_extra(monkeypatch, caplog):
    """Requesting ``coreml`` without the provider yields CPU and logs ``mempalace[coreml]``."""

    def cpu_only():
        return ["CPUExecutionProvider"]

    monkeypatch.setattr("onnxruntime.get_available_providers", cpu_only)

    resolved = embedding._resolve_providers("coreml")
    assert resolved == (["CPUExecutionProvider"], "cpu")

    logged = caplog.text
    assert "mempalace[coreml]" in logged
def test_dml_missing_warns_with_dml_extra(monkeypatch, caplog):
    """Requesting ``dml`` without the provider yields CPU and logs ``mempalace[dml]``."""

    def cpu_only():
        return ["CPUExecutionProvider"]

    monkeypatch.setattr("onnxruntime.get_available_providers", cpu_only)

    resolved = embedding._resolve_providers("dml")
    assert resolved == (["CPUExecutionProvider"], "cpu")

    logged = caplog.text
    assert "mempalace[dml]" in logged
def test_unknown_device_warns_once(monkeypatch, caplog):
    """Resolving an unknown device twice yields CPU both times but logs only once."""

    def cpu_only():
        return ["CPUExecutionProvider"]

    monkeypatch.setattr("onnxruntime.get_available_providers", cpu_only)

    # Two identical resolutions: the second must not add another warning.
    for _ in range(2):
        resolved = embedding._resolve_providers("bogus")
        assert resolved == (["CPUExecutionProvider"], "cpu")

    warning_count = caplog.text.count("Unknown embedding_device")
    assert warning_count == 1
def test_onnxruntime_import_error_falls_back_to_cpu(monkeypatch):
    """If importing ``onnxruntime`` raises ImportError, ``cuda`` resolves to CPU."""
    original_import = builtins.__import__

    def blocked_import(name, *args, **kwargs):
        # Every module except onnxruntime goes through the real importer.
        if name != "onnxruntime":
            return original_import(name, *args, **kwargs)
        raise ImportError("missing")

    monkeypatch.setattr(builtins, "__import__", blocked_import)

    resolved = embedding._resolve_providers("cuda")
    assert resolved == (["CPUExecutionProvider"], "cpu")
def test_get_embedding_function_caches_by_resolved_provider_tuple(monkeypatch):
    """Two requested devices that resolve to the same providers share one instance."""

    class DummyEF:
        """Minimal embedding-function stand-in that remembers its providers."""

        def __init__(self, preferred_providers):
            self.preferred_providers = preferred_providers

    def build_dummy_class():
        return DummyEF

    def always_cpu(device):
        return (["CPUExecutionProvider"], "cpu")

    monkeypatch.setattr(embedding, "_build_ef_class", build_dummy_class)
    monkeypatch.setattr(embedding, "_resolve_providers", always_cpu)

    from_cpu = embedding.get_embedding_function("cpu")
    from_auto = embedding.get_embedding_function("auto")

    assert from_cpu is from_auto
    assert from_cpu.preferred_providers == ["CPUExecutionProvider"]
def test_describe_device_uses_resolved_effective_device(monkeypatch):
    """``describe_device`` returns the effective device given by ``_resolve_providers``."""

    def always_cuda(device):
        return (["CUDAExecutionProvider", "CPUExecutionProvider"], "cuda")

    monkeypatch.setattr(embedding, "_resolve_providers", always_cuda)

    described = embedding.describe_device("auto")
    assert described == "cuda"
+40
View File
@@ -383,6 +383,46 @@ def test_status_handles_none_metadata_without_crash(tmp_path, capsys):
assert "WING: proj" in out
def test_process_file_uses_bounded_upsert_batches(tmp_path, monkeypatch):
    """Five chunks with a batch size of 2 are upserted as batches of 2, 2 and 1."""
    from mempalace import miner

    class FakeCol:
        """Collection stand-in that records the size of each upsert call."""

        def __init__(self):
            self.batch_sizes = []

        def get(self, *args, **kwargs):
            return {"ids": []}

        def delete(self, *args, **kwargs):
            return None

        def upsert(self, documents, ids, metadatas):
            self.batch_sizes.append(len(documents))

    chunks = [{"content": f"chunk {i} " * 20, "chunk_index": i} for i in range(5)]

    def fixed_chunks(content, source_file):
        return chunks

    def fixed_hall(content):
        return "code"

    def no_entities(content):
        return ""

    source = tmp_path / "src.py"
    source.write_text("print('hello')\n" * 20, encoding="utf-8")

    replacements = (
        ("DRAWER_UPSERT_BATCH_SIZE", 2),
        ("chunk_text", fixed_chunks),
        ("detect_hall", fixed_hall),
        ("_extract_entities_for_metadata", no_entities),
    )
    for attr_name, replacement in replacements:
        monkeypatch.setattr(miner, attr_name, replacement)

    col = FakeCol()
    rooms = [{"name": "general", "description": "General"}]
    outcome = miner.process_file(source, tmp_path, col, "wing", rooms, "agent", False)
    drawers, room = outcome

    assert drawers == 5
    assert room == "general"
    assert col.batch_sizes == [2, 2, 1]
# ── normalize_version schema gate ───────────────────────────────────────
#
# When the normalization pipeline changes shape (e.g., strip_noise lands),