From 9e7fa1ceb59ba4eee306f05a8d1ac50a2eaa0b49 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva <4753812+igorls@users.noreply.github.com> Date: Fri, 24 Apr 2026 00:20:53 -0300 Subject: [PATCH] feat(init): scan manifests and git authors for real entity signal `mempalace init` previously leaned entirely on regex-based entity extraction from prose. That path works for text-only folders but wastes signal in any codebase: the project's own name is already in `package.json` / `pyproject.toml` / `Cargo.toml` / `go.mod`, and the people who worked on it are in `git log`. This adds `project_scanner.py`, which becomes the primary signal source when real signal is available, with the regex detector preserved as the fallback for prose-only folders (diaries, research notes, writing). What it does: - Walks the target directory, parses manifests for canonical project names, and detects git repos by the presence of a `.git` directory. - For each repo, reads `git log` for authors and filters obvious bots (`[bot]`, `dependabot`, `renovate`, `github-actions`, names ending in `bot`, `-autoroll`). Importantly does NOT filter `@users.noreply.github.com` - that's GitHub's privacy-protected human email, used by real contributors. - Resolves author aliases with a union-find: commits that share a name OR an email collapse into one person. Picks the most-frequent real-name variant as display, ignoring handles and single-token usernames. - Flags "mine" projects: user is top-5 committer OR has >=10% of commits OR >=20 commits. Ordered by user_commits in the UX. - `discover_entities()` merges scanner results with the regex detector case-insensitively (so `mempalace` from pyproject absorbs `MemPalace` from docs), and suppresses the regex `uncertain` bucket when real signal is already found - the user doesn't need to adjudicate prose noise when the answer is already in git. Integration: `cmd_init` now calls `discover_entities` instead of running the regex detector directly. 
Same output shape, so `confirm_entities` works unchanged. Ships with 39 new tests covering manifest parsing, bot filtering, union-find dedup, git repo discovery, scan integration, and merge/fallback behavior. Existing 56 regex-detector tests all pass. --- mempalace/cli.py | 33 +- mempalace/project_scanner.py | 623 ++++++++++++++++++++++++++++++++++ tests/test_project_scanner.py | 411 ++++++++++++++++++++++ uv.lock | 2 +- 4 files changed, 1051 insertions(+), 18 deletions(-) create mode 100644 mempalace/project_scanner.py create mode 100644 tests/test_project_scanner.py diff --git a/mempalace/cli.py b/mempalace/cli.py index d0da6e7..de40090 100644 --- a/mempalace/cli.py +++ b/mempalace/cli.py @@ -71,7 +71,8 @@ def _ensure_mempalace_files_gitignored(project_dir) -> bool: def cmd_init(args): import json from pathlib import Path - from .entity_detector import scan_for_detection, detect_entities, confirm_entities + from .entity_detector import confirm_entities + from .project_scanner import discover_entities from .room_detector_local import detect_rooms_local cfg = MempalaceConfig() @@ -85,25 +86,23 @@ def cmd_init(args): languages = cfg.entity_languages languages_tuple = tuple(languages) - # Pass 1: auto-detect people and projects from file content + # Pass 1: discover entities — manifests + git authors first, prose detection + # as supplement for names mentioned only in docs/notes. 
print(f"\n Scanning for entities in: {args.dir}") if languages_tuple != ("en",): print(f" Languages: {', '.join(languages_tuple)}") - files = scan_for_detection(args.dir) - if files: - print(f" Reading {len(files)} files...") - detected = detect_entities(files, languages=languages_tuple) - total = len(detected["people"]) + len(detected["projects"]) + len(detected["uncertain"]) - if total > 0: - confirmed = confirm_entities(detected, yes=getattr(args, "yes", False)) - # Save confirmed entities to /entities.json for the miner - if confirmed["people"] or confirmed["projects"]: - entities_path = Path(args.dir).expanduser().resolve() / "entities.json" - with open(entities_path, "w") as f: - json.dump(confirmed, f, indent=2) - print(f" Entities saved: {entities_path}") - else: - print(" No entities detected — proceeding with directory-based rooms.") + detected = discover_entities(args.dir, languages=languages_tuple) + total = len(detected["people"]) + len(detected["projects"]) + len(detected["uncertain"]) + if total > 0: + confirmed = confirm_entities(detected, yes=getattr(args, "yes", False)) + # Save confirmed entities to /entities.json for the miner + if confirmed["people"] or confirmed["projects"]: + entities_path = Path(args.dir).expanduser().resolve() / "entities.json" + with open(entities_path, "w") as f: + json.dump(confirmed, f, indent=2) + print(f" Entities saved: {entities_path}") + else: + print(" No entities detected — proceeding with directory-based rooms.") # Pass 2: detect rooms from folder structure detect_rooms_local(project_dir=args.dir, yes=getattr(args, "yes", False)) diff --git a/mempalace/project_scanner.py b/mempalace/project_scanner.py new file mode 100644 index 0000000..e078b6e --- /dev/null +++ b/mempalace/project_scanner.py @@ -0,0 +1,623 @@ +""" +project_scanner.py — Detect projects and people from real signal. 
+ +For a codebase with build manifests or git history, this beats regex-based +entity detection by a wide margin: the project's own name is already written +down in package.json / pyproject.toml / Cargo.toml / go.mod, and the people +who worked on it are in `git log`. + +This module is used as the primary signal in `mempalace init`. The regex +detector in entity_detector.py stays as a fallback for prose-only folders +(notes, research, writing). + +Public: + scan(root) -> (projects, people) + to_detected_dict(projects, people) -> {people: [...], projects: [...], uncertain: []} +""" + +from __future__ import annotations + +import json +import os +import re +import subprocess +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +try: + import tomllib # Python 3.11+ +except ImportError: # pragma: no cover + tomllib = None # type: ignore + + +SKIP_DIRS = { + ".git", + "node_modules", + "__pycache__", + ".venv", + "venv", + "env", + "dist", + "build", + ".next", + "coverage", + ".terraform", + "vendor", + "target", + ".mempalace", + ".cache", + ".pytest_cache", + ".mypy_cache", + ".ruff_cache", +} + +MAX_DEPTH = 6 +MAX_COMMITS_PER_REPO = 1000 +GIT_TIMEOUT = 10 + + +# ==================== DATACLASSES ==================== + + +@dataclass +class ProjectInfo: + name: str + repo_root: Path + manifest: Optional[str] = None + has_git: bool = False + total_commits: int = 0 + user_commits: int = 0 + is_mine: bool = False + + @property + def confidence(self) -> float: + if self.is_mine: + return 0.99 + if self.has_git and self.total_commits > 0: + return 0.7 + return 0.85 # manifest-only, no git + + def to_signal(self) -> str: + parts: list[str] = [] + if self.manifest: + parts.append(self.manifest) + if self.has_git: + if self.is_mine and self.user_commits: + parts.append(f"{self.user_commits} of your commits") + elif self.user_commits: + parts.append(f"{self.user_commits}/{self.total_commits} yours") + else: + 
parts.append(f"{self.total_commits} commits (none by you)") + return ", ".join(parts) or "repo" + + +@dataclass +class PersonInfo: + name: str + total_commits: int = 0 + emails: set[str] = field(default_factory=set) + repos: set[str] = field(default_factory=set) + + @property + def confidence(self) -> float: + if self.total_commits >= 100 or len(self.repos) >= 3: + return 0.99 + if self.total_commits >= 20: + return 0.85 + return 0.65 + + def to_signal(self) -> str: + r = len(self.repos) + return f"{self.total_commits} commit{'s' if self.total_commits != 1 else ''} across {r} repo{'s' if r != 1 else ''}" + + +# ==================== MANIFEST PARSING ==================== + + +def _parse_package_json(path: Path) -> Optional[str]: + try: + data = json.loads(path.read_text(encoding="utf-8", errors="replace")) + except (json.JSONDecodeError, OSError): + return None + name = data.get("name") + return name if isinstance(name, str) and name else None + + +def _parse_toml(path: Path) -> dict: + if tomllib is None: + return {} + try: + with open(path, "rb") as f: + return tomllib.load(f) + except (OSError, Exception): + return {} + + +def _parse_pyproject(path: Path) -> Optional[str]: + data = _parse_toml(path) + name = data.get("project", {}).get("name") + if isinstance(name, str) and name: + return name + name = data.get("tool", {}).get("poetry", {}).get("name") + return name if isinstance(name, str) and name else None + + +def _parse_cargo(path: Path) -> Optional[str]: + data = _parse_toml(path) + name = data.get("package", {}).get("name") + return name if isinstance(name, str) and name else None + + +def _parse_gomod(path: Path) -> Optional[str]: + try: + for line in path.read_text(encoding="utf-8", errors="replace").splitlines(): + line = line.strip() + if line.startswith("module "): + mod = line.split(None, 1)[1].strip() + return mod.split("/")[-1] or None + except OSError: + return None + return None + + +MANIFEST_PARSERS = { + "package.json": _parse_package_json, + 
"pyproject.toml": _parse_pyproject, + "Cargo.toml": _parse_cargo, + "go.mod": _parse_gomod, +} + + +# ==================== GIT HELPERS ==================== + + +def _run_git(cwd: Path, *args: str, timeout: int = GIT_TIMEOUT) -> str: + try: + r = subprocess.run( + ["git", "-C", str(cwd), *args], + capture_output=True, + text=True, + timeout=timeout, + check=False, + ) + return r.stdout if r.returncode == 0 else "" + except (OSError, subprocess.SubprocessError): + return "" + + +def _git_user_identity(repo: Path) -> tuple[str, str]: + """Return (name, email) for this repo, falling back to global config.""" + name = _run_git(repo, "config", "user.name", timeout=2).strip() + email = _run_git(repo, "config", "user.email", timeout=2).strip() + return name, email + + +def _global_git_identity() -> tuple[str, str]: + try: + n = subprocess.run( + ["git", "config", "--global", "user.name"], + capture_output=True, + text=True, + timeout=2, + check=False, + ).stdout.strip() + e = subprocess.run( + ["git", "config", "--global", "user.email"], + capture_output=True, + text=True, + timeout=2, + check=False, + ).stdout.strip() + return n, e + except (OSError, subprocess.SubprocessError): + return "", "" + + +def _git_authors(repo: Path) -> list[tuple[str, str]]: + out = _run_git( + repo, + "log", + f"--max-count={MAX_COMMITS_PER_REPO}", + "--format=%aN|%aE", + ) + result = [] + for line in out.splitlines(): + if "|" in line: + name, email = line.split("|", 1) + result.append((name.strip(), email.strip())) + return result + + +# ==================== BOT / NAME FILTERING ==================== + + +_BOT_NAME_PATTERNS = [ + r"\[bot\]", + r"^dependabot", + r"^renovate", + r"^github-actions", + r"^actions-user", + r"-bot$", + r"\bbot$", # catches "PR Bot", "Release Bot", etc. 
def _walk(root: Path, max_depth: int = MAX_DEPTH):
    """Yield ``(dirpath, dirs, files)`` for ``root`` and its subdirectories.

    A thin wrapper around ``os.walk`` that:

    * prunes directories listed in ``SKIP_DIRS`` and any directory whose
      name starts with ``"."``;
    * yields nothing deeper than ``max_depth`` levels below ``root``
      (``root`` itself is depth 0);
    * visits directories and lists files in sorted order, so callers that
      take the "first" repo or manifest get a stable answer regardless of
      filesystem enumeration order.

    ``dirpath`` is yielded as a ``Path``. ``dirs`` is the live list used by
    ``os.walk``; callers may clear or edit it to stop descent.
    """
    for dirpath, dirs, files in os.walk(root):
        # In-place assignment is what makes os.walk honour the pruning.
        dirs[:] = sorted(d for d in dirs if d not in SKIP_DIRS and not d.startswith("."))
        files.sort()
        current = Path(dirpath)
        try:
            rel = current.relative_to(root)
        except ValueError:
            continue
        # relative_to(root) of root itself is Path("."), which has no parts.
        depth = len(rel.parts)
        if depth > max_depth:
            dirs.clear()
            continue
        yield current, dirs, files
        # Already at the depth limit: stop os.walk from listing children
        # that would only be discarded on the next iteration.
        if depth >= max_depth:
            dirs.clear()
