985 lines
35 KiB
Python
985 lines
35 KiB
Python
import os, hmac, hashlib, json, logging, uuid, re
|
|
from datetime import datetime, timezone
|
|
from urllib.parse import urljoin
|
|
|
|
from flask import Flask, request, jsonify
|
|
import pytz, sqlite3
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
import requests, urllib3
|
|
|
|
# Controller requests in this module are sent with verify=False, so urllib3's
# warnings are switched off globally here.
urllib3.disable_warnings()

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Static assets live in ./static and are served from the URL root.
app = Flask(__name__, static_folder="static", static_url_path="")

# Runtime configuration, read from the environment once at import time.
DB_PATH = os.environ.get("DB_PATH", "/data/dashboard.db")
# Time zone name used to turn event timestamps into local dates/times.
TZ = os.environ.get("TZ", "America/Chicago")
# Optional externally reachable base URL; when empty, resolve_dashboard_base()
# falls back to the host URL of the current request.
DASHBOARD_BASE_URL = os.environ.get("DASHBOARD_BASE_URL", "").rstrip("/")

# Seed values for the auto-created "Default" controller (only used on first boot
# when the controllers table is empty). After that, manage controllers via the UI.
SEED_HOST = os.environ.get("UNIFI_HOST", "")
SEED_PORT = int(os.environ.get("UNIFI_PORT", "12445"))
SEED_TOKEN = os.environ.get("UNIFI_API_TOKEN", "")
SEED_SECRET = os.environ.get("WEBHOOK_SECRET", "")
|
|
|
|
|
|
def get_db():
    """Open a new SQLite connection to ``DB_PATH``.

    The connection yields ``sqlite3.Row`` rows (addressable by column name)
    and has foreign-key enforcement switched on.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.execute("PRAGMA foreign_keys = ON")
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def _column_exists(db, table, column):
|
|
rows = db.execute(f"PRAGMA table_info({table})").fetchall()
|
|
return any(r["name"] == column for r in rows)
|
|
|
|
|
|
def _table_exists(db, table):
|
|
row = db.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table,)
|
|
).fetchone()
|
|
return row is not None
|
|
|
|
|
|
def init_db():
    """Create the schema, apply in-place migrations, and seed the first controller.

    Steps, in order:
      1. Create ``controllers`` and ``badge_events`` if missing.
      2. Add ``badge_events.controller_id`` to databases that pre-date it.
      3. Rename a legacy ``user_cache`` (one without a ``controller_id``
         column) to ``user_cache_legacy`` and create the composite-key
         ``user_cache``.
      4. Create ``persons`` / ``person_members``.
      5. Seed a "Default" controller from the ``SEED_*`` values when the
         controllers table is empty and both host and token are set.
      6. Backfill ``controller_id`` on legacy rows using the seeded (or
         oldest) controller, then drop ``user_cache_legacy``.

    When no controller exists yet, step 6 is skipped and the legacy table is
    kept. Because the pending migration is detected by the presence of
    ``user_cache_legacy`` itself, a later call completes it once a
    controller is available instead of leaving the old rows stranded.
    """
    with get_db() as db:
        db.execute(
            """
            CREATE TABLE IF NOT EXISTS controllers (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                host TEXT NOT NULL,
                port INTEGER NOT NULL DEFAULT 12445,
                api_token TEXT NOT NULL,
                webhook_secret TEXT NOT NULL DEFAULT '',
                webhook_id TEXT NOT NULL DEFAULT '',
                enabled INTEGER NOT NULL DEFAULT 1,
                created_at TEXT NOT NULL,
                last_sync_at TEXT
            )
            """
        )
        db.execute(
            """
            CREATE TABLE IF NOT EXISTS badge_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                controller_id TEXT,
                actor_id TEXT NOT NULL,
                ts TEXT NOT NULL,
                date TEXT NOT NULL
            )
            """
        )

        # Migrate legacy badge_events that pre-date the controller_id column.
        if not _column_exists(db, "badge_events", "controller_id"):
            db.execute("ALTER TABLE badge_events ADD COLUMN controller_id TEXT")

        # Migrate legacy user_cache (single-PK on actor_id) to composite PK.
        if _table_exists(db, "user_cache") and not _column_exists(
            db, "user_cache", "controller_id"
        ):
            db.execute("ALTER TABLE user_cache RENAME TO user_cache_legacy")
        # True both right after the rename above and when an earlier boot
        # renamed the table but had no controller to backfill with.
        legacy_user_cache = _table_exists(db, "user_cache_legacy")

        db.execute(
            """
            CREATE TABLE IF NOT EXISTS user_cache (
                controller_id TEXT NOT NULL,
                actor_id TEXT NOT NULL,
                full_name TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                filtered INTEGER NOT NULL DEFAULT 0,
                PRIMARY KEY (controller_id, actor_id)
            )
            """
        )

        if not _column_exists(db, "user_cache", "filtered"):
            db.execute(
                "ALTER TABLE user_cache ADD COLUMN filtered INTEGER NOT NULL DEFAULT 0"
            )

        db.execute(
            """
            CREATE TABLE IF NOT EXISTS persons (
                id TEXT PRIMARY KEY,
                display_name TEXT NOT NULL,
                filtered INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL
            )
            """
        )
        db.execute(
            """
            CREATE TABLE IF NOT EXISTS person_members (
                person_id TEXT NOT NULL,
                controller_id TEXT NOT NULL,
                actor_id TEXT NOT NULL,
                PRIMARY KEY (controller_id, actor_id),
                FOREIGN KEY (person_id) REFERENCES persons(id) ON DELETE CASCADE
            )
            """
        )
        db.execute(
            "CREATE INDEX IF NOT EXISTS idx_person_members_person ON person_members(person_id)"
        )

        # Seed a Default controller from env vars when the table is empty.
        existing = db.execute("SELECT COUNT(*) AS n FROM controllers").fetchone()["n"]
        default_id = None
        if existing == 0 and SEED_HOST and SEED_TOKEN:
            default_id = str(uuid.uuid4())
            db.execute(
                """
                INSERT INTO controllers
                    (id, name, host, port, api_token, webhook_secret, webhook_id,
                     enabled, created_at)
                VALUES (?, 'Default', ?, ?, ?, ?, '', 1, ?)
                """,
                (
                    default_id,
                    SEED_HOST,
                    SEED_PORT,
                    SEED_TOKEN,
                    SEED_SECRET,
                    datetime.now(timezone.utc).isoformat(),
                ),
            )
            log.info("Seeded Default controller %s from env vars", default_id[:8])

        # Backfill controller_id on legacy badge_events and user_cache rows.
        if default_id is None:
            row = db.execute(
                "SELECT id FROM controllers ORDER BY created_at LIMIT 1"
            ).fetchone()
            default_id = row["id"] if row else None

        if default_id:
            db.execute(
                "UPDATE badge_events SET controller_id = ? WHERE controller_id IS NULL",
                (default_id,),
            )
            if legacy_user_cache:
                # OR IGNORE: rows already present in the new table win.
                db.execute(
                    """
                    INSERT OR IGNORE INTO user_cache
                        (controller_id, actor_id, full_name, updated_at)
                    SELECT ?, actor_id, full_name, updated_at FROM user_cache_legacy
                    """,
                    (default_id,),
                )
                db.execute("DROP TABLE user_cache_legacy")

        db.commit()
|
|
|
|
|
|
def controller_base(host, port):
    """Build the HTTPS root URL of a controller's developer API."""
    base_url = f"https://{host}:{port}/api/v1/developer"
    return base_url
