mirror of
https://github.com/wshobson/agents.git
synced 2026-03-18 17:47:16 +00:00
Add domain expert agents with comprehensive skill sets: - service-mesh-expert (cloud-infrastructure): Istio/Linkerd patterns, mTLS, observability - event-sourcing-architect (backend-development): CQRS, event stores, projections, sagas - vector-database-engineer (llm-application-dev): embeddings, similarity search, hybrid search - monorepo-architect (developer-essentials): Nx, Turborepo, Bazel, pnpm workspaces - threat-modeling-expert (security-scanning): STRIDE, attack trees, security requirements Update all documentation to reflect correct counts: - 67 plugins, 99 agents, 107 skills, 71 commands
489 lines
16 KiB
Markdown
489 lines
16 KiB
Markdown
---
|
|
name: projection-patterns
|
|
description: Build read models and projections from event streams. Use when implementing CQRS read sides, building materialized views, or optimizing query performance in event-sourced systems.
|
|
---
|
|
|
|
# Projection Patterns
|
|
|
|
Comprehensive guide to building projections and read models for event-sourced systems.
|
|
|
|
## When to Use This Skill
|
|
|
|
- Building CQRS read models
|
|
- Creating materialized views from events
|
|
- Optimizing query performance
|
|
- Implementing real-time dashboards
|
|
- Building search indexes from events
|
|
- Aggregating data across streams
|
|
|
|
## Core Concepts
|
|
|
|
### 1. Projection Architecture
|
|
|
|
```
|
|
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
|
|
│ Event Store │────►│ Projector │────►│ Read Model │
|
|
│ │ │ │ │ (Database) │
|
|
│ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │
|
|
│ │ Events │ │ │ │ Handler │ │ │ │ Tables │ │
|
|
│ └─────────┘ │ │ │ Logic │ │ │ │ Views │ │
|
|
│ │ │ └─────────┘ │ │ │ Cache │ │
|
|
└─────────────┘ └─────────────┘ └─────────────┘
|
|
```
|
|
|
|
### 2. Projection Types
|
|
|
|
| Type | Description | Use Case |
|
|
|------|-------------|----------|
|
|
| **Live** | Real-time from subscription | Current state queries |
|
|
| **Catchup** | Process historical events | Rebuilding read models |
|
|
| **Persistent** | Stores checkpoint | Resume after restart |
|
|
| **Inline** | Same transaction as write | Strong consistency |
|
|
|
|
## Templates
|
|
|
|
### Template 1: Basic Projector
|
|
|
|
```python
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Any, Callable, List
|
|
import asyncpg
|
|
|
|
@dataclass
|
|
class Event:
|
|
stream_id: str
|
|
event_type: str
|
|
data: dict
|
|
version: int
|
|
global_position: int
|
|
|
|
|
|
class Projection(ABC):
|
|
"""Base class for projections."""
|
|
|
|
@property
|
|
@abstractmethod
|
|
def name(self) -> str:
|
|
"""Unique projection name for checkpointing."""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def handles(self) -> List[str]:
|
|
"""List of event types this projection handles."""
|
|
pass
|
|
|
|
@abstractmethod
|
|
async def apply(self, event: Event) -> None:
|
|
"""Apply event to the read model."""
|
|
pass
|
|
|
|
|
|
class Projector:
|
|
"""Runs projections from event store."""
|
|
|
|
def __init__(self, event_store, checkpoint_store):
|
|
self.event_store = event_store
|
|
self.checkpoint_store = checkpoint_store
|
|
self.projections: List[Projection] = []
|
|
|
|
def register(self, projection: Projection):
|
|
self.projections.append(projection)
|
|
|
|
async def run(self, batch_size: int = 100):
|
|
"""Run all projections continuously."""
|
|
while True:
|
|
for projection in self.projections:
|
|
await self._run_projection(projection, batch_size)
|
|
await asyncio.sleep(0.1)
|
|
|
|
async def _run_projection(self, projection: Projection, batch_size: int):
|
|
checkpoint = await self.checkpoint_store.get(projection.name)
|
|
position = checkpoint or 0
|
|
|
|
events = await self.event_store.read_all(position, batch_size)
|
|
|
|
for event in events:
|
|
if event.event_type in projection.handles():
|
|
await projection.apply(event)
|
|
|
|
await self.checkpoint_store.save(
|
|
projection.name,
|
|
event.global_position
|
|
)
|
|
|
|
async def rebuild(self, projection: Projection):
|
|
"""Rebuild a projection from scratch."""
|
|
await self.checkpoint_store.delete(projection.name)
|
|
# Optionally clear read model tables
|
|
await self._run_projection(projection, batch_size=1000)
|
|
```
|
|
|
|
### Template 2: Order Summary Projection
|
|
|
|
```python
|
|
class OrderSummaryProjection(Projection):
|
|
"""Projects order events to a summary read model."""
|
|
|
|
def __init__(self, db_pool: asyncpg.Pool):
|
|
self.pool = db_pool
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "order_summary"
|
|
|
|
def handles(self) -> List[str]:
|
|
return [
|
|
"OrderCreated",
|
|
"OrderItemAdded",
|
|
"OrderItemRemoved",
|
|
"OrderShipped",
|
|
"OrderCompleted",
|
|
"OrderCancelled"
|
|
]
|
|
|
|
async def apply(self, event: Event) -> None:
|
|
handlers = {
|
|
"OrderCreated": self._handle_created,
|
|
"OrderItemAdded": self._handle_item_added,
|
|
"OrderItemRemoved": self._handle_item_removed,
|
|
"OrderShipped": self._handle_shipped,
|
|
"OrderCompleted": self._handle_completed,
|
|
"OrderCancelled": self._handle_cancelled,
|
|
}
|
|
|
|
handler = handlers.get(event.event_type)
|
|
if handler:
|
|
await handler(event)
|
|
|
|
async def _handle_created(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO order_summaries
|
|
(order_id, customer_id, status, total_amount, item_count, created_at)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['customer_id'],
|
|
'pending',
|
|
0,
|
|
0,
|
|
event.data['created_at']
|
|
)
|
|
|
|
async def _handle_item_added(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE order_summaries
|
|
SET total_amount = total_amount + $2,
|
|
item_count = item_count + 1,
|
|
updated_at = NOW()
|
|
WHERE order_id = $1
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['price'] * event.data['quantity']
|
|
)
|
|
|
|
async def _handle_item_removed(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE order_summaries
|
|
SET total_amount = total_amount - $2,
|
|
item_count = item_count - 1,
|
|
updated_at = NOW()
|
|
WHERE order_id = $1
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['price'] * event.data['quantity']
|
|
)
|
|
|
|
async def _handle_shipped(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE order_summaries
|
|
SET status = 'shipped',
|
|
shipped_at = $2,
|
|
updated_at = NOW()
|
|
WHERE order_id = $1
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['shipped_at']
|
|
)
|
|
|
|
async def _handle_completed(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE order_summaries
|
|
SET status = 'completed',
|
|
completed_at = $2,
|
|
updated_at = NOW()
|
|
WHERE order_id = $1
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['completed_at']
|
|
)
|
|
|
|
async def _handle_cancelled(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE order_summaries
|
|
SET status = 'cancelled',
|
|
cancelled_at = $2,
|
|
cancellation_reason = $3,
|
|
updated_at = NOW()
|
|
WHERE order_id = $1
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['cancelled_at'],
|
|
event.data.get('reason')
|
|
)
|
|
```
|
|
|
|
### Template 3: Elasticsearch Search Projection
|
|
|
|
```python
|
|
from elasticsearch import AsyncElasticsearch
|
|
|
|
class ProductSearchProjection(Projection):
|
|
"""Projects product events to Elasticsearch for full-text search."""
|
|
|
|
def __init__(self, es_client: AsyncElasticsearch):
|
|
self.es = es_client
|
|
self.index = "products"
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "product_search"
|
|
|
|
def handles(self) -> List[str]:
|
|
return [
|
|
"ProductCreated",
|
|
"ProductUpdated",
|
|
"ProductPriceChanged",
|
|
"ProductDeleted"
|
|
]
|
|
|
|
async def apply(self, event: Event) -> None:
|
|
if event.event_type == "ProductCreated":
|
|
await self.es.index(
|
|
index=self.index,
|
|
id=event.data['product_id'],
|
|
document={
|
|
'name': event.data['name'],
|
|
'description': event.data['description'],
|
|
'category': event.data['category'],
|
|
'price': event.data['price'],
|
|
'tags': event.data.get('tags', []),
|
|
'created_at': event.data['created_at']
|
|
}
|
|
)
|
|
|
|
elif event.event_type == "ProductUpdated":
|
|
await self.es.update(
|
|
index=self.index,
|
|
id=event.data['product_id'],
|
|
doc={
|
|
'name': event.data['name'],
|
|
'description': event.data['description'],
|
|
'category': event.data['category'],
|
|
'tags': event.data.get('tags', []),
|
|
'updated_at': event.data['updated_at']
|
|
}
|
|
)
|
|
|
|
elif event.event_type == "ProductPriceChanged":
|
|
await self.es.update(
|
|
index=self.index,
|
|
id=event.data['product_id'],
|
|
doc={
|
|
'price': event.data['new_price'],
|
|
'price_updated_at': event.data['changed_at']
|
|
}
|
|
)
|
|
|
|
elif event.event_type == "ProductDeleted":
|
|
await self.es.delete(
|
|
index=self.index,
|
|
id=event.data['product_id']
|
|
)
|
|
```
|
|
|
|
### Template 4: Aggregating Projection
|
|
|
|
```python
|
|
class DailySalesProjection(Projection):
|
|
"""Aggregates sales data by day for reporting."""
|
|
|
|
def __init__(self, db_pool: asyncpg.Pool):
|
|
self.pool = db_pool
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "daily_sales"
|
|
|
|
def handles(self) -> List[str]:
|
|
return ["OrderCompleted", "OrderRefunded"]
|
|
|
|
async def apply(self, event: Event) -> None:
|
|
if event.event_type == "OrderCompleted":
|
|
await self._increment_sales(event)
|
|
elif event.event_type == "OrderRefunded":
|
|
await self._decrement_sales(event)
|
|
|
|
async def _increment_sales(self, event: Event):
|
|
date = event.data['completed_at'][:10] # YYYY-MM-DD
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO daily_sales (date, total_orders, total_revenue, total_items)
|
|
VALUES ($1, 1, $2, $3)
|
|
ON CONFLICT (date) DO UPDATE SET
|
|
total_orders = daily_sales.total_orders + 1,
|
|
total_revenue = daily_sales.total_revenue + $2,
|
|
total_items = daily_sales.total_items + $3,
|
|
updated_at = NOW()
|
|
""",
|
|
date,
|
|
event.data['total_amount'],
|
|
event.data['item_count']
|
|
)
|
|
|
|
async def _decrement_sales(self, event: Event):
|
|
date = event.data['original_completed_at'][:10]
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE daily_sales SET
|
|
total_orders = total_orders - 1,
|
|
total_revenue = total_revenue - $2,
|
|
total_refunds = total_refunds + $2,
|
|
updated_at = NOW()
|
|
WHERE date = $1
|
|
""",
|
|
date,
|
|
event.data['refund_amount']
|
|
)
|
|
```
|
|
|
|
### Template 5: Multi-Table Projection
|
|
|
|
```python
|
|
class CustomerActivityProjection(Projection):
|
|
"""Projects customer activity across multiple tables."""
|
|
|
|
def __init__(self, db_pool: asyncpg.Pool):
|
|
self.pool = db_pool
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "customer_activity"
|
|
|
|
def handles(self) -> List[str]:
|
|
return [
|
|
"CustomerCreated",
|
|
"OrderCompleted",
|
|
"ReviewSubmitted",
|
|
"CustomerTierChanged"
|
|
]
|
|
|
|
async def apply(self, event: Event) -> None:
|
|
async with self.pool.acquire() as conn:
|
|
async with conn.transaction():
|
|
if event.event_type == "CustomerCreated":
|
|
# Insert into customers table
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO customers (customer_id, email, name, tier, created_at)
|
|
VALUES ($1, $2, $3, 'bronze', $4)
|
|
""",
|
|
event.data['customer_id'],
|
|
event.data['email'],
|
|
event.data['name'],
|
|
event.data['created_at']
|
|
)
|
|
# Initialize activity summary
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO customer_activity_summary
|
|
(customer_id, total_orders, total_spent, total_reviews)
|
|
VALUES ($1, 0, 0, 0)
|
|
""",
|
|
event.data['customer_id']
|
|
)
|
|
|
|
elif event.event_type == "OrderCompleted":
|
|
# Update activity summary
|
|
await conn.execute(
|
|
"""
|
|
UPDATE customer_activity_summary SET
|
|
total_orders = total_orders + 1,
|
|
total_spent = total_spent + $2,
|
|
last_order_at = $3
|
|
WHERE customer_id = $1
|
|
""",
|
|
event.data['customer_id'],
|
|
event.data['total_amount'],
|
|
event.data['completed_at']
|
|
)
|
|
# Insert into order history
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO customer_order_history
|
|
(customer_id, order_id, amount, completed_at)
|
|
VALUES ($1, $2, $3, $4)
|
|
""",
|
|
event.data['customer_id'],
|
|
event.data['order_id'],
|
|
event.data['total_amount'],
|
|
event.data['completed_at']
|
|
)
|
|
|
|
elif event.event_type == "ReviewSubmitted":
|
|
await conn.execute(
|
|
"""
|
|
UPDATE customer_activity_summary SET
|
|
total_reviews = total_reviews + 1,
|
|
last_review_at = $2
|
|
WHERE customer_id = $1
|
|
""",
|
|
event.data['customer_id'],
|
|
event.data['submitted_at']
|
|
)
|
|
|
|
elif event.event_type == "CustomerTierChanged":
|
|
await conn.execute(
|
|
"""
|
|
UPDATE customers SET tier = $2, updated_at = NOW()
|
|
WHERE customer_id = $1
|
|
""",
|
|
event.data['customer_id'],
|
|
event.data['new_tier']
|
|
)
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### Do's
|
|
- **Make projections idempotent** - Safe to replay
|
|
- **Use transactions** - For multi-table updates
|
|
- **Store checkpoints** - Resume after failures
|
|
- **Monitor lag** - Alert on projection delays
|
|
- **Plan for rebuilds** - Design for reconstruction
|
|
|
|
### Don'ts
|
|
- **Don't couple projections** - Each is independent
|
|
- **Don't skip error handling** - Log and alert on failures
|
|
- **Don't ignore ordering** - Events must be processed in order
|
|
- **Don't over-normalize** - Denormalize for query patterns
|
|
|
|
## Resources
|
|
|
|
- [CQRS Pattern](https://docs.microsoft.com/en-us/azure/architecture/patterns/cqrs)
|
|
- [Projection Building Blocks](https://zimarev.com/blog/event-sourcing/projections/)
|