Files
mempalace/tests/test_sources.py
T
Igor Lins e Silva 552e9927b7 refactor(sources): RFC 002 §9 scaffolding — BaseSourceAdapter, registry, PalaceContext
Lands the read-side contract so third-party adapter authors (@Perseusxrltd,
@JakobSachs, @adv3nt3, @zendesk-thittesdorf, @mfhens, @roip, @MrDys) have a
stable target matching what RFC 001 §10 landed on the write side in #995.

Scope (this PR):

- mempalace/sources/base.py: BaseSourceAdapter ABC with kwargs-only
  ingest() / describe_schema() and default is_current() / source_summary()
  / close() (§1.1–1.2). Typed records: SourceRef, SourceItemMetadata,
  DrawerRecord, RouteHint, SourceSummary, AdapterSchema, FieldSpec (§1.3,
  §5.2). Error classes: SourceNotFoundError, AuthRequiredError,
  AdapterClosedError, TransformationViolationError, SchemaConformanceError
  (§2.7). Class-level identity contract: name / adapter_version /
  capabilities / supported_modes / declared_transformations /
  default_privacy_class (§2.1, §1.4, §1.5, §6).

- mempalace/sources/transforms.py: reference implementations of the 13
  reserved transformations (§1.4) — utf8_replace_invalid, newline_normalize,
  whitespace_trim, whitespace_collapse_internal, line_trim, line_join_spaces,
  blank_line_drop — as pure functions, plus identity shims for the six
  adapter-specific ones (strip_tool_chrome, tool_result_truncate,
  tool_result_omitted, spellcheck_user, synthesized_marker,
  speaker_role_assignment) that the conversations adapter will override
  when migrated. get_transformation(name) resolves by reserved name.

- mempalace/sources/registry.py: entry-point discovery via
  importlib.metadata.entry_points(group="mempalace.sources") + explicit
  register()/unregister() surface (§3.1–3.2). resolve_adapter_for_source()
  implements the §3.3 priority order; crucially, no auto-detection on the
  read side (§3.3 is explicit about that — user intent never inferred from
  on-disk artifacts).

- mempalace/sources/context.py: PalaceContext facade (§9) bundling the
  drawer/closet collections, knowledge graph, palace path, adapter identity,
  and progress hooks core passes into adapter.ingest(). upsert_drawer()
  applies the spec-mandated adapter_name/adapter_version stamps from §5.1.
  skip_current_item() signals laziness; emit() dispatches to hooks and
  swallows hook exceptions.

- mempalace/knowledge_graph.py: add_triple() gains optional source_drawer_id
  and adapter_name kwargs (§5.5). Backwards-compatible column migration
  auto-adds the new columns on open of a pre-RFC 002 palace (PRAGMA
  table_info then ALTER TABLE ADD COLUMN), matching the pattern used for
  any new palace-side provenance fields.

- pyproject.toml: mempalace.sources entry-point group declared. Empty on
  the first-party side for now — miners migrate in a follow-up; the group
  being present means third-party packages can begin registering today.

Out of scope (explicit follow-ups):

- miner.py → mempalace/sources/filesystem.py. Behavior-preserving rename
  that also moves READABLE_EXTENSIONS, detect_room(), detect_hall() into
  the adapter (§9). Larger refactor; lands separately.
- convo_miner.py + normalize.py → mempalace/sources/conversations.py. The
  format-detection if-chain in normalize.py becomes per-format plugins;
  declared_transformations enumerates what the current pipeline already
  does to source bytes (§1.4 existing-code mapping).
- Closet post-step wired into the conversations adapter (§1.7).
- CLI --source flag + --mode deprecation alias (§3.3).
- MCP mempalace_mine tool source parameter.
- AbstractSourceAdapterContractSuite (§7.1–7.3): byte-preservation round-
  trip and declared-transformation round-trip tests.
- Privacy-class floor enforcement (§6.2); depends on #389 for
  secrets_possible scanning.

Tests: 1018 passed (up from ~990 on develop), +27 targeted tests covering
the ABC instantiation rules, typed records, all reserved transformations,
the registry register/get/unregister surface, PalaceContext upsert + skip +
emit semantics, and both the new KG provenance kwargs and backwards-
compatible legacy-schema migration.

