"""Background job worker.

CAD processing is heavy and CPU-bound, so jobs run in a ThreadPoolExecutor
(serialized by default, MAX_WORKERS=1) rather than blocking the event loop
with FastAPI BackgroundTasks. Job state + results are persisted to SQLite as
it runs, so a client polling GET /api/jobs/{id} sees live stage updates.
"""
import json
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timezone

from sqlmodel import Session, delete

from .config import MAX_WORKERS, MODELS_DIR
from .db import engine
from .models import BomRow, Job, Model
from .processing import run_pipeline

log = logging.getLogger("step_parser.worker")

_executor = ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix="cadjob")


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _log_unhandled(job_id: int, future: Future) -> None:
    """Log an exception that escaped ``_run_job`` instead of losing it.

    ``submit_job`` does not keep the Future, so without this callback an
    exception raised outside ``_run_job``'s own handler (for example while
    opening the session or while recording a failure) would never surface.
    """
    if future.cancelled():
        return
    exc = future.exception()
    if exc is not None:
        log.error("job %s crashed outside its error handler", job_id, exc_info=exc)


def submit_job(job_id: int) -> None:
    """Queue the job with the given id for execution on the worker pool.

    Returns immediately; the job itself runs later in ``_run_job`` on an
    executor thread.
    """
    future = _executor.submit(_run_job, job_id)
    future.add_done_callback(lambda f: _log_unhandled(job_id, f))


def _run_job(job_id: int) -> None:
    """Run the processing pipeline for one job and persist its outcome.

    Loads the Job and its Model, marks the job ``"running"``, calls
    ``run_pipeline`` with a progress callback that commits each stage change,
    then writes the returned metadata onto the Model, replaces the Model's
    BOM rows, and marks the job ``"done"``.

    Any exception raised after the Job row has been loaded -- including a
    missing Model row or malformed options JSON -- is logged and recorded on
    the job (``status="error"``, ``error``, ``finished_at``) rather than
    propagated, so the job never stays in a non-terminal state because of it.
    """
    with Session(engine) as s:
        job = s.get(Job, job_id)
        if job is None:
            log.error("job %s vanished before it ran", job_id)
            return

        def progress(stage: str) -> None:
            # Commit every stage change right away so that other sessions
            # reading the job row can observe it while the pipeline runs.
            j = s.get(Job, job_id)
            j.stage = stage
            s.add(j)
            s.commit()

        try:
            # Setup lives inside the try so that a missing model or invalid
            # options JSON is recorded as a job error instead of escaping.
            model = s.get(Model, job.model_id)
            if model is None:
                raise LookupError(
                    f"model {job.model_id} not found for job {job_id}"
                )
            out_dir = MODELS_DIR / str(model.id)
            step_path = out_dir / model.original_filename
            options = json.loads(job.options or "{}")

            job.status = "running"
            job.stage = "loading"
            job.started_at = _utcnow()
            s.add(job)
            s.commit()

            meta = run_pipeline(step_path, out_dir, options, progress)

            # Re-fetch through the session: the commits above (including the
            # ones issued by ``progress``) ended the earlier transactions.
            model = s.get(Model, job.model_id)
            model.backend = meta["backend"]
            model.face_count = meta["face_count"]
            model.part_count = meta["part_count"]
            model.bbox_x_mm, model.bbox_y_mm, model.bbox_z_mm = meta["bbox"]
            model.has_chinese = meta["has_chinese"]
            s.add(model)

            # Replace any prior BOM rows for an idempotent re-run.
            s.exec(delete(BomRow).where(BomRow.model_id == model.id))
            for row in meta["bom_rows"]:
                s.add(BomRow(model_id=model.id, **row))

            job = s.get(Job, job_id)
            job.status = "done"
            job.stage = "done"
            job.artifacts = json.dumps(meta["artifacts"])
            job.finished_at = _utcnow()
            s.add(job)
            s.commit()
            log.info("job %s done — %d artifacts", job_id, len(meta["artifacts"]))
        except Exception as e:  # noqa: BLE001 — record any failure on the job
            log.exception("job %s failed", job_id)
            # Discard any partially applied model/BOM changes before writing
            # the failure, so only the error state is committed.
            s.rollback()
            j = s.get(Job, job_id)
            if j is None:
                # The job row was removed while it ran; nothing to update.
                log.error(
                    "job %s vanished before its failure could be recorded",
                    job_id,
                )
                return
            j.status = "error"
            j.error = f"{type(e).__name__}: {e}"
            j.finished_at = _utcnow()
            s.add(j)
            s.commit()