repo — still walk for nested repos (submodules, etc.) + repos.append(root) + for dirpath, dirs, _ in _walk(root, max_depth): + if dirpath == root: + continue + if (dirpath / ".git").is_dir(): + repos.append(dirpath) + dirs.clear() # don't descend into this repo's contents from here + return repos + + +def _collect_manifest_names(repo_root: Path) -> list[tuple[str, str, Path]]: + """Return (manifest_filename, project_name, dirpath) within a repo. + + Does not descend into nested git repos. + """ + found: list[tuple[str, str, Path]] = [] + for dirpath, dirs, files in _walk(repo_root): + if dirpath != repo_root and (dirpath / ".git").is_dir(): + dirs.clear() + continue + for fname in files: + parser = MANIFEST_PARSERS.get(fname) + if not parser: + continue + name = parser(dirpath / fname) + if name: + found.append((fname, name, dirpath)) + return found + + +# ==================== MAIN SCAN ==================== + + +class _UnionFind: + """Minimal union-find for (name, email) identity resolution.""" + + def __init__(self) -> None: + self.parent: dict = {} + + def find(self, x): + if x not in self.parent: + self.parent[x] = x + return x + root = x + while self.parent[root] != root: + root = self.parent[root] + while self.parent[x] != root: + self.parent[x], x = root, self.parent[x] + return root + + def union(self, a, b) -> None: + ra, rb = self.find(a), self.find(b) + if ra != rb: + self.parent[ra] = rb + + +def _dedupe_people( + all_commits: list[tuple[str, str, str]], +) -> dict[str, PersonInfo]: + """Group commits by identity. Two commits are the same person if they + share a name OR an email. Display name = most frequent non-bot variant. + + ``all_commits`` is a list of (name, email, repo_str) triples from every repo. 
def _user_owns_repo(
    user_commits: int,
    total_commits: int,
    other_author_counts: dict[str, int],
) -> bool:
    """Apply the "mine" heuristic: top-5 committer, >=10% share, or >=20 commits."""
    if user_commits <= 0:
        return False
    # The user is in the top five when fewer than five other authors have
    # strictly more commits.
    authors_ahead = sum(1 for count in other_author_counts.values() if count > user_commits)
    if authors_ahead < 5:
        return True
    if total_commits and user_commits / total_commits >= 0.10:
        return True
    return user_commits >= 20


