feat(convo): parse Claude Code conversation dirs into project entities
Claude Code stores sessions under `~/.claude/projects/<slug>/<id>.jsonl`, where `<slug>` is the original CWD with `/` replaced by `-`. That encoding is lossy — it can't distinguish `foo-bar` (one segment) from `foo/bar` (two) — so slug-decoding alone produces wrong names for any hyphenated project. Fortunately, every message record carries a `cwd` field with the true path. This scanner reads one record per session to recover the accurate project name deterministically, falling back to slug-decoding only if the JSONL is malformed or empty. The output shape matches project_scanner.ProjectInfo so the discover orchestrator can union results across sources. The session count doubles as a density signal for ranking. 22 unit tests cover: root detection, cwd extraction with malformed input tolerance, fallback slug decoding, name resolution using the newest session (so renames win), and dedup when two encoded dirs resolve to the same project.
This commit is contained in:
@@ -0,0 +1,152 @@
|
||||
"""
|
||||
convo_scanner.py — Parse Claude Code conversation directories into ProjectInfo.
|
||||
|
||||
Claude Code stores sessions under ``~/.claude/projects/<slug>/<id>.jsonl``,
|
||||
where the ``<slug>`` is the original CWD with ``/`` replaced by ``-``. That
|
||||
encoding is lossy: we can't tell whether ``foo-bar`` in a slug is the
|
||||
literal project name ``foo-bar`` or two path segments ``foo/bar``.
|
||||
|
||||
Fortunately, every message record in the JSONL carries a ``cwd`` field with
|
||||
the true path. This scanner reads one record per session to recover the
|
||||
accurate project name, falling back to slug-decoding only if the JSONL
|
||||
is malformed or empty.
|
||||
|
||||
Output is the same ``ProjectInfo`` shape used by ``project_scanner``, so the
|
||||
``discover_entities`` orchestrator can mix-and-match sources.
|
||||
|
||||
Public:
|
||||
is_claude_projects_root(path) -> bool
|
||||
scan_claude_projects(path) -> list[ProjectInfo]
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from mempalace.project_scanner import ProjectInfo
|
||||
|
||||
|
||||
MAX_HEADER_LINES = 20  # max leading lines (blank ones included) examined per session when looking for `cwd`
|
||||
|
||||
|
||||
def is_claude_projects_root(path: Path) -> bool:
    """Tell whether ``path`` has the layout of a ``.claude/projects/`` directory.

    The check is a heuristic: ``path`` qualifies as soon as one of its child
    directories has a name beginning with ``-`` and directly contains a
    regular file with the ``.jsonl`` suffix.

    Returns False when ``path`` is not a directory or cannot be listed.
    Child directories that cannot be listed are skipped.
    """
    if not path.is_dir():
        return False

    try:
        entries = list(path.iterdir())
    except OSError:
        return False

    for entry in entries:
        # Only dash-prefixed directories can be encoded project slugs.
        if not entry.is_dir() or not entry.name.startswith("-"):
            continue
        try:
            for item in entry.iterdir():
                if item.is_file() and item.suffix == ".jsonl":
                    return True
        except OSError:
            # Unreadable candidate; keep looking at the remaining children.
            continue

    return False
|
||||
|
||||
|
||||
def _extract_cwd_from_session(session_file: Path) -> Optional[str]:
    """Return the ``cwd`` from the first message record that carries one.

    Only the first ``MAX_HEADER_LINES`` lines of the file are examined; blank
    lines count toward that limit. A line is skipped (not treated as an
    error) when it is not valid JSON or when it decodes to something other
    than a JSON object, e.g. a list, string, number or ``null``.

    Args:
        session_file: Path to a session ``.jsonl`` file.

    Returns:
        The first non-empty string ``cwd`` value found, or None if the file
        can't be read or no examined record carries a usable ``cwd``.
    """
    try:
        # errors="replace" keeps a stray invalid byte from aborting the read.
        with open(session_file, encoding="utf-8", errors="replace") as f:
            for i, line in enumerate(f):
                if i >= MAX_HEADER_LINES:
                    break
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Valid JSON is not necessarily an object; only dicts have
                # a ``cwd`` key to look up.
                if not isinstance(obj, dict):
                    continue
                cwd = obj.get("cwd")
                if isinstance(cwd, str) and cwd:
                    return cwd
    except OSError:
        return None
    return None
|
||||
|
||||
|
||||
def _decode_slug_fallback(slug: str) -> str:
|
||||
"""Best-effort project name from slug when cwd is unavailable.
|
||||
|
||||
The slug is lossy (`/` and `-` both become `-`). Last non-empty segment
|
||||
is the closest guess at the project name, preserving kebab-case is
|
||||
impossible without cwd.
|
||||
"""
|
||||
stripped = slug.lstrip("-")
|
||||
parts = [p for p in stripped.split("-") if p]
|
||||
return parts[-1] if parts else slug
|
||||
|
||||
|
||||
def _session_mtime(session: Path) -> float:
    """Return the file's modification time, or 0.0 if it can't be stat'ed."""
    try:
        return session.stat().st_mtime
    except OSError:
        # E.g. the file vanished after listing; sort it as the oldest.
        return 0.0


