294 lines
11 KiB
Bash
Executable File
294 lines
11 KiB
Bash
Executable File
#!/usr/bin/env bash
|
|
# vault-lint.sh — mechanically assert ECHO vault invariants.
|
|
#
|
|
# Catches the recurring "invariant violation" bugs that prose rules can't enforce:
|
|
# folder<->status drift, duplicate slugs, wikilinks leaking into frontmatter,
|
|
# duplicate "## Agent Log" headings, stale active projects, aging inbox captures,
|
|
# impossible dates, bad status values, missing frontmatter, broken source_notes, and
|
|
# paths that no route in routing.json permits. Invoked by the monthly Vault Health
|
|
# pass (see SKILL.md), but safe to run any time — it is READ-ONLY.
|
|
#
|
|
# Exit status: 0 = clean, 1 = violations found, 2 = vault unreachable,
|
|
# 3 = vault not bootstrapped (marker missing).
|
|
#
|
|
# Config (env overrides):
|
|
# ECHO_BASE (default https://echoapi.alwisp.com)
|
|
# ECHO_KEY (default the plugin's bearer token)
|
|
# ECHO_TODAY (default the machine date) — pass the conversation's currentDate so
|
|
# stale/aging math uses the SAME clock the agent writes with (YYYY-MM-DD)
|
|
# STALE_DAYS (default 30) INBOX_DAYS (default 14)
|
|
#
|
|
# routing.json (canonical route manifest) is read from this script's own directory.
|
|
|
|
# Fail fast: abort on any command error, on use of an unset variable, and on a
# failure in any stage of a pipeline.
set -euo pipefail

# Absolute directory of this script; routing.json is looked up next to it.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

# Configuration: every value below can be overridden from the caller's environment.
ECHO_BASE="${ECHO_BASE:-https://echoapi.alwisp.com}"
# NOTE(review): the fallback below is a bearer token committed in the script itself.
# Anyone who can read this file can use it — consider requiring ECHO_KEY from the
# environment and rotating this value.
ECHO_KEY="${ECHO_KEY:-241265fbe6830934a9a4ad3e69335f64a42153b663aa5b0017cb1ea1217b2bab}"
STALE_DAYS="${STALE_DAYS:-30}"
INBOX_DAYS="${INBOX_DAYS:-14}"
# Defaults to the machine date when the caller does not supply one.
ECHO_TODAY="${ECHO_TODAY:-$(date +%Y-%m-%d)}"

# Pass the resolved settings to the embedded Python program through its environment.
# The quoted heredoc delimiter ('PY') stops the shell from expanding anything inside
# the Python source.
ECHO_BASE="$ECHO_BASE" ECHO_KEY="$ECHO_KEY" STALE_DAYS="$STALE_DAYS" INBOX_DAYS="$INBOX_DAYS" \
ECHO_TODAY="$ECHO_TODAY" ROUTING_JSON="$SCRIPT_DIR/routing.json" \
python3 - <<'PY'
|
|
import os, sys, json, re, datetime, urllib.request, urllib.error
|
|
|
|
# Settings handed over by the bash wrapper through the environment.
BASE = os.environ["ECHO_BASE"].rstrip("/")  # no trailing slash: URLs are built as BASE + "/vault/..."
KEY = os.environ["ECHO_KEY"]
try:
    STALE_DAYS = int(os.environ["STALE_DAYS"])
    INBOX_DAYS = int(os.environ["INBOX_DAYS"])
    # Only the leading YYYY-MM-DD is used, so a full timestamp such as
    # "2025-01-05T10:00:00" is accepted as well as a bare date.
    TODAY = datetime.date.fromisoformat(os.environ["ECHO_TODAY"].strip()[:10])
except ValueError as e:
    # Same exit status (1) as the uncaught traceback this replaces, but with a
    # one-line explanation on stderr instead of a stack dump.
    sys.exit(f"vault-lint: bad configuration ({e}) — STALE_DAYS/INBOX_DAYS must be "
             "integers and ECHO_TODAY must start with YYYY-MM-DD.")
ROUTING_JSON = os.environ["ROUTING_JSON"]

