mempalace/mempalace/miner.py
Igor Lins e Silva 1263c3c91e merge: full hardened stack + rewrite fact_checker around actual KG API
Merges the full hardened stack (up through #791 drawer-grep) and turns
fact_checker from "dead code hidden behind bare except" into an
actually-working offline contradiction detector with tests.

## Dead paths the PR body advertised but the code never executed

Both buried by a single outer ``except Exception: pass``:

  * ``kg.query(subject)`` — ``KnowledgeGraph`` has no ``query()`` method;
    it has ``query_entity()``. The attribute error was silently swallowed
    and the entire KG branch always returned ``[]``. Now using
    ``kg.query_entity(subject, direction="outgoing")`` with proper
    handling of the ``predicate``/``object``/``current``/``valid_to``
    fields the real API returns.
  * ``KnowledgeGraph(palace_path=palace_path)`` — the constructor's only
    kwarg is ``db_path``. Passing ``palace_path`` raised TypeError,
    silently swallowed. Now computing the db_path correctly from
    ``<palace>/knowledge_graph.sqlite3``, matching the convention the
    MCP server already uses.
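
A minimal sketch of why both calls stayed invisible, and of the corrected call shape. ``StubKnowledgeGraph``, ``old_lookup`` and ``new_lookup`` are hypothetical stand-ins written for this illustration, not the real ``KnowledgeGraph`` class; the stub only mirrors the two facts stated above (``db_path`` is the only constructor kwarg, and ``query_entity()`` exists while ``query()`` does not).

```python
import os


class StubKnowledgeGraph:
    """Hypothetical stand-in for the real class, for illustration only."""

    def __init__(self, db_path: str):
        self.db_path = db_path

    def query_entity(self, name: str, direction: str = "outgoing") -> list:
        # Field names follow the commit message; the values are made up.
        return [
            {"predicate": "husband", "object": "Alice", "current": True, "valid_to": None}
        ]


def old_lookup(palace_path: str, subject: str) -> list:
    """Shape of the dead path: every mistake vanishes behind the bare except."""
    try:
        kg = StubKnowledgeGraph(palace_path=palace_path)  # TypeError: unknown kwarg
        return kg.query(subject)  # would raise AttributeError, but is never reached
    except Exception:
        pass
    return []


def new_lookup(palace_path: str, subject: str) -> list:
    """Corrected shape: derive db_path from the palace dir, call query_entity()."""
    db_path = os.path.join(palace_path, "knowledge_graph.sqlite3")
    kg = StubKnowledgeGraph(db_path=db_path)
    return kg.query_entity(subject, direction="outgoing")
```

With the stub above, ``old_lookup`` always returns ``[]`` regardless of what the graph holds, which is what made the branch look like "no contradictions found" rather than a crash.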

## Contradiction logic rewritten

The previous ``if kg_pred in claim and fact.object not in claim`` only
fired when text used the SAME predicate word as the KG fact — the exact
opposite of the stated use case ("Bob is Alice's brother" when KG says
"husband" would NOT have fired). Replaced with a proper parse → lookup
→ compare pipeline:

  * ``_extract_claims`` parses two surface forms ("X is Y's Z" and
    "X's Z is Y") into ``(subject, predicate, object)`` triples.
  * ``_check_kg_contradictions`` pulls the subject's outgoing facts
    and flags two classes:
      - ``relationship_mismatch`` when a current KG fact matches the
        same ``(subject, object)`` pair but with a different predicate.
      - ``stale_fact`` when the exact triple exists but is
        ``valid_to``-closed in the past.
  * Stale-fact detection is now implemented (the PR body claimed it;
    the old code silently didn't implement it).
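
The parse, lookup and compare steps above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in ``fact_checker``: the function names, the regexes (single capitalized words only), the orientation chosen for the "X's Z is Y" form, and ``valid_to`` being an ISO date string are all assumptions. Only the fact field names (``predicate``, ``object``, ``current``, ``valid_to``) and the two issue labels come from the description above.

```python
import re
from datetime import date

# Assumed surface forms; real names may contain spaces, hyphens, etc.
_FORM_IS = re.compile(r"\b([A-Z][a-z]+) is ([A-Z][a-z]+)'s ([a-z]+)")
_FORM_POSSESSIVE = re.compile(r"\b([A-Z][a-z]+)'s ([a-z]+) is ([A-Z][a-z]+)")


def extract_claims(text: str) -> list:
    """Return (subject, predicate, object) triples for both surface forms."""
    claims = []
    # "Bob is Alice's brother" -> ("Bob", "brother", "Alice")
    for subject, obj, predicate in _FORM_IS.findall(text):
        claims.append((subject, predicate, obj))
    # "Alice's brother is Bob" -> same triple (assumed normalization)
    for obj, predicate, subject in _FORM_POSSESSIVE.findall(text):
        claims.append((subject, predicate, obj))
    return claims


def check_contradictions(claims: list, facts_by_subject: dict, today: date) -> list:
    """Compare claims against outgoing facts keyed by subject name."""
    issues = []
    for subject, predicate, obj in claims:
        for fact in facts_by_subject.get(subject, []):
            if fact["object"] != obj:
                continue
            valid_to = fact.get("valid_to")
            closed = valid_to is not None and date.fromisoformat(valid_to) < today
            if fact["predicate"] == predicate:
                # Exact triple exists but its validity window ended in the past.
                if closed:
                    issues.append({"type": "stale_fact", "claim": (subject, predicate, obj)})
            elif fact.get("current") and not closed:
                # Same (subject, object) pair, different current predicate.
                issues.append(
                    {
                        "type": "relationship_mismatch",
                        "claim": (subject, predicate, obj),
                        "kg_predicate": fact["predicate"],
                    }
                )
    return issues
```

Note that the comparison keys on the ``(subject, object)`` pair rather than on the predicate word appearing in the text, which is the difference from the old substring check.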

## Performance fix — O(n²) → O(mentioned × n)

``_check_entity_confusion`` previously computed Levenshtein for every
pair of registered names on every ``check_text`` call. For 1,000
registered names that's ~500K edit-distance calls per hook invocation.
Now we first identify which registry names actually appear in the text
(single regex scan), then only compute edit distance between mentioned
and unmentioned names. Pinned by a test that asserts <200ms on a 500-
name registry with zero mentions.

Also: when *both* similar names are mentioned in the text, we no
longer flag them — the user clearly knows they're different people.
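
A rough sketch of the mentioned-versus-unmentioned strategy described above. The function names, the ``max_distance`` threshold of 1, and case-sensitive matching are assumptions for illustration; the real ``_check_entity_confusion`` may differ in all three.

```python
import re


def levenshtein(a: str, b: str) -> int:
    """Classic two-row dynamic-programming edit distance."""
    if len(a) < len(b):
        a, b = b, a
    previous = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        current = [i]
        for j, cb in enumerate(b, 1):
            current.append(
                min(previous[j] + 1, current[j - 1] + 1, previous[j - 1] + (ca != cb))
            )
        previous = current
    return previous[-1]


def find_confusable(text: str, registry: list, max_distance: int = 1) -> list:
    """Return (mentioned, unmentioned) name pairs within max_distance edits."""
    if not registry:
        return []
    # One regex scan over the text; longer names are tried before their prefixes.
    alternation = "|".join(
        re.escape(name) for name in sorted(registry, key=len, reverse=True)
    )
    pattern = re.compile(r"(?<!\w)(" + alternation + r")(?!\w)")
    mentioned = set(pattern.findall(text))
    if not mentioned:
        return []  # zero mentions means zero edit-distance calls
    unmentioned = [name for name in registry if name not in mentioned]
    pairs = []
    for name in sorted(mentioned):
        for other in unmentioned:
            if levenshtein(name, other) <= max_distance:
                pairs.append((name, other))
    return pairs
```

Because only unmentioned names are candidates, a text that mentions both similar names produces no pair for them, matching the behavior described above.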

## Shared entity-registry loader

``mempalace/miner.py`` already had an mtime-cached loader for
``~/.mempalace/known_entities.json``. fact_checker had a duplicate
implementation that leaked file handles and ignored caching. Extended
miner's cache to expose both the flat set (``_load_known_entities``)
and the raw category dict (``_load_known_entities_raw``); fact_checker
now imports the latter. No more double disk reads, no more handle leak.
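
For a consumer of the raw category dict, flattening might look like the sketch below. ``flatten_names`` here is a hypothetical helper written for illustration; it only mirrors the two category shapes that miner's cache refresh accepts (a list of names, or a dict keyed by name) and is not the fact_checker implementation.

```python
def flatten_names(raw: dict) -> set:
    """Collect entity names from a {"category": [...] or {...}} registry dict."""
    names = set()
    for category in raw.values():
        if isinstance(category, list):
            # List-shaped category: each truthy item is a name.
            names.update(str(name) for name in category if name)
        elif isinstance(category, dict):
            # Dict-shaped category: the keys are the names.
            names.update(str(key) for key in category if key)
        # Any other shape is ignored rather than raising.
    return names
```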

## Tests — 24 cases in tests/test_fact_checker.py

All three detection paths + both dead-code regressions:
  * ``test_kg_init_uses_db_path_not_palace_path_kwarg`` — pins the
    correct KG constructor signature so the ``palace_path=`` bug can't
    come back.
  * ``test_relationship_mismatch_detected`` — the headline example from
    the PR body now actually fires.
  * ``test_stale_fact_detected`` — valid_to-closed triple is flagged.
  * ``test_current_fact_same_triple_is_not_flagged`` — no false positive
    on a still-valid match.
  * ``test_performance_bounded_by_mentioned_names`` — 500-name registry,
    zero mentions, <200ms. Regression for the O(n²) blowup.
  * ``test_no_false_positive_when_both_names_mentioned`` — Mila and
    Milla in the same text is fine.
  * Plus claim extraction, flatten_names shapes, CLI exit code, empty
    text handling, missing-palace graceful fallback, registry-dict
    shape support.

785/785 suite pass. ruff + format clean on CI-pinned 0.4.x.
2026-04-13 18:20:11 -03:00

813 lines
26 KiB
Python

#!/usr/bin/env python3
"""
miner.py — Files everything into the palace.
Reads mempalace.yaml from the project directory to know the wing + rooms.
Routes each file to the right room based on content.
Stores verbatim chunks as drawers. No summaries. Ever.
"""
import os
import sys
import hashlib
import fnmatch
from pathlib import Path
from datetime import datetime
from collections import defaultdict

from .palace import (
    NORMALIZE_VERSION,
    SKIP_DIRS,
    build_closet_lines,
    file_already_mined,
    get_closets_collection,
    get_collection,
    mine_lock,
    purge_file_closets,
    upsert_closet_lines,
)

READABLE_EXTENSIONS = {
    ".txt",
    ".md",
    ".py",
    ".js",
    ".ts",
    ".jsx",
    ".tsx",
    ".json",
    ".yaml",
    ".yml",
    ".html",
    ".css",
    ".java",
    ".go",
    ".rs",
    ".rb",
    ".sh",
    ".csv",
    ".sql",
    ".toml",
}

SKIP_FILENAMES = {
    "mempalace.yaml",
    "mempalace.yml",
    "mempal.yaml",
    "mempal.yml",
    ".gitignore",
    "package-lock.json",
}

CHUNK_SIZE = 800  # chars per drawer
CHUNK_OVERLAP = 100  # overlap between chunks
MIN_CHUNK_SIZE = 50  # skip tiny chunks
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB — skip files larger than this


# =============================================================================
# IGNORE MATCHING
# =============================================================================


class GitignoreMatcher:
    """Lightweight matcher for one directory's .gitignore patterns."""

    def __init__(self, base_dir: Path, rules: list):
        self.base_dir = base_dir
        self.rules = rules

    @classmethod
    def from_dir(cls, dir_path: Path):
        gitignore_path = dir_path / ".gitignore"
        if not gitignore_path.is_file():
            return None
        try:
            lines = gitignore_path.read_text(encoding="utf-8", errors="replace").splitlines()
        except Exception:
            return None
        rules = []
        for raw_line in lines:
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith("\\#") or line.startswith("\\!"):
                line = line[1:]
            elif line.startswith("#"):
                continue
            negated = line.startswith("!")
            if negated:
                line = line[1:]
            anchored = line.startswith("/")
            if anchored:
                line = line.lstrip("/")
            dir_only = line.endswith("/")
            if dir_only:
                line = line.rstrip("/")
            if not line:
                continue
            rules.append(
                {
                    "pattern": line,
                    "anchored": anchored,
                    "dir_only": dir_only,
                    "negated": negated,
                }
            )
        if not rules:
            return None
        return cls(dir_path, rules)

    def matches(self, path: Path, is_dir: bool = None):
        try:
            relative = path.relative_to(self.base_dir).as_posix().strip("/")
        except ValueError:
            return None
        if not relative:
            return None
        if is_dir is None:
            is_dir = path.is_dir()
        ignored = None
        for rule in self.rules:
            if self._rule_matches(rule, relative, is_dir):
                ignored = not rule["negated"]
        return ignored

    def _rule_matches(self, rule: dict, relative: str, is_dir: bool) -> bool:
        pattern = rule["pattern"]
        parts = relative.split("/")
        pattern_parts = pattern.split("/")
        if rule["dir_only"]:
            target_parts = parts if is_dir else parts[:-1]
            if not target_parts:
                return False
            if rule["anchored"] or len(pattern_parts) > 1:
                return self._match_from_root(target_parts, pattern_parts)
            return any(fnmatch.fnmatch(part, pattern) for part in target_parts)
        if rule["anchored"] or len(pattern_parts) > 1:
            return self._match_from_root(parts, pattern_parts)
        return any(fnmatch.fnmatch(part, pattern) for part in parts)

    def _match_from_root(self, target_parts: list, pattern_parts: list) -> bool:
        def matches(path_index: int, pattern_index: int) -> bool:
            if pattern_index == len(pattern_parts):
                return True
            if path_index == len(target_parts):
                return all(part == "**" for part in pattern_parts[pattern_index:])
            pattern_part = pattern_parts[pattern_index]
            if pattern_part == "**":
                return matches(path_index, pattern_index + 1) or matches(
                    path_index + 1, pattern_index
                )
            if not fnmatch.fnmatch(target_parts[path_index], pattern_part):
                return False
            return matches(path_index + 1, pattern_index + 1)

        return matches(0, 0)


def load_gitignore_matcher(dir_path: Path, cache: dict):
    """Load and cache one directory's .gitignore matcher."""
    if dir_path not in cache:
        cache[dir_path] = GitignoreMatcher.from_dir(dir_path)
    return cache[dir_path]


def is_gitignored(path: Path, matchers: list, is_dir: bool = False) -> bool:
    """Apply active .gitignore matchers in ancestor order; last match wins."""
    ignored = False
    for matcher in matchers:
        decision = matcher.matches(path, is_dir=is_dir)
        if decision is not None:
            ignored = decision
    return ignored


def should_skip_dir(dirname: str) -> bool:
    """Skip known generated/cache directories before gitignore matching."""
    return dirname in SKIP_DIRS or dirname.endswith(".egg-info")


def normalize_include_paths(include_ignored: list) -> set:
    """Normalize comma-parsed include paths into project-relative POSIX strings."""
    normalized = set()
    for raw_path in include_ignored or []:
        candidate = str(raw_path).strip().strip("/")
        if candidate:
            normalized.add(Path(candidate).as_posix())
    return normalized


def is_exact_force_include(path: Path, project_path: Path, include_paths: set) -> bool:
    """Return True when a path exactly matches an explicit include override."""
    if not include_paths:
        return False
    try:
        relative = path.relative_to(project_path).as_posix().strip("/")
    except ValueError:
        return False
    return relative in include_paths


def is_force_included(path: Path, project_path: Path, include_paths: set) -> bool:
    """Return True when a path or one of its ancestors/descendants was explicitly included."""
    if not include_paths:
        return False
    try:
        relative = path.relative_to(project_path).as_posix().strip("/")
    except ValueError:
        return False
    if not relative:
        return False
    for include_path in include_paths:
        if relative == include_path:
            return True
        if relative.startswith(f"{include_path}/"):
            return True
        if include_path.startswith(f"{relative}/"):
            return True
    return False


# =============================================================================
# CONFIG
# =============================================================================


def load_config(project_dir: str) -> dict:
    """Load mempalace.yaml from project directory (falls back to mempal.yaml)."""
    import yaml

    config_path = Path(project_dir).expanduser().resolve() / "mempalace.yaml"
    if not config_path.exists():
        # Fallback to legacy name
        legacy_path = Path(project_dir).expanduser().resolve() / "mempal.yaml"
        if legacy_path.exists():
            config_path = legacy_path
        else:
            print(f"ERROR: No mempalace.yaml found in {project_dir}")
            print(f"Run: mempalace init {project_dir}")
            sys.exit(1)
    with open(config_path) as f:
        return yaml.safe_load(f)


# =============================================================================
# FILE ROUTING — which room does this file belong to?
# =============================================================================


def detect_room(filepath: Path, content: str, rooms: list, project_path: Path) -> str:
    """
    Route a file to the right room.
    Priority:
    1. Folder path matches a room name or keyword
    2. Filename matches a room name
    3. Content keyword scoring
    4. Fallback: "general"
    """
    relative = str(filepath.relative_to(project_path)).lower()
    filename = filepath.stem.lower()
    content_lower = content[:2000].lower()

    # Priority 1: folder path matches room name or keywords
    path_parts = relative.replace("\\", "/").split("/")
    for part in path_parts[:-1]:  # skip filename itself
        for room in rooms:
            candidates = [room["name"].lower()] + [k.lower() for k in room.get("keywords", [])]
            if any(part == c or c in part or part in c for c in candidates):
                return room["name"]

    # Priority 2: filename matches room name
    for room in rooms:
        if room["name"].lower() in filename or filename in room["name"].lower():
            return room["name"]

    # Priority 3: keyword scoring from room keywords + name
    scores = defaultdict(int)
    for room in rooms:
        keywords = room.get("keywords", []) + [room["name"]]
        for kw in keywords:
            count = content_lower.count(kw.lower())
            scores[room["name"]] += count
    if scores:
        best = max(scores, key=scores.get)
        if scores[best] > 0:
            return best
    return "general"


# =============================================================================
# CHUNKING
# =============================================================================


def chunk_text(content: str, source_file: str) -> list:
    """
    Split content into drawer-sized chunks.
    Tries to split on paragraph/line boundaries.
    Returns list of {"content": str, "chunk_index": int}
    """
    # Clean up
    content = content.strip()
    if not content:
        return []

    chunks = []
    start = 0
    chunk_index = 0
    while start < len(content):
        end = min(start + CHUNK_SIZE, len(content))
        # Try to break at paragraph boundary
        if end < len(content):
            newline_pos = content.rfind("\n\n", start, end)
            if newline_pos > start + CHUNK_SIZE // 2:
                end = newline_pos
            else:
                newline_pos = content.rfind("\n", start, end)
                if newline_pos > start + CHUNK_SIZE // 2:
                    end = newline_pos
        chunk = content[start:end].strip()
        if len(chunk) >= MIN_CHUNK_SIZE:
            chunks.append(
                {
                    "content": chunk,
                    "chunk_index": chunk_index,
                }
            )
            chunk_index += 1
        start = end - CHUNK_OVERLAP if end < len(content) else end
    return chunks


# =============================================================================
# PALACE — ChromaDB operations
# =============================================================================

_ENTITY_REGISTRY_PATH = os.path.join(os.path.expanduser("~"), ".mempalace", "known_entities.json")
_ENTITY_REGISTRY_CACHE: dict = {"mtime": None, "names": frozenset(), "raw": {}}
_ENTITY_EXTRACT_WINDOW = 5000  # chars of content scanned for capitalized words
_ENTITY_METADATA_LIMIT = 25  # max entities packed into the metadata field


def _refresh_known_entities_cache() -> None:
    """Reload ``~/.mempalace/known_entities.json`` into the module cache if
    its mtime changed since the last read. Shared by ``_load_known_entities``
    (flat set) and ``_load_known_entities_raw`` (category dict), so callers
    can pick whichever shape they need without duplicating the mtime-gated
    disk read.
    """
    try:
        mtime = os.path.getmtime(_ENTITY_REGISTRY_PATH)
    except OSError:
        if _ENTITY_REGISTRY_CACHE["mtime"] is not None:
            _ENTITY_REGISTRY_CACHE["mtime"] = None
            _ENTITY_REGISTRY_CACHE["names"] = frozenset()
            _ENTITY_REGISTRY_CACHE["raw"] = {}
        return
    if _ENTITY_REGISTRY_CACHE["mtime"] == mtime:
        return
    names: set = set()
    raw: dict = {}
    try:
        import json

        with open(_ENTITY_REGISTRY_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, dict):
            raw = data
            for cat in data.values():
                if isinstance(cat, list):
                    names.update(str(n) for n in cat if n)
                elif isinstance(cat, dict):
                    names.update(str(k) for k in cat.keys() if k)
    except Exception:
        names = set()
        raw = {}
    _ENTITY_REGISTRY_CACHE["mtime"] = mtime
    _ENTITY_REGISTRY_CACHE["names"] = frozenset(names)
    _ENTITY_REGISTRY_CACHE["raw"] = raw


def _load_known_entities() -> frozenset:
    """Flat set of every known entity name (across all categories).
    Cached by mtime; invalidated when the registry file changes.
    """
    _refresh_known_entities_cache()
    return _ENTITY_REGISTRY_CACHE["names"]


def _load_known_entities_raw() -> dict:
    """Full category-dict view of the registry, shape
    ``{"category": ["Name1", ...], ...}``. Cached by mtime.
    Consumed by modules (e.g., fact_checker) that need to reason about
    categories rather than a flat name set. Never returns a mutable
    reference to the cache — callers get a shallow copy.
    """
    _refresh_known_entities_cache()
    return dict(_ENTITY_REGISTRY_CACHE["raw"])


def _extract_entities_for_metadata(content: str) -> str:
    """Extract entity names from content for metadata tagging.
    Combines the user's known-entity registry (cached across calls) with
    capitalized words appearing ≥2 times in the first ``_ENTITY_EXTRACT_WINDOW``
    chars. Filters out the closet stoplist (``When``, ``After``, ``The``, …)
    so sentence-starters don't masquerade as proper nouns.
    Returns semicolon-separated string suitable for ChromaDB metadata
    filtering. The list is truncated to ``_ENTITY_METADATA_LIMIT`` entries
    *before* joining so a name is never cut in half.
    """
    import re

    from .palace import _ENTITY_STOPLIST

    matched: set = set()
    known = _load_known_entities()
    for name in known:
        if re.search(r"(?<!\w)" + re.escape(name) + r"(?!\w)", content):
            matched.add(name)
    window = content[:_ENTITY_EXTRACT_WINDOW]
    words = re.findall(r"\b[A-Z][a-z]{2,}\b", window)
    freq: dict = {}
    for w in words:
        if w in _ENTITY_STOPLIST:
            continue
        freq[w] = freq.get(w, 0) + 1
    for w, c in freq.items():
        if c >= 2 and len(w) > 2:
            matched.add(w)
    if not matched:
        return ""
    # Truncate the *list*, not the joined string — never split a name.
    capped = sorted(matched)[:_ENTITY_METADATA_LIMIT]
    return ";".join(capped)


def add_drawer(
    collection, wing: str, room: str, content: str, source_file: str, chunk_index: int, agent: str
):
    """Add one drawer to the palace."""
    drawer_id = f"drawer_{wing}_{room}_{hashlib.sha256((source_file + str(chunk_index)).encode()).hexdigest()[:24]}"
    try:
        metadata = {
            "wing": wing,
            "room": room,
            "source_file": source_file,
            "chunk_index": chunk_index,
            "added_by": agent,
            "filed_at": datetime.now().isoformat(),
            "normalize_version": NORMALIZE_VERSION,
        }
        # Store file mtime so we can detect modifications later.
        try:
            metadata["source_mtime"] = os.path.getmtime(source_file)
        except OSError:
            pass
        # Tag with entity names for filterable search
        entities = _extract_entities_for_metadata(content)
        if entities:
            metadata["entities"] = entities
        collection.upsert(
            documents=[content],
            ids=[drawer_id],
            metadatas=[metadata],
        )
        return True
    except Exception:
        raise


# =============================================================================
# PROCESS ONE FILE
# =============================================================================


def process_file(
    filepath: Path,
    project_path: Path,
    collection,
    wing: str,
    rooms: list,
    agent: str,
    dry_run: bool,
    closets_col=None,
) -> tuple:
    """Read, chunk, route, and file one file. Returns (drawer_count, room_name)."""
    # Skip if already filed
    source_file = str(filepath)
    if not dry_run and file_already_mined(collection, source_file, check_mtime=True):
        return 0, "general"

    try:
        content = filepath.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return 0, "general"

    content = content.strip()
    if len(content) < MIN_CHUNK_SIZE:
        return 0, "general"

    room = detect_room(filepath, content, rooms, project_path)
    chunks = chunk_text(content, source_file)

    if dry_run:
        print(f" [DRY RUN] {filepath.name} → room:{room} ({len(chunks)} drawers)")
        return len(chunks), room

    # Lock this file so concurrent agents don't interleave delete+insert.
    # Without the lock, two agents can both pass file_already_mined(),
    # both delete, and both insert — creating duplicates or losing data.
    with mine_lock(source_file):
        # Re-check after acquiring lock — another agent may have just finished
        if file_already_mined(collection, source_file, check_mtime=True):
            return 0, room

        # Purge stale drawers for this file before re-inserting the fresh chunks.
        # Converts modified-file re-mines from upsert-over-existing-IDs (which hits
        # hnswlib's thread-unsafe updatePoint path and can segfault on macOS ARM
        # with chromadb 0.6.3) into a clean delete+insert, bypassing the update
        # path entirely.
        try:
            collection.delete(where={"source_file": source_file})
        except Exception:
            pass

        drawers_added = 0
        for chunk in chunks:
            added = add_drawer(
                collection=collection,
                wing=wing,
                room=room,
                content=chunk["content"],
                source_file=source_file,
                chunk_index=chunk["chunk_index"],
                agent=agent,
            )
            if added:
                drawers_added += 1

        # Build closet — the searchable index pointing to these drawers.
        # Purge first: a re-mine (mtime change or normalize_version bump) must
        # fully replace the prior closets, not append to them.
        if closets_col and drawers_added > 0:
            drawer_ids = [
                f"drawer_{wing}_{room}_{hashlib.sha256((source_file + str(c['chunk_index'])).encode()).hexdigest()[:24]}"
                for c in chunks
            ]
            closet_lines = build_closet_lines(source_file, drawer_ids, content, wing, room)
            closet_id_base = (
                f"closet_{wing}_{room}_{hashlib.sha256(source_file.encode()).hexdigest()[:24]}"
            )
            entities = _extract_entities_for_metadata(content)
            closet_meta = {
                "wing": wing,
                "room": room,
                "source_file": source_file,
                "drawer_count": drawers_added,
                "filed_at": datetime.now().isoformat(),
                "normalize_version": NORMALIZE_VERSION,
            }
            if entities:
                closet_meta["entities"] = entities
            purge_file_closets(closets_col, source_file)
            upsert_closet_lines(closets_col, closet_id_base, closet_lines, closet_meta)

        return drawers_added, room


# =============================================================================
# SCAN PROJECT
# =============================================================================


def scan_project(
    project_dir: str,
    respect_gitignore: bool = True,
    include_ignored: list = None,
) -> list:
    """Return list of all readable file paths."""
    project_path = Path(project_dir).expanduser().resolve()
    files = []
    active_matchers = []
    matcher_cache = {}
    include_paths = normalize_include_paths(include_ignored)

    for root, dirs, filenames in os.walk(project_path):
        root_path = Path(root)

        if respect_gitignore:
            active_matchers = [
                matcher
                for matcher in active_matchers
                if root_path == matcher.base_dir or matcher.base_dir in root_path.parents
            ]
            current_matcher = load_gitignore_matcher(root_path, matcher_cache)
            if current_matcher is not None:
                active_matchers.append(current_matcher)

        dirs[:] = [
            d
            for d in dirs
            if is_force_included(root_path / d, project_path, include_paths)
            or not should_skip_dir(d)
        ]
        if respect_gitignore and active_matchers:
            dirs[:] = [
                d
                for d in dirs
                if is_force_included(root_path / d, project_path, include_paths)
                or not is_gitignored(root_path / d, active_matchers, is_dir=True)
            ]

        for filename in filenames:
            filepath = root_path / filename
            force_include = is_force_included(filepath, project_path, include_paths)
            exact_force_include = is_exact_force_include(filepath, project_path, include_paths)
            if not force_include and filename in SKIP_FILENAMES:
                continue
            if filepath.suffix.lower() not in READABLE_EXTENSIONS and not exact_force_include:
                continue
            if respect_gitignore and active_matchers and not force_include:
                if is_gitignored(filepath, active_matchers, is_dir=False):
                    continue
            # Skip symlinks — prevents following links to /dev/urandom, etc.
            if filepath.is_symlink():
                continue
            # Skip files exceeding size limit
            try:
                if filepath.stat().st_size > MAX_FILE_SIZE:
                    continue
            except OSError:
                continue
            files.append(filepath)
    return files


# =============================================================================
# MAIN: MINE
# =============================================================================


def mine(
    project_dir: str,
    palace_path: str,
    wing_override: str = None,
    agent: str = "mempalace",
    limit: int = 0,
    dry_run: bool = False,
    respect_gitignore: bool = True,
    include_ignored: list = None,
):
    """Mine a project directory into the palace."""
    project_path = Path(project_dir).expanduser().resolve()
    config = load_config(project_dir)
    wing = wing_override or config["wing"]
    rooms = config.get("rooms", [{"name": "general", "description": "All project files"}])

    files = scan_project(
        project_dir,
        respect_gitignore=respect_gitignore,
        include_ignored=include_ignored,
    )
    if limit > 0:
        files = files[:limit]

    print(f"\n{'=' * 55}")
    print(" MemPalace Mine")
    print(f"{'=' * 55}")
    print(f" Wing: {wing}")
    print(f" Rooms: {', '.join(r['name'] for r in rooms)}")
    print(f" Files: {len(files)}")
    print(f" Palace: {palace_path}")
    if dry_run:
        print(" DRY RUN — nothing will be filed")
    if not respect_gitignore:
        print(" .gitignore: DISABLED")
    if include_ignored:
        print(f" Include: {', '.join(sorted(normalize_include_paths(include_ignored)))}")
    print(f"{'─' * 55}\n")

    if not dry_run:
        collection = get_collection(palace_path)
        closets_col = get_closets_collection(palace_path)
    else:
        collection = None
        closets_col = None

    total_drawers = 0
    files_skipped = 0
    room_counts = defaultdict(int)

    for i, filepath in enumerate(files, 1):
        drawers, room = process_file(
            filepath=filepath,
            project_path=project_path,
            collection=collection,
            wing=wing,
            rooms=rooms,
            agent=agent,
            dry_run=dry_run,
            closets_col=closets_col,
        )
        if drawers == 0 and not dry_run:
            files_skipped += 1
        else:
            total_drawers += drawers
            room_counts[room] += 1
            if not dry_run:
                print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers}")

    print(f"\n{'=' * 55}")
    print(" Done.")
    print(f" Files processed: {len(files) - files_skipped}")
    print(f" Files skipped (already filed): {files_skipped}")
    print(f" Drawers filed: {total_drawers}")
    print("\n By room:")
    for room, count in sorted(room_counts.items(), key=lambda x: x[1], reverse=True):
        print(f" {room:20} {count} files")
    print('\n Next: mempalace search "what you\'re looking for"')
    print(f"{'=' * 55}\n")


# =============================================================================
# STATUS
# =============================================================================


def status(palace_path: str):
    """Show what's been filed in the palace."""
    try:
        col = get_collection(palace_path, create=False)
    except Exception:
        print(f"\n No palace found at {palace_path}")
        print(" Run: mempalace init <dir> then mempalace mine <dir>")
        return

    # Count by wing and room
    total = col.count()
    r = col.get(limit=total, include=["metadatas"]) if total else {"metadatas": []}
    metas = r["metadatas"]

    wing_rooms = defaultdict(lambda: defaultdict(int))
    for m in metas:
        wing_rooms[m.get("wing", "?")][m.get("room", "?")] += 1

    print(f"\n{'=' * 55}")
    print(f" MemPalace Status — {len(metas)} drawers")
    print(f"{'=' * 55}\n")
    for wing, rooms in sorted(wing_rooms.items()):
        print(f" WING: {wing}")
        for room, count in sorted(rooms.items(), key=lambda x: x[1], reverse=True):
            print(f" ROOM: {room:20} {count:5} drawers")
        print()
    print(f"{'=' * 55}\n")