def _resolve_project_name(project_dir: Path) -> str:
    """Recover the original project name from a session's ``cwd``.

    Sessions (``.jsonl`` files directly inside ``project_dir``) are tried
    newest first by modification time, so the most recent ``cwd`` wins. The
    name is the final path component of that ``cwd``, or the ``cwd`` itself
    when it has no final component.

    Filesystem errors are tolerated: an unreadable directory is treated as
    having no sessions, and a session that can't be stat'ed sorts last.

    Args:
        project_dir: One encoded project directory under the projects root.

    Returns:
        The project name, falling back to slug-decoding of the directory
        name if no session yields a readable ``cwd``.
    """
    try:
        sessions = [
            p for p in project_dir.iterdir() if p.is_file() and p.suffix == ".jsonl"
        ]
    except OSError:
        sessions = []

    # Newest first, so a renamed project reports its current name.
    sessions.sort(key=_session_mtime, reverse=True)

    for session in sessions:
        cwd = _extract_cwd_from_session(session)
        if cwd:
            return Path(cwd).name or cwd
    return _decode_slug_fallback(project_dir.name)
|
||||
|
||||
|
||||
def scan_claude_projects(path: str | Path) -> list[ProjectInfo]:
    """Scan a ``.claude/projects/`` directory for Claude Code conversations.

    One ProjectInfo is built per dash-prefixed subdirectory that contains at
    least one ``.jsonl`` session. ``has_git`` is False (the directory isn't a
    repo itself), and ``total_commits`` / ``user_commits`` are repurposed as
    the session count so the UX surfaces a density signal for ranking.

    When several subdirectories resolve to the same project name, only the
    one with the most sessions is kept; on a tie the one that sorts first by
    path wins.

    Filesystem errors never abort the scan: an unreadable root yields an
    empty list, an unreadable subdirectory is skipped, and a failure while
    resolving a name falls back to slug-decoding.

    Args:
        path: Location of the projects root; ``~`` is expanded.

    Returns:
        Projects sorted by descending session count, then by name. Empty if
        ``path`` does not look like a Claude projects root.
    """
    root = Path(path).expanduser().resolve()
    if not is_claude_projects_root(root):
        return []

    try:
        subdirs = sorted(root.iterdir())
    except OSError:
        # The root became unreadable after the check above.
        return []

    projects: dict[str, ProjectInfo] = {}
    for sub in subdirs:
        try:
            if not (sub.is_dir() and sub.name.startswith("-")):
                continue
            sessions = [p for p in sub.iterdir() if p.is_file() and p.suffix == ".jsonl"]
        except OSError:
            continue
        if not sessions:
            continue

        try:
            name = _resolve_project_name(sub)
        except OSError:
            # Sessions changed underneath us; the slug is still available.
            name = _decode_slug_fallback(sub.name)
        session_count = len(sessions)

        proj = ProjectInfo(
            name=name,
            repo_root=sub,
            manifest=None,
            has_git=False,
            total_commits=session_count,
            user_commits=session_count,
            is_mine=True,  # Claude Code sessions are authored by the user
        )
        # Dedup by name: keep whichever directory has strictly more sessions.
        existing = projects.get(name)
        if existing is None or session_count > existing.user_commits:
            projects[name] = proj

    return sorted(
        projects.values(),
        key=lambda p: (-p.user_commits, p.name),
    )
|
||||
@@ -0,0 +1,199 @@
|
||||
"""Tests for mempalace.convo_scanner."""
|
||||
|
||||
import json
|
||||
|
||||
from mempalace.convo_scanner import (
|
||||
_decode_slug_fallback,
|
||||
_extract_cwd_from_session,
|
||||
_resolve_project_name,
|
||||
is_claude_projects_root,
|
||||
scan_claude_projects,
|
||||
)
|
||||
|
||||
|
||||
# ── is_claude_projects_root ─────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_is_claude_projects_root_true(tmp_path):
    """A dash-prefixed child holding a ``.jsonl`` file marks a projects root."""
    slug_dir = tmp_path / "-home-user-dev-foo"
    slug_dir.mkdir()
    session = slug_dir / "abc.jsonl"
    session.write_text("{}\n")

    assert is_claude_projects_root(tmp_path)