|
|
|
|
|
|
def fetch_controller_users(host, port, token):
    """GET the controller's ``/users`` collection and return the raw response.

    The request carries the token as a Bearer credential, skips TLS
    certificate verification, and times out after 10 seconds. Exceptions
    raised by ``requests`` propagate to the caller.
    """
    users_url = f"{controller_base(host, port)}/users"
    auth_headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(
        users_url,
        headers=auth_headers,
        verify=False,
        timeout=10,
    )
    return response
|
|
|
|
|
|
def sync_controller(controller_id):
    """Refresh the cached user list for one enabled controller.

    Fetches the controller's users, upserts each into ``user_cache``
    (updating ``full_name`` and ``updated_at`` but leaving ``filtered``
    untouched on existing rows), and stamps ``controllers.last_sync_at``.

    Args:
        controller_id: id of the controller row to sync.

    Returns:
        The number of user records the controller returned, or 0 when the
        controller is unknown/disabled, the request fails, or the response
        is not a 200.
    """
    with get_db() as db:
        c = db.execute(
            "SELECT * FROM controllers WHERE id = ? AND enabled = 1", (controller_id,)
        ).fetchone()
    if not c:
        return 0

    try:
        r = fetch_controller_users(c["host"], c["port"], c["api_token"])
        if r.status_code != 200:
            log.warning(
                "User sync failed for controller %s: %s %s",
                c["name"], r.status_code, r.text[:200],
            )
            return 0
        # "data" may be present but null; treat that as an empty list rather
        # than letting None reach the loop below.
        users = r.json().get("data") or []
    except Exception as e:
        log.error("sync_controller(%s) network error: %s", c["name"], e)
        return 0

    now_iso = datetime.now(timezone.utc).isoformat()
    with get_db() as db:
        for u in users:
            if not isinstance(u, dict):
                continue  # malformed entry; nothing to key it on
            actor_id = u.get("id")
            if not actor_id:
                continue
            full_name = (u.get("full_name") or "").strip()
            if not full_name:
                full_name = f"{u.get('first_name','')} {u.get('last_name','')}".strip()
            db.execute(
                """
                INSERT INTO user_cache (controller_id, actor_id, full_name, updated_at)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(controller_id, actor_id) DO UPDATE SET
                    full_name = excluded.full_name,
                    updated_at = excluded.updated_at
                """,
                # str() keeps the placeholder name working for non-string ids.
                (controller_id, actor_id, full_name or f"User {str(actor_id)[:8]}", now_iso),
            )
        db.execute(
            "UPDATE controllers SET last_sync_at = ? WHERE id = ?", (now_iso, controller_id)
        )
        db.commit()
    log.info("Synced %d users from controller %s", len(users), c["name"])
    return len(users)
|
|
|
|
|
|
def sync_all_controllers():
    """Run ``sync_controller`` for every controller row with ``enabled = 1``."""
    with get_db() as db:
        enabled_rows = db.execute(
            "SELECT id FROM controllers WHERE enabled = 1"
        ).fetchall()
    enabled_ids = [row["id"] for row in enabled_rows]
    for controller_id in enabled_ids:
        sync_controller(controller_id)
|
|
|
|
|
|
def register_webhook(host, port, token, dashboard_url, name):
    """POST a webhook endpoint registration to a controller.

    Subscribes ``dashboard_url`` to ``access.door.unlock`` events under the
    given display ``name`` and returns the raw ``requests`` response. TLS
    verification is skipped and the call times out after 10 seconds;
    exceptions from ``requests`` propagate.
    """
    target = f"{controller_base(host, port)}/webhooks/endpoints"
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    registration = {
        "name": name,
        "endpoint": dashboard_url,
        "events": ["access.door.unlock"],
    }
    response = requests.post(
        target,
        headers=request_headers,
        json=registration,
        verify=False,
        timeout=10,
    )
    return response
|
|
|
|
|
|
def delete_webhook(host, port, token, webhook_id):
    """Best-effort removal of a webhook endpoint from a controller.

    Returns the ``requests`` response, or None when there is no
    ``webhook_id`` to delete or the request raised (the error is logged,
    not propagated).
    """
    if webhook_id:
        try:
            endpoint = f"{controller_base(host, port)}/webhooks/endpoints/{webhook_id}"
            bearer = {"Authorization": f"Bearer {token}"}
            return requests.delete(endpoint, headers=bearer, verify=False, timeout=10)
        except Exception as exc:
            log.warning("delete_webhook error: %s", exc)
    return None