# Lifecycle folders under projects/; a project's status must equal its folder name.
LIFECYCLES = ["active", "incubating", "on-hold", "archived"]
# File names that are never linted as notes.
SKIP = {"README.md", "project-template.md", "decision-template.md"}
# Frontmatter keys that every note with a frontmatter block must carry.
REQUIRED_FM = ("type", "created")
|
|
# Project status vocabulary IS enforced (status must equal the lifecycle folder) by the
|
|
# folder/status check below. Other note kinds (decisions/concepts) carry free-form status
|
|
# vocab (accepted, shipped, reference, ...), so there is no global status allow-list.
|
|
|
|
# PyYAML is optional: when it cannot be imported for any reason, frontmatter is
# parsed with the tolerant line parser instead.
try:
    import yaml  # type: ignore
except Exception:
    HAVE_YAML = False
else:
    HAVE_YAML = True
|
|
|
|
# Every invariant breach found during the run, stored as (check, message) pairs
# in the order they were detected.
violations = []


def flag(check, msg):
    """Record one violation: `check` is the category key, `msg` the detail text."""
    violations.append((check, msg))
|
|
|
|
def get(path):
    """GET /vault/<path> and return the response body as text.

    The path is percent-encoded before it is placed in the URL (slashes are kept),
    so note names containing spaces, '#', '?' or non-ASCII characters are requested
    correctly instead of raising from the HTTP client.

    Returns the decoded body (UTF-8, undecodable bytes replaced), or None when the
    server answers 404. Any other HTTPError is re-raised; connection-level errors
    propagate to the caller unchanged.
    """
    # Function-scope import: the module header only imports urllib.request/urllib.error.
    from urllib.parse import quote

    url = f"{BASE}/vault/{quote(path)}"
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {KEY}"})
    try:
        with urllib.request.urlopen(req, timeout=20) as r:
            return r.read().decode("utf-8", "replace")
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return None
        raise
|
|
|
|
def list_dir(path):
    """Return (files, folders) for a vault directory.

    `path` is '' or '/' for the vault root; any other value is requested with a
    trailing slash. Directories may arrive either under a 'folders' key (with or
    without a trailing '/') or as 'files' entries ending in '/'; both forms are
    returned in `folders` with the slash removed.

    Returns ([], []) when the directory is missing (404), when the server answers
    with another HTTP error (e.g. a 400 on an odd path), or when the body is not
    a JSON object.
    """
    if path in ("", "/"):
        p = ""
    else:
        p = path if path.endswith("/") else path + "/"

    try:
        body = get(p)
    except urllib.error.HTTPError:
        return [], []
    if body is None:
        return [], []

    try:
        listing = json.loads(body)
    except json.JSONDecodeError:
        return [], []
    if not isinstance(listing, dict):
        # A JSON array/scalar is not a directory listing; treat it as empty.
        return [], []

    files, folders = [], []
    for entry in listing.get("files") or []:
        if not isinstance(entry, str):
            continue  # ignore malformed entries rather than crash on .endswith
        if entry.endswith("/"):
            folders.append(entry[:-1])
        else:
            files.append(entry)
    for entry in listing.get("folders") or []:
        if isinstance(entry, str):
            # Everything under 'folders' is a directory, slash or not.
            folders.append(entry[:-1] if entry.endswith("/") else entry)

    # A directory reported under both keys must only be visited once.
    folders = list(dict.fromkeys(folders))
    return files, folders
|
|
|
|
def walk(prefix=""):
    """Lazily yield the path of every file below `prefix`, depth-first.

    `prefix` is '' (vault root) or a directory path ending in '/'. A directory's
    own files come first, then each sub-directory is explored in listing order.
    """
    pending = [prefix]
    while pending:
        here = pending.pop()
        names, subdirs = list_dir(here)
        for name in names:
            yield here + name
        # Push in reverse so the first listed sub-directory is popped next.
        pending.extend(f"{here}{sub}/" for sub in reversed(subdirs))