def scan(root: str | os.PathLike) -> tuple[list[ProjectInfo], list[PersonInfo]]:
    """Scan ``root`` for projects and people.

    Projects come from git repositories found under ``root`` (named after a
    manifest when one exists, otherwise after the repo directory) or, when
    no repository is found at all, from every manifest under ``root``.
    People come from the non-bot commit authors of those repositories.

    Returns ``(projects, people)``. Projects are sorted with the user's own
    first, then by the user's commit count, total commits and name. People
    are sorted by total commits, descending. Returns ``([], [])`` when
    ``root`` is not a directory.
    """
    root_path = Path(root).expanduser().resolve()
    if not root_path.is_dir():
        return [], []

    repos = find_git_repos(root_path)

    # Identify current user from first repo's git config, fall back to global
    me_name, me_email = "", ""
    if repos:
        me_name, me_email = _git_user_identity(repos[0])
    if not me_name and not me_email:
        me_name, me_email = _global_git_identity()

    projects: dict[str, ProjectInfo] = {}
    all_commits: list[tuple[str, str, str]] = []

    for repo in repos:
        # Prefer a manifest at the repo root, then any manifest inside the
        # repo, and finally the directory name.
        manifests = _collect_manifest_names(repo)
        chosen = next(
            (m for m in manifests if m[2] == repo),
            manifests[0] if manifests else None,
        )
        if chosen is not None:
            manifest_file, proj_name, _ = chosen
        else:
            manifest_file, proj_name = None, repo.name

        authors = _git_authors(repo)
        total_commits = len(authors)  # includes commits later filtered as bots
        user_commits = 0
        other_author_counts: dict[str, int] = {}
        repo_key = str(repo)
        for name, email in authors:
            if _is_bot(name, email):
                continue
            all_commits.append((name, email, repo_key))
            if (me_name and name == me_name) or (me_email and email == me_email):
                # Pool everything attributed to the user, whether matched
                # by name or by email. Ranking per display name would miss
                # a user who commits under an alias.
                user_commits += 1
            else:
                other_author_counts[name] = other_author_counts.get(name, 0) + 1

        proj = ProjectInfo(
            name=proj_name,
            repo_root=repo,
            manifest=manifest_file,
            has_git=True,
            total_commits=total_commits,
            user_commits=user_commits,
            is_mine=_user_owns_repo(user_commits, total_commits, other_author_counts),
        )
        # Two repos can resolve to the same project name; keep the one the
        # user contributed to most.
        existing = projects.get(proj_name)
        if existing is None or proj.user_commits > existing.user_commits:
            projects[proj_name] = proj

    people = _dedupe_people(all_commits)

    # Handle case: root has manifests but no git repo anywhere
    if not repos:
        for manifest_file, proj_name, _dirpath in _collect_manifest_names(root_path):
            if proj_name in projects:
                continue
            projects[proj_name] = ProjectInfo(
                name=proj_name,
                repo_root=root_path,
                manifest=manifest_file,
                has_git=False,
            )

    project_list = sorted(
        projects.values(),
        key=lambda p: (not p.is_mine, -p.user_commits, -p.total_commits, p.name),
    )
    people_list = sorted(people.values(), key=lambda p: -p.total_commits)

    return project_list, people_list