|
||||
|
||||
|
||||
def test_is_claude_projects_root_false_no_dash_prefix(tmp_path):
    """A child without the leading dash does not count, even with sessions."""
    ordinary_dir = tmp_path / "normal-folder"
    ordinary_dir.mkdir()
    session = ordinary_dir / "abc.jsonl"
    session.write_text("{}\n")

    assert not is_claude_projects_root(tmp_path)
|
||||
|
||||
|
||||
def test_is_claude_projects_root_false_no_jsonl(tmp_path):
    """A dash-prefixed child with no ``.jsonl`` file does not count."""
    slug_dir = tmp_path / "-home-user-foo"
    slug_dir.mkdir()
    unrelated = slug_dir / "other.txt"
    unrelated.write_text("hello")

    assert not is_claude_projects_root(tmp_path)
|
||||
|
||||
|
||||
def test_is_claude_projects_root_false_empty(tmp_path):
    """An empty directory is not a projects root."""
    detected = is_claude_projects_root(tmp_path)
    assert not detected
|
||||
|
||||
|
||||
def test_is_claude_projects_root_false_nonexistent(tmp_path):
    """A path that does not exist is not a projects root."""
    missing = tmp_path / "does-not-exist"
    assert not is_claude_projects_root(missing)
|
||||
|
||||
|
||||
# ── cwd extraction ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_extract_cwd_from_session(tmp_path):
    """Records without ``cwd`` are passed over until one carries it."""
    records = [
        {"type": "file-history-snapshot", "messageId": "x"},
        {"type": "user", "cwd": "/home/user/dev/myproj", "content": "hi"},
    ]
    session_path = tmp_path / "session.jsonl"
    session_path.write_text("".join(json.dumps(record) + "\n" for record in records))

    assert _extract_cwd_from_session(session_path) == "/home/user/dev/myproj"
|
||||
|
||||
|
||||
def test_extract_cwd_from_session_skips_malformed(tmp_path):
    """A line that is not valid JSON is skipped, not fatal."""
    broken_line = "{not valid json\n"
    good_line = json.dumps({"type": "user", "cwd": "/home/user/dev/good"}) + "\n"
    session_path = tmp_path / "session.jsonl"
    session_path.write_text(broken_line + good_line)

    assert _extract_cwd_from_session(session_path) == "/home/user/dev/good"
|
||||
|
||||
|
||||
def test_extract_cwd_from_session_none_if_absent(tmp_path):
    """None is returned when no record has a ``cwd`` key."""
    record = {"type": "x", "messageId": "y"}
    session_path = tmp_path / "session.jsonl"
    session_path.write_text(json.dumps(record) + "\n")

    assert _extract_cwd_from_session(session_path) is None
|
||||
|
||||
|
||||
def test_extract_cwd_from_session_none_if_file_missing(tmp_path):
    """A session file that does not exist yields None instead of raising."""
    absent = tmp_path / "missing.jsonl"
    assert _extract_cwd_from_session(absent) is None
|
||||
|
||||
|
||||
# ── slug fallback ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_decode_slug_fallback_last_segment():
    """The final dash-separated segment is taken as the project name."""
    decoded = _decode_slug_fallback("-home-user-dev-foo")
    assert decoded == "foo"
|
||||
|
||||
|
||||
def test_decode_slug_fallback_double_dash():
    """Consecutive dashes produce empty segments, which are ignored."""
    decoded = _decode_slug_fallback("-home-user--bentokit")
    assert decoded == "bentokit"
|
||||
|
||||
|
||||
def test_decode_slug_fallback_empty():
    """An empty slug comes back unchanged."""
    decoded = _decode_slug_fallback("")
    assert decoded == ""
|
||||
|
||||
|
||||
def test_decode_slug_fallback_only_dashes():
    """A slug made only of dashes has no segment and is returned as-is."""
    decoded = _decode_slug_fallback("---")
    assert decoded == "---"
|
||||
|
||||
|
||||
# ── _resolve_project_name ───────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_resolve_project_name_uses_cwd(tmp_path):
    """The recorded ``cwd`` beats the lossy directory slug."""
    project_dir = tmp_path / "-home-user-dev-coolproj"
    project_dir.mkdir()
    record = {"type": "user", "cwd": "/home/user/dev/cool-proj-real"}
    (project_dir / "a.jsonl").write_text(json.dumps(record) + "\n")

    assert _resolve_project_name(project_dir) == "cool-proj-real"
|
||||
|
||||
|
||||
def test_resolve_project_name_falls_back_when_no_cwd(tmp_path):
    """With no ``cwd`` in any session, the slug's last segment is used."""
    project_dir = tmp_path / "-home-user-dev-foo"
    project_dir.mkdir()
    record = {"type": "x"}
    (project_dir / "a.jsonl").write_text(json.dumps(record) + "\n")

    assert _resolve_project_name(project_dir) == "foo"
