import os import shutil import tempfile from pathlib import Path import chromadb import yaml from mempalace.miner import mine, scan_project, status from mempalace.palace import file_already_mined def write_file(path: Path, content: str): path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") def scanned_files(project_root: Path, **kwargs): files = scan_project(str(project_root), **kwargs) return sorted(path.relative_to(project_root).as_posix() for path in files) def test_project_mining(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() os.makedirs(project_root / "backend") write_file( project_root / "backend" / "app.py", "def main():\n print('hello world')\n" * 20, ) with open(project_root / "mempalace.yaml", "w") as f: yaml.dump( { "wing": "test_project", "rooms": [ {"name": "backend", "description": "Backend code"}, {"name": "general", "description": "General"}, ], }, f, ) palace_path = project_root / "palace" mine(str(project_root), str(palace_path)) client = chromadb.PersistentClient(path=str(palace_path)) col = client.get_collection("mempalace_drawers") assert col.count() > 0 finally: shutil.rmtree(tmpdir, ignore_errors=True) def test_scan_project_respects_gitignore(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() write_file(project_root / ".gitignore", "ignored.py\ngenerated/\n") write_file(project_root / "src" / "app.py", "print('hello')\n" * 20) write_file(project_root / "ignored.py", "print('ignore me')\n" * 20) write_file( project_root / "generated" / "artifact.py", "print('artifact')\n" * 20 ) assert scanned_files(project_root) == ["src/app.py"] finally: shutil.rmtree(tmpdir) def test_scan_project_respects_nested_gitignore(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() write_file(project_root / ".gitignore", "*.log\n") write_file(project_root / "subrepo" / ".gitignore", "tasks/\n") write_file(project_root / "subrepo" / "src" / "main.py", "print('main')\n" * 20) write_file( project_root / "subrepo" / "tasks" / "task.py", "print('task')\n" * 20 ) write_file(project_root / "subrepo" / "debug.log", "debug\n" * 20) assert scanned_files(project_root) == ["subrepo/src/main.py"] finally: shutil.rmtree(tmpdir) def test_scan_project_allows_nested_gitignore_override(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() write_file(project_root / ".gitignore", "*.csv\n") write_file(project_root / "subrepo" / ".gitignore", "!keep.csv\n") write_file(project_root / "drop.csv", "a,b,c\n" * 20) write_file(project_root / "subrepo" / "keep.csv", "a,b,c\n" * 20) assert scanned_files(project_root) == ["subrepo/keep.csv"] finally: shutil.rmtree(tmpdir) def test_scan_project_allows_gitignore_negation_when_parent_dir_is_visible(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() write_file(project_root / ".gitignore", "generated/*\n!generated/keep.py\n") write_file(project_root / "generated" / "drop.py", "print('drop')\n" * 20) write_file(project_root / "generated" / "keep.py", "print('keep')\n" * 20) assert scanned_files(project_root) == ["generated/keep.py"] finally: shutil.rmtree(tmpdir) def test_scan_project_does_not_reinclude_file_from_ignored_directory(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() write_file(project_root / ".gitignore", "generated/\n!generated/keep.py\n") write_file(project_root / "generated" / "drop.py", "print('drop')\n" * 20) write_file(project_root / "generated" / "keep.py", "print('keep')\n" * 20) assert scanned_files(project_root) == [] finally: shutil.rmtree(tmpdir) def test_scan_project_can_disable_gitignore(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() write_file(project_root / ".gitignore", "data/\n") write_file(project_root / "data" / "stuff.csv", "a,b,c\n" * 20) assert scanned_files(project_root, respect_gitignore=False) == [ "data/stuff.csv" ] finally: shutil.rmtree(tmpdir) def test_scan_project_can_include_ignored_directory(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() write_file(project_root / ".gitignore", "docs/\n") write_file(project_root / "docs" / "guide.md", "# Guide\n" * 20) assert scanned_files(project_root, include_ignored=["docs"]) == [ "docs/guide.md" ] finally: shutil.rmtree(tmpdir) def test_scan_project_can_include_specific_ignored_file(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() write_file(project_root / ".gitignore", "generated/\n") write_file(project_root / "generated" / "drop.py", "print('drop')\n" * 20) write_file(project_root / "generated" / "keep.py", "print('keep')\n" * 20) assert scanned_files(project_root, include_ignored=["generated/keep.py"]) == [ "generated/keep.py" ] finally: shutil.rmtree(tmpdir) def test_scan_project_can_include_exact_file_without_known_extension(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() write_file(project_root / ".gitignore", "README\n") write_file(project_root / "README", "hello\n" * 20) assert scanned_files(project_root, include_ignored=["README"]) == ["README"] finally: shutil.rmtree(tmpdir) def test_scan_project_include_override_beats_skip_dirs(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() write_file(project_root / ".pytest_cache" / "cache.py", "print('cache')\n" * 20) assert scanned_files( project_root, respect_gitignore=False, include_ignored=[".pytest_cache"], ) == [".pytest_cache/cache.py"] finally: shutil.rmtree(tmpdir) def test_scan_project_skip_dirs_still_apply_without_override(): tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() write_file(project_root / ".pytest_cache" / "cache.py", "print('cache')\n" * 20) write_file(project_root / "main.py", "print('main')\n" * 20) assert scanned_files(project_root, respect_gitignore=False) == ["main.py"] finally: shutil.rmtree(tmpdir) def test_file_already_mined_check_mtime(): tmpdir = tempfile.mkdtemp() try: palace_path = os.path.join(tmpdir, "palace") os.makedirs(palace_path) client = chromadb.PersistentClient(path=palace_path) col = client.get_or_create_collection( "mempalace_drawers", metadata={"hnsw:space": "cosine"} ) test_file = os.path.join(tmpdir, "test.txt") with open(test_file, "w") as f: f.write("hello world") mtime = os.path.getmtime(test_file) # Not mined yet assert file_already_mined(col, test_file) is False assert file_already_mined(col, test_file, check_mtime=True) is False # Add it with mtime col.add( ids=["d1"], documents=["hello world"], metadatas=[{"source_file": test_file, "source_mtime": str(mtime)}], ) # Already mined (no mtime check) assert file_already_mined(col, test_file) is True # Already mined (mtime matches) assert file_already_mined(col, test_file, check_mtime=True) is True # Modify file and force a different mtime (Windows has low mtime resolution) with open(test_file, "w") as f: f.write("modified content") os.utime(test_file, (mtime + 10, mtime + 10)) # Still mined without mtime check assert file_already_mined(col, test_file) is True # Needs re-mining with mtime check assert file_already_mined(col, test_file, check_mtime=True) is False # Record with no mtime stored should return False for check_mtime col.add( ids=["d2"], documents=["other"], metadatas=[{"source_file": "/fake/no_mtime.txt"}], ) assert file_already_mined(col, "/fake/no_mtime.txt", check_mtime=True) is False finally: # Release ChromaDB file handles before cleanup (required on Windows) del col, client shutil.rmtree(tmpdir, ignore_errors=True) def test_mine_dry_run_with_tiny_file_no_crash(): """Dry-run must not crash when process_file returns 0 drawers (room was None).""" tmpdir = tempfile.mkdtemp() try: project_root = Path(tmpdir).resolve() # One normal file and one that falls below MIN_CHUNK_SIZE write_file( project_root / "good.py", "def main():\n print('hello world')\n" * 20 ) write_file(project_root / "tiny.txt", "x") with open(project_root / "mempalace.yaml", "w") as f: yaml.dump( { "wing": "test_project", "rooms": [{"name": "general", "description": "General"}], }, f, ) palace_path = project_root / "palace" # Should not raise TypeError on the summary print mine(str(project_root), str(palace_path), dry_run=True) finally: shutil.rmtree(tmpdir, ignore_errors=True) def test_status_missing_palace_does_not_create_empty_collection(tmp_path, capsys): palace_path = tmp_path / "missing-palace" status(str(palace_path)) out = capsys.readouterr().out assert "No palace found" in out assert not palace_path.exists()