fix: harden palace security checks
Agent-Logs-Url: https://github.com/MemPalace/mempalace/sessions/775f2fc4-3051-462e-8586-6d694b55da0d Co-authored-by: igorls <4753812+igorls@users.noreply.github.com>
This commit is contained in:
committed by
Igor Lins e Silva
parent
bb577bb41f
commit
c478dfa173
+40
-1
@@ -423,10 +423,24 @@ def test_cmd_repair_no_palace(mock_config_cls, tmp_path, capsys):
|
||||
assert "No palace found" in out
|
||||
|
||||
|
||||
@patch("mempalace.cli.MempalaceConfig")
|
||||
def test_cmd_repair_requires_palace_database(mock_config_cls, tmp_path, capsys):
|
||||
palace_dir = tmp_path / "palace"
|
||||
palace_dir.mkdir()
|
||||
mock_config_cls.return_value.palace_path = str(palace_dir)
|
||||
args = argparse.Namespace(palace=None)
|
||||
mock_chromadb = MagicMock()
|
||||
with patch.dict("sys.modules", {"chromadb": mock_chromadb}):
|
||||
cmd_repair(args)
|
||||
out = capsys.readouterr().out
|
||||
assert "No palace database found" in out
|
||||
|
||||
|
||||
@patch("mempalace.cli.MempalaceConfig")
|
||||
def test_cmd_repair_error_reading(mock_config_cls, tmp_path, capsys):
|
||||
palace_dir = tmp_path / "palace"
|
||||
palace_dir.mkdir()
|
||||
(palace_dir / "chroma.sqlite3").write_text("db")
|
||||
mock_config_cls.return_value.palace_path = str(palace_dir)
|
||||
args = argparse.Namespace(palace=None)
|
||||
mock_chromadb = MagicMock()
|
||||
@@ -443,6 +457,7 @@ def test_cmd_repair_error_reading(mock_config_cls, tmp_path, capsys):
|
||||
def test_cmd_repair_zero_drawers(mock_config_cls, tmp_path, capsys):
|
||||
palace_dir = tmp_path / "palace"
|
||||
palace_dir.mkdir()
|
||||
(palace_dir / "chroma.sqlite3").write_text("db")
|
||||
mock_config_cls.return_value.palace_path = str(palace_dir)
|
||||
args = argparse.Namespace(palace=None)
|
||||
mock_chromadb = MagicMock()
|
||||
@@ -461,8 +476,9 @@ def test_cmd_repair_zero_drawers(mock_config_cls, tmp_path, capsys):
|
||||
def test_cmd_repair_success(mock_config_cls, tmp_path, capsys):
|
||||
palace_dir = tmp_path / "palace"
|
||||
palace_dir.mkdir()
|
||||
(palace_dir / "chroma.sqlite3").write_text("db")
|
||||
mock_config_cls.return_value.palace_path = str(palace_dir)
|
||||
args = argparse.Namespace(palace=None)
|
||||
args = argparse.Namespace(palace=None, yes=True)
|
||||
mock_chromadb = MagicMock()
|
||||
mock_col = MagicMock()
|
||||
mock_col.count.return_value = 2
|
||||
@@ -483,6 +499,29 @@ def test_cmd_repair_success(mock_config_cls, tmp_path, capsys):
|
||||
assert "2 drawers rebuilt" in out
|
||||
|
||||
|
||||
@patch("mempalace.cli.MempalaceConfig")
|
||||
def test_cmd_repair_aborts_without_confirmation(mock_config_cls, tmp_path, capsys):
|
||||
palace_dir = tmp_path / "palace"
|
||||
palace_dir.mkdir()
|
||||
(palace_dir / "chroma.sqlite3").write_text("db")
|
||||
mock_config_cls.return_value.palace_path = str(palace_dir)
|
||||
args = argparse.Namespace(palace=None)
|
||||
mock_chromadb = MagicMock()
|
||||
mock_col = MagicMock()
|
||||
mock_col.count.return_value = 1
|
||||
mock_client = MagicMock()
|
||||
mock_client.get_collection.return_value = mock_col
|
||||
mock_chromadb.PersistentClient.return_value = mock_client
|
||||
with (
|
||||
patch.dict("sys.modules", {"chromadb": mock_chromadb}),
|
||||
patch("builtins.input", return_value="n"),
|
||||
):
|
||||
cmd_repair(args)
|
||||
out = capsys.readouterr().out
|
||||
assert "Aborted." in out
|
||||
mock_client.create_collection.assert_not_called()
|
||||
|
||||
|
||||
# ── cmd_compress ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
|
||||
@@ -8,6 +8,8 @@ via monkeypatch to avoid touching real data.
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _patch_mcp_server(monkeypatch, config, kg):
|
||||
"""Patch the mcp_server module globals to use test fixtures."""
|
||||
@@ -311,6 +313,59 @@ class TestSearchTool:
|
||||
result_loose = tool_search(query="JWT", max_distance=0.01, min_similarity=999.0)
|
||||
assert len(result_strict["results"]) <= len(result_loose["results"])
|
||||
|
||||
def test_list_rooms_rejects_invalid_wing(self, monkeypatch, config, kg):
|
||||
_patch_mcp_server(monkeypatch, config, kg)
|
||||
from mempalace import mcp_server
|
||||
|
||||
monkeypatch.setattr(mcp_server, "_get_collection", lambda *args, **kwargs: pytest.fail())
|
||||
|
||||
result = mcp_server.tool_list_rooms(wing="../etc/passwd")
|
||||
assert "error" in result
|
||||
|
||||
def test_search_rejects_invalid_room(self, monkeypatch, config, kg):
|
||||
_patch_mcp_server(monkeypatch, config, kg)
|
||||
from mempalace import mcp_server
|
||||
|
||||
monkeypatch.setattr(mcp_server, "search_memories", lambda *args, **kwargs: pytest.fail())
|
||||
|
||||
result = mcp_server.tool_search(query="JWT", room="../backend")
|
||||
assert "error" in result
|
||||
|
||||
def test_list_drawers_rejects_invalid_wing(self, monkeypatch, config, kg):
|
||||
_patch_mcp_server(monkeypatch, config, kg)
|
||||
from mempalace import mcp_server
|
||||
|
||||
monkeypatch.setattr(mcp_server, "_get_collection", lambda *args, **kwargs: pytest.fail())
|
||||
|
||||
result = mcp_server.tool_list_drawers(wing="../notes")
|
||||
assert "error" in result
|
||||
|
||||
def test_find_tunnels_rejects_invalid_wing(self, monkeypatch, config, kg):
|
||||
_patch_mcp_server(monkeypatch, config, kg)
|
||||
from mempalace import mcp_server
|
||||
|
||||
monkeypatch.setattr(mcp_server, "_get_collection", lambda *args, **kwargs: pytest.fail())
|
||||
|
||||
result = mcp_server.tool_find_tunnels(wing_a="../project")
|
||||
assert "error" in result
|
||||
|
||||
def test_wal_redacts_sensitive_fields(self, monkeypatch, config, kg, tmp_path):
|
||||
_patch_mcp_server(monkeypatch, config, kg)
|
||||
from mempalace import mcp_server
|
||||
|
||||
wal_file = tmp_path / "write_log.jsonl"
|
||||
monkeypatch.setattr(mcp_server, "_WAL_FILE", wal_file)
|
||||
|
||||
mcp_server._wal_log(
|
||||
"test",
|
||||
{"content": "secret note", "query": "private search", "safe": "ok"},
|
||||
)
|
||||
|
||||
entry = json.loads(wal_file.read_text().strip())
|
||||
assert entry["params"]["content"].startswith("[REDACTED")
|
||||
assert entry["params"]["query"].startswith("[REDACTED")
|
||||
assert entry["params"]["safe"] == "ok"
|
||||
|
||||
|
||||
# ── Write Tools ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
"""Tests for destructive-operation safety in mempalace.migrate."""
|
||||
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from mempalace.migrate import migrate
|
||||
|
||||
|
||||
def test_migrate_requires_palace_database(tmp_path, capsys):
|
||||
palace_dir = tmp_path / "palace"
|
||||
palace_dir.mkdir()
|
||||
|
||||
result = migrate(str(palace_dir))
|
||||
|
||||
out = capsys.readouterr().out
|
||||
assert result is False
|
||||
assert "No palace database found" in out
|
||||
|
||||
|
||||
def test_migrate_aborts_without_confirmation(tmp_path, capsys):
|
||||
palace_dir = tmp_path / "palace"
|
||||
palace_dir.mkdir()
|
||||
(palace_dir / "chroma.sqlite3").write_text("db")
|
||||
|
||||
mock_chromadb = SimpleNamespace(__version__="0.6.0", PersistentClient=MagicMock())
|
||||
mock_chromadb.PersistentClient.side_effect = Exception("unreadable")
|
||||
|
||||
with (
|
||||
patch.dict("sys.modules", {"chromadb": mock_chromadb}),
|
||||
patch("mempalace.migrate.detect_chromadb_version", return_value="0.5.x"),
|
||||
patch(
|
||||
"mempalace.migrate.extract_drawers_from_sqlite",
|
||||
return_value=[{"id": "id1", "document": "doc", "metadata": {"wing": "w", "room": "r"}}],
|
||||
),
|
||||
patch("builtins.input", return_value="n"),
|
||||
patch("mempalace.migrate.shutil.copytree") as mock_copytree,
|
||||
patch("mempalace.migrate.shutil.rmtree") as mock_rmtree,
|
||||
):
|
||||
result = migrate(str(palace_dir))
|
||||
|
||||
out = capsys.readouterr().out
|
||||
assert result is False
|
||||
assert "Aborted." in out
|
||||
mock_copytree.assert_not_called()
|
||||
mock_rmtree.assert_not_called()
|
||||
@@ -123,6 +123,9 @@ class TestTailTruncation:
|
||||
class TestLengthGuards:
|
||||
"""Verify output length constraints."""
|
||||
|
||||
def test_max_query_length_reduced(self):
|
||||
assert MAX_QUERY_LENGTH == 250
|
||||
|
||||
def test_output_never_exceeds_max(self):
|
||||
# Very long question sentence
|
||||
long_question = "a" * 1000 + "?"
|
||||
|
||||
Reference in New Issue
Block a user