merge: develop + harden cross-wing tunnels for production

Merges the hardened closet/entity/BM25/diary stack from #789 and fixes
five correctness/durability issues in the tunnels module plus the
directional/symmetric design question.

## Design: tunnels are now symmetric

Per review discussion: a tunnel represents "these two things relate",
not "A causes B". The canonical ID now hashes the *sorted* endpoint
pair, so ``create_tunnel(A, B)`` and ``create_tunnel(B, A)`` resolve to
the same record and the second call updates the label rather than
creating a duplicate. ``follow_tunnels`` can be called from either
endpoint and surfaces the other side consistently.

The returned dict still preserves ``source``/``target`` in the order
the caller supplied, so UIs that want to render the connection
directionally can do so.

## Correctness fixes

* **Atomic write** — ``_save_tunnels`` writes to ``tunnels.json.tmp``
  and ``os.replace``s it into place. A crash mid-write can no longer
  leave a truncated file that silently reads back as ``[]`` and wipes
  every tunnel. Includes ``f.flush() + os.fsync`` before replace on
  platforms that support it.
* **Concurrent-write lock** — ``create_tunnel`` and ``delete_tunnel``
  wrap the load→mutate→save cycle in ``mine_lock(_TUNNEL_FILE)``.
  Without this, two agents creating tunnels simultaneously would both
  read the same snapshot and the later writer would drop the earlier
  writer's tunnel.
* **Corrupt-file tolerance** — ``_load_tunnels`` now uses a context
  manager, validates that the loaded JSON is a list, and returns ``[]``
  for any read failure. Subsequent ``create_tunnel`` then overwrites
  the corrupt file via atomic write — no manual recovery needed.
* **Input validation** — new ``_require_name`` helper rejects empty or
  whitespace-only wing/room names with a clear ``ValueError``. Prevents
  phantom tunnels with blank endpoints from ever reaching the JSON
  store.
* **Timezone-aware timestamps** — ``created_at`` / ``updated_at`` now
  use ``datetime.now(timezone.utc).isoformat()``, matching diary ingest
  and other recent modules.

## Tests (12 in TestTunnels)

5 original + 7 regression cases:
* ``test_tunnel_is_symmetric`` — A↔B and B↔A dedupe to one record.
* ``test_follow_tunnels_works_from_either_endpoint`` — symmetric surface.
* ``test_empty_endpoint_fields_rejected`` — validation guard.
* ``test_corrupt_tunnel_file_does_not_lose_new_writes`` — truncated
  JSON treated as empty; next create persists cleanly.
* ``test_atomic_write_leaves_no_stray_tmp_file`` — no leftover ``.tmp``.
* ``test_concurrent_creates_preserve_all_tunnels`` — 5 threads each
  create a distinct tunnel; all 5 persisted (regression for the
  read-modify-write race).
* ``test_created_at_is_timezone_aware`` — ISO8601 has tz suffix.

Merge resolutions: tests/test_closets.py combined develop's hardened
closet/entity/BM25/diary tests with this PR's TestTunnels class.

