MAX_HEADER_LINES = 20  # lines to read per session looking for `cwd`


def is_claude_projects_root(path: str | Path) -> bool:
    """Return True if ``path`` looks like a ``.claude/projects/`` directory.

    Heuristic: at least one child directory whose name starts with ``-`` and
    which directly contains at least one ``.jsonl`` file.

    Args:
        path: Candidate directory. A ``str`` is accepted as well as a
            ``Path``, matching ``scan_claude_projects``.

    Returns:
        True when the heuristic matches. A missing path, a non-directory, or
        a directory that cannot be listed all yield False.
    """
    root = Path(path)
    try:
        if not root.is_dir():
            return False
        children = list(root.iterdir())
    except OSError:
        return False

    for child in children:
        if not child.name.startswith("-"):
            continue
        try:
            # is_dir()/is_file() propagate errors such as PermissionError, so
            # they live inside the guard together with the listing itself.
            if child.is_dir() and any(
                entry.suffix == ".jsonl" and entry.is_file()
                for entry in child.iterdir()
            ):
                return True
        except OSError:
            continue
    return False


def _extract_cwd_from_session(session_file: Path) -> Optional[str]:
    """Return the ``cwd`` of the first record in a session that carries one.

    Only the first ``MAX_HEADER_LINES`` lines are inspected (blank and
    malformed lines count toward that limit).

    Args:
        session_file: Path to a ``.jsonl`` session file.

    Returns:
        The first non-empty string ``cwd`` found, or None if the file cannot
        be read, holds no parseable JSON object, or no inspected record has a
        usable ``cwd``.
    """
    try:
        with open(session_file, encoding="utf-8", errors="replace") as handle:
            for index, raw in enumerate(handle):
                if index >= MAX_HEADER_LINES:
                    break
                text = raw.strip()
                if not text:
                    continue
                try:
                    record = json.loads(text)
                except json.JSONDecodeError:
                    continue
                # A JSONL line may legally be a list, string, number or null;
                # only objects can carry a `cwd` key.
                if not isinstance(record, dict):
                    continue
                cwd = record.get("cwd")
                if isinstance(cwd, str) and cwd:
                    return cwd
    except OSError:
        return None
    return None


def _decode_slug_fallback(slug: str) -> str:
    """Best-effort project name from a slug when no ``cwd`` is available.

    The slug is lossy (``/`` and ``-`` both become ``-``), so the last
    non-empty ``-``-separated segment is the closest guess; kebab-case names
    cannot be recovered without ``cwd``.

    Returns:
        The last non-empty segment, or ``slug`` unchanged when it has none
        (empty string, or dashes only).
    """
    segments = [segment for segment in slug.split("-") if segment]
    return segments[-1] if segments else slug


def _resolve_project_name(project_dir: Path) -> str:
    """Recover the original project name for one encoded project directory.

    Sessions are tried newest-first (by mtime); the basename of the first
    readable ``cwd`` wins. If the directory cannot be listed, or no session
    yields a ``cwd``, the name is guessed from the directory's slug.

    Args:
        project_dir: A ``-``-prefixed directory holding ``.jsonl`` sessions.

    Returns:
        The project name; filesystem errors never propagate from here.
    """
    try:
        entries = list(project_dir.iterdir())
    except OSError:
        entries = []

    dated = []
    for entry in entries:
        if entry.suffix != ".jsonl":
            continue
        try:
            if entry.is_file():
                dated.append((entry.stat().st_mtime, entry))
        except OSError:
            # The session vanished or became unreadable mid-scan; skip it.
            continue

    # Newest first: the most recent session is the most likely to be
    # well-formed. Sorting on mtime alone keeps ties in listing order.
    dated.sort(key=lambda pair: pair[0], reverse=True)
    for _, session in dated:
        cwd = _extract_cwd_from_session(session)
        if cwd:
            # Path("/").name is empty, so fall back to the raw cwd there.
            return Path(cwd).name or cwd
    return _decode_slug_fallback(project_dir.name)


