Files
odoo-readonly/server/odoo_mcp.py
T

773 lines
35 KiB
Python

#!/usr/bin/env python3
"""
Odoo MCP Server for MPM
Connects to mpmedia.odoo.com via XML-RPC and exposes tools for
Products, Knowledge, Contacts, Sales, CRM, Project, Helpdesk,
Purchase, Inventory, Employees, and Knowledge Templates.
"""
import os
import re
import xmlrpc.client
import urllib.request
import urllib.error
from typing import Optional
from mcp.server.fastmcp import FastMCP
# ── Configuration ────────────────────────────────────────────────────────────
ODOO_URL = os.environ.get("ODOO_URL", "https://mpmedia.odoo.com")
ODOO_DB = os.environ.get("ODOO_DB", "mpmedia-odoo-sh-main-13285275")
ODOO_USERNAME = os.environ.get("ODOO_USERNAME", "") # per-user: set via Keychain
KEYCHAIN_SERVICE = "odoo-mpm" # credential store service name (all platforms)
ODOO_API_KEY = os.environ.get("ODOO_API_KEY", "")
# ── Proxy-aware XML-RPC transport ─────────────────────────────────────────────
class ProxyAwareTransport(xmlrpc.client.SafeTransport):
"""Routes xmlrpc through the system HTTPS proxy (respects HTTPS_PROXY env var)."""
def request(self, host, handler, request_body, verbose=False):
url = f"https://{host}{handler}"
headers = {
"Content-Type": "text/xml",
"Accept-Encoding": "identity",
"User-Agent": "xmlrpc-odoo-mpm/1.0",
}
req = urllib.request.Request(url, request_body, headers)
opener = urllib.request.build_opener(urllib.request.ProxyHandler())
try:
with opener.open(req, timeout=30) as resp:
self.verbose = verbose # required by Transport.parse_response
return self.parse_response(resp)
except urllib.error.HTTPError as e:
raise xmlrpc.client.ProtocolError(url, e.code, e.msg, dict(e.headers))
except urllib.error.URLError as e:
raise xmlrpc.client.ProtocolError(url, 0, str(e.reason), {})
_proxy_transport = ProxyAwareTransport()
# ── Odoo client ───────────────────────────────────────────────────────────────
_uid: Optional[int] = None
_models = None
_resolved_api_key: Optional[str] = None # resolved at connect time from env or Keychain
def _keychain_get(account: str) -> str:
"""Read a credential from the system keystore (keyring).
Works on macOS (Keychain), Windows (Credential Manager), and Linux (Secret Service)."""
try:
import keyring
value = keyring.get_password(KEYCHAIN_SERVICE, account)
return value or ""
except Exception:
return ""
def _keychain_set(account: str, value: str) -> None:
"""Store a credential in the system keystore (keyring)."""
import keyring
keyring.set_password(KEYCHAIN_SERVICE, account, value)
def _keychain_delete(account: str) -> bool:
"""Delete a credential from the system keystore. Returns True if it existed."""
try:
import keyring
existing = keyring.get_password(KEYCHAIN_SERVICE, account)
if existing is not None:
keyring.delete_password(KEYCHAIN_SERVICE, account)
return True
return False
except Exception:
return False
def _get_credentials() -> tuple[str, str]:
"""Resolve username and API key: env vars take priority, then system keystore."""
username = ODOO_USERNAME or _keychain_get("odoo_username")
api_key = ODOO_API_KEY or _keychain_get("odoo_api_key")
return username, api_key
def _get_credentials() -> tuple[str, str]:
"""Resolve username and API key: env vars take priority, then macOS Keychain."""
username = ODOO_USERNAME or _keychain_get("odoo_username")
api_key = ODOO_API_KEY or _keychain_get("odoo_api_key")
return username, api_key
def _connect():
global _uid, _models, _resolved_api_key
if _uid is not None:
return
username, api_key = _get_credentials()
if not username or not api_key:
raise RuntimeError(
"Odoo credentials not configured. "
"Run the setup_odoo_credentials tool with your Odoo login email and API key. "
"See the Odoo skill instructions for how to generate your personal API key."
)
common = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/common", transport=_proxy_transport)
_uid = common.authenticate(ODOO_DB, username, api_key, {})
if not _uid:
raise RuntimeError(
"Odoo authentication failed. "
"Verify your username and API key, then run setup_odoo_credentials again."
)
_resolved_api_key = api_key
_models = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/object", transport=_proxy_transport)
def _call(model: str, method: str, args=None, kwargs=None):
_connect()
return _models.execute_kw(
ODOO_DB, _uid, _resolved_api_key,
model, method,
args or [[]],
kwargs or {}
)
def _search_read(model, domain=None, fields=None, limit=20, offset=0, order=None):
kw = {"limit": limit, "offset": offset}
if fields:
kw["fields"] = fields
if order:
kw["order"] = order
return _call(model, "search_read", [domain or []], kw)
def _read(model, ids, fields=None):
kw = {}
if fields:
kw["fields"] = fields
return _call(model, "read", [ids], kw)
def _create(model, vals):
return _call(model, "create", [vals])
def _write(model, ids, vals):
return _call(model, "write", [[ids] if isinstance(ids, int) else ids, vals])
# ── FastMCP App ───────────────────────────────────────────────────────────────
mcp = FastMCP("Odoo MPM")
# ════════════════════════════════════════════════════════════════════════════
# CREDENTIALS SETUP
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def setup_odoo_credentials(username: str, api_key: str) -> str:
"""Store your personal Odoo credentials securely in the system keystore.
This only needs to be done once per machine (macOS Keychain, Windows Credential Manager, or Linux Secret Service).
username : your Odoo login email (e.g. you@mpmedia.tv)
api_key : your personal Odoo API key — generate it in Odoo at:
My Profile → Account Security → API Keys → New API Key
Set the expiration to "No Limit" (indefinite).
Never share this key or use a colleague's key.
Credentials are stored in the OS keystore under the service 'odoo-mpm'
and are never written to any file on disk."""
_keychain_set("odoo_username", username.strip())
_keychain_set("odoo_api_key", api_key.strip())
# Reset any cached connection so next call re-authenticates with new credentials
global _uid, _models, _resolved_api_key
_uid = None
_models = None
_resolved_api_key = None
# Verify immediately
try:
_connect()
return (
f"Credentials saved and verified. "
f"Connected to {ODOO_URL} as UID {_uid}. "
f"You're all set — Odoo tools are ready to use."
)
except Exception as e:
return f"Credentials saved to Keychain but authentication failed: {e}"
@mcp.tool()
def clear_odoo_credentials() -> str:
"""Remove your stored Odoo credentials from the system keystore.
Use this if you are offboarding, rotating your API key, or troubleshooting
an authentication problem. You will need to run setup_odoo_credentials
again before using any Odoo tools."""
removed = []
for key in ("odoo_username", "odoo_api_key"):
if _keychain_delete(key):
removed.append(key)
global _uid, _models, _resolved_api_key
_uid = None
_models = None
_resolved_api_key = None
if removed:
return f"Removed {', '.join(removed)} from system keystore. Run setup_odoo_credentials to reconfigure."
return "No stored credentials found in system keystore."
# ════════════════════════════════════════════════════════════════════════════
# PRODUCTS
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def search_products(query: str = "", limit: int = 20, product_type: str = "") -> list:
"""Search products/product templates by name or internal reference.
product_type can be: 'consu' (consumable), 'service', 'product' (storable).
Returns id, name, default_code, type, list_price, categ_id."""
domain = []
if query:
domain.append(["name", "ilike", query])
if product_type:
domain.append(["type", "=", product_type])
return _search_read("product.template", domain,
["id", "name", "default_code", "type", "list_price", "categ_id", "active"],
limit=limit, order="name asc")
@mcp.tool()
def get_product(product_id: int) -> dict:
"""Get full details of a product template by ID, including description,
variants, pricing, category, supplier info, and stock info."""
r = _read("product.template", [product_id],
["id", "name", "default_code", "type", "list_price", "standard_price",
"categ_id", "description", "description_sale", "description_purchase",
"uom_id", "uom_po_id", "active", "sale_ok", "purchase_ok",
"product_variant_count", "barcode"])
return r[0] if r else {}
@mcp.tool()
def get_product_stock(product_id: int) -> list:
"""Get current stock quantities for a product (by product.template ID)
across all internal locations."""
# Get all product.product IDs under this template
variants = _search_read("product.product",
[["product_tmpl_id", "=", product_id]],
["id", "display_name"])
if not variants:
return []
variant_ids = [v["id"] for v in variants]
return _search_read("stock.quant",
[["product_id", "in", variant_ids],
["location_id.usage", "=", "internal"]],
["product_id", "location_id", "quantity", "reserved_quantity"],
limit=50)
# ════════════════════════════════════════════════════════════════════════════
# KNOWLEDGE
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def search_knowledge_articles(query: str = "", limit: int = 20) -> list:
"""Search Knowledge base articles by title. Returns id, name, parent article."""
domain = [["name", "ilike", query]] if query else []
return _search_read("knowledge.article", domain,
["id", "name", "parent_id", "is_published", "write_date"],
limit=limit, order="write_date desc")
@mcp.tool()
def get_knowledge_article(article_id: int) -> dict:
"""Get the full content of a Knowledge article by ID.
Note: Inline base64 images are replaced with [embedded image] placeholders
to keep the response size manageable."""
r = _read("knowledge.article", [article_id],
["id", "name", "body", "parent_id", "child_ids",
"is_published", "write_date", "write_uid"])
if not r:
return {}
article = r[0]
if article.get("body"):
article["body"] = re.sub(
r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"',
'src="[embedded image]"',
article["body"]
)
return article
@mcp.tool()
def create_knowledge_article(name: str, body: str, parent_id: int = None) -> int:
"""Create a new Knowledge article. body is HTML. Returns new article ID."""
vals = {"name": name, "body": body}
if parent_id:
vals["parent_id"] = parent_id
return _create("knowledge.article", vals)
@mcp.tool()
def update_knowledge_article(article_id: int, name: str = "", body: str = "") -> bool:
"""Update a Knowledge article's title and/or body (HTML)."""
vals = {}
if name:
vals["name"] = name
if body:
vals["body"] = body
if not vals:
return False
return _write("knowledge.article", article_id, vals)
@mcp.tool()
def search_knowledge_templates(query: str = "", category: str = "", limit: int = 50) -> list:
"""Search Knowledge Base article templates.
Optionally filter by template name or category name (e.g. 'Productivity', 'Sales',
'Marketing', 'Company Organization', 'Product Management').
Returns id, template_name, template_description, category, and sequence."""
domain = [["is_template", "=", True]]
if query:
domain.append(["template_name", "ilike", query])
if category:
domain.append(["template_category_id.name", "ilike", category])
return _search_read("knowledge.article", domain,
["id", "name", "template_name", "template_description",
"template_category_id", "template_sequence", "is_published"],
limit=limit, order="template_category_sequence asc, template_sequence asc")
@mcp.tool()
def get_knowledge_template(template_id: int) -> dict:
"""Get full details of a Knowledge Base article template by ID, including
the template body content. Inline base64 images are replaced with
[embedded image] placeholders."""
r = _read("knowledge.article", [template_id],
["id", "name", "template_name", "template_description",
"template_body", "template_preview", "template_category_id",
"template_sequence", "is_published"])
if not r:
return {}
template = r[0]
if template.get("template_body"):
template["template_body"] = re.sub(
r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"',
'src="[embedded image]"',
template["template_body"]
)
if template.get("template_preview"):
template["template_preview"] = re.sub(
r'src="data:image/[^;]+;base64,[A-Za-z0-9+/=]+"',
'src="[embedded image]"',
template["template_preview"]
)
return template
@mcp.tool()
def list_knowledge_template_categories() -> list:
"""List all Knowledge Base article template categories with their IDs and names."""
return _search_read("knowledge.article.template.category", [],
["id", "name", "sequence"],
limit=50, order="sequence asc")
# ════════════════════════════════════════════════════════════════════════════
# CONTACTS
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def search_contacts(query: str = "", is_company: bool = None, limit: int = 20) -> list:
"""Search contacts/partners by name, email, or phone.
Set is_company=True for companies only, False for individuals."""
domain = []
if query:
domain.append("|")
domain.append(["name", "ilike", query])
domain.append("|")
domain.append(["email", "ilike", query])
domain.append(["phone", "ilike", query])
if is_company is not None:
domain.append(["is_company", "=", is_company])
return _search_read("res.partner", domain,
["id", "name", "email", "phone", "mobile", "is_company",
"street", "city", "state_id", "country_id", "website"],
limit=limit, order="name asc")
@mcp.tool()
def get_contact(contact_id: int) -> dict:
"""Get full details of a contact/partner by ID."""
r = _read("res.partner", [contact_id],
["id", "name", "email", "phone", "mobile", "is_company", "parent_id",
"street", "street2", "city", "state_id", "zip", "country_id",
"website", "comment", "category_id", "child_ids", "user_id"])
return r[0] if r else {}
@mcp.tool()
def create_contact(name: str, email: str = "", phone: str = "",
is_company: bool = False, parent_id: int = None,
street: str = "", city: str = "") -> int:
"""Create a new contact. Returns the new contact ID."""
vals = {"name": name, "is_company": is_company}
if email: vals["email"] = email
if phone: vals["phone"] = phone
if parent_id: vals["parent_id"] = parent_id
if street: vals["street"] = street
if city: vals["city"] = city
return _create("res.partner", vals)
# ════════════════════════════════════════════════════════════════════════════
# SALES
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def search_sales_orders(query: str = "", state: str = "", limit: int = 20) -> list:
"""Search sales orders by name or customer. State options: draft, sent, sale, done, cancel."""
domain = []
if query:
domain.append("|")
domain.append(["name", "ilike", query])
domain.append(["partner_id.name", "ilike", query])
if state:
domain.append(["state", "=", state])
return _search_read("sale.order", domain,
["id", "name", "partner_id", "state", "amount_total",
"date_order", "user_id", "validity_date"],
limit=limit, order="date_order desc")
@mcp.tool()
def get_sales_order(order_id: int) -> dict:
"""Get full details of a sales order including line items."""
r = _read("sale.order", [order_id],
["id", "name", "partner_id", "state", "amount_untaxed", "amount_tax",
"amount_total", "date_order", "user_id", "note", "order_line",
"validity_date", "payment_term_id", "commitment_date"])
if not r:
return {}
order = r[0]
if order.get("order_line"):
lines = _read("sale.order.line", order["order_line"],
["id", "product_id", "name", "product_uom_qty",
"price_unit", "price_subtotal", "qty_delivered", "qty_invoiced"])
order["lines"] = lines
return order
@mcp.tool()
def create_sales_order(partner_id: int, lines: list) -> int:
"""Create a sales order. lines is a list of dicts with keys:
product_id (int), product_uom_qty (float), price_unit (float).
Returns new order ID."""
order_lines = []
for l in lines:
order_lines.append((0, 0, {
"product_id": l["product_id"],
"product_uom_qty": l.get("product_uom_qty", 1),
"price_unit": l.get("price_unit", 0),
}))
return _create("sale.order", {"partner_id": partner_id, "order_line": order_lines})
# ════════════════════════════════════════════════════════════════════════════
# CRM
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def search_crm_leads(query: str = "", stage: str = "", assigned_to_me: bool = False,
limit: int = 20) -> list:
"""Search CRM leads/opportunities. Filter by name, stage name, or assigned to current user."""
domain = [["type", "=", "opportunity"]]
if query:
domain.append("|")
domain.append(["name", "ilike", query])
domain.append(["partner_id.name", "ilike", query])
if stage:
domain.append(["stage_id.name", "ilike", stage])
return _search_read("crm.lead", domain,
["id", "name", "partner_id", "stage_id", "expected_revenue",
"probability", "user_id", "date_deadline", "priority"],
limit=limit, order="date_deadline asc")
@mcp.tool()
def get_crm_lead(lead_id: int) -> dict:
"""Get full details of a CRM lead/opportunity by ID."""
r = _read("crm.lead", [lead_id],
["id", "name", "type", "partner_id", "stage_id", "user_id",
"expected_revenue", "probability", "date_deadline", "priority",
"description", "phone", "email_from", "tag_ids",
"activity_ids", "date_conversion"])
return r[0] if r else {}
@mcp.tool()
def create_crm_lead(name: str, partner_id: int = None, expected_revenue: float = 0,
description: str = "", email: str = "", phone: str = "") -> int:
"""Create a new CRM opportunity. Returns new lead ID."""
vals = {"name": name, "type": "opportunity", "expected_revenue": expected_revenue}
if partner_id: vals["partner_id"] = partner_id
if description: vals["description"] = description
if email: vals["email_from"] = email
if phone: vals["phone"] = phone
return _create("crm.lead", vals)
@mcp.tool()
def update_crm_lead(lead_id: int, stage_id: int = None, probability: float = None,
expected_revenue: float = None, note: str = "") -> bool:
"""Update a CRM lead's stage, probability, revenue, or notes."""
vals = {}
if stage_id is not None: vals["stage_id"] = stage_id
if probability is not None: vals["probability"] = probability
if expected_revenue is not None: vals["expected_revenue"] = expected_revenue
if note: vals["description"] = note
return _write("crm.lead", lead_id, vals) if vals else False
@mcp.tool()
def list_crm_stages() -> list:
"""List all CRM pipeline stages with their IDs and names."""
return _search_read("crm.stage", [], ["id", "name", "sequence", "probability"],
limit=50, order="sequence asc")
# ════════════════════════════════════════════════════════════════════════════
# PROJECT
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def list_projects(limit: int = 50) -> list:
"""List all active projects with id, name, description, and task count."""
return _search_read("project.project", [["active", "=", True]],
["id", "name", "description", "task_count", "user_id", "date_start", "date"],
limit=limit, order="name asc")
@mcp.tool()
def get_project(project_id: int) -> dict:
"""Get full details of a project by ID."""
r = _read("project.project", [project_id],
["id", "name", "description", "user_id", "task_count",
"date_start", "date", "tag_ids", "privacy_visibility"])
return r[0] if r else {}
@mcp.tool()
def search_tasks(query: str = "", project_id: int = None, stage: str = "",
limit: int = 30) -> list:
"""Search project tasks by title, project, or stage name."""
domain = []
if query:
domain.append(["name", "ilike", query])
if project_id:
domain.append(["project_id", "=", project_id])
if stage:
domain.append(["stage_id.name", "ilike", stage])
return _search_read("project.task", domain,
["id", "name", "project_id", "stage_id", "user_ids",
"date_deadline", "priority", "kanban_state", "tag_ids"],
limit=limit, order="date_deadline asc")
@mcp.tool()
def get_task(task_id: int) -> dict:
"""Get full details of a project task by ID including description and subtasks."""
r = _read("project.task", [task_id],
["id", "name", "project_id", "stage_id", "user_ids", "description",
"date_deadline", "priority", "kanban_state", "tag_ids",
"child_ids", "depend_on_ids", "planned_hours", "effective_hours"])
return r[0] if r else {}
@mcp.tool()
def create_task(name: str, project_id: int, description: str = "",
date_deadline: str = "", user_ids: list = None) -> int:
"""Create a project task. date_deadline format: YYYY-MM-DD. Returns task ID."""
vals = {"name": name, "project_id": project_id}
if description: vals["description"] = description
if date_deadline: vals["date_deadline"] = date_deadline
if user_ids: vals["user_ids"] = [(6, 0, user_ids)]
return _create("project.task", vals)
@mcp.tool()
def update_task(task_id: int, name: str = "", stage_id: int = None,
description: str = "", date_deadline: str = "",
kanban_state: str = "") -> bool:
"""Update a task. kanban_state: normal, done, blocked."""
vals = {}
if name: vals["name"] = name
if stage_id: vals["stage_id"] = stage_id
if description: vals["description"] = description
if date_deadline: vals["date_deadline"] = date_deadline
if kanban_state: vals["kanban_state"] = kanban_state
return _write("project.task", task_id, vals) if vals else False
@mcp.tool()
def list_task_stages(project_id: int = None) -> list:
"""List task stages. Optionally filter by project."""
domain = []
if project_id:
domain.append(["project_ids", "in", [project_id]])
return _search_read("project.task.type", domain,
["id", "name", "sequence"], limit=50, order="sequence asc")
# ════════════════════════════════════════════════════════════════════════════
# HELPDESK
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def search_helpdesk_tickets(query: str = "", stage: str = "", team: str = "",
created_after: str = "", created_before: str = "",
limit: int = 20) -> list:
"""Search helpdesk tickets by name, stage, or team.
created_after and created_before accept datetime strings in 'YYYY-MM-DD' or
'YYYY-MM-DD HH:MM:SS' format to filter by creation date.
Example: created_after='2026-03-29' returns tickets created in the last 24 hours."""
domain = []
if query:
domain.append(["name", "ilike", query])
if stage:
domain.append(["stage_id.name", "ilike", stage])
if team:
domain.append(["team_id.name", "ilike", team])
if created_after:
domain.append(["create_date", ">=", created_after])
if created_before:
domain.append(["create_date", "<=", created_before])
return _search_read("helpdesk.ticket", domain,
["id", "name", "partner_id", "stage_id", "team_id",
"user_id", "priority", "create_date"],
limit=limit, order="create_date desc")
@mcp.tool()
def get_helpdesk_ticket(ticket_id: int) -> dict:
"""Get full details of a helpdesk ticket by ID."""
r = _read("helpdesk.ticket", [ticket_id],
["id", "name", "partner_id", "stage_id", "team_id", "user_id",
"priority", "description", "create_date",
"date_last_stage_update", "kanban_state", "tag_ids"])
return r[0] if r else {}
@mcp.tool()
def create_helpdesk_ticket(name: str, description: str = "",
partner_id: int = None, team_id: int = None) -> int:
"""Create a helpdesk ticket. Returns new ticket ID."""
vals = {"name": name}
if description: vals["description"] = description
if partner_id: vals["partner_id"] = partner_id
if team_id: vals["team_id"] = team_id
return _create("helpdesk.ticket", vals)
@mcp.tool()
def update_helpdesk_ticket(ticket_id: int, stage_id: int = None,
user_id: int = None, note: str = "") -> bool:
"""Update a helpdesk ticket's stage, assignee, or add a note."""
vals = {}
if stage_id: vals["stage_id"] = stage_id
if user_id: vals["user_id"] = user_id
if note: vals["description"] = note
return _write("helpdesk.ticket", ticket_id, vals) if vals else False
@mcp.tool()
def list_helpdesk_teams() -> list:
"""List all helpdesk teams."""
return _search_read("helpdesk.team", [], ["id", "name", "description"], limit=20)
# ════════════════════════════════════════════════════════════════════════════
# PURCHASE
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def search_purchase_orders(query: str = "", state: str = "", limit: int = 20) -> list:
"""Search purchase orders by name or vendor.
State: draft, sent, to approve, purchase, done, cancel."""
domain = []
if query:
domain.append("|")
domain.append(["name", "ilike", query])
domain.append(["partner_id.name", "ilike", query])
if state:
domain.append(["state", "=", state])
return _search_read("purchase.order", domain,
["id", "name", "partner_id", "state", "amount_total",
"date_order", "user_id", "date_planned"],
limit=limit, order="date_order desc")
@mcp.tool()
def get_purchase_order(order_id: int) -> dict:
"""Get full details of a purchase order including line items."""
r = _read("purchase.order", [order_id],
["id", "name", "partner_id", "state", "amount_untaxed", "amount_tax",
"amount_total", "date_order", "user_id", "notes",
"order_line", "date_planned", "payment_term_id"])
if not r:
return {}
order = r[0]
if order.get("order_line"):
lines = _read("purchase.order.line", order["order_line"],
["id", "product_id", "name", "product_qty", "price_unit",
"price_subtotal", "qty_received", "qty_invoiced", "date_planned"])
order["lines"] = lines
return order
# ════════════════════════════════════════════════════════════════════════════
# INVENTORY
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def search_inventory(query: str = "", location: str = "internal",
limit: int = 30) -> list:
"""Search current stock levels by product name.
location: 'internal' (default), 'customer', 'supplier', 'transit'."""
domain = [["location_id.usage", "=", location], ["quantity", ">", 0]]
if query:
domain.append(["product_id.name", "ilike", query])
return _search_read("stock.quant", domain,
["product_id", "location_id", "quantity", "reserved_quantity",
"lot_id", "package_id"],
limit=limit, order="product_id asc")
@mcp.tool()
def get_stock_moves(product_id: int, limit: int = 20) -> list:
"""Get recent stock moves for a product (product.product ID)."""
return _search_read("stock.move", [
["product_id", "=", product_id],
["state", "=", "done"]
], ["id", "name", "product_id", "product_uom_qty", "location_id",
"location_dest_id", "date", "picking_id", "origin"],
limit=limit, order="date desc")
@mcp.tool()
def list_internal_locations() -> list:
"""List all internal warehouse/stock locations."""
return _search_read("stock.location",
[["usage", "=", "internal"], ["active", "=", True]],
["id", "name", "complete_name", "location_id"],
limit=100, order="complete_name asc")
# ════════════════════════════════════════════════════════════════════════════
# EMPLOYEES
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def search_employees(query: str = "", department: str = "",
job_title: str = "", limit: int = 30) -> list:
"""Search employees by name, department, or job title."""
domain = [["active", "=", True]]
if query:
domain.append(["name", "ilike", query])
if department:
domain.append(["department_id.name", "ilike", department])
if job_title:
domain.append(["job_title", "ilike", job_title])
return _search_read("hr.employee", domain,
["id", "name", "job_title", "department_id", "work_email",
"work_phone", "parent_id", "coach_id"],
limit=limit, order="name asc")
@mcp.tool()
def get_employee(employee_id: int) -> dict:
"""Get full details of an employee by ID."""
r = _read("hr.employee", [employee_id],
["id", "name", "job_title", "job_id", "department_id", "work_email",
"work_phone", "mobile_phone", "parent_id", "coach_id",
"address_id", "resource_calendar_id", "tz",
"birthday", "marital", "country_id"])
return r[0] if r else {}
@mcp.tool()
def list_departments() -> list:
"""List all HR departments."""
return _search_read("hr.department", [],
["id", "name", "parent_id", "manager_id"], limit=50, order="name asc")
# ════════════════════════════════════════════════════════════════════════════
# UTILITY
# ════════════════════════════════════════════════════════════════════════════
@mcp.tool()
def odoo_search(model: str, field: str, query: str, limit: int = 10) -> list:
"""Generic search on any Odoo model. Searches field 'field' using ilike.
Useful for looking up stage IDs, category IDs, etc.
Example: odoo_search('project.task.type', 'name', 'In Progress')"""
return _search_read(model, [[field, "ilike", query]], limit=limit)
@mcp.tool()
def odoo_get_record(model: str, record_id: int) -> dict:
"""Fetch a single record from any Odoo model by ID. Returns all default fields."""
r = _call(model, "read", [[record_id]], {})
return r[0] if r else {}
if __name__ == "__main__":
mcp.run()