def discover_entities(
    project_dir: str | os.PathLike,
    languages: tuple = ("en",),
    prose_file_cap: int = 10,
    project_cap: int = 15,
    people_cap: int = 15,
) -> dict:
    """Discover entities for ``project_dir``, trusting real signal over prose.

    The result has the ``people`` / ``projects`` / ``uncertain`` layout that
    ``to_detected_dict`` produces, so it can be handed to the same
    confirmation step as the regex detector's output.

    Sources, in order of preference:
      1. Package manifests (package.json, pyproject.toml, Cargo.toml, go.mod)
         for project names.
      2. Git commit authors for people and their commit counts.
      3. Regex entity detection over at most ``prose_file_cap`` prose files,
         adding only names the first two sources did not already yield.
    """
    found_projects, found_people = scan(project_dir)
    primary = to_detected_dict(
        found_projects,
        found_people,
        project_cap=project_cap,
        people_cap=people_cap,
    )

    # Imported at call time rather than at module import.
    from mempalace.entity_detector import detect_entities, scan_for_detection

    # Prose pass: picks up names that appear in docs but never in a commit
    # (e.g. a stakeholder or family member mentioned in notes).
    candidates = scan_for_detection(str(project_dir), max_files=prose_file_cap)
    if candidates:
        secondary = detect_entities(candidates, languages=languages)
    else:
        secondary = {"people": [], "projects": [], "uncertain": []}

    # When the scan produced any project or person, drop the regex detector's
    # "uncertain" bucket so the user is not asked to triage prose noise.
    suppress_uncertain = bool(found_projects or found_people)
    return _merge_detected(primary, secondary, drop_secondary_uncertain=suppress_uncertain)