|
|
|
|
|
|
def verify_signature(secret, payload_bytes, sig_header):
    """Check a webhook ``Signature`` header against the stored secret.

    The header is a comma-separated list of ``key=value`` elements; ``t``
    carries a timestamp and ``v1`` a hex HMAC-SHA256 digest computed over
    ``"<t>." + payload_bytes`` with ``secret`` as the key. Whitespace around
    elements, keys, and values is ignored so ``"t=1, v1=ab"`` parses the
    same as ``"t=1,v1=ab"``.

    Args:
        secret: shared secret string; empty means no secret is stored.
        payload_bytes: raw request body as bytes.
        sig_header: value of the ``Signature`` header (may be empty).

    Returns:
        True when the signature matches, or when ``secret`` is empty;
        False for a missing, malformed, or mismatching signature.
    """
    if not secret:
        return True  # controller has no secret stored yet — accept (LAN-trust mode)
    if not sig_header:
        return False
    try:
        parts = {}
        for element in sig_header.split(","):
            key, sep, value = element.partition("=")
            if not sep:
                log.warning("Signature parse error: malformed element %r", element)
                return False
            parts[key.strip()] = value.strip()
        timestamp = parts.get("t", "")
        received = parts.get("v1", "")
        if not timestamp or not received:
            return False
        signed_payload = f"{timestamp}.".encode() + payload_bytes
        expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()
        # Compare as bytes: compare_digest() rejects non-ASCII str input, and
        # the header value is attacker-controlled.
        return hmac.compare_digest(expected.encode(), received.encode())
    except Exception as e:
        log.warning("Signature parse error: %s", e)
        return False
|
|
|
|
|
|
def resolve_dashboard_base():
    """Return the dashboard's base URL without a trailing slash.

    Prefers the configured ``DASHBOARD_BASE_URL``; when that is empty, uses
    the host URL of the current Flask request.
    """
    return DASHBOARD_BASE_URL or request.host_url.rstrip("/")
|
|
|
|
|
|
def controller_to_dict(row):
    """Convert a controllers row into the dict returned by the API.

    Copies id/name/host/port and ``last_sync_at`` as-is, coerces ``enabled``
    to bool, and exposes only whether a webhook id is stored
    (``has_webhook``). The token, secret, and webhook id are not included.
    """
    summary = {column: row[column] for column in ("id", "name", "host", "port")}
    summary["enabled"] = bool(row["enabled"])
    summary["has_webhook"] = bool(row["webhook_id"])
    summary["last_sync_at"] = row["last_sync_at"]
    return summary
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Static + dashboard data
|
|
# ---------------------------------------------------------------------------
|
|
@app.route("/")
def index():
    """Serve ``index.html`` from the app's static folder at the site root."""
    landing_page = app.send_static_file("index.html")
    return landing_page
|
|
|
|
|
|
def _split_csv(value):
    """Split a GROUP_CONCAT string into its non-empty comma-separated parts."""
    return [part for part in (value or "").split(",") if part]


@app.route("/api/first-badge-status")
def first_badge_status():
    """Return, per person or unmerged actor, the first badge-in of a day.

    Query parameters:
        date: day to report on (``YYYY-MM-DD``); defaults to today in ``TZ``.
        cutoff: 24-hour ``HH:MM`` on-time threshold. Anything that is not a
            real clock time (wrong shape, hour > 23, minute > 59) falls back
            to ``09:00``.
        controller_id: optional; restrict to events from one controller.
        include_filtered: ``1`` to also return rows flagged as filtered.

    Events are grouped by merged person when the actor belongs to one,
    otherwise by ``controller_id|actor_id``. Each entry is ``ON TIME`` when
    its first timestamp falls at or before the end of the cutoff minute,
    else ``LATE``. Entries are ordered by first timestamp.
    """
    date = request.args.get("date", datetime.now(pytz.timezone(TZ)).strftime("%Y-%m-%d"))
    cutoff = request.args.get("cutoff", "09:00")
    controller_filter = request.args.get("controller_id", "").strip() or None
    include_filtered = request.args.get("include_filtered") == "1"

    # fullmatch + explicit ranges: reject "99:99", trailing newlines, and
    # non-ASCII digits, all of which would corrupt the string comparison below.
    if not re.fullmatch(r"(?:[01][0-9]|2[0-3]):[0-5][0-9]", cutoff):
        cutoff = "09:00"
    # Stored timestamps are HH:MM:SS strings, so the whole cutoff minute
    # counts as on time.
    cutoff_end = cutoff + ":59"

    sql = """
        SELECT
            COALESCE(pm.person_id, b.controller_id || '|' || b.actor_id) AS group_key,
            pm.person_id AS person_id,
            COALESCE(
                p.display_name,
                u.full_name,
                'Unknown (' || SUBSTR(b.actor_id,1,8) || '...)'
            ) AS name,
            COALESCE(p.filtered, u.filtered, 0) AS filtered,
            MIN(b.ts) AS first_ts,
            MAX(b.ts) AS latest_ts,
            GROUP_CONCAT(DISTINCT c.name) AS sources,
            GROUP_CONCAT(DISTINCT b.controller_id) AS controller_ids,
            GROUP_CONCAT(DISTINCT b.actor_id) AS actor_ids
        FROM badge_events b
        LEFT JOIN person_members pm
            ON pm.controller_id = b.controller_id AND pm.actor_id = b.actor_id
        LEFT JOIN persons p ON p.id = pm.person_id
        LEFT JOIN user_cache u
            ON u.actor_id = b.actor_id AND u.controller_id = b.controller_id
        LEFT JOIN controllers c ON c.id = b.controller_id
        WHERE b.date = ?
    """
    params = [date]
    if controller_filter:
        sql += " AND b.controller_id = ?"
        params.append(controller_filter)
    if not include_filtered:
        sql += " AND COALESCE(p.filtered, u.filtered, 0) = 0"
    sql += " GROUP BY group_key ORDER BY first_ts ASC"

    with get_db() as db:
        rows = db.execute(sql, params).fetchall()

    result = []
    for r in rows:
        first = r["first_ts"]
        latest = r["latest_ts"]
        sources_list = _split_csv(r["sources"])
        ctrl_ids_list = _split_csv(r["controller_ids"])
        actor_ids_list = _split_csv(r["actor_ids"])
        result.append({
            "person_id": r["person_id"],
            "actor_id": actor_ids_list[0] if actor_ids_list else None,
            "controller_id": ctrl_ids_list[0] if ctrl_ids_list else None,
            "actor_ids": actor_ids_list,
            "controller_ids": ctrl_ids_list,
            "name": r["name"],
            "source": sources_list[0] if sources_list else "—",
            "sources": sources_list or ["—"],
            "merged": bool(r["person_id"]),
            "first_ts": first,
            # Only report a "latest" time when it differs from the first.
            "latest_ts": latest if latest != first else None,
            "status": "ON TIME" if first <= cutoff_end else "LATE",
            "filtered": bool(r["filtered"]),
        })
    return jsonify(result)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Webhook ingestion — per-controller endpoint, plus legacy compat alias