|
|
|
|
def split_frontmatter(text):
    """Split a note into (raw_yaml_str, body) on its '---' delimiter lines.

    The first line must be '---' (surrounding whitespace and a leading UTF-8
    byte-order mark are ignored); the block ends at the next line that is '---'
    after stripping whitespace.

    Returns ('', '') for empty input, and ('', text) when the note has no
    frontmatter or the block is never closed.
    """
    if not text:
        return "", ""
    lines = text.splitlines()
    # A BOM survives decoding as '\ufeff' and is not whitespace, so strip it
    # explicitly before looking for the opening delimiter.
    if not lines or lines[0].lstrip("\ufeff").strip() != "---":
        return "", text
    for i, line in enumerate(lines[1:], start=1):
        if line.strip() == "---":
            return "\n".join(lines[1:i]), "\n".join(lines[i + 1:])
    return "", text  # unterminated block -> treat as no frontmatter
|
|
|
|
def parse_fm(text):
    """Return (raw_yaml_str, fields_dict) for a note's frontmatter.

    Uses PyYAML when it is available; if PyYAML is missing or rejects the block,
    a tolerant line parser is used instead. That parser understands:

      * `key: scalar`            -> str (surrounding quotes removed)
      * `key: [a, b]`            -> list of str
      * `key:` + `- item` lines  -> list of str (block-style list)

    Returns ('', {}) when the note has no frontmatter. When PyYAML parses the
    block to something other than a mapping, the dict is empty.
    """
    raw, _ = split_frontmatter(text)
    if not raw:
        return "", {}

    if HAVE_YAML:
        try:
            d = yaml.safe_load(raw)
            return raw, (d if isinstance(d, dict) else {})
        except Exception:
            # Deliberate best-effort: malformed YAML falls through to the
            # tolerant parser below so the remaining checks still run.
            pass

    def unquote(s):
        return s.strip().strip('"').strip("'")

    # fallback: scalar + simple inline-list lines (keys may contain digits, _, -)
    fields = {}
    open_key = None  # last key with an empty value; may own following '- item' lines
    for line in raw.splitlines():
        m = re.match(r"^([A-Za-z_][\w-]*):\s*(.*)$", line)
        if m:
            key, v = m.group(1), m.group(2).strip()
            if v.startswith("[") and v.endswith("]"):
                v = [unquote(x) for x in v[1:-1].split(",") if x.strip()]
            else:
                v = unquote(v)
            fields[key] = v
            open_key = key if v == "" else None
            continue

        item = re.match(r"^\s*-\s+(.*)$", line)
        if open_key is not None and item:
            if not isinstance(fields[open_key], list):
                fields[open_key] = []
            fields[open_key].append(unquote(item.group(1)))
        elif line.strip() and not line.lstrip().startswith("#"):
            # Any other content (nested mapping, continuation text) ends the list.
            open_key = None
    return raw, fields
|
|
|
|
def parse_date(s):
    """Return the date given by a leading YYYY-MM-DD in `s`, or None.

    `s` may be any value; it is converted with str() (falsy values count as
    empty). None is returned when the text does not start with that pattern or
    when the digits do not form a real calendar date.
    """
    text = str(s) if s else ""
    found = re.match(r"(\d{4}-\d{2}-\d{2})", text)
    if found:
        try:
            return datetime.date.fromisoformat(found.group(1))
        except ValueError:
            pass
    return None
|
|
|
|
def as_list(v):
    """Normalize a frontmatter value to a list.

    A list is returned as-is, None and '' become an empty list, and any other
    value is wrapped in a one-element list.
    """
    if isinstance(v, list):
        return v
    if v is None or v == "":
        return []
    return [v]
|
|
|
|
# ---- Reachability + bootstrap probe (M2: do NOT silently report clean) -------
# Fetch the bootstrap marker once. Any failure to fetch it means the vault is
# unreachable (exit 2); a 404 means the vault answers but was never bootstrapped
# (exit 3).
try:
    marker = get("_agent/echo-vault.md")
except Exception as e:
    print(f"vault-lint: vault unreachable ({e}).", file=sys.stderr)
    sys.exit(2)
if marker is None:
    print("vault-lint: marker missing — vault not bootstrapped (run bootstrap.sh).", file=sys.stderr)
    sys.exit(3)
