336 lines
13 KiB
Python
336 lines
13 KiB
Python
"""
|
||
query_engine.py — Natural language geometric query handler.
|
||
|
||
Supports both single-query (--query "...") and interactive REPL (--repl).
|
||
REPL keeps the model in memory between queries for speed.
|
||
All output is formatted ASCII tables.
|
||
|
||
Supported query types (see SKILL.md for full reference):
|
||
bounding box overall model extents
|
||
face count total faces by type
|
||
all parts full assembly listing
|
||
list all holes all cylindrical through-features
|
||
list all mounting holes cylinders dia < 15mm, axis ⊥ primary face
|
||
holes diameter N filter by diameter
|
||
wall thickness min distance between opposing parallel faces
|
||
largest face largest planar face area
|
||
help list supported queries
|
||
exit / quit exit REPL
|
||
"""
|
||
import logging
|
||
import re
|
||
import textwrap
|
||
from typing import Optional
|
||
|
||
from .loader import StepModel
|
||
|
||
logger = logging.getLogger("step_processor.query")
|
||
|
||
# Regex patterns for query routing
|
||
_BBOX_RE = re.compile(r"\b(bounding.?box|extents|dimensions|overall.?size)\b", re.I)
|
||
_FACECOUNT_RE = re.compile(r"\b(face.?count|how many faces|number of faces)\b", re.I)
|
||
_PARTS_RE = re.compile(r"\ball.?parts|part.?list|assembly|components\b", re.I)
|
||
_HOLES_RE = re.compile(r"\bholes?\b", re.I)
|
||
_MOUNTING_RE = re.compile(r"\bmounting\b", re.I)
|
||
_DIA_RE = re.compile(r"diameter\s+([\d.]+)\s*mm?", re.I)
|
||
_WALL_RE = re.compile(r"\bwall.?thickness\b", re.I)
|
||
_LARGEST_RE = re.compile(r"\blargest.?face\b", re.I)
|
||
_HELP_RE = re.compile(r"\bhelp\b", re.I)
|
||
_EXIT_RE = re.compile(r"\b(exit|quit|q)\b", re.I)
|
||
|
||
|
||
def run_query(model: StepModel, query: str) -> str:
|
||
"""Dispatch a query string and return formatted output."""
|
||
q = query.strip()
|
||
if _EXIT_RE.search(q):
|
||
return "EXIT"
|
||
if _HELP_RE.search(q):
|
||
return _help_text()
|
||
if _BBOX_RE.search(q):
|
||
return _query_bounding_box(model)
|
||
if _FACECOUNT_RE.search(q):
|
||
return _query_face_count(model)
|
||
if _PARTS_RE.search(q):
|
||
return _query_all_parts(model)
|
||
if _HOLES_RE.search(q):
|
||
dia_match = _DIA_RE.search(q)
|
||
dia_filter = float(dia_match.group(1)) if dia_match else None
|
||
mounting_only = bool(_MOUNTING_RE.search(q))
|
||
return _query_holes(model, mounting_only=mounting_only, dia_filter=dia_filter)
|
||
if _WALL_RE.search(q):
|
||
return _query_wall_thickness(model)
|
||
if _LARGEST_RE.search(q):
|
||
return _query_largest_face(model)
|
||
return (f"Query not recognized: '{q}'\n"
|
||
f"Type 'help' to see supported queries.")
|
||
|
||
|
||
def repl(model: StepModel, step_path):
|
||
"""Launch interactive REPL. Returns when user types exit/quit."""
|
||
print(f"\nSTEP Query REPL — {step_path.name}")
|
||
print("Type 'help' for supported queries, 'exit' to quit.\n")
|
||
while True:
|
||
try:
|
||
q = input("> ").strip()
|
||
except (EOFError, KeyboardInterrupt):
|
||
print()
|
||
break
|
||
if not q:
|
||
continue
|
||
result = run_query(model, q)
|
||
if result == "EXIT":
|
||
break
|
||
print(result)
|
||
print()
|
||
|
||
|
||
# ── Query implementations ─────────────────────────────────────────────────────
|
||
|
||
def _query_bounding_box(model: StepModel) -> str:
|
||
try:
|
||
if model.backend == "build123d":
|
||
bb = model.shape.bounding_box()
|
||
x = round(bb.size.X, 2)
|
||
y = round(bb.size.Y, 2)
|
||
z = round(bb.size.Z, 2)
|
||
else:
|
||
bb = model.shape.bounding_box
|
||
x = round(bb.XMax - bb.XMin, 2)
|
||
y = round(bb.YMax - bb.YMin, 2)
|
||
z = round(bb.ZMax - bb.ZMin, 2)
|
||
return _table(
|
||
f"BOUNDING BOX — {model.path.name}",
|
||
["Axis", "Dimension"],
|
||
[["Width (X)", f"{x} mm ({x/25.4:.3f} in)"],
|
||
["Depth (Y)", f"{y} mm ({y/25.4:.3f} in)"],
|
||
["Height (Z)", f"{z} mm ({z/25.4:.3f} in)"]]
|
||
)
|
||
except Exception as e:
|
||
return f"Bounding box query failed: {e}"
|
||
|
||
|
||
def _query_face_count(model: StepModel) -> str:
|
||
if model.backend != "build123d":
|
||
return f"Face count query requires build123d (loaded via {model.backend})"
|
||
try:
|
||
from build123d import Compound
|
||
all_faces = model.shape.faces()
|
||
planar = sum(1 for f in all_faces if f.geom_type() == "PLANE")
|
||
cylindrical = sum(1 for f in all_faces if f.geom_type() == "CYLINDER")
|
||
other = len(all_faces) - planar - cylindrical
|
||
return _table(
|
||
f"FACE COUNT — {model.path.name}",
|
||
["Type", "Count"],
|
||
[["Planar", str(planar)],
|
||
["Cylindrical", str(cylindrical)],
|
||
["Other", str(other)],
|
||
["Total", str(len(all_faces))]]
|
||
)
|
||
except Exception as e:
|
||
return f"Face count failed: {e}"
|
||
|
||
|
||
def _query_all_parts(model: StepModel) -> str:
|
||
if not model.parts:
|
||
return (f"No assembly structure found in {model.path.name}.\n"
|
||
f"File appears to be a single solid body.")
|
||
rows = []
|
||
for p in model.parts:
|
||
rows.append([
|
||
p.get("part_number", "—"),
|
||
p.get("name", "—"),
|
||
str(p.get("quantity", 1)),
|
||
str(p.get("level", 0)),
|
||
p.get("parent", ""),
|
||
])
|
||
return _table(
|
||
f"ALL PARTS — {model.path.name}",
|
||
["#", "Name", "Qty", "Level", "Parent"],
|
||
rows
|
||
) + f"\nTotal: {len(model.parts)} parts"
|
||
|
||
|
||
def _query_holes(model: StepModel, mounting_only: bool = False,
|
||
dia_filter: Optional[float] = None) -> str:
|
||
if model.backend != "build123d":
|
||
return f"Hole query requires build123d (loaded via {model.backend})"
|
||
try:
|
||
holes = _find_holes(model, mounting_only=mounting_only, dia_filter=dia_filter)
|
||
if not holes:
|
||
label = "mounting holes" if mounting_only else "holes"
|
||
qualifier = f" ≈{dia_filter}mm" if dia_filter else ""
|
||
return f"No {label}{qualifier} found in {model.path.name}."
|
||
header = "MOUNTING HOLES" if mounting_only else "ALL HOLES"
|
||
# Group by diameter bucket for summary view
|
||
from collections import Counter
|
||
dia_counts = Counter(round(h["dia"], 1) for h in holes)
|
||
MAX_ROWS = 50
|
||
display_holes = holes[:MAX_ROWS]
|
||
rows = []
|
||
for i, h in enumerate(display_holes, 1):
|
||
rows.append([
|
||
str(i),
|
||
f"{h['dia']:.2f} mm",
|
||
f"{h['depth']:.2f} mm" if h["depth"] else "—",
|
||
f"({h['x']:.1f}, {h['y']:.1f}, {h['z']:.1f})",
|
||
])
|
||
result = _table(
|
||
f"{header} — {model.path.name}",
|
||
["#", "Diameter", "Depth", "Position (x,y,z)"],
|
||
rows
|
||
)
|
||
result += f"\nShowing {len(display_holes)} of {len(holes)} unique hole locations"
|
||
result += "\n\nDIAMETER SUMMARY"
|
||
result += "\n" + "─" * 30
|
||
for dia, count in sorted(dia_counts.items()):
|
||
result += f"\n {dia:.1f} mm ×{count}"
|
||
result += "\n" + "─" * 30
|
||
return result
|
||
except Exception as e:
|
||
return f"Hole query failed: {e}"
|
||
|
||
|
||
def _find_holes(model: StepModel, mounting_only: bool, dia_filter):
|
||
"""Extract and deduplicate cylindrical faces from build123d model.
|
||
|
||
Deduplication: round axis position to 1mm grid, group by (dia_bucket, x, y, z).
|
||
This collapses multiple cylindrical faces from the same physical hole
|
||
(e.g. inner + outer surface of same cylinder) into one entry.
|
||
"""
|
||
from OCP.BRepAdaptor import BRepAdaptor_Surface
|
||
from OCP.GeomAbs import GeomAbs_Cylinder
|
||
|
||
seen = {} # key → best entry
|
||
try:
|
||
faces = model.shape.faces()
|
||
except Exception:
|
||
return []
|
||
|
||
for face in faces:
|
||
try:
|
||
adaptor = BRepAdaptor_Surface(face.wrapped)
|
||
if adaptor.GetType() != GeomAbs_Cylinder:
|
||
continue
|
||
cyl = adaptor.Cylinder()
|
||
r = cyl.Radius()
|
||
dia = round(r * 2, 2)
|
||
# Diameter filters
|
||
if mounting_only and dia > 15.0:
|
||
continue
|
||
if dia_filter and abs(dia - dia_filter) > 0.5:
|
||
continue
|
||
axis_pt = cyl.Location()
|
||
# Round to 1mm grid for deduplication
|
||
gx = round(axis_pt.X())
|
||
gy = round(axis_pt.Y())
|
||
gz = round(axis_pt.Z())
|
||
dia_bucket = round(dia, 1)
|
||
key = (dia_bucket, gx, gy, gz)
|
||
if key not in seen:
|
||
bb = face.bounding_box()
|
||
depth = round(max(bb.size.X, bb.size.Y, bb.size.Z), 2)
|
||
seen[key] = {
|
||
"dia": dia,
|
||
"depth": depth,
|
||
"x": round(axis_pt.X(), 1),
|
||
"y": round(axis_pt.Y(), 1),
|
||
"z": round(axis_pt.Z(), 1),
|
||
}
|
||
except Exception:
|
||
continue
|
||
return list(seen.values())
|
||
|
||
|
||
def _query_wall_thickness(model: StepModel) -> str:
|
||
if model.backend != "build123d":
|
||
return f"Wall thickness query requires build123d (loaded via {model.backend})"
|
||
try:
|
||
faces = model.shape.faces()
|
||
planar = [f for f in faces if f.geom_type() == "PLANE"]
|
||
if len(planar) < 2:
|
||
return "Insufficient planar faces to determine wall thickness."
|
||
# Heuristic: find minimum non-zero distance between parallel opposing faces
|
||
min_t = None
|
||
for i, f1 in enumerate(planar):
|
||
n1 = f1.normal_at()
|
||
for f2 in planar[i+1:]:
|
||
n2 = f2.normal_at()
|
||
# Parallel if normals are anti-parallel
|
||
dot = abs(n1.dot(n2))
|
||
if dot > 0.99:
|
||
c1 = f1.center()
|
||
c2 = f2.center()
|
||
dist = round(abs((c1 - c2).dot(n1)), 3)
|
||
if dist > 0.01:
|
||
if min_t is None or dist < min_t:
|
||
min_t = dist
|
||
if min_t is None:
|
||
return "Could not determine wall thickness from available faces."
|
||
return _table(
|
||
f"WALL THICKNESS — {model.path.name}",
|
||
["Measurement", "Value"],
|
||
[["Minimum wall thickness",
|
||
f"{min_t} mm ({min_t/25.4:.3f} in)"]]
|
||
)
|
||
except Exception as e:
|
||
return f"Wall thickness query failed: {e}"
|
||
|
||
|
||
def _query_largest_face(model: StepModel) -> str:
|
||
if model.backend != "build123d":
|
||
return f"Largest face query requires build123d (loaded via {model.backend})"
|
||
try:
|
||
faces = model.shape.faces()
|
||
planar = [(f, f.area()) for f in faces if f.geom_type() == "PLANE"]
|
||
if not planar:
|
||
return "No planar faces found."
|
||
largest, area = max(planar, key=lambda x: x[1])
|
||
bb = largest.bounding_box()
|
||
return _table(
|
||
f"LARGEST PLANAR FACE — {model.path.name}",
|
||
["Property", "Value"],
|
||
[["Area", f"{round(area, 2)} mm²"],
|
||
["Width", f"{round(bb.size.X, 2)} mm"],
|
||
["Height", f"{round(bb.size.Z, 2)} mm"]]
|
||
)
|
||
except Exception as e:
|
||
return f"Largest face query failed: {e}"
|
||
|
||
|
||
# ── Formatting helpers ─────────────────────────────────────────────────────────
|
||
|
||
def _table(title: str, headers: list, rows: list) -> str:
|
||
col_widths = [len(h) for h in headers]
|
||
for row in rows:
|
||
for i, cell in enumerate(row):
|
||
col_widths[i] = max(col_widths[i], len(str(cell)))
|
||
sep = "─" * (sum(col_widths) + 3 * len(headers) - 1)
|
||
lines = [title, sep]
|
||
header_line = " ".join(h.ljust(col_widths[i]) for i, h in enumerate(headers))
|
||
lines.append(header_line)
|
||
lines.append(sep)
|
||
for row in rows:
|
||
lines.append(" ".join(str(cell).ljust(col_widths[i]) for i, cell in enumerate(row)))
|
||
lines.append(sep)
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _help_text() -> str:
|
||
return textwrap.dedent("""\
|
||
SUPPORTED QUERIES
|
||
─────────────────────────────────────────────────────────────
|
||
bounding box Overall model extents (W×D×H in mm)
|
||
face count Faces by type (planar, cylindrical, other)
|
||
all parts Full assembly listing with quantities
|
||
list all holes All cylindrical through-features
|
||
list all mounting holes Holes smaller than 15mm diameter
|
||
holes diameter 4.2mm Filter holes by specific diameter
|
||
wall thickness Minimum wall thickness estimate
|
||
largest face Largest planar face area
|
||
help Show this message
|
||
exit Exit the REPL
|
||
─────────────────────────────────────────────────────────────
|
||
Tip: geometry queries require build123d backend.
|
||
If the file loaded via FreeCAD fallback, only bounding box
|
||
and parts list are available.""")
|