|
|
# ---------------------------------------------------------------------------
|
|
def _event_timestamp(payload, data):
    """Pick the event time out of a webhook payload, or return None.

    Tries, in order: top-level ``timestamp`` (epoch milliseconds),
    ``data.event.published`` (epoch milliseconds), then the first parseable
    ISO-8601 string among top-level ``created_at`` / ``time`` /
    ``occurred_at``. Numeric values are only accepted above 1e10.
    """
    top_ts_ms = payload.get("timestamp")
    if isinstance(top_ts_ms, (int, float)) and top_ts_ms > 1e10:
        return datetime.fromtimestamp(top_ts_ms / 1000.0, tz=pytz.utc)

    event_meta = data.get("event")
    published = event_meta.get("published") if isinstance(event_meta, dict) else None
    if isinstance(published, (int, float)) and published > 1e10:
        return datetime.fromtimestamp(published / 1000.0, tz=pytz.utc)

    for field in ("created_at", "time", "occurred_at"):
        raw_ts = payload.get(field)
        if raw_ts:
            try:
                return datetime.fromisoformat(str(raw_ts).replace("Z", "+00:00"))
            except ValueError:
                continue  # unparseable; try the next candidate field
    return None


def _ingest_webhook(controller_id):
    """Validate one webhook delivery and record it as a badge event.

    Responses:
        404 when ``controller_id`` is unknown;
        401 when the ``Signature`` header does not verify;
        400 when the body is not a JSON object or carries no actor id;
        200 ``{"status": "ignored"}`` for events other than
            ``access.door.unlock``;
        200 ``{"status": "ok"}`` after the event is stored.

    The event time is converted to ``TZ`` and stored as a local date plus an
    ``HH:MM:SS`` string; when the payload has no usable timestamp the
    current time is used.
    """
    raw = request.get_data()

    with get_db() as db:
        c = db.execute(
            "SELECT * FROM controllers WHERE id = ?", (controller_id,)
        ).fetchone()
    if not c:
        return jsonify({"error": "unknown controller"}), 404

    if not verify_signature(c["webhook_secret"], raw, request.headers.get("Signature", "")):
        log.warning("Signature mismatch for controller %s", c["name"])
        return jsonify({"error": "invalid signature"}), 401

    try:
        payload = json.loads(raw)
    except ValueError:
        return jsonify({"error": "bad json"}), 400
    # Valid JSON that is not an object (list, string, number) has no fields
    # to read; reject it instead of failing on .get() below.
    if not isinstance(payload, dict):
        return jsonify({"error": "bad json"}), 400

    event = payload.get("event") or payload.get("event_object_id", "") or ""
    data = payload.get("data")
    if not isinstance(data, dict):
        data = {}
    actor_obj = data.get("actor")
    if not isinstance(actor_obj, dict):
        actor_obj = {}
    actor = actor_obj.get("id")

    if "access.door.unlock" not in str(event):
        return jsonify({"status": "ignored"}), 200
    if not actor:
        return jsonify({"error": "no actor"}), 400

    tz = pytz.timezone(TZ)
    ts = _event_timestamp(payload, data)
    if ts is None:
        ts = datetime.now(tz=tz)

    ts_local = ts.astimezone(tz)
    date = ts_local.strftime("%Y-%m-%d")
    ts_str = ts_local.strftime("%H:%M:%S")

    with get_db() as db:
        db.execute(
            "INSERT INTO badge_events (controller_id, actor_id, ts, date) VALUES (?, ?, ?, ?)",
            (controller_id, actor, ts_str, date),
        )
        db.commit()

    log.info(
        "Badge-in: controller=%s actor=%s date=%s ts=%s",
        c["name"], actor, date, ts_str,
    )
    return jsonify({"status": "ok"}), 200
|
|
|
|
|
|
@app.route("/api/unifi-access/<controller_id>", methods=["POST"])
def receive_webhook(controller_id):
    """Per-controller webhook endpoint; hands the request to ``_ingest_webhook``."""
    outcome = _ingest_webhook(controller_id)
    return outcome
|
|
|
|
|
|
@app.route("/api/unifi-access", methods=["POST"])
def receive_webhook_legacy():
    """Compat alias for installs registered before per-controller URLs existed.

    The delivery is attributed to the controller with the earliest
    ``created_at`` (the env-seeded Default); responds 404 when no
    controller is configured.
    """
    with get_db() as db:
        oldest = db.execute(
            "SELECT id FROM controllers ORDER BY created_at LIMIT 1"
        ).fetchone()
    if oldest:
        return _ingest_webhook(oldest["id"])
    return jsonify({"error": "no controllers configured"}), 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Controller management
