From 571152ec39e4cd55393b51ae8d56f897f4091bc1 Mon Sep 17 00:00:00 2001 From: jason Date: Tue, 31 Mar 2026 09:06:15 -0500 Subject: [PATCH] Add Odoo MCP server with ProxyAwareTransport fix (v0.2.0) Fixes DNS resolution failure in Cowork sandbox caused by xmlrpc.client.SafeTransport ignoring HTTPS_PROXY environment variable. ProxyAwareTransport overrides request() to use urllib.request with ProxyHandler(), which correctly routes through localhost:3128. --- server/odoo_mcp.py | 676 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 676 insertions(+) create mode 100644 server/odoo_mcp.py diff --git a/server/odoo_mcp.py b/server/odoo_mcp.py new file mode 100644 index 0000000..3640fba --- /dev/null +++ b/server/odoo_mcp.py @@ -0,0 +1,676 @@ +#!/usr/bin/env python3 +""" +Odoo MCP Server for MPM +Connects to mpmedia.odoo.com via XML-RPC and exposes tools for +Products, Knowledge, Contacts, Sales, CRM, Project, Helpdesk, +Purchase, Inventory, Employees, and Knowledge Templates. +""" + +import os +import re +import xmlrpc.client +from typing import Optional +from mcp.server.fastmcp import FastMCP + +# ── Configuration ──────────────────────────────────────────────────────────── +ODOO_URL = os.environ.get("ODOO_URL", "https://mpmedia.odoo.com") +ODOO_DB = os.environ.get("ODOO_DB", "mpmedia-odoo-sh-main-13285275") +ODOO_USERNAME = os.environ.get("ODOO_USERNAME", "bgilliom@mpmedia.tv") +ODOO_API_KEY = os.environ.get("ODOO_API_KEY", "") + +import urllib.request +import urllib.error + + +class ProxyAwareTransport(xmlrpc.client.SafeTransport): + """xmlrpc transport that routes through the system HTTPS proxy. + + Python's xmlrpc.client.SafeTransport makes raw socket connections and + ignores HTTPS_PROXY / https_proxy environment variables entirely. + This transport replaces the low-level request method with urllib.request, + which correctly picks up the proxy environment variables set by the + Cowork sandbox (HTTPS_PROXY=http://localhost:3128). + """ + + def request(self, host, handler, request_body, verbose=False): + url = f"https://{host}{handler}" + headers = { + "Content-Type": "text/xml", + "Accept-Encoding": "identity", + "User-Agent": "xmlrpc-odoo-mpm/1.0", + } + req = urllib.request.Request(url, request_body, headers) + # build_opener with ProxyHandler picks up HTTPS_PROXY from environment + opener = urllib.request.build_opener(urllib.request.ProxyHandler()) + try: + with opener.open(req, timeout=30) as resp: + return self.parse_response(resp) + except urllib.error.HTTPError as e: + raise xmlrpc.client.ProtocolError(url, e.code, e.msg, dict(e.headers)) + except urllib.error.URLError as e: + raise xmlrpc.client.ProtocolError(url, 0, str(e.reason), {}) + + +_proxy_transport = ProxyAwareTransport() + +# ── Odoo client ─────────────────────────────────────────────────────────────── +_uid: Optional[int] = None +_models = None + +def _connect(): + global _uid, _models + if _uid is not None: + return + common = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/common", transport=_proxy_transport) + _uid = common.authenticate(ODOO_DB, ODOO_USERNAME, ODOO_API_KEY, {}) + if not _uid: + raise RuntimeError("Odoo authentication failed. Check ODOO_USERNAME and ODOO_API_KEY.") + _models = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/object", transport=_proxy_transport) + +def _call(model: str, method: str, args=None, kwargs=None): + _connect() + return _models.execute_kw( + ODOO_DB, _uid, ODOO_API_KEY, + model, method, + args or [[]], + kwargs or {} + ) + +def _search_read(model, domain=None, fields=None, limit=20, offset=0, order=None): + kw = {"limit": limit, "offset": offset} + if fields: + kw["fields"] = fields + if order: + kw["order"] = order + return _call(model, "search_read", [domain or []], kw) + +def _read(model, ids, fields=None): + kw = {} + if fields: + kw["fields"] = fields + return _call(model, "read", [ids], kw) + +def _create(model, vals): + return _call(model, "create", [vals]) + +def _write(model, ids, vals): + return _call(model, "write", [[ids] if isinstance(ids, int) else ids, vals]) + +# ── FastMCP App ─────────────────────────────────────────────────────────────── +mcp = FastMCP("Odoo MPM") + + +# ════════════════════════════════════════════════════════════════════════════ +# PRODUCTS +# ════════════════════════════════════════════════════════════════════════════ + +@mcp.tool() +def search_products(query: str = "", limit: int = 20, product_type: str = "") -> list: + """Search products/product templates by name or internal reference. + product_type can be: 'consu' (consumable), 'service', 'product' (storable). + Returns id, name, default_code, type, list_price, categ_id.""" + domain = [] + if query: + domain.append(["name", "ilike", query]) + if product_type: + domain.append(["type", "=", product_type]) + return _search_read("product.template", domain, + ["id", "name", "default_code", "type", "list_price", "categ_id", "active"], + limit=limit, order="name asc") + +@mcp.tool() +def get_product(product_id: int) -> dict: + """Get full details of a product template by ID, including description, + variants, pricing, category, supplier info, and stock info.""" + r = _read("product.template", [product_id], + ["id", "name", "default_code", "type", "list_price", "standard_price", + "categ_id", "description", "description_sale", "description_purchase", + "uom_id", "uom_po_id", "active", "sale_ok", "purchase_ok", + "product_variant_count", "barcode"]) + return r[0] if r else {} + +@mcp.tool() +def get_product_stock(product_id: int) -> list: + """Get current stock quantities for a product (by product.template ID) + across all internal locations.""" + # Get all product.product IDs under this template + variants = _search_read("product.product", + [["product_tmpl_id", "=", product_id]], + ["id", "display_name"]) + if not variants: + return [] + variant_ids = [v["id"] for v in variants] + return _search_read("stock.quant", + [["product_id", "in", variant_ids], + ["location_id.usage", "=", "internal"]], + ["product_id", "location_id", "quantity", "reserved_quantity"], + limit=50) + + +# ════════════════════════════════════════════════════════════════════════════ +# KNOWLEDGE +# ════════════════════════════════════════════════════════════════════════════ + +@mcp.tool() +def search_knowledge_articles(query: str = "", limit: int = 20) -> list: + """Search Knowledge base articles by title. Returns id, name, parent article.""" + domain = [["name", "ilike", query]] if query else [] + return _search_read("knowledge.article", domain, + ["id", "name", "parent_id", "is_published", "write_date"], + limit=limit, order="write_date desc") + +@mcp.tool() +def get_knowledge_article(article_id: int) -> dict: + """Get the full content of a Knowledge article by ID. + Note: Inline base64 images are replaced with [embedded image] placeholders + to keep the response size manageable.""" + r = _read("knowledge.article", [article_id], + ["id", "name", "body", "parent_id", "child_ids", + "is_published", "write_date", "write_uid"]) + if not r: + return {} + article = r[0] + if article.get("body"): + article["body"] = re.sub( + r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"', + 'src="[embedded image]"', + article["body"] + ) + return article + +@mcp.tool() +def create_knowledge_article(name: str, body: str, parent_id: int = None) -> int: + """Create a new Knowledge article. body is HTML. Returns new article ID.""" + vals = {"name": name, "body": body} + if parent_id: + vals["parent_id"] = parent_id + return _create("knowledge.article", vals) + +@mcp.tool() +def update_knowledge_article(article_id: int, name: str = "", body: str = "") -> bool: + """Update a Knowledge article's title and/or body (HTML).""" + vals = {} + if name: + vals["name"] = name + if body: + vals["body"] = body + if not vals: + return False + return _write("knowledge.article", article_id, vals) + +@mcp.tool() +def search_knowledge_templates(query: str = "", category: str = "", limit: int = 50) -> list: + """Search Knowledge Base article templates. + Optionally filter by template name or category name (e.g. 'Productivity', 'Sales', + 'Marketing', 'Company Organization', 'Product Management'). + Returns id, template_name, template_description, category, and sequence.""" + domain = [["is_template", "=", True]] + if query: + domain.append(["template_name", "ilike", query]) + if category: + domain.append(["template_category_id.name", "ilike", category]) + return _search_read("knowledge.article", domain, + ["id", "name", "template_name", "template_description", + "template_category_id", "template_sequence", "is_published"], + limit=limit, order="template_category_sequence asc, template_sequence asc") + +@mcp.tool() +def get_knowledge_template(template_id: int) -> dict: + """Get full details of a Knowledge Base article template by ID, including + the template body content. Inline base64 images are replaced with + [embedded image] placeholders.""" + r = _read("knowledge.article", [template_id], + ["id", "name", "template_name", "template_description", + "template_body", "template_preview", "template_category_id", + "template_sequence", "is_published"]) + if not r: + return {} + template = r[0] + if template.get("template_body"): + template["template_body"] = re.sub( + r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"', + 'src="[embedded image]"', + template["template_body"] + ) + if template.get("template_preview"): + template["template_preview"] = re.sub( + r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"', + 'src="[embedded image]"', + template["template_preview"] + ) + return template + +@mcp.tool() +def list_knowledge_template_categories() -> list: + """List all Knowledge Base article template categories with their IDs and names.""" + return _search_read("knowledge.article.template.category", [], + ["id", "name", "sequence"], + limit=50, order="sequence asc") + + +# ════════════════════════════════════════════════════════════════════════════ +# CONTACTS +# ════════════════════════════════════════════════════════════════════════════ + +@mcp.tool() +def search_contacts(query: str = "", is_company: bool = None, limit: int = 20) -> list: + """Search contacts/partners by name, email, or phone. + Set is_company=True for companies only, False for individuals.""" + domain = [] + if query: + domain.append("|") + domain.append(["name", "ilike", query]) + domain.append("|") + domain.append(["email", "ilike", query]) + domain.append(["phone", "ilike", query]) + if is_company is not None: + domain.append(["is_company", "=", is_company]) + return _search_read("res.partner", domain, + ["id", "name", "email", "phone", "mobile", "is_company", + "street", "city", "state_id", "country_id", "website"], + limit=limit, order="name asc") + +@mcp.tool() +def get_contact(contact_id: int) -> dict: + """Get full details of a contact/partner by ID.""" + r = _read("res.partner", [contact_id], + ["id", "name", "email", "phone", "mobile", "is_company", "parent_id", + "street", "street2", "city", "state_id", "zip", "country_id", + "website", "comment", "category_id", "child_ids", "user_id"]) + return r[0] if r else {} + +@mcp.tool() +def create_contact(name: str, email: str = "", phone: str = "", + is_company: bool = False, parent_id: int = None, + street: str = "", city: str = "") -> int: + """Create a new contact. Returns the new contact ID.""" + vals = {"name": name, "is_company": is_company} + if email: vals["email"] = email + if phone: vals["phone"] = phone + if parent_id: vals["parent_id"] = parent_id + if street: vals["street"] = street + if city: vals["city"] = city + return _create("res.partner", vals) + + +# ════════════════════════════════════════════════════════════════════════════ +# SALES +# ════════════════════════════════════════════════════════════════════════════ + +@mcp.tool() +def search_sales_orders(query: str = "", state: str = "", limit: int = 20) -> list: + """Search sales orders by name or customer. State options: draft, sent, sale, done, cancel.""" + domain = [] + if query: + domain.append("|") + domain.append(["name", "ilike", query]) + domain.append(["partner_id.name", "ilike", query]) + if state: + domain.append(["state", "=", state]) + return _search_read("sale.order", domain, + ["id", "name", "partner_id", "state", "amount_total", + "date_order", "user_id", "validity_date"], + limit=limit, order="date_order desc") + +@mcp.tool() +def get_sales_order(order_id: int) -> dict: + """Get full details of a sales order including line items.""" + r = _read("sale.order", [order_id], + ["id", "name", "partner_id", "state", "amount_untaxed", "amount_tax", + "amount_total", "date_order", "user_id", "note", "order_line", + "validity_date", "payment_term_id", "commitment_date"]) + if not r: + return {} + order = r[0] + if order.get("order_line"): + lines = _read("sale.order.line", order["order_line"], + ["id", "product_id", "name", "product_uom_qty", + "price_unit", "price_subtotal", "qty_delivered", "qty_invoiced"]) + order["lines"] = lines + return order + +@mcp.tool() +def create_sales_order(partner_id: int, lines: list) -> int: + """Create a sales order. lines is a list of dicts with keys: + product_id (int), product_uom_qty (float), price_unit (float). + Returns new order ID.""" + order_lines = [] + for l in lines: + order_lines.append((0, 0, { + "product_id": l["product_id"], + "product_uom_qty": l.get("product_uom_qty", 1), + "price_unit": l.get("price_unit", 0), + })) + return _create("sale.order", {"partner_id": partner_id, "order_line": order_lines}) + + +# ════════════════════════════════════════════════════════════════════════════ +# CRM +# ════════════════════════════════════════════════════════════════════════════ + +@mcp.tool() +def search_crm_leads(query: str = "", stage: str = "", assigned_to_me: bool = False, + limit: int = 20) -> list: + """Search CRM leads/opportunities. Filter by name, stage name, or assigned to current user.""" + domain = [["type", "=", "opportunity"]] + if query: + domain.append("|") + domain.append(["name", "ilike", query]) + domain.append(["partner_id.name", "ilike", query]) + if stage: + domain.append(["stage_id.name", "ilike", stage]) + return _search_read("crm.lead", domain, + ["id", "name", "partner_id", "stage_id", "expected_revenue", + "probability", "user_id", "date_deadline", "priority"], + limit=limit, order="date_deadline asc") + +@mcp.tool() +def get_crm_lead(lead_id: int) -> dict: + """Get full details of a CRM lead/opportunity by ID.""" + r = _read("crm.lead", [lead_id], + ["id", "name", "type", "partner_id", "stage_id", "user_id", + "expected_revenue", "probability", "date_deadline", "priority", + "description", "phone", "email_from", "tag_ids", + "activity_ids", "date_conversion"]) + return r[0] if r else {} + +@mcp.tool() +def create_crm_lead(name: str, partner_id: int = None, expected_revenue: float = 0, + description: str = "", email: str = "", phone: str = "") -> int: + """Create a new CRM opportunity. Returns new lead ID.""" + vals = {"name": name, "type": "opportunity", "expected_revenue": expected_revenue} + if partner_id: vals["partner_id"] = partner_id + if description: vals["description"] = description + if email: vals["email_from"] = email + if phone: vals["phone"] = phone + return _create("crm.lead", vals) + +@mcp.tool() +def update_crm_lead(lead_id: int, stage_id: int = None, probability: float = None, + expected_revenue: float = None, note: str = "") -> bool: + """Update a CRM lead's stage, probability, revenue, or notes.""" + vals = {} + if stage_id is not None: vals["stage_id"] = stage_id + if probability is not None: vals["probability"] = probability + if expected_revenue is not None: vals["expected_revenue"] = expected_revenue + if note: vals["description"] = note + return _write("crm.lead", lead_id, vals) if vals else False + +@mcp.tool() +def list_crm_stages() -> list: + """List all CRM pipeline stages with their IDs and names.""" + return _search_read("crm.stage", [], ["id", "name", "sequence", "probability"], + limit=50, order="sequence asc") + + +# ════════════════════════════════════════════════════════════════════════════ +# PROJECT +# ════════════════════════════════════════════════════════════════════════════ + +@mcp.tool() +def list_projects(limit: int = 50) -> list: + """List all active projects with id, name, description, and task count.""" + return _search_read("project.project", [["active", "=", True]], + ["id", "name", "description", "task_count", "user_id", "date_start", "date"], + limit=limit, order="name asc") + +@mcp.tool() +def get_project(project_id: int) -> dict: + """Get full details of a project by ID.""" + r = _read("project.project", [project_id], + ["id", "name", "description", "user_id", "task_count", + "date_start", "date", "tag_ids", "privacy_visibility"]) + return r[0] if r else {} + +@mcp.tool() +def search_tasks(query: str = "", project_id: int = None, stage: str = "", + limit: int = 30) -> list: + """Search project tasks by title, project, or stage name.""" + domain = [] + if query: + domain.append(["name", "ilike", query]) + if project_id: + domain.append(["project_id", "=", project_id]) + if stage: + domain.append(["stage_id.name", "ilike", stage]) + return _search_read("project.task", domain, + ["id", "name", "project_id", "stage_id", "user_ids", + "date_deadline", "priority", "kanban_state", "tag_ids"], + limit=limit, order="date_deadline asc") + +@mcp.tool() +def get_task(task_id: int) -> dict: + """Get full details of a project task by ID including description and subtasks.""" + r = _read("project.task", [task_id], + ["id", "name", "project_id", "stage_id", "user_ids", "description", + "date_deadline", "priority", "kanban_state", "tag_ids", + "child_ids", "depend_on_ids", "planned_hours", "effective_hours"]) + return r[0] if r else {} + +@mcp.tool() +def create_task(name: str, project_id: int, description: str = "", + date_deadline: str = "", user_ids: list = None) -> int: + """Create a project task. date_deadline format: YYYY-MM-DD. Returns task ID.""" + vals = {"name": name, "project_id": project_id} + if description: vals["description"] = description + if date_deadline: vals["date_deadline"] = date_deadline + if user_ids: vals["user_ids"] = [(6, 0, user_ids)] + return _create("project.task", vals) + +@mcp.tool() +def update_task(task_id: int, name: str = "", stage_id: int = None, + description: str = "", date_deadline: str = "", + kanban_state: str = "") -> bool: + """Update a task. kanban_state: normal, done, blocked.""" + vals = {} + if name: vals["name"] = name + if stage_id: vals["stage_id"] = stage_id + if description: vals["description"] = description + if date_deadline: vals["date_deadline"] = date_deadline + if kanban_state: vals["kanban_state"] = kanban_state + return _write("project.task", task_id, vals) if vals else False + +@mcp.tool() +def list_task_stages(project_id: int = None) -> list: + """List task stages. Optionally filter by project.""" + domain = [] + if project_id: + domain.append(["project_ids", "in", [project_id]]) + return _search_read("project.task.type", domain, + ["id", "name", "sequence"], limit=50, order="sequence asc") + + +# ════════════════════════════════════════════════════════════════════════════ +# HELPDESK +# ════════════════════════════════════════════════════════════════════════════ + +@mcp.tool() +def search_helpdesk_tickets(query: str = "", stage: str = "", team: str = "", + created_after: str = "", created_before: str = "", + limit: int = 20) -> list: + """Search helpdesk tickets by name, stage, or team. + created_after and created_before accept datetime strings in 'YYYY-MM-DD' or + 'YYYY-MM-DD HH:MM:SS' format to filter by creation date. + Example: created_after='2026-03-29' returns tickets created in the last 24 hours.""" + domain = [] + if query: + domain.append(["name", "ilike", query]) + if stage: + domain.append(["stage_id.name", "ilike", stage]) + if team: + domain.append(["team_id.name", "ilike", team]) + if created_after: + domain.append(["create_date", ">=", created_after]) + if created_before: + domain.append(["create_date", "<=", created_before]) + return _search_read("helpdesk.ticket", domain, + ["id", "name", "partner_id", "stage_id", "team_id", + "user_id", "priority", "create_date"], + limit=limit, order="create_date desc") + +@mcp.tool() +def get_helpdesk_ticket(ticket_id: int) -> dict: + """Get full details of a helpdesk ticket by ID.""" + r = _read("helpdesk.ticket", [ticket_id], + ["id", "name", "partner_id", "stage_id", "team_id", "user_id", + "priority", "description", "create_date", + "date_last_stage_update", "kanban_state", "tag_ids"]) + return r[0] if r else {} + +@mcp.tool() +def create_helpdesk_ticket(name: str, description: str = "", + partner_id: int = None, team_id: int = None) -> int: + """Create a helpdesk ticket. Returns new ticket ID.""" + vals = {"name": name} + if description: vals["description"] = description + if partner_id: vals["partner_id"] = partner_id + if team_id: vals["team_id"] = team_id + return _create("helpdesk.ticket", vals) + +@mcp.tool() +def update_helpdesk_ticket(ticket_id: int, stage_id: int = None, + user_id: int = None, note: str = "") -> bool: + """Update a helpdesk ticket's stage, assignee, or add a note.""" + vals = {} + if stage_id: vals["stage_id"] = stage_id + if user_id: vals["user_id"] = user_id + if note: vals["description"] = note + return _write("helpdesk.ticket", ticket_id, vals) if vals else False + +@mcp.tool() +def list_helpdesk_teams() -> list: + """List all helpdesk teams.""" + return _search_read("helpdesk.team", [], ["id", "name", "description"], limit=20) + + +# ════════════════════════════════════════════════════════════════════════════ +# PURCHASE +# ════════════════════════════════════════════════════════════════════════════ + +@mcp.tool() +def search_purchase_orders(query: str = "", state: str = "", limit: int = 20) -> list: + """Search purchase orders by name or vendor. + State: draft, sent, to approve, purchase, done, cancel.""" + domain = [] + if query: + domain.append("|") + domain.append(["name", "ilike", query]) + domain.append(["partner_id.name", "ilike", query]) + if state: + domain.append(["state", "=", state]) + return _search_read("purchase.order", domain, + ["id", "name", "partner_id", "state", "amount_total", + "date_order", "user_id", "date_planned"], + limit=limit, order="date_order desc") + +@mcp.tool() +def get_purchase_order(order_id: int) -> dict: + """Get full details of a purchase order including line items.""" + r = _read("purchase.order", [order_id], + ["id", "name", "partner_id", "state", "amount_untaxed", "amount_tax", + "amount_total", "date_order", "user_id", "notes", + "order_line", "date_planned", "payment_term_id"]) + if not r: + return {} + order = r[0] + if order.get("order_line"): + lines = _read("purchase.order.line", order["order_line"], + ["id", "product_id", "name", "product_qty", "price_unit", + "price_subtotal", "qty_received", "qty_invoiced", "date_planned"]) + order["lines"] = lines + return order + + +# ════════════════════════════════════════════════════════════════════════════ +# INVENTORY +# ════════════════════════════════════════════════════════════════════════════ + +@mcp.tool() +def search_inventory(query: str = "", location: str = "internal", + limit: int = 30) -> list: + """Search current stock levels by product name. + location: 'internal' (default), 'customer', 'supplier', 'transit'.""" + domain = [["location_id.usage", "=", location], ["quantity", ">", 0]] + if query: + domain.append(["product_id.name", "ilike", query]) + return _search_read("stock.quant", domain, + ["product_id", "location_id", "quantity", "reserved_quantity", + "lot_id", "package_id"], + limit=limit, order="product_id asc") + +@mcp.tool() +def get_stock_moves(product_id: int, limit: int = 20) -> list: + """Get recent stock moves for a product (product.product ID).""" + return _search_read("stock.move", [ + ["product_id", "=", product_id], + ["state", "=", "done"] + ], ["id", "name", "product_id", "product_uom_qty", "location_id", + "location_dest_id", "date", "picking_id", "origin"], + limit=limit, order="date desc") + +@mcp.tool() +def list_internal_locations() -> list: + """List all internal warehouse/stock locations.""" + return _search_read("stock.location", + [["usage", "=", "internal"], ["active", "=", True]], + ["id", "name", "complete_name", "location_id"], + limit=100, order="complete_name asc") + + +# ════════════════════════════════════════════════════════════════════════════ +# EMPLOYEES +# ════════════════════════════════════════════════════════════════════════════ + +@mcp.tool() +def search_employees(query: str = "", department: str = "", + job_title: str = "", limit: int = 30) -> list: + """Search employees by name, department, or job title.""" + domain = [["active", "=", True]] + if query: + domain.append(["name", "ilike", query]) + if department: + domain.append(["department_id.name", "ilike", department]) + if job_title: + domain.append(["job_title", "ilike", job_title]) + return _search_read("hr.employee", domain, + ["id", "name", "job_title", "department_id", "work_email", + "work_phone", "parent_id", "coach_id"], + limit=limit, order="name asc") + +@mcp.tool() +def get_employee(employee_id: int) -> dict: + """Get full details of an employee by ID.""" + r = _read("hr.employee", [employee_id], + ["id", "name", "job_title", "job_id", "department_id", "work_email", + "work_phone", "mobile_phone", "parent_id", "coach_id", + "address_id", "resource_calendar_id", "tz", + "birthday", "gender", "marital", "country_id"]) + return r[0] if r else {} + +@mcp.tool() +def list_departments() -> list: + """List all HR departments.""" + return _search_read("hr.department", [], + ["id", "name", "parent_id", "manager_id"], limit=50, order="name asc") + + +# ════════════════════════════════════════════════════════════════════════════ +# UTILITY +# ════════════════════════════════════════════════════════════════════════════ + +@mcp.tool() +def odoo_search(model: str, field: str, query: str, limit: int = 10) -> list: + """Generic search on any Odoo model. Searches field 'field' using ilike. + Useful for looking up stage IDs, category IDs, etc. + Example: odoo_search('project.task.type', 'name', 'In Progress')""" + return _search_read(model, [[field, "ilike", query]], limit=limit) + +@mcp.tool() +def odoo_get_record(model: str, record_id: int) -> dict: + """Fetch a single record from any Odoo model by ID. Returns all default fields.""" + r = _call(model, "read", [[record_id]], {}) + return r[0] if r else {} + + +if __name__ == "__main__": + mcp.run()