391 lines
14 KiB
Python
391 lines
14 KiB
Python
"""
|
|
MemPalace configuration system.
|
|
|
|
Priority: env vars > config file (~/.mempalace/config.json) > defaults
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
from functools import lru_cache
|
|
from pathlib import Path
|
|
|
|
|
|
# ── Input validation ──────────────────────────────────────────────────────────
# Shared sanitizers for wing/room/entity names. Prevents path traversal,
# excessively long strings, and special characters that could cause issues
# in file paths, SQLite, or ChromaDB metadata.

# Upper bound (in characters) for any wing/room/entity name.
MAX_NAME_LENGTH = 128

# A safe name is either a single alphanumeric character, or starts and ends
# with an alphanumeric character (``[^\W_]`` = word character except ``_``)
# with word characters, spaces, dots, apostrophes and hyphens in between.
# The interior repetition bound is derived from MAX_NAME_LENGTH (minus the
# two anchoring characters) so the regex and the length limit cannot drift
# apart if the constant is ever changed.
_SAFE_NAME_RE = re.compile(rf"^(?:[^\W_]|[^\W_][\w .'-]{{0,{MAX_NAME_LENGTH - 2}}}[^\W_])$")
|
|
|
|
|
|
def normalize_wing_name(name: str) -> str:
    """Return the wing slug for *name*: lower-cased, with separators as ``_``.

    Every space and every hyphen is replaced by an underscore, one for one
    (runs of separators are not merged into a single underscore).

    The same rule is applied by ``init`` when persisting `topics_by_wing`
    and when writing `mempalace.yaml`, so the miner's lookup matches at
    mine time regardless of the source dirname.
    """
    slug = name.lower()
    for separator in (" ", "-"):
        slug = slug.replace(separator, "_")
    return slug
|
|
|
|
|
|
def sanitize_name(value: str, field_name: str = "name") -> str:
    """Check a wing/room/entity name and return it with outer whitespace removed.

    Args:
        value: Candidate name.
        field_name: Label used at the start of every error message.

    Returns:
        The stripped name.

    Raises:
        ValueError: If the value is not a string, is blank, is longer than
            ``MAX_NAME_LENGTH``, contains ``..``, ``/``, a backslash or a
            null byte, or does not match ``_SAFE_NAME_RE``.
    """
    if not isinstance(value, str):
        raise ValueError(f"{field_name} must be a non-empty string")

    cleaned = value.strip()
    if not cleaned:
        raise ValueError(f"{field_name} must be a non-empty string")

    if len(cleaned) > MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {MAX_NAME_LENGTH} characters")

    # Path traversal: parent-directory markers and either path separator.
    if any(marker in cleaned for marker in ("..", "/", "\\")):
        raise ValueError(f"{field_name} contains invalid path characters")

    # Null bytes are rejected with their own, more specific message.
    if "\x00" in cleaned:
        raise ValueError(f"{field_name} contains null bytes")

    # Finally, the whole name has to fit the safe character set.
    if _SAFE_NAME_RE.match(cleaned) is None:
        raise ValueError(f"{field_name} contains invalid characters")

    return cleaned
|
|
|
|
|
|
def sanitize_kg_value(value: str, field_name: str = "value") -> str:
    """Check a knowledge-graph entity name (subject or object).

    Looser than ``sanitize_name``: punctuation such as commas, colons and
    parentheses, which is common in natural-language KG values, is allowed.
    The only rejections are blank/non-string input, over-length strings and
    null bytes.

    Not meant for wing/room names (which have filesystem constraints) or
    predicates (which should be simple relationship identifiers).

    Args:
        value: Candidate entity name.
        field_name: Label used at the start of every error message.

    Returns:
        The value with leading and trailing whitespace removed.

    Raises:
        ValueError: If the value is not a string, is blank, exceeds
            ``MAX_NAME_LENGTH`` characters after stripping, or contains a
            null byte.
    """
    if not isinstance(value, str):
        raise ValueError(f"{field_name} must be a non-empty string")

    trimmed = value.strip()
    if not trimmed:
        raise ValueError(f"{field_name} must be a non-empty string")

    if len(trimmed) > MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {MAX_NAME_LENGTH} characters")

    if "\x00" in trimmed:
        raise ValueError(f"{field_name} contains null bytes")

    return trimmed
|
|
|
|
|
|
# ISO-8601 date validator for knowledge-graph temporal parameters
# (as_of, valid_from, valid_to, ended). Parameterized queries already
# prevent SQL injection, but unvalidated date strings silently miss
# every row — callers cannot distinguish "no fact at this time" from
# "your date format was unrecognized." Require full YYYY-MM-DD: KG
# queries compare TEXT dates lexicographically, so partials like "2026"
# would re-introduce silent empty results (e.g. "2026-01-01" <= "2026"
# is False), defeating the purpose of validation.
#
# re.ASCII is required: without it ``\d`` in a str pattern matches every
# Unicode decimal digit (Arabic-Indic, fullwidth, ...). Such a date would
# pass validation and then silently miss every row in the lexicographic
# comparison — exactly the failure mode this validator exists to prevent.
_ISO_DATE_RE = re.compile(r"^\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12]\d|3[01])$", re.ASCII)


def sanitize_iso_date(value, field_name: str = "date"):
    """Validate an ISO-8601 date string, accepting None or empty as-is.

    Accepts only ``YYYY-MM-DD`` written with ASCII digits. Raises
    ValueError on any other non-empty input so the MCP layer can surface a
    clear error to the caller instead of silently returning empty results.
    Partial dates (``YYYY``, ``YYYY-MM``) are rejected because KG queries
    compare TEXT dates lexicographically and would silently exclude valid
    facts.

    Only the shape and the month/day ranges (01-12, 01-31) are checked; the
    day is not checked against the month, so ``2026-02-31`` is accepted.

    Args:
        value: ``None``, ``""`` or a date string. Surrounding whitespace on
            a non-empty string is stripped before matching.
        field_name: Label used at the start of the error message.

    Returns:
        ``None`` or ``""`` unchanged, otherwise the stripped date string.

    Raises:
        ValueError: If ``value`` is a non-string, or a non-empty string
            that is not a ``YYYY-MM-DD`` date.
    """
    if value is None or value == "":
        return value
    if not isinstance(value, str):
        raise ValueError(f"{field_name} must be a string")
    value = value.strip()
    # fullmatch (rather than match + ``$``) also refuses a trailing newline,
    # independent of the strip above.
    if not _ISO_DATE_RE.fullmatch(value):
        raise ValueError(
            f"{field_name}={value!r} is not a valid ISO-8601 date (expected YYYY-MM-DD)"
        )
    return value
|
|
|
|
|
|
def sanitize_content(value: str, max_length: int = 100_000) -> str:
    """Check drawer/diary content and return it unchanged.

    Args:
        value: Content to validate.
        max_length: Maximum allowed length in characters.

    Returns:
        ``value`` exactly as passed in (no stripping).

    Raises:
        ValueError: If the content is not a string, is empty or
            whitespace-only, is longer than ``max_length``, or contains a
            null byte. The checks run in that order.
    """
    is_blank = not isinstance(value, str) or value.strip() == ""
    if is_blank:
        raise ValueError("content must be a non-empty string")

    too_long = len(value) > max_length
    if too_long:
        raise ValueError(f"content exceeds maximum length of {max_length} characters")

    if value.find("\x00") != -1:
        raise ValueError("content contains null bytes")

    return value
|
|
|
|
|
|
# Built-in fallbacks used when neither an env var nor config.json supplies a value.
DEFAULT_PALACE_PATH = os.path.expanduser("~/.mempalace/palace")
DEFAULT_COLLECTION_NAME = "mempalace_drawers"


@lru_cache(maxsize=1)
def get_configured_collection_name() -> str:
    """Return the drawer collection name from a default ``MempalaceConfig``.

    The result is memoized with ``lru_cache``, so the config file is read
    on the first call only; later calls return the cached name.
    """
    config = MempalaceConfig()
    return config.collection_name
|
|
|
|
|
|
# Topic wings used when config.json does not define ``topic_wings``.
DEFAULT_TOPIC_WINGS = [
    "emotions", "consciousness", "memory", "technical", "identity", "family", "creative",
]

# Keyword lists per hall, used when config.json does not define ``hall_keywords``.
# The keys mirror DEFAULT_TOPIC_WINGS, in the same order.
DEFAULT_HALL_KEYWORDS = {
    "emotions": [
        "scared", "afraid", "worried", "happy", "sad",
        "love", "hate", "feel", "cry", "tears",
    ],
    "consciousness": [
        "consciousness", "conscious", "aware", "real",
        "genuine", "soul", "exist", "alive",
    ],
    "memory": [
        "memory", "remember", "forget", "recall", "archive", "palace", "store",
    ],
    "technical": [
        "code", "python", "script", "bug", "error",
        "function", "api", "database", "server",
    ],
    "identity": [
        "identity", "name", "who am i", "persona", "self",
    ],
    "family": [
        "family", "kids", "children", "daughter",
        "son", "parent", "mother", "father",
    ],
    "creative": [
        "game", "gameplay", "player", "app",
        "design", "art", "music", "story",
    ],
}
|
|
|
|
|
|
class MempalaceConfig:
    """Configuration manager for MemPalace.

    Load order: env vars > config file > defaults.

    ``config.json`` is read once, in ``__init__``, into an in-memory dict.
    Properties consult environment variables (where one is defined for the
    setting) on every access, then that dict, then a built-in default.
    """

    def __init__(self, config_dir=None):
        """Initialize config.

        Args:
            config_dir: Override config directory (useful for testing).
                Defaults to ~/.mempalace.
        """
        self._config_dir = (
            Path(config_dir) if config_dir else Path(os.path.expanduser("~/.mempalace"))
        )
        self._config_file = self._config_dir / "config.json"
        self._people_map_file = self._config_dir / "people_map.json"
        self._file_config = {}

        # Read as UTF-8 because the writers below emit UTF-8 with
        # ensure_ascii=False. ValueError covers both json.JSONDecodeError and
        # UnicodeDecodeError; OSError covers a missing or unreadable file.
        try:
            with open(self._config_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            loaded = None
        # A config file whose top level is not a JSON object is treated as
        # absent; every property relies on dict-style ``.get`` lookups.
        if isinstance(loaded, dict):
            self._file_config = loaded

    def _hook_settings(self):
        """Return the ``hooks`` section of the config, or ``{}`` if absent or not a dict."""
        hooks = self._file_config.get("hooks")
        return hooks if isinstance(hooks, dict) else {}

    def _write_file_config(self):
        """Best-effort write of the in-memory config to ``config.json``.

        I/O errors are swallowed on purpose: persisting a setting must not
        crash the caller. The file is then restricted to owner read/write
        where the platform supports it. The config directory must already
        exist.
        """
        try:
            with open(self._config_file, "w", encoding="utf-8") as f:
                json.dump(self._file_config, f, indent=2, ensure_ascii=False)
        except OSError:
            pass
        try:
            self._config_file.chmod(0o600)
        except (OSError, NotImplementedError):
            pass

    @property
    def palace_path(self):
        """Path to the memory palace data directory."""
        env_val = os.environ.get("MEMPALACE_PALACE_PATH") or os.environ.get("MEMPAL_PALACE_PATH")
        if env_val:
            # Normalize: expand ~ and collapse .. to match the CLI --palace
            # code path (mcp_server.py:62) and prevent surprise redirection
            # when the env var contains unresolved components.
            return os.path.abspath(os.path.expanduser(env_val))
        return self._file_config.get("palace_path", DEFAULT_PALACE_PATH)

    @property
    def collection_name(self):
        """ChromaDB collection name."""
        return self._file_config.get("collection_name", DEFAULT_COLLECTION_NAME)

    @property
    def people_map(self):
        """Mapping of name variants to canonical names.

        Read from ``people_map.json`` on every access; if that file is
        missing, unreadable or not valid JSON, falls back to the
        ``people_map`` key of ``config.json``, then to ``{}``.
        """
        try:
            with open(self._people_map_file, "r", encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):
            pass
        return self._file_config.get("people_map", {})

    @property
    def topic_wings(self):
        """List of topic wing names."""
        return self._file_config.get("topic_wings", DEFAULT_TOPIC_WINGS)

    @property
    def hall_keywords(self):
        """Mapping of hall names to keyword lists."""
        return self._file_config.get("hall_keywords", DEFAULT_HALL_KEYWORDS)

    @property
    def entity_languages(self):
        """Languages whose entity-detection patterns should be applied.

        Reads from env var ``MEMPALACE_ENTITY_LANGUAGES`` (comma-separated;
        ``MEMPAL_ENTITY_LANGUAGES`` is accepted as a fallback name) first,
        then the ``entity_languages`` field in ``config.json``, defaulting
        to ``["en"]``.
        """
        env_val = os.environ.get("MEMPALACE_ENTITY_LANGUAGES") or os.environ.get(
            "MEMPAL_ENTITY_LANGUAGES"
        )
        if env_val:
            return [s.strip() for s in env_val.split(",") if s.strip()] or ["en"]
        cfg = self._file_config.get("entity_languages")
        if isinstance(cfg, list) and cfg:
            return [str(s) for s in cfg]
        return ["en"]

    def set_entity_languages(self, languages):
        """Persist the entity-detection language list to ``config.json``.

        Args:
            languages: Iterable of language-code strings. Blank entries are
                dropped; an empty result is replaced by ``["en"]``.

        Returns:
            The normalized list that was stored.
        """
        normalized = [s.strip() for s in languages if s and s.strip()]
        if not normalized:
            normalized = ["en"]
        self._file_config["entity_languages"] = normalized
        self._config_dir.mkdir(parents=True, exist_ok=True)
        self._write_file_config()
        return normalized

    @property
    def embedding_device(self):
        """Hardware device for the ONNX embedding model.

        Values: ``"auto"`` (default), ``"cpu"``, ``"cuda"``, ``"coreml"``,
        ``"dml"``. Read from env ``MEMPALACE_EMBEDDING_DEVICE`` first, then
        ``embedding_device`` in ``config.json``, then ``"auto"``.

        ``auto`` resolves to the first available accelerator at runtime via
        :mod:`mempalace.embedding`; requesting an unavailable accelerator
        logs a warning and falls back to CPU.
        """
        env_val = os.environ.get("MEMPALACE_EMBEDDING_DEVICE")
        if env_val:
            return env_val.strip().lower()
        return str(self._file_config.get("embedding_device", "auto")).strip().lower()

    @property
    def topic_tunnel_min_count(self):
        """Minimum number of overlapping confirmed topics required to create
        a cross-wing tunnel between two wings.

        Default is ``1`` — any single shared topic produces a tunnel. Bump
        to ``2+`` if your projects share lots of common-tech labels (Python,
        Docker, Git) and you want only meaningfully overlapping wings to
        link. Reads ``MEMPALACE_TOPIC_TUNNEL_MIN_COUNT`` env first, then the
        config-file value, then ``1``.
        """
        env_val = os.environ.get("MEMPALACE_TOPIC_TUNNEL_MIN_COUNT")
        if env_val:
            # An env value that is not an integer >= 1 is ignored and the
            # config-file value is used instead.
            try:
                parsed = int(env_val)
                if parsed >= 1:
                    return parsed
            except ValueError:
                pass
        cfg_val = self._file_config.get("topic_tunnel_min_count")
        try:
            parsed = int(cfg_val) if cfg_val is not None else 1
        except (TypeError, ValueError):
            parsed = 1
        return max(1, parsed)

    @property
    def hook_silent_save(self):
        """Whether the stop hook saves directly (True) or blocks for MCP calls (False)."""
        return self._hook_settings().get("silent_save", True)

    @property
    def hook_desktop_toast(self):
        """Whether the stop hook shows a desktop notification via notify-send."""
        return self._hook_settings().get("desktop_toast", False)

    def set_hook_setting(self, key: str, value: bool):
        """Update a hook setting and write config to disk.

        The write is best-effort: the config directory is created if
        needed, and I/O failures are swallowed rather than raised.

        Args:
            key: Name of the setting inside the ``hooks`` section.
            value: New value for the setting.
        """
        if not isinstance(self._file_config.get("hooks"), dict):
            self._file_config["hooks"] = {}
        self._file_config["hooks"][key] = value
        # Without this, the write below fails (silently) on a fresh install
        # where ~/.mempalace has not been created yet.
        try:
            self._config_dir.mkdir(parents=True, exist_ok=True)
        except OSError:
            return
        self._write_file_config()

    def init(self):
        """Create config directory and write default config.json if it doesn't exist.

        Returns:
            Path to ``config.json``.
        """
        self._config_dir.mkdir(parents=True, exist_ok=True)
        # Restrict directory permissions to owner only (Unix)
        try:
            self._config_dir.chmod(0o700)
        except (OSError, NotImplementedError):
            pass  # Windows doesn't support Unix permissions
        if not self._config_file.exists():
            default_config = {
                "palace_path": DEFAULT_PALACE_PATH,
                "collection_name": DEFAULT_COLLECTION_NAME,
                "topic_wings": DEFAULT_TOPIC_WINGS,
                "hall_keywords": DEFAULT_HALL_KEYWORDS,
            }
            with open(self._config_file, "w", encoding="utf-8") as f:
                json.dump(default_config, f, indent=2)
            # Restrict config file to owner read/write only
            try:
                self._config_file.chmod(0o600)
            except (OSError, NotImplementedError):
                pass
        return self._config_file

    def save_people_map(self, people_map):
        """Write people_map.json to config directory.

        Args:
            people_map: Dict mapping name variants to canonical names.

        Returns:
            Path to ``people_map.json``.
        """
        self._config_dir.mkdir(parents=True, exist_ok=True)
        with open(self._people_map_file, "w", encoding="utf-8") as f:
            json.dump(people_map, f, indent=2)
        try:
            self._people_map_file.chmod(0o600)
        except (OSError, NotImplementedError):
            pass
        return self._people_map_file
|