|
|
# ---------------------------------------------------------------------------
|
|
@app.route("/api/controllers", methods=["GET"])
def list_controllers():
    """Return every controller, ordered by creation time, via ``controller_to_dict``."""
    with get_db() as db:
        cursor = db.execute("SELECT * FROM controllers ORDER BY created_at")
        controller_rows = cursor.fetchall()
    listing = [controller_to_dict(row) for row in controller_rows]
    return jsonify(listing)
|
|
|
|
|
|
@app.route("/api/controllers", methods=["POST"])
def add_controller():
    """Register a new controller and subscribe the dashboard to its webhooks.

    JSON body: ``name``, ``host``, ``api_token`` (all required) and an
    optional ``port`` (default 12445).

    Flow: validate input, register a webhook on the controller pointing at
    this dashboard's per-controller endpoint, store the controller together
    with the webhook id/secret the controller returned, then run an initial
    user sync.

    Responses:
        400 for missing fields or an invalid port;
        502 when the controller cannot be reached, rejects the
            registration, or answers with something that is not JSON;
        201 with the stored controller on success.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}

    def text_field(key):
        # str() so a non-string JSON value cannot crash .strip().
        return str(body.get(key) or "").strip()

    name = text_field("name")
    host = text_field("host")
    api_token = text_field("api_token")
    if not name or not host or not api_token:
        return jsonify({"error": "name, host, and api_token are required"}), 400

    # A bad port is a client error, not a server crash.
    try:
        port = int(body.get("port") or 12445)
    except (TypeError, ValueError, OverflowError):
        port = 0
    if not 1 <= port <= 65535:
        return jsonify({"error": "port must be an integer between 1 and 65535"}), 400

    controller_id = str(uuid.uuid4())
    dashboard_base = resolve_dashboard_base()
    endpoint_url = urljoin(dashboard_base + "/", f"api/unifi-access/{controller_id}")

    try:
        r = register_webhook(host, port, api_token, endpoint_url, f"Dashboard — {name}")
    except Exception as e:
        return jsonify({"error": f"webhook registration failed: {e}"}), 502

    if r.status_code >= 300:
        return jsonify({
            "error": "webhook registration rejected by controller",
            "status_code": r.status_code,
            "response": r.text[:500],
        }), 502

    try:
        payload = r.json()
    except Exception:
        return jsonify({"error": "unparseable controller response", "raw": r.text[:500]}), 502

    # Tolerate a response whose top level or "data" member is not an object;
    # the controller is then stored without webhook id/secret.
    data = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(data, dict):
        data = {}
    webhook_id = data.get("id", "")
    webhook_secret = data.get("secret", "")

    with get_db() as db:
        db.execute(
            """
            INSERT INTO controllers
                (id, name, host, port, api_token, webhook_secret, webhook_id,
                 enabled, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?)
            """,
            (
                controller_id, name, host, port, api_token,
                webhook_secret, webhook_id,
                datetime.now(timezone.utc).isoformat(),
            ),
        )
        db.commit()

    sync_controller(controller_id)

    # Re-read so the response reflects last_sync_at set by the sync above.
    with get_db() as db:
        row = db.execute("SELECT * FROM controllers WHERE id = ?", (controller_id,)).fetchone()
    return jsonify(controller_to_dict(row)), 201
|
|
|
|
|
|
@app.route("/api/controllers/<controller_id>", methods=["PATCH"])
def update_controller(controller_id):
    """Update a controller's ``name`` and/or ``enabled`` flag.

    Responses:
        400 when the body contains neither field, or ``name`` is blank
            (a controller cannot be created without a name, so it cannot
            be renamed to nothing either);
        404 when no controller has this id;
        200 with the updated controller otherwise.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}

    fields, values = [], []
    if "name" in body:
        name = str(body["name"] or "").strip()
        if not name:
            return jsonify({"error": "name cannot be empty"}), 400
        fields.append("name = ?")
        values.append(name)
    if "enabled" in body:
        fields.append("enabled = ?")
        values.append(1 if body["enabled"] else 0)
    if not fields:
        return jsonify({"error": "no updatable fields provided"}), 400
    values.append(controller_id)

    with get_db() as db:
        # Only the fixed column fragments above are interpolated; every
        # value is bound as a parameter.
        cur = db.execute(
            f"UPDATE controllers SET {', '.join(fields)} WHERE id = ?", values
        )
        db.commit()
        if cur.rowcount == 0:
            return jsonify({"error": "not found"}), 404
        row = db.execute("SELECT * FROM controllers WHERE id = ?", (controller_id,)).fetchone()
    return jsonify(controller_to_dict(row))
|
|
|
|
|
|
@app.route("/api/controllers/<controller_id>", methods=["DELETE"])
def remove_controller(controller_id):
    """Delete a controller and everything stored for it.

    Makes a best-effort attempt to unregister the webhook on the controller,
    then removes the controller's cached users, badge events, and person
    memberships. A merged person left with no members is removed as well,
    matching what happens when the last member is removed individually.

    Responses: 404 when the id is unknown, otherwise ``{"status": "ok"}``.
    """
    with get_db() as db:
        c = db.execute("SELECT * FROM controllers WHERE id = ?", (controller_id,)).fetchone()
    if not c:
        return jsonify({"error": "not found"}), 404

    # delete_webhook logs and swallows request errors, so an unreachable
    # controller does not block local cleanup.
    delete_webhook(c["host"], c["port"], c["api_token"], c["webhook_id"])

    with get_db() as db:
        affected_persons = [
            r["person_id"]
            for r in db.execute(
                "SELECT DISTINCT person_id FROM person_members WHERE controller_id = ?",
                (controller_id,),
            ).fetchall()
        ]
        db.execute("DELETE FROM person_members WHERE controller_id = ?", (controller_id,))
        for person_id in affected_persons:
            db.execute(
                """
                DELETE FROM persons
                WHERE id = ?
                  AND NOT EXISTS (SELECT 1 FROM person_members WHERE person_id = ?)
                """,
                (person_id, person_id),
            )
        db.execute("DELETE FROM user_cache WHERE controller_id = ?", (controller_id,))
        db.execute("DELETE FROM badge_events WHERE controller_id = ?", (controller_id,))
        db.execute("DELETE FROM controllers WHERE id = ?", (controller_id,))
        db.commit()
    return jsonify({"status": "ok"})