|
|
|
|
# ---- Load canonical routing manifest (S3) ------------------------------------
# ROUTES:  [(route id, compiled pattern)]         paths the vault permits
# RETIRED: [(compiled pattern, replacement hint)] locations that must no longer be used
# Both stay empty unless the WHOLE manifest loads, so a manifest that is only
# half-valid (e.g. one bad retired pattern) really does skip the path checks, as the
# violation message says, instead of reporting every retired path as unknown.
ROUTES, RETIRED = [], []
try:
    # Explicit encoding: the manifest must not depend on the machine's locale.
    with open(ROUTING_JSON, encoding="utf-8") as fh:
        rj = json.load(fh)
    _routes = [(r["id"], re.compile(r["pattern"])) for r in rj.get("routes", [])]
    _retired = [(re.compile(r["pattern"]), r.get("replacement", "")) for r in rj.get("retired", [])]
except Exception as e:
    flag("routing-manifest", f"could not load routing.json ({e}) — path checks skipped")
else:
    ROUTES, RETIRED = _routes, _retired
|
|
|
|
# ---- Single full walk feeds every path-level check ---------------------------
all_files = list(walk())


def route_for(path):
    """Return the id of the first route whose pattern matches `path`, else None."""
    return next((rid for rid, rx in ROUTES if rx.match(path)), None)
|
|
|
|
# Path membership + retired-path detection (S3)
# Runs only when a manifest was loaded. A path that matches no route is reported
# as retired when a retired pattern claims it, otherwise as unknown.
if ROUTES:
    for path in all_files:
        if route_for(path) is not None:
            continue
        hit = None
        for rx, repl in RETIRED:
            if rx.match(path):
                hit = repl
                break
        if hit is None:
            flag("unknown-path", f"{path}: matches no route in routing.json")
        else:
            flag("retired-path", f"{path}: retired location — should be {hit}")
|
|
|
|
# ---- Per-note frontmatter checks (M5) ----------------------------------------
# Anything under a templates/ folder, or named *-template.md, is exempt.
TEMPLATE_RE = re.compile(r"(^|/)(templates/|.*-template\.md$)")
for path in all_files:
    base = path.rsplit("/", 1)[-1]
    if base in SKIP or TEMPLATE_RE.search(path) or not path.endswith(".md"):
        continue
    text = get(path)
    if text is None:
        continue  # listed by the walk but gone by the time it is fetched
    raw, fm = parse_fm(text)

    # wikilinks anywhere in frontmatter (widened sweep — all folders)
    if "[[" in raw:
        flag("frontmatter-wikilink", f"{path}: '[[...]]' inside frontmatter")

    # missing required frontmatter: absent, empty, or YAML-null values all count.
    # (PyYAML parses `created:` with no value to None, and str(None) is the
    # non-empty string "None", so None has to be tested for explicitly.)
    missing = [k for k in REQUIRED_FM
               if fm.get(k) is None or not str(fm.get(k)).strip()]
    # Notes without any frontmatter (fm is empty) are not reported here.
    if fm and missing:
        flag("missing-frontmatter", f"{path}: missing {', '.join(missing)}")

    # impossible dates: updated < created, or updated later than today
    c, u = parse_date(fm.get("created")), parse_date(fm.get("updated"))
    if c and u and u < c:
        flag("date-order", f"{path}: updated {u} is before created {c}")
    if u and u > TODAY:
        flag("future-date", f"{path}: updated {u} is in the future (today {TODAY})")

    # source_notes hygiene: entries must be plain paths, never wikilinks
    # (this is the only source_notes rule checked here)
    for sn in as_list(fm.get("source_notes")):
        s = str(sn)
        if "[[" in s:
            flag("source-notes-wikilink", f"{path}: source_notes contains a wikilink '{s}'")
