79c9c0e517
sanitize_name rejects commas, colons, parentheses, and slashes — characters that commonly appear in knowledge graph subject/object values. Adds sanitize_kg_value for KG entity fields (subject, object, entity) while keeping sanitize_name for predicates and wing/room names.
255 lines
8.3 KiB
Python
255 lines
8.3 KiB
Python
"""
|
|
MemPalace configuration system.
|
|
|
|
Priority: env vars > config file (~/.mempalace/config.json) > defaults
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
|
|
|
|
# ── Input validation ──────────────────────────────────────────────────────────
|
|
# Shared sanitizers for wing/room/entity names. Prevents path traversal,
|
|
# excessively long strings, and special characters that could cause issues
|
|
# in file paths, SQLite, or ChromaDB metadata.
|
|
|
|
MAX_NAME_LENGTH = 128
|
|
_SAFE_NAME_RE = re.compile(r"^(?:[^\W_]|[^\W_][\w .'-]{0,126}[^\W_])$")
|
|
|
|
|
|
def sanitize_name(value: str, field_name: str = "name") -> str:
|
|
"""Validate and sanitize a wing/room/entity name.
|
|
|
|
Raises ValueError if the name is invalid.
|
|
"""
|
|
if not isinstance(value, str) or not value.strip():
|
|
raise ValueError(f"{field_name} must be a non-empty string")
|
|
|
|
value = value.strip()
|
|
|
|
if len(value) > MAX_NAME_LENGTH:
|
|
raise ValueError(f"{field_name} exceeds maximum length of {MAX_NAME_LENGTH} characters")
|
|
|
|
# Block path traversal
|
|
if ".." in value or "/" in value or "\\" in value:
|
|
raise ValueError(f"{field_name} contains invalid path characters")
|
|
|
|
# Block null bytes
|
|
if "\x00" in value:
|
|
raise ValueError(f"{field_name} contains null bytes")
|
|
|
|
# Enforce safe character set
|
|
if not _SAFE_NAME_RE.match(value):
|
|
raise ValueError(f"{field_name} contains invalid characters")
|
|
|
|
return value
|
|
|
|
|
|
def sanitize_kg_value(value: str, field_name: str = "value") -> str:
|
|
"""Validate a knowledge-graph entity name (subject or object).
|
|
|
|
More permissive than sanitize_name — allows punctuation like commas,
|
|
colons, and parentheses that are common in natural-language KG values.
|
|
Only blocks null bytes and over-length strings.
|
|
|
|
Not used for wing/room names (which have filesystem constraints) or
|
|
predicates (which should be simple relationship identifiers).
|
|
"""
|
|
if not isinstance(value, str) or not value.strip():
|
|
raise ValueError(f"{field_name} must be a non-empty string")
|
|
|
|
value = value.strip()
|
|
|
|
if len(value) > MAX_NAME_LENGTH:
|
|
raise ValueError(f"{field_name} exceeds maximum length of {MAX_NAME_LENGTH} characters")
|
|
|
|
if "\x00" in value:
|
|
raise ValueError(f"{field_name} contains null bytes")
|
|
|
|
return value
|
|
|
|
|
|
def sanitize_content(value: str, max_length: int = 100_000) -> str:
|
|
"""Validate drawer/diary content length."""
|
|
if not isinstance(value, str) or not value.strip():
|
|
raise ValueError("content must be a non-empty string")
|
|
if len(value) > max_length:
|
|
raise ValueError(f"content exceeds maximum length of {max_length} characters")
|
|
if "\x00" in value:
|
|
raise ValueError("content contains null bytes")
|
|
return value
|
|
|
|
|
|
DEFAULT_PALACE_PATH = os.path.expanduser("~/.mempalace/palace")
|
|
DEFAULT_COLLECTION_NAME = "mempalace_drawers"
|
|
|
|
DEFAULT_TOPIC_WINGS = [
|
|
"emotions",
|
|
"consciousness",
|
|
"memory",
|
|
"technical",
|
|
"identity",
|
|
"family",
|
|
"creative",
|
|
]
|
|
|
|
DEFAULT_HALL_KEYWORDS = {
|
|
"emotions": [
|
|
"scared",
|
|
"afraid",
|
|
"worried",
|
|
"happy",
|
|
"sad",
|
|
"love",
|
|
"hate",
|
|
"feel",
|
|
"cry",
|
|
"tears",
|
|
],
|
|
"consciousness": [
|
|
"consciousness",
|
|
"conscious",
|
|
"aware",
|
|
"real",
|
|
"genuine",
|
|
"soul",
|
|
"exist",
|
|
"alive",
|
|
],
|
|
"memory": ["memory", "remember", "forget", "recall", "archive", "palace", "store"],
|
|
"technical": [
|
|
"code",
|
|
"python",
|
|
"script",
|
|
"bug",
|
|
"error",
|
|
"function",
|
|
"api",
|
|
"database",
|
|
"server",
|
|
],
|
|
"identity": ["identity", "name", "who am i", "persona", "self"],
|
|
"family": ["family", "kids", "children", "daughter", "son", "parent", "mother", "father"],
|
|
"creative": ["game", "gameplay", "player", "app", "design", "art", "music", "story"],
|
|
}
|
|
|
|
|
|
class MempalaceConfig:
|
|
"""Configuration manager for MemPalace.
|
|
|
|
Load order: env vars > config file > defaults.
|
|
"""
|
|
|
|
def __init__(self, config_dir=None):
|
|
"""Initialize config.
|
|
|
|
Args:
|
|
config_dir: Override config directory (useful for testing).
|
|
Defaults to ~/.mempalace.
|
|
"""
|
|
self._config_dir = (
|
|
Path(config_dir) if config_dir else Path(os.path.expanduser("~/.mempalace"))
|
|
)
|
|
self._config_file = self._config_dir / "config.json"
|
|
self._people_map_file = self._config_dir / "people_map.json"
|
|
self._file_config = {}
|
|
|
|
if self._config_file.exists():
|
|
try:
|
|
with open(self._config_file, "r") as f:
|
|
self._file_config = json.load(f)
|
|
except (json.JSONDecodeError, OSError):
|
|
self._file_config = {}
|
|
|
|
@property
|
|
def palace_path(self):
|
|
"""Path to the memory palace data directory."""
|
|
env_val = os.environ.get("MEMPALACE_PALACE_PATH") or os.environ.get("MEMPAL_PALACE_PATH")
|
|
if env_val:
|
|
return env_val
|
|
return self._file_config.get("palace_path", DEFAULT_PALACE_PATH)
|
|
|
|
@property
|
|
def collection_name(self):
|
|
"""ChromaDB collection name."""
|
|
return self._file_config.get("collection_name", DEFAULT_COLLECTION_NAME)
|
|
|
|
@property
|
|
def people_map(self):
|
|
"""Mapping of name variants to canonical names."""
|
|
if self._people_map_file.exists():
|
|
try:
|
|
with open(self._people_map_file, "r") as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
return self._file_config.get("people_map", {})
|
|
|
|
@property
|
|
def topic_wings(self):
|
|
"""List of topic wing names."""
|
|
return self._file_config.get("topic_wings", DEFAULT_TOPIC_WINGS)
|
|
|
|
@property
|
|
def hall_keywords(self):
|
|
"""Mapping of hall names to keyword lists."""
|
|
return self._file_config.get("hall_keywords", DEFAULT_HALL_KEYWORDS)
|
|
|
|
@property
|
|
def hook_silent_save(self):
|
|
"""Whether the stop hook saves directly (True) or blocks for MCP calls (False)."""
|
|
return self._file_config.get("hooks", {}).get("silent_save", True)
|
|
|
|
@property
|
|
def hook_desktop_toast(self):
|
|
"""Whether the stop hook shows a desktop notification via notify-send."""
|
|
return self._file_config.get("hooks", {}).get("desktop_toast", False)
|
|
|
|
def set_hook_setting(self, key: str, value: bool):
|
|
"""Update a hook setting and write config to disk."""
|
|
if "hooks" not in self._file_config:
|
|
self._file_config["hooks"] = {}
|
|
self._file_config["hooks"][key] = value
|
|
try:
|
|
with open(self._config_file, "w", encoding="utf-8") as f:
|
|
json.dump(self._file_config, f, indent=2, ensure_ascii=False)
|
|
except OSError:
|
|
pass
|
|
|
|
def init(self):
|
|
"""Create config directory and write default config.json if it doesn't exist."""
|
|
self._config_dir.mkdir(parents=True, exist_ok=True)
|
|
# Restrict directory permissions to owner only (Unix)
|
|
try:
|
|
self._config_dir.chmod(0o700)
|
|
except (OSError, NotImplementedError):
|
|
pass # Windows doesn't support Unix permissions
|
|
if not self._config_file.exists():
|
|
default_config = {
|
|
"palace_path": DEFAULT_PALACE_PATH,
|
|
"collection_name": DEFAULT_COLLECTION_NAME,
|
|
"topic_wings": DEFAULT_TOPIC_WINGS,
|
|
"hall_keywords": DEFAULT_HALL_KEYWORDS,
|
|
}
|
|
with open(self._config_file, "w") as f:
|
|
json.dump(default_config, f, indent=2)
|
|
# Restrict config file to owner read/write only
|
|
try:
|
|
self._config_file.chmod(0o600)
|
|
except (OSError, NotImplementedError):
|
|
pass
|
|
return self._config_file
|
|
|
|
def save_people_map(self, people_map):
|
|
"""Write people_map.json to config directory.
|
|
|
|
Args:
|
|
people_map: Dict mapping name variants to canonical names.
|
|
"""
|
|
self._config_dir.mkdir(parents=True, exist_ok=True)
|
|
with open(self._people_map_file, "w") as f:
|
|
json.dump(people_map, f, indent=2)
|
|
return self._people_map_file
|