|
|
|
|
|
|
@app.route("/api/controllers/<controller_id>/test", methods=["POST"])
def test_controller(controller_id):
    """Probe a stored controller by fetching its user list.

    Responses:
        404 when the controller id is unknown;
        200 otherwise, with ``ok`` reporting whether the controller answered
        with HTTP 200. On success ``user_count`` is the number of users
        returned; on failure ``message`` carries the start of the response
        body or the exception text.
    """
    with get_db() as db:
        c = db.execute("SELECT * FROM controllers WHERE id = ?", (controller_id,)).fetchone()
    if not c:
        return jsonify({"error": "not found"}), 404
    try:
        r = fetch_controller_users(c["host"], c["port"], c["api_token"])
        ok = r.status_code == 200
        # "or []": a 200 whose "data" is null is still a working connection.
        user_count = len(r.json().get("data") or []) if ok else None
        return jsonify({
            "ok": ok,
            "status_code": r.status_code,
            "user_count": user_count,
            "message": "Connected" if ok else r.text[:200],
        })
    except Exception as e:
        # Connection problems are the expected failure here; report them in
        # the body rather than as an HTTP error.
        return jsonify({"ok": False, "message": str(e)}), 200
|
|
|
|
|
|
@app.route("/api/controllers/<controller_id>/sync", methods=["POST"])
def sync_one(controller_id):
    """Trigger a user sync for one controller and report the count it returned."""
    synced_count = sync_controller(controller_id)
    response_body = {"status": "ok", "synced": synced_count}
    return jsonify(response_body)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# User cache (tenant filtering)
|
|
# ---------------------------------------------------------------------------
|
|
@app.route("/api/users", methods=["GET"])
def list_users():
    """List cached users, optionally narrowed by query parameters.

    ``controller_id`` restricts the list to one controller; ``filtered``
    (``0`` or ``1``) restricts it by filter flag, and any other value is
    ignored. Results are ordered by controller name, then full name.
    """
    wanted_controller = request.args.get("controller_id", "").strip() or None
    wanted_flag = request.args.get("filtered")

    sql = """
        SELECT
            u.controller_id,
            u.actor_id,
            u.full_name,
            u.updated_at,
            COALESCE(u.filtered, 0) AS filtered,
            c.name AS controller_name
        FROM user_cache u
        LEFT JOIN controllers c ON c.id = u.controller_id
        WHERE 1=1
    """
    params = []
    if wanted_controller:
        sql += " AND u.controller_id = ?"
        params.append(wanted_controller)
    if wanted_flag in ("0", "1"):
        sql += " AND COALESCE(u.filtered, 0) = ?"
        params.append(int(wanted_flag))
    sql += " ORDER BY c.name, u.full_name"

    with get_db() as db:
        user_rows = db.execute(sql, params).fetchall()

    def as_dict(row):
        # Users whose controller row is gone get a dash as the name.
        return {
            "controller_id": row["controller_id"],
            "controller_name": row["controller_name"] or "—",
            "actor_id": row["actor_id"],
            "full_name": row["full_name"],
            "updated_at": row["updated_at"],
            "filtered": bool(row["filtered"]),
        }

    return jsonify(list(map(as_dict, user_rows)))
|
|
|
|
|
|
@app.route("/api/users/<controller_id>/<actor_id>", methods=["PATCH"])
def update_user(controller_id, actor_id):
    """Set the ``filtered`` flag for one actor on one controller.

    If the actor is not cached yet, a row is created with a placeholder
    name; an existing row only has its ``filtered`` value changed.

    Responses: 400 when ``filtered`` is missing from the body, 404 for an
    unknown controller, otherwise the resulting user record.
    """
    payload = request.get_json(silent=True) or {}
    if "filtered" not in payload:
        return jsonify({"error": "filtered field required"}), 400
    flag = int(bool(payload["filtered"]))

    with get_db() as db:
        known = db.execute(
            "SELECT id FROM controllers WHERE id = ?", (controller_id,)
        ).fetchone()
        if not known:
            return jsonify({"error": "unknown controller"}), 404

        stamp = datetime.now(timezone.utc).isoformat()
        fallback_name = f"User {actor_id[:8]}"
        db.execute(
            """
            INSERT INTO user_cache (controller_id, actor_id, full_name, updated_at, filtered)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT(controller_id, actor_id) DO UPDATE SET
                filtered = excluded.filtered
            """,
            (controller_id, actor_id, fallback_name, stamp, flag),
        )
        db.commit()
        saved = db.execute(
            """
            SELECT u.controller_id, u.actor_id, u.full_name, u.updated_at,
                   COALESCE(u.filtered, 0) AS filtered, c.name AS controller_name
            FROM user_cache u
            LEFT JOIN controllers c ON c.id = u.controller_id
            WHERE u.controller_id = ? AND u.actor_id = ?
            """,
            (controller_id, actor_id),
        ).fetchone()

    user = {
        "controller_id": saved["controller_id"],
        "controller_name": saved["controller_name"] or "—",
        "actor_id": saved["actor_id"],
        "full_name": saved["full_name"],
        "updated_at": saved["updated_at"],
        "filtered": bool(saved["filtered"]),
    }
    return jsonify(user)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Persons (merge identities across controllers)