def scan_claude_projects(path: str | Path) -> list[ProjectInfo]:
    """Scan a ``.claude/projects/`` directory for Claude Code conversations.

    Each ``-``-prefixed subdirectory with at least one ``.jsonl`` session
    becomes a ``ProjectInfo``. Subdirectories that resolve to the same
    project name collapse to the one with the most sessions (the first one
    in sorted order wins a tie).

    ``has_git`` is False (the directory isn't a repo itself), while
    ``total_commits`` and ``user_commits`` are repurposed as the session
    count so the UX surfaces a density signal for ranking.

    Args:
        path: The projects root; ``~`` is expanded and the path resolved.

    Returns:
        Projects sorted by descending session count, then by name. An empty
        list if ``path`` does not look like a projects root or cannot be
        listed.
    """
    root = Path(path).expanduser().resolve()
    if not is_claude_projects_root(root):
        return []
    try:
        candidates = sorted(root.iterdir())
    except OSError:
        # The root disappeared or became unreadable after the check above.
        return []

    best_by_name: dict[str, ProjectInfo] = {}
    for sub in candidates:
        if not sub.name.startswith("-"):
            continue
        try:
            if not sub.is_dir():
                continue
            session_count = sum(
                1
                for entry in sub.iterdir()
                if entry.suffix == ".jsonl" and entry.is_file()
            )
        except OSError:
            continue
        if not session_count:
            continue

        name = _resolve_project_name(sub)
        current = best_by_name.get(name)
        if current is not None and current.user_commits >= session_count:
            continue
        best_by_name[name] = ProjectInfo(
            name=name,
            repo_root=sub,
            manifest=None,
            has_git=False,
            total_commits=session_count,
            user_commits=session_count,
            is_mine=True,  # Claude Code sessions are authored by the user
        )

    return sorted(
        best_by_name.values(),
        key=lambda p: (-p.user_commits, p.name),
    )
"""Tests for mempalace.convo_scanner."""

import json
import os

from mempalace.convo_scanner import (
    _decode_slug_fallback,
    _extract_cwd_from_session,
    _resolve_project_name,
    is_claude_projects_root,
    scan_claude_projects,
)


def _write_session(path, *records):
    """Write ``records`` to ``path`` as JSONL, one JSON document per line."""
    path.write_text("".join(json.dumps(record) + "\n" for record in records))


def _make_project(root, slug):
    """Create and return the encoded project directory ``root / slug``."""
    project_dir = root / slug
    project_dir.mkdir()
    return project_dir


# ── is_claude_projects_root ─────────────────────────────────────────────


def test_is_claude_projects_root_true(tmp_path):
    project_dir = _make_project(tmp_path, "-home-user-dev-foo")
    (project_dir / "abc.jsonl").write_text("{}\n")
    assert is_claude_projects_root(tmp_path)


def test_is_claude_projects_root_false_no_dash_prefix(tmp_path):
    project_dir = _make_project(tmp_path, "normal-folder")
    (project_dir / "abc.jsonl").write_text("{}\n")
    assert not is_claude_projects_root(tmp_path)


def test_is_claude_projects_root_false_no_jsonl(tmp_path):
    project_dir = _make_project(tmp_path, "-home-user-foo")
    (project_dir / "other.txt").write_text("hello")
    assert not is_claude_projects_root(tmp_path)


def test_is_claude_projects_root_false_empty(tmp_path):
    assert not is_claude_projects_root(tmp_path)


def test_is_claude_projects_root_false_nonexistent(tmp_path):
    assert not is_claude_projects_root(tmp_path / "does-not-exist")


# ── cwd extraction ──────────────────────────────────────────────────────


def test_extract_cwd_from_session(tmp_path):
    session = tmp_path / "session.jsonl"
    _write_session(
        session,
        {"type": "file-history-snapshot", "messageId": "x"},
        {"type": "user", "cwd": "/home/user/dev/myproj", "content": "hi"},
    )
    assert _extract_cwd_from_session(session) == "/home/user/dev/myproj"


def test_extract_cwd_from_session_skips_malformed(tmp_path):
    session = tmp_path / "session.jsonl"
    session.write_text(
        "{not valid json\n" + json.dumps({"type": "user", "cwd": "/home/user/dev/good"}) + "\n"
    )
    assert _extract_cwd_from_session(session) == "/home/user/dev/good"


def test_extract_cwd_from_session_none_if_absent(tmp_path):
    session = tmp_path / "session.jsonl"
    _write_session(session, {"type": "x", "messageId": "y"})
    assert _extract_cwd_from_session(session) is None


def test_extract_cwd_from_session_none_if_file_missing(tmp_path):
    assert _extract_cwd_from_session(tmp_path / "missing.jsonl") is None


# ── slug fallback ───────────────────────────────────────────────────────


def test_decode_slug_fallback_last_segment():
    assert _decode_slug_fallback("-home-user-dev-foo") == "foo"


def test_decode_slug_fallback_double_dash():
    assert _decode_slug_fallback("-home-user--bentokit") == "bentokit"


