bench: add scale benchmark suite (94 tests)

Benchmark mempalace at configurable scale (1K–100K drawers) to find
real-world performance limits. Tests cover MCP tool OOM thresholds,
ChromaDB query degradation, search recall@k, mining throughput,
knowledge graph concurrency, memory leak detection, palace boost
quantification, and Layer1 unbounded fetch behavior.

- tests/benchmarks/ with 8 test modules + data generator + report system
- Deterministic data factory with planted needles for recall measurement
- JSON report output with regression detection (--bench-report flag)
- CI benchmark job on PRs at small scale
- psutil added as dev dependency for RSS tracking
Igor Lins e Silva
2026-04-07 19:39:06 -03:00
parent 71736a3f4f
commit 7b89291334
15 changed files with 2453 additions and 3 deletions
+17 -1
@@ -18,7 +18,23 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- run: pip install -e ".[dev]"
- run: python -m pytest tests/ -v
- run: python -m pytest tests/ -v --ignore=tests/benchmarks
benchmark:
runs-on: ubuntu-latest
if: github.event_name == 'pull_request'
steps:
- uses: actions/checkout@v6
- uses: actions/setup-python@v6
with:
python-version: "3.11"
- run: pip install -e ".[dev]"
- run: python -m pytest tests/benchmarks/ -v -m "benchmark and not stress and not slow" --bench-scale=small --bench-report=bench-results.json
- uses: actions/upload-artifact@v6
if: always()
with:
name: benchmark-results
path: bench-results.json
lint:
runs-on: ubuntu-latest
+8 -2
@@ -38,11 +38,11 @@ Repository = "https://github.com/milla-jovovich/mempalace"
mempalace = "mempalace:main"
[project.optional-dependencies]
dev = ["pytest>=7.0", "ruff>=0.4.0"]
dev = ["pytest>=7.0", "ruff>=0.4.0", "psutil>=5.9"]
spellcheck = ["autocorrect>=2.0"]
[dependency-groups]
dev = ["pytest>=7.0", "ruff>=0.4.0"]
dev = ["pytest>=7.0", "ruff>=0.4.0", "psutil>=5.9"]
[build-system]
requires = ["hatchling"]
@@ -64,3 +64,9 @@ quote-style = "double"
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["."]
markers = [
"benchmark: scale/performance benchmark tests",
"slow: tests that take more than 30 seconds",
"stress: destructive scale tests (100K+ drawers)",
]
+136
@@ -0,0 +1,136 @@
# MemPalace Scale Benchmark Suite
94 tests that benchmark mempalace at scale to find its real-world performance limits.
## Why
MemPalace has strong academic scores (96.6% R@5 on LongMemEval) but no empirical data on how it behaves at scale. Key unknowns:
- `tool_status()` loads ALL metadata into memory — at what palace size does this OOM?
- `PersistentClient` is re-instantiated on every MCP call — what's the overhead?
- Modified files are never re-ingested — what's the skip-check cost at scale?
- How does query latency degrade as the palace grows from 1K to 100K drawers?
- Does wing/room filtering actually improve retrieval, and by how much?
This suite finds those answers.
## Quick Start
```bash
# Fast smoke test (~2 min)
uv run pytest tests/benchmarks/ -v --bench-scale=small -m "benchmark and not slow"
# Full small scale (~30 min)
uv run pytest tests/benchmarks/ -v --bench-scale=small
# Medium scale with JSON report
uv run pytest tests/benchmarks/ -v --bench-scale=medium --bench-report=results.json
# Stress test (local only, very slow)
uv run pytest tests/benchmarks/ -v --bench-scale=stress -m stress
```
## Scale Levels
| Level | Drawers | Wings | Rooms/Wing | KG Triples | Use case |
|---------|---------|-------|------------|------------|---------------------|
| small | 1,000 | 3 | 5 | 200 | CI, quick checks |
| medium | 10,000 | 8 | 12 | 2,000 | Pre-release testing |
| large | 50,000 | 15 | 20 | 10,000 | Scale limit finding |
| stress | 100,000 | 25 | 30 | 50,000 | Breaking point |
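To get a feel for how densely each room is filled at a given level, the table can be turned into a quick calculation. The sketch below copies the drawer, wing, and rooms-per-wing figures from the table; `drawers_per_room` is a hypothetical helper that is not part of the suite, and it assumes drawers are spread evenly across rooms.

```python
# Figures copied from the Scale Levels table.
SCALES = {
    "small": {"drawers": 1_000, "wings": 3, "rooms_per_wing": 5},
    "medium": {"drawers": 10_000, "wings": 8, "rooms_per_wing": 12},
    "large": {"drawers": 50_000, "wings": 15, "rooms_per_wing": 20},
    "stress": {"drawers": 100_000, "wings": 25, "rooms_per_wing": 30},
}


def drawers_per_room(scale: str) -> float:
    """Average drawers per room, assuming an even spread (hypothetical helper)."""
    cfg = SCALES[scale]
    return cfg["drawers"] / (cfg["wings"] * cfg["rooms_per_wing"])


for name in SCALES:
    print(f"{name}: {drawers_per_room(name):.1f} drawers/room")
```

Room density does not grow monotonically with drawer count here: under the even-spread assumption, `large` packs more drawers into each room than `stress` does.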
## Test Modules
### Critical Path
| File | What it tests |
|------|--------------|
| `test_mcp_bench.py` | MCP tool response times, unbounded metadata fetch, client re-instantiation overhead |
| `test_chromadb_stress.py` | ChromaDB breaking point, query degradation curve, batch vs sequential insert |
| `test_memory_profile.py` | RSS/heap growth over repeated operations, leak detection |
### Performance Baselines
| File | What it tests |
|------|--------------|
| `test_ingest_bench.py` | Mining throughput (files/sec, drawers/sec), peak RSS, chunking speed, re-ingest skip overhead |
| `test_search_bench.py` | Query latency vs palace size, recall@k with planted needles, concurrent queries, n_results scaling |
### Architectural Validation
| File | What it tests |
|------|--------------|
| `test_palace_boost.py` | Retrieval improvement from wing/room filtering at different scales |
| `test_knowledge_graph_bench.py` | Triple insertion rate, temporal query accuracy, SQLite concurrent access |
| `test_layers_bench.py` | MemoryStack wake-up cost, Layer1 unbounded fetch, token budget compliance |
## Architecture
```
tests/benchmarks/
conftest.py # --bench-scale / --bench-report CLI options, fixtures, markers
data_generator.py # Deterministic data factory (seeded RNG, planted needles)
report.py # JSON report writer + regression checker
test_*.py # 8 test modules (94 tests total)
```
### Data Generator
`PalaceDataGenerator(seed=42, scale="small")` produces deterministic, realistic test data:
- **`generate_project_tree()`** — writes real files + `mempalace.yaml` for `mine()` to ingest
- **`populate_palace_directly()`** — bypasses mining, inserts directly into ChromaDB (10-100x faster for search/MCP benchmarks)
- **`generate_kg_triples()`** — entity-relationship triples with temporal validity
- **`generate_search_queries()`** — queries with known-good answers for recall measurement
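The determinism claim rests on a private, seeded `random.Random` instance rather than the global `random` module state. A minimal sketch of that idea follows; `pick_rooms` and the shortened `ROOMS` list are illustrative assumptions, not the generator's actual API.

```python
import random

# Shortened, illustrative vocabulary (the real generator uses a longer list).
ROOMS = ["backend", "frontend", "api", "database", "auth", "tests", "docs"]


def pick_rooms(seed: int, n: int) -> list[str]:
    """Pick n rooms using a private RNG so the global random state is untouched."""
    rng = random.Random(seed)
    return rng.sample(ROOMS, n)


first = pick_rooms(seed=42, n=3)
second = pick_rooms(seed=42, n=3)
print(first == second)  # same seed, same selection
```

Because every draw goes through the same seeded instance, two runs with the same seed and scale should yield the same wings, rooms, and needle placement, which is what makes run-to-run comparisons meaningful.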
**Planted needles**: Unique identifiable content (e.g., `NEEDLE_0042: PostgreSQL vacuum autovacuum threshold...`) seeded into specific wings/rooms. Search queries target these needles, enabling recall@k measurement without an LLM judge.
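As a rough illustration of how planted needles turn into a recall@k number, the sketch below counts the queries whose expected needle ID shows up in the top k results. `recall_at_k` and the sample IDs are hypothetical; the suite's own scoring code may differ.

```python
def recall_at_k(ranked_ids_per_query, expected_id_per_query, k=5):
    """Fraction of queries whose expected needle appears in the top-k results."""
    if not expected_id_per_query:
        return 0.0
    hits = sum(
        1
        for ranked, expected in zip(ranked_ids_per_query, expected_id_per_query)
        if expected in ranked[:k]
    )
    return hits / len(expected_id_per_query)


# Illustrative data: ranked result IDs for three needle queries.
ranked = [
    ["NEEDLE_0001", "drawer_a", "drawer_b"],  # needle at rank 1
    ["drawer_c", "drawer_d", "NEEDLE_0002"],  # needle at rank 3
    ["drawer_e", "drawer_f", "drawer_g"],     # needle missing
]
expected = ["NEEDLE_0001", "NEEDLE_0002", "NEEDLE_0003"]

print(recall_at_k(ranked, expected, k=3))
print(recall_at_k(ranked, expected, k=1))
```

Since each needle has a known ID, scoring is a plain membership check and needs no model in the loop.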
### JSON Reports
When run with `--bench-report=path.json`, the suite writes a machine-readable report:
```json
{
"timestamp": "2026-04-07T...",
"git_sha": "abc123",
"scale": "small",
"system": {"os": "linux", "cpu_count": 8},
"results": {
"mcp_status": {"latency_ms_at_1000": 45.2, "rss_delta_mb_at_5000": 12.3},
"search": {"avg_latency_ms_at_5000": 23.1, "recall_at_5": 0.92},
"chromadb_insert": {"sequential_ms": 8500, "batched_ms": 1200, "speedup_ratio": 7.1}
}
}
```
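A report in this shape is easy to post-process. The sketch below flattens the nested `results` object into `category/metric` keys, which is convenient for diffing two runs; `flatten_results` is a hypothetical helper and the embedded report is a trimmed, illustrative copy of the example.

```python
import json

# Trimmed-down report in the documented shape; values are illustrative.
REPORT_JSON = """
{
  "scale": "small",
  "results": {
    "search": {"avg_latency_ms_at_5000": 23.1, "recall_at_5": 0.92},
    "chromadb_insert": {"sequential_ms": 8500, "batched_ms": 1200}
  }
}
"""


def flatten_results(report: dict) -> dict:
    """Map 'category/metric' -> value for every numeric metric in a report."""
    flat = {}
    for category, metrics in report.get("results", {}).items():
        for metric, value in metrics.items():
            if isinstance(value, (int, float)):
                flat[f"{category}/{metric}"] = value
    return flat


flat = flatten_results(json.loads(REPORT_JSON))
for key in sorted(flat):
    print(key, flat[key])
```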
### Regression Detection
```python
from tests.benchmarks.report import check_regression
regressions = check_regression("current.json", "baseline.json", threshold=0.2)
# Returns list of metric descriptions that degraded beyond 20%
```
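The threshold is applied relative to the baseline, and the direction depends on the metric: latency-like metrics regress when they rise, throughput- or recall-like metrics regress when they fall. A minimal sketch of that comparison, with `is_regression` as a hypothetical stand-alone helper and illustrative numbers:

```python
def is_regression(base: float, curr: float, threshold: float, higher_is_worse: bool) -> bool:
    """Return True when curr is worse than base by more than the given fraction."""
    if higher_is_worse:
        # e.g. latency or RSS: flag values above base * (1 + threshold)
        return curr > base * (1 + threshold)
    # e.g. recall or throughput: flag values below base * (1 - threshold)
    return curr < base * (1 - threshold)


# Latency: 45.2 ms baseline allows up to 54.24 ms at a 20% threshold.
print(is_regression(45.2, 56.0, 0.2, higher_is_worse=True))
print(is_regression(45.2, 50.0, 0.2, higher_is_worse=True))

# Recall: 0.92 baseline allows down to 0.736 at a 20% threshold.
print(is_regression(0.92, 0.70, 0.2, higher_is_worse=False))
print(is_regression(0.92, 0.80, 0.2, higher_is_worse=False))
```

A 20% threshold is fairly loose, which may suit noisy shared CI runners; a tighter value probably only makes sense on dedicated hardware.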
## CI Integration
The GitHub Actions workflow runs benchmarks on PRs at small scale:
```yaml
benchmark:
runs-on: ubuntu-latest
if: github.event_name == 'pull_request'
# Runs: pytest tests/benchmarks/ -m "benchmark and not stress and not slow" --bench-scale=small
```
The existing unit-test job excludes the benchmark suite with `--ignore=tests/benchmarks`.
## Markers
- `@pytest.mark.benchmark` — all benchmark tests
- `@pytest.mark.slow` — tests taking >30s even at small scale
- `@pytest.mark.stress` — tests that should only run at large/stress scale
## Dependencies
Only one new dependency beyond the existing dev stack: `psutil` (for cross-platform RSS measurement). `tracemalloc` and `resource` are stdlib, though `resource` is only available on Unix.
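The two measurements answer different questions: RSS is what the operating system charges the whole process, while `tracemalloc` only sees allocations made through Python's allocator. The sketch below takes both around a throwaway allocation; `get_rss_mb` is an illustrative helper that prefers `psutil` and falls back to `resource` (Unix only, and reporting peak rather than current RSS), and the payload is made up.

```python
import tracemalloc


def get_rss_mb() -> float:
    """Resident set size in MB: psutil if installed, else peak RSS via resource."""
    try:
        import psutil

        return psutil.Process().memory_info().rss / (1024 * 1024)
    except ImportError:
        import platform
        import resource  # Unix only

        peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        # ru_maxrss is reported in bytes on macOS and kilobytes on Linux.
        return peak / (1024 * 1024) if platform.system() == "Darwin" else peak / 1024


tracemalloc.start()
rss_before = get_rss_mb()

# Throwaway allocation standing in for a large metadata fetch.
payload = [{"wing": "test", "room": "bench", "chunk_index": i} for i in range(100_000)]

rss_after = get_rss_mb()
current_bytes, peak_bytes = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"RSS delta: {rss_after - rss_before:.1f} MB")
print(f"Python heap now: {current_bytes / 1e6:.1f} MB, peak: {peak_bytes / 1e6:.1f} MB")
```

RSS deltas can be noisy or even negative, since the allocator may reuse or release pages, so a single reading is best treated as indicative rather than exact.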
+1
@@ -0,0 +1 @@
# MemPalace scale benchmark suite
+146
@@ -0,0 +1,146 @@
"""Benchmark-specific pytest configuration, fixtures, and CLI options."""
import json
import os
import shutil
import tempfile
import pytest
SCALE_OPTIONS = ["small", "medium", "large", "stress"]
def pytest_addoption(parser):
parser.addoption(
"--bench-scale",
default="small",
choices=SCALE_OPTIONS,
help="Scale level for benchmark tests: small (1K), medium (10K), large (50K), stress (100K)",
)
parser.addoption(
"--bench-report",
default=None,
help="Path for JSON benchmark report output",
)
@pytest.fixture(scope="session")
def bench_scale(request):
"""The configured benchmark scale level."""
return request.config.getoption("--bench-scale")
@pytest.fixture(scope="session")
def bench_report_path(request):
"""Path for JSON report output, or None."""
return request.config.getoption("--bench-report")
@pytest.fixture
def palace_dir(tmp_path):
"""Isolated palace directory for a single test."""
p = tmp_path / "palace"
p.mkdir()
return str(p)
@pytest.fixture
def kg_db(tmp_path):
"""Isolated KG SQLite path for a single test."""
return str(tmp_path / "test_kg.sqlite3")
@pytest.fixture
def config_dir(tmp_path):
"""Isolated config directory for monkeypatching MempalaceConfig."""
d = tmp_path / "config"
d.mkdir()
config = {"palace_path": str(tmp_path / "palace"), "collection_name": "mempalace_drawers"}
with open(d / "config.json", "w") as f:
json.dump(config, f)
return str(d)
@pytest.fixture
def project_dir(tmp_path):
"""Temporary project directory for mining tests."""
d = tmp_path / "project"
d.mkdir()
return d
# ── Session-scoped result collector ──────────────────────────────────────
class BenchmarkResults:
"""Collect benchmark metrics across all tests in a session."""
def __init__(self):
self.results = {}
def record(self, category: str, metric: str, value):
if category not in self.results:
self.results[category] = {}
self.results[category][metric] = value
@pytest.fixture(scope="session")
def bench_results():
"""Session-scoped results collector shared by all benchmark tests."""
return BenchmarkResults()
def pytest_terminal_summary(terminalreporter, config):
"""Write JSON benchmark report after all tests complete."""
report_path = config.getoption("--bench-report", default=None)
if not report_path:
return
# Metrics are not read from the bench_results fixture here. Tests persist them via
# report.record_metric(), and the resulting temp file is loaded further down.
import platform
import subprocess
try:
git_sha = subprocess.check_output(
["git", "rev-parse", "--short", "HEAD"], text=True, stderr=subprocess.DEVNULL
).strip()
except Exception:
git_sha = "unknown"
try:
import chromadb
chromadb_version = chromadb.__version__
except Exception:
chromadb_version = "unknown"
report = {
"timestamp": __import__("datetime").datetime.now().isoformat(),
"git_sha": git_sha,
"python_version": platform.python_version(),
"chromadb_version": chromadb_version,
"scale": config.getoption("--bench-scale", default="small"),
"system": {
"os": platform.system().lower(),
"cpu_count": os.cpu_count(),
"platform": platform.platform(),
},
"results": {},
}
# Read results from the temp file written by report.record_metric()
results_file = os.path.join(tempfile.gettempdir(), "mempalace_bench_results.json")
if os.path.exists(results_file):
try:
with open(results_file) as f:
report["results"] = json.load(f)
os.unlink(results_file)
except Exception:
pass
os.makedirs(os.path.dirname(os.path.abspath(report_path)), exist_ok=True)
with open(report_path, "w") as f:
json.dump(report, f, indent=2)
terminalreporter.write_line(f"\nBenchmark report written to: {report_path}")
+395
@@ -0,0 +1,395 @@
"""
Deterministic data factory for MemPalace scale benchmarks.
Generates realistic project files, conversations, and KG triples at
configurable scale levels. All randomness uses seeded RNG for reproducibility.
Planted "needle" drawers enable recall measurement without an LLM judge.
"""
import hashlib
import os
import random
import string
from datetime import datetime, timedelta
from pathlib import Path
import chromadb
import yaml
# ── Scale configurations ─────────────────────────────────────────────────
SCALE_CONFIGS = {
"small": {"drawers": 1_000, "wings": 3, "rooms_per_wing": 5, "kg_entities": 50, "kg_triples": 200, "needles": 20, "search_queries": 20},
"medium": {"drawers": 10_000, "wings": 8, "rooms_per_wing": 12, "kg_entities": 200, "kg_triples": 2_000, "needles": 50, "search_queries": 50},
"large": {"drawers": 50_000, "wings": 15, "rooms_per_wing": 20, "kg_entities": 500, "kg_triples": 10_000, "needles": 100, "search_queries": 100},
"stress": {"drawers": 100_000, "wings": 25, "rooms_per_wing": 30, "kg_entities": 1_000, "kg_triples": 50_000, "needles": 200, "search_queries": 200},
}
# ── Vocabulary banks for realistic content ───────────────────────────────
WING_NAMES = [
"webapp", "backend_api", "mobile_app", "data_pipeline", "ml_platform",
"devops", "auth_service", "payments", "analytics", "docs_site",
"cli_tool", "dashboard", "notification_service", "search_engine",
"user_mgmt", "inventory", "reporting", "testing_infra", "monitoring",
"email_service", "chat_bot", "file_storage", "scheduler", "gateway",
"marketplace",
]
ROOM_NAMES = [
"backend", "frontend", "api", "database", "auth", "tests", "docs",
"config", "deployment", "models", "views", "controllers", "middleware",
"utils", "schemas", "migrations", "fixtures", "scripts", "styles",
"components", "hooks", "services", "routes", "templates", "static",
"media", "logging", "cache", "queue", "workers",
]
TECH_TERMS = [
"authentication", "authorization", "middleware", "endpoint", "REST API",
"GraphQL", "WebSocket", "database migration", "ORM", "query optimization",
"caching strategy", "load balancer", "rate limiting", "pagination",
"serialization", "validation", "error handling", "logging framework",
"monitoring", "deployment pipeline", "CI/CD", "containerization",
"microservice", "event sourcing", "message queue", "pub/sub",
"connection pooling", "session management", "token refresh", "CORS",
"SSL termination", "health check", "circuit breaker", "retry logic",
"batch processing", "stream processing", "data pipeline", "ETL",
"feature flag", "A/B testing", "blue-green deployment", "canary release",
]
CODE_SNIPPETS = [
"def process_request(data):\n validated = schema.validate(data)\n result = handler.execute(validated)\n return Response(result, status=200)\n",
"class UserRepository:\n def __init__(self, db):\n self.db = db\n def find_by_id(self, user_id):\n return self.db.query(User).filter(User.id == user_id).first()\n",
"async def fetch_data(url, timeout=30):\n async with aiohttp.ClientSession() as session:\n async with session.get(url, timeout=timeout) as resp:\n return await resp.json()\n",
"const handleSubmit = async (formData) => {\n try {\n const response = await api.post('/users', formData);\n dispatch({ type: 'USER_CREATED', payload: response.data });\n } catch (error) {\n setError(error.message);\n }\n};\n",
"SELECT u.name, COUNT(o.id) as order_count\nFROM users u\nLEFT JOIN orders o ON u.id = o.user_id\nWHERE u.created_at > '2025-01-01'\nGROUP BY u.name\nHAVING COUNT(o.id) > 5\nORDER BY order_count DESC;\n",
]
PROSE_TEMPLATES = [
"The {component} module handles {task}. It was refactored in {month} to improve {quality}. Key design decision: {decision}.",
"Bug report: {component} fails when {condition}. Root cause: {cause}. Fixed by {fix}. Regression test added in {test_file}.",
"Architecture decision: switched from {old_tech} to {new_tech} for {reason}. Migration completed {date}. Performance improved by {percent}%.",
"Meeting notes: discussed {topic} with {person}. Agreed to {action}. Deadline: {deadline}. Follow-up: {followup}.",
"Feature spec: {feature_name} allows users to {capability}. Dependencies: {deps}. Estimated effort: {effort} days.",
]
ENTITY_NAMES = [
"Alice", "Bob", "Carol", "Dave", "Eve", "Frank", "Grace", "Heidi",
"Ivan", "Judy", "Karl", "Linda", "Mike", "Nina", "Oscar", "Pat",
"Quinn", "Rita", "Steve", "Tina", "Ursula", "Victor", "Wendy", "Xander",
]
ENTITY_TYPES = ["person", "project", "tool", "concept", "team", "service"]
PREDICATES = [
"works_on", "manages", "reports_to", "collaborates_with", "created",
"maintains", "uses", "depends_on", "replaced", "reviewed", "deployed",
"tested", "documented", "mentors", "leads", "contributes_to",
]
class PalaceDataGenerator:
"""Generate deterministic, realistic test data at configurable scale."""
def __init__(self, seed=42, scale="small"):
self.rng = random.Random(seed)
self.scale = scale
self.cfg = SCALE_CONFIGS[scale]
self.wings = WING_NAMES[: self.cfg["wings"]]
self.rooms_by_wing = {}
for wing in self.wings:
n = self.cfg["rooms_per_wing"]
rooms = self.rng.sample(ROOM_NAMES, min(n, len(ROOM_NAMES)))
self.rooms_by_wing[wing] = rooms
# Planted needles for recall measurement
self.needles = []
self._generate_needles()
def _generate_needles(self):
"""Create unique needle content for recall testing."""
topics = [
"Fibonacci sequence optimization uses memoization with O(n) space complexity",
"PostgreSQL vacuum autovacuum threshold set to 50 percent for table users",
"Redis cluster failover timeout configured at 30 seconds with sentinel monitoring",
"Kubernetes horizontal pod autoscaler targets 70 percent CPU utilization",
"GraphQL subscription uses WebSocket transport with heartbeat interval 25 seconds",
"JWT token rotation policy requires refresh every 15 minutes with sliding window",
"Elasticsearch index sharding strategy uses 5 primary shards with 1 replica each",
"Docker multi-stage build reduces image size from 1.2GB to 180MB for production",
"Apache Kafka consumer group rebalance timeout set to 45 seconds",
"MongoDB change streams resume token persisted every 100 operations",
"gRPC streaming uses bidirectional flow control with 64KB window size",
"Prometheus alerting rule fires when p99 latency exceeds 500ms for 5 minutes",
"Terraform state locking uses DynamoDB with consistent reads enabled",
"Nginx rate limiting configured at 100 requests per second with burst of 50",
"SQLAlchemy connection pool size set to 20 with max overflow of 10 connections",
"React concurrent mode uses startTransition for non-urgent state updates",
"AWS Lambda cold start mitigation uses provisioned concurrency of 10 instances",
"Git bisect automated with custom test script for regression hunting",
"OpenTelemetry trace sampling rate set to 10 percent in production environment",
"Celery worker prefetch multiplier set to 1 for fair task distribution",
]
for i in range(self.cfg["needles"]):
topic = topics[i % len(topics)]
wing = self.rng.choice(self.wings)
room = self.rng.choice(self.rooms_by_wing[wing])
needle_id = f"NEEDLE_{i:04d}"
content = f"{needle_id}: {topic}. This is a unique planted needle for recall benchmarking at scale."
self.needles.append({
"id": needle_id,
"content": content,
"wing": wing,
"room": room,
"query": topic.split(" uses ")[0] if " uses " in topic else topic.split(" set to ")[0] if " set to " in topic else topic[:60],
})
def _random_text(self, min_chars=600, max_chars=900):
"""Generate a random text block of realistic content."""
parts = []
total = 0
target = self.rng.randint(min_chars, max_chars)
while total < target:
choice = self.rng.random()
if choice < 0.3:
text = self.rng.choice(CODE_SNIPPETS)
elif choice < 0.7:
template = self.rng.choice(PROSE_TEMPLATES)
text = template.format(
component=self.rng.choice(ROOM_NAMES),
task=self.rng.choice(TECH_TERMS),
month=self.rng.choice(["January", "February", "March", "April", "May"]),
quality=self.rng.choice(["performance", "readability", "test coverage", "latency"]),
decision=self.rng.choice(TECH_TERMS),
condition=self.rng.choice(TECH_TERMS) + " is null",
cause=self.rng.choice(["race condition", "null pointer", "timeout", "OOM"]),
fix="adding " + self.rng.choice(TECH_TERMS),
test_file=f"test_{self.rng.choice(ROOM_NAMES)}.py",
old_tech=self.rng.choice(["MySQL", "Flask", "REST", "Jenkins"]),
new_tech=self.rng.choice(["PostgreSQL", "FastAPI", "GraphQL", "GitHub Actions"]),
reason=self.rng.choice(TECH_TERMS),
date=f"2025-{self.rng.randint(1,12):02d}-{self.rng.randint(1,28):02d}",
percent=self.rng.randint(10, 80),
topic=self.rng.choice(TECH_TERMS),
person=self.rng.choice(ENTITY_NAMES),
action=self.rng.choice(["refactor", "migrate", "optimize", "test"]),
deadline=f"2025-{self.rng.randint(1,12):02d}-{self.rng.randint(1,28):02d}",
followup=self.rng.choice(TECH_TERMS),
feature_name=self.rng.choice(TECH_TERMS),
capability=self.rng.choice(TECH_TERMS),
deps=", ".join(self.rng.sample(TECH_TERMS, 2)),
effort=self.rng.randint(1, 15),
)
else:
words = self.rng.sample(TECH_TERMS, min(5, len(TECH_TERMS)))
text = " ".join(words) + ". " + self.rng.choice(TECH_TERMS) + " implementation details follow.\n"
parts.append(text)
total += len(text)
return "\n".join(parts)[:max_chars]
# ── Project tree generation (for mine() tests) ───────────────────────
def generate_project_tree(self, base_path, wing=None, rooms=None, n_files=50):
"""
Write realistic project files + mempalace.yaml to base_path.
Returns the project path suitable for passing to mine().
"""
base = Path(base_path)
base.mkdir(parents=True, exist_ok=True)
wing = wing or self.rng.choice(self.wings)
rooms = rooms or self.rooms_by_wing.get(wing, ["general"])
# Write mempalace.yaml
room_defs = [{"name": r, "description": f"{r} code and docs"} for r in rooms]
with open(base / "mempalace.yaml", "w") as f:
yaml.dump({"wing": wing, "rooms": room_defs}, f)
# Write files distributed across room directories
files_written = 0
for i in range(n_files):
room = rooms[i % len(rooms)]
room_dir = base / room
room_dir.mkdir(parents=True, exist_ok=True)
ext = self.rng.choice([".py", ".js", ".md", ".ts", ".yaml"])
filename = f"file_{i:04d}{ext}"
content = self._random_text(400, 2000)
(room_dir / filename).write_text(content, encoding="utf-8")
files_written += 1
return str(base), wing, rooms, files_written
# ── Conversation file generation (for mine_convos() tests) ───────────
def generate_conversation_files(self, base_path, wing=None, n_files=20):
"""Write conversation transcript files for convo_miner tests."""
base = Path(base_path)
base.mkdir(parents=True, exist_ok=True)
wing = wing or self.rng.choice(self.wings)
for i in range(n_files):
lines = []
n_exchanges = self.rng.randint(5, 20)
for j in range(n_exchanges):
user_msg = f"> User: {self.rng.choice(TECH_TERMS)}? How does {self.rng.choice(TECH_TERMS)} work with {self.rng.choice(TECH_TERMS)}?"
ai_msg = self._random_text(200, 600)
lines.append(user_msg)
lines.append(ai_msg)
lines.append("")
(base / f"convo_{i:04d}.txt").write_text("\n".join(lines), encoding="utf-8")
return str(base), wing
# ── Direct palace population (bypasses mining for speed) ─────────────
def populate_palace_directly(self, palace_path, n_drawers=None, include_needles=True):
"""
Insert drawers directly into ChromaDB, bypassing the mining pipeline.
Much faster than mining for benchmarks that only care about
search/MCP behavior on a pre-populated palace.
Returns (client, collection, needle_info).
"""
n_drawers = n_drawers or self.cfg["drawers"]
os.makedirs(palace_path, exist_ok=True)
client = chromadb.PersistentClient(path=palace_path)
col = client.get_or_create_collection("mempalace_drawers")
batch_size = 500
docs = []
ids = []
metas = []
# Insert needles first
needle_info = []
if include_needles:
for needle in self.needles:
needle_id = f"drawer_{needle['wing']}_{needle['room']}_{hashlib.md5(needle['id'].encode()).hexdigest()[:16]}"
docs.append(needle["content"])
ids.append(needle_id)
metas.append({
"wing": needle["wing"],
"room": needle["room"],
"source_file": f"needle_{needle['id']}.txt",
"chunk_index": 0,
"added_by": "benchmark",
"filed_at": datetime.now().isoformat(),
})
needle_info.append({"id": needle_id, "query": needle["query"], "wing": needle["wing"], "room": needle["room"]})
# Fill remaining drawers with realistic content
remaining = n_drawers - len(docs)
for i in range(remaining):
wing = self.wings[i % len(self.wings)]
rooms = self.rooms_by_wing[wing]
room = rooms[i % len(rooms)]
content = self._random_text(400, 800)
drawer_id = f"drawer_{wing}_{room}_{hashlib.md5(f'gen_{i}'.encode()).hexdigest()[:16]}"
docs.append(content)
ids.append(drawer_id)
metas.append({
"wing": wing,
"room": room,
"source_file": f"generated_{i:06d}.txt",
"chunk_index": i % 10,
"added_by": "benchmark",
"filed_at": datetime.now().isoformat(),
})
# Flush in batches
if len(docs) >= batch_size:
col.add(documents=docs, ids=ids, metadatas=metas)
docs, ids, metas = [], [], []
# Flush remainder
if docs:
col.add(documents=docs, ids=ids, metadatas=metas)
return client, col, needle_info
# ── KG triple generation ─────────────────────────────────────────────
def generate_kg_triples(self, n_entities=None, n_triples=None):
"""
Generate realistic entity-relationship triples.
Returns (entities, triples) where:
entities = [(name, type), ...]
triples = [(subject, predicate, object, valid_from, valid_to), ...]
"""
n_entities = n_entities or self.cfg["kg_entities"]
n_triples = n_triples or self.cfg["kg_triples"]
# Generate entities
entities = []
entity_names = []
for i in range(n_entities):
if i < len(ENTITY_NAMES):
name = ENTITY_NAMES[i]
else:
name = f"Entity_{i:04d}"
etype = self.rng.choice(ENTITY_TYPES)
entities.append((name, etype))
entity_names.append(name)
# Generate triples
triples = []
base_date = datetime(2024, 1, 1)
for i in range(n_triples):
subject = self.rng.choice(entity_names)
obj = self.rng.choice(entity_names)
while obj == subject:
obj = self.rng.choice(entity_names)
predicate = self.rng.choice(PREDICATES)
days_offset = self.rng.randint(0, 730)
valid_from = (base_date + timedelta(days=days_offset)).strftime("%Y-%m-%d")
# 30% chance of having a valid_to
valid_to = None
if self.rng.random() < 0.3:
end_offset = self.rng.randint(30, 365)
valid_to = (base_date + timedelta(days=days_offset + end_offset)).strftime("%Y-%m-%d")
triples.append((subject, predicate, obj, valid_from, valid_to))
return entities, triples
# ── Search query generation ──────────────────────────────────────────
def generate_search_queries(self, n_queries=None):
"""
Generate search queries with expected results.
Returns list of {"query": str, "expected_wing": str|None, "expected_room": str|None, "is_needle": bool}.
Needle queries have known-good answers for recall measurement.
"""
n_queries = n_queries or self.cfg["search_queries"]
queries = []
# Half are needle queries (known-good answers)
n_needle = min(n_queries // 2, len(self.needles))
for needle in self.needles[:n_needle]:
queries.append({
"query": needle["query"],
"expected_wing": needle["wing"],
"expected_room": needle["room"],
"needle_id": needle["id"],
"is_needle": True,
})
# Other half are generic queries (measure latency, not recall)
n_generic = n_queries - n_needle
for _ in range(n_generic):
queries.append({
"query": self.rng.choice(TECH_TERMS) + " " + self.rng.choice(TECH_TERMS),
"expected_wing": None,
"expected_room": None,
"needle_id": None,
"is_needle": False,
})
self.rng.shuffle(queries)
return queries
+91
@@ -0,0 +1,91 @@
"""
Benchmark report utilities — JSON output and regression detection.
Each test records metrics via record_metric(). At session end, the
conftest.py pytest_terminal_summary hook writes the collected results.
"""
import json
import os
import tempfile
from datetime import datetime
RESULTS_FILE = os.path.join(tempfile.gettempdir(), "mempalace_bench_results.json")
def record_metric(category: str, metric: str, value):
"""Append a metric to the session results file (JSON on disk)."""
results = {}
if os.path.exists(RESULTS_FILE):
try:
with open(RESULTS_FILE) as f:
results = json.load(f)
except (json.JSONDecodeError, OSError):
results = {}
if category not in results:
results[category] = {}
results[category][metric] = value
with open(RESULTS_FILE, "w") as f:
json.dump(results, f, indent=2)
def check_regression(current_report: str, baseline_report: str, threshold: float = 0.2):
"""
Compare current benchmark results against a baseline.
Returns a list of regression descriptions. Empty list = no regressions.
threshold: fractional degradation allowed (0.2 = 20% worse is OK).
"""
with open(current_report) as f:
current = json.load(f)
with open(baseline_report) as f:
baseline = json.load(f)
regressions = []
# Metrics where HIGHER is worse (latency, memory, etc.)
higher_is_worse = {
"latency", "rss", "memory", "oom", "lock_failures", "elapsed",
"p50_ms", "p95_ms", "p99_ms", "rss_delta_mb", "peak_rss_mb",
}
# Metrics where LOWER is worse (throughput, recall, etc.)
lower_is_worse = {
"recall", "throughput", "per_sec", "files_per_sec", "drawers_per_sec",
"triples_per_sec", "improvement",
}
for category in baseline.get("results", {}):
if category not in current.get("results", {}):
continue
for metric, base_val in baseline["results"][category].items():
if metric not in current["results"][category]:
continue
curr_val = current["results"][category][metric]
if not isinstance(base_val, (int, float)) or not isinstance(curr_val, (int, float)):
continue
if base_val == 0:
continue
# Determine direction
is_latency_like = any(kw in metric.lower() for kw in higher_is_worse)
is_throughput_like = any(kw in metric.lower() for kw in lower_is_worse)
if is_latency_like:
# Higher is worse — check if current exceeds baseline by threshold
if curr_val > base_val * (1 + threshold):
pct = ((curr_val - base_val) / base_val) * 100
regressions.append(
f"{category}/{metric}: {base_val:.2f} -> {curr_val:.2f} ({pct:+.1f}%, threshold {threshold*100:.0f}%)"
)
elif is_throughput_like:
# Lower is worse — check if current is below baseline by threshold
if curr_val < base_val * (1 - threshold):
pct = ((curr_val - base_val) / base_val) * 100
regressions.append(
f"{category}/{metric}: {base_val:.2f} -> {curr_val:.2f} ({pct:+.1f}%, threshold {threshold*100:.0f}%)"
)
return regressions
+203
@@ -0,0 +1,203 @@
"""
ChromaDB stress tests — find the breaking point.
Tests the raw ChromaDB patterns used by mempalace to determine:
- At what collection size does col.get(include=["metadatas"]) become dangerous?
- How does query latency degrade as collection grows?
- How much faster is batched insertion vs sequential?
"""
import os
import time
import chromadb
import pytest
from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric
def _get_rss_mb():
try:
import psutil
return psutil.Process().memory_info().rss / (1024 * 1024)
except ImportError:
import resource
import platform
# ru_maxrss is the process's peak RSS, not its current RSS (bytes on macOS, kilobytes on Linux)
usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
if platform.system() == "Darwin":
return usage / (1024 * 1024)
return usage / 1024
@pytest.mark.benchmark
class TestGetAllMetadatasOOM:
"""
The specific pattern causing finding #3:
col.get(include=["metadatas"]) with NO limit.
Measures RSS growth to find when this becomes dangerous.
"""
SIZES = [1_000, 2_500, 5_000, 10_000]
@pytest.mark.parametrize("n_drawers", SIZES)
def test_get_all_metadatas_rss(self, n_drawers, tmp_path, bench_scale):
"""RSS growth from fetching all metadata at once."""
gen = PalaceDataGenerator(seed=42, scale=bench_scale)
palace_path = str(tmp_path / "palace")
gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)
client = chromadb.PersistentClient(path=palace_path)
col = client.get_collection("mempalace_drawers")
rss_before = _get_rss_mb()
start = time.perf_counter()
all_meta = col.get(include=["metadatas"])["metadatas"]
elapsed_ms = (time.perf_counter() - start) * 1000
rss_after = _get_rss_mb()
assert len(all_meta) == n_drawers
rss_delta = rss_after - rss_before
record_metric("chromadb_get_all", f"rss_delta_mb_at_{n_drawers}", round(rss_delta, 2))
record_metric("chromadb_get_all", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))
@pytest.mark.benchmark
class TestQueryDegradation:
    """Measure query latency as collection grows."""

    SIZES = [1_000, 2_500, 5_000, 10_000]

    @pytest.mark.parametrize("n_drawers", SIZES)
    def test_query_latency_at_size(self, n_drawers, tmp_path, bench_scale):
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)

        client = chromadb.PersistentClient(path=palace_path)
        col = client.get_collection("mempalace_drawers")

        queries = [
            "authentication middleware optimization",
            "database connection pooling strategy",
            "error handling retry logic",
            "deployment pipeline configuration",
            "load balancer health check",
        ]

        latencies = []
        for q in queries:
            start = time.perf_counter()
            results = col.query(query_texts=[q], n_results=5, include=["documents", "distances"])
            elapsed_ms = (time.perf_counter() - start) * 1000
            latencies.append(elapsed_ms)
            assert results["documents"][0]  # got results

        avg_ms = sum(latencies) / len(latencies)
        # With only five samples this index resolves to the last element,
        # so the recorded "p95" is effectively the maximum latency.
        p95_ms = sorted(latencies)[int(len(latencies) * 0.95)]

        record_metric("chromadb_query", f"avg_latency_ms_at_{n_drawers}", round(avg_ms, 1))
        record_metric("chromadb_query", f"p95_latency_ms_at_{n_drawers}", round(p95_ms, 1))
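# Sketch (not part of this commit): the p95 above indexes a five-element list,
# so it returns the largest sample. A nearest-rank helper makes that behavior
# explicit and works for any sample count. The name percentile_nearest_rank
# is hypothetical.

```python
import math


def percentile_nearest_rank(samples, pct):
    """Return the nearest-rank percentile of a non-empty list of numbers."""
    if not samples:
        raise ValueError("samples must be non-empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    # Nearest-rank definition: 1-based rank = ceil(pct/100 * n).
    # Multiplying before dividing keeps the arithmetic exact for integer pct.
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]
```

# With five samples the 95th percentile is the maximum; collecting 20 or more
# samples per collection size would let p95 differ from the max.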
@pytest.mark.benchmark
class TestBulkInsertPerformance:
"""Compare batch insertion vs sequential add_drawer pattern."""
def test_sequential_vs_batched(self, tmp_path):
"""The current miner uses single-document add(). How much faster is batching?"""
n_docs = 500
gen = PalaceDataGenerator(seed=42)
# Generate content
contents = [gen._random_text(400, 800) for _ in range(n_docs)]
# Sequential insertion (mimics add_drawer pattern)
palace_seq = str(tmp_path / "seq")
os.makedirs(palace_seq)
client_seq = chromadb.PersistentClient(path=palace_seq)
col_seq = client_seq.get_or_create_collection("mempalace_drawers")
start = time.perf_counter()
for i, content in enumerate(contents):
col_seq.add(
documents=[content],
ids=[f"seq_{i}"],
metadatas=[{"wing": "test", "room": "bench", "chunk_index": i}],
)
sequential_ms = (time.perf_counter() - start) * 1000
# Batched insertion
palace_batch = str(tmp_path / "batch")
os.makedirs(palace_batch)
client_batch = chromadb.PersistentClient(path=palace_batch)
col_batch = client_batch.get_or_create_collection("mempalace_drawers")
batch_size = 100
start = time.perf_counter()
for batch_start in range(0, n_docs, batch_size):
batch_end = min(batch_start + batch_size, n_docs)
batch_docs = contents[batch_start:batch_end]
batch_ids = [f"batch_{i}" for i in range(batch_start, batch_end)]
batch_metas = [{"wing": "test", "room": "bench", "chunk_index": i} for i in range(batch_start, batch_end)]
col_batch.add(documents=batch_docs, ids=batch_ids, metadatas=batch_metas)
batched_ms = (time.perf_counter() - start) * 1000
speedup = sequential_ms / max(batched_ms, 0.01)
assert col_seq.count() == n_docs
assert col_batch.count() == n_docs
record_metric("chromadb_insert", "sequential_ms", round(sequential_ms, 1))
record_metric("chromadb_insert", "batched_ms", round(batched_ms, 1))
record_metric("chromadb_insert", "speedup_ratio", round(speedup, 2))
record_metric("chromadb_insert", "n_docs", n_docs)
record_metric("chromadb_insert", "batch_size", batch_size)
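# Sketch (not part of this commit): the batched loop above slices documents,
# ids, and metadata by index. A small generator, here called iter_batches
# (a hypothetical name), expresses that slicing once and hands back the start
# offset so ids and metadata can be derived from it.

```python
def iter_batches(items, batch_size):
    """Yield (start_index, slice) pairs covering items in order."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        # The final slice may be shorter than batch_size.
        yield start, items[start:start + batch_size]
```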
@pytest.mark.benchmark
@pytest.mark.slow
class TestMaxCollectionSize:
"""Incrementally grow collection to find practical limits."""
def test_incremental_growth(self, tmp_path, bench_scale):
"""Add drawers in batches, measure latency per batch."""
gen = PalaceDataGenerator(seed=42, scale=bench_scale)
cfg = gen.cfg
target = min(cfg["drawers"], 10_000) # cap at 10K for this test
palace_path = str(tmp_path / "palace")
os.makedirs(palace_path)
client = chromadb.PersistentClient(path=palace_path)
col = client.get_or_create_collection("mempalace_drawers")
batch_size = 500
batch_times = []
total_inserted = 0
for batch_num in range(0, target, batch_size):
n = min(batch_size, target - batch_num)
docs = [gen._random_text(400, 800) for _ in range(n)]
ids = [f"growth_{batch_num + i}" for i in range(n)]
metas = [
{"wing": gen.wings[i % len(gen.wings)], "room": "bench", "chunk_index": i}
for i in range(batch_num, batch_num + n)
]
start = time.perf_counter()
col.add(documents=docs, ids=ids, metadatas=metas)
batch_ms = (time.perf_counter() - start) * 1000
total_inserted += n
batch_times.append({"at_size": total_inserted, "batch_ms": round(batch_ms, 1)})
assert col.count() == total_inserted
# Record first and last batch times to show degradation
record_metric("chromadb_growth", "first_batch_ms", batch_times[0]["batch_ms"])
record_metric("chromadb_growth", "last_batch_ms", batch_times[-1]["batch_ms"])
record_metric("chromadb_growth", "total_inserted", total_inserted)
record_metric("chromadb_growth", "batch_times", batch_times)
@@ -0,0 +1,165 @@
"""
Ingestion throughput benchmarks.
Measures mining performance at scale:
- Files/sec and drawers/sec through the full mine() pipeline
- Peak RSS during mining
- Chunking throughput isolated from ChromaDB
- Re-ingest skip overhead (finding #11: file_already_mined check)
"""
import time

import chromadb
import pytest

from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric
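def _get_rss_mb():
    """Get process RSS in MB (current RSS via psutil, peak RSS via the fallback)."""
    try:
        import psutil

        return psutil.Process().memory_info().rss / (1024 * 1024)
    except ImportError:
        import platform
        import resource

        # ru_maxrss is the peak RSS so far (KB on Linux, bytes on macOS),
        # so deltas computed from this fallback can only show growth.
        usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if platform.system() == "Darwin":
            return usage / (1024 * 1024)
        return usage / 1024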
@pytest.mark.benchmark
class TestMineThroughput:
"""Measure the full mine() pipeline throughput."""
@pytest.mark.parametrize("n_files", [20, 50, 100])
def test_mine_files_per_second(self, n_files, tmp_path, bench_scale):
"""End-to-end mining throughput: generate files, mine, count drawers."""
gen = PalaceDataGenerator(seed=42, scale=bench_scale)
project_path, wing, rooms, files_written = gen.generate_project_tree(
tmp_path / "project", n_files=n_files
)
palace_path = str(tmp_path / "palace")
from mempalace.miner import mine
start = time.perf_counter()
mine(project_path, palace_path)
elapsed = time.perf_counter() - start
client = chromadb.PersistentClient(path=palace_path)
col = client.get_collection("mempalace_drawers")
drawer_count = col.count()
files_per_sec = files_written / max(elapsed, 0.001)
drawers_per_sec = drawer_count / max(elapsed, 0.001)
record_metric("ingest", f"files_per_sec_at_{n_files}", round(files_per_sec, 1))
record_metric("ingest", f"drawers_per_sec_at_{n_files}", round(drawers_per_sec, 1))
record_metric("ingest", f"elapsed_sec_at_{n_files}", round(elapsed, 2))
record_metric("ingest", f"drawers_created_at_{n_files}", drawer_count)
def test_mine_peak_rss(self, tmp_path, bench_scale):
"""Track peak RSS during a mining run."""
import threading
gen = PalaceDataGenerator(seed=42, scale=bench_scale)
project_path, wing, rooms, files_written = gen.generate_project_tree(
tmp_path / "project", n_files=100
)
palace_path = str(tmp_path / "palace")
from mempalace.miner import mine
rss_samples = []
stop_sampling = threading.Event()
def sample_rss():
while not stop_sampling.is_set():
rss_samples.append(_get_rss_mb())
stop_sampling.wait(0.1)
sampler = threading.Thread(target=sample_rss, daemon=True)
sampler.start()
rss_before = _get_rss_mb()
mine(project_path, palace_path)
stop_sampling.set()
sampler.join(timeout=1)
peak_rss = max(rss_samples) if rss_samples else _get_rss_mb()
rss_delta = peak_rss - rss_before
record_metric("ingest", "peak_rss_mb", round(peak_rss, 1))
record_metric("ingest", "rss_delta_mb", round(rss_delta, 1))
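# Sketch (not part of this commit): the sampling thread in test_mine_peak_rss
# could be packaged as a context manager so other benchmarks can reuse it.
# PeakSampler and its probe argument are hypothetical names; probe is any
# zero-argument callable returning a number, such as _get_rss_mb.

```python
import threading


class PeakSampler:
    """Sample a numeric probe in a background thread and track its peak."""

    def __init__(self, probe, interval=0.1):
        self._probe = probe
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.samples = []

    def _run(self):
        while not self._stop.is_set():
            self.samples.append(self._probe())
            self._stop.wait(self._interval)

    def __enter__(self):
        # Take the baseline before the sampler thread starts.
        self.samples.append(self._probe())
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        self._stop.set()
        self._thread.join()
        # One final reading so short workloads still get a post-work sample.
        self.samples.append(self._probe())
        return False

    @property
    def baseline(self):
        return self.samples[0]

    @property
    def peak(self):
        return max(self.samples)
```

# Usage would look like `with PeakSampler(_get_rss_mb) as s: mine(...)`, after
# which s.peak - s.baseline gives the RSS delta.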
@pytest.mark.benchmark
class TestChunkThroughput:
"""Isolate chunking performance from ChromaDB insertion."""
@pytest.mark.parametrize("content_size_kb", [1, 10, 100])
def test_chunk_text_throughput(self, content_size_kb):
"""Measure chunk_text speed for different content sizes."""
from mempalace.miner import chunk_text
gen = PalaceDataGenerator(seed=42)
# Generate content of target size
content = gen._random_text(content_size_kb * 500, content_size_kb * 1200)
# Pad to approximate target KB
while len(content) < content_size_kb * 1024:
content += "\n" + gen._random_text(200, 500)
n_iterations = 50
start = time.perf_counter()
total_chunks = 0
for _ in range(n_iterations):
chunks = chunk_text(content, "bench_file.py")
total_chunks += len(chunks)
elapsed = time.perf_counter() - start
chunks_per_sec = total_chunks / max(elapsed, 0.001)
kb_per_sec = (len(content) * n_iterations / 1024) / max(elapsed, 0.001)
record_metric("chunking", f"chunks_per_sec_at_{content_size_kb}kb", round(chunks_per_sec, 1))
record_metric("chunking", f"kb_per_sec_at_{content_size_kb}kb", round(kb_per_sec, 1))
@pytest.mark.benchmark
class TestReingestSkipOverhead:
"""Finding #11: file_already_mined() check overhead at scale."""
def test_skip_check_cost(self, tmp_path):
"""Mine files, then re-mine — measure cost of skip checks."""
gen = PalaceDataGenerator(seed=42, scale="small")
project_path, wing, rooms, files_written = gen.generate_project_tree(
tmp_path / "project", n_files=50
)
palace_path = str(tmp_path / "palace")
from mempalace.miner import mine
# First mine
mine(project_path, palace_path)
client = chromadb.PersistentClient(path=palace_path)
col = client.get_collection("mempalace_drawers")
initial_count = col.count()
# Re-mine (all files should be skipped)
start = time.perf_counter()
mine(project_path, palace_path)
skip_elapsed = time.perf_counter() - start
# Verify no new drawers added
final_count = col.count()
assert final_count == initial_count, "Re-mine should not add new drawers"
record_metric("reingest", "skip_check_elapsed_sec", round(skip_elapsed, 2))
record_metric("reingest", "files_checked", files_written)
record_metric("reingest", "skip_check_per_file_ms", round(skip_elapsed * 1000 / max(files_written, 1), 1))
@@ -0,0 +1,284 @@
"""
Knowledge graph benchmarks — SQLite temporal KG at scale.
Tests triple insertion throughput, query latency, temporal accuracy,
and SQLite concurrent access behavior.
"""
import threading
import time
import pytest
from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric
@pytest.mark.benchmark
class TestTripleInsertionRate:
"""Measure triples/sec at different scales."""
@pytest.mark.parametrize("n_triples", [200, 1_000, 5_000])
def test_insertion_throughput(self, n_triples, tmp_path):
gen = PalaceDataGenerator(seed=42, scale="small")
entities, triples = gen.generate_kg_triples(
n_entities=min(n_triples // 2, 200), n_triples=n_triples
)
from mempalace.knowledge_graph import KnowledgeGraph
kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
# Insert entities first
for name, etype in entities:
kg.add_entity(name, etype)
# Measure triple insertion
start = time.perf_counter()
for subject, predicate, obj, valid_from, valid_to in triples:
kg.add_triple(
subject, predicate, obj, valid_from=valid_from, valid_to=valid_to
)
elapsed = time.perf_counter() - start
triples_per_sec = n_triples / max(elapsed, 0.001)
record_metric("kg_insert", f"triples_per_sec_at_{n_triples}", round(triples_per_sec, 1))
record_metric("kg_insert", f"elapsed_sec_at_{n_triples}", round(elapsed, 3))
@pytest.mark.benchmark
class TestQueryEntityLatency:
    """Query latency for a hub entity with many relationships."""

    def test_query_latency_vs_relationships(self, tmp_path):
        """Attach 10 + 50 + 100 relationships to one hub entity and measure query time."""
        from mempalace.knowledge_graph import KnowledgeGraph

        kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))

        # Create a hub entity connected to many others
        kg.add_entity("Hub", "person")
        target_counts = [10, 50, 100]
        for target in target_counts:
            for i in range(target):
                entity_name = f"Node_{target}_{i}"
                kg.add_entity(entity_name, "project")
                kg.add_triple("Hub", "works_on", entity_name, valid_from="2025-01-01")

        # Measure query for Hub (which has sum(target_counts) relationships)
        latencies = []
        for _ in range(20):
            start = time.perf_counter()
            kg.query_entity("Hub")
            elapsed_ms = (time.perf_counter() - start) * 1000
            latencies.append(elapsed_ms)

        avg_ms = sum(latencies) / len(latencies)
        total_rels = sum(target_counts)
        record_metric("kg_query", f"avg_ms_with_{total_rels}_rels", round(avg_ms, 2))
        record_metric("kg_query", "total_relationships", total_rels)
@pytest.mark.benchmark
class TestTimelinePerformance:
"""timeline() with no entity filter does a full table scan."""
@pytest.mark.parametrize("n_triples", [200, 1_000, 5_000])
def test_timeline_latency(self, n_triples, tmp_path):
from mempalace.knowledge_graph import KnowledgeGraph
gen = PalaceDataGenerator(seed=42)
entities, triples = gen.generate_kg_triples(
n_entities=min(n_triples // 2, 200), n_triples=n_triples
)
kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
for name, etype in entities:
kg.add_entity(name, etype)
for subject, predicate, obj, valid_from, valid_to in triples:
kg.add_triple(subject, predicate, obj, valid_from=valid_from, valid_to=valid_to)
# Measure timeline (no filter = full scan with LIMIT 100)
latencies = []
for _ in range(10):
start = time.perf_counter()
result = kg.timeline()
elapsed_ms = (time.perf_counter() - start) * 1000
latencies.append(elapsed_ms)
avg_ms = sum(latencies) / len(latencies)
record_metric("kg_timeline", f"avg_ms_at_{n_triples}", round(avg_ms, 2))
@pytest.mark.benchmark
class TestTemporalQueryAccuracy:
    """Check temporal filtering results at scale."""

    def test_as_of_filtering(self, tmp_path):
        """Insert triples with known temporal ranges, record what as_of queries return."""
        from mempalace.knowledge_graph import KnowledgeGraph

        kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
        kg.add_entity("Alice", "person")
        kg.add_entity("ProjectA", "project")
        kg.add_entity("ProjectB", "project")

        # Alice worked on ProjectA from 2024-01 to 2024-06
        kg.add_triple("Alice", "works_on", "ProjectA", valid_from="2024-01-01", valid_to="2024-06-30")
        # Alice worked on ProjectB from 2024-07 onwards
        kg.add_triple("Alice", "works_on", "ProjectB", valid_from="2024-07-01")

        # Add noise triples
        gen = PalaceDataGenerator(seed=42)
        entities, triples = gen.generate_kg_triples(n_entities=50, n_triples=500)
        for name, etype in entities:
            kg.add_entity(name, etype)
        for subject, predicate, obj, valid_from, valid_to in triples:
            kg.add_triple(subject, predicate, obj, valid_from=valid_from, valid_to=valid_to)

        def _names(result):
            if not isinstance(result, list):
                return []
            return [r.get("object") or r.get("name", "") for r in result]

        # Query Alice as of March 2024: should find ProjectA
        march_names = _names(kg.query_entity("Alice", as_of="2024-03-15"))
        # Query Alice as of September 2024: should find ProjectB
        sept_names = _names(kg.query_entity("Alice", as_of="2024-09-15"))

        record_metric("kg_temporal", "march_query_results", len(march_names))
        record_metric("kg_temporal", "sept_query_results", len(sept_names))
        # The return shape of query_entity() is not pinned down in this test, so
        # the expected matches are recorded as metrics rather than asserted.
        record_metric("kg_temporal", "march_found_project_a", "ProjectA" in march_names)
        record_metric("kg_temporal", "sept_found_project_b", "ProjectB" in sept_names)
@pytest.mark.benchmark
class TestSQLiteConcurrentAccess:
"""Test concurrent read/write behavior with SQLite (finding #8)."""
def test_concurrent_writers(self, tmp_path):
"""N threads writing triples simultaneously — count lock failures."""
from mempalace.knowledge_graph import KnowledgeGraph
kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
# Pre-create entities
for i in range(100):
kg.add_entity(f"Entity_{i}", "concept")
n_threads = 4
triples_per_thread = 50
lock_failures = []
successes = []
def writer(thread_id):
fails = 0
ok = 0
for i in range(triples_per_thread):
try:
kg.add_triple(
f"Entity_{thread_id * 10}",
"relates_to",
f"Entity_{(thread_id * 10 + i) % 100}",
valid_from="2025-01-01",
)
ok += 1
except Exception:
fails += 1
lock_failures.append(fails)
successes.append(ok)
threads = [threading.Thread(target=writer, args=(t,)) for t in range(n_threads)]
start = time.perf_counter()
for t in threads:
t.start()
for t in threads:
t.join(timeout=30)
elapsed = time.perf_counter() - start
total_failures = sum(lock_failures)
total_successes = sum(successes)
record_metric("kg_concurrent", "total_failures", total_failures)
record_metric("kg_concurrent", "total_successes", total_successes)
record_metric("kg_concurrent", "elapsed_sec", round(elapsed, 2))
record_metric("kg_concurrent", "threads", n_threads)
record_metric("kg_concurrent", "triples_per_thread", triples_per_thread)
def test_concurrent_read_write(self, tmp_path):
"""Readers and writers running simultaneously."""
from mempalace.knowledge_graph import KnowledgeGraph
kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
# Seed some data
for i in range(50):
kg.add_entity(f"E_{i}", "concept")
for i in range(200):
kg.add_triple(f"E_{i % 50}", "links", f"E_{(i + 1) % 50}", valid_from="2025-01-01")
read_errors = []
write_errors = []
def reader():
fails = 0
for i in range(50):
try:
kg.query_entity(f"E_{i % 50}")
except Exception:
fails += 1
read_errors.append(fails)
def writer():
fails = 0
for i in range(50):
try:
kg.add_triple(f"E_{i % 50}", "new_rel", f"E_{(i + 7) % 50}", valid_from="2025-06-01")
except Exception:
fails += 1
write_errors.append(fails)
threads = [
threading.Thread(target=reader),
threading.Thread(target=reader),
threading.Thread(target=writer),
threading.Thread(target=writer),
]
for t in threads:
t.start()
for t in threads:
t.join(timeout=30)
record_metric("kg_concurrent_rw", "read_errors", sum(read_errors))
record_metric("kg_concurrent_rw", "write_errors", sum(write_errors))
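# Sketch (not part of this commit, and not how KnowledgeGraph is implemented):
# the threads above count every exception as a failure. Using only the standard
# sqlite3 module, this example gives each thread its own connection with a busy
# timeout and counts "database is locked" errors separately from other errors.
# The table layout, function names, and default values are assumptions made
# for illustration.

```python
import os
import sqlite3
import tempfile
import threading


def run_concurrent_writers(db_path, n_threads=4, rows_per_thread=50, busy_timeout_s=5.0):
    """Insert rows from several threads and report how many attempts hit a lock error."""
    setup = sqlite3.connect(db_path)
    setup.execute("PRAGMA journal_mode=WAL")  # WAL lets readers proceed during a write
    setup.execute("CREATE TABLE IF NOT EXISTS triples (subject TEXT, predicate TEXT, object TEXT)")
    setup.commit()
    setup.close()

    lock_errors = []
    other_errors = []

    def writer(thread_id):
        # sqlite3 connections are bound to the creating thread by default,
        # so each writer opens its own.
        conn = sqlite3.connect(db_path, timeout=busy_timeout_s)
        try:
            for i in range(rows_per_thread):
                try:
                    with conn:  # commits on success, rolls back on error
                        conn.execute(
                            "INSERT INTO triples VALUES (?, ?, ?)",
                            (f"Entity_{thread_id}", "relates_to", f"Entity_{i}"),
                        )
                except sqlite3.OperationalError as exc:
                    target = lock_errors if "locked" in str(exc) else other_errors
                    target.append(str(exc))
        finally:
            conn.close()

    threads = [threading.Thread(target=writer, args=(t,)) for t in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    check = sqlite3.connect(db_path)
    rows = check.execute("SELECT COUNT(*) FROM triples").fetchone()[0]
    check.close()
    return {"rows": rows, "lock_errors": len(lock_errors), "other_errors": len(other_errors)}


def demo():
    """Run the writers against a throwaway database file."""
    with tempfile.TemporaryDirectory() as tmp:
        return run_concurrent_writers(os.path.join(tmp, "kg.sqlite3"))
```

# Every insert attempt ends up either as a stored row or as a counted error,
# so rows + lock_errors + other_errors equals n_threads * rows_per_thread.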
@pytest.mark.benchmark
class TestKGStats:
"""Measure stats() performance as graph grows."""
@pytest.mark.parametrize("n_triples", [200, 1_000, 5_000])
def test_stats_latency(self, n_triples, tmp_path):
from mempalace.knowledge_graph import KnowledgeGraph
gen = PalaceDataGenerator(seed=42)
entities, triples = gen.generate_kg_triples(
n_entities=min(n_triples // 2, 200), n_triples=n_triples
)
kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
for name, etype in entities:
kg.add_entity(name, etype)
for subject, predicate, obj, valid_from, valid_to in triples:
kg.add_triple(subject, predicate, obj, valid_from=valid_from, valid_to=valid_to)
latencies = []
for _ in range(10):
start = time.perf_counter()
result = kg.stats()
elapsed_ms = (time.perf_counter() - start) * 1000
latencies.append(elapsed_ms)
avg_ms = sum(latencies) / len(latencies)
record_metric("kg_stats", f"avg_ms_at_{n_triples}", round(avg_ms, 2))
@@ -0,0 +1,206 @@
"""
Memory stack (layers.py) benchmarks.
Tests MemoryStack.wake_up(), Layer1.generate(), and Layer2/L3
at scale. Layer1 has the same unbounded col.get() as tool_status.
"""
import time

import pytest

from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric
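def _get_rss_mb():
    """Get process RSS in MB (current RSS via psutil, peak RSS via the fallback)."""
    try:
        import psutil

        return psutil.Process().memory_info().rss / (1024 * 1024)
    except ImportError:
        import platform
        import resource

        # ru_maxrss is the peak RSS so far (KB on Linux, bytes on macOS),
        # so deltas computed from this fallback can only show growth.
        usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if platform.system() == "Darwin":
            return usage / (1024 * 1024)
        return usage / 1024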
@pytest.mark.benchmark
class TestWakeUpCost:
"""Measure wake_up() time (L0 + L1) at different palace sizes."""
SIZES = [500, 1_000, 2_500, 5_000]
@pytest.mark.parametrize("n_drawers", SIZES)
def test_wakeup_latency(self, n_drawers, tmp_path, bench_scale):
"""L0+L1 generation time grows with palace size because L1 fetches all."""
gen = PalaceDataGenerator(seed=42, scale=bench_scale)
palace_path = str(tmp_path / "palace")
gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)
# Create identity file
identity_path = str(tmp_path / "identity.txt")
with open(identity_path, "w") as f:
f.write("I am a test AI. Traits: precise, fast.\n")
from mempalace.layers import MemoryStack
stack = MemoryStack(palace_path=palace_path, identity_path=identity_path)
latencies = []
for _ in range(5):
start = time.perf_counter()
text = stack.wake_up()
elapsed_ms = (time.perf_counter() - start) * 1000
latencies.append(elapsed_ms)
assert "L0" in text or "L1" in text or "IDENTITY" in text or "ESSENTIAL" in text
avg_ms = sum(latencies) / len(latencies)
record_metric("layers_wakeup", f"avg_ms_at_{n_drawers}", round(avg_ms, 1))
@pytest.mark.benchmark
class TestLayer1UnboundedFetch:
"""Layer1.generate() fetches ALL drawers — same pattern as tool_status."""
SIZES = [500, 1_000, 2_500, 5_000]
@pytest.mark.parametrize("n_drawers", SIZES)
def test_layer1_rss_growth(self, n_drawers, tmp_path):
"""Track RSS from Layer1 fetching all drawers at different sizes."""
gen = PalaceDataGenerator(seed=42, scale="small")
palace_path = str(tmp_path / "palace")
gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)
from mempalace.layers import Layer1
layer = Layer1(palace_path=palace_path)
rss_before = _get_rss_mb()
start = time.perf_counter()
text = layer.generate()
elapsed_ms = (time.perf_counter() - start) * 1000
rss_after = _get_rss_mb()
rss_delta = rss_after - rss_before
assert "L1" in text
record_metric("layer1", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))
record_metric("layer1", f"rss_delta_mb_at_{n_drawers}", round(rss_delta, 2))
def test_layer1_wing_filtered(self, tmp_path):
"""Wing-filtered Layer1 should fetch fewer drawers."""
gen = PalaceDataGenerator(seed=42, scale="small")
palace_path = str(tmp_path / "palace")
gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)
from mempalace.layers import Layer1
wing = gen.wings[0]
# Unfiltered
layer_all = Layer1(palace_path=palace_path)
start = time.perf_counter()
layer_all.generate()
unfiltered_ms = (time.perf_counter() - start) * 1000
# Wing-filtered
layer_wing = Layer1(palace_path=palace_path, wing=wing)
start = time.perf_counter()
layer_wing.generate()
filtered_ms = (time.perf_counter() - start) * 1000
record_metric("layer1_filter", "unfiltered_ms", round(unfiltered_ms, 1))
record_metric("layer1_filter", "filtered_ms", round(filtered_ms, 1))
if unfiltered_ms > 0:
record_metric("layer1_filter", "speedup_pct", round((1 - filtered_ms / unfiltered_ms) * 100, 1))
@pytest.mark.benchmark
class TestWakeUpTokenBudget:
"""Verify L0+L1 stays within token budget even at large palace sizes."""
SIZES = [500, 1_000, 2_500, 5_000]
@pytest.mark.parametrize("n_drawers", SIZES)
def test_token_budget(self, n_drawers, tmp_path):
"""L1 has MAX_CHARS=3200 cap. Verify it holds at scale."""
gen = PalaceDataGenerator(seed=42, scale="small")
palace_path = str(tmp_path / "palace")
gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)
identity_path = str(tmp_path / "identity.txt")
with open(identity_path, "w") as f:
f.write("I am a benchmark AI.\n")
from mempalace.layers import MemoryStack
stack = MemoryStack(palace_path=palace_path, identity_path=identity_path)
text = stack.wake_up()
token_estimate = len(text) // 4
# Budget is ~600-900 tokens. Allow up to 1200 for safety margin.
record_metric("wakeup_budget", f"tokens_at_{n_drawers}", token_estimate)
record_metric("wakeup_budget", f"chars_at_{n_drawers}", len(text))
assert token_estimate < 1200, f"Wake-up exceeded budget: ~{token_estimate} tokens at {n_drawers} drawers"
@pytest.mark.benchmark
class TestLayer2Retrieval:
"""Layer2 on-demand retrieval with filters."""
def test_layer2_latency(self, tmp_path, bench_scale):
"""L2 retrieval with wing filter at scale."""
gen = PalaceDataGenerator(seed=42, scale=bench_scale)
palace_path = str(tmp_path / "palace")
gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)
from mempalace.layers import Layer2
layer = Layer2(palace_path=palace_path)
wing = gen.wings[0]
latencies = []
for _ in range(10):
start = time.perf_counter()
text = layer.retrieve(wing=wing, n_results=10)
elapsed_ms = (time.perf_counter() - start) * 1000
latencies.append(elapsed_ms)
avg_ms = sum(latencies) / len(latencies)
record_metric("layer2", "avg_retrieval_ms", round(avg_ms, 1))
@pytest.mark.benchmark
class TestLayer3Search:
"""Layer3 semantic search through the MemoryStack interface."""
def test_layer3_latency(self, tmp_path, bench_scale):
"""L3 search latency through MemoryStack."""
gen = PalaceDataGenerator(seed=42, scale=bench_scale)
palace_path = str(tmp_path / "palace")
gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)
identity_path = str(tmp_path / "identity.txt")
with open(identity_path, "w") as f:
f.write("I am a benchmark AI.\n")
from mempalace.layers import MemoryStack
stack = MemoryStack(palace_path=palace_path, identity_path=identity_path)
queries = ["authentication", "database", "deployment", "testing", "monitoring"]
latencies = []
for q in queries:
start = time.perf_counter()
text = stack.search(q, n_results=5)
elapsed_ms = (time.perf_counter() - start) * 1000
latencies.append(elapsed_ms)
avg_ms = sum(latencies) / len(latencies)
record_metric("layer3", "avg_search_ms", round(avg_ms, 1))
@@ -0,0 +1,226 @@
"""
MCP server tool performance benchmarks.
Validates production readiness findings:
- Finding #3: tool_status() unbounded col.get(include=["metadatas"]) → OOM
- Finding #7: _get_collection() re-instantiates PersistentClient every call
- Finding #3 variants: tool_list_wings(), tool_get_taxonomy() same pattern
Calls MCP tool handler functions directly with monkeypatched _config.
"""
import time
import chromadb
import pytest
from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric
# ── Helpers ──────────────────────────────────────────────────────────────
def _make_palace(tmp_path, n_drawers, scale="small"):
"""Create a palace with exactly n_drawers, return palace_path."""
gen = PalaceDataGenerator(seed=42, scale=scale)
palace_path = str(tmp_path / "palace")
gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)
return palace_path
def _patch_mcp_config(monkeypatch, palace_path, tmp_path):
"""Monkeypatch mcp_server._config and _kg to point at test dirs."""
from mempalace.config import MempalaceConfig
from mempalace.knowledge_graph import KnowledgeGraph
cfg = MempalaceConfig(config_dir=str(tmp_path / "cfg"))
# Override palace_path directly on the object
monkeypatch.setattr(cfg, "_file_config", {"palace_path": palace_path})
import mempalace.mcp_server as mcp_mod
monkeypatch.setattr(mcp_mod, "_config", cfg)
monkeypatch.setattr(mcp_mod, "_kg", KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3")))
def _get_rss_mb():
    """Get current process RSS in MB."""
    try:
        import psutil

        return psutil.Process().memory_info().rss / (1024 * 1024)
    except ImportError:
        import platform
        import resource

        # ru_maxrss is in KB on Linux, bytes on macOS
        usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if platform.system() == "Darwin":
            return usage / (1024 * 1024)
        return usage / 1024
# ── Tests ────────────────────────────────────────────────────────────────
@pytest.mark.benchmark
class TestToolStatusOOM:
"""Finding #3: tool_status loads ALL metadata into memory."""
SIZES = [500, 1_000, 2_500, 5_000]
@pytest.mark.parametrize("n_drawers", SIZES)
def test_tool_status_rss_growth(self, n_drawers, tmp_path, monkeypatch):
"""Measure RSS growth from tool_status at different palace sizes."""
palace_path = _make_palace(tmp_path, n_drawers)
_patch_mcp_config(monkeypatch, palace_path, tmp_path)
from mempalace.mcp_server import tool_status
rss_before = _get_rss_mb()
result = tool_status()
rss_after = _get_rss_mb()
rss_delta = rss_after - rss_before
assert "error" not in result, f"tool_status failed: {result}"
assert result["total_drawers"] == n_drawers
record_metric("mcp_status", f"rss_delta_mb_at_{n_drawers}", round(rss_delta, 2))
@pytest.mark.parametrize("n_drawers", SIZES)
def test_tool_status_latency(self, n_drawers, tmp_path, monkeypatch):
"""Measure tool_status response time at different palace sizes."""
palace_path = _make_palace(tmp_path, n_drawers)
_patch_mcp_config(monkeypatch, palace_path, tmp_path)
from mempalace.mcp_server import tool_status
# Warm up
tool_status()
start = time.perf_counter()
result = tool_status()
elapsed_ms = (time.perf_counter() - start) * 1000
assert "error" not in result
record_metric("mcp_status", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))
@pytest.mark.benchmark
class TestToolListWingsUnbounded:
"""Finding #3 variant: tool_list_wings also fetches ALL metadata."""
@pytest.mark.parametrize("n_drawers", [500, 1_000, 2_500, 5_000])
def test_list_wings_latency(self, n_drawers, tmp_path, monkeypatch):
palace_path = _make_palace(tmp_path, n_drawers)
_patch_mcp_config(monkeypatch, palace_path, tmp_path)
from mempalace.mcp_server import tool_list_wings
start = time.perf_counter()
result = tool_list_wings()
elapsed_ms = (time.perf_counter() - start) * 1000
assert "wings" in result
record_metric("mcp_list_wings", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))
@pytest.mark.benchmark
class TestToolGetTaxonomyUnbounded:
"""Finding #3 variant: tool_get_taxonomy also fetches ALL metadata."""
@pytest.mark.parametrize("n_drawers", [500, 1_000, 2_500, 5_000])
def test_get_taxonomy_latency(self, n_drawers, tmp_path, monkeypatch):
palace_path = _make_palace(tmp_path, n_drawers)
_patch_mcp_config(monkeypatch, palace_path, tmp_path)
from mempalace.mcp_server import tool_get_taxonomy
start = time.perf_counter()
result = tool_get_taxonomy()
elapsed_ms = (time.perf_counter() - start) * 1000
assert "taxonomy" in result
record_metric("mcp_taxonomy", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))
@pytest.mark.benchmark
class TestClientReinstantiation:
    """Finding #7: _get_collection() creates new PersistentClient every call."""

    def test_reinstantiation_overhead(self, tmp_path, monkeypatch):
        """Measure cost of 50 _get_collection() calls vs a cached client."""
        palace_path = _make_palace(tmp_path, 500)
        _patch_mcp_config(monkeypatch, palace_path, tmp_path)

        from mempalace.mcp_server import _get_collection

        n_calls = 50

        # Measure re-instantiation (current behavior). Each iteration also calls
        # count() so that both loops time the same operation.
        start = time.perf_counter()
        for _ in range(n_calls):
            col = _get_collection()
            assert col is not None
            col.count()
        uncached_ms = (time.perf_counter() - start) * 1000

        # Measure cached client (what it should be)
        client = chromadb.PersistentClient(path=palace_path)
        cached_col = client.get_collection("mempalace_drawers")
        start = time.perf_counter()
        for _ in range(n_calls):
            _ = cached_col.count()
        cached_ms = (time.perf_counter() - start) * 1000

        overhead_ratio = uncached_ms / max(cached_ms, 0.01)

        record_metric("client_reinstantiation", "uncached_total_ms", round(uncached_ms, 1))
        record_metric("client_reinstantiation", "cached_total_ms", round(cached_ms, 1))
        record_metric("client_reinstantiation", "overhead_ratio", round(overhead_ratio, 2))
        record_metric("client_reinstantiation", "n_calls", n_calls)
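# Sketch (not part of this commit): one way to avoid rebuilding a client on
# every call is to memoize the factory by path. FakeClient is a stand-in that
# only counts constructions, and get_cached_client is a hypothetical name.
# Whether the real chromadb.PersistentClient is safe to cache this way inside
# the MCP server is an assumption that would need to be validated.

```python
import functools


class FakeClient:
    """Stand-in for an expensive client; counts how often it is constructed."""

    instances = 0

    def __init__(self, path):
        FakeClient.instances += 1
        self.path = path


@functools.lru_cache(maxsize=None)
def get_cached_client(path):
    """Return one shared client per distinct path."""
    return FakeClient(path)
```

# Repeated calls with the same path return the same object, so construction
# cost is paid once per palace rather than once per tool call.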
@pytest.mark.benchmark
class TestToolSearchLatency:
"""tool_search uses query() not get(), should scale better."""
@pytest.mark.parametrize("n_drawers", [500, 1_000, 2_500, 5_000])
def test_search_latency(self, n_drawers, tmp_path, monkeypatch):
palace_path = _make_palace(tmp_path, n_drawers)
_patch_mcp_config(monkeypatch, palace_path, tmp_path)
from mempalace.mcp_server import tool_search
queries = ["authentication middleware", "database migration", "error handling"]
latencies = []
for q in queries:
start = time.perf_counter()
result = tool_search(query=q, limit=5)
elapsed_ms = (time.perf_counter() - start) * 1000
latencies.append(elapsed_ms)
assert "error" not in result
avg_ms = sum(latencies) / len(latencies)
record_metric("mcp_search", f"avg_latency_ms_at_{n_drawers}", round(avg_ms, 1))
@pytest.mark.benchmark
class TestDuplicateCheckCost:
"""tool_add_drawer calls tool_check_duplicate first — measure overhead."""
@pytest.mark.parametrize("n_drawers", [500, 1_000, 2_500])
def test_duplicate_check_latency(self, n_drawers, tmp_path, monkeypatch):
palace_path = _make_palace(tmp_path, n_drawers)
_patch_mcp_config(monkeypatch, palace_path, tmp_path)
from mempalace.mcp_server import tool_check_duplicate
test_content = "This is unique test content for duplicate checking benchmark."
start = time.perf_counter()
result = tool_check_duplicate(content=test_content)
elapsed_ms = (time.perf_counter() - start) * 1000
assert "error" not in result
record_metric("mcp_duplicate_check", f"latency_ms_at_{n_drawers}", round(elapsed_ms, 1))
@@ -0,0 +1,178 @@
"""
Memory profiling benchmarks — detect leaks and measure RSS growth.
Uses tracemalloc for heap snapshots and psutil/resource for RSS.
Targets the highest-risk code paths:
- Repeated search() calls (PersistentClient re-instantiation)
- Repeated tool_status() calls (unbounded metadata fetch)
- Layer1.generate() (fetches all drawers)
"""
import time
import tracemalloc
import pytest
from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric
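def _get_rss_mb():
    """Get process RSS in MB (current RSS via psutil, peak RSS via the fallback)."""
    try:
        import psutil

        return psutil.Process().memory_info().rss / (1024 * 1024)
    except ImportError:
        import platform
        import resource

        # ru_maxrss is the peak RSS so far (KB on Linux, bytes on macOS),
        # so deltas computed from this fallback can only show growth.
        usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if platform.system() == "Darwin":
            return usage / (1024 * 1024)
        return usage / 1024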


@pytest.mark.benchmark
class TestSearchMemoryProfile:
    """Track RSS growth over repeated search_memories() calls."""

    def test_search_rss_growth(self, tmp_path):
        """Issue 200 searches and track RSS every 50 calls."""
        gen = PalaceDataGenerator(seed=42, scale="small")
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=1_000, include_needles=False)

        from mempalace.searcher import search_memories

        n_calls = 200
        check_interval = 50
        queries = ["authentication", "database", "deployment", "error handling", "testing"]
        rss_readings = []
        rss_readings.append(("start", _get_rss_mb()))

        for i in range(n_calls):
            q = queries[i % len(queries)]
            search_memories(q, palace_path=palace_path, n_results=5)
            if (i + 1) % check_interval == 0:
                rss_readings.append((f"after_{i + 1}", _get_rss_mb()))

        start_rss = rss_readings[0][1]
        end_rss = rss_readings[-1][1]
        growth = end_rss - start_rss

        record_metric("memory_search", "rss_start_mb", round(start_rss, 2))
        record_metric("memory_search", "rss_end_mb", round(end_rss, 2))
        record_metric("memory_search", "rss_growth_mb", round(growth, 2))
        record_metric("memory_search", "n_calls", n_calls)
        record_metric("memory_search", "growth_per_100_calls_mb", round(growth / (n_calls / 100), 2))


@pytest.mark.benchmark
class TestToolStatusMemoryProfile:
    """Track RSS growth from repeated tool_status() calls."""

    def test_tool_status_repeated_calls(self, tmp_path, monkeypatch):
        """tool_status loads ALL metadata on each call; does it leak?"""
        gen = PalaceDataGenerator(seed=42, scale="small")
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)

        from mempalace.config import MempalaceConfig
        from mempalace.knowledge_graph import KnowledgeGraph
        import mempalace.mcp_server as mcp_mod

        cfg = MempalaceConfig(config_dir=str(tmp_path / "cfg"))
        monkeypatch.setattr(cfg, "_file_config", {"palace_path": palace_path})
        monkeypatch.setattr(mcp_mod, "_config", cfg)
        monkeypatch.setattr(mcp_mod, "_kg", KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3")))

        from mempalace.mcp_server import tool_status

        n_calls = 50
        rss_readings = []
        rss_readings.append(("start", _get_rss_mb()))

        for i in range(n_calls):
            result = tool_status()
            assert result["total_drawers"] == 2_000
            if (i + 1) % 10 == 0:
                rss_readings.append((f"after_{i + 1}", _get_rss_mb()))

        start_rss = rss_readings[0][1]
        end_rss = rss_readings[-1][1]
        growth = end_rss - start_rss

        record_metric("memory_tool_status", "rss_start_mb", round(start_rss, 2))
        record_metric("memory_tool_status", "rss_end_mb", round(end_rss, 2))
        record_metric("memory_tool_status", "rss_growth_mb", round(growth, 2))
        record_metric("memory_tool_status", "n_calls", n_calls)
        record_metric("memory_tool_status", "palace_size", 2_000)


@pytest.mark.benchmark
class TestLayer1MemoryProfile:
    """Layer1.generate() fetches ALL drawers: same risk as tool_status."""

    def test_layer1_repeated_generate(self, tmp_path):
        """Layer1 fetches all drawers for scoring. Track memory over repeats."""
        gen = PalaceDataGenerator(seed=42, scale="small")
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)

        from mempalace.layers import Layer1

        layer = Layer1(palace_path=palace_path)
        n_calls = 30
        rss_readings = []
        rss_readings.append(("start", _get_rss_mb()))

        for i in range(n_calls):
            text = layer.generate()
            assert "L1" in text
            if (i + 1) % 10 == 0:
                rss_readings.append((f"after_{i + 1}", _get_rss_mb()))

        start_rss = rss_readings[0][1]
        end_rss = rss_readings[-1][1]
        growth = end_rss - start_rss

        record_metric("memory_layer1", "rss_start_mb", round(start_rss, 2))
        record_metric("memory_layer1", "rss_end_mb", round(end_rss, 2))
        record_metric("memory_layer1", "rss_growth_mb", round(growth, 2))
        record_metric("memory_layer1", "n_calls", n_calls)


@pytest.mark.benchmark
class TestHeapSnapshot:
    """Use tracemalloc to identify top memory allocators during search."""

    def test_search_heap_top_allocators(self, tmp_path):
        """Identify which code paths allocate the most memory during search."""
        gen = PalaceDataGenerator(seed=42, scale="small")
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=1_000, include_needles=False)

        from mempalace.searcher import search_memories

        tracemalloc.start()
        snap_before = tracemalloc.take_snapshot()

        for _ in range(100):
            search_memories("test query", palace_path=palace_path, n_results=5)

        snap_after = tracemalloc.take_snapshot()
        tracemalloc.stop()

        # compare_to() sorts by the absolute size difference, largest first.
        stats = snap_after.compare_to(snap_before, "lineno")
        top_allocators = []
        for stat in stats[:10]:
            top_allocators.append({
                "file": str(stat.traceback),
                # size_diff is the growth between snapshots; size would be the
                # absolute total in the newer snapshot.
                "size_kb": round(stat.size_diff / 1024, 1),
                "count": stat.count,
            })

        total_growth_kb = sum(s["size_kb"] for s in top_allocators)
        record_metric("heap_search", "top_10_growth_kb", round(total_growth_kb, 1))
        record_metric("heap_search", "n_searches", 100)
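Review note: the heap test above relies on `tracemalloc` snapshot diffing. A minimal standalone sketch of that technique follows, for anyone who wants to try it outside the palace fixtures. `top_growth` and `allocate_some` are hypothetical names used only for illustration and are not part of mempalace. In the entries returned by `compare_to`, `size_diff` is the change between the two snapshots, while `size` is the absolute total in the newer one.

```python
import tracemalloc


def top_growth(workload, limit=10):
    """Run workload() under tracemalloc and return the largest per-line size changes."""
    tracemalloc.start()
    try:
        before = tracemalloc.take_snapshot()
        workload()
        after = tracemalloc.take_snapshot()
    finally:
        # Always stop tracing, even if the workload raises.
        tracemalloc.stop()

    # Entries are sorted by absolute size difference, largest first.
    stats = after.compare_to(before, "lineno")
    return [
        {
            "where": str(stat.traceback),
            "growth_kb": stat.size_diff / 1024,
            "count_diff": stat.count_diff,
        }
        for stat in stats[:limit]
    ]


# Hypothetical workload: keep a 512 KiB buffer alive so it shows up as growth.
_retained = []


def allocate_some():
    _retained.append(bytearray(512 * 1024))


if __name__ == "__main__":
    for row in top_growth(allocate_some, limit=3):
        print(f"{row['growth_kb']:8.1f} KB  {row['where']}")
```

Wrapping the measurement in `try`/`finally` keeps tracing from leaking into later tests when the workload fails.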
+172
@@ -0,0 +1,172 @@
"""
Palace boost validation: does wing/room filtering actually help?

Quantifies the retrieval improvement from the palace spatial metaphor.
Uses planted needles to measure recall with and without filtering
at different scales.
"""

import time

import pytest

from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric


@pytest.mark.benchmark
class TestFilteredVsUnfilteredRecall:
    """Quantify palace boost: recall improvement from wing/room filtering."""

    SIZES = [1_000, 2_500, 5_000]

    @pytest.mark.parametrize("n_drawers", SIZES)
    def test_palace_boost_recall(self, n_drawers, tmp_path, bench_scale):
        """Compare recall@5 with/without wing filter at increasing scale."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        palace_path = str(tmp_path / "palace")
        _, _, needle_info = gen.populate_palace_directly(
            palace_path, n_drawers=n_drawers, include_needles=True
        )

        from mempalace.searcher import search_memories

        n_queries = min(10, len(needle_info))
        unfiltered_hits = 0
        wing_filtered_hits = 0
        room_filtered_hits = 0

        for needle in needle_info[:n_queries]:
            # Unfiltered search
            result = search_memories(needle["query"], palace_path=palace_path, n_results=5)
            texts = [h["text"] for h in result.get("results", [])]
            if any("NEEDLE_" in t for t in texts[:5]):
                unfiltered_hits += 1

            # Wing-filtered search
            result = search_memories(
                needle["query"], palace_path=palace_path, wing=needle["wing"], n_results=5
            )
            texts = [h["text"] for h in result.get("results", [])]
            if any("NEEDLE_" in t for t in texts[:5]):
                wing_filtered_hits += 1

            # Wing+room filtered search
            result = search_memories(
                needle["query"],
                palace_path=palace_path,
                wing=needle["wing"],
                room=needle["room"],
                n_results=5,
            )
            texts = [h["text"] for h in result.get("results", [])]
            if any("NEEDLE_" in t for t in texts[:5]):
                room_filtered_hits += 1

        recall_none = unfiltered_hits / max(n_queries, 1)
        recall_wing = wing_filtered_hits / max(n_queries, 1)
        recall_room = room_filtered_hits / max(n_queries, 1)
        boost_wing = recall_wing - recall_none
        boost_room = recall_room - recall_none

        record_metric("palace_boost", f"recall_unfiltered_at_{n_drawers}", round(recall_none, 3))
        record_metric("palace_boost", f"recall_wing_filtered_at_{n_drawers}", round(recall_wing, 3))
        record_metric("palace_boost", f"recall_room_filtered_at_{n_drawers}", round(recall_room, 3))
        record_metric("palace_boost", f"wing_boost_at_{n_drawers}", round(boost_wing, 3))
        record_metric("palace_boost", f"room_boost_at_{n_drawers}", round(boost_room, 3))


@pytest.mark.benchmark
class TestFilterLatencyBenefit:
    """Does filtering reduce query latency by narrowing the search space?"""

    def test_filter_speedup(self, tmp_path, bench_scale):
        """Compare latency: no filter vs wing vs wing+room."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=5_000, include_needles=False)

        from mempalace.searcher import search_memories

        wing = gen.wings[0]
        room = gen.rooms_by_wing[wing][0]
        query = "authentication middleware optimization"
        n_runs = 10

        # No filter
        latencies_none = []
        for _ in range(n_runs):
            start = time.perf_counter()
            search_memories(query, palace_path=palace_path, n_results=5)
            latencies_none.append((time.perf_counter() - start) * 1000)

        # Wing filter
        latencies_wing = []
        for _ in range(n_runs):
            start = time.perf_counter()
            search_memories(query, palace_path=palace_path, wing=wing, n_results=5)
            latencies_wing.append((time.perf_counter() - start) * 1000)

        # Wing + room filter
        latencies_room = []
        for _ in range(n_runs):
            start = time.perf_counter()
            search_memories(query, palace_path=palace_path, wing=wing, room=room, n_results=5)
            latencies_room.append((time.perf_counter() - start) * 1000)

        avg_none = sum(latencies_none) / len(latencies_none)
        avg_wing = sum(latencies_wing) / len(latencies_wing)
        avg_room = sum(latencies_room) / len(latencies_room)

        record_metric("filter_latency", "avg_unfiltered_ms", round(avg_none, 1))
        record_metric("filter_latency", "avg_wing_filtered_ms", round(avg_wing, 1))
        record_metric("filter_latency", "avg_room_filtered_ms", round(avg_room, 1))
        if avg_none > 0:
            record_metric("filter_latency", "wing_speedup_pct", round((1 - avg_wing / avg_none) * 100, 1))
            record_metric("filter_latency", "room_speedup_pct", round((1 - avg_room / avg_none) * 100, 1))


@pytest.mark.benchmark
class TestBoostAtIncreasingScale:
    """Does the palace boost increase as the palace grows?"""

    def test_boost_scaling(self, tmp_path, bench_scale):
        """Measure wing-filtered recall improvement at multiple sizes."""
        sizes = [500, 1_000, 2_500]
        boosts = []

        for size in sizes:
            gen = PalaceDataGenerator(seed=42, scale=bench_scale)
            palace_path = str(tmp_path / f"palace_{size}")
            _, _, needle_info = gen.populate_palace_directly(
                palace_path, n_drawers=size, include_needles=True
            )

            from mempalace.searcher import search_memories

            n_queries = min(8, len(needle_info))
            unfiltered_hits = 0
            filtered_hits = 0

            for needle in needle_info[:n_queries]:
                result = search_memories(needle["query"], palace_path=palace_path, n_results=5)
                if any("NEEDLE_" in h["text"] for h in result.get("results", [])[:5]):
                    unfiltered_hits += 1

                result = search_memories(
                    needle["query"], palace_path=palace_path, wing=needle["wing"], n_results=5
                )
                if any("NEEDLE_" in h["text"] for h in result.get("results", [])[:5]):
                    filtered_hits += 1

            recall_none = unfiltered_hits / max(n_queries, 1)
            recall_filtered = filtered_hits / max(n_queries, 1)
            boost = recall_filtered - recall_none
            boosts.append({"size": size, "boost": boost})

        record_metric("boost_scaling", "boosts_by_size", boosts)

        # Check if boost increases with scale (the hypothesis)
        if len(boosts) >= 2:
            trend_positive = boosts[-1]["boost"] >= boosts[0]["boost"]
            record_metric("boost_scaling", "trend_positive", trend_positive)
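Review note: the recall figures in this module are hit rates, meaning the fraction of needle queries for which any result in the top k contains the `NEEDLE_` marker. A small sketch of that computation on made-up result lists follows. `recall_at_k`, `is_needle`, and the sample data are hypothetical and only illustrate the metric; as in the benchmarks, any needle text counts as a hit, which may overstate recall if one query surfaces a different needle.

```python
def recall_at_k(ranked_results, is_relevant, k):
    """Fraction of queries with at least one relevant text in the top k results."""
    if not ranked_results:
        return 0.0
    hits = sum(
        1 for texts in ranked_results if any(is_relevant(t) for t in texts[:k])
    )
    return hits / len(ranked_results)


def is_needle(text):
    """Same marker check the benchmarks use."""
    return "NEEDLE_" in text


# One ranked result list per query (made-up data).
ranked = [
    ["filler a", "NEEDLE_001 payload", "filler b"],
    ["filler c", "filler d", "NEEDLE_002 payload"],
    ["filler e", "filler f", "filler g"],
]

if __name__ == "__main__":
    for k in (1, 2, 3):
        print(f"recall@{k} = {recall_at_k(ranked, is_needle, k):.3f}")
```

The boost reported by the tests is then the difference between two such values, one from filtered and one from unfiltered result lists.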
+225
@@ -0,0 +1,225 @@
"""
Search performance benchmarks.

Measures query latency, recall@k, and concurrent search behavior
as palace size grows. Uses planted needles for recall measurement.
"""

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import pytest

from tests.benchmarks.data_generator import PalaceDataGenerator
from tests.benchmarks.report import record_metric


@pytest.mark.benchmark
class TestSearchLatencyVsSize:
    """Query latency scaling as palace grows."""

    SIZES = [500, 1_000, 2_500, 5_000]

    @pytest.mark.parametrize("n_drawers", SIZES)
    def test_search_latency_curve(self, n_drawers, tmp_path, bench_scale):
        """Measure average search latency at different palace sizes."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=n_drawers, include_needles=False)

        from mempalace.searcher import search_memories

        queries = [
            "authentication middleware",
            "database optimization",
            "error handling patterns",
            "deployment configuration",
            "testing strategy",
        ]

        latencies = []
        for q in queries:
            start = time.perf_counter()
            result = search_memories(q, palace_path=palace_path, n_results=5)
            elapsed_ms = (time.perf_counter() - start) * 1000
            latencies.append(elapsed_ms)
            assert "error" not in result

        avg_ms = sum(latencies) / len(latencies)
        sorted_lat = sorted(latencies)
        p50_ms = sorted_lat[len(sorted_lat) // 2]
        p95_ms = sorted_lat[int(len(sorted_lat) * 0.95)]

        record_metric("search", f"avg_latency_ms_at_{n_drawers}", round(avg_ms, 1))
        record_metric("search", f"p50_ms_at_{n_drawers}", round(p50_ms, 1))
        record_metric("search", f"p95_ms_at_{n_drawers}", round(p95_ms, 1))


@pytest.mark.benchmark
class TestSearchRecallAtScale:
    """Planted needle recall: does accuracy degrade as palace grows?"""

    SIZES = [500, 1_000, 2_500, 5_000]

    @pytest.mark.parametrize("n_drawers", SIZES)
    def test_recall_at_k(self, n_drawers, tmp_path, bench_scale):
        """Recall@5 and Recall@10 using planted needles."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        palace_path = str(tmp_path / "palace")
        _, _, needle_info = gen.populate_palace_directly(
            palace_path, n_drawers=n_drawers, include_needles=True
        )

        from mempalace.searcher import search_memories

        hits_at_5 = 0
        hits_at_10 = 0
        total_needle_queries = min(10, len(needle_info))

        for needle in needle_info[:total_needle_queries]:
            result = search_memories(
                needle["query"], palace_path=palace_path, n_results=10
            )
            if "error" in result:
                continue

            texts = [h["text"] for h in result.get("results", [])]

            # Check if needle content appears in top 5
            found_at_5 = any("NEEDLE_" in t for t in texts[:5])
            found_at_10 = any("NEEDLE_" in t for t in texts[:10])

            if found_at_5:
                hits_at_5 += 1
            if found_at_10:
                hits_at_10 += 1

        recall_at_5 = hits_at_5 / max(total_needle_queries, 1)
        recall_at_10 = hits_at_10 / max(total_needle_queries, 1)

        record_metric("search_recall", f"recall_at_5_at_{n_drawers}", round(recall_at_5, 3))
        record_metric("search_recall", f"recall_at_10_at_{n_drawers}", round(recall_at_10, 3))


@pytest.mark.benchmark
class TestSearchFilteredVsUnfiltered:
    """Compare search performance with and without wing/room filters."""

    def test_filter_impact(self, tmp_path, bench_scale):
        """Measure latency and recall difference with wing filtering."""
        gen = PalaceDataGenerator(seed=42, scale=bench_scale)
        palace_path = str(tmp_path / "palace")
        _, _, needle_info = gen.populate_palace_directly(
            palace_path, n_drawers=2_000, include_needles=True
        )

        from mempalace.searcher import search_memories

        filtered_latencies = []
        unfiltered_latencies = []
        filtered_hits = 0
        unfiltered_hits = 0
        n_queries = min(10, len(needle_info))

        for needle in needle_info[:n_queries]:
            # Unfiltered
            start = time.perf_counter()
            result_unfiltered = search_memories(
                needle["query"], palace_path=palace_path, n_results=5
            )
            unfiltered_latencies.append((time.perf_counter() - start) * 1000)
            if any("NEEDLE_" in h["text"] for h in result_unfiltered.get("results", [])[:5]):
                unfiltered_hits += 1

            # Filtered by wing
            start = time.perf_counter()
            result_filtered = search_memories(
                needle["query"],
                palace_path=palace_path,
                wing=needle["wing"],
                n_results=5,
            )
            filtered_latencies.append((time.perf_counter() - start) * 1000)
            if any("NEEDLE_" in h["text"] for h in result_filtered.get("results", [])[:5]):
                filtered_hits += 1

        avg_unfiltered = sum(unfiltered_latencies) / max(len(unfiltered_latencies), 1)
        avg_filtered = sum(filtered_latencies) / max(len(filtered_latencies), 1)
        latency_improvement = ((avg_unfiltered - avg_filtered) / max(avg_unfiltered, 0.01)) * 100

        record_metric("search_filter", "avg_unfiltered_ms", round(avg_unfiltered, 1))
        record_metric("search_filter", "avg_filtered_ms", round(avg_filtered, 1))
        record_metric("search_filter", "latency_improvement_pct", round(latency_improvement, 1))
        record_metric("search_filter", "unfiltered_recall_at_5", round(unfiltered_hits / max(n_queries, 1), 3))
        record_metric("search_filter", "filtered_recall_at_5", round(filtered_hits / max(n_queries, 1), 3))


@pytest.mark.benchmark
class TestConcurrentSearch:
    """Concurrent query performance: tests PersistentClient contention."""

    def test_concurrent_queries(self, tmp_path):
        """Issue N simultaneous queries and measure p50/p95/p99."""
        gen = PalaceDataGenerator(seed=42, scale="small")
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)

        from mempalace.searcher import search_memories

        queries = [
            "authentication", "database", "deployment", "error handling",
            "testing", "monitoring", "caching", "middleware",
            "serialization", "validation",
        ] * 3  # 30 total queries

        def run_search(query):
            start = time.perf_counter()
            result = search_memories(query, palace_path=palace_path, n_results=5)
            elapsed = (time.perf_counter() - start) * 1000
            return elapsed, "error" not in result

        # Concurrent execution
        latencies = []
        errors = 0
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {executor.submit(run_search, q): q for q in queries}
            for future in as_completed(futures):
                elapsed, success = future.result()
                latencies.append(elapsed)
                if not success:
                    errors += 1

        sorted_lat = sorted(latencies)
        n = len(sorted_lat)

        record_metric("concurrent_search", "p50_ms", round(sorted_lat[n // 2], 1))
        record_metric("concurrent_search", "p95_ms", round(sorted_lat[int(n * 0.95)], 1))
        record_metric("concurrent_search", "p99_ms", round(sorted_lat[int(n * 0.99)], 1))
        record_metric("concurrent_search", "avg_ms", round(sum(sorted_lat) / n, 1))
        record_metric("concurrent_search", "error_count", errors)
        record_metric("concurrent_search", "total_queries", len(queries))
        record_metric("concurrent_search", "workers", 4)


@pytest.mark.benchmark
class TestSearchNResultsScaling:
    """How does n_results affect query latency?"""

    @pytest.mark.parametrize("n_results", [1, 5, 10, 25, 50])
    def test_n_results_latency(self, n_results, tmp_path):
        gen = PalaceDataGenerator(seed=42, scale="small")
        palace_path = str(tmp_path / "palace")
        gen.populate_palace_directly(palace_path, n_drawers=2_000, include_needles=False)

        from mempalace.searcher import search_memories

        latencies = []
        for _ in range(5):
            start = time.perf_counter()
            result = search_memories(
                "authentication middleware", palace_path=palace_path, n_results=n_results
            )
            latencies.append((time.perf_counter() - start) * 1000)
            # Use the result so a failed search is not timed as a success.
            assert "error" not in result

        avg_ms = sum(latencies) / len(latencies)
        record_metric("search_n_results", f"avg_ms_at_n_{n_results}", round(avg_ms, 1))
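Review note: the percentile metrics in this module index directly into the sorted latency list (`sorted_lat[int(n * 0.95)]`). With 5 or 30 samples the p95 and p99 values therefore land on the slowest sample. A sketch of the nearest-rank definition on made-up numbers makes this visible. `percentile_nearest_rank` is a hypothetical helper for illustration, not something the suite defines.

```python
import math


def percentile_nearest_rank(samples, pct):
    """Nearest-rank percentile: the smallest value with at least pct% of samples at or below it."""
    if not samples:
        raise ValueError("percentile of an empty sample is undefined")
    ordered = sorted(samples)
    # Ranks are 1-based; clamp so pct=0 maps to the smallest sample.
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


if __name__ == "__main__":
    latencies_ms = [10, 20, 30, 40, 50]  # made-up sample
    for pct in (50, 95, 99):
        print(f"p{pct} = {percentile_nearest_rank(latencies_ms, pct)} ms")
```

Tail percentiles only separate from the maximum once the sample count is well above 100 / (100 - pct), for example more than 100 samples for p99.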