|
|
# ---------------------------------------------------------------------------
|
|
def _person_to_dict(db, person_row):
|
|
members = db.execute(
|
|
"""
|
|
SELECT pm.controller_id, pm.actor_id,
|
|
c.name AS controller_name,
|
|
u.full_name
|
|
FROM person_members pm
|
|
LEFT JOIN controllers c ON c.id = pm.controller_id
|
|
LEFT JOIN user_cache u
|
|
ON u.controller_id = pm.controller_id AND u.actor_id = pm.actor_id
|
|
WHERE pm.person_id = ?
|
|
ORDER BY c.name, u.full_name
|
|
""",
|
|
(person_row["id"],),
|
|
).fetchall()
|
|
return {
|
|
"id": person_row["id"],
|
|
"display_name": person_row["display_name"],
|
|
"filtered": bool(person_row["filtered"]),
|
|
"created_at": person_row["created_at"],
|
|
"members": [
|
|
{
|
|
"controller_id": m["controller_id"],
|
|
"controller_name": m["controller_name"] or "—",
|
|
"actor_id": m["actor_id"],
|
|
"full_name": m["full_name"] or f"User {m['actor_id'][:8]}",
|
|
}
|
|
for m in members
|
|
],
|
|
}
|
|
|
|
|
|
@app.route("/api/persons", methods=["GET"])
def list_persons():
    """Return all merged persons with their members, ordered by display name."""
    with get_db() as db:
        person_rows = db.execute("SELECT * FROM persons ORDER BY display_name").fetchall()
        people = []
        for person_row in person_rows:
            people.append(_person_to_dict(db, person_row))
        return jsonify(people)