755/755 tests pass. ruff + format clean under CI-pinned 0.4.x.
This commit is contained in:
Igor Lins e Silva
2026-04-13 17:50:43 -03:00
18 changed files with 1879 additions and 481 deletions
+23 -14
View File
@@ -32,13 +32,11 @@ Topics are never split across closets. If adding a topic would exceed 1,500 char
### When do closets update?
When a file is re-mined (content changed), its drawers are replaced and new closets are built from the fresh content. The old closet content is replaced via upsert.
When a file is re-mined (content changed, or `NORMALIZE_VERSION` was bumped), the miner first deletes every closet for that source file (`purge_file_closets`) and then writes a fresh set. Stale topics from the prior mine are gone — closets are always a snapshot of the current content, never an accumulation across runs.
### What about stale topics?
If a file's content changes and a topic no longer exists, the closet is rebuilt entirely from the new content — stale topics are gone. Closets are tied to source files, not to individual topics.
If you add content to an existing file (e.g., a daily diary growing throughout the day), new topics are appended to the existing closet until the 1,500-char limit, then a new closet is created.
There are no stale topics: each re-mine is a clean rebuild for that source file. If a file gets larger and produces fewer or more closets than last time, the leftover numbered closets from the larger run are still purged because the delete is done by `source_file`, not by ID.
### Do closets survive palace rebuilds?
@@ -49,31 +47,42 @@ Closets are stored in the `mempalace_closets` ChromaDB collection alongside `mem
```
Query → search mempalace_closets (fast, small documents)
top closet hits → extract drawer IDs from pointer lines
top closet hits → parse `→drawer_id_a,drawer_id_b` pointers
fetch drawers from mempalace_drawers (full verbatim content)
fetch exactly those drawers from mempalace_drawers (verbatim content)
BM25 hybrid re-rank (keyword match + vector similarity)
apply max_distance filter
return results to user
return chunk-level results (same shape as direct search)
```
If no closets exist (palace created before this feature), search falls back to direct drawer search. Closets are created on next mine.
Hits carry `matched_via: "closet"` (or `"drawer"` for the fallback path) plus a `closet_preview` field showing the line that surfaced them.
If no closets exist (palace created before this feature) — or all closet hits get filtered out by `max_distance` — search falls back to direct drawer search. Closets are created on next mine.
> **BM25 hybrid re-rank** is on the roadmap (deferred to a follow-up PR alongside generic `LLM_*` env-var support); the current closet search ranks purely by ChromaDB cosine distance against the closet text.
## Limits
| Setting | Value | Reason |
|---------|-------|--------|
| Max closet size | 1,500 chars | Leaves buffer under ChromaDB's working limit |
| Max closet size | 1,500 chars (`CLOSET_CHAR_LIMIT`) | Leaves buffer under ChromaDB's working limit |
| Source content scanned | 5,000 chars (`CLOSET_EXTRACT_WINDOW`) | Caps regex extraction cost on long files; back-of-file content is currently invisible to closet extraction (tracked for follow-up) |
| Max topics per file | 12 | Keeps closets focused |
| Max quotes per file | 3 | Most relevant only |
| Max entities per pointer | 5 | Top names by frequency |
| Max response chars | 10,000 | Prevents hydration blowup on large files |
| Max entities per pointer | 5 | Top names by frequency, after stoplist filtering |
## For developers
Closet functions live in `mempalace/palace.py`:
- `get_closets_collection()` — get the closets ChromaDB collection
- `build_closet_lines()` — extract topics/entities/quotes into pointer lines
- `upsert_closet_lines()` — write lines to closets respecting the char limit
- `CLOSET_CHAR_LIMIT` — the 1,500 char limit constant
- `upsert_closet_lines()` — write lines to closets respecting the char limit (overwrites existing IDs; does not append — call `purge_file_closets` first when re-mining)
- `purge_file_closets()` — delete every closet for a given source file before rebuild
- `CLOSET_CHAR_LIMIT` / `CLOSET_EXTRACT_WINDOW` — size constants
The closet-first search path lives in `mempalace/searcher.py`:
- `_extract_drawer_ids_from_closet()` — parse `→drawer_a,drawer_b` pointers out of a closet document
- `_closet_first_hits()` — query closets, parse pointers, hydrate matching drawers, return chunk-level hits or `None` to fall back
Note: only the project miner (`miner.py::process_file`) builds closets today. Conversation-mined wings (Claude Code JSONL, ChatGPT export, etc.) will keep using direct drawer search via the searcher fallback until the convo-closet PR lands.
+5 -1
View File
@@ -133,6 +133,10 @@ Example output:
[14:40:01] Session abc123: 18 exchanges, 3 since last save
```
## Known Limitations
**Hooks require session restart after install.** Claude Code loads hooks from `settings.json` at session start only. If you run `mempalace init` or manually edit hook config mid-session, the hooks won't fire until you restart Claude Code. This is a Claude Code limitation.
## Cost
**Zero extra tokens.** The hooks are bash scripts that run locally. They don't call any API. The only "cost" is the AI spending a few seconds organizing memories at each checkpoint — and it's doing that with context it already has loaded.
**Zero extra tokens.** The hooks notify the AI that saves happened in the background — the AI doesn't need to write anything in the chat. All filing is handled automatically. Previous versions asked the AI to write diary entries and drawer content in the chat window, which cost ~$1/session in retransmitted tokens.
+3 -3
View File
@@ -68,10 +68,10 @@ if [ -n "$MEMPAL_DIR" ] && [ -d "$MEMPAL_DIR" ]; then
python3 -m mempalace mine "$MEMPAL_DIR" >> "$STATE_DIR/hook.log" 2>&1
fi
# Always block — compaction = save everything
# Notify — compaction is about to happen but filing is handled in background
cat << 'HOOKJSON'
{
"decision": "block",
"reason": "COMPACTION IMMINENT. Save ALL topics, decisions, quotes, code, and important context from this session to your memory system. Be thorough — after compaction, detailed context will be lost. Organize into appropriate categories. Use verbatim quotes where possible. Save everything, then allow compaction to proceed."
"decision": "allow",
"reason": "MemPalace pre-compaction save. Your full conversation has been saved verbatim in the background — no action needed. Compaction can proceed safely."
}
HOOKJSON
+7 -4
View File
@@ -140,12 +140,15 @@ if [ "$SINCE_LAST" -ge "$SAVE_INTERVAL" ] && [ "$EXCHANGE_COUNT" -gt 0 ]; then
python3 -m mempalace mine "$MEMPAL_DIR" >> "$STATE_DIR/hook.log" 2>&1 &
fi
# Block the AI and tell it to save
# The "reason" becomes a system message the AI sees and acts on
# Notify the AI that a checkpoint happened — but do NOT ask it to write
# anything in chat. All filing happens in the background via the pipeline.
# The old version asked the agent to write diary entries, add drawers, and
# add KG triples in the chat window — that cost ~$1/session in retransmitted
# tokens and cluttered the conversation.
cat << 'HOOKJSON'
{
"decision": "block",
"reason": "AUTO-SAVE checkpoint. Save key topics, decisions, quotes, and code from this session to your memory system. Organize into appropriate categories. Use verbatim quotes where possible. Continue conversation after saving."
"decision": "allow",
"reason": "MemPalace auto-save checkpoint. Your conversation is being saved verbatim in the background — no action needed from you. Continue working."
}
HOOKJSON
else
+74 -35
View File
@@ -16,7 +16,13 @@ from datetime import datetime
from collections import defaultdict
from .normalize import normalize
from .palace import SKIP_DIRS, get_collection, file_already_mined, mine_lock
from .palace import (
NORMALIZE_VERSION,
SKIP_DIRS,
file_already_mined,
get_collection,
mine_lock,
)
# File types that might contain conversations
@@ -51,6 +57,7 @@ def _register_file(collection, source_file: str, wing: str, agent: str):
"added_by": agent,
"filed_at": datetime.now().isoformat(),
"ingest_mode": "registry",
"normalize_version": NORMALIZE_VERSION,
}
],
)
@@ -272,6 +279,62 @@ def scan_convos(convo_dir: str) -> list:
# =============================================================================
def _file_chunks_locked(collection, source_file, chunks, wing, room, agent, extract_mode):
"""Lock the source file, purge stale drawers, and upsert fresh chunks.
Combines the per-file serialization that prevents concurrent agents from
duplicating work (via mine_lock) with the normalize-version rebuild
contract (purge-before-insert so pre-v2 drawers don't survive).
Returns (drawers_added, room_counts_delta, skipped).
"""
room_counts_delta: dict = defaultdict(int)
drawers_added = 0
with mine_lock(source_file):
# Re-check after lock — another agent may have just finished this file
# at the current schema. A stale-version hit here returns False, so we
# still fall through to the purge+rebuild path below.
if file_already_mined(collection, source_file):
return 0, room_counts_delta, True
# Purge stale drawers first. When the normalize schema bumps,
# file_already_mined() returned False for pre-v2 drawers — clean
# them out so the source doesn't end up with mixed old/new drawers.
try:
collection.delete(where={"source_file": source_file})
except Exception:
pass
for chunk in chunks:
chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
if extract_mode == "general":
room_counts_delta[chunk_room] += 1
drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
try:
collection.upsert(
documents=[chunk["content"]],
ids=[drawer_id],
metadatas=[
{
"wing": wing,
"room": chunk_room,
"source_file": source_file,
"chunk_index": chunk["chunk_index"],
"added_by": agent,
"filed_at": datetime.now().isoformat(),
"ingest_mode": "convos",
"extract_mode": extract_mode,
"normalize_version": NORMALIZE_VERSION,
}
],
)
drawers_added += 1
except Exception as e:
if "already exists" not in str(e).lower():
raise
return drawers_added, room_counts_delta, False
def mine_convos(
convo_dir: str,
palace_path: str,
@@ -375,40 +438,16 @@ def mine_convos(
if extract_mode != "general":
room_counts[room] += 1
# File each chunk — lock to prevent concurrent agents duplicating
drawers_added = 0
with mine_lock(source_file):
# Re-check after lock — another agent may have just finished this file
if file_already_mined(collection, source_file):
files_skipped += 1
continue
for chunk in chunks:
chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
if extract_mode == "general":
room_counts[chunk_room] += 1
drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
try:
collection.upsert(
documents=[chunk["content"]],
ids=[drawer_id],
metadatas=[
{
"wing": wing,
"room": chunk_room,
"source_file": source_file,
"chunk_index": chunk["chunk_index"],
"added_by": agent,
"filed_at": datetime.now().isoformat(),
"ingest_mode": "convos",
"extract_mode": extract_mode,
}
],
)
drawers_added += 1
except Exception as e:
if "already exists" not in str(e).lower():
raise
# Lock + purge stale + file fresh chunks. Lock serializes concurrent
# agents; purge removes pre-v2 drawers so the schema bump applies.
drawers_added, room_delta, skipped = _file_chunks_locked(
collection, source_file, chunks, wing, room, agent, extract_mode
)
if skipped:
files_skipped += 1
continue
for r, n in room_delta.items():
room_counts[r] += n
total_drawers += drawers_added
print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}")
+110 -74
View File
@@ -2,10 +2,14 @@
diary_ingest.py — Ingest daily summary files into the palace.
Architecture:
- ONE drawer per day — full verbatim content, upserted as the day grows
- Closets pack topics up to 1500 chars, never split mid-topic
- Only new entries are processed (tracks entry count in state file)
- Entities extracted and stamped on metadata for filterable search
- ONE drawer per (wing, day) — full verbatim content, upserted as the day grows.
- Closets pack topics up to CLOSET_CHAR_LIMIT, never split mid-topic.
- A re-ingest fully purges the prior day's closets before rebuilding so a
shorter day never leaves orphans behind.
- Only new entries are processed by default (tracks entry count in a state
file under ``~/.mempalace/state/`` — never inside the user's diary dir).
- Per-file ``mine_lock`` so concurrent ingest from two terminals can't race.
- Entities extracted and stamped on metadata for filterable search.
Usage:
python -m mempalace.diary_ingest --dir ~/daily_summaries --palace ~/.mempalace/palace
@@ -19,19 +23,32 @@ import re
from datetime import datetime, timezone
from pathlib import Path
from .palace import (
get_collection,
get_closets_collection,
build_closet_lines,
upsert_closet_lines,
CLOSET_CHAR_LIMIT,
)
from .miner import _extract_entities_for_metadata
from .palace import (
build_closet_lines,
get_closets_collection,
get_collection,
mine_lock,
purge_file_closets,
upsert_closet_lines,
)
DIARY_ENTRY_RE = re.compile(r"^## .+", re.MULTILINE)
def _state_file_for(palace_path: str, diary_dir: Path) -> Path:
"""Return the per-(palace, diary-dir) state-file path under ~/.mempalace/state.
Keyed by sha256 of (palace_path, diary_dir) so multiple diary folders
pointing at the same palace each get an independent state file. The
state file is *never* written inside the user's diary directory.
"""
state_root = Path(os.path.expanduser("~")) / ".mempalace" / "state"
state_root.mkdir(parents=True, exist_ok=True)
key = hashlib.sha256(f"{palace_path}|{diary_dir}".encode()).hexdigest()[:24]
return state_root / f"diary_ingest_{key}.json"
def _split_entries(text):
"""Split diary text into (header, body) pairs per ## entry."""
parts = DIARY_ENTRY_RE.split(text)
@@ -43,6 +60,18 @@ def _split_entries(text):
return entries
def _diary_drawer_id(wing: str, date_str: str) -> str:
"""Stable, wing-scoped drawer ID. Two diaries (e.g. 'work' vs 'personal')
sharing the same date never collide."""
suffix = hashlib.sha256(f"{wing}|{date_str}".encode()).hexdigest()[:24]
return f"drawer_diary_{suffix}"
def _diary_closet_id_base(wing: str, date_str: str) -> str:
suffix = hashlib.sha256(f"{wing}|{date_str}".encode()).hexdigest()[:24]
return f"closet_diary_{suffix}"
def ingest_diaries(
diary_dir,
palace_path,
@@ -51,24 +80,29 @@ def ingest_diaries(
):
"""Ingest daily summary files into the palace.
Each date file gets ONE drawer (upserted as day grows) and
closets that pack topics atomically up to 1500 chars.
Each date file gets ONE drawer keyed by ``(wing, date)`` and closets that
pack topics atomically up to ``CLOSET_CHAR_LIMIT``. ``force=True`` rebuilds
every entry's closets from scratch (purging stale ones); the default
incremental mode only processes entries appended since the last run.
"""
diary_dir = Path(diary_dir).expanduser().resolve()
if not diary_dir.exists():
print(f"Diary directory not found: {diary_dir}")
return
return {"days_updated": 0, "closets_created": 0}
diary_files = sorted(diary_dir.glob("*.md"))
if not diary_files:
print(f"No .md files in {diary_dir}")
return
return {"days_updated": 0, "closets_created": 0}
# State tracks which entries have been closeted per file
state_file = diary_dir / ".diary_ingest_state.json"
state = {} if force else (
json.loads(state_file.read_text()) if state_file.exists() else {}
)
state_file = _state_file_for(str(palace_path), diary_dir)
if force or not state_file.exists():
state: dict = {}
else:
try:
state = json.loads(state_file.read_text())
except Exception:
state = {}
drawers_col = get_collection(palace_path)
closets_col = get_closets_collection(palace_path)
@@ -87,70 +121,72 @@ def ingest_diaries(
date_str = date_match.group(1)
# Skip if content hasn't changed
prev_size = state.get(diary_path.name, {}).get("size", 0)
state_key = f"{wing}|{diary_path.name}"
prev_size = state.get(state_key, {}).get("size", 0)
curr_size = len(text)
if curr_size == prev_size and not force:
continue
now_iso = datetime.now(timezone.utc).isoformat()
drawer_id = f"drawer_diary_{date_str}"
# Extract entities from full day text
drawer_id = _diary_drawer_id(wing, date_str)
entities = _extract_entities_for_metadata(text)
source_file = str(diary_path)
# UPSERT the day's drawer (full verbatim, replaces as day grows)
drawer_meta = {
"date": date_str,
"wing": wing,
"room": "daily",
"source_file": str(diary_path),
"source_session": "daily_diary",
"filed_at": now_iso,
}
if entities:
drawer_meta["entities"] = entities
drawers_col.upsert(
documents=[text],
ids=[drawer_id],
metadatas=[drawer_meta],
)
# Serialize per source — two terminals running ingest at once must
# not interleave the upsert + closet-rebuild.
with mine_lock(source_file):
drawer_meta = {
"date": date_str,
"wing": wing,
"room": "daily",
"source_file": source_file,
"source_session": "daily_diary",
"filed_at": now_iso,
}
if entities:
drawer_meta["entities"] = entities
drawers_col.upsert(
documents=[text],
ids=[drawer_id],
metadatas=[drawer_meta],
)
# Split into entries and find new ones
entries = _split_entries(text)
prev_entry_count = state.get(diary_path.name, {}).get("entry_count", 0)
new_entries = entries[prev_entry_count:] if not force else entries
entries = _split_entries(text)
prev_entry_count = state.get(state_key, {}).get("entry_count", 0)
new_entries = entries if force else entries[prev_entry_count:]
if new_entries:
# Build closet lines from new entries
all_lines = []
for header, body in new_entries:
entry_text = f"{header}\n{body}"
entry_lines = build_closet_lines(
str(diary_path), [drawer_id], entry_text, wing, "daily"
)
all_lines.extend(entry_lines)
if new_entries:
all_lines = []
for header, body in new_entries:
entry_text = f"{header}\n{body}"
entry_lines = build_closet_lines(
source_file, [drawer_id], entry_text, wing, "daily"
)
all_lines.extend(entry_lines)
if all_lines:
closet_id_base = f"closet_diary_{date_str}"
closet_meta = {
"date": date_str,
"wing": wing,
"room": "daily",
"source_file": str(diary_path),
"filed_at": now_iso,
}
if entities:
closet_meta["entities"] = entities
n = upsert_closet_lines(
closets_col, closet_id_base, all_lines, closet_meta
)
closets_created += n
if all_lines:
closet_id_base = _diary_closet_id_base(wing, date_str)
closet_meta = {
"date": date_str,
"wing": wing,
"room": "daily",
"source_file": source_file,
"filed_at": now_iso,
}
if entities:
closet_meta["entities"] = entities
# On a force rebuild, wipe any leftover numbered closets
# from a longer prior run before re-writing.
if force:
purge_file_closets(closets_col, source_file)
n = upsert_closet_lines(closets_col, closet_id_base, all_lines, closet_meta)
closets_created += n
state[diary_path.name] = {
"size": curr_size,
"entry_count": len(entries),
"ingested_at": now_iso,
}
state[state_key] = {
"size": curr_size,
"entry_count": len(entries),
"ingested_at": now_iso,
}
days_updated += 1
state_file.write_text(json.dumps(state, indent=2))
+4 -1
View File
@@ -893,7 +893,10 @@ def tool_diary_write(agent_name: str, entry: str, topic: str = "general"):
return _no_palace()
now = datetime.now()
entry_id = f"diary_{wing}_{now.strftime('%Y%m%d_%H%M%S')}_{hashlib.sha256(entry[:50].encode()).hexdigest()[:12]}"
entry_id = (
f"diary_{wing}_{now.strftime('%Y%m%d_%H%M%S%f')}_"
f"{hashlib.sha256(entry.encode()).hexdigest()[:12]}"
)
_wal_log(
"diary_write",
+85 -27
View File
@@ -16,8 +16,15 @@ from datetime import datetime
from collections import defaultdict
from .palace import (
SKIP_DIRS, get_collection, get_closets_collection,
file_already_mined, mine_lock, build_closet_lines, upsert_closet_lines,
NORMALIZE_VERSION,
SKIP_DIRS,
build_closet_lines,
file_already_mined,
get_closets_collection,
get_collection,
mine_lock,
purge_file_closets,
upsert_closet_lines,
)
READABLE_EXTENSIONS = {
@@ -371,41 +378,86 @@ def chunk_text(content: str, source_file: str) -> list:
# =============================================================================
_ENTITY_REGISTRY_PATH = os.path.join(os.path.expanduser("~"), ".mempalace", "known_entities.json")
_ENTITY_REGISTRY_CACHE: dict = {"mtime": None, "names": frozenset()}
_ENTITY_EXTRACT_WINDOW = 5000 # chars of content scanned for capitalized words
_ENTITY_METADATA_LIMIT = 25 # max entities packed into the metadata field
def _load_known_entities() -> frozenset:
"""Load (and cache) the user's known-entity registry by mtime.
Reads ``~/.mempalace/known_entities.json``. The registry is shaped as
``{"category": ["Name1", "Name2", ...], ...}``. Cached across calls
in the same process; invalidated when the file's mtime changes.
"""
try:
mtime = os.path.getmtime(_ENTITY_REGISTRY_PATH)
except OSError:
if _ENTITY_REGISTRY_CACHE["mtime"] is not None:
_ENTITY_REGISTRY_CACHE["mtime"] = None
_ENTITY_REGISTRY_CACHE["names"] = frozenset()
return _ENTITY_REGISTRY_CACHE["names"]
if _ENTITY_REGISTRY_CACHE["mtime"] == mtime:
return _ENTITY_REGISTRY_CACHE["names"]
names: set = set()
try:
import json
with open(_ENTITY_REGISTRY_PATH, "r", encoding="utf-8") as f:
data = json.load(f)
for cat in data.values():
if isinstance(cat, list):
names.update(str(n) for n in cat if n)
except Exception:
names = set()
_ENTITY_REGISTRY_CACHE["mtime"] = mtime
_ENTITY_REGISTRY_CACHE["names"] = frozenset(names)
return _ENTITY_REGISTRY_CACHE["names"]
def _extract_entities_for_metadata(content: str) -> str:
"""Extract entity names from content for metadata tagging.
Returns semicolon-separated string of entity names found in the text,
suitable for ChromaDB metadata filtering.
Combines the user's known-entity registry (cached across calls) with
capitalized words appearing ≥2 times in the first ``_ENTITY_EXTRACT_WINDOW``
chars. Filters out the closet stoplist (``When``, ``After``, ``The``, …)
so sentence-starters don't masquerade as proper nouns.
Returns semicolon-separated string suitable for ChromaDB metadata
filtering. The list is truncated to ``_ENTITY_METADATA_LIMIT`` entries
*before* joining so a name is never cut in half.
"""
import re
# Load known entities from registry if available
known_names = set()
registry_path = os.path.join(os.path.expanduser("~"), ".mempalace", "known_entities.json")
if os.path.exists(registry_path):
try:
import json
kd = json.loads(open(registry_path).read())
for cat in kd.values():
if isinstance(cat, list):
known_names.update(cat)
except Exception:
pass
matched = set()
# Match known entities
for name in known_names:
if re.search(r'(?<!\w)' + re.escape(name) + r'(?!\w)', content):
from .palace import _ENTITY_STOPLIST
matched: set = set()
known = _load_known_entities()
for name in known:
if re.search(r"(?<!\w)" + re.escape(name) + r"(?!\w)", content):
matched.add(name)
# Also catch capitalized words appearing 2+ times (likely proper nouns)
words = re.findall(r"\b[A-Z][a-z]{2,}\b", content[:5000])
freq = {}
window = content[:_ENTITY_EXTRACT_WINDOW]
words = re.findall(r"\b[A-Z][a-z]{2,}\b", window)
freq: dict = {}
for w in words:
if w in _ENTITY_STOPLIST:
continue
freq[w] = freq.get(w, 0) + 1
for w, c in freq.items():
if c >= 2 and len(w) > 2:
matched.add(w)
return ";".join(sorted(matched))[:500] if matched else ""
if not matched:
return ""
# Truncate the *list*, not the joined string — never split a name.
capped = sorted(matched)[:_ENTITY_METADATA_LIMIT]
return ";".join(capped)
def add_drawer(
@@ -421,6 +473,7 @@ def add_drawer(
"chunk_index": chunk_index,
"added_by": agent,
"filed_at": datetime.now().isoformat(),
"normalize_version": NORMALIZE_VERSION,
}
# Store file mtime so we can detect modifications later.
try:
@@ -511,15 +564,18 @@ def process_file(
if added:
drawers_added += 1
# Build closet — the searchable index pointing to these drawers
# Each topic line is atomic — never split across closets
# Build closet — the searchable index pointing to these drawers.
# Purge first: a re-mine (mtime change or normalize_version bump) must
# fully replace the prior closets, not append to them.
if closets_col and drawers_added > 0:
drawer_ids = [
f"drawer_{wing}_{room}_{hashlib.sha256((source_file + str(c['chunk_index'])).encode()).hexdigest()[:24]}"
for c in chunks
]
closet_lines = build_closet_lines(source_file, drawer_ids, content, wing, room)
closet_id_base = f"closet_{wing}_{room}_{hashlib.sha256(source_file.encode()).hexdigest()[:24]}"
closet_id_base = (
f"closet_{wing}_{room}_{hashlib.sha256(source_file.encode()).hexdigest()[:24]}"
)
entities = _extract_entities_for_metadata(content)
closet_meta = {
"wing": wing,
@@ -527,9 +583,11 @@ def process_file(
"source_file": source_file,
"drawer_count": drawers_added,
"filed_at": datetime.now().isoformat(),
"normalize_version": NORMALIZE_VERSION,
}
if entities:
closet_meta["entities"] = entities
purge_file_closets(closets_col, source_file)
upsert_closet_lines(closets_col, closet_id_base, closet_lines, closet_meta)
return drawers_added, room
+93 -2
View File
@@ -16,10 +16,93 @@ No API key. No internet. Everything local.
import json
import os
import re
from pathlib import Path
from typing import Optional
# ─── Noise stripping ─────────────────────────────────────────────────────
# Claude Code and other tools inject system tags, hook output, and UI chrome
# into transcripts. These waste drawer space and pollute search results.
#
# Verbatim is sacred — every pattern here is anchored to line boundaries and
# refuses to cross blank lines, so a stray unclosed tag in one message can
# never eat content from neighboring messages. When in doubt, leave text
# alone.
_NOISE_TAGS = (
"system-reminder",
"command-message",
"command-name",
"task-notification",
"user-prompt-submit-hook",
"hook_output",
)
def _tag_pattern(name: str) -> "re.Pattern[str]":
# Opening tag must begin a line (optionally after a `> ` blockquote marker,
# since _messages_to_transcript prefixes lines with `> `). Body is lazy but
# forbidden from crossing a blank line, so a dangling open tag can't span
# multiple messages. Closing tag eats optional trailing whitespace + newline.
return re.compile(
rf"(?m)^(?:> )?<{name}(?:\s[^>]*)?>" rf"(?:(?!\n\s*\n)[\s\S])*?" rf"</{name}>[ \t]*\n?"
)
_NOISE_TAG_PATTERNS = [_tag_pattern(t) for t in _NOISE_TAGS]
# Strings that identify an entire noise line when found at its start.
# Matched case-sensitively and anchored to line-start so user prose mentioning
# e.g. "current time:" in a sentence is untouched.
_NOISE_LINE_PREFIXES = (
"CURRENT TIME:",
"VERIFIED FACTS (do not contradict)",
"AGENT SPECIALIZATION:",
"Checking verified facts...",
"Injecting timestamp...",
"Starting background pipeline...",
"Checking emotional weights...",
"Auto-save reminder...",
"Checking pipeline...",
"MemPalace auto-save checkpoint.",
)
_NOISE_LINE_PATTERNS = [
re.compile(rf"(?m)^(?:> )?{re.escape(p)}.*\n?") for p in _NOISE_LINE_PREFIXES
]
# Claude Code TUI hook-run chrome, e.g. "Ran 2 Stop hook", "Ran 1 PreCompact hook".
# Line-anchored, case-sensitive, explicit hook names — prose like
# "our CI has a stop hook" stays intact.
_HOOK_LINE_RE = re.compile(
r"(?m)^(?:> )?Ran \d+ (?:Stop|PreCompact|PreToolUse|PostToolUse|UserPromptSubmit|Notification|SessionStart|SessionEnd) hook[s]?.*\n?"
)
# "… +N lines" collapsed-output marker, line-anchored.
_COLLAPSED_LINES_RE = re.compile(r"(?m)^(?:> )?…\s*\+\d+ lines.*\n?")
def strip_noise(text: str) -> str:
"""Remove system tags, hook output, and Claude Code UI chrome from text.
All patterns are line-anchored. User prose that happens to mention these
strings inline (e.g., documenting them) is preserved verbatim.
"""
for pat in _NOISE_TAG_PATTERNS:
text = pat.sub("", text)
for pat in _NOISE_LINE_PATTERNS:
text = pat.sub("", text)
text = _HOOK_LINE_RE.sub("", text)
text = _COLLAPSED_LINES_RE.sub("", text)
# Strip the Claude Code collapsed-output chrome "[N tokens] (ctrl+o to expand)".
# Narrow shape — a bare "(ctrl+o to expand)" in user prose stays intact.
text = re.sub(r"\s*\[\d+\s+tokens?\]\s*\(ctrl\+o to expand\)", "", text)
# Collapse runs of blank lines created by the removals
text = re.sub(r"\n{4,}", "\n\n\n", text)
return text.strip()
def normalize(filepath: str) -> str:
"""
Load a file and normalize to transcript format if it's a chat export.
@@ -40,12 +123,14 @@ def normalize(filepath: str) -> str:
if not content.strip():
return content
# Already has > markers — pass through
# Already has > markers — pass through unchanged.
lines = content.split("\n")
if sum(1 for line in lines if line.strip().startswith(">")) >= 3:
return content
# Try JSON normalization
# Try JSON normalization. strip_noise is applied inside the Claude Code
# JSONL parser (the only format that injects system tags/hook chrome);
# other formats pass through verbatim.
ext = Path(filepath).suffix.lower()
if ext in (".json", ".jsonl") or content.strip()[:1] in ("{", "["):
normalized = _try_normalize_json(content)
@@ -112,6 +197,10 @@ def _try_claude_code_jsonl(content: str) -> Optional[str]:
isinstance(b, dict) and b.get("type") == "tool_result" for b in msg_content
)
text = _extract_content(msg_content, tool_use_map=tool_use_map)
# Strip Claude Code system-injected noise per message, never across
# message boundaries — prevents span-eating.
if text:
text = strip_noise(text)
if text:
if is_tool_only and messages and messages[-1][0] == "assistant":
# Append tool results to the previous assistant message
@@ -121,6 +210,8 @@ def _try_claude_code_jsonl(content: str) -> Optional[str]:
messages.append(("user", text))
elif msg_type == "assistant":
text = _extract_content(msg_content, tool_use_map=tool_use_map)
if text:
text = strip_noise(text)
if text:
# If previous message is also assistant (multi-turn tool loop),
# merge into the same assistant turn
+117 -27
View File
@@ -38,6 +38,16 @@ SKIP_DIRS = {
_DEFAULT_BACKEND = ChromaBackend()
# Schema version for drawer normalization. Bump when the normalization
# pipeline changes in a way that existing drawers should be rebuilt to pick up
# (e.g., new noise-stripping rules). `file_already_mined` treats drawers with
# a missing or stale `normalize_version` as "not mined", so the next mine pass
# silently rebuilds them — users don't need to manually erase + re-mine.
#
# v2 (2026-04): introduced strip_noise() for Claude Code JSONL; previous
# drawers stored system tags / hook chrome verbatim.
NORMALIZE_VERSION = 2
def get_collection(
palace_path: str,
@@ -58,6 +68,66 @@ def get_closets_collection(palace_path: str, create: bool = True):
CLOSET_CHAR_LIMIT = 1500 # fill closet until ~1500 chars, then start a new one
CLOSET_EXTRACT_WINDOW = 5000 # how many chars of source content to scan for entities/topics
# Common capitalized words that look like proper nouns but are usually
# sentence-starters or filler. Filtered out of entity extraction.
_ENTITY_STOPLIST = frozenset(
{
"The",
"This",
"That",
"These",
"Those",
"When",
"Where",
"What",
"Why",
"Who",
"Which",
"How",
"After",
"Before",
"Then",
"Now",
"Here",
"There",
"And",
"But",
"Or",
"Yet",
"So",
"If",
"Else",
"Yes",
"No",
"Maybe",
"Okay",
"User",
"Assistant",
"System",
"Tool",
"Monday",
"Tuesday",
"Wednesday",
"Thursday",
"Friday",
"Saturday",
"Sunday",
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
}
)
def build_closet_lines(source_file, drawer_ids, content, wing, room):
@@ -72,11 +142,15 @@ def build_closet_lines(source_file, drawer_ids, content, wing, room):
from pathlib import Path
drawer_ref = ",".join(drawer_ids[:3])
window = content[:CLOSET_EXTRACT_WINDOW]
# Extract proper nouns (capitalized words, 2+ occurrences)
words = re.findall(r"\b[A-Z][a-z]{2,}\b", content[:5000])
# Extract proper nouns (capitalized words, 2+ occurrences). Filter out
# common sentence-starters that aren't real entities.
words = re.findall(r"\b[A-Z][a-z]{2,}\b", window)
word_freq = {}
for w in words:
if w in _ENTITY_STOPLIST:
continue
word_freq[w] = word_freq.get(w, 0) + 1
entities = sorted(
[w for w, c in word_freq.items() if c >= 2],
@@ -89,15 +163,15 @@ def build_closet_lines(source_file, drawer_ids, content, wing, room):
for pattern in [
r"(?:built|fixed|wrote|added|pushed|tested|created|decided|migrated|reviewed|deployed|configured|removed|updated)\s+[\w\s]{3,40}",
]:
topics.extend(re.findall(pattern, content[:5000], re.IGNORECASE))
topics.extend(re.findall(pattern, window, re.IGNORECASE))
# Also grab section headers if present
for header in re.findall(r"^#{1,3}\s+(.{5,60})$", content[:5000], re.MULTILINE):
for header in re.findall(r"^#{1,3}\s+(.{5,60})$", window, re.MULTILINE):
topics.append(header.strip())
# Dedupe preserving order
topics = list(dict.fromkeys(t.strip().lower() for t in topics))[:12]
# Extract quotes
quotes = re.findall(r'"([^"]{15,150})"', content[:5000])
quotes = re.findall(r'"([^"]{15,150})"', window)
# Build pointer lines — each one is atomic, never split
lines = []
@@ -114,17 +188,31 @@ def build_closet_lines(source_file, drawer_ids, content, wing, room):
return lines
def upsert_closet_lines(closets_col, closet_id_base, lines, metadata):
"""Add topic lines to closets. Never splits a topic mid-line.
def purge_file_closets(closets_col, source_file: str) -> None:
"""Delete every closet associated with ``source_file``.
If adding a line WHOLE would exceed CLOSET_CHAR_LIMIT, a new closet
is created. Some closets may have less than 1500 chars — that's fine.
Every topic is complete and readable.
Call this before ``upsert_closet_lines`` on a re-mine so stale topics
from a prior schema/version don't survive in the closet collection.
Mirrors the drawer-purge step in process_file().
"""
try:
closets_col.delete(where={"source_file": source_file})
except Exception:
pass
def upsert_closet_lines(closets_col, closet_id_base, lines, metadata):
"""Write topic lines to closets, packed greedily without splitting a line.
Closets are deterministically numbered (``..._01``, ``..._02``, …) and
each ``upsert`` fully overwrites the prior content at that ID. Callers
are expected to ``purge_file_closets`` first when re-mining a source
file so stale-numbered closets from larger prior runs don't leak.
Returns the number of closets written.
"""
closet_num = 1
current_lines = []
current_lines: list = []
current_chars = 0
closets_written = 0
@@ -134,17 +222,6 @@ def upsert_closet_lines(closets_col, closet_id_base, lines, metadata):
return
closet_id = f"{closet_id_base}_{closet_num:02d}"
text = "\n".join(current_lines)
# Check if closet already has content — append if room
try:
existing = closets_col.get(ids=[closet_id])
if existing.get("ids") and existing["documents"][0]:
old = existing["documents"][0]
if len(old) + len(text) + 1 <= CLOSET_CHAR_LIMIT:
text = old + "\n" + text
except Exception:
pass
closets_col.upsert(documents=[text], ids=[closet_id], metadatas=[metadata])
closets_written += 1
@@ -152,7 +229,6 @@ def upsert_closet_lines(closets_col, closet_id_base, lines, metadata):
line_len = len(line)
# Would this line fit whole in the current closet?
if current_chars > 0 and current_chars + line_len + 1 > CLOSET_CHAR_LIMIT:
# Doesn't fit — flush current closet, start new one
_flush()
closet_num += 1
current_lines = []
@@ -182,18 +258,22 @@ def mine_lock(source_file: str):
try:
if os.name == "nt":
import msvcrt
msvcrt.locking(lf.fileno(), msvcrt.LK_LOCK, 1)
else:
import fcntl
fcntl.flock(lf, fcntl.LOCK_EX)
yield
finally:
try:
if os.name == "nt":
import msvcrt
msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1)
else:
import fcntl
fcntl.flock(lf, fcntl.LOCK_UN)
except Exception:
pass
@@ -203,16 +283,26 @@ def mine_lock(source_file: str):
def file_already_mined(collection, source_file: str, check_mtime: bool = False) -> bool:
"""Check if a file has already been filed in the palace.
When check_mtime=True (used by project miner), returns False if the file
has been modified since it was last mined, so it gets re-mined.
When check_mtime=False (used by convo miner), just checks existence.
Returns False (so the file gets re-mined) when:
- no drawers exist for this source_file
- the stored `normalize_version` is missing or older than the current
schema (triggers silent rebuild after a normalization upgrade)
- `check_mtime=True` and the file's mtime differs from the stored one
When check_mtime=True (used by project miner), also re-mines on content
change. When check_mtime=False (used by convo miner), transcripts are
assumed immutable, so only the version gate triggers a rebuild.
"""
try:
results = collection.get(where={"source_file": source_file}, limit=1)
if not results.get("ids"):
return False
stored_meta = results.get("metadatas", [{}])[0] or {}
# Pre-v2 drawers have no version field — treat them as stale.
stored_version = stored_meta.get("normalize_version", 1)
if stored_version < NORMALIZE_VERSION:
return False
if check_mtime:
stored_meta = results.get("metadatas", [{}])[0]
stored_mtime = stored_meta.get("source_mtime")
if stored_mtime is None:
return False
+129 -62
View File
@@ -18,11 +18,12 @@ No external graph DB needed — built from ChromaDB metadata.
import hashlib
import json
import os
from collections import defaultdict, Counter
from datetime import datetime
from collections import Counter, defaultdict
from datetime import datetime, timezone
from .config import MempalaceConfig
from .palace import get_collection as _get_palace_collection
from .palace import mine_lock
def _get_collection(config=None):
@@ -249,20 +250,66 @@ _TUNNEL_FILE = os.path.join(os.path.expanduser("~"), ".mempalace", "tunnels.json
def _load_tunnels():
"""Load explicit tunnels from disk."""
if os.path.exists(_TUNNEL_FILE):
try:
return json.loads(open(_TUNNEL_FILE).read())
except Exception:
pass
return []
"""Load explicit tunnels from disk.
Returns an empty list if the file is missing or corrupt (e.g. truncated
by a crash mid-write on a system that lacks atomic-rename semantics).
"""
if not os.path.exists(_TUNNEL_FILE):
return []
try:
with open(_TUNNEL_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception:
return []
return data if isinstance(data, list) else []
def _save_tunnels(tunnels):
"""Save explicit tunnels to disk."""
"""Persist explicit tunnels atomically.
Writes to ``tunnels.json.tmp`` then ``os.replace``s it into place, so
a crash mid-write can never leave a partial/empty tunnels.json that
silently wipes every tunnel on next read.
"""
os.makedirs(os.path.dirname(_TUNNEL_FILE), exist_ok=True)
with open(_TUNNEL_FILE, "w") as f:
tmp_path = _TUNNEL_FILE + ".tmp"
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(tunnels, f, indent=2)
f.flush()
try:
os.fsync(f.fileno())
except OSError:
# Not all filesystems (or Windows file handles) support fsync — tolerate.
pass
os.replace(tmp_path, _TUNNEL_FILE)
def _endpoint_key(wing: str, room: str) -> str:
return f"{wing}/{room}"
def _canonical_tunnel_id(
source_wing: str, source_room: str, target_wing: str, target_room: str
) -> str:
"""Compute a symmetric tunnel ID.
Tunnels are conceptually undirected — "auth relates to users" is the
same connection as "users relates to auth". Sort the two endpoints
before hashing so ``create_tunnel(A, B)`` and ``create_tunnel(B, A)``
resolve to the same ID and dedup into one record.
"""
src = _endpoint_key(source_wing, source_room)
tgt = _endpoint_key(target_wing, target_room)
a, b = sorted((src, tgt))
return hashlib.sha256(f"{a}{b}".encode()).hexdigest()[:16]
def _require_name(value: str, field: str) -> str:
"""Reject empty / non-string endpoint identifiers."""
if not isinstance(value, str) or not value.strip():
raise ValueError(f"{field} must be a non-empty string")
return value.strip()
def create_tunnel(
@@ -274,72 +321,88 @@ def create_tunnel(
source_drawer_id: str = None,
target_drawer_id: str = None,
):
"""Create an explicit tunnel between two locations in the palace.
"""Create an explicit (symmetric) tunnel between two locations in the palace.
Use when an agent notices a connection between two projects/wings
that wouldn't be found by passive room-name matching.
Tunnels are undirected: ``create_tunnel(A, B)`` and ``create_tunnel(B, A)``
resolve to the same canonical ID. A second call with the same endpoints
updates the stored label (and drawer IDs, if provided) rather than
creating a duplicate.
The ``source`` / ``target`` fields on the returned dict preserve the
argument order the caller used, so callers can display it directionally
if they like. The ID and dedup are symmetric.
Args:
source_wing: Wing of the source (e.g., "project_api")
source_room: Room in the source wing
target_wing: Wing of the target (e.g., "project_database")
target_room: Room in the target wing
label: Description of the connection
source_drawer_id: Optional specific drawer ID
target_drawer_id: Optional specific drawer ID
source_wing: Wing of the source (e.g., "project_api").
source_room: Room in the source wing.
target_wing: Wing of the target (e.g., "project_database").
target_room: Room in the target wing.
label: Description of the connection.
source_drawer_id: Optional specific drawer ID.
target_drawer_id: Optional specific drawer ID.
Returns:
The created tunnel dict.
The stored tunnel dict.
Raises:
ValueError: if any wing or room is empty or non-string.
"""
tunnel_id = hashlib.sha256(
f"{source_wing}/{source_room}{target_wing}/{target_room}".encode()
).hexdigest()[:16]
source_wing = _require_name(source_wing, "source_wing")
source_room = _require_name(source_room, "source_room")
target_wing = _require_name(target_wing, "target_wing")
target_room = _require_name(target_room, "target_room")
tunnel_id = _canonical_tunnel_id(source_wing, source_room, target_wing, target_room)
tunnel = {
"id": tunnel_id,
"source": {"wing": source_wing, "room": source_room},
"target": {"wing": target_wing, "room": target_room},
"label": label,
"created_at": datetime.now().isoformat(),
"created_at": datetime.now(timezone.utc).isoformat(),
}
if source_drawer_id:
tunnel["source"]["drawer_id"] = source_drawer_id
if target_drawer_id:
tunnel["target"]["drawer_id"] = target_drawer_id
tunnels = _load_tunnels()
# Dedup — don't create if same endpoints already linked
for existing in tunnels:
if existing.get("id") == tunnel_id:
existing.update(tunnel) # update label/drawers
_save_tunnels(tunnels)
return existing
tunnels.append(tunnel)
_save_tunnels(tunnels)
# Serialize the load → mutate → save cycle. Without this, two concurrent
# create_tunnel calls can both read the same snapshot and the later
# writer silently drops the earlier writer's tunnel.
with mine_lock(_TUNNEL_FILE):
tunnels = _load_tunnels()
for existing in tunnels:
if existing.get("id") == tunnel_id:
# Preserve original creation timestamp on label updates.
tunnel["created_at"] = existing.get("created_at", tunnel["created_at"])
tunnel["updated_at"] = datetime.now(timezone.utc).isoformat()
existing.clear()
existing.update(tunnel)
_save_tunnels(tunnels)
return existing
tunnels.append(tunnel)
_save_tunnels(tunnels)
return tunnel
def list_tunnels(wing: str = None):
"""List all explicit tunnels, optionally filtered by wing.
Returns tunnels where the wing appears as either source or target.
Returns tunnels where ``wing`` appears as either source or target
(tunnels are symmetric, so either endpoint is a valid filter match).
"""
tunnels = _load_tunnels()
if wing:
tunnels = [
t for t in tunnels
if t["source"]["wing"] == wing or t["target"]["wing"] == wing
]
tunnels = [t for t in tunnels if t["source"]["wing"] == wing or t["target"]["wing"] == wing]
return tunnels
def delete_tunnel(tunnel_id: str):
"""Delete an explicit tunnel by ID."""
tunnels = _load_tunnels()
tunnels = [t for t in tunnels if t.get("id") != tunnel_id]
_save_tunnels(tunnels)
"""Delete an explicit tunnel by ID. Returns ``{"deleted": <id>}``."""
with mine_lock(_TUNNEL_FILE):
tunnels = _load_tunnels()
tunnels = [t for t in tunnels if t.get("id") != tunnel_id]
_save_tunnels(tunnels)
return {"deleted": tunnel_id}
@@ -357,23 +420,27 @@ def follow_tunnels(wing: str, room: str, col=None, config=None):
tgt = t["target"]
if src["wing"] == wing and src["room"] == room:
connections.append({
"direction": "outgoing",
"connected_wing": tgt["wing"],
"connected_room": tgt["room"],
"label": t.get("label", ""),
"drawer_id": tgt.get("drawer_id"),
"tunnel_id": t["id"],
})
connections.append(
{
"direction": "outgoing",
"connected_wing": tgt["wing"],
"connected_room": tgt["room"],
"label": t.get("label", ""),
"drawer_id": tgt.get("drawer_id"),
"tunnel_id": t["id"],
}
)
elif tgt["wing"] == wing and tgt["room"] == room:
connections.append({
"direction": "incoming",
"connected_wing": src["wing"],
"connected_room": src["room"],
"label": t.get("label", ""),
"drawer_id": src.get("drawer_id"),
"tunnel_id": t["id"],
})
connections.append(
{
"direction": "incoming",
"connected_wing": src["wing"],
"connected_room": src["room"],
"label": t.get("label", ""),
"drawer_id": src.get("drawer_id"),
"tunnel_id": t["id"],
}
)
# If we have a collection, fetch drawer content for connected items
if col and connections:
+234 -109
View File
@@ -12,7 +12,11 @@ import math
import re
from pathlib import Path
from .palace import get_collection, get_closets_collection
from .palace import get_closets_collection, get_collection
# Closet pointer line format: "topic|entities|→drawer_id_a,drawer_id_b"
# Multiple lines may join with newlines inside one closet document.
_CLOSET_DRAWER_REF_RE = re.compile(r"→([\w,]+)")
logger = logging.getLogger("mempalace_mcp")
@@ -21,57 +25,109 @@ class SearchError(Exception):
"""Raised when search cannot proceed (e.g. no palace found)."""
def _bm25_score(query: str, document: str, k1: float = 1.5, b: float = 0.75, avg_dl: float = 500) -> float:
"""Simple BM25 score for a single document against a query.
_TOKEN_RE = re.compile(r"\w{2,}", re.UNICODE)
This is a lightweight keyword-matching signal that complements vector
similarity. It catches exact matches that embeddings might miss
(e.g., specific names, project codes, error messages).
def _tokenize(text: str) -> list:
"""Lowercase + strip to alphanumeric tokens of length ≥ 2."""
return _TOKEN_RE.findall(text.lower())
def _bm25_scores(
query: str,
documents: list,
k1: float = 1.5,
b: float = 0.75,
) -> list:
"""Compute Okapi-BM25 scores for ``query`` against each document.
IDF is computed over the *provided corpus* using the Lucene/BM25+
smoothed formula ``log((N - df + 0.5) / (df + 0.5) + 1)``, which is
always non-negative. This is well-defined for re-ranking a small
candidate set returned by vector retrieval — IDF then reflects how
discriminative each query term is *within the candidates*, exactly
what's needed to reorder them.
Parameters mirror Okapi-BM25 conventions:
k1 — term-frequency saturation (1.2-2.0 typical, 1.5 default)
b — length normalization (0.0 = none, 1.0 = full, 0.75 default)
Returns a list of scores in the same order as ``documents``.
"""
query_terms = set(re.findall(r'\w{2,}', query.lower()))
doc_terms = re.findall(r'\w{2,}', document.lower())
if not query_terms or not doc_terms:
return 0.0
doc_len = len(doc_terms)
term_freq = {}
for t in doc_terms:
term_freq[t] = term_freq.get(t, 0) + 1
n_docs = len(documents)
query_terms = set(_tokenize(query))
if not query_terms or n_docs == 0:
return [0.0] * n_docs
score = 0.0
for term in query_terms:
tf = term_freq.get(term, 0)
if tf > 0:
# Simplified IDF — treat each query term as moderately rare
idf = math.log(2.0)
numerator = tf * (k1 + 1)
denominator = tf + k1 * (1 - b + b * doc_len / avg_dl)
score += idf * numerator / denominator
return score
tokenized = [_tokenize(d) for d in documents]
doc_lens = [len(toks) for toks in tokenized]
if not any(doc_lens):
return [0.0] * n_docs
avgdl = sum(doc_lens) / n_docs or 1.0
# Document frequency: how many docs contain each query term?
df = {term: 0 for term in query_terms}
for toks in tokenized:
seen = set(toks) & query_terms
for term in seen:
df[term] += 1
idf = {term: math.log((n_docs - df[term] + 0.5) / (df[term] + 0.5) + 1) for term in query_terms}
scores = []
for toks, dl in zip(tokenized, doc_lens):
if dl == 0:
scores.append(0.0)
continue
tf: dict = {}
for t in toks:
if t in query_terms:
tf[t] = tf.get(t, 0) + 1
score = 0.0
for term, freq in tf.items():
num = freq * (k1 + 1)
den = freq + k1 * (1 - b + b * dl / avgdl)
score += idf[term] * num / den
scores.append(score)
return scores
def _hybrid_rank(vector_results, query: str, vector_weight: float = 0.6, bm25_weight: float = 0.4):
"""Re-rank results using both vector distance and BM25 keyword score.
def _hybrid_rank(
results: list,
query: str,
vector_weight: float = 0.6,
bm25_weight: float = 0.4,
) -> list:
"""Re-rank ``results`` by a convex combination of vector similarity and BM25.
Returns results sorted by combined score (higher = better).
* Vector similarity uses absolute cosine sim ``max(0, 1 - distance)`` —
ChromaDB's hnsw cosine distance lives in ``[0, 2]`` (0 = identical).
Absolute (not relative-to-max) means adding/removing a candidate
can't reshuffle the others.
* BM25 is real Okapi-BM25 with corpus-relative IDF over the candidates
themselves. Since the absolute scale is unbounded, BM25 is min-max
normalized within the candidate set so weights are commensurable.
Mutates each result dict to add ``bm25_score`` and reorders the list
in place. Returns the same list for convenience.
"""
if not vector_results:
return vector_results
if not results:
return results
# Normalize vector distances to 0-1 similarity
max_dist = max(r.get("distance", 1.0) for r in vector_results) or 1.0
for r in vector_results:
vec_sim = max(0.0, 1 - r.get("distance", 1.0) / max(max_dist, 0.001))
bm25 = _bm25_score(query, r.get("text", ""))
# Normalize BM25 to roughly 0-1 range
bm25_norm = min(bm25 / 3.0, 1.0)
r["_hybrid_score"] = vector_weight * vec_sim + bm25_weight * bm25_norm
r["bm25_score"] = round(bm25, 3)
docs = [r.get("text", "") for r in results]
bm25_raw = _bm25_scores(query, docs)
max_bm25 = max(bm25_raw) if bm25_raw else 0.0
bm25_norm = [s / max_bm25 for s in bm25_raw] if max_bm25 > 0 else [0.0] * len(bm25_raw)
vector_results.sort(key=lambda r: r["_hybrid_score"], reverse=True)
# Clean up internal field
for r in vector_results:
del r["_hybrid_score"]
return vector_results
scored = []
for r, raw, norm in zip(results, bm25_raw, bm25_norm):
vec_sim = max(0.0, 1.0 - r.get("distance", 1.0))
r["bm25_score"] = round(raw, 3)
scored.append((vector_weight * vec_sim + bm25_weight * norm, r))
scored.sort(key=lambda pair: pair[0], reverse=True)
results[:] = [r for _, r in scored]
return results
def build_where_filter(wing: str = None, room: str = None) -> dict:
@@ -85,6 +141,116 @@ def build_where_filter(wing: str = None, room: str = None) -> dict:
return {}
def _extract_drawer_ids_from_closet(closet_doc: str) -> list:
"""Parse all `→drawer_id_a,drawer_id_b` pointers out of a closet document.
Preserves order and dedupes.
"""
seen: dict = {}
for match in _CLOSET_DRAWER_REF_RE.findall(closet_doc):
for did in match.split(","):
did = did.strip()
if did and did not in seen:
seen[did] = None
return list(seen.keys())
def _closet_first_hits(
palace_path: str,
query: str,
where: dict,
drawers_col,
n_results: int,
max_distance: float,
):
"""Run a closet-first search and return chunk-level drawer hits.
Returns:
non-empty list of hits when the closet path produced usable matches.
``None`` when the closet collection is empty/missing OR when every
candidate drawer was filtered out (e.g. by max_distance); the
caller should fall back to direct drawer search.
"""
try:
closets_col = get_closets_collection(palace_path, create=False)
except Exception:
return None
try:
ckwargs = {
"query_texts": [query],
"n_results": max(n_results * 2, 5),
"include": ["documents", "metadatas", "distances"],
}
if where:
ckwargs["where"] = where
closet_results = closets_col.query(**ckwargs)
except Exception:
return None
closet_docs = closet_results["documents"][0] if closet_results["documents"] else []
if not closet_docs:
return None
closet_metas = closet_results["metadatas"][0]
closet_dists = closet_results["distances"][0]
# Collect candidate drawer IDs in closet-rank order, dedupe, remember
# which closet (and its distance/preview) introduced each one.
drawer_id_order: list = []
drawer_provenance: dict = {}
for cdoc, cmeta, cdist in zip(closet_docs, closet_metas, closet_dists):
for did in _extract_drawer_ids_from_closet(cdoc):
if did in drawer_provenance:
continue
drawer_provenance[did] = (cdist, cdoc, cmeta)
drawer_id_order.append(did)
if not drawer_id_order:
return None
# Hydrate exactly those drawers — chunk-level, not whole-file.
try:
fetched = drawers_col.get(
ids=drawer_id_order,
include=["documents", "metadatas"],
)
except Exception:
return None
fetched_ids = fetched.get("ids") or []
fetched_docs = fetched.get("documents") or []
fetched_metas = fetched.get("metadatas") or []
fetched_map = {
did: (doc, meta) for did, doc, meta in zip(fetched_ids, fetched_docs, fetched_metas)
}
hits: list = []
for did in drawer_id_order:
if did not in fetched_map:
continue # closet pointed to a drawer that no longer exists
doc, meta = fetched_map[did]
cdist, cdoc, _ = drawer_provenance[did]
if max_distance > 0.0 and cdist > max_distance:
continue
hits.append(
{
"text": doc,
"wing": meta.get("wing", "unknown"),
"room": meta.get("room", "unknown"),
"source_file": Path(meta.get("source_file", "?")).name,
"similarity": round(max(0.0, 1 - cdist), 3),
"distance": round(cdist, 4),
"matched_via": "closet",
"closet_preview": cdoc[:200],
}
)
if len(hits) >= n_results:
break
return hits if hits else None
def search(query: str, palace_path: str, wing: str = None, room: str = None, n_results: int = 5):
"""
Search the palace. Returns verbatim drawer content.
@@ -183,73 +349,31 @@ def search_memories(
where = build_where_filter(wing, room)
# Try closet-first search: search the compact index, then hydrate drawers
closet_hits = []
try:
closets_col = get_closets_collection(palace_path, create=False)
ckwargs = {
"query_texts": [query],
"n_results": n_results * 2, # over-fetch closets to find best drawers
"include": ["documents", "metadatas", "distances"],
# Closet-first search: scan the compact index, parse drawer pointers
# from each matching line, then hydrate exactly those drawers. This
# keeps the result shape chunk-level (consistent with direct search)
# and applies the same max_distance filter.
closet_hits = _closet_first_hits(
palace_path=palace_path,
query=query,
where=where,
drawers_col=drawers_col,
n_results=n_results,
max_distance=max_distance,
)
if closet_hits is not None:
# Re-rank chunk-level closet hits with the same hybrid scoring as
# the direct path. The vector half here uses the closet's distance
# (query↔topic-line) — that's intentional: closets are *meant* to
# be the semantic-narrowing signal, and BM25 then enforces actual
# keyword presence in the hydrated drawer text.
closet_hits = _hybrid_rank(closet_hits, query)
return {
"query": query,
"filters": {"wing": wing, "room": room},
"total_before_filter": len(closet_hits),
"results": closet_hits,
}
if where:
ckwargs["where"] = where
closet_results = closets_col.query(**ckwargs)
if closet_results["documents"][0]:
closet_hits = list(zip(
closet_results["documents"][0],
closet_results["metadatas"][0],
closet_results["distances"][0],
))
except Exception:
pass # no closets yet — fall through to direct drawer search
# If closets found results, hydrate the referenced drawers
if closet_hits:
import re
seen_sources = set()
hits = []
for closet_doc, closet_meta, closet_dist in closet_hits:
source = closet_meta.get("source_file", "")
if source in seen_sources:
continue
seen_sources.add(source)
# Find drawers for this source file
try:
drawer_results = drawers_col.get(
where={"source_file": source},
include=["documents", "metadatas"],
)
if drawer_results.get("ids"):
# Combine all drawer content for this file
full_text = "\n\n".join(drawer_results["documents"])
meta = drawer_results["metadatas"][0]
hits.append({
"text": full_text,
"wing": meta.get("wing", "unknown"),
"room": meta.get("room", "unknown"),
"source_file": Path(source).name,
"similarity": round(max(0.0, 1 - closet_dist), 3),
"distance": round(closet_dist, 4),
"matched_via": "closet",
"closet_preview": closet_doc[:200],
})
except Exception:
pass
if len(hits) >= n_results:
break
if hits:
# Re-rank with BM25 hybrid scoring
hits = _hybrid_rank(hits, query)
return {
"query": query,
"filters": {"wing": wing, "room": room},
"total_before_filter": len(closet_hits),
"results": hits,
}
# Fallback: direct drawer search (no closets yet, or closets empty)
try:
@@ -282,6 +406,7 @@ def search_memories(
"source_file": Path(meta.get("source_file", "?")).name,
"similarity": round(max(0.0, 1 - dist), 3),
"distance": round(dist, 4),
"matched_via": "drawer",
}
)
+1 -1
View File
@@ -1,3 +1,3 @@
"""Single source of truth for the MemPalace package version."""
__version__ = "3.1.0"
__version__ = "3.2.0"
+632 -117
View File
@@ -1,32 +1,60 @@
"""Tests for the closet layer, mine_lock, entity metadata, BM25 hybrid search,
and diary ingest.
"""
test_closets.py — Tests for the closet (searchable index) layer and the
features that ride on top of it: mine_lock serialization, entity metadata,
hybrid BM25+vector search, and diary ingest.
Content derived from Milla's omnibus test file; trimmed to only the features
present in this PR stack (#784 lock, #788 closets, this PR's entity/BM25/diary).
Strip-noise tests live with #785; tunnel tests live with the tunnels PR.
Coverage map:
* mine_lock — acquire/release, blocks concurrent acquisition.
* build_closet_lines — pointer-line shape, header pickup, entity stoplist
(regression for "When/After/The"), real-name survival, fallback line.
* upsert_closet_lines — pure overwrite (regression for the append bug),
char-limit packing without splitting a line.
* purge_file_closets — scoped to source_file.
* Project-miner end-to-end rebuild — re-mining with fewer topics fully
purges leftover numbered closets from a larger prior run.
* _extract_drawer_ids_from_closet — pointer parsing + dedup.
* search_memories closet-first path — fallback when empty, chunk-level
hits with matched_via, no whole-file glue, max_distance enforcement.
* Entity metadata — extracted, stoplist applied, registry cached by mtime.
* Real BM25 — real IDF over candidate corpus, hybrid rerank.
* Diary ingest — drawers + closets created, incremental skips, state
file lives outside the diary dir, wing-prefixed drawer IDs prevent
cross-diary collisions, force=True purges leftover closets.
"""
import json
import os
import tempfile
import threading
import time
import yaml
from mempalace.miner import (
_extract_entities_for_metadata,
_load_known_entities,
mine,
)
from mempalace.palace import (
CLOSET_CHAR_LIMIT,
build_closet_lines,
get_closets_collection,
get_collection,
mine_lock,
purge_file_closets,
upsert_closet_lines,
)
from mempalace.miner import _extract_entities_for_metadata
from mempalace.searcher import _bm25_score, _hybrid_rank
from mempalace.palace_graph import (
create_tunnel,
list_tunnels,
delete_tunnel,
follow_tunnels,
_TUNNEL_FILE,
list_tunnels,
)
from mempalace.searcher import (
_bm25_scores,
_extract_drawer_ids_from_closet,
_hybrid_rank,
search_memories,
)
@@ -34,104 +62,287 @@ from mempalace.palace_graph import (
class TestMineLock:
def test_lock_acquires_and_releases(self):
with mine_lock("/tmp/test_lock_file.txt"):
def test_lock_acquires_and_releases(self, tmp_path):
target = str(tmp_path / "lock_target.txt")
with mine_lock(target):
lock_dir = os.path.expanduser("~/.mempalace/locks")
assert os.path.isdir(lock_dir)
# Re-acquire after release should succeed instantly.
start = time.time()
with mine_lock(target):
pass
assert time.time() - start < 1.0
def test_lock_blocks_concurrent_access(self):
def test_lock_blocks_concurrent_access(self, tmp_path):
target = str(tmp_path / "concurrent_lock.txt")
results = []
def worker(name):
start = time.time()
with mine_lock("/tmp/same_file_lock_test.txt"):
with mine_lock(target):
results.append((name, time.time() - start))
time.sleep(0.2)
t1 = threading.Thread(target=worker, args=("a",))
t2 = threading.Thread(target=worker, args=("b",))
t1.start()
time.sleep(0.05)
time.sleep(0.05) # ensure t1 acquires first
t2.start()
t1.join()
t2.join()
# Second thread should have waited
wait_times = sorted(results, key=lambda x: x[1])
assert wait_times[1][1] > 0.1, "Second thread should block"
# The second worker must have waited at least most of t1's hold time.
wait_times = sorted(r[1] for r in results)
assert (
wait_times[1] > 0.1
), f"second thread should block on mine_lock, waited only {wait_times[1]:.3f}s"
# ── closet lines ─────────────────────────────────────────────────────────
# ── build_closet_lines ─────────────────────────────────────────────────
class TestBuildClosetLines:
def test_returns_list_of_lines(self):
lines = build_closet_lines(
"/tmp/test.py", ["drawer_001"], "We built the auth system", "code", "general"
def test_emits_pointer_line_shape(self):
content = (
"# Auth rewrite\n\n"
"Decided we need to migrate to passkeys. "
"Built the prototype with WebAuthn. "
"Reviewed the API surface."
)
assert isinstance(lines, list)
assert len(lines) >= 1
def test_each_line_has_pointer(self):
lines = build_closet_lines(
"/tmp/test.py",
["drawer_001", "drawer_002"],
"We built the auth system and tested the login flow",
"code",
"general",
"/proj/auth.md",
["drawer_proj_backend_aaa", "drawer_proj_backend_bbb"],
content,
wing="proj",
room="backend",
)
assert lines, "should always emit at least one line"
for line in lines:
assert "" in line, f"Line missing pointer: {line}"
assert "" in line, f"line missing pointer arrow: {line!r}"
parts = line.split("|")
assert len(parts) == 3, f"expected topic|entities|→refs, got {line!r}"
assert parts[2].startswith("")
def test_fallback_when_no_topics(self):
lines = build_closet_lines(
"/tmp/test.py", ["drawer_001"], "short text", "wing", "room"
def test_extracts_section_headers_as_topics(self):
content = "# First Header\nbody\n## Second Header\nmore body"
lines = build_closet_lines("/x.md", ["d1"], content, "w", "r")
joined = "\n".join(lines).lower()
assert "first header" in joined
assert "second header" in joined
def test_entity_stoplist_filters_sentence_starters(self):
# "When", "After", "The" repeat 3+ times — old code would index them
# as entities. Stoplist drops them.
content = (
"When the pipeline ran, the result was good. "
"When the user logged in, the token was issued. "
"After the migration, the latency dropped. "
"After the rollback, the latency rose. "
"The new flow is stable. The audit cleared."
)
assert len(lines) >= 1
assert "" in lines[0]
lines = build_closet_lines("/x.md", ["d1"], content, "w", "r")
entity_segments = [line.split("|")[1] for line in lines]
for seg in entity_segments:
tokens = set(seg.split(";")) if seg else set()
assert "When" not in tokens
assert "After" not in tokens
assert "The" not in tokens
def test_real_proper_nouns_survive_stoplist(self):
content = (
"Igor reviewed the diff. Milla wrote the spec. "
"Igor pushed the fix. Milla approved the PR. "
"Igor and Milla shipped together."
)
lines = build_closet_lines("/x.md", ["d1"], content, "w", "r")
joined_entities = ";".join(line.split("|")[1] for line in lines)
assert "Igor" in joined_entities
assert "Milla" in joined_entities
def test_emits_fallback_line_when_nothing_extractable(self):
content = "lorem ipsum dolor sit amet consectetur adipiscing elit"
lines = build_closet_lines("/x/notes.txt", ["d1"], content, "wing", "room")
assert len(lines) == 1
assert "wing/room/notes" in lines[0]
assert "→d1" in lines[0]
def test_pointer_references_first_three_drawers(self):
ids = [f"drawer_{i}" for i in range(10)]
lines = build_closet_lines("/x.md", ids, "# A\n# B", "w", "r")
assert all("→drawer_0,drawer_1,drawer_2" in line for line in lines)
# ── upsert_closet_lines ─────────────────────────────────────────────────
# ── upsert_closet_lines ───────────────────────────────────────────────
class TestUpsertClosetLines:
def test_writes_closets(self):
with tempfile.TemporaryDirectory() as tmpdir:
col = get_closets_collection(tmpdir)
lines = [
"topic one|Entity1|→drawer_001",
"topic two|Entity2|→drawer_002",
]
n = upsert_closet_lines(col, "test_closet", lines, {"wing": "test"})
assert n >= 1
assert col.count() >= 1
def test_overwrites_existing_closet_does_not_append(self, palace_path):
col = get_closets_collection(palace_path)
base = "closet_test_room_abc"
meta = {"wing": "test", "room": "room", "source_file": "/x.md"}
def test_never_splits_mid_topic(self):
with tempfile.TemporaryDirectory() as tmpdir:
col = get_closets_collection(tmpdir)
# Create lines that together exceed CLOSET_CHAR_LIMIT
lines = [f"topic_{i}|{'x' * 200}|→drawer_{i}" for i in range(20)]
n = upsert_closet_lines(col, "test_closet", lines, {"wing": "test"})
assert n >= 2, "Should create multiple closets"
upsert_closet_lines(col, base, ["alpha|;|→d1", "beta|;|→d2", "gamma|;|→d3"], meta)
first = col.get(ids=[f"{base}_01"])
assert "alpha" in first["documents"][0]
# Verify each closet has complete lines
all_data = col.get(include=["documents"])
for doc in all_data["documents"]:
for line in doc.strip().split("\n"):
assert "" in line, f"Split topic found: {line}"
# Second mine — entirely different lines. Must replace, not append.
upsert_closet_lines(col, base, ["delta|;|→d4", "epsilon|;|→d5"], meta)
second = col.get(ids=[f"{base}_01"])
doc = second["documents"][0]
assert "delta" in doc
assert "epsilon" in doc
assert "alpha" not in doc, "old closet line leaked into rebuild"
assert "beta" not in doc
def test_respects_char_limit(self):
with tempfile.TemporaryDirectory() as tmpdir:
col = get_closets_collection(tmpdir)
lines = [f"topic_{i}|entities|→drawer_{i}" for i in range(50)]
upsert_closet_lines(col, "test_closet", lines, {"wing": "test"})
def test_packs_into_multiple_closets_without_splitting_lines(self, palace_path):
col = get_closets_collection(palace_path)
base = "closet_pack_room_def"
meta = {"wing": "test", "room": "room", "source_file": "/y.md"}
all_data = col.get(include=["documents"])
for doc in all_data["documents"]:
assert len(doc) <= CLOSET_CHAR_LIMIT + 100 # small buffer for existing content
line = "x" * 600 # well under CLOSET_CHAR_LIMIT
n_written = upsert_closet_lines(col, base, [line, line, line, line], meta)
# 4 lines @ 601 chars each = 2404 — should pack into 2 closets
assert n_written == 2
for i in range(1, n_written + 1):
doc = col.get(ids=[f"{base}_{i:02d}"])["documents"][0]
for chunk in doc.split("\n"):
assert len(chunk) == 600, f"line was truncated in closet {i}"
assert len(doc) <= CLOSET_CHAR_LIMIT
# ── entity metadata ──────────────────────────────────────────────────────
# ── purge_file_closets ────────────────────────────────────────────────
class TestPurgeFileClosets:
def test_deletes_only_the_targeted_source(self, palace_path):
col = get_closets_collection(palace_path)
col.upsert(
ids=["closet_a_01", "closet_b_01"],
documents=["a|;|→d1", "b|;|→d2"],
metadatas=[
{"source_file": "/keep.md", "wing": "w", "room": "r"},
{"source_file": "/drop.md", "wing": "w", "room": "r"},
],
)
purge_file_closets(col, "/drop.md")
remaining_ids = set(col.get()["ids"])
assert "closet_a_01" in remaining_ids
assert "closet_b_01" not in remaining_ids
# ── project miner: closet rebuild end-to-end ──────────────────────────
class TestMinerClosetRebuild:
def test_remine_replaces_closets_completely(self, tmp_path):
project = tmp_path / "proj"
project.mkdir()
(project / "mempalace.yaml").write_text(
yaml.dump({"wing": "proj", "rooms": [{"name": "general", "description": "x"}]})
)
target = project / "doc.md"
# First mine — long content produces multiple numbered closets.
first_topics = "\n\n".join(f"# Topic {i}\n" + ("filler text " * 30) for i in range(15))
target.write_text(first_topics)
palace = tmp_path / "palace"
mine(str(project), str(palace), wing_override="proj", agent="test")
col = get_closets_collection(str(palace))
first_pass = col.get(where={"source_file": str(target)})
assert first_pass["ids"], "first mine should have written closets"
first_ids = set(first_pass["ids"])
assert any("topic 0" in (d or "").lower() for d in first_pass["documents"])
# Touch mtime + shrink content so the rebuild produces fewer closets.
target.write_text("# Only Topic Now\n" + ("short body " * 5))
new_mtime = os.path.getmtime(target) + 60
os.utime(target, (new_mtime, new_mtime))
time.sleep(0.01)
mine(str(project), str(palace), wing_override="proj", agent="test")
col = get_closets_collection(str(palace))
second_pass = col.get(where={"source_file": str(target)})
second_docs = "\n".join(second_pass["documents"]).lower()
assert "only topic now" in second_docs
for i in range(15):
assert (
f"topic {i}\n" not in second_docs
), f"stale 'Topic {i}' from first mine survived the rebuild"
# Numbered closets that existed only in the larger first run must be gone.
leftover = first_ids - set(second_pass["ids"])
for stale_id in leftover:
assert not col.get(ids=[stale_id])[
"ids"
], f"orphan closet {stale_id} from larger first run survived purge"
# ── _extract_drawer_ids_from_closet ───────────────────────────────────
class TestExtractDrawerIds:
def test_parses_single_pointer(self):
assert _extract_drawer_ids_from_closet("topic|;|→drawer_x") == ["drawer_x"]
def test_parses_multiple_pointers_per_line(self):
line = "topic|ent|→drawer_a,drawer_b,drawer_c"
assert _extract_drawer_ids_from_closet(line) == ["drawer_a", "drawer_b", "drawer_c"]
def test_dedupes_across_lines(self):
doc = "one|;|→drawer_a,drawer_b\ntwo|;|→drawer_b,drawer_c"
assert _extract_drawer_ids_from_closet(doc) == ["drawer_a", "drawer_b", "drawer_c"]
def test_empty_doc_returns_empty(self):
assert _extract_drawer_ids_from_closet("") == []
assert _extract_drawer_ids_from_closet("no arrows here") == []
# ── search_memories closet-first path ────────────────────────────────
class TestSearchMemoriesClosetFirst:
def test_falls_back_to_direct_when_no_closets(self, palace_path, seeded_collection):
result = search_memories("JWT authentication", palace_path)
assert result["results"], "should still find drawer hits via fallback"
for hit in result["results"]:
assert hit.get("matched_via") == "drawer"
def test_closet_first_returns_chunk_level_hits(self, palace_path, seeded_collection):
closets = get_closets_collection(palace_path)
closets.upsert(
ids=["closet_proj_backend_aaa_01"],
documents=["JWT auth tokens|;|→drawer_proj_backend_aaa"],
metadatas=[{"wing": "project", "room": "backend", "source_file": "auth.py"}],
)
result = search_memories("JWT authentication", palace_path)
assert result["results"], "closet-first search should hydrate the drawer"
top = result["results"][0]
assert top["matched_via"] == "closet"
assert "JWT" in top["text"]
# Chunk-level — must NOT glue every drawer in the file together.
assert "Database migrations" not in top["text"]
assert "→drawer_proj_backend_aaa" in top["closet_preview"]
def test_max_distance_filters_closet_hits(self, palace_path, seeded_collection):
closets = get_closets_collection(palace_path)
closets.upsert(
ids=["closet_proj_backend_aaa_01"],
documents=["JWT auth tokens|;|→drawer_proj_backend_aaa"],
metadatas=[{"wing": "project", "room": "backend", "source_file": "auth.py"}],
)
result = search_memories(
"completely unrelated query about quantum gardening",
palace_path,
max_distance=0.001,
)
for hit in result["results"]:
assert hit["distance"] <= 0.001
# ── entity metadata ──────────────────────────────────────────────────
class TestEntityMetadata:
@@ -143,120 +354,424 @@ class TestEntityMetadata:
def test_empty_for_no_entities(self):
text = "this is all lowercase with no proper nouns at all"
entities = _extract_entities_for_metadata(text)
assert entities == ""
assert _extract_entities_for_metadata(text) == ""
def test_semicolon_separated(self):
text = "Alice and Bob met Charlie. Alice said hello. Bob agreed. Charlie laughed."
entities = _extract_entities_for_metadata(text)
assert ";" in entities
def test_stoplist_filters_sentence_starters(self):
# Same regression as the closet entity test — "When/After/The" must
# not become entities just because they're capitalized 2+ times.
text = (
"When the build broke, the team paged. "
"When the fix landed, the alarm cleared. "
"After the rollback, the queue drained. "
"After the deploy, the latency normalized."
)
entities = _extract_entities_for_metadata(text)
tokens = set(entities.split(";")) if entities else set()
assert "When" not in tokens
assert "After" not in tokens
assert "The" not in tokens
# ── BM25 hybrid search ──────────────────────────────────────────────────
def test_capped_list_never_truncates_a_name(self):
# 30 distinct repeated proper nouns — extraction should cap the list
# before joining so a name never gets cut in half.
# Use morphologically distinct stems so the [A-Z][a-z]+ regex sees
# each as its own token.
names = [
"Anna",
"Brian",
"Carol",
"David",
"Elena",
"Frank",
"Grace",
"Harold",
"Iris",
"Julian",
"Kira",
"Liam",
"Maya",
"Noah",
"Oscar",
"Penny",
"Quinn",
"Rosa",
"Sergei",
"Tara",
"Umar",
"Vera",
"Walter",
"Xander",
"Yvonne",
"Zachary",
"Amelia",
"Boris",
"Clara",
"Dmitri",
]
text = " ".join(f"{n} met {n}." for n in names)
entities = _extract_entities_for_metadata(text)
extracted = [n for n in entities.split(";") if n]
assert extracted, "should have extracted some entities"
for name in extracted:
assert name in names, f"truncation produced a partial token: {name!r}"
def test_known_registry_is_cached_by_mtime(self, monkeypatch, tmp_path):
# Point the registry at a temp file we control, exercise the cache.
registry = tmp_path / "known_entities.json"
registry.write_text(json.dumps({"people": ["Zelda"]}))
from mempalace import miner
monkeypatch.setattr(miner, "_ENTITY_REGISTRY_PATH", str(registry))
miner._ENTITY_REGISTRY_CACHE["mtime"] = None
miner._ENTITY_REGISTRY_CACHE["names"] = frozenset()
first = _load_known_entities()
assert "Zelda" in first
# Second call without changing mtime: must reuse cache, not re-read.
read_count = {"n": 0}
original_open = open
def counting_open(path, *a, **kw):
if str(path) == str(registry):
read_count["n"] += 1
return original_open(path, *a, **kw)
monkeypatch.setattr("builtins.open", counting_open)
_load_known_entities()
assert read_count["n"] == 0, "registry should not be re-read when mtime unchanged"
# Bump mtime → cache must invalidate.
new_mtime = os.path.getmtime(registry) + 5
os.utime(registry, (new_mtime, new_mtime))
registry.write_text(json.dumps({"people": ["Zelda", "Link"]}))
os.utime(registry, (new_mtime, new_mtime))
names = _load_known_entities()
assert "Link" in names
# ── BM25 hybrid search (real IDF over candidate corpus) ──────────────
class TestBM25:
def test_bm25_score_positive_for_match(self):
score = _bm25_score("database migration", "We migrated the database to Postgres")
assert score > 0
def test_scores_positive_for_matching_doc(self):
scores = _bm25_scores(
"database migration",
["We migrated the database to Postgres.", "unrelated cookery tips"],
)
assert scores[0] > 0
assert scores[1] == 0.0
def test_bm25_score_zero_for_no_match(self):
score = _bm25_score("quantum physics", "We built a web application in React")
assert score == 0.0
def test_scores_zero_when_no_overlap(self):
scores = _bm25_scores("quantum physics", ["We built a web app in React"])
assert scores == [0.0]
def test_hybrid_rank_reorders(self):
def test_idf_downweights_terms_present_in_every_doc(self):
# "database" appears in every candidate → low IDF → low contribution.
# "vacuum" is unique to one → high IDF → that doc dominates.
scores = _bm25_scores(
"database vacuum",
[
"database backup nightly schedule",
"database vacuum scheduled weekly",
"database failover plan",
],
)
assert scores[1] == max(scores), "doc with the rare query term should win on IDF"
def test_empty_inputs_return_zeros(self):
assert _bm25_scores("", ["hello world"]) == [0.0]
assert _bm25_scores("query here", []) == []
assert _bm25_scores("query", [""]) == [0.0]
def test_hybrid_rank_promotes_keyword_match(self):
results = [
{"text": "database schema design for Postgres", "distance": 0.5},
{"text": "unrelated topic about cooking", "distance": 0.3},
]
ranked = _hybrid_rank(results, "database Postgres schema")
# The database result should rank higher despite worse vector distance
# The keyword-rich result outranks the closer-vector but irrelevant one.
assert "database" in ranked[0]["text"]
# bm25_score field is exposed for debugging.
assert "bm25_score" in ranked[0]
# No internal scoring leak.
assert "_hybrid_score" not in ranked[0]
def test_hybrid_rank_absolute_normalization(self):
# Adding a much-worse result to the candidate set must NOT reshuffle
# the top two — proves we're using absolute (1 - dist) and not
# dist / max_dist normalization.
base = [
{"text": "alpha alpha alpha", "distance": 0.1},
{"text": "beta beta beta", "distance": 0.4},
]
ranked_short = _hybrid_rank([dict(r) for r in base], "alpha")
with_outlier = base + [{"text": "gamma gamma gamma", "distance": 1.9}]
ranked_long = _hybrid_rank([dict(r) for r in with_outlier], "alpha")
assert ranked_short[0]["text"] == ranked_long[0]["text"]
assert ranked_short[1]["text"] == ranked_long[1]["text"]
# ── diary ingest ─────────────────────────────────────────────────────────
# ── diary ingest ─────────────────────────────────────────────────────
class TestDiaryIngest:
def test_ingest_creates_drawers_and_closets(self):
with tempfile.TemporaryDirectory() as palace_dir:
diary_dir = tempfile.mkdtemp()
# Write a test diary
with open(os.path.join(diary_dir, "2026-04-13.md"), "w") as f:
f.write("# 2026-04-13\n\n## 10:00 PDT — Test\n\nBuilt the auth system.\n")
def test_ingest_creates_drawers_and_closets(self, tmp_path):
diary_dir = tmp_path / "diaries"
diary_dir.mkdir()
(diary_dir / "2026-04-13.md").write_text(
"# 2026-04-13\n\n## 10:00 PDT — Test\n\nBuilt the auth system.\n"
)
palace_dir = tmp_path / "palace"
from mempalace.diary_ingest import ingest_diaries
from mempalace.diary_ingest import ingest_diaries
result = ingest_diaries(diary_dir, palace_dir, force=True)
assert result["days_updated"] >= 1
result = ingest_diaries(str(diary_dir), str(palace_dir), force=True)
assert result["days_updated"] >= 1
assert get_collection(str(palace_dir)).count() >= 1
# Check drawer exists
drawers = get_collection(palace_dir)
count = drawers.count()
assert count >= 1
def test_ingest_skips_unchanged_on_second_run(self, tmp_path):
diary_dir = tmp_path / "diaries"
diary_dir.mkdir()
(diary_dir / "2026-04-13.md").write_text(
"# 2026-04-13\n\n## 10:00 — Test\n\nContent here that's long enough.\n"
)
palace_dir = tmp_path / "palace"
def test_ingest_skips_unchanged(self):
with tempfile.TemporaryDirectory() as palace_dir:
diary_dir = tempfile.mkdtemp()
with open(os.path.join(diary_dir, "2026-04-13.md"), "w") as f:
f.write("# 2026-04-13\n\n## 10:00 — Test\n\nContent.\n")
from mempalace.diary_ingest import ingest_diaries
from mempalace.diary_ingest import ingest_diaries
ingest_diaries(str(diary_dir), str(palace_dir), force=True)
result = ingest_diaries(str(diary_dir), str(palace_dir))
assert result["days_updated"] == 0
ingest_diaries(diary_dir, palace_dir, force=True)
result = ingest_diaries(diary_dir, palace_dir) # second run, no force
assert result["days_updated"] == 0
def test_state_file_lives_outside_diary_dir(self, tmp_path):
# Regression: the original implementation wrote
# ``.diary_ingest_state.json`` *inside* the user's diary directory,
# polluting their content folder. State must live under
# ``~/.mempalace/state/`` instead.
diary_dir = tmp_path / "diaries"
diary_dir.mkdir()
(diary_dir / "2026-04-13.md").write_text(
"# 2026-04-13\n\n## 10:00 — Test\n\nBody content here long enough.\n"
)
palace_dir = tmp_path / "palace"
from mempalace.diary_ingest import _state_file_for, ingest_diaries
ingest_diaries(str(diary_dir), str(palace_dir), force=True)
# No state file inside the user's diary dir.
for entry in diary_dir.iterdir():
assert (
"diary_ingest" not in entry.name
), f"state file leaked into user diary dir: {entry}"
# State file does exist under ~/.mempalace/state/.
state_path = _state_file_for(str(palace_dir), diary_dir.resolve())
assert state_path.exists()
assert "/.mempalace/state/" in str(state_path)
def test_wing_prefixed_drawer_id_prevents_cross_diary_collision(self, tmp_path):
# Regression: the original implementation used
# ``drawer_diary_{date_str}`` regardless of wing — two diaries with
# the same date in different wings would clobber each other.
date_md = "# 2026-04-13\n\n## 10:00 — entry\n\nThis is the day's content.\n"
# Two separate diary dirs, ingested into the same palace under
# different wings. Each must produce a distinct drawer.
personal_dir = tmp_path / "personal"
personal_dir.mkdir()
(personal_dir / "2026-04-13.md").write_text(date_md + "Personal-only marker.\n")
work_dir = tmp_path / "work"
work_dir.mkdir()
(work_dir / "2026-04-13.md").write_text(date_md + "Work-only marker.\n")
palace_dir = tmp_path / "palace"
from mempalace.diary_ingest import _diary_drawer_id, ingest_diaries
ingest_diaries(str(personal_dir), str(palace_dir), wing="personal", force=True)
ingest_diaries(str(work_dir), str(palace_dir), wing="work", force=True)
col = get_collection(str(palace_dir))
personal_id = _diary_drawer_id("personal", "2026-04-13")
work_id = _diary_drawer_id("work", "2026-04-13")
assert personal_id != work_id
personal = col.get(ids=[personal_id])
work = col.get(ids=[work_id])
assert personal["ids"] == [personal_id]
assert work["ids"] == [work_id]
assert "Personal-only marker." in personal["documents"][0]
assert "Work-only marker." in work["documents"][0]
# ── tunnels ──────────────────────────────────────────────────────────────
# ── cross-wing tunnels ───────────────────────────────────────────────
class TestTunnels:
"""Tunnels are explicit cross-wing connections stored in
``~/.mempalace/tunnels.json``. Each test points the module-level
``_TUNNEL_FILE`` at a fresh tmp file so tests don't cross-contaminate
or touch the user's real tunnels."""
def setup_method(self):
# Use temp tunnel file
self._orig = _TUNNEL_FILE
import mempalace.palace_graph as pg
self._orig = pg._TUNNEL_FILE
self._tmpdir = tempfile.mkdtemp()
pg._TUNNEL_FILE = os.path.join(self._tmpdir, "tunnels.json")
def teardown_method(self):
import mempalace.palace_graph as pg
pg._TUNNEL_FILE = self._orig
import shutil
shutil.rmtree(self._tmpdir, ignore_errors=True)
def test_create_tunnel(self):
t = create_tunnel("wing_api", "auth", "wing_db", "users", label="auth uses users table")
assert t["id"]
assert t["source"]["wing"] == "wing_api"
assert t["source"]["room"] == "auth"
assert t["target"]["wing"] == "wing_db"
assert t["target"]["room"] == "users"
assert t["label"] == "auth uses users table"
def test_list_tunnels(self):
def test_list_tunnels_with_and_without_filter(self):
create_tunnel("wing_a", "room1", "wing_b", "room2")
create_tunnel("wing_a", "room3", "wing_c", "room4")
all_t = list_tunnels()
assert len(all_t) == 2
filtered = list_tunnels("wing_a")
assert len(filtered) == 2
filtered_c = list_tunnels("wing_c")
assert len(filtered_c) == 1
assert len(list_tunnels()) == 2
# Filtering by a wing that appears on either endpoint.
assert len(list_tunnels("wing_a")) == 2
assert len(list_tunnels("wing_c")) == 1
assert len(list_tunnels("wing_nonexistent")) == 0
def test_delete_tunnel(self):
t = create_tunnel("wing_x", "r1", "wing_y", "r2")
delete_tunnel(t["id"])
assert len(list_tunnels()) == 0
assert list_tunnels() == []
def test_dedup_same_endpoints(self):
def test_dedup_same_endpoints_updates_label(self):
create_tunnel("wing_a", "r1", "wing_b", "r2", label="first")
create_tunnel("wing_a", "r1", "wing_b", "r2", label="updated")
tunnels = list_tunnels()
assert len(tunnels) == 1
assert tunnels[0]["label"] == "updated"
def test_follow_tunnels(self):
def test_follow_tunnels_returns_connected_endpoints(self):
create_tunnel("wing_api", "auth", "wing_db", "users")
create_tunnel("wing_api", "auth", "wing_frontend", "login")
# Unrelated tunnel that must not surface.
create_tunnel("wing_other", "notes", "wing_misc", "scratch")
connections = follow_tunnels("wing_api", "auth")
assert len(connections) == 2
wings = {c["connected_wing"] for c in connections}
assert "wing_db" in wings
assert "wing_frontend" in wings
assert wings == {"wing_db", "wing_frontend"}
# ── regression: symmetry, durability, validation, concurrency ─────
def test_tunnel_is_symmetric(self):
"""Regression: tunnels are undirected. create(A, B) and create(B, A)
must resolve to the same canonical ID and dedupe into one record —
the second call updates the label instead of creating a dupe."""
first = create_tunnel("wing_a", "r1", "wing_b", "r2", label="forward")
second = create_tunnel("wing_b", "r2", "wing_a", "r1", label="reversed")
assert first["id"] == second["id"]
assert len(list_tunnels()) == 1
assert list_tunnels()[0]["label"] == "reversed"
def test_follow_tunnels_works_from_either_endpoint(self):
"""Symmetric: you can follow_tunnels from either end of the link."""
create_tunnel("wing_api", "auth", "wing_db", "users", label="auth uses users")
from_source = follow_tunnels("wing_api", "auth")
from_target = follow_tunnels("wing_db", "users")
assert len(from_source) == 1
assert len(from_target) == 1
assert from_source[0]["connected_wing"] == "wing_db"
assert from_target[0]["connected_wing"] == "wing_api"
# Both surfaces should carry the same label.
assert from_source[0]["label"] == "auth uses users"
assert from_target[0]["label"] == "auth uses users"
def test_empty_endpoint_fields_rejected(self):
"""Regression: create_tunnel must reject empty strings on any
endpoint field so the JSON store can't grow phantom tunnels."""
import pytest
for args in [
("", "r1", "wing", "r2"),
("wing", "", "wing", "r2"),
("wing", "r1", "", "r2"),
("wing", "r1", "wing", ""),
(" ", "r1", "wing", "r2"), # whitespace-only also rejected
]:
with pytest.raises(ValueError):
create_tunnel(*args)
def test_corrupt_tunnel_file_does_not_lose_new_writes(self):
"""A truncated/corrupt tunnels.json (crash mid-write on a system
without atomic rename) must not leak into subsequent reads — the
file should be treated as empty and a fresh create_tunnel should
persist cleanly."""
import mempalace.palace_graph as pg
# Simulate a crash that left a truncated file behind.
with open(pg._TUNNEL_FILE, "w") as f:
f.write("{not valid json")
# Load should return [] rather than raising.
assert list_tunnels() == []
# A subsequent create must persist (atomic write replaces the corrupt file).
t = create_tunnel("wing_a", "r1", "wing_b", "r2")
assert list_tunnels() == [t]
def test_atomic_write_leaves_no_stray_tmp_file(self):
"""Regression: _save_tunnels uses write-then-os.replace. After a
successful create, there must be no leftover ``tunnels.json.tmp``."""
import mempalace.palace_graph as pg
create_tunnel("wing_a", "r1", "wing_b", "r2")
assert os.path.exists(pg._TUNNEL_FILE)
assert not os.path.exists(pg._TUNNEL_FILE + ".tmp")
def test_concurrent_creates_preserve_all_tunnels(self):
"""Regression: two concurrent create_tunnel calls must not clobber
each other. Without the mine_lock around load+save, the later
writer's snapshot would overwrite the earlier writer's tunnel."""
barrier = threading.Barrier(5)
errors: list = []
def worker(i):
try:
barrier.wait(timeout=2)
create_tunnel(f"wing_{i}", "r", "wing_shared", "hub")
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors, f"worker raised: {errors}"
tunnels = list_tunnels()
assert len(tunnels) == 5, (
f"expected 5 concurrent tunnels, got {len(tunnels)}" "write race dropped some"
)
def test_created_at_is_timezone_aware(self):
"""Regression: created_at must be tz-aware UTC, not naive."""
t = create_tunnel("wing_a", "r1", "wing_b", "r2")
# ISO format with tz offset contains '+' or 'Z'.
assert t["created_at"].endswith("+00:00") or t["created_at"].endswith("Z")
+83
View File
@@ -75,3 +75,86 @@ def test_mine_convos_does_not_reprocess_empty_chunk_files(capsys):
assert "Files skipped (already filed): 1" in out2
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
def test_mine_convos_rebuilds_stale_drawers_after_schema_bump(capsys):
"""When stored drawers have an older normalize_version, the next mine
silently purges them and refiles — no manual erase required.
This is what makes the strip_noise upgrade apply to existing corpora:
users just run `mempalace mine` again and old noise-filled drawers get
replaced with clean ones."""
from mempalace.palace import NORMALIZE_VERSION
tmpdir = tempfile.mkdtemp()
try:
convo_path = Path(tmpdir) / "chat.txt"
convo_path.write_text(
"> What is memory?\nMemory is persistence.\n\n"
"> Why does it matter?\nIt enables continuity.\n\n"
"> How do we build it?\nWith structured storage.\n"
)
palace_path = os.path.join(tmpdir, "palace")
# First mine — stamps drawers with NORMALIZE_VERSION
mine_convos(tmpdir, palace_path, wing="test")
capsys.readouterr()
client = chromadb.PersistentClient(path=palace_path)
col = client.get_collection("mempalace_drawers")
resolved = str(Path(tmpdir).resolve() / "chat.txt")
first_pass = col.get(where={"source_file": resolved})
first_ids = set(first_pass["ids"])
assert first_ids, "first mine should produce drawers"
for meta in first_pass["metadatas"]:
assert meta.get("normalize_version") == NORMALIZE_VERSION
# Simulate pre-v2 drawers: rewrite metadata to an older version,
# and replace content with "noise" so we can see it get cleaned up.
stale_metas = []
for meta in first_pass["metadatas"]:
stale = dict(meta)
stale["normalize_version"] = 1
stale_metas.append(stale)
col.update(
ids=list(first_pass["ids"]),
documents=["STALE NOISE"] * len(first_pass["ids"]),
metadatas=stale_metas,
)
# Add an extra orphan drawer that should also be purged.
col.add(
ids=["orphan_drawer"],
documents=["OLD ORPHAN"],
metadatas=[
{
"wing": "test",
"room": "default",
"source_file": resolved,
"chunk_index": 999,
"normalize_version": 1,
}
],
)
del col, client
# Second mine — version gate should trigger rebuild
mine_convos(tmpdir, palace_path, wing="test")
out = capsys.readouterr().out
assert (
"Files skipped (already filed): 0" in out
), "stale drawers should force a rebuild, not a skip"
client = chromadb.PersistentClient(path=palace_path)
col = client.get_collection("mempalace_drawers")
rebuilt = col.get(where={"source_file": resolved})
# Orphan is gone
assert "orphan_drawer" not in rebuilt["ids"]
# No stale content survived
assert all("STALE NOISE" not in d for d in rebuilt["documents"])
assert all("OLD ORPHAN" not in d for d in rebuilt["documents"])
# All rebuilt drawers carry the current version
for meta in rebuilt["metadatas"]:
assert meta.get("normalize_version") == NORMALIZE_VERSION
del col, client
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
+43
View File
@@ -6,6 +6,7 @@ dispatch layer (integration-level). Uses isolated palace + KG fixtures
via monkeypatch to avoid touching real data.
"""
from datetime import datetime
import json
import sys
@@ -643,6 +644,48 @@ class TestDiaryTools:
r = tool_diary_read(agent_name="Nobody")
assert r["entries"] == []
def test_diary_write_same_second_shared_prefix_no_collision(
self, monkeypatch, config, palace_path, kg
):
_patch_mcp_server(monkeypatch, config, kg)
_client, _col = _get_collection(palace_path, create=True)
del _client
from mempalace import mcp_server
class FrozenDateTime:
calls = [
datetime(2026, 4, 13, 22, 15, 30, 123456),
datetime(2026, 4, 13, 22, 15, 30, 123457),
]
fallback = datetime(2026, 4, 13, 22, 15, 30, 123457)
@classmethod
def now(cls):
if cls.calls:
return cls.calls.pop(0)
return cls.fallback
monkeypatch.setattr(mcp_server, "datetime", FrozenDateTime)
from mempalace.mcp_server import tool_diary_read, tool_diary_write
entry1 = "A" * 50 + " entry one"
entry2 = "A" * 50 + " entry two"
result1 = tool_diary_write(agent_name="TestAgent", entry=entry1, topic="status")
result2 = tool_diary_write(agent_name="TestAgent", entry=entry2, topic="status")
assert result1["success"] is True
assert result2["success"] is True
assert result1["entry_id"] != result2["entry_id"]
read_result = tool_diary_read(agent_name="TestAgent")
contents = [entry["content"] for entry in read_result["entries"]]
assert read_result["total"] == 2
assert entry1 in contents
assert entry2 in contents
# ── Cache Invalidation (inode/mtime) ──────────────────────────────────
+90 -4
View File
@@ -7,7 +7,7 @@ import chromadb
import yaml
from mempalace.miner import mine, scan_project, status
from mempalace.palace import file_already_mined
from mempalace.palace import NORMALIZE_VERSION, file_already_mined
def write_file(path: Path, content: str):
@@ -227,11 +227,17 @@ def test_file_already_mined_check_mtime():
assert file_already_mined(col, test_file) is False
assert file_already_mined(col, test_file, check_mtime=True) is False
# Add it with mtime
# Add it with mtime + current normalize_version
col.add(
ids=["d1"],
documents=["hello world"],
metadatas=[{"source_file": test_file, "source_mtime": str(mtime)}],
metadatas=[
{
"source_file": test_file,
"source_mtime": str(mtime),
"normalize_version": NORMALIZE_VERSION,
}
],
)
# Already mined (no mtime check)
@@ -253,7 +259,12 @@ def test_file_already_mined_check_mtime():
col.add(
ids=["d2"],
documents=["other"],
metadatas=[{"source_file": "/fake/no_mtime.txt"}],
metadatas=[
{
"source_file": "/fake/no_mtime.txt",
"normalize_version": NORMALIZE_VERSION,
}
],
)
assert file_already_mined(col, "/fake/no_mtime.txt", check_mtime=True) is False
finally:
@@ -296,3 +307,78 @@ def test_status_missing_palace_does_not_create_empty_collection(tmp_path, capsys
out = capsys.readouterr().out
assert "No palace found" in out
assert not palace_path.exists()
# ── normalize_version schema gate ───────────────────────────────────────
#
# When the normalization pipeline changes shape (e.g., strip_noise lands),
# `NORMALIZE_VERSION` is bumped so pre-existing drawers can be silently
# rebuilt on the next mine. These tests pin that contract.
def test_file_already_mined_returns_false_for_stale_normalize_version():
"""Pre-v2 drawers (no field, or older integer) must not short-circuit."""
tmpdir = tempfile.mkdtemp()
try:
palace_path = os.path.join(tmpdir, "palace")
os.makedirs(palace_path)
client = chromadb.PersistentClient(path=palace_path)
col = client.get_or_create_collection("mempalace_drawers")
# Pre-v2 drawer: no normalize_version field at all
col.add(
ids=["d_old"],
documents=["old"],
metadatas=[{"source_file": "/fake/old.jsonl"}],
)
assert file_already_mined(col, "/fake/old.jsonl") is False
# Explicitly older version
col.add(
ids=["d_v1"],
documents=["v1"],
metadatas=[{"source_file": "/fake/v1.jsonl", "normalize_version": 1}],
)
assert file_already_mined(col, "/fake/v1.jsonl") is False
# Current version — short-circuits
col.add(
ids=["d_current"],
documents=["cur"],
metadatas=[
{
"source_file": "/fake/current.jsonl",
"normalize_version": NORMALIZE_VERSION,
}
],
)
assert file_already_mined(col, "/fake/current.jsonl") is True
finally:
del col, client
shutil.rmtree(tmpdir, ignore_errors=True)
def test_add_drawer_stamps_normalize_version(tmp_path):
"""Fresh drawers carry the current schema version so future upgrades work."""
from mempalace.miner import add_drawer
palace_path = tmp_path / "palace"
palace_path.mkdir()
client = chromadb.PersistentClient(path=str(palace_path))
col = client.get_or_create_collection("mempalace_drawers")
try:
added = add_drawer(
collection=col,
wing="test",
room="notes",
content="hello",
source_file=str(tmp_path / "src.md"),
chunk_index=0,
agent="unit",
)
assert added is True
stored = col.get(limit=1)
meta = stored["metadatas"][0]
assert meta["normalize_version"] == NORMALIZE_VERSION
finally:
del col, client
+146
View File
@@ -13,6 +13,7 @@ from mempalace.normalize import (
_try_normalize_json,
_try_slack_json,
normalize,
strip_noise,
)
@@ -1048,3 +1049,148 @@ def test_normalize_rejects_large_file():
assert False, "Should have raised IOError"
except IOError as e:
assert "too large" in str(e).lower()
# ── strip_noise() — verbatim-safety boundary tests ─────────────────────
#
# The "Verbatim always" design principle requires that we never delete
# user-authored text. These tests pin down the boundary between system
# noise (which we strip) and user prose that happens to mention the same
# strings (which must survive untouched).
class TestStripNoisePreservesUserContent:
"""User prose that mentions noise strings inline must be preserved."""
def test_user_discusses_stop_hook_in_prose(self):
# Regression: original regex with IGNORECASE + `.*\n?` ate the second
# sentence from real user commentary.
text = (
"> User:\n"
"> Our CI has a stop hook that rejects merges after 5pm. "
"Ran 2 stop hooks last week.\n"
"> Assistant:\n"
"> Got it."
)
assert strip_noise(text) == text.strip()
def test_user_mentions_system_reminder_inline(self):
# Inline <system-reminder> tags inside user prose (e.g. documenting
# Claude Code behavior) must not be stripped.
text = (
"> User:\n"
"> Here is what Claude Code emits: "
"<system-reminder>Auto-save reminder...</system-reminder>"
" — I want to ignore it."
)
assert strip_noise(text) == text.strip()
def test_ctrl_o_hint_in_prose_preserved(self):
# Regression: original `.*\(ctrl\+o to expand\).*\n?` nuked the whole
# line whenever a user documented the TUI shortcut.
text = (
"> User:\n"
"> In the TUI you hit (ctrl+o to expand) to see more. "
"That is the shortcut I want to document."
)
assert strip_noise(text) == text.strip()
def test_current_time_inline_in_prose(self):
text = "> User:\n> At CURRENT TIME: the meeting starts, not before."
assert strip_noise(text) == text.strip()
def test_plus_n_lines_marker_inline(self):
text = "> User:\n> The log showed … +50 lines of stack trace, useful."
assert strip_noise(text) == text.strip()
def test_dangling_open_tag_does_not_span_messages(self):
# THE span-eating bug: a stray unclosed <system-reminder> in one
# message must NOT merge with a closing tag in another message and
# silently delete everything in between.
text = (
"> User 1: normal content <system-reminder>A\n"
"> Assistant: reply\n"
"> User 2: more content</system-reminder> tail"
)
out = strip_noise(text)
assert "Assistant: reply" in out
assert "User 2: more content" in out
assert "User 1: normal content" in out
class TestStripNoiseRemovesSystemChrome:
"""System-injected noise with standalone/line-anchored shape must be stripped."""
def test_strips_line_anchored_system_reminder_block(self):
text = (
"> User:\n"
"<system-reminder>\n"
"Auto-save reminder...\n"
"</system-reminder>\n"
"> Real message."
)
out = strip_noise(text)
assert "system-reminder" not in out
assert "Auto-save reminder" not in out
assert "Real message." in out
def test_strips_system_reminder_with_blockquote_prefix(self):
# _messages_to_transcript prefixes lines with "> ", so the line
# anchor must also accept that shape.
text = "> User:\n" "> <system-reminder>Injected noise</system-reminder>\n" "> Real message."
out = strip_noise(text)
assert "Injected noise" not in out
assert "Real message." in out
def test_strips_standalone_ran_hook_line(self):
text = "Ran 2 Stop hook\n> User: real content"
out = strip_noise(text)
assert "Ran 2 Stop hook" not in out
assert "real content" in out
def test_strips_known_hook_names(self):
for hook in ("Stop", "PreCompact", "PreToolUse", "PostToolUse", "UserPromptSubmit"):
text = f"Ran 1 {hook} hook\n> User: content"
assert hook not in strip_noise(text)
def test_strips_current_time_standalone(self):
text = "CURRENT TIME: 2026-04-13 10:00 UTC\n> User: Hello"
out = strip_noise(text)
assert "CURRENT TIME" not in out
assert "Hello" in out
def test_strips_collapsed_lines_marker(self):
text = "… +42 lines\n> User: Hello"
out = strip_noise(text)
assert "+42 lines" not in out
assert "Hello" in out
def test_strips_token_count_ctrl_o_chrome(self):
# Claude Code's actual collapsed-output chrome: "[N tokens] (ctrl+o to expand)"
text = "> Assistant: some output [5 tokens] (ctrl+o to expand)\n> User: ok"
out = strip_noise(text)
assert "(ctrl+o to expand)" not in out
assert "[5 tokens]" not in out
assert "some output" in out
def test_strips_each_known_noise_tag(self):
for tag in (
"system-reminder",
"command-message",
"command-name",
"task-notification",
"user-prompt-submit-hook",
"hook_output",
):
text = f"> User:\n<{tag}>junk</{tag}>\n> Real."
out = strip_noise(text)
assert tag not in out, f"{tag} leaked into output"
assert "Real." in out
def test_collapses_excessive_blank_lines(self):
text = "line one\n\n\n\n\n\nline two"
out = strip_noise(text)
assert "line one" in out
assert "line two" in out
# Should collapse to no more than 3 newlines
assert "\n\n\n\n" not in out