Initial commit from agent

This commit is contained in:
2026-03-24 00:11:34 -05:00
commit 0c777488d3
69 changed files with 4253 additions and 0 deletions

0
capabilities/__init__.py Normal file
View File

View File

@@ -0,0 +1,48 @@
"""
Chrome browser automation via Selenium.
Selenium 4+ handles ChromeDriver automatically — no manual install needed.
"""
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import urllib.parse
_driver = None
def _get_driver(headless: bool = False) -> webdriver.Chrome:
global _driver
if _driver is None:
options = Options()
if headless:
options.add_argument("--headless=new")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
_driver = webdriver.Chrome(options=options)
return _driver
def navigate_to(url: str):
"""Navigate Chrome to a URL."""
driver = _get_driver()
driver.get(url)
return driver.title
def search_web(query: str):
"""Perform a Google search in Chrome."""
encoded = urllib.parse.quote_plus(query)
return navigate_to(f"https://www.google.com/search?q={encoded}")
def get_page_title() -> str:
driver = _get_driver()
return driver.title
def get_page_url() -> str:
driver = _get_driver()
return driver.current_url
def close_browser():
global _driver
if _driver:
_driver.quit()
_driver = None

View File

@@ -0,0 +1,61 @@
"""
Google Calendar API integration.
Reads today's events and upcoming schedule.
"""
import datetime
from googleapiclient.discovery import build
from core.google_auth import get_credentials
def _service():
return build('calendar', 'v3', credentials=get_credentials())
def get_todays_events() -> list[dict]:
"""Return all events happening today."""
now = datetime.datetime.now(tz=datetime.timezone.utc)
end = now.replace(hour=23, minute=59, second=59)
result = _service().events().list(
calendarId='primary',
timeMin=now.isoformat(),
timeMax=end.isoformat(),
singleEvents=True,
orderBy='startTime'
).execute()
return [
{
'summary': e.get('summary', '(No title)'),
'start': e['start'].get('dateTime', e['start'].get('date')),
'end': e['end'].get('dateTime', e['end'].get('date')),
'location': e.get('location', ''),
}
for e in result.get('items', [])
]
def get_upcoming_events(days: int = 7) -> list[dict]:
"""Return events in the next N days."""
now = datetime.datetime.now(tz=datetime.timezone.utc)
end = now + datetime.timedelta(days=days)
result = _service().events().list(
calendarId='primary',
timeMin=now.isoformat(),
timeMax=end.isoformat(),
maxResults=20,
singleEvents=True,
orderBy='startTime'
).execute()
return [
{
'summary': e.get('summary', '(No title)'),
'start': e['start'].get('dateTime', e['start'].get('date')),
}
for e in result.get('items', [])
]
def create_event(summary: str, start_dt: str, end_dt: str, description: str = "") -> dict:
"""Create a new calendar event. Dates in ISO 8601 format."""
event = {
'summary': summary,
'description': description,
'start': {'dateTime': start_dt, 'timeZone': 'America/Chicago'},
'end': {'dateTime': end_dt, 'timeZone': 'America/Chicago'},
}
return _service().events().insert(calendarId='primary', body=event).execute()

View File

@@ -0,0 +1,54 @@
"""
Gmail API integration — read-only access.
Fetches unread messages and supports search queries.
"""
from googleapiclient.discovery import build
from core.google_auth import get_credentials
def _service():
return build('gmail', 'v1', credentials=get_credentials())
def get_unread_emails(count: int = 10) -> list[dict]:
"""Fetch the most recent unread emails."""
svc = _service()
result = svc.users().messages().list(
userId='me', q='is:unread', maxResults=count
).execute()
messages = result.get('messages', [])
emails = []
for msg in messages:
detail = svc.users().messages().get(
userId='me', id=msg['id'], format='metadata',
metadataHeaders=['Subject', 'From', 'Date']
).execute()
headers = {h['name']: h['value'] for h in detail['payload']['headers']}
emails.append({
'id': msg['id'],
'subject': headers.get('Subject', '(No subject)'),
'from': headers.get('From', ''),
'date': headers.get('Date', ''),
'snippet': detail.get('snippet', ''),
})
return emails
def search_emails(query: str, count: int = 20) -> list[dict]:
"""Search Gmail using standard Gmail search syntax."""
svc = _service()
result = svc.users().messages().list(
userId='me', q=query, maxResults=count
).execute()
messages = result.get('messages', [])
emails = []
for msg in messages:
detail = svc.users().messages().get(
userId='me', id=msg['id'], format='metadata',
metadataHeaders=['Subject', 'From', 'Date']
).execute()
headers = {h['name']: h['value'] for h in detail['payload']['headers']}
emails.append({
'id': msg['id'],
'subject': headers.get('Subject', '(No subject)'),
'from': headers.get('From', ''),
'snippet': detail.get('snippet', ''),
})
return emails

