mirror of
https://github.com/wshobson/agents.git
synced 2026-03-18 17:47:16 +00:00
Remove references to non-existent resource files (references/, assets/, scripts/, examples/) from 115 skill SKILL.md files. These sections pointed to directories and files that were never created, causing confusion when users install skills. Also fix broken Code of Conduct links in issue templates to use absolute GitHub URLs instead of relative paths that 404.
486 lines
16 KiB
Markdown
486 lines
16 KiB
Markdown
---
|
|
name: projection-patterns
|
|
description: Build read models and projections from event streams. Use when implementing CQRS read sides, building materialized views, or optimizing query performance in event-sourced systems.
|
|
---
|
|
|
|
# Projection Patterns
|
|
|
|
Comprehensive guide to building projections and read models for event-sourced systems.
|
|
|
|
## When to Use This Skill
|
|
|
|
- Building CQRS read models
|
|
- Creating materialized views from events
|
|
- Optimizing query performance
|
|
- Implementing real-time dashboards
|
|
- Building search indexes from events
|
|
- Aggregating data across streams
|
|
|
|
## Core Concepts
|
|
|
|
### 1. Projection Architecture
|
|
|
|
```
|
|
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
|
|
│ Event Store │────►│ Projector │────►│ Read Model │
|
|
│ │ │ │ │ (Database) │
|
|
│ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │
|
|
│ │ Events │ │ │ │ Handler │ │ │ │ Tables │ │
|
|
│ └─────────┘ │ │ │ Logic │ │ │ │ Views │ │
|
|
│ │ │ └─────────┘ │ │ │ Cache │ │
|
|
└─────────────┘ └─────────────┘ └─────────────┘
|
|
```
|
|
|
|
### 2. Projection Types
|
|
|
|
| Type | Description | Use Case |
|
|
| -------------- | --------------------------- | ---------------------- |
|
|
| **Live** | Real-time from subscription | Current state queries |
|
|
| **Catchup** | Process historical events | Rebuilding read models |
|
|
| **Persistent** | Stores checkpoint | Resume after restart |
|
|
| **Inline** | Same transaction as write | Strong consistency |
|
|
|
|
## Templates
|
|
|
|
### Template 1: Basic Projector
|
|
|
|
```python
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Any, Callable, List
|
|
import asyncpg
|
|
|
|
@dataclass
|
|
class Event:
|
|
stream_id: str
|
|
event_type: str
|
|
data: dict
|
|
version: int
|
|
global_position: int
|
|
|
|
|
|
class Projection(ABC):
|
|
"""Base class for projections."""
|
|
|
|
@property
|
|
@abstractmethod
|
|
def name(self) -> str:
|
|
"""Unique projection name for checkpointing."""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def handles(self) -> List[str]:
|
|
"""List of event types this projection handles."""
|
|
pass
|
|
|
|
@abstractmethod
|
|
async def apply(self, event: Event) -> None:
|
|
"""Apply event to the read model."""
|
|
pass
|
|
|
|
|
|
class Projector:
|
|
"""Runs projections from event store."""
|
|
|
|
def __init__(self, event_store, checkpoint_store):
|
|
self.event_store = event_store
|
|
self.checkpoint_store = checkpoint_store
|
|
self.projections: List[Projection] = []
|
|
|
|
def register(self, projection: Projection):
|
|
self.projections.append(projection)
|
|
|
|
async def run(self, batch_size: int = 100):
|
|
"""Run all projections continuously."""
|
|
while True:
|
|
for projection in self.projections:
|
|
await self._run_projection(projection, batch_size)
|
|
await asyncio.sleep(0.1)
|
|
|
|
async def _run_projection(self, projection: Projection, batch_size: int):
|
|
checkpoint = await self.checkpoint_store.get(projection.name)
|
|
position = checkpoint or 0
|
|
|
|
events = await self.event_store.read_all(position, batch_size)
|
|
|
|
for event in events:
|
|
if event.event_type in projection.handles():
|
|
await projection.apply(event)
|
|
|
|
await self.checkpoint_store.save(
|
|
projection.name,
|
|
event.global_position
|
|
)
|
|
|
|
async def rebuild(self, projection: Projection):
|
|
"""Rebuild a projection from scratch."""
|
|
await self.checkpoint_store.delete(projection.name)
|
|
# Optionally clear read model tables
|
|
await self._run_projection(projection, batch_size=1000)
|
|
```
|
|
|
|
### Template 2: Order Summary Projection
|
|
|
|
```python
|
|
class OrderSummaryProjection(Projection):
|
|
"""Projects order events to a summary read model."""
|
|
|
|
def __init__(self, db_pool: asyncpg.Pool):
|
|
self.pool = db_pool
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "order_summary"
|
|
|
|
def handles(self) -> List[str]:
|
|
return [
|
|
"OrderCreated",
|
|
"OrderItemAdded",
|
|
"OrderItemRemoved",
|
|
"OrderShipped",
|
|
"OrderCompleted",
|
|
"OrderCancelled"
|
|
]
|
|
|
|
async def apply(self, event: Event) -> None:
|
|
handlers = {
|
|
"OrderCreated": self._handle_created,
|
|
"OrderItemAdded": self._handle_item_added,
|
|
"OrderItemRemoved": self._handle_item_removed,
|
|
"OrderShipped": self._handle_shipped,
|
|
"OrderCompleted": self._handle_completed,
|
|
"OrderCancelled": self._handle_cancelled,
|
|
}
|
|
|
|
handler = handlers.get(event.event_type)
|
|
if handler:
|
|
await handler(event)
|
|
|
|
async def _handle_created(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO order_summaries
|
|
(order_id, customer_id, status, total_amount, item_count, created_at)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['customer_id'],
|
|
'pending',
|
|
0,
|
|
0,
|
|
event.data['created_at']
|
|
)
|
|
|
|
async def _handle_item_added(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE order_summaries
|
|
SET total_amount = total_amount + $2,
|
|
item_count = item_count + 1,
|
|
updated_at = NOW()
|
|
WHERE order_id = $1
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['price'] * event.data['quantity']
|
|
)
|
|
|
|
async def _handle_item_removed(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE order_summaries
|
|
SET total_amount = total_amount - $2,
|
|
item_count = item_count - 1,
|
|
updated_at = NOW()
|
|
WHERE order_id = $1
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['price'] * event.data['quantity']
|
|
)
|
|
|
|
async def _handle_shipped(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE order_summaries
|
|
SET status = 'shipped',
|
|
shipped_at = $2,
|
|
updated_at = NOW()
|
|
WHERE order_id = $1
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['shipped_at']
|
|
)
|
|
|
|
async def _handle_completed(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE order_summaries
|
|
SET status = 'completed',
|
|
completed_at = $2,
|
|
updated_at = NOW()
|
|
WHERE order_id = $1
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['completed_at']
|
|
)
|
|
|
|
async def _handle_cancelled(self, event: Event):
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE order_summaries
|
|
SET status = 'cancelled',
|
|
cancelled_at = $2,
|
|
cancellation_reason = $3,
|
|
updated_at = NOW()
|
|
WHERE order_id = $1
|
|
""",
|
|
event.data['order_id'],
|
|
event.data['cancelled_at'],
|
|
event.data.get('reason')
|
|
)
|
|
```
|
|
|
|
### Template 3: Elasticsearch Search Projection
|
|
|
|
```python
|
|
from elasticsearch import AsyncElasticsearch
|
|
|
|
class ProductSearchProjection(Projection):
|
|
"""Projects product events to Elasticsearch for full-text search."""
|
|
|
|
def __init__(self, es_client: AsyncElasticsearch):
|
|
self.es = es_client
|
|
self.index = "products"
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "product_search"
|
|
|
|
def handles(self) -> List[str]:
|
|
return [
|
|
"ProductCreated",
|
|
"ProductUpdated",
|
|
"ProductPriceChanged",
|
|
"ProductDeleted"
|
|
]
|
|
|
|
async def apply(self, event: Event) -> None:
|
|
if event.event_type == "ProductCreated":
|
|
await self.es.index(
|
|
index=self.index,
|
|
id=event.data['product_id'],
|
|
document={
|
|
'name': event.data['name'],
|
|
'description': event.data['description'],
|
|
'category': event.data['category'],
|
|
'price': event.data['price'],
|
|
'tags': event.data.get('tags', []),
|
|
'created_at': event.data['created_at']
|
|
}
|
|
)
|
|
|
|
elif event.event_type == "ProductUpdated":
|
|
await self.es.update(
|
|
index=self.index,
|
|
id=event.data['product_id'],
|
|
doc={
|
|
'name': event.data['name'],
|
|
'description': event.data['description'],
|
|
'category': event.data['category'],
|
|
'tags': event.data.get('tags', []),
|
|
'updated_at': event.data['updated_at']
|
|
}
|
|
)
|
|
|
|
elif event.event_type == "ProductPriceChanged":
|
|
await self.es.update(
|
|
index=self.index,
|
|
id=event.data['product_id'],
|
|
doc={
|
|
'price': event.data['new_price'],
|
|
'price_updated_at': event.data['changed_at']
|
|
}
|
|
)
|
|
|
|
elif event.event_type == "ProductDeleted":
|
|
await self.es.delete(
|
|
index=self.index,
|
|
id=event.data['product_id']
|
|
)
|
|
```
|
|
|
|
### Template 4: Aggregating Projection
|
|
|
|
```python
|
|
class DailySalesProjection(Projection):
|
|
"""Aggregates sales data by day for reporting."""
|
|
|
|
def __init__(self, db_pool: asyncpg.Pool):
|
|
self.pool = db_pool
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "daily_sales"
|
|
|
|
def handles(self) -> List[str]:
|
|
return ["OrderCompleted", "OrderRefunded"]
|
|
|
|
async def apply(self, event: Event) -> None:
|
|
if event.event_type == "OrderCompleted":
|
|
await self._increment_sales(event)
|
|
elif event.event_type == "OrderRefunded":
|
|
await self._decrement_sales(event)
|
|
|
|
async def _increment_sales(self, event: Event):
|
|
date = event.data['completed_at'][:10] # YYYY-MM-DD
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO daily_sales (date, total_orders, total_revenue, total_items)
|
|
VALUES ($1, 1, $2, $3)
|
|
ON CONFLICT (date) DO UPDATE SET
|
|
total_orders = daily_sales.total_orders + 1,
|
|
total_revenue = daily_sales.total_revenue + $2,
|
|
total_items = daily_sales.total_items + $3,
|
|
updated_at = NOW()
|
|
""",
|
|
date,
|
|
event.data['total_amount'],
|
|
event.data['item_count']
|
|
)
|
|
|
|
async def _decrement_sales(self, event: Event):
|
|
date = event.data['original_completed_at'][:10]
|
|
async with self.pool.acquire() as conn:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE daily_sales SET
|
|
total_orders = total_orders - 1,
|
|
total_revenue = total_revenue - $2,
|
|
total_refunds = total_refunds + $2,
|
|
updated_at = NOW()
|
|
WHERE date = $1
|
|
""",
|
|
date,
|
|
event.data['refund_amount']
|
|
)
|
|
```
|
|
|
|
### Template 5: Multi-Table Projection
|
|
|
|
```python
|
|
class CustomerActivityProjection(Projection):
|
|
"""Projects customer activity across multiple tables."""
|
|
|
|
def __init__(self, db_pool: asyncpg.Pool):
|
|
self.pool = db_pool
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "customer_activity"
|
|
|
|
def handles(self) -> List[str]:
|
|
return [
|
|
"CustomerCreated",
|
|
"OrderCompleted",
|
|
"ReviewSubmitted",
|
|
"CustomerTierChanged"
|
|
]
|
|
|
|
async def apply(self, event: Event) -> None:
|
|
async with self.pool.acquire() as conn:
|
|
async with conn.transaction():
|
|
if event.event_type == "CustomerCreated":
|
|
# Insert into customers table
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO customers (customer_id, email, name, tier, created_at)
|
|
VALUES ($1, $2, $3, 'bronze', $4)
|
|
""",
|
|
event.data['customer_id'],
|
|
event.data['email'],
|
|
event.data['name'],
|
|
event.data['created_at']
|
|
)
|
|
# Initialize activity summary
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO customer_activity_summary
|
|
(customer_id, total_orders, total_spent, total_reviews)
|
|
VALUES ($1, 0, 0, 0)
|
|
""",
|
|
event.data['customer_id']
|
|
)
|
|
|
|
elif event.event_type == "OrderCompleted":
|
|
# Update activity summary
|
|
await conn.execute(
|
|
"""
|
|
UPDATE customer_activity_summary SET
|
|
total_orders = total_orders + 1,
|
|
total_spent = total_spent + $2,
|
|
last_order_at = $3
|
|
WHERE customer_id = $1
|
|
""",
|
|
event.data['customer_id'],
|
|
event.data['total_amount'],
|
|
event.data['completed_at']
|
|
)
|
|
# Insert into order history
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO customer_order_history
|
|
(customer_id, order_id, amount, completed_at)
|
|
VALUES ($1, $2, $3, $4)
|
|
""",
|
|
event.data['customer_id'],
|
|
event.data['order_id'],
|
|
event.data['total_amount'],
|
|
event.data['completed_at']
|
|
)
|
|
|
|
elif event.event_type == "ReviewSubmitted":
|
|
await conn.execute(
|
|
"""
|
|
UPDATE customer_activity_summary SET
|
|
total_reviews = total_reviews + 1,
|
|
last_review_at = $2
|
|
WHERE customer_id = $1
|
|
""",
|
|
event.data['customer_id'],
|
|
event.data['submitted_at']
|
|
)
|
|
|
|
elif event.event_type == "CustomerTierChanged":
|
|
await conn.execute(
|
|
"""
|
|
UPDATE customers SET tier = $2, updated_at = NOW()
|
|
WHERE customer_id = $1
|
|
""",
|
|
event.data['customer_id'],
|
|
event.data['new_tier']
|
|
)
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### Do's
|
|
|
|
- **Make projections idempotent** - Safe to replay
|
|
- **Use transactions** - For multi-table updates
|
|
- **Store checkpoints** - Resume after failures
|
|
- **Monitor lag** - Alert on projection delays
|
|
- **Plan for rebuilds** - Design for reconstruction
|
|
|
|
### Don'ts
|
|
|
|
- **Don't couple projections** - Each is independent
|
|
- **Don't skip error handling** - Log and alert on failures
|
|
- **Don't ignore ordering** - Events must be processed in order
|
|
- **Don't over-normalize** - Denormalize for query patterns
|