_parse_pyproject(f) == "my-py-package" + + +def test_parse_pyproject_poetry(tmp_path): + f = tmp_path / "pyproject.toml" + f.write_text('[tool.poetry]\nname = "poetry-pkg"\n') + assert _parse_pyproject(f) == "poetry-pkg" + + +def test_parse_cargo(tmp_path): + f = tmp_path / "Cargo.toml" + f.write_text('[package]\nname = "rust-crate"\nversion = "0.1.0"\n') + assert _parse_cargo(f) == "rust-crate" + + +def test_parse_gomod(tmp_path): + f = tmp_path / "go.mod" + f.write_text("module github.com/user/my-go-mod\n\ngo 1.21\n") + assert _parse_gomod(f) == "my-go-mod" + + +# ── bot filtering ─────────────────────────────────────────────────────── + + +def test_is_bot_catches_github_actions(): + assert _is_bot("github-actions[bot]", "41898282+github-actions[bot]@users.noreply.github.com") + + +def test_is_bot_catches_dependabot(): + assert _is_bot("dependabot[bot]", "dependabot@github.com") + + +def test_is_bot_catches_pr_bot(): + assert _is_bot("Comfy Org PR Bot", "prbot@example.com") + + +def test_is_bot_does_not_flag_github_privacy_email(): + # Real humans use ...@users.noreply.github.com when privacy is enabled. + # Must NOT be filtered. + assert not _is_bot("Igor Lins e Silva", "123456+igorls@users.noreply.github.com") + + +def test_is_bot_does_not_flag_robot_person_name(): + # "Robot" as a surname should not trigger the \bbot$ pattern + # since \b requires a boundary before 'bot'. 
+ assert not _is_bot("Sarah Robot", "sarah@example.com") + + +def test_looks_like_real_name_accepts_human(): + assert _looks_like_real_name("Igor Lins e Silva") + assert _looks_like_real_name("Jane Doe") + + +def test_looks_like_real_name_rejects_handles(): + assert not _looks_like_real_name("666ghj") + assert not _looks_like_real_name("comfyanonymous") + assert not _looks_like_real_name("bensig") + assert not _looks_like_real_name("") + assert not _looks_like_real_name("no_spaces_handle") + + +# ── union-find dedup ──────────────────────────────────────────────────── + + +def test_unionfind_merges_shared_email(): + commits = [ + ("Milla J", "shared@example.com", "repo1"), + ("MSL", "shared@example.com", "repo1"), + ("Milla J", "other@example.com", "repo1"), + ] + people = _dedupe_people(commits) + # All three commits collapse into one "Milla J" person (MSL is filtered + # as display name because it lacks a space but its commits still count). + assert "Milla J" in people + assert people["Milla J"].total_commits == 3 + assert "MSL" not in people + + +def test_unionfind_keeps_distinct_people_separate(): + commits = [ + ("Alice Example", "alice@example.com", "r"), + ("Bob Sample", "bob@sample.org", "r"), + ] + people = _dedupe_people(commits) + assert "Alice Example" in people + assert "Bob Sample" in people + + +def test_unionfind_merges_shared_name(): + """Same display name, two different emails, same person.""" + commits = [ + ("Jane Doe", "jane@work.com", "r"), + ("Jane Doe", "jane@personal.com", "r"), + ] + people = _dedupe_people(commits) + assert people["Jane Doe"].total_commits == 2 + assert len(people["Jane Doe"].emails) == 2 + + +# ── project_info / person_info ───────────────────────────────────────── + + +def test_project_info_confidence_is_mine(): + p = ProjectInfo(name="x", repo_root=Path("."), is_mine=True) + assert p.confidence == 0.99 + + +def test_project_info_confidence_no_git(): + p = ProjectInfo(name="x", repo_root=Path("."), has_git=False, 
def _init_git_repo(path: Path, name: str = "Jane Doe", email: str = "jane@example.com"):
    """Helper: init a git repo at ``path`` with one commit by ``name`` / ``email``.

    The commit runs with the caller's environment plus explicit author and
    committer variables. Replacing the environment wholesale (with a
    hard-coded ``PATH``) breaks wherever git is installed outside
    ``/usr/bin:/bin``. Pinning both identities keeps any inherited ``GIT_*``
    variables from changing who the commit is attributed to.
    """
    import os

    def git(*args: str, env=None) -> None:
        subprocess.run(["git", *args], cwd=path, check=True, env=env)

    git("init", "-q")
    git("config", "user.name", name)
    git("config", "user.email", email)
    git("config", "commit.gpgsign", "false")
    (path / "README.md").write_text("hello")
    git("add", "README.md")
    commit_env = {
        **os.environ,
        "GIT_AUTHOR_NAME": name,
        "GIT_AUTHOR_EMAIL": email,
        "GIT_COMMITTER_NAME": name,
        "GIT_COMMITTER_EMAIL": email,
    }
    git("commit", "-q", "-m", "initial", env=commit_env)