Refs: #989 (RFC 002 tracking), #990 (RFC 002 spec), #995 (RFC 001 §10
cleanup — sibling PR on the write side).
2026-04-18 16:05:32 -03:00

428 lines
13 KiB
Python

"""Tests for the RFC 002 source-adapter scaffolding."""
import pytest
from mempalace.sources import (
AdapterSchema,
BaseSourceAdapter,
DrawerRecord,
FieldSpec,
PalaceContext,
RouteHint,
SourceItemMetadata,
SourceRef,
SourceSummary,
available_adapters,
get_adapter,
get_adapter_class,
register,
reset_adapters,
resolve_adapter_for_source,
unregister,
)
from mempalace.sources.transforms import (
RESERVED_TRANSFORMATIONS,
blank_line_drop,
get_transformation,
line_join_spaces,
line_trim,
newline_normalize,
utf8_replace_invalid,
whitespace_collapse_internal,
whitespace_trim,
)
# ---------------------------------------------------------------------------
# Minimal conforming adapter used as a fixture across tests
# ---------------------------------------------------------------------------
class _TrivialAdapter(BaseSourceAdapter):
name = "_trivial"
adapter_version = "0.1.0"
capabilities = frozenset({"byte_preserving"})
supported_modes = frozenset({"whole_record"})
declared_transformations = frozenset()
default_privacy_class = "public"
def ingest(self, *, source, palace):
yield SourceItemMetadata(source_file=source.uri or "x", version="v1")
yield DrawerRecord(content="hello", source_file=source.uri or "x", chunk_index=0)
def describe_schema(self):
return AdapterSchema(
version="1.0",
fields={"example": FieldSpec(type="string", required=False, description="x")},
)
@pytest.fixture(autouse=True)
def _isolate_registry():
yield
reset_adapters()
for name in list(available_adapters()):
unregister(name)
# ---------------------------------------------------------------------------
# base.py — ABC + typed records
# ---------------------------------------------------------------------------
def test_base_adapter_is_abstract_without_required_methods():
with pytest.raises(TypeError):
class Incomplete(BaseSourceAdapter):
name = "incomplete"
Incomplete()
def test_conforming_adapter_instantiates_and_yields_typed_records():
adapter = _TrivialAdapter()
results = list(adapter.ingest(source=SourceRef(uri="foo"), palace=None))
assert len(results) == 2
assert isinstance(results[0], SourceItemMetadata)
assert isinstance(results[1], DrawerRecord)
assert results[1].content == "hello"
def test_is_current_default_is_false_always_reextracts():
adapter = _TrivialAdapter()
item = SourceItemMetadata(source_file="f", version="v1")
assert adapter.is_current(item=item, existing_metadata=None) is False
assert adapter.is_current(item=item, existing_metadata={"version": "v1"}) is False
def test_source_summary_default_uses_adapter_name():
adapter = _TrivialAdapter()
summary = adapter.source_summary(source=SourceRef(uri="x"))
assert isinstance(summary, SourceSummary)
assert summary.description == "_trivial"
def test_source_ref_options_default_is_empty_dict():
# Frozen dataclass must not share default_factory=list instance across instances.
a = SourceRef(uri="a")
b = SourceRef(uri="b")
a.options["touched"] = True
assert "touched" not in b.options
# ---------------------------------------------------------------------------
# transforms.py
# ---------------------------------------------------------------------------
def test_reserved_transformations_registry_has_all_13():
expected = {
"utf8_replace_invalid",
"newline_normalize",
"whitespace_trim",
"whitespace_collapse_internal",
"line_trim",
"line_join_spaces",
"blank_line_drop",
"strip_tool_chrome",
"tool_result_truncate",
"tool_result_omitted",
"spellcheck_user",
"synthesized_marker",
"speaker_role_assignment",
}
assert set(RESERVED_TRANSFORMATIONS) == expected
def test_utf8_replace_invalid_handles_bad_bytes():
# A lone 0xff byte is never valid UTF-8; U+FFFD should replace it.
assert utf8_replace_invalid(b"ok \xff end") == "ok \ufffd end"
def test_newline_normalize_converts_crlf_and_cr():
assert newline_normalize("a\r\nb\rc\nd") == "a\nb\nc\nd"
def test_whitespace_trim_strips_boundaries():
assert whitespace_trim(" hello\n\n") == "hello"
def test_whitespace_collapse_internal_caps_at_two_blanks():
# Five blanks collapses to exactly three newlines (two blank lines).
text = "a\n\n\n\n\nb"
assert whitespace_collapse_internal(text) == "a\n\n\nb"
def test_line_trim_strips_each_line():
assert line_trim(" a \n\t b \n c") == "a\nb\nc"
def test_line_join_spaces_preserves_paragraph_breaks():
text = "foo\nbar\nbaz\n\nqux\nquux"
assert line_join_spaces(text) == "foo bar baz\n\nqux quux"
def test_blank_line_drop_removes_blanks_only():
assert blank_line_drop("a\n\nb\n\n\nc") == "a\nb\nc"
def test_get_transformation_resolves_reserved_and_rejects_unknown():
assert get_transformation("newline_normalize") is newline_normalize
with pytest.raises(KeyError):
get_transformation("not_a_real_transformation")
# ---------------------------------------------------------------------------
# registry.py
# ---------------------------------------------------------------------------
def test_register_and_get_adapter_roundtrip():
register("_trivial", _TrivialAdapter)
assert "_trivial" in available_adapters()
inst = get_adapter("_trivial")
assert isinstance(inst, _TrivialAdapter)
# Cached: repeated calls return the same instance.
assert get_adapter("_trivial") is inst
def test_get_adapter_class_returns_class_not_instance():
register("_trivial", _TrivialAdapter)
assert get_adapter_class("_trivial") is _TrivialAdapter
def test_get_adapter_unknown_raises_key_error():
with pytest.raises(KeyError):
get_adapter("does-not-exist")
def test_unregister_drops_registration_and_cached_instance():
register("_trivial", _TrivialAdapter)
get_adapter("_trivial")
unregister("_trivial")
assert "_trivial" not in available_adapters()
with pytest.raises(KeyError):
get_adapter("_trivial")
def test_resolve_adapter_priority_order():
# Explicit wins over everything.
assert resolve_adapter_for_source(explicit="cursor", config_value="git") == "cursor"
# Config wins over default.
assert resolve_adapter_for_source(config_value="git") == "git"
# Default is filesystem (preserves existing ``mempalace mine <path>`` behavior).
assert resolve_adapter_for_source() == "filesystem"
# ---------------------------------------------------------------------------
# PalaceContext
# ---------------------------------------------------------------------------
class _FakeCollection:
def __init__(self):
self.upserts = []
def add(self, **kwargs):
pass
def upsert(self, **kwargs):
self.upserts.append(kwargs)
def query(self, **kwargs):
return {}
def get(self, **kwargs):
return {}
def delete(self, **kwargs):
pass
def count(self):
return 0
class _FakeKG:
def __init__(self):
self.triples = []
def add_triple(self, subject, predicate, obj, **kwargs):
self.triples.append((subject, predicate, obj, kwargs))
def test_palace_context_upsert_drawer_stamps_adapter_metadata():
drawers = _FakeCollection()
kg = _FakeKG()
ctx = PalaceContext(
drawer_collection=drawers,
knowledge_graph=kg,
palace_path="/tmp/palace",
adapter_name="test-adapter",
adapter_version="0.1.0",
)
record = DrawerRecord(
content="hello",
source_file="/abs/path/file.txt",
chunk_index=2,
metadata={"wing": "proj"},
)
ctx.upsert_drawer(record)
assert len(drawers.upserts) == 1
kwargs = drawers.upserts[0]
assert kwargs["documents"] == ["hello"]
assert len(kwargs["ids"]) == 1
meta = kwargs["metadatas"][0]
assert meta["wing"] == "proj"
assert meta["adapter_name"] == "test-adapter"
assert meta["adapter_version"] == "0.1.0"
assert meta["source_file"] == "/abs/path/file.txt"
assert meta["chunk_index"] == 2
def test_palace_context_skip_current_item_sets_flag():
ctx = PalaceContext(
drawer_collection=_FakeCollection(),
knowledge_graph=_FakeKG(),
palace_path="/tmp/p",
)
assert ctx._skip_requested is False
ctx.skip_current_item()
assert ctx._skip_requested is True
def test_palace_context_emit_dispatches_to_hooks_and_swallows_errors():
calls = []
err_calls = []
def good_hook(event, **details):
calls.append((event, details))
def bad_hook(event, **details):
err_calls.append(event)
raise RuntimeError("hook exploded")
ctx = PalaceContext(
drawer_collection=_FakeCollection(),
knowledge_graph=_FakeKG(),
palace_path="/tmp/p",
progress_hooks=[good_hook, bad_hook],
)
ctx.emit("mined_file", path="a.txt", bytes=42)
assert calls == [("mined_file", {"path": "a.txt", "bytes": 42})]
assert err_calls == ["mined_file"] # was invoked; error was swallowed
def test_palace_context_uses_route_hint_when_present():
# Route hints are frozen dataclasses the adapter passes through.
hint = RouteHint(wing="proj", room="backend", hall="general")
assert hint.wing == "proj"
assert hint.room == "backend"
# ---------------------------------------------------------------------------
# KnowledgeGraph new provenance params (RFC 002 §5.5)
# ---------------------------------------------------------------------------
def test_knowledge_graph_add_triple_accepts_source_drawer_id_and_adapter_name(tmp_path):
from mempalace.knowledge_graph import KnowledgeGraph
kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
try:
triple_id = kg.add_triple(
"Ben",
"committed",
"PR-567",
valid_from="2026-03-12",
source_file="github.com/org/repo#pr=567",
source_drawer_id="abc123_0",
adapter_name="git",
)
assert triple_id is not None
import sqlite3
conn = sqlite3.connect(str(tmp_path / "kg.sqlite3"))
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT source_drawer_id, adapter_name FROM triples WHERE id=?", (triple_id,)
).fetchone()
assert row["source_drawer_id"] == "abc123_0"
assert row["adapter_name"] == "git"
conn.close()
finally:
kg.close()
def test_knowledge_graph_migration_adds_missing_columns_to_old_schema(tmp_path):
"""An old-schema triples table (pre-RFC 002) should auto-migrate on open."""
import sqlite3
db_path = tmp_path / "legacy.sqlite3"
conn = sqlite3.connect(str(db_path))
conn.executescript("""
CREATE TABLE entities (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type TEXT DEFAULT 'unknown',
properties TEXT DEFAULT '{}',
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE triples (
id TEXT PRIMARY KEY,
subject TEXT NOT NULL,
predicate TEXT NOT NULL,
object TEXT NOT NULL,
valid_from TEXT,
valid_to TEXT,
confidence REAL DEFAULT 1.0,
source_closet TEXT,
source_file TEXT,
extracted_at TEXT DEFAULT CURRENT_TIMESTAMP
);
""")
conn.commit()
conn.close()
from mempalace.knowledge_graph import KnowledgeGraph
kg = KnowledgeGraph(db_path=str(db_path))
try:
# New columns must be present after _init_db runs the migration.
conn = sqlite3.connect(str(db_path))
cols = {row[1] for row in conn.execute("PRAGMA table_info(triples)")}
conn.close()
assert "source_drawer_id" in cols
assert "adapter_name" in cols
# New-column insert works.
kg.add_triple("a", "rel", "b", source_drawer_id="d0", adapter_name="x")
finally:
kg.close()
def test_knowledge_graph_add_triple_backwards_compatible_without_new_kwargs(tmp_path):
"""Existing callers that omit the RFC 002 kwargs keep working unchanged."""
from mempalace.knowledge_graph import KnowledgeGraph
kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
try:
triple_id = kg.add_triple("Max", "likes", "trains")
assert triple_id is not None
finally:
kg.close()
# ---------------------------------------------------------------------------
# pyproject entry-point group is discoverable even when empty
# ---------------------------------------------------------------------------
def test_entry_point_group_exists_and_returns_zero_or_more_adapters():
# No in-tree first-party adapters yet (miners migrate in a follow-up PR),
# but the ``mempalace.sources`` entry-point group is declared so third-
# party packages can register. ``available_adapters`` MUST NOT raise.
adapters = available_adapters()
assert isinstance(adapters, list)