|
|
|
|
|
|
@app.route("/api/persons", methods=["POST"])
def create_person():
    """Create a merged person from one or more (controller, actor) identities.

    JSON body: ``display_name`` (required) and ``members``, a non-empty list
    of objects each carrying ``controller_id`` and ``actor_id``. Repeated
    pairs within one request are collapsed.

    Responses:
        400 for a missing name, an empty/invalid members list, or a member
            lacking either id;
        409 when any member already belongs to a person (the offending
            pairs are listed under ``conflicts``);
        201 with the new person on success.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}
    name = str(body.get("display_name") or "").strip()
    members = body.get("members") or []
    if not name:
        return jsonify({"error": "display_name is required"}), 400
    if not isinstance(members, list) or len(members) < 1:
        return jsonify({"error": "at least one member required"}), 400

    cleaned = []
    for m in members:
        # A member that is not an object is treated like one with no ids.
        cid = str(m.get("controller_id") or "").strip() if isinstance(m, dict) else ""
        aid = str(m.get("actor_id") or "").strip() if isinstance(m, dict) else ""
        if not cid or not aid:
            return jsonify({"error": "each member needs controller_id and actor_id"}), 400
        cleaned.append((cid, aid))
    # Drop repeats (keeping first-seen order) so the insert below cannot
    # trip the (controller_id, actor_id) primary key on the request's own data.
    cleaned = list(dict.fromkeys(cleaned))

    person_id = str(uuid.uuid4())
    now_iso = datetime.now(timezone.utc).isoformat()

    with get_db() as db:
        # SQLite requires the right-hand side of a row-value IN to be a
        # subquery, hence the VALUES clause rather than a bare list of tuples.
        # Only "(?,?)" placeholders are interpolated; values are bound.
        placeholders = ",".join(["(?,?)"] * len(cleaned))
        existing = db.execute(
            f"""
            SELECT controller_id, actor_id FROM person_members
            WHERE (controller_id, actor_id) IN (VALUES {placeholders})
            """,
            [v for pair in cleaned for v in pair],
        ).fetchall()
        if existing:
            conflicts = [f"{r['controller_id']}/{r['actor_id'][:8]}" for r in existing]
            return jsonify({
                "error": "some members already belong to another person",
                "conflicts": conflicts,
            }), 409

        db.execute(
            "INSERT INTO persons (id, display_name, filtered, created_at) VALUES (?, ?, 0, ?)",
            (person_id, name, now_iso),
        )
        db.executemany(
            "INSERT INTO person_members (person_id, controller_id, actor_id) VALUES (?, ?, ?)",
            [(person_id, cid, aid) for cid, aid in cleaned],
        )
        db.commit()
        row = db.execute("SELECT * FROM persons WHERE id = ?", (person_id,)).fetchone()
        return jsonify(_person_to_dict(db, row)), 201
|
|
|
|
|
|
@app.route("/api/persons/<person_id>", methods=["PATCH"])
def update_person(person_id):
    """Change a person's ``display_name`` and/or ``filtered`` flag.

    Responses: 400 for a blank name or a body with neither field, 404 when
    the person does not exist, otherwise the updated person.
    """
    payload = request.get_json(silent=True) or {}

    # (SET fragment, bound value) pairs, in the order the fields are checked.
    changes = []
    if "display_name" in payload:
        new_name = (payload["display_name"] or "").strip()
        if not new_name:
            return jsonify({"error": "display_name cannot be empty"}), 400
        changes.append(("display_name = ?", new_name))
    if "filtered" in payload:
        changes.append(("filtered = ?", 1 if payload["filtered"] else 0))
    if not changes:
        return jsonify({"error": "no updatable fields provided"}), 400

    assignments = [fragment for fragment, _ in changes]
    bound = [value for _, value in changes]
    bound.append(person_id)

    with get_db() as db:
        cursor = db.execute(
            f"UPDATE persons SET {', '.join(assignments)} WHERE id = ?", bound
        )
        db.commit()
        if cursor.rowcount == 0:
            return jsonify({"error": "not found"}), 404
        updated = db.execute("SELECT * FROM persons WHERE id = ?", (person_id,)).fetchone()
        return jsonify(_person_to_dict(db, updated))
|
|
|
|
|
|
@app.route("/api/persons/<person_id>", methods=["DELETE"])
def delete_person(person_id):
    """Delete a person row; responds 404 when no row had that id."""
    with get_db() as db:
        removed = db.execute("DELETE FROM persons WHERE id = ?", (person_id,)).rowcount
        db.commit()
    if removed:
        return jsonify({"status": "ok"})
    return jsonify({"error": "not found"}), 404
|
|
|
|
|
|
@app.route("/api/persons/<person_id>/members", methods=["POST"])
def add_person_member(person_id):
    """Attach one (controller, actor) identity to an existing person.

    Adding an identity that already belongs to this person is a no-op.

    Responses: 400 when either id is missing, 404 for an unknown person,
    409 when the identity belongs to a different person, otherwise the
    person with its current members.
    """
    payload = request.get_json(silent=True) or {}
    controller_id = (payload.get("controller_id") or "").strip()
    actor_id = (payload.get("actor_id") or "").strip()
    if not (controller_id and actor_id):
        return jsonify({"error": "controller_id and actor_id are required"}), 400

    with get_db() as db:
        person_found = db.execute(
            "SELECT 1 FROM persons WHERE id = ?", (person_id,)
        ).fetchone()
        if not person_found:
            return jsonify({"error": "person not found"}), 404

        owner = db.execute(
            "SELECT person_id FROM person_members WHERE controller_id = ? AND actor_id = ?",
            (controller_id, actor_id),
        ).fetchone()
        if owner and owner["person_id"] != person_id:
            return jsonify({"error": "actor already belongs to another person"}), 409

        db.execute(
            """
            INSERT OR IGNORE INTO person_members (person_id, controller_id, actor_id)
            VALUES (?, ?, ?)
            """,
            (person_id, controller_id, actor_id),
        )
        db.commit()
        person_row = db.execute("SELECT * FROM persons WHERE id = ?", (person_id,)).fetchone()
        return jsonify(_person_to_dict(db, person_row))
|
|
|
|
|
|
@app.route("/api/persons/<person_id>/members/<controller_id>/<actor_id>", methods=["DELETE"])
def remove_person_member(person_id, controller_id, actor_id):
    """Detach one identity from a person.

    When the removed identity was the person's last member, the person is
    deleted too and the response says so (``person_dissolved``). Otherwise
    the remaining person is returned. Responds 404 when the membership
    does not exist.
    """
    with get_db() as db:
        detached = db.execute(
            """
            DELETE FROM person_members
            WHERE person_id = ? AND controller_id = ? AND actor_id = ?
            """,
            (person_id, controller_id, actor_id),
        ).rowcount
        if not detached:
            return jsonify({"error": "member not found"}), 404

        members_left = db.execute(
            "SELECT COUNT(*) AS n FROM person_members WHERE person_id = ?", (person_id,)
        ).fetchone()["n"]

        if members_left != 0:
            db.commit()
            survivor = db.execute("SELECT * FROM persons WHERE id = ?", (person_id,)).fetchone()
            return jsonify(_person_to_dict(db, survivor))

        # If that was the last member, drop the person too.
        db.execute("DELETE FROM persons WHERE id = ?", (person_id,))
        db.commit()
        return jsonify({"status": "ok", "person_dissolved": True})
|
|
|
|
|
|
@app.route("/api/persons/suggestions", methods=["GET"])
def person_suggestions():
    """Suggest merges for cached users that share a name across controllers.

    Considers only users that are not yet members of a person and whose
    name is non-blank and does not start with ``User ``. Names are matched
    after trimming and lower-casing; a group is suggested only when it
    spans at least two distinct controllers. Suggestions are sorted by
    display name, case-insensitively.
    """
    with get_db() as db:
        candidates = db.execute(
            """
            SELECT u.controller_id, u.actor_id, u.full_name,
                   c.name AS controller_name
            FROM user_cache u
            LEFT JOIN controllers c ON c.id = u.controller_id
            LEFT JOIN person_members pm
                ON pm.controller_id = u.controller_id AND pm.actor_id = u.actor_id
            WHERE pm.person_id IS NULL
              AND u.full_name IS NOT NULL
              AND TRIM(u.full_name) <> ''
              AND u.full_name NOT LIKE 'User %'
            ORDER BY LOWER(u.full_name), c.name
            """
        ).fetchall()

    # Bucket candidates by normalized name, keeping query order inside each bucket.
    by_name = {}
    for row in candidates:
        normalized = row["full_name"].strip().lower()
        if normalized not in by_name:
            by_name[normalized] = []
        by_name[normalized].append({
            "controller_id": row["controller_id"],
            "controller_name": row["controller_name"] or "—",
            "actor_id": row["actor_id"],
            "full_name": row["full_name"],
        })

    # Only suggest when the same name spans at least 2 distinct controllers
    proposals = [
        {"display_name": bucket[0]["full_name"], "members": bucket}
        for bucket in by_name.values()
        if len({member["controller_id"] for member in bucket}) >= 2
    ]
    ordered = sorted(proposals, key=lambda item: item["display_name"].lower())
    return jsonify(ordered)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Misc admin
|
|
# ---------------------------------------------------------------------------
|
|
@app.route("/api/sync-users")
def manual_sync_all():
    """Run a user sync across all enabled controllers on demand."""
    sync_all_controllers()
    acknowledgement = {"status": "synced"}
    return jsonify(acknowledgement)
|
|
|
|
|
|
@app.route("/api/reset-day", methods=["DELETE"])
def reset_day():
    """Delete the badge events recorded for one day.

    Query parameters: ``date`` (defaults to today in ``TZ``) and an optional
    ``controller_id`` limiting the deletion to one controller. The response
    reports how many rows were removed.
    """
    today_local = datetime.now(pytz.timezone(TZ)).strftime("%Y-%m-%d")
    date = request.args.get("date", today_local)
    only_controller = request.args.get("controller_id", "").strip() or None

    sql = "DELETE FROM badge_events WHERE date = ?"
    params = [date]
    if only_controller:
        sql += " AND controller_id = ?"
        params.append(only_controller)

    with get_db() as db:
        removed = db.execute(sql, params).rowcount
        db.commit()
    return jsonify({"status": "ok", "deleted": removed, "date": date})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Boot
|
|
# ---------------------------------------------------------------------------
|
|
# Runs at import time (not only under __main__): make sure the schema exists,
# then do an initial user sync for every enabled controller.
with app.app_context():
    init_db()
    sync_all_controllers()

# Repeat the user sync every hour on an APScheduler BackgroundScheduler.
scheduler = BackgroundScheduler()
scheduler.add_job(sync_all_controllers, "interval", hours=1)
scheduler.start()

# Direct execution: serve on all interfaces; the port comes from PORT
# (default 8000).
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 8000)))
|