d["projects"][0]["name"] == "p" + assert d["projects"][0]["type"] == "project" + assert d["people"][0]["name"] == "Jane Doe" + assert d["people"][0]["type"] == "person" + assert d["uncertain"] == [] + + +# ── merge ─────────────────────────────────────────────────────────────── + + +def test_merge_primary_wins_case_insensitive(): + primary = { + "people": [], + "projects": [ + { + "name": "mempalace", + "type": "project", + "confidence": 0.99, + "frequency": 10, + "signals": ["pyproject.toml"], + } + ], + "uncertain": [], + } + secondary = { + "people": [], + "projects": [], + "uncertain": [ + { + "name": "MemPalace", + "type": "uncertain", + "confidence": 0.4, + "frequency": 6, + "signals": ["regex"], + } + ], + } + merged = _merge_detected(primary, secondary) + # `MemPalace` (uncertain) is deduped against `mempalace` (project) case-insensitively + assert len(merged["projects"]) == 1 + assert len(merged["uncertain"]) == 0 + + +def test_merge_drops_secondary_uncertain_when_requested(): + primary = {"people": [], "projects": [], "uncertain": []} + secondary = { + "people": [], + "projects": [], + "uncertain": [ + {"name": "Foo", "type": "uncertain", "confidence": 0.4, "frequency": 3, "signals": []} + ], + } + merged = _merge_detected(primary, secondary, drop_secondary_uncertain=True) + assert merged["uncertain"] == [] + + +def test_merge_keeps_distinct_names(): + primary = { + "people": [ + { + "name": "Alice Smith", + "type": "person", + "confidence": 0.9, + "frequency": 10, + "signals": [], + } + ], + "projects": [], + "uncertain": [], + } + secondary = { + "people": [ + { + "name": "Bob Jones", + "type": "person", + "confidence": 0.7, + "frequency": 3, + "signals": [], + } + ], + "projects": [], + "uncertain": [], + } + merged = _merge_detected(primary, secondary) + assert len(merged["people"]) == 2 + + +# ── discover_entities ────────────────────────────────────────────────── + + +def test_discover_entities_falls_back_to_prose_when_no_git(tmp_path): + """If no 
manifests or git, regex detector on prose is the only source.""" + notes = tmp_path / "notes.md" + notes.write_text( + "Riley said hello. Riley asked about it. Riley laughed. " + "Hey Riley, thanks for the help. Riley pushed the change. " + "Riley decided to go." + ) + d = discover_entities(str(tmp_path)) + # Prose-only fallback kicks in — Riley appears with person signals + all_names = [e["name"] for cat in d.values() for e in cat] + assert "Riley" in all_names + + +def test_discover_entities_prefers_real_signal_over_prose(tmp_path): + """When manifest exists, its name wins even if prose has noisy candidates.""" + (tmp_path / "package.json").write_text(json.dumps({"name": "realproj"})) + _init_git_repo(tmp_path) + (tmp_path / "doc.md").write_text( + "Something. Another. Whatever. Context. Context. Context. Context. " + "realproj. realproj. realproj. realproj." + ) + d = discover_entities(str(tmp_path)) + proj_names = [e["name"] for e in d["projects"]] + assert "realproj" in proj_names + + +# ── _UnionFind basics ────────────────────────────────────────────────── + + +def test_unionfind_find_creates_singleton(): + uf = _UnionFind() + assert uf.find("x") == "x" + + +def test_unionfind_union_merges(): + uf = _UnionFind() + uf.union("a", "b") + assert uf.find("a") == uf.find("b") + + +def test_unionfind_transitive(): + uf = _UnionFind() + uf.union("a", "b") + uf.union("b", "c") + assert uf.find("a") == uf.find("c") diff --git a/uv.lock b/uv.lock index 49c28ff..5af54f1 100644 --- a/uv.lock +++ b/uv.lock @@ -1169,7 +1169,7 @@ wheels = [ [[package]] name = "mempalace" -version = "3.3.2" +version = "3.3.3" source = { editable = "." } dependencies = [ { name = "chromadb" },