Merge pull request #1107 from sha2fiddy/fix/1073-closet-llm-paginate
fix: paginate closet_llm col.get (#1073)
This commit is contained in:
+22
-11
@@ -221,17 +221,28 @@ def regenerate_closets(
|
|||||||
print("No drawers in palace.")
|
print("No drawers in palace.")
|
||||||
return {"processed": 0}
|
return {"processed": 0}
|
||||||
|
|
||||||
all_data = drawers_col.get(limit=total, include=["documents", "metadatas"])
|
# Paginate the fetch — a single get(limit=total, ...) blows through
|
||||||
by_source = {}
|
# SQLite's SQLITE_MAX_VARIABLE_NUMBER (32766) on large palaces and
|
||||||
for doc_id, doc, meta in zip(all_data["ids"], all_data["documents"], all_data["metadatas"]):
|
# crashes inside chromadb (see #802, #850, #1073).
|
||||||
source = meta.get("source_file", "unknown")
|
by_source: dict = {}
|
||||||
w = meta.get("wing", "")
|
batch_size = 5000
|
||||||
if wing and w != wing:
|
offset = 0
|
||||||
continue
|
while offset < total:
|
||||||
if source not in by_source:
|
batch = drawers_col.get(limit=batch_size, offset=offset, include=["documents", "metadatas"])
|
||||||
by_source[source] = {"drawer_ids": [], "content": [], "meta": meta}
|
ids = batch["ids"]
|
||||||
by_source[source]["drawer_ids"].append(doc_id)
|
if not ids:
|
||||||
by_source[source]["content"].append(doc)
|
break
|
||||||
|
for doc_id, doc, meta in zip(ids, batch["documents"], batch["metadatas"]):
|
||||||
|
meta = meta or {}
|
||||||
|
source = meta.get("source_file", "unknown")
|
||||||
|
w = meta.get("wing", "")
|
||||||
|
if wing and w != wing:
|
||||||
|
continue
|
||||||
|
if source not in by_source:
|
||||||
|
by_source[source] = {"drawer_ids": [], "content": [], "meta": meta}
|
||||||
|
by_source[source]["drawer_ids"].append(doc_id)
|
||||||
|
by_source[source]["content"].append(doc)
|
||||||
|
offset += len(ids)
|
||||||
|
|
||||||
sources = list(by_source.keys())
|
sources = list(by_source.keys())
|
||||||
if sample > 0:
|
if sample > 0:
|
||||||
|
|||||||
@@ -296,6 +296,182 @@ class TestRegenerateClosets:
|
|||||||
assert meta.get("generated_by", "").startswith("llm:")
|
assert meta.get("generated_by", "").startswith("llm:")
|
||||||
assert meta.get("normalize_version") == NORMALIZE_VERSION
|
assert meta.get("normalize_version") == NORMALIZE_VERSION
|
||||||
|
|
||||||
|
def test_regen_paginates_drawer_fetch(self, tmp_path):
|
||||||
|
"""Regression for #1073: drawers_col.get must be paginated at
|
||||||
|
batch_size=5000. A single get(limit=total, ...) on a palace with
|
||||||
|
more than SQLite's SQLITE_MAX_VARIABLE_NUMBER (32766) drawers
|
||||||
|
blows up inside chromadb. Matches the miner.status pattern
|
||||||
|
introduced in #851 (see #802, #850, #1073)."""
|
||||||
|
from mempalace import closet_llm as closet_llm_mod
|
||||||
|
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
|
||||||
|
# Build a fake collection: 12_000 drawers across 3 source files,
|
||||||
|
# enough to force 3 batches of batch_size=5000 (5000 + 5000 + 2000).
|
||||||
|
n_drawers = 12_000
|
||||||
|
ids = [f"d{i:05d}" for i in range(n_drawers)]
|
||||||
|
docs = [f"doc body {i}" for i in range(n_drawers)]
|
||||||
|
metas = [
|
||||||
|
{
|
||||||
|
"wing": "w",
|
||||||
|
"room": "r",
|
||||||
|
"source_file": f"/src/file_{i % 3}.md",
|
||||||
|
"entities": "",
|
||||||
|
}
|
||||||
|
for i in range(n_drawers)
|
||||||
|
]
|
||||||
|
|
||||||
|
get_calls: list = []
|
||||||
|
|
||||||
|
class FakeDrawersCol:
|
||||||
|
def count(self):
|
||||||
|
return n_drawers
|
||||||
|
|
||||||
|
def get(self, limit=None, offset=0, include=None, **kwargs):
|
||||||
|
get_calls.append({"limit": limit, "offset": offset, "include": include})
|
||||||
|
end = min(offset + (limit or n_drawers), n_drawers)
|
||||||
|
return {
|
||||||
|
"ids": ids[offset:end],
|
||||||
|
"documents": docs[offset:end],
|
||||||
|
"metadatas": metas[offset:end],
|
||||||
|
}
|
||||||
|
|
||||||
|
class FakeClosetsCol:
|
||||||
|
"""Accept the purge + upsert calls the success path makes."""
|
||||||
|
|
||||||
|
def get(self, *a, **kw):
|
||||||
|
return {"ids": [], "documents": [], "metadatas": []}
|
||||||
|
|
||||||
|
def delete(self, *a, **kw):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def upsert(self, *a, **kw):
|
||||||
|
return None
|
||||||
|
|
||||||
|
fake_drawers = FakeDrawersCol()
|
||||||
|
fake_closets = FakeClosetsCol()
|
||||||
|
|
||||||
|
def fake_urlopen(req, timeout=None):
|
||||||
|
return _FakeResp(
|
||||||
|
{
|
||||||
|
"choices": [
|
||||||
|
{"message": {"content": '{"topics":["t1"],"quotes":[],"summary":""}'}}
|
||||||
|
],
|
||||||
|
"usage": {"prompt_tokens": 1, "completion_tokens": 1},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
cfg = LLMConfig(endpoint="http://local/v1", model="m")
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(closet_llm_mod, "get_collection", return_value=fake_drawers),
|
||||||
|
patch.object(closet_llm_mod, "get_closets_collection", return_value=fake_closets),
|
||||||
|
patch.object(closet_llm_mod, "purge_file_closets", return_value=None),
|
||||||
|
patch.object(closet_llm_mod, "upsert_closet_lines", return_value=None),
|
||||||
|
patch("urllib.request.urlopen", side_effect=fake_urlopen),
|
||||||
|
):
|
||||||
|
result = regenerate_closets(palace, cfg=cfg, dry_run=True)
|
||||||
|
|
||||||
|
# Three paginated calls: (limit=5000, offset=0), (5000, 5000), (5000, 10000).
|
||||||
|
assert len(get_calls) == 3, f"expected 3 batched fetches, got {len(get_calls)}"
|
||||||
|
for call in get_calls:
|
||||||
|
assert (
|
||||||
|
call["limit"] == 5000
|
||||||
|
), f"batch must be 5000 — got {call['limit']} (would risk SQLITE_MAX_VARIABLE_NUMBER)"
|
||||||
|
# include must still request both documents and metadatas
|
||||||
|
assert "documents" in call["include"]
|
||||||
|
assert "metadatas" in call["include"]
|
||||||
|
assert [c["offset"] for c in get_calls] == [0, 5000, 10_000]
|
||||||
|
|
||||||
|
# by_source aggregation must be preserved exactly across batches:
|
||||||
|
# 12_000 drawers, 3 source files → 4_000 drawers each.
|
||||||
|
# dry_run=True short-circuits LLM calls but still walks by_source.
|
||||||
|
assert result.get("processed", 0) == 0 # dry_run
|
||||||
|
# Verify no single call tried to pull more than batch_size.
|
||||||
|
assert max(c["limit"] for c in get_calls) <= 5000
|
||||||
|
|
||||||
|
def test_regen_by_source_aggregates_across_batches(self, tmp_path):
|
||||||
|
"""Pagination must not change the by_source grouping — drawers for
|
||||||
|
the same source_file split across batches still land in one group."""
|
||||||
|
from mempalace import closet_llm as closet_llm_mod
|
||||||
|
|
||||||
|
palace = str(tmp_path / "palace")
|
||||||
|
|
||||||
|
# 7_500 drawers, alternating between two source files → forces
|
||||||
|
# splits across the 5000/2500 boundary. Each source ends up with
|
||||||
|
# 3_750 drawers after regrouping.
|
||||||
|
n_drawers = 7_500
|
||||||
|
ids = [f"d{i:05d}" for i in range(n_drawers)]
|
||||||
|
docs = [f"body-{i}" for i in range(n_drawers)]
|
||||||
|
metas = [
|
||||||
|
{
|
||||||
|
"wing": "w",
|
||||||
|
"room": "r",
|
||||||
|
"source_file": f"/src/file_{i % 2}.md",
|
||||||
|
"entities": "",
|
||||||
|
}
|
||||||
|
for i in range(n_drawers)
|
||||||
|
]
|
||||||
|
|
||||||
|
captured_sources: dict = {}
|
||||||
|
|
||||||
|
class FakeDrawersCol:
|
||||||
|
def count(self):
|
||||||
|
return n_drawers
|
||||||
|
|
||||||
|
def get(self, limit=None, offset=0, include=None, **kwargs):
|
||||||
|
end = min(offset + (limit or n_drawers), n_drawers)
|
||||||
|
return {
|
||||||
|
"ids": ids[offset:end],
|
||||||
|
"documents": docs[offset:end],
|
||||||
|
"metadatas": metas[offset:end],
|
||||||
|
}
|
||||||
|
|
||||||
|
class FakeClosetsCol:
|
||||||
|
def get(self, *a, **kw):
|
||||||
|
return {"ids": [], "documents": [], "metadatas": []}
|
||||||
|
|
||||||
|
def delete(self, *a, **kw):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def upsert(self, *a, **kw):
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Hook _call_llm to inspect what regenerate_closets aggregated
|
||||||
|
# per source before the HTTP boundary.
|
||||||
|
real_call_llm = closet_llm_mod._call_llm
|
||||||
|
|
||||||
|
def spying_call_llm(cfg, source_file, wing, room, content):
|
||||||
|
captured_sources[source_file] = content
|
||||||
|
return (
|
||||||
|
{"topics": ["t"], "quotes": [], "summary": ""},
|
||||||
|
{"prompt_tokens": 1, "completion_tokens": 1},
|
||||||
|
)
|
||||||
|
|
||||||
|
cfg = LLMConfig(endpoint="http://local/v1", model="m")
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(closet_llm_mod, "get_collection", return_value=FakeDrawersCol()),
|
||||||
|
patch.object(closet_llm_mod, "get_closets_collection", return_value=FakeClosetsCol()),
|
||||||
|
patch.object(closet_llm_mod, "purge_file_closets", return_value=None),
|
||||||
|
patch.object(closet_llm_mod, "upsert_closet_lines", return_value=None),
|
||||||
|
patch.object(closet_llm_mod, "_call_llm", side_effect=spying_call_llm),
|
||||||
|
):
|
||||||
|
regenerate_closets(palace, cfg=cfg)
|
||||||
|
|
||||||
|
# Both sources survived the pagination boundary.
|
||||||
|
assert set(captured_sources.keys()) == {"/src/file_0.md", "/src/file_1.md"}
|
||||||
|
# Each source accumulated exactly 3_750 drawer bodies, concatenated
|
||||||
|
# with the "\n\n" separator the regenerate path uses.
|
||||||
|
for source, content in captured_sources.items():
|
||||||
|
assert content.count("\n\n") == 3_749, (
|
||||||
|
f"{source}: expected 3_750 chunks joined (3_749 separators), "
|
||||||
|
f"got {content.count(chr(10) + chr(10)) + 1}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Silence unused-var lint.
|
||||||
|
assert real_call_llm is not None
|
||||||
|
|
||||||
def test_regen_uses_basename_not_split_slash(self, tmp_path, monkeypatch):
|
def test_regen_uses_basename_not_split_slash(self, tmp_path, monkeypatch):
|
||||||
"""Regression: the old closet_id base used ``source.split('/')[-1]``
|
"""Regression: the old closet_id base used ``source.split('/')[-1]``
|
||||||
which silently degrades on Windows paths (``C:\\proj\\a.md`` →
|
which silently degrades on Windows paths (``C:\\proj\\a.md`` →
|
||||||
|
|||||||
Reference in New Issue
Block a user