View File

@@ -0,0 +1,43 @@
"""
File system operations — create projects, manage folders, list files.
Cross-platform: uses pathlib and os only.
"""
import os
import subprocess
from pathlib import Path
def create_project(name: str, base_path: str = "C:/Projects") -> str:
"""Create a new project folder with README and .gitignore."""
path = Path(base_path) / name
path.mkdir(parents=True, exist_ok=True)
(path / "README.md").write_text(f"# {name}\n\nProject created by JARVIS.\n")
(path / ".gitignore").write_text("__pycache__/\n.env\n*.pyc\n")
return str(path)
def list_directory(path: str) -> list[dict]:
"""List files and folders in a directory."""
p = Path(path)
if not p.exists():
return []
return [
{'name': item.name, 'type': 'dir' if item.is_dir() else 'file', 'size': item.stat().st_size if item.is_file() else None}
for item in sorted(p.iterdir())
]
def read_file(path: str) -> str:
"""Read the contents of a text file."""
return Path(path).read_text(encoding='utf-8')
def write_file(path: str, content: str):
"""Write content to a file, creating parent directories if needed."""
p = Path(path)
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content, encoding='utf-8')
def delete_file(path: str):
"""Delete a file."""
Path(path).unlink(missing_ok=True)
def open_folder(path: str):
"""Open a folder in Windows Explorer."""
os.startfile(path)

View File

@@ -0,0 +1,39 @@
"""
Git operations via subprocess.
Wraps common git commands for use by JARVIS.
"""
import subprocess
from pathlib import Path
def _git(path: str, *args) -> dict:
result = subprocess.run(
["git", *args], cwd=path, capture_output=True, text=True
)
return {
"stdout": result.stdout.strip(),
"stderr": result.stderr.strip(),
"returncode": result.returncode
}
def get_git_status(path: str) -> str:
return _git(path, "status")["stdout"]
def get_git_log(path: str, count: int = 5) -> str:
return _git(path, "log", f"--oneline", f"-{count}")["stdout"]
def git_diff(path: str) -> str:
return _git(path, "diff")["stdout"]
def git_pull(path: str) -> dict:
return _git(path, "pull")
def git_add_commit(path: str, message: str) -> dict:
_git(path, "add", "-A")
return _git(path, "commit", "-m", message)
def git_push(path: str) -> dict:
return _git(path, "push")
def list_branches(path: str) -> list[str]:
result = _git(path, "branch", "-a")
return [b.strip().lstrip("* ") for b in result["stdout"].splitlines() if b.strip()]

View File

@@ -0,0 +1,68 @@
"""
Google Keep notes via gkeepapi (unofficial).
Falls back to local Markdown files if Keep is unavailable.
NOTE: The official Google Keep API requires Google Workspace Enterprise.
For personal/standard accounts use gkeepapi or the Markdown fallback below.
"""
import os
from pathlib import Path
# ─── gkeepapi backend ──────────────────────────
try:
import gkeepapi
_keep = None
def _init():
global _keep
if _keep is None:
email = os.getenv("GOOGLE_KEEP_EMAIL")
token = os.getenv("GOOGLE_KEEP_MASTER_TOKEN")
if not email or not token:
raise ValueError("Set GOOGLE_KEEP_EMAIL and GOOGLE_KEEP_MASTER_TOKEN in .env")
_keep = gkeepapi.Keep()
_keep.authenticate(email, token)
return _keep
def create_note(title: str, content: str) -> str:
keep = _init()
note = keep.createNote(title, content)
note.pinned = True
keep.sync()
return note.id
def get_all_notes() -> list[dict]:
keep = _init()
return [{'id': n.id, 'title': n.title, 'text': n.text} for n in keep.all()]
def search_notes(query: str) -> list[dict]:
keep = _init()
return [{'id': n.id, 'title': n.title, 'text': n.text} for n in keep.find(query=query)]
NOTES_BACKEND = "gkeepapi"
except ImportError:
# ─── Markdown fallback ──────────────────────
NOTES_DIR = Path.home() / "JARVIS_Notes"
NOTES_DIR.mkdir(exist_ok=True)
NOTES_BACKEND = "markdown"
def create_note(title: str, content: str) -> str:
safe_title = "".join(c for c in title if c.isalnum() or c in " _-").strip()
path = NOTES_DIR / f"{safe_title}.md"
path.write_text(f"# {title}\n\n{content}\n")
return str(path)
def get_all_notes() -> list[dict]:
return [
{'title': p.stem, 'text': p.read_text()}
for p in NOTES_DIR.glob("*.md")
]
def search_notes(query: str) -> list[dict]:
results = []
for p in NOTES_DIR.glob("*.md"):
text = p.read_text()
if query.lower() in text.lower():
results.append({'title': p.stem, 'text': text})
return results

