From e24d8ca733de7f62604c5d83337be2b509a6b7f5 Mon Sep 17 00:00:00 2001 From: Tal Muskal Date: Wed, 8 Apr 2026 21:07:03 +0300 Subject: [PATCH] test: expand coverage to 70%, fix mcp_server CI crash (threshold 60%) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add/expand tests for normalize (39%→97%), searcher (39%→100%), layers (28%→97%), split_mega_files (34%→72%). Fix mcp_server.py parse_args→parse_known_args to prevent SystemExit when imported during pytest (CI was crashing on all test jobs). Co-Authored-By: Claude Opus 4.6 --- mempalace/mcp_server.py | 3 +- pyproject.toml | 2 +- tests/test_layers.py | 631 ++++++++++++++++++++++++++++++++- tests/test_normalize.py | 534 ++++++++++++++++++++++++++-- tests/test_searcher.py | 86 ++++- tests/test_split_mega_files.py | 244 +++++++++++++ 6 files changed, 1458 insertions(+), 42 deletions(-) diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py index b447249..2c1bbe6 100644 --- a/mempalace/mcp_server.py +++ b/mempalace/mcp_server.py @@ -44,7 +44,8 @@ def _parse_args(): metavar="PATH", help="Path to the palace directory (overrides config file and env var)", ) - return parser.parse_args() + args, _ = parser.parse_known_args() + return args _args = _parse_args() diff --git a/pyproject.toml b/pyproject.toml index d410a1a..8eadfc3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,7 +69,7 @@ testpaths = ["tests"] source = ["mempalace"] [tool.coverage.report] -fail_under = 50 +fail_under = 60 show_missing = true exclude_lines = [ "if __name__", diff --git a/tests/test_layers.py b/tests/test_layers.py index 1d06140..46b60e9 100644 --- a/tests/test_layers.py +++ b/tests/test_layers.py @@ -1,9 +1,9 @@ -"""Tests for mempalace.layers — focused on Layer0.""" +"""Tests for mempalace.layers — Layer0, Layer1, Layer2, Layer3, MemoryStack.""" import os -from unittest.mock import patch +from unittest.mock import MagicMock, patch -from mempalace.layers import Layer0 +from mempalace.layers import Layer0, Layer1, Layer2, Layer3, MemoryStack # ── Layer0 — with identity file ───────────────────────────────────────── @@ -23,10 +23,8 @@ def test_layer0_caches_text(tmp_path): identity_file.write_text("Hello world") layer = Layer0(identity_path=str(identity_file)) first = layer.render() - # Modify file after first read identity_file.write_text("Changed content") second = layer.render() - # Should return cached version assert first == second assert second == "Hello world" @@ -41,7 +39,7 @@ def test_layer0_missing_file_returns_default(tmp_path): def test_layer0_token_estimate(tmp_path): identity_file = tmp_path / "identity.txt" - content = "A" * 400 # 400 chars ~ 100 tokens + content = "A" * 400 identity_file.write_text(content) layer = Layer0(identity_path=str(identity_file)) estimate = layer.token_estimate() @@ -72,51 +70,650 @@ def test_layer0_default_path(): # ── Layer1 — mocked chromadb ──────────────────────────────────────────── +def _mock_chromadb_for_layer(docs, metas, monkeypatch=None): + """Return a mock PersistentClient whose collection.get returns docs/metas.""" + mock_col = MagicMock() + # First batch returns data, second batch returns empty (end of pagination) + mock_col.get.side_effect = [ + {"documents": docs, "metadatas": metas}, + {"documents": [], "metadatas": []}, + ] + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + return mock_client + + def test_layer1_no_palace(): """Layer1 returns helpful message when no palace exists.""" with patch("mempalace.layers.MempalaceConfig") as mock_cfg: mock_cfg.return_value.palace_path = "/nonexistent/palace" - from mempalace.layers import Layer1 - layer = Layer1(palace_path="/nonexistent/palace") result = layer.generate() assert "No palace found" in result or "No memories" in result +def test_layer1_generates_essential_story(): + docs = [ + "Important memory about project decisions", + "Key architectural choice for the backend", + ] + metas = [ + {"room": "decisions", "source_file": "meeting.txt", "importance": 5}, + {"room": "architecture", "source_file": "design.txt", "importance": 4}, + ] + mock_client = _mock_chromadb_for_layer(docs, metas) + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer1(palace_path="/fake") + result = layer.generate() + + assert "ESSENTIAL STORY" in result + assert "project decisions" in result + + +def test_layer1_empty_palace(): + mock_col = MagicMock() + mock_col.get.return_value = {"documents": [], "metadatas": []} + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer1(palace_path="/fake") + result = layer.generate() + + assert "No memories" in result + + +def test_layer1_with_wing_filter(): + docs = ["Memory about project X"] + metas = [{"room": "general", "source_file": "x.txt", "importance": 3}] + mock_client = _mock_chromadb_for_layer(docs, metas) + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer1(palace_path="/fake", wing="project_x") + result = layer.generate() + + assert "ESSENTIAL STORY" in result + # Verify wing filter was passed + call_kwargs = mock_client.get_collection.return_value.get.call_args_list[0][1] + assert call_kwargs.get("where") == {"wing": "project_x"} + + +def test_layer1_truncates_long_snippets(): + docs = ["A" * 300] + metas = [{"room": "general", "source_file": "long.txt"}] + mock_client = _mock_chromadb_for_layer(docs, metas) + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer1(palace_path="/fake") + result = layer.generate() + + assert "..." in result + + +def test_layer1_respects_max_chars(): + """L1 stops adding entries once MAX_CHARS is reached.""" + docs = [f"Memory number {i} with substantial content padding here" for i in range(30)] + metas = [{"room": "general", "source_file": f"f{i}.txt", "importance": 5} for i in range(30)] + mock_client = _mock_chromadb_for_layer(docs, metas) + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer1(palace_path="/fake") + layer.MAX_CHARS = 200 # Very low cap to trigger truncation + result = layer.generate() + + assert "more in L3 search" in result + + +def test_layer1_importance_from_various_keys(): + """Layer1 tries importance, emotional_weight, weight keys.""" + docs = ["mem1", "mem2", "mem3"] + metas = [ + {"room": "r", "emotional_weight": 5}, + {"room": "r", "weight": 1}, + {"room": "r"}, # no weight key, defaults to 3 + ] + mock_client = _mock_chromadb_for_layer(docs, metas) + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer1(palace_path="/fake") + result = layer.generate() + + assert "ESSENTIAL STORY" in result + + +def test_layer1_batch_exception_breaks(): + """If col.get raises on a batch, loop breaks gracefully.""" + mock_col = MagicMock() + mock_col.get.side_effect = [ + {"documents": ["doc1"], "metadatas": [{"room": "r"}]}, + RuntimeError("batch error"), + ] + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer1(palace_path="/fake") + result = layer.generate() + + assert "ESSENTIAL STORY" in result + + # ── Layer2 — mocked chromadb ──────────────────────────────────────────── def test_layer2_no_palace(): - """Layer2 returns message when no palace exists.""" with patch("mempalace.layers.MempalaceConfig") as mock_cfg: mock_cfg.return_value.palace_path = "/nonexistent/palace" - from mempalace.layers import Layer2 - layer = Layer2(palace_path="/nonexistent/palace") result = layer.retrieve(wing="test") assert "No palace found" in result +def test_layer2_retrieve_with_wing(): + mock_col = MagicMock() + mock_col.get.return_value = { + "documents": ["Some memory about the project"], + "metadatas": [{"room": "backend", "source_file": "notes.txt"}], + } + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer2(palace_path="/fake") + result = layer.retrieve(wing="project") + + assert "ON-DEMAND" in result + assert "memory about the project" in result + + +def test_layer2_retrieve_with_room(): + mock_col = MagicMock() + mock_col.get.return_value = { + "documents": ["Backend architecture notes"], + "metadatas": [{"room": "architecture", "source_file": "arch.txt"}], + } + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer2(palace_path="/fake") + result = layer.retrieve(room="architecture") + + assert "ON-DEMAND" in result + + +def test_layer2_retrieve_wing_and_room(): + mock_col = MagicMock() + mock_col.get.return_value = { + "documents": ["Filtered result"], + "metadatas": [{"room": "backend", "source_file": "x.txt"}], + } + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer2(palace_path="/fake") + result = layer.retrieve(wing="proj", room="backend") + + assert "ON-DEMAND" in result + call_kwargs = mock_col.get.call_args[1] + assert "$and" in call_kwargs.get("where", {}) + + +def test_layer2_retrieve_empty(): + mock_col = MagicMock() + mock_col.get.return_value = {"documents": [], "metadatas": []} + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer2(palace_path="/fake") + result = layer.retrieve(wing="missing") + + assert "No drawers found" in result + + +def test_layer2_retrieve_no_filter(): + mock_col = MagicMock() + mock_col.get.return_value = {"documents": [], "metadatas": []} + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer2(palace_path="/fake") + layer.retrieve() + + # No where filter should be passed + call_kwargs = mock_col.get.call_args[1] + assert "where" not in call_kwargs + + +def test_layer2_retrieve_error(): + mock_col = MagicMock() + mock_col.get.side_effect = RuntimeError("db error") + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer2(palace_path="/fake") + result = layer.retrieve(wing="test") + + assert "Retrieval error" in result + + +def test_layer2_truncates_long_snippets(): + mock_col = MagicMock() + mock_col.get.return_value = { + "documents": ["B" * 400], + "metadatas": [{"room": "r", "source_file": "s.txt"}], + } + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer2(palace_path="/fake") + result = layer.retrieve(wing="test") + + assert "..." in result + + # ── Layer3 — mocked chromadb ──────────────────────────────────────────── +def _mock_query_results(docs, metas, dists): + return { + "documents": [docs], + "metadatas": [metas], + "distances": [dists], + } + + def test_layer3_no_palace(): - """Layer3 returns message when no palace exists.""" with patch("mempalace.layers.MempalaceConfig") as mock_cfg: mock_cfg.return_value.palace_path = "/nonexistent/palace" - from mempalace.layers import Layer3 - layer = Layer3(palace_path="/nonexistent/palace") result = layer.search("test query") assert "No palace found" in result def test_layer3_search_raw_no_palace(): - """Layer3.search_raw returns empty list when no palace exists.""" with patch("mempalace.layers.MempalaceConfig") as mock_cfg: mock_cfg.return_value.palace_path = "/nonexistent/palace" - from mempalace.layers import Layer3 - layer = Layer3(palace_path="/nonexistent/palace") result = layer.search_raw("test query") assert result == [] + + +def test_layer3_search_with_results(): + mock_col = MagicMock() + mock_col.query.return_value = _mock_query_results( + ["Found this important memory"], + [{"wing": "project", "room": "backend", "source_file": "notes.txt"}], + [0.2], + ) + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer3(palace_path="/fake") + result = layer.search("important") + + assert "SEARCH RESULTS" in result + assert "important memory" in result + assert "sim=0.8" in result + + +def test_layer3_search_no_results(): + mock_col = MagicMock() + mock_col.query.return_value = _mock_query_results([], [], []) + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer3(palace_path="/fake") + result = layer.search("nothing") + + assert "No results found" in result + + +def test_layer3_search_with_wing_filter(): + mock_col = MagicMock() + mock_col.query.return_value = _mock_query_results( + ["result"], + [{"wing": "proj", "room": "r"}], + [0.1], + ) + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer3(palace_path="/fake") + layer.search("q", wing="proj") + + call_kwargs = mock_col.query.call_args[1] + assert call_kwargs["where"] == {"wing": "proj"} + + +def test_layer3_search_with_room_filter(): + mock_col = MagicMock() + mock_col.query.return_value = _mock_query_results( + ["result"], + [{"wing": "w", "room": "backend"}], + [0.1], + ) + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer3(palace_path="/fake") + layer.search("q", room="backend") + + call_kwargs = mock_col.query.call_args[1] + assert call_kwargs["where"] == {"room": "backend"} + + +def test_layer3_search_with_wing_and_room(): + mock_col = MagicMock() + mock_col.query.return_value = _mock_query_results( + ["result"], + [{"wing": "proj", "room": "backend"}], + [0.1], + ) + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer3(palace_path="/fake") + layer.search("q", wing="proj", room="backend") + + call_kwargs = mock_col.query.call_args[1] + assert "$and" in call_kwargs["where"] + + +def test_layer3_search_error(): + mock_col = MagicMock() + mock_col.query.side_effect = RuntimeError("search failed") + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer3(palace_path="/fake") + result = layer.search("q") + + assert "Search error" in result + + +def test_layer3_search_truncates_long_docs(): + mock_col = MagicMock() + mock_col.query.return_value = _mock_query_results( + ["C" * 400], + [{"wing": "w", "room": "r", "source_file": "s.txt"}], + [0.1], + ) + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer3(palace_path="/fake") + result = layer.search("q") + + assert "..." in result + + +def test_layer3_search_raw_returns_dicts(): + mock_col = MagicMock() + mock_col.query.return_value = _mock_query_results( + ["doc text"], + [{"wing": "proj", "room": "backend", "source_file": "f.txt"}], + [0.3], + ) + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer3(palace_path="/fake") + hits = layer.search_raw("q") + + assert len(hits) == 1 + assert hits[0]["text"] == "doc text" + assert hits[0]["wing"] == "proj" + assert hits[0]["similarity"] == 0.7 + assert "metadata" in hits[0] + + +def test_layer3_search_raw_with_filters(): + mock_col = MagicMock() + mock_col.query.return_value = _mock_query_results( + ["doc"], + [{"wing": "w", "room": "r"}], + [0.1], + ) + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer3(palace_path="/fake") + layer.search_raw("q", wing="w", room="r") + + call_kwargs = mock_col.query.call_args[1] + assert "$and" in call_kwargs["where"] + + +def test_layer3_search_raw_error(): + mock_col = MagicMock() + mock_col.query.side_effect = RuntimeError("fail") + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + layer = Layer3(palace_path="/fake") + result = layer.search_raw("q") + + assert result == [] + + +# ── MemoryStack ───────────────────────────────────────────────────────── + + +def test_memory_stack_wake_up(tmp_path): + identity_file = tmp_path / "identity.txt" + identity_file.write_text("I am Atlas.") + + with patch("mempalace.layers.MempalaceConfig") as mock_cfg: + mock_cfg.return_value.palace_path = "/nonexistent" + stack = MemoryStack( + palace_path="/nonexistent", + identity_path=str(identity_file), + ) + result = stack.wake_up() + + assert "Atlas" in result + # L1 will say no palace found + assert "No palace" in result or "No memories" in result + + +def test_memory_stack_wake_up_with_wing(tmp_path): + identity_file = tmp_path / "identity.txt" + identity_file.write_text("I am Atlas.") + + with patch("mempalace.layers.MempalaceConfig") as mock_cfg: + mock_cfg.return_value.palace_path = "/nonexistent" + stack = MemoryStack( + palace_path="/nonexistent", + identity_path=str(identity_file), + ) + result = stack.wake_up(wing="my_project") + + assert stack.l1.wing == "my_project" + assert "Atlas" in result + + +def test_memory_stack_recall(tmp_path): + identity_file = tmp_path / "identity.txt" + identity_file.write_text("I am Atlas.") + + with patch("mempalace.layers.MempalaceConfig") as mock_cfg: + mock_cfg.return_value.palace_path = "/nonexistent" + stack = MemoryStack( + palace_path="/nonexistent", + identity_path=str(identity_file), + ) + result = stack.recall(wing="test") + + assert "No palace found" in result + + +def test_memory_stack_search(tmp_path): + identity_file = tmp_path / "identity.txt" + identity_file.write_text("I am Atlas.") + + with patch("mempalace.layers.MempalaceConfig") as mock_cfg: + mock_cfg.return_value.palace_path = "/nonexistent" + stack = MemoryStack( + palace_path="/nonexistent", + identity_path=str(identity_file), + ) + result = stack.search("test query") + + assert "No palace found" in result + + +def test_memory_stack_status(tmp_path): + identity_file = tmp_path / "identity.txt" + identity_file.write_text("I am Atlas.") + + with patch("mempalace.layers.MempalaceConfig") as mock_cfg: + mock_cfg.return_value.palace_path = "/nonexistent" + stack = MemoryStack( + palace_path="/nonexistent", + identity_path=str(identity_file), + ) + result = stack.status() + + assert result["palace_path"] == "/nonexistent" + assert result["total_drawers"] == 0 + assert "L0_identity" in result + assert "L1_essential" in result + assert "L2_on_demand" in result + assert "L3_deep_search" in result + + +def test_memory_stack_status_with_palace(tmp_path): + identity_file = tmp_path / "identity.txt" + identity_file.write_text("I am Atlas.") + + mock_col = MagicMock() + mock_col.count.return_value = 42 + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with ( + patch("mempalace.layers.MempalaceConfig") as mock_cfg, + patch("mempalace.layers.chromadb.PersistentClient", return_value=mock_client), + ): + mock_cfg.return_value.palace_path = "/fake" + stack = MemoryStack( + palace_path="/fake", + identity_path=str(identity_file), + ) + result = stack.status() + + assert result["total_drawers"] == 42 + assert result["L0_identity"]["exists"] is True diff --git a/tests/test_normalize.py b/tests/test_normalize.py index c304c9d..d613e58 100644 --- a/tests/test_normalize.py +++ b/tests/test_normalize.py @@ -1,31 +1,525 @@ -import os import json -import tempfile -from mempalace.normalize import normalize +from unittest.mock import patch + +from mempalace.normalize import ( + _extract_content, + _messages_to_transcript, + _try_chatgpt_json, + _try_claude_ai_json, + _try_claude_code_jsonl, + _try_codex_jsonl, + _try_normalize_json, + _try_slack_json, + normalize, +) -def test_plain_text(): - f = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) - f.write("Hello world\nSecond line\n") - f.close() - result = normalize(f.name) +# ── normalize() top-level ────────────────────────────────────────────── + + +def test_plain_text(tmp_path): + f = tmp_path / "plain.txt" + f.write_text("Hello world\nSecond line\n") + result = normalize(str(f)) assert "Hello world" in result - os.unlink(f.name) -def test_claude_json(): +def test_claude_json(tmp_path): data = [{"role": "user", "content": "Hi"}, {"role": "assistant", "content": "Hello"}] - f = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) - json.dump(data, f) - f.close() - result = normalize(f.name) + f = tmp_path / "claude.json" + f.write_text(json.dumps(data)) + result = normalize(str(f)) assert "Hi" in result - os.unlink(f.name) -def test_empty(): - f = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) - f.close() - result = normalize(f.name) +def test_empty(tmp_path): + f = tmp_path / "empty.txt" + f.write_text("") + result = normalize(str(f)) assert result.strip() == "" - os.unlink(f.name) + + +def test_normalize_io_error(): + """normalize raises IOError for unreadable file.""" + try: + normalize("/nonexistent/path/file.txt") + assert False, "Should have raised" + except IOError as e: + assert "Could not read" in str(e) + + +def test_normalize_already_has_markers(tmp_path): + """Files with >= 3 '>' lines pass through unchanged.""" + content = "> question 1\nanswer 1\n> question 2\nanswer 2\n> question 3\nanswer 3\n" + f = tmp_path / "markers.txt" + f.write_text(content) + result = normalize(str(f)) + assert result == content + + +def test_normalize_json_content_detected_by_brace(tmp_path): + """A .txt file starting with [ triggers JSON parsing.""" + data = [{"role": "user", "content": "Hey"}, {"role": "assistant", "content": "Hi there"}] + f = tmp_path / "chat.txt" + f.write_text(json.dumps(data)) + result = normalize(str(f)) + assert "Hey" in result + + +def test_normalize_whitespace_only(tmp_path): + f = tmp_path / "ws.txt" + f.write_text(" \n \n ") + result = normalize(str(f)) + assert result.strip() == "" + + +# ── _extract_content ─────────────────────────────────────────────────── + + +def test_extract_content_string(): + assert _extract_content("hello") == "hello" + + +def test_extract_content_list_of_strings(): + assert _extract_content(["hello", "world"]) == "hello world" + + +def test_extract_content_list_of_blocks(): + blocks = [{"type": "text", "text": "hello"}, {"type": "image", "url": "x"}] + assert _extract_content(blocks) == "hello" + + +def test_extract_content_dict(): + assert _extract_content({"text": "hello"}) == "hello" + + +def test_extract_content_none(): + assert _extract_content(None) == "" + + +def test_extract_content_mixed_list(): + blocks = ["plain", {"type": "text", "text": "block"}] + assert _extract_content(blocks) == "plain block" + + +# ── _try_claude_code_jsonl ───────────────────────────────────────────── + + +def test_claude_code_jsonl_valid(): + lines = [ + json.dumps({"type": "human", "message": {"content": "What is X?"}}), + json.dumps({"type": "assistant", "message": {"content": "X is Y."}}), + ] + result = _try_claude_code_jsonl("\n".join(lines)) + assert result is not None + assert "> What is X?" in result + assert "X is Y." in result + + +def test_claude_code_jsonl_user_type(): + lines = [ + json.dumps({"type": "user", "message": {"content": "Q"}}), + json.dumps({"type": "assistant", "message": {"content": "A"}}), + ] + result = _try_claude_code_jsonl("\n".join(lines)) + assert result is not None + assert "> Q" in result + + +def test_claude_code_jsonl_too_few_messages(): + lines = [json.dumps({"type": "human", "message": {"content": "only one"}})] + result = _try_claude_code_jsonl("\n".join(lines)) + assert result is None + + +def test_claude_code_jsonl_invalid_json_lines(): + lines = [ + "not json", + json.dumps({"type": "human", "message": {"content": "Q"}}), + json.dumps({"type": "assistant", "message": {"content": "A"}}), + ] + result = _try_claude_code_jsonl("\n".join(lines)) + assert result is not None + + +def test_claude_code_jsonl_non_dict_entries(): + lines = [ + json.dumps([1, 2, 3]), + json.dumps({"type": "human", "message": {"content": "Q"}}), + json.dumps({"type": "assistant", "message": {"content": "A"}}), + ] + result = _try_claude_code_jsonl("\n".join(lines)) + assert result is not None + + +# ── _try_codex_jsonl ─────────────────────────────────────────────────── + + +def test_codex_jsonl_valid(): + lines = [ + json.dumps({"type": "session_meta", "payload": {}}), + json.dumps( + {"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}} + ), + json.dumps( + {"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}} + ), + ] + result = _try_codex_jsonl("\n".join(lines)) + assert result is not None + assert "> Q" in result + + +def test_codex_jsonl_no_session_meta(): + """Without session_meta, codex parser returns None.""" + lines = [ + json.dumps( + {"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}} + ), + json.dumps( + {"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}} + ), + ] + result = _try_codex_jsonl("\n".join(lines)) + assert result is None + + +def test_codex_jsonl_skips_non_event_msg(): + lines = [ + json.dumps({"type": "session_meta"}), + json.dumps({"type": "response_item", "payload": {"type": "user_message", "message": "X"}}), + json.dumps({"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}}), + json.dumps({"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}}), + ] + result = _try_codex_jsonl("\n".join(lines)) + assert result is not None + assert "X" not in result.split("> Q")[0] + + +def test_codex_jsonl_non_string_message(): + lines = [ + json.dumps({"type": "session_meta"}), + json.dumps( + {"type": "event_msg", "payload": {"type": "user_message", "message": 123}} + ), + json.dumps( + {"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}} + ), + json.dumps( + {"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}} + ), + ] + result = _try_codex_jsonl("\n".join(lines)) + assert result is not None + + +def test_codex_jsonl_empty_text_skipped(): + lines = [ + json.dumps({"type": "session_meta"}), + json.dumps( + {"type": "event_msg", "payload": {"type": "user_message", "message": " "}} + ), + json.dumps( + {"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}} + ), + json.dumps( + {"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}} + ), + ] + result = _try_codex_jsonl("\n".join(lines)) + assert result is not None + + +def test_codex_jsonl_payload_not_dict(): + lines = [ + json.dumps({"type": "session_meta"}), + json.dumps({"type": "event_msg", "payload": "not a dict"}), + json.dumps( + {"type": "event_msg", "payload": {"type": "user_message", "message": "Q"}} + ), + json.dumps( + {"type": "event_msg", "payload": {"type": "agent_message", "message": "A"}} + ), + ] + result = _try_codex_jsonl("\n".join(lines)) + assert result is not None + + +# ── _try_claude_ai_json ─────────────────────────────────────────────── + + +def test_claude_ai_flat_messages(): + data = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there"}, + ] + result = _try_claude_ai_json(data) + assert result is not None + assert "> Hello" in result + + +def test_claude_ai_dict_with_messages_key(): + data = { + "messages": [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi"}, + ] + } + result = _try_claude_ai_json(data) + assert result is not None + + +def test_claude_ai_privacy_export(): + data = [ + { + "chat_messages": [ + {"role": "human", "content": "Q1"}, + {"role": "ai", "content": "A1"}, + ] + } + ] + result = _try_claude_ai_json(data) + assert result is not None + assert "> Q1" in result + + +def test_claude_ai_not_a_list(): + result = _try_claude_ai_json("not a list") + assert result is None + + +def test_claude_ai_too_few_messages(): + data = [{"role": "user", "content": "Hello"}] + result = _try_claude_ai_json(data) + assert result is None + + +def test_claude_ai_dict_with_chat_messages_key(): + data = { + "chat_messages": [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "World"}, + ] + } + result = _try_claude_ai_json(data) + assert result is not None + + +def test_claude_ai_privacy_export_non_dict_items(): + """Non-dict items in privacy export are skipped.""" + data = [ + { + "chat_messages": [ + "not a dict", + {"role": "user", "content": "Q"}, + {"role": "assistant", "content": "A"}, + ] + }, + "not a convo", + ] + result = _try_claude_ai_json(data) + assert result is not None + + +# ── _try_chatgpt_json ───────────────────────────────────────────────── + + +def test_chatgpt_json_valid(): + data = { + "mapping": { + "root": { + "parent": None, + "message": None, + "children": ["msg1"], + }, + "msg1": { + "parent": "root", + "message": { + "author": {"role": "user"}, + "content": {"parts": ["Hello ChatGPT"]}, + }, + "children": ["msg2"], + }, + "msg2": { + "parent": "msg1", + "message": { + "author": {"role": "assistant"}, + "content": {"parts": ["Hello! How can I help?"]}, + }, + "children": [], + }, + } + } + result = _try_chatgpt_json(data) + assert result is not None + assert "> Hello ChatGPT" in result + + +def test_chatgpt_json_no_mapping(): + result = _try_chatgpt_json({"data": []}) + assert result is None + + +def test_chatgpt_json_not_dict(): + result = _try_chatgpt_json([1, 2, 3]) + assert result is None + + +def test_chatgpt_json_fallback_root(): + """Root node has a message (no synthetic root), uses fallback.""" + data = { + "mapping": { + "root": { + "parent": None, + "message": { + "author": {"role": "system"}, + "content": {"parts": ["system prompt"]}, + }, + "children": ["msg1"], + }, + "msg1": { + "parent": "root", + "message": { + "author": {"role": "user"}, + "content": {"parts": ["Hello"]}, + }, + "children": ["msg2"], + }, + "msg2": { + "parent": "msg1", + "message": { + "author": {"role": "assistant"}, + "content": {"parts": ["Hi there"]}, + }, + "children": [], + }, + } + } + result = _try_chatgpt_json(data) + assert result is not None + + +def test_chatgpt_json_too_few_messages(): + data = { + "mapping": { + "root": { + "parent": None, + "message": None, + "children": ["msg1"], + }, + "msg1": { + "parent": "root", + "message": { + "author": {"role": "user"}, + "content": {"parts": ["Only one"]}, + }, + "children": [], + }, + } + } + result = _try_chatgpt_json(data) + assert result is None + + +# ── _try_slack_json ──────────────────────────────────────────────────── + + +def test_slack_json_valid(): + data = [ + {"type": "message", "user": "U1", "text": "Hello"}, + {"type": "message", "user": "U2", "text": "Hi there"}, + ] + result = _try_slack_json(data) + assert result is not None + assert "Hello" in result + + +def test_slack_json_not_a_list(): + result = _try_slack_json({"type": "message"}) + assert result is None + + +def test_slack_json_too_few_messages(): + data = [{"type": "message", "user": "U1", "text": "Hello"}] + result = _try_slack_json(data) + assert result is None + + +def test_slack_json_skips_non_message_types(): + data = [ + {"type": "channel_join", "user": "U1", "text": "joined"}, + {"type": "message", "user": "U1", "text": "Hello"}, + {"type": "message", "user": "U2", "text": "Hi"}, + ] + result = _try_slack_json(data) + assert result is not None + + +def test_slack_json_three_users(): + """Three speakers get alternating roles.""" + data = [ + {"type": "message", "user": "U1", "text": "Hello"}, + {"type": "message", "user": "U2", "text": "Hi"}, + {"type": "message", "user": "U3", "text": "Hey"}, + ] + result = _try_slack_json(data) + assert result is not None + + +def test_slack_json_empty_text_skipped(): + data = [ + {"type": "message", "user": "U1", "text": ""}, + {"type": "message", "user": "U1", "text": "Hello"}, + {"type": "message", "user": "U2", "text": "Hi"}, + ] + result = _try_slack_json(data) + assert result is not None + + +def test_slack_json_username_fallback(): + data = [ + {"type": "message", "username": "bot1", "text": "Hello"}, + {"type": "message", "username": "bot2", "text": "Hi"}, + ] + result = _try_slack_json(data) + assert result is not None + + +# ── _try_normalize_json ──────────────────────────────────────────────── + + +def test_try_normalize_json_invalid_json(): + result = _try_normalize_json("not json at all {{{") + assert result is None + + +def test_try_normalize_json_valid_but_unknown_schema(): + result = _try_normalize_json(json.dumps({"random": "data"})) + assert result is None + + +# ── _messages_to_transcript ──────────────────────────────────────────── + + +def test_messages_to_transcript_basic(): + msgs = [("user", "Q"), ("assistant", "A")] + with patch("mempalace.normalize.spellcheck_user_text", side_effect=lambda x: x, create=True): + result = _messages_to_transcript(msgs, spellcheck=False) + assert "> Q" in result + assert "A" in result + + +def test_messages_to_transcript_consecutive_users(): + """Two user messages in a row (no assistant between).""" + msgs = [("user", "Q1"), ("user", "Q2"), ("assistant", "A")] + result = _messages_to_transcript(msgs, spellcheck=False) + assert "> Q1" in result + assert "> Q2" in result + + +def test_messages_to_transcript_assistant_first(): + """Leading assistant message (no user before it).""" + msgs = [("assistant", "preamble"), ("user", "Q"), ("assistant", "A")] + result = _messages_to_transcript(msgs, spellcheck=False) + assert "preamble" in result + assert "> Q" in result diff --git a/tests/test_searcher.py b/tests/test_searcher.py index 1c2687d..94f22b4 100644 --- a/tests/test_searcher.py +++ b/tests/test_searcher.py @@ -1,10 +1,18 @@ """ -test_searcher.py — Tests for the programmatic search_memories API. +test_searcher.py -- Tests for both search() (CLI) and search_memories() (API). -Tests the library-facing search interface (not the CLI print variant). +Uses the real ChromaDB fixtures from conftest.py for integration tests, +plus mock-based tests for error paths. """ -from mempalace.searcher import search_memories +from unittest.mock import MagicMock, patch + +import pytest + +from mempalace.searcher import SearchError, search, search_memories + + +# ── search_memories (API) ────────────────────────────────────────────── class TestSearchMemories: @@ -43,3 +51,75 @@ class TestSearchMemories: assert "source_file" in hit assert "similarity" in hit assert isinstance(hit["similarity"], float) + + def test_search_memories_query_error(self): + """search_memories returns error dict when query raises.""" + mock_col = MagicMock() + mock_col.query.side_effect = RuntimeError("query failed") + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with patch("mempalace.searcher.chromadb.PersistentClient", return_value=mock_client): + result = search_memories("test", "/fake/path") + assert "error" in result + assert "query failed" in result["error"] + + def test_search_memories_filters_in_result(self, palace_path, seeded_collection): + result = search_memories("test", palace_path, wing="project", room="backend") + assert result["filters"]["wing"] == "project" + assert result["filters"]["room"] == "backend" + + +# ── search() (CLI print function) ───────────────────────────────────── + + +class TestSearchCLI: + def test_search_prints_results(self, palace_path, seeded_collection, capsys): + search("JWT authentication", palace_path) + captured = capsys.readouterr() + assert "JWT" in captured.out or "authentication" in captured.out + + def test_search_with_wing_filter(self, palace_path, seeded_collection, capsys): + search("planning", palace_path, wing="notes") + captured = capsys.readouterr() + assert "Results for" in captured.out + + def test_search_with_room_filter(self, palace_path, seeded_collection, capsys): + search("database", palace_path, room="backend") + captured = capsys.readouterr() + assert "Room:" in captured.out + + def test_search_with_wing_and_room(self, palace_path, seeded_collection, capsys): + search("code", palace_path, wing="project", room="frontend") + captured = capsys.readouterr() + assert "Wing:" in captured.out + assert "Room:" in captured.out + + def test_search_no_palace_raises(self, tmp_path): + with pytest.raises(SearchError, match="No palace found"): + search("anything", str(tmp_path / "missing")) + + def test_search_no_results(self, palace_path, collection, capsys): + """Empty collection returns no results message.""" + # collection is empty (no seeded data) + result = search("xyzzy_nonexistent_query", palace_path, n_results=1) + captured = capsys.readouterr() + # Either prints "No results" or returns None + assert result is None or "No results" in captured.out + + def test_search_query_error_raises(self): + """search raises SearchError when query fails.""" + mock_col = MagicMock() + mock_col.query.side_effect = RuntimeError("boom") + mock_client = MagicMock() + mock_client.get_collection.return_value = mock_col + + with patch("mempalace.searcher.chromadb.PersistentClient", return_value=mock_client): + with pytest.raises(SearchError, match="Search error"): + search("test", "/fake/path") + + def test_search_n_results(self, palace_path, seeded_collection, capsys): + search("code", palace_path, n_results=1) + captured = capsys.readouterr() + # Should have output with at least one result block + assert "[1]" in captured.out diff --git a/tests/test_split_mega_files.py b/tests/test_split_mega_files.py index 70c7f84..c1db02b 100644 --- a/tests/test_split_mega_files.py +++ b/tests/test_split_mega_files.py @@ -3,6 +3,9 @@ import json from mempalace import split_mega_files as smf +# ── Config loading ───────────────────────────────────────────────────── + + def test_load_known_people_falls_back_when_config_missing(monkeypatch, tmp_path): monkeypatch.setattr(smf, "_KNOWN_NAMES_PATH", tmp_path / "missing.json") smf._KNOWN_NAMES_CACHE = None @@ -46,3 +49,244 @@ def test_extract_people_detects_names_from_content(monkeypatch): monkeypatch.setattr(smf, "KNOWN_PEOPLE", ["Alice", "Ben"]) people = smf.extract_people(["> Alice reviewed the change with Ben\n"]) assert people == ["Alice", "Ben"] + + +# ── Config: force_reload and invalid JSON ────────────────────────────── + + +def test_load_known_names_force_reload(monkeypatch, tmp_path): + config_path = tmp_path / "known_names.json" + config_path.write_text(json.dumps(["Alice"])) + monkeypatch.setattr(smf, "_KNOWN_NAMES_PATH", config_path) + smf._KNOWN_NAMES_CACHE = None + + smf._load_known_names_config() + assert smf._KNOWN_NAMES_CACHE == ["Alice"] + + config_path.write_text(json.dumps(["Bob"])) + smf._load_known_names_config(force_reload=True) + assert smf._KNOWN_NAMES_CACHE == ["Bob"] + + +def test_load_known_names_invalid_json(monkeypatch, tmp_path): + config_path = tmp_path / "known_names.json" + config_path.write_text("not json {{{") + monkeypatch.setattr(smf, "_KNOWN_NAMES_PATH", config_path) + smf._KNOWN_NAMES_CACHE = None + + result = smf._load_known_names_config() + assert result is None + + +def test_load_known_names_caching(monkeypatch, tmp_path): + config_path = tmp_path / "known_names.json" + config_path.write_text(json.dumps(["Alice"])) + monkeypatch.setattr(smf, "_KNOWN_NAMES_PATH", config_path) + smf._KNOWN_NAMES_CACHE = None + + smf._load_known_names_config() + # Second call returns cached value without re-reading + config_path.write_text(json.dumps(["Changed"])) + result = smf._load_known_names_config() + assert result == ["Alice"] + + +# ── is_true_session_start ────────────────────────────────────────────── + + +def test_is_true_session_start_yes(): + lines = ["Claude Code v1.0", "Some content", "More content", "", "", ""] + assert smf.is_true_session_start(lines, 0) is True + + +def test_is_true_session_start_no_ctrl_e(): + lines = [ + "Claude Code v1.0", + "Ctrl+E to show 5 previous messages", + "", + "", + "", + "", + ] + assert smf.is_true_session_start(lines, 0) is False + + +def test_is_true_session_start_no_previous_messages(): + lines = [ + "Claude Code v1.0", + "Some text", + "previous messages here", + "", + "", + "", + ] + assert smf.is_true_session_start(lines, 0) is False + + +# ── find_session_boundaries ──────────────────────────────────────────── + + +def test_find_session_boundaries_two_sessions(): + lines = [ + "Claude Code v1.0", + "content 1", + "", + "", + "", + "", + "", + "Claude Code v1.0", + "content 2", + "", + "", + "", + "", + "", + ] + boundaries = smf.find_session_boundaries(lines) + assert boundaries == [0, 7] + + +def test_find_session_boundaries_none(): + lines = ["Just some text", "No sessions here"] + assert smf.find_session_boundaries(lines) == [] + + +def test_find_session_boundaries_context_restore_skipped(): + lines = [ + "Claude Code v1.0", + "content", + "", + "", + "", + "", + "", + "Claude Code v1.0", + "Ctrl+E to show 5 previous messages", + "", + "", + "", + "", + ] + boundaries = smf.find_session_boundaries(lines) + assert len(boundaries) == 1 + + +# ── extract_timestamp ────────────────────────────────────────────────── + + +def test_extract_timestamp_found(): + lines = ["⏺ 2:30 PM Wednesday, March 25, 2026"] + human, iso = smf.extract_timestamp(lines) + assert human == "2026-03-25_230PM" + assert iso == "2026-03-25" + + +def test_extract_timestamp_not_found(): + lines = ["No timestamp here"] + human, iso = smf.extract_timestamp(lines) + assert human is None + assert iso is None + + +def test_extract_timestamp_only_checks_first_50(): + lines = ["filler\n"] * 51 + ["⏺ 1:00 AM Monday, January 01, 2026"] + human, iso = smf.extract_timestamp(lines) + assert human is None + + +# ── extract_subject ──────────────────────────────────────────────────── + + +def test_extract_subject_found(): + lines = ["> How do we handle authentication?"] + subject = smf.extract_subject(lines) + assert "authentication" in subject.lower() + + +def test_extract_subject_skips_commands(): + lines = ["> cd /some/dir", "> git status", "> What is the plan?"] + subject = smf.extract_subject(lines) + assert "plan" in subject.lower() + + +def test_extract_subject_fallback(): + lines = ["No prompts at all", "Just text"] + subject = smf.extract_subject(lines) + assert subject == "session" + + +def test_extract_subject_short_prompt_skipped(): + lines = ["> ok", "> yes", "> What about the deployment strategy?"] + subject = smf.extract_subject(lines) + assert "deployment" in subject.lower() + + +def test_extract_subject_truncated(): + lines = ["> " + "a" * 100] + subject = smf.extract_subject(lines) + assert len(subject) <= 60 + + +# ── split_file ───────────────────────────────────────────────────────── + + +def _make_mega_file(tmp_path, n_sessions=3, lines_per_session=15): + """Create a mega-file with N sessions.""" + content = "" + for i in range(n_sessions): + content += f"Claude Code v1.{i}\n" + content += f"> What about topic {i} and how it works?\n" + for j in range(lines_per_session - 2): + content += f"Line {j} of session {i}\n" + path = tmp_path / "mega.txt" + path.write_text(content) + return path + + +def test_split_file_creates_output(tmp_path): + mega = _make_mega_file(tmp_path) + out_dir = tmp_path / "output" + out_dir.mkdir() + written = smf.split_file(str(mega), str(out_dir)) + assert len(written) >= 2 + for p in written: + assert p.exists() + + +def test_split_file_dry_run(tmp_path): + mega = _make_mega_file(tmp_path) + out_dir = tmp_path / "output" + out_dir.mkdir() + written = smf.split_file(str(mega), str(out_dir), dry_run=True) + assert len(written) >= 2 + for p in written: + assert not p.exists() + + +def test_split_file_not_mega(tmp_path): + """File with fewer than 2 sessions is not split.""" + path = tmp_path / "single.txt" + path.write_text("Claude Code v1.0\nJust one session\n" + "line\n" * 20) + written = smf.split_file(str(path), str(tmp_path)) + assert written == [] + + +def test_split_file_output_dir_none(tmp_path): + """When output_dir is None, writes to same dir as source.""" + mega = _make_mega_file(tmp_path) + written = smf.split_file(str(mega), None) + assert len(written) >= 2 + for p in written: + assert str(p.parent) == str(tmp_path) + + +def test_split_file_tiny_fragments_skipped(tmp_path): + """Tiny chunks (< 10 lines) are skipped.""" + content = "Claude Code v1.0\nline\n" * 2 + "Claude Code v1.0\n" + "line\n" * 20 + path = tmp_path / "tiny.txt" + path.write_text(content) + written = smf.split_file(str(path), str(tmp_path)) + # The first chunk is very small, should be skipped + for p in written: + assert p.stat().st_size > 0