|
||||
|
||||
|
||||
def test_resolve_project_name_prefers_newer_session(tmp_path):
    """The most recently modified session supplies the name.

    Covers a project directory that was renamed between sessions.
    """
    import os

    project_dir = tmp_path / "-home-user-dev-old"
    project_dir.mkdir()

    older = project_dir / "old.jsonl"
    older.write_text(json.dumps({"type": "user", "cwd": "/home/user/dev/old"}) + "\n")
    # Backdate the first session so the two mtimes are clearly distinct.
    backdated = older.stat().st_mtime - 100
    os.utime(older, (backdated, backdated))

    newer = project_dir / "new.jsonl"
    newer.write_text(json.dumps({"type": "user", "cwd": "/home/user/dev/new-name"}) + "\n")

    assert _resolve_project_name(project_dir) == "new-name"
|
||||
|
||||
|
||||
# ── scan_claude_projects ────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_scan_claude_projects_empty_dir(tmp_path):
    """Scanning an empty directory yields no projects."""
    found = scan_claude_projects(tmp_path)
    assert found == []
|
||||
|
||||
|
||||
def test_scan_claude_projects_not_a_projects_root(tmp_path):
    """A directory that doesn't look like .claude/projects/ yields nothing."""
    ordinary_dir = tmp_path / "some-folder"
    ordinary_dir.mkdir()
    (ordinary_dir / "readme.md").write_text("hi")

    found = scan_claude_projects(tmp_path)
    assert found == []
|
||||
|
||||
|
||||
def test_scan_claude_projects_finds_projects(tmp_path):
    """Each project dir is reported with its session count, densest first."""
    p1 = tmp_path / "-home-user-dev-alpha"
    p1.mkdir()
    (p1 / "a.jsonl").write_text(json.dumps({"type": "user", "cwd": "/home/user/dev/alpha"}) + "\n")
    (p1 / "b.jsonl").write_text(json.dumps({"type": "user", "cwd": "/home/user/dev/alpha"}) + "\n")

    p2 = tmp_path / "-home-user-dev-beta"
    p2.mkdir()
    (p2 / "x.jsonl").write_text(json.dumps({"type": "user", "cwd": "/home/user/dev/beta"}) + "\n")

    result = scan_claude_projects(tmp_path)
    names = [p.name for p in result]
    # alpha has 2 sessions, beta has 1 — alpha ranks higher, so the order
    # itself is asserted, not just membership.
    assert names == ["alpha", "beta"]
    alpha, beta = result
    assert alpha.user_commits == 2
    assert beta.user_commits == 1
|
||||
|
||||
|
||||
def test_scan_claude_projects_ignores_dirs_without_jsonl(tmp_path):
    """A dash-prefixed directory with no session files is not reported."""
    sessionless = tmp_path / "-home-user-dev-empty"
    sessionless.mkdir()
    (sessionless / "notes.md").write_text("hi")

    found = scan_claude_projects(tmp_path)
    assert found == []
|
||||
|
||||
|
||||
def test_scan_claude_projects_marks_as_mine(tmp_path):
    """Every scanned project is flagged as belonging to the user."""
    project_dir = tmp_path / "-home-user-dev-owned"
    project_dir.mkdir()
    record = {"type": "user", "cwd": "/home/user/dev/owned"}
    (project_dir / "s.jsonl").write_text(json.dumps(record) + "\n")

    found = scan_claude_projects(tmp_path)

    assert len(found) == 1
    only = found[0]
    assert only.is_mine is True
|
||||
|
||||
|
||||
def test_scan_claude_projects_dedup_by_name(tmp_path):
    """Two encoded dirs resolving to the same project name collapse to one."""
    busy_dir = tmp_path / "-home-user-a-proj"
    busy_dir.mkdir()
    busy_line = json.dumps({"type": "user", "cwd": "/home/user/a/proj"}) + "\n"
    for session_name in ("s.jsonl", "t.jsonl"):
        (busy_dir / session_name).write_text(busy_line)

    quiet_dir = tmp_path / "-home-user-b-proj"
    quiet_dir.mkdir()
    quiet_line = json.dumps({"type": "user", "cwd": "/home/user/b/proj"}) + "\n"
    (quiet_dir / "u.jsonl").write_text(quiet_line)

    found = scan_claude_projects(tmp_path)

    # Both resolve to "proj"; the directory with more sessions survives.
    assert len(found) == 1
    survivor = found[0]
    assert survivor.name == "proj"
    assert survivor.user_commits == 2
|
||||
Reference in New Issue
Block a user