View File

@@ -0,0 +1,48 @@
"""
Screen vision — captures screenshots and identifies active/open windows.
Uses mss for fast capture and win32gui for window enumeration.
"""
import mss
import base64
import io
from PIL import Image
try:
import win32gui
WIN32_AVAILABLE = True
except ImportError:
WIN32_AVAILABLE = False
print("[JARVIS] pywin32 not available — window detection disabled")
def get_active_window_title() -> str:
if WIN32_AVAILABLE:
return win32gui.GetWindowText(win32gui.GetForegroundWindow())
return "Unknown"
def list_open_windows() -> list[str]:
if not WIN32_AVAILABLE:
return []
windows = []
def _enum(hwnd, _):
if win32gui.IsWindowVisible(hwnd):
title = win32gui.GetWindowText(hwnd)
if title:
windows.append(title)
win32gui.EnumWindows(_enum, None)
return windows
def capture_screen() -> str:
"""Returns base64-encoded PNG of the primary monitor."""
with mss.mss() as sct:
monitor = sct.monitors[1]
img = sct.grab(monitor)
pil_img = Image.frombytes("RGB", img.size, img.bgra, "raw", "BGRX")
buffer = io.BytesIO()
pil_img.save(buffer, format="PNG")
return base64.b64encode(buffer.getvalue()).decode()
def capture_screen_file(path: str = "screenshot.png") -> str:
"""Saves screenshot to file and returns the path."""
with mss.mss() as sct:
sct.shot(mon=1, output=path)
return path

View File

@@ -0,0 +1,56 @@
"""
Google Tasks API integration.
List, create, and complete tasks across all task lists.
"""
from googleapiclient.discovery import build
from core.google_auth import get_credentials
def _service():
return build('tasks', 'v1', credentials=get_credentials())
def _get_default_list_id() -> str:
svc = _service()
lists = svc.tasklists().list(maxResults=1).execute()
return lists['items'][0]['id']
def get_pending_tasks() -> list[dict]:
"""Return all incomplete tasks across all task lists."""
svc = _service()
tasklists = svc.tasklists().list(maxResults=10).execute()
all_tasks = []
for tl in tasklists.get('items', []):
tasks = svc.tasks().list(
tasklist=tl['id'], showCompleted=False, maxResults=50
).execute()
for t in tasks.get('items', []):
all_tasks.append({
'id': t['id'],
'tasklist_id': tl['id'],
'title': t.get('title', ''),
'due': t.get('due', ''),
'notes': t.get('notes', ''),
'list': tl['title'],
})
return all_tasks
def create_task(title: str, notes: str = None, due: str = None) -> dict:
"""Create a new task in the default task list."""
svc = _service()
list_id = _get_default_list_id()
body = {'title': title}
if notes:
body['notes'] = notes
if due:
body['due'] = due
return svc.tasks().insert(tasklist=list_id, body=body).execute()
def complete_task(tasklist_id: str, task_id: str) -> dict:
"""Mark a task as completed."""
svc = _service()
task = svc.tasks().get(tasklist=tasklist_id, task=task_id).execute()
task['status'] = 'completed'
return svc.tasks().update(tasklist=tasklist_id, task=task_id, body=task).execute()
def delete_task(tasklist_id: str, task_id: str):
"""Delete a task permanently."""
_service().tasks().delete(tasklist=tasklist_id, task=task_id).execute()

View File

@@ -0,0 +1,41 @@
"""
Terminal control — runs PowerShell and CMD commands via subprocess.
Opens Windows Terminal at a given path.
"""
import subprocess
import os
def run_powershell(command: str, timeout: int = 30) -> dict:
"""Execute a PowerShell command and return stdout/stderr."""
result = subprocess.run(
["powershell", "-ExecutionPolicy", "Bypass", "-NonInteractive", "-Command", command],
capture_output=True, text=True, timeout=timeout
)
return {
"stdout": result.stdout.strip(),
"stderr": result.stderr.strip(),
"returncode": result.returncode
}
def run_cmd(command: str, timeout: int = 30) -> dict:
"""Execute a CMD command."""
result = subprocess.run(
["cmd", "/c", command],
capture_output=True, text=True, timeout=timeout
)
return {
"stdout": result.stdout.strip(),
"stderr": result.stderr.strip(),
"returncode": result.returncode
}
def open_terminal(path: str = None):
"""Open Windows Terminal at the given directory."""
cmd = ["wt"]
if path:
cmd += ["-d", path]
subprocess.Popen(cmd)
def open_vscode(path: str):
"""Open VS Code at a given path."""
subprocess.Popen(["code", path])