|
|
|
|
# ---- Projects: folder<->status, stale active, duplicate slugs ----------------
slug_homes = {}  # project slug -> lifecycle folders that contain <slug>.md
for lc in LIFECYCLES:
    files, _ = list_dir(f"projects/{lc}")
    for fn in files:
        if fn in SKIP or not fn.endswith(".md"):
            continue
        slug = fn[:-3]  # file name without the ".md" suffix
        slug_homes.setdefault(slug, []).append(lc)
        text = get(f"projects/{lc}/{fn}")
        if text is None:
            continue
        _, fm = parse_fm(text)

        # An empty `status:` is YAML null under PyYAML; treat it like the empty
        # string the fallback parser produces, so both parsers agree and the note
        # is not reported as status='None'.
        raw_status = fm.get("status")
        status = "" if raw_status is None else str(raw_status).strip().strip('"').strip("'")
        if status and status != lc:
            flag("folder/status", f"projects/{lc}/{fn}: status='{status}' but folder='{lc}'")

        if lc == "active":
            d = parse_date(fm.get("updated"))
            if d and (TODAY - d).days > STALE_DAYS:
                flag("stale-active", f"projects/active/{fn}: updated {d} ({(TODAY-d).days}d ago) — consider on-hold/")

# A slug present in more than one lifecycle folder is ambiguous.
for slug, homes in slug_homes.items():
    if len(homes) > 1:
        flag("duplicate-slug", f"'{slug}' exists in {', '.join(homes)}")
|
|
|
|
# ---- Daily notes: duplicate "## Agent Log" headings --------------------------
# A daily note may carry the heading at most once; count heading lines per note.
DAILY_NOTE_RE = re.compile(r"^journal/daily/.*\.md$")
AGENT_LOG_RE = re.compile(r"(?m)^## Agent Log\s*$")
for path in all_files:
    if DAILY_NOTE_RE.match(path):
        text = get(path) or ""
        n = len(AGENT_LOG_RE.findall(text))
        if n > 1:
            flag("duplicate-agent-log", f"{path}: {n} '## Agent Log' headings")
|
|
|
|
# ---- Inbox: captures aging past INBOX_DAYS -----------------------------------
# Each capture is a list item that starts with its date ("- YYYY-MM-DD ...").
inbox = get("inbox/captures/inbox.md") or ""
for line in inbox.splitlines():
    m = re.match(r"^\s*-\s*(\d{4}-\d{2}-\d{2})\b", line)
    if not m:
        continue
    d = parse_date(m.group(1))
    if not d:
        continue  # digits matched but are not a real calendar date
    if (TODAY - d).days > INBOX_DAYS:
        flag("aging-inbox", f"inbox capture {d} ({(TODAY-d).days}d): {line.strip()[:80]}")
|
|
|
|
# ---- Report ------------------------------------------------------------------
# Exit 0 with a one-line confirmation when nothing was flagged; otherwise print the
# violations grouped by check (groups in order of first occurrence) and exit 1.
if not violations:
    print("vault-lint: clean — all invariants hold.")
    sys.exit(0)

# Human-readable section title per check key; unknown keys print as themselves.
labels = {
    "folder/status": "Folder <-> status mismatch",
    "duplicate-slug": "Duplicate slug across lifecycle folders",
    "frontmatter-wikilink": "Wikilink in frontmatter (breaks reading view)",
    "duplicate-agent-log": "Duplicate '## Agent Log' heading",
    "stale-active": f"Stale active project (updated > {STALE_DAYS}d)",
    "aging-inbox": f"Inbox capture aging (> {INBOX_DAYS}d)",
    "unknown-path": "Path matches no route in routing.json",
    "retired-path": "Write to a retired/dead path",
    "missing-frontmatter": "Missing required frontmatter field",
    "date-order": "updated earlier than created",
    "future-date": "updated date is in the future",
    "source-notes-wikilink": "Wikilink in source_notes (must be plain paths)",
    "routing-manifest": "routing.json problem",
}

print(f"vault-lint: {len(violations)} violation(s) found\n")
for check in dict.fromkeys(name for name, _ in violations):
    section = [f"## {labels.get(check, check)}"]
    section.extend(f" - {msg}" for name, msg in violations if name == check)
    section.append("")  # blank line after each group
    print("\n".join(section))
sys.exit(1)
|
|
PY
|