Add Odoo MCP server with ProxyAwareTransport fix (v0.2.0)
Fixes DNS resolution failure in Cowork sandbox caused by xmlrpc.client.SafeTransport ignoring HTTPS_PROXY environment variable. ProxyAwareTransport overrides request() to use urllib.request with ProxyHandler(), which correctly routes through localhost:3128.
This commit is contained in:
@@ -0,0 +1,676 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Odoo MCP Server for MPM
|
||||
Connects to mpmedia.odoo.com via XML-RPC and exposes tools for
|
||||
Products, Knowledge, Contacts, Sales, CRM, Project, Helpdesk,
|
||||
Purchase, Inventory, Employees, and Knowledge Templates.
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import xmlrpc.client
|
||||
from typing import Optional
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
# ── Configuration ────────────────────────────────────────────────────────────
|
||||
ODOO_URL = os.environ.get("ODOO_URL", "https://mpmedia.odoo.com")
|
||||
ODOO_DB = os.environ.get("ODOO_DB", "mpmedia-odoo-sh-main-13285275")
|
||||
ODOO_USERNAME = os.environ.get("ODOO_USERNAME", "bgilliom@mpmedia.tv")
|
||||
ODOO_API_KEY = os.environ.get("ODOO_API_KEY", "")
|
||||
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
|
||||
class ProxyAwareTransport(xmlrpc.client.SafeTransport):
|
||||
"""xmlrpc transport that routes through the system HTTPS proxy.
|
||||
|
||||
Python's xmlrpc.client.SafeTransport makes raw socket connections and
|
||||
ignores HTTPS_PROXY / https_proxy environment variables entirely.
|
||||
This transport replaces the low-level request method with urllib.request,
|
||||
which correctly picks up the proxy environment variables set by the
|
||||
Cowork sandbox (HTTPS_PROXY=http://localhost:3128).
|
||||
"""
|
||||
|
||||
def request(self, host, handler, request_body, verbose=False):
|
||||
url = f"https://{host}{handler}"
|
||||
headers = {
|
||||
"Content-Type": "text/xml",
|
||||
"Accept-Encoding": "identity",
|
||||
"User-Agent": "xmlrpc-odoo-mpm/1.0",
|
||||
}
|
||||
req = urllib.request.Request(url, request_body, headers)
|
||||
# build_opener with ProxyHandler picks up HTTPS_PROXY from environment
|
||||
opener = urllib.request.build_opener(urllib.request.ProxyHandler())
|
||||
try:
|
||||
with opener.open(req, timeout=30) as resp:
|
||||
return self.parse_response(resp)
|
||||
except urllib.error.HTTPError as e:
|
||||
raise xmlrpc.client.ProtocolError(url, e.code, e.msg, dict(e.headers))
|
||||
except urllib.error.URLError as e:
|
||||
raise xmlrpc.client.ProtocolError(url, 0, str(e.reason), {})
|
||||
|
||||
|
||||
_proxy_transport = ProxyAwareTransport()
|
||||
|
||||
# ── Odoo client ───────────────────────────────────────────────────────────────
|
||||
_uid: Optional[int] = None
|
||||
_models = None
|
||||
|
||||
def _connect():
|
||||
global _uid, _models
|
||||
if _uid is not None:
|
||||
return
|
||||
common = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/common", transport=_proxy_transport)
|
||||
_uid = common.authenticate(ODOO_DB, ODOO_USERNAME, ODOO_API_KEY, {})
|
||||
if not _uid:
|
||||
raise RuntimeError("Odoo authentication failed. Check ODOO_USERNAME and ODOO_API_KEY.")
|
||||
_models = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/object", transport=_proxy_transport)
|
||||
|
||||
def _call(model: str, method: str, args=None, kwargs=None):
|
||||
_connect()
|
||||
return _models.execute_kw(
|
||||
ODOO_DB, _uid, ODOO_API_KEY,
|
||||
model, method,
|
||||
args or [[]],
|
||||
kwargs or {}
|
||||
)
|
||||
|
||||
def _search_read(model, domain=None, fields=None, limit=20, offset=0, order=None):
|
||||
kw = {"limit": limit, "offset": offset}
|
||||
if fields:
|
||||
kw["fields"] = fields
|
||||
if order:
|
||||
kw["order"] = order
|
||||
return _call(model, "search_read", [domain or []], kw)
|
||||
|
||||
def _read(model, ids, fields=None):
|
||||
kw = {}
|
||||
if fields:
|
||||
kw["fields"] = fields
|
||||
return _call(model, "read", [ids], kw)
|
||||
|
||||
def _create(model, vals):
|
||||
return _call(model, "create", [vals])
|
||||
|
||||
def _write(model, ids, vals):
|
||||
return _call(model, "write", [[ids] if isinstance(ids, int) else ids, vals])
|
||||
|
||||
# ── FastMCP App ───────────────────────────────────────────────────────────────
|
||||
mcp = FastMCP("Odoo MPM")
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
# PRODUCTS
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@mcp.tool()
|
||||
def search_products(query: str = "", limit: int = 20, product_type: str = "") -> list:
|
||||
"""Search products/product templates by name or internal reference.
|
||||
product_type can be: 'consu' (consumable), 'service', 'product' (storable).
|
||||
Returns id, name, default_code, type, list_price, categ_id."""
|
||||
domain = []
|
||||
if query:
|
||||
domain.append(["name", "ilike", query])
|
||||
if product_type:
|
||||
domain.append(["type", "=", product_type])
|
||||
return _search_read("product.template", domain,
|
||||
["id", "name", "default_code", "type", "list_price", "categ_id", "active"],
|
||||
limit=limit, order="name asc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_product(product_id: int) -> dict:
|
||||
"""Get full details of a product template by ID, including description,
|
||||
variants, pricing, category, supplier info, and stock info."""
|
||||
r = _read("product.template", [product_id],
|
||||
["id", "name", "default_code", "type", "list_price", "standard_price",
|
||||
"categ_id", "description", "description_sale", "description_purchase",
|
||||
"uom_id", "uom_po_id", "active", "sale_ok", "purchase_ok",
|
||||
"product_variant_count", "barcode"])
|
||||
return r[0] if r else {}
|
||||
|
||||
@mcp.tool()
|
||||
def get_product_stock(product_id: int) -> list:
|
||||
"""Get current stock quantities for a product (by product.template ID)
|
||||
across all internal locations."""
|
||||
# Get all product.product IDs under this template
|
||||
variants = _search_read("product.product",
|
||||
[["product_tmpl_id", "=", product_id]],
|
||||
["id", "display_name"])
|
||||
if not variants:
|
||||
return []
|
||||
variant_ids = [v["id"] for v in variants]
|
||||
return _search_read("stock.quant",
|
||||
[["product_id", "in", variant_ids],
|
||||
["location_id.usage", "=", "internal"]],
|
||||
["product_id", "location_id", "quantity", "reserved_quantity"],
|
||||
limit=50)
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
# KNOWLEDGE
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@mcp.tool()
|
||||
def search_knowledge_articles(query: str = "", limit: int = 20) -> list:
|
||||
"""Search Knowledge base articles by title. Returns id, name, parent article."""
|
||||
domain = [["name", "ilike", query]] if query else []
|
||||
return _search_read("knowledge.article", domain,
|
||||
["id", "name", "parent_id", "is_published", "write_date"],
|
||||
limit=limit, order="write_date desc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_knowledge_article(article_id: int) -> dict:
|
||||
"""Get the full content of a Knowledge article by ID.
|
||||
Note: Inline base64 images are replaced with [embedded image] placeholders
|
||||
to keep the response size manageable."""
|
||||
r = _read("knowledge.article", [article_id],
|
||||
["id", "name", "body", "parent_id", "child_ids",
|
||||
"is_published", "write_date", "write_uid"])
|
||||
if not r:
|
||||
return {}
|
||||
article = r[0]
|
||||
if article.get("body"):
|
||||
article["body"] = re.sub(
|
||||
r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"',
|
||||
'src="[embedded image]"',
|
||||
article["body"]
|
||||
)
|
||||
return article
|
||||
|
||||
@mcp.tool()
|
||||
def create_knowledge_article(name: str, body: str, parent_id: int = None) -> int:
|
||||
"""Create a new Knowledge article. body is HTML. Returns new article ID."""
|
||||
vals = {"name": name, "body": body}
|
||||
if parent_id:
|
||||
vals["parent_id"] = parent_id
|
||||
return _create("knowledge.article", vals)
|
||||
|
||||
@mcp.tool()
|
||||
def update_knowledge_article(article_id: int, name: str = "", body: str = "") -> bool:
|
||||
"""Update a Knowledge article's title and/or body (HTML)."""
|
||||
vals = {}
|
||||
if name:
|
||||
vals["name"] = name
|
||||
if body:
|
||||
vals["body"] = body
|
||||
if not vals:
|
||||
return False
|
||||
return _write("knowledge.article", article_id, vals)
|
||||
|
||||
@mcp.tool()
|
||||
def search_knowledge_templates(query: str = "", category: str = "", limit: int = 50) -> list:
|
||||
"""Search Knowledge Base article templates.
|
||||
Optionally filter by template name or category name (e.g. 'Productivity', 'Sales',
|
||||
'Marketing', 'Company Organization', 'Product Management').
|
||||
Returns id, template_name, template_description, category, and sequence."""
|
||||
domain = [["is_template", "=", True]]
|
||||
if query:
|
||||
domain.append(["template_name", "ilike", query])
|
||||
if category:
|
||||
domain.append(["template_category_id.name", "ilike", category])
|
||||
return _search_read("knowledge.article", domain,
|
||||
["id", "name", "template_name", "template_description",
|
||||
"template_category_id", "template_sequence", "is_published"],
|
||||
limit=limit, order="template_category_sequence asc, template_sequence asc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_knowledge_template(template_id: int) -> dict:
|
||||
"""Get full details of a Knowledge Base article template by ID, including
|
||||
the template body content. Inline base64 images are replaced with
|
||||
[embedded image] placeholders."""
|
||||
r = _read("knowledge.article", [template_id],
|
||||
["id", "name", "template_name", "template_description",
|
||||
"template_body", "template_preview", "template_category_id",
|
||||
"template_sequence", "is_published"])
|
||||
if not r:
|
||||
return {}
|
||||
template = r[0]
|
||||
if template.get("template_body"):
|
||||
template["template_body"] = re.sub(
|
||||
r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"',
|
||||
'src="[embedded image]"',
|
||||
template["template_body"]
|
||||
)
|
||||
if template.get("template_preview"):
|
||||
template["template_preview"] = re.sub(
|
||||
r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"',
|
||||
'src="[embedded image]"',
|
||||
template["template_preview"]
|
||||
)
|
||||
return template
|
||||
|
||||
@mcp.tool()
|
||||
def list_knowledge_template_categories() -> list:
|
||||
"""List all Knowledge Base article template categories with their IDs and names."""
|
||||
return _search_read("knowledge.article.template.category", [],
|
||||
["id", "name", "sequence"],
|
||||
limit=50, order="sequence asc")
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
# CONTACTS
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@mcp.tool()
|
||||
def search_contacts(query: str = "", is_company: bool = None, limit: int = 20) -> list:
|
||||
"""Search contacts/partners by name, email, or phone.
|
||||
Set is_company=True for companies only, False for individuals."""
|
||||
domain = []
|
||||
if query:
|
||||
domain.append("|")
|
||||
domain.append(["name", "ilike", query])
|
||||
domain.append("|")
|
||||
domain.append(["email", "ilike", query])
|
||||
domain.append(["phone", "ilike", query])
|
||||
if is_company is not None:
|
||||
domain.append(["is_company", "=", is_company])
|
||||
return _search_read("res.partner", domain,
|
||||
["id", "name", "email", "phone", "mobile", "is_company",
|
||||
"street", "city", "state_id", "country_id", "website"],
|
||||
limit=limit, order="name asc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_contact(contact_id: int) -> dict:
|
||||
"""Get full details of a contact/partner by ID."""
|
||||
r = _read("res.partner", [contact_id],
|
||||
["id", "name", "email", "phone", "mobile", "is_company", "parent_id",
|
||||
"street", "street2", "city", "state_id", "zip", "country_id",
|
||||
"website", "comment", "category_id", "child_ids", "user_id"])
|
||||
return r[0] if r else {}
|
||||
|
||||
@mcp.tool()
|
||||
def create_contact(name: str, email: str = "", phone: str = "",
|
||||
is_company: bool = False, parent_id: int = None,
|
||||
street: str = "", city: str = "") -> int:
|
||||
"""Create a new contact. Returns the new contact ID."""
|
||||
vals = {"name": name, "is_company": is_company}
|
||||
if email: vals["email"] = email
|
||||
if phone: vals["phone"] = phone
|
||||
if parent_id: vals["parent_id"] = parent_id
|
||||
if street: vals["street"] = street
|
||||
if city: vals["city"] = city
|
||||
return _create("res.partner", vals)
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
# SALES
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@mcp.tool()
|
||||
def search_sales_orders(query: str = "", state: str = "", limit: int = 20) -> list:
|
||||
"""Search sales orders by name or customer. State options: draft, sent, sale, done, cancel."""
|
||||
domain = []
|
||||
if query:
|
||||
domain.append("|")
|
||||
domain.append(["name", "ilike", query])
|
||||
domain.append(["partner_id.name", "ilike", query])
|
||||
if state:
|
||||
domain.append(["state", "=", state])
|
||||
return _search_read("sale.order", domain,
|
||||
["id", "name", "partner_id", "state", "amount_total",
|
||||
"date_order", "user_id", "validity_date"],
|
||||
limit=limit, order="date_order desc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_sales_order(order_id: int) -> dict:
|
||||
"""Get full details of a sales order including line items."""
|
||||
r = _read("sale.order", [order_id],
|
||||
["id", "name", "partner_id", "state", "amount_untaxed", "amount_tax",
|
||||
"amount_total", "date_order", "user_id", "note", "order_line",
|
||||
"validity_date", "payment_term_id", "commitment_date"])
|
||||
if not r:
|
||||
return {}
|
||||
order = r[0]
|
||||
if order.get("order_line"):
|
||||
lines = _read("sale.order.line", order["order_line"],
|
||||
["id", "product_id", "name", "product_uom_qty",
|
||||
"price_unit", "price_subtotal", "qty_delivered", "qty_invoiced"])
|
||||
order["lines"] = lines
|
||||
return order
|
||||
|
||||
@mcp.tool()
|
||||
def create_sales_order(partner_id: int, lines: list) -> int:
|
||||
"""Create a sales order. lines is a list of dicts with keys:
|
||||
product_id (int), product_uom_qty (float), price_unit (float).
|
||||
Returns new order ID."""
|
||||
order_lines = []
|
||||
for l in lines:
|
||||
order_lines.append((0, 0, {
|
||||
"product_id": l["product_id"],
|
||||
"product_uom_qty": l.get("product_uom_qty", 1),
|
||||
"price_unit": l.get("price_unit", 0),
|
||||
}))
|
||||
return _create("sale.order", {"partner_id": partner_id, "order_line": order_lines})
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
# CRM
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@mcp.tool()
|
||||
def search_crm_leads(query: str = "", stage: str = "", assigned_to_me: bool = False,
|
||||
limit: int = 20) -> list:
|
||||
"""Search CRM leads/opportunities. Filter by name, stage name, or assigned to current user."""
|
||||
domain = [["type", "=", "opportunity"]]
|
||||
if query:
|
||||
domain.append("|")
|
||||
domain.append(["name", "ilike", query])
|
||||
domain.append(["partner_id.name", "ilike", query])
|
||||
if stage:
|
||||
domain.append(["stage_id.name", "ilike", stage])
|
||||
return _search_read("crm.lead", domain,
|
||||
["id", "name", "partner_id", "stage_id", "expected_revenue",
|
||||
"probability", "user_id", "date_deadline", "priority"],
|
||||
limit=limit, order="date_deadline asc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_crm_lead(lead_id: int) -> dict:
|
||||
"""Get full details of a CRM lead/opportunity by ID."""
|
||||
r = _read("crm.lead", [lead_id],
|
||||
["id", "name", "type", "partner_id", "stage_id", "user_id",
|
||||
"expected_revenue", "probability", "date_deadline", "priority",
|
||||
"description", "phone", "email_from", "tag_ids",
|
||||
"activity_ids", "date_conversion"])
|
||||
return r[0] if r else {}
|
||||
|
||||
@mcp.tool()
|
||||
def create_crm_lead(name: str, partner_id: int = None, expected_revenue: float = 0,
|
||||
description: str = "", email: str = "", phone: str = "") -> int:
|
||||
"""Create a new CRM opportunity. Returns new lead ID."""
|
||||
vals = {"name": name, "type": "opportunity", "expected_revenue": expected_revenue}
|
||||
if partner_id: vals["partner_id"] = partner_id
|
||||
if description: vals["description"] = description
|
||||
if email: vals["email_from"] = email
|
||||
if phone: vals["phone"] = phone
|
||||
return _create("crm.lead", vals)
|
||||
|
||||
@mcp.tool()
|
||||
def update_crm_lead(lead_id: int, stage_id: int = None, probability: float = None,
|
||||
expected_revenue: float = None, note: str = "") -> bool:
|
||||
"""Update a CRM lead's stage, probability, revenue, or notes."""
|
||||
vals = {}
|
||||
if stage_id is not None: vals["stage_id"] = stage_id
|
||||
if probability is not None: vals["probability"] = probability
|
||||
if expected_revenue is not None: vals["expected_revenue"] = expected_revenue
|
||||
if note: vals["description"] = note
|
||||
return _write("crm.lead", lead_id, vals) if vals else False
|
||||
|
||||
@mcp.tool()
|
||||
def list_crm_stages() -> list:
|
||||
"""List all CRM pipeline stages with their IDs and names."""
|
||||
return _search_read("crm.stage", [], ["id", "name", "sequence", "probability"],
|
||||
limit=50, order="sequence asc")
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
# PROJECT
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@mcp.tool()
|
||||
def list_projects(limit: int = 50) -> list:
|
||||
"""List all active projects with id, name, description, and task count."""
|
||||
return _search_read("project.project", [["active", "=", True]],
|
||||
["id", "name", "description", "task_count", "user_id", "date_start", "date"],
|
||||
limit=limit, order="name asc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_project(project_id: int) -> dict:
|
||||
"""Get full details of a project by ID."""
|
||||
r = _read("project.project", [project_id],
|
||||
["id", "name", "description", "user_id", "task_count",
|
||||
"date_start", "date", "tag_ids", "privacy_visibility"])
|
||||
return r[0] if r else {}
|
||||
|
||||
@mcp.tool()
|
||||
def search_tasks(query: str = "", project_id: int = None, stage: str = "",
|
||||
limit: int = 30) -> list:
|
||||
"""Search project tasks by title, project, or stage name."""
|
||||
domain = []
|
||||
if query:
|
||||
domain.append(["name", "ilike", query])
|
||||
if project_id:
|
||||
domain.append(["project_id", "=", project_id])
|
||||
if stage:
|
||||
domain.append(["stage_id.name", "ilike", stage])
|
||||
return _search_read("project.task", domain,
|
||||
["id", "name", "project_id", "stage_id", "user_ids",
|
||||
"date_deadline", "priority", "kanban_state", "tag_ids"],
|
||||
limit=limit, order="date_deadline asc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_task(task_id: int) -> dict:
|
||||
"""Get full details of a project task by ID including description and subtasks."""
|
||||
r = _read("project.task", [task_id],
|
||||
["id", "name", "project_id", "stage_id", "user_ids", "description",
|
||||
"date_deadline", "priority", "kanban_state", "tag_ids",
|
||||
"child_ids", "depend_on_ids", "planned_hours", "effective_hours"])
|
||||
return r[0] if r else {}
|
||||
|
||||
@mcp.tool()
|
||||
def create_task(name: str, project_id: int, description: str = "",
|
||||
date_deadline: str = "", user_ids: list = None) -> int:
|
||||
"""Create a project task. date_deadline format: YYYY-MM-DD. Returns task ID."""
|
||||
vals = {"name": name, "project_id": project_id}
|
||||
if description: vals["description"] = description
|
||||
if date_deadline: vals["date_deadline"] = date_deadline
|
||||
if user_ids: vals["user_ids"] = [(6, 0, user_ids)]
|
||||
return _create("project.task", vals)
|
||||
|
||||
@mcp.tool()
|
||||
def update_task(task_id: int, name: str = "", stage_id: int = None,
|
||||
description: str = "", date_deadline: str = "",
|
||||
kanban_state: str = "") -> bool:
|
||||
"""Update a task. kanban_state: normal, done, blocked."""
|
||||
vals = {}
|
||||
if name: vals["name"] = name
|
||||
if stage_id: vals["stage_id"] = stage_id
|
||||
if description: vals["description"] = description
|
||||
if date_deadline: vals["date_deadline"] = date_deadline
|
||||
if kanban_state: vals["kanban_state"] = kanban_state
|
||||
return _write("project.task", task_id, vals) if vals else False
|
||||
|
||||
@mcp.tool()
|
||||
def list_task_stages(project_id: int = None) -> list:
|
||||
"""List task stages. Optionally filter by project."""
|
||||
domain = []
|
||||
if project_id:
|
||||
domain.append(["project_ids", "in", [project_id]])
|
||||
return _search_read("project.task.type", domain,
|
||||
["id", "name", "sequence"], limit=50, order="sequence asc")
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
# HELPDESK
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@mcp.tool()
|
||||
def search_helpdesk_tickets(query: str = "", stage: str = "", team: str = "",
|
||||
created_after: str = "", created_before: str = "",
|
||||
limit: int = 20) -> list:
|
||||
"""Search helpdesk tickets by name, stage, or team.
|
||||
created_after and created_before accept datetime strings in 'YYYY-MM-DD' or
|
||||
'YYYY-MM-DD HH:MM:SS' format to filter by creation date.
|
||||
Example: created_after='2026-03-29' returns tickets created in the last 24 hours."""
|
||||
domain = []
|
||||
if query:
|
||||
domain.append(["name", "ilike", query])
|
||||
if stage:
|
||||
domain.append(["stage_id.name", "ilike", stage])
|
||||
if team:
|
||||
domain.append(["team_id.name", "ilike", team])
|
||||
if created_after:
|
||||
domain.append(["create_date", ">=", created_after])
|
||||
if created_before:
|
||||
domain.append(["create_date", "<=", created_before])
|
||||
return _search_read("helpdesk.ticket", domain,
|
||||
["id", "name", "partner_id", "stage_id", "team_id",
|
||||
"user_id", "priority", "create_date"],
|
||||
limit=limit, order="create_date desc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_helpdesk_ticket(ticket_id: int) -> dict:
|
||||
"""Get full details of a helpdesk ticket by ID."""
|
||||
r = _read("helpdesk.ticket", [ticket_id],
|
||||
["id", "name", "partner_id", "stage_id", "team_id", "user_id",
|
||||
"priority", "description", "create_date",
|
||||
"date_last_stage_update", "kanban_state", "tag_ids"])
|
||||
return r[0] if r else {}
|
||||
|
||||
@mcp.tool()
|
||||
def create_helpdesk_ticket(name: str, description: str = "",
|
||||
partner_id: int = None, team_id: int = None) -> int:
|
||||
"""Create a helpdesk ticket. Returns new ticket ID."""
|
||||
vals = {"name": name}
|
||||
if description: vals["description"] = description
|
||||
if partner_id: vals["partner_id"] = partner_id
|
||||
if team_id: vals["team_id"] = team_id
|
||||
return _create("helpdesk.ticket", vals)
|
||||
|
||||
@mcp.tool()
|
||||
def update_helpdesk_ticket(ticket_id: int, stage_id: int = None,
|
||||
user_id: int = None, note: str = "") -> bool:
|
||||
"""Update a helpdesk ticket's stage, assignee, or add a note."""
|
||||
vals = {}
|
||||
if stage_id: vals["stage_id"] = stage_id
|
||||
if user_id: vals["user_id"] = user_id
|
||||
if note: vals["description"] = note
|
||||
return _write("helpdesk.ticket", ticket_id, vals) if vals else False
|
||||
|
||||
@mcp.tool()
|
||||
def list_helpdesk_teams() -> list:
|
||||
"""List all helpdesk teams."""
|
||||
return _search_read("helpdesk.team", [], ["id", "name", "description"], limit=20)
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
# PURCHASE
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@mcp.tool()
|
||||
def search_purchase_orders(query: str = "", state: str = "", limit: int = 20) -> list:
|
||||
"""Search purchase orders by name or vendor.
|
||||
State: draft, sent, to approve, purchase, done, cancel."""
|
||||
domain = []
|
||||
if query:
|
||||
domain.append("|")
|
||||
domain.append(["name", "ilike", query])
|
||||
domain.append(["partner_id.name", "ilike", query])
|
||||
if state:
|
||||
domain.append(["state", "=", state])
|
||||
return _search_read("purchase.order", domain,
|
||||
["id", "name", "partner_id", "state", "amount_total",
|
||||
"date_order", "user_id", "date_planned"],
|
||||
limit=limit, order="date_order desc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_purchase_order(order_id: int) -> dict:
|
||||
"""Get full details of a purchase order including line items."""
|
||||
r = _read("purchase.order", [order_id],
|
||||
["id", "name", "partner_id", "state", "amount_untaxed", "amount_tax",
|
||||
"amount_total", "date_order", "user_id", "notes",
|
||||
"order_line", "date_planned", "payment_term_id"])
|
||||
if not r:
|
||||
return {}
|
||||
order = r[0]
|
||||
if order.get("order_line"):
|
||||
lines = _read("purchase.order.line", order["order_line"],
|
||||
["id", "product_id", "name", "product_qty", "price_unit",
|
||||
"price_subtotal", "qty_received", "qty_invoiced", "date_planned"])
|
||||
order["lines"] = lines
|
||||
return order
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
# INVENTORY
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@mcp.tool()
|
||||
def search_inventory(query: str = "", location: str = "internal",
|
||||
limit: int = 30) -> list:
|
||||
"""Search current stock levels by product name.
|
||||
location: 'internal' (default), 'customer', 'supplier', 'transit'."""
|
||||
domain = [["location_id.usage", "=", location], ["quantity", ">", 0]]
|
||||
if query:
|
||||
domain.append(["product_id.name", "ilike", query])
|
||||
return _search_read("stock.quant", domain,
|
||||
["product_id", "location_id", "quantity", "reserved_quantity",
|
||||
"lot_id", "package_id"],
|
||||
limit=limit, order="product_id asc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_stock_moves(product_id: int, limit: int = 20) -> list:
|
||||
"""Get recent stock moves for a product (product.product ID)."""
|
||||
return _search_read("stock.move", [
|
||||
["product_id", "=", product_id],
|
||||
["state", "=", "done"]
|
||||
], ["id", "name", "product_id", "product_uom_qty", "location_id",
|
||||
"location_dest_id", "date", "picking_id", "origin"],
|
||||
limit=limit, order="date desc")
|
||||
|
||||
@mcp.tool()
|
||||
def list_internal_locations() -> list:
|
||||
"""List all internal warehouse/stock locations."""
|
||||
return _search_read("stock.location",
|
||||
[["usage", "=", "internal"], ["active", "=", True]],
|
||||
["id", "name", "complete_name", "location_id"],
|
||||
limit=100, order="complete_name asc")
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
# EMPLOYEES
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@mcp.tool()
|
||||
def search_employees(query: str = "", department: str = "",
|
||||
job_title: str = "", limit: int = 30) -> list:
|
||||
"""Search employees by name, department, or job title."""
|
||||
domain = [["active", "=", True]]
|
||||
if query:
|
||||
domain.append(["name", "ilike", query])
|
||||
if department:
|
||||
domain.append(["department_id.name", "ilike", department])
|
||||
if job_title:
|
||||
domain.append(["job_title", "ilike", job_title])
|
||||
return _search_read("hr.employee", domain,
|
||||
["id", "name", "job_title", "department_id", "work_email",
|
||||
"work_phone", "parent_id", "coach_id"],
|
||||
limit=limit, order="name asc")
|
||||
|
||||
@mcp.tool()
|
||||
def get_employee(employee_id: int) -> dict:
|
||||
"""Get full details of an employee by ID."""
|
||||
r = _read("hr.employee", [employee_id],
|
||||
["id", "name", "job_title", "job_id", "department_id", "work_email",
|
||||
"work_phone", "mobile_phone", "parent_id", "coach_id",
|
||||
"address_id", "resource_calendar_id", "tz",
|
||||
"birthday", "gender", "marital", "country_id"])
|
||||
return r[0] if r else {}
|
||||
|
||||
@mcp.tool()
|
||||
def list_departments() -> list:
|
||||
"""List all HR departments."""
|
||||
return _search_read("hr.department", [],
|
||||
["id", "name", "parent_id", "manager_id"], limit=50, order="name asc")
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
# UTILITY
|
||||
# ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
@mcp.tool()
|
||||
def odoo_search(model: str, field: str, query: str, limit: int = 10) -> list:
|
||||
"""Generic search on any Odoo model. Searches field 'field' using ilike.
|
||||
Useful for looking up stage IDs, category IDs, etc.
|
||||
Example: odoo_search('project.task.type', 'name', 'In Progress')"""
|
||||
return _search_read(model, [[field, "ilike", query]], limit=limit)
|
||||
|
||||
@mcp.tool()
|
||||
def odoo_get_record(model: str, record_id: int) -> dict:
|
||||
"""Fetch a single record from any Odoo model by ID. Returns all default fields."""
|
||||
r = _call(model, "read", [[record_id]], {})
|
||||
return r[0] if r else {}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
mcp.run()
|
||||
Reference in New Issue
Block a user