def test_decode_slug_fallback_empty():
    assert _decode_slug_fallback("") == ""


def test_decode_slug_fallback_only_dashes():
    assert _decode_slug_fallback("---") == "---"


# ── _resolve_project_name ───────────────────────────────────────────────


def test_resolve_project_name_uses_cwd(tmp_path):
    pdir = _make_project(tmp_path, "-home-user-dev-coolproj")
    _write_session(pdir / "a.jsonl", {"type": "user", "cwd": "/home/user/dev/cool-proj-real"})
    assert _resolve_project_name(pdir) == "cool-proj-real"


def test_resolve_project_name_falls_back_when_no_cwd(tmp_path):
    pdir = _make_project(tmp_path, "-home-user-dev-foo")
    _write_session(pdir / "a.jsonl", {"type": "x"})
    assert _resolve_project_name(pdir) == "foo"


def test_resolve_project_name_prefers_newer_session(tmp_path):
    """Newest session's cwd wins — covers the case where user renamed the
    project directory between sessions."""
    pdir = _make_project(tmp_path, "-home-user-dev-old")
    old = pdir / "old.jsonl"
    _write_session(old, {"type": "user", "cwd": "/home/user/dev/old"})
    # Back-date the old session so the two mtimes are distinguishable even on
    # filesystems with coarse timestamp resolution.
    old_mtime = old.stat().st_mtime - 100
    os.utime(old, (old_mtime, old_mtime))

    _write_session(pdir / "new.jsonl", {"type": "user", "cwd": "/home/user/dev/new-name"})
    assert _resolve_project_name(pdir) == "new-name"


# ── scan_claude_projects ────────────────────────────────────────────────


def test_scan_claude_projects_empty_dir(tmp_path):
    assert scan_claude_projects(tmp_path) == []


def test_scan_claude_projects_not_a_projects_root(tmp_path):
    """Returns empty list if the dir doesn't look like .claude/projects/."""
    folder = _make_project(tmp_path, "some-folder")
    (folder / "readme.md").write_text("hi")
    assert scan_claude_projects(tmp_path) == []


def test_scan_claude_projects_finds_projects(tmp_path):
    p1 = _make_project(tmp_path, "-home-user-dev-alpha")
    _write_session(p1 / "a.jsonl", {"type": "user", "cwd": "/home/user/dev/alpha"})
    _write_session(p1 / "b.jsonl", {"type": "user", "cwd": "/home/user/dev/alpha"})

    p2 = _make_project(tmp_path, "-home-user-dev-beta")
    _write_session(p2 / "x.jsonl", {"type": "user", "cwd": "/home/user/dev/beta"})

    result = scan_claude_projects(tmp_path)
    names = [p.name for p in result]
    assert "alpha" in names
    assert "beta" in names
    # alpha has 2 sessions, beta has 1 — alpha ranks higher
    assert names.index("alpha") < names.index("beta")
    alpha = next(p for p in result if p.name == "alpha")
    beta = next(p for p in result if p.name == "beta")
    assert alpha.user_commits == 2
    assert beta.user_commits == 1


def test_scan_claude_projects_ignores_dirs_without_jsonl(tmp_path):
    empty_proj = _make_project(tmp_path, "-home-user-dev-empty")
    (empty_proj / "notes.md").write_text("hi")
    assert scan_claude_projects(tmp_path) == []


def test_scan_claude_projects_marks_as_mine(tmp_path):
    p = _make_project(tmp_path, "-home-user-dev-owned")
    _write_session(p / "s.jsonl", {"type": "user", "cwd": "/home/user/dev/owned"})
    result = scan_claude_projects(tmp_path)
    assert len(result) == 1
    assert result[0].is_mine is True


def test_scan_claude_projects_dedup_by_name(tmp_path):
    """Two encoded dirs resolving to the same project name collapse to one."""
    p1 = _make_project(tmp_path, "-home-user-a-proj")
    _write_session(p1 / "s.jsonl", {"type": "user", "cwd": "/home/user/a/proj"})
    _write_session(p1 / "t.jsonl", {"type": "user", "cwd": "/home/user/a/proj"})

    p2 = _make_project(tmp_path, "-home-user-b-proj")
    _write_session(p2 / "u.jsonl", {"type": "user", "cwd": "/home/user/b/proj"})

    result = scan_claude_projects(tmp_path)
    # Both resolve to "proj"; only one remains — the one with more sessions wins
    assert len(result) == 1
    assert result[0].name == "proj"
    assert result[0].user_commits == 2