---
name: event-store-design
description: Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, choosing event store technologies, or implementing event persistence patterns.
---

# Event Store Design

Comprehensive guide to designing event stores for event-sourced applications.

## When to Use This Skill

- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Optimizing event storage and retrieval
- Setting up event store schemas
- Planning for event store scaling

## Core Concepts

### 1. Event Store Architecture

```
┌─────────────────────────────────────────────────────┐
│                     Event Store                     │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │ Stream 1    │  │ Stream 2    │  │ Stream 3    │  │
│  │ (Aggregate) │  │ (Aggregate) │  │ (Aggregate) │  │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤  │
│  │ Event 1     │  │ Event 1     │  │ Event 1     │  │
│  │ Event 2     │  │ Event 2     │  │ Event 2     │  │
│  │ Event 3     │  │ ...         │  │ Event 3     │  │
│  │ ...         │  │             │  │ Event 4     │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│  Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...       │
└─────────────────────────────────────────────────────┘
```
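
The split between per-stream ordering and the global position is the key structural idea above. A minimal in-memory sketch to make it concrete (illustrative only: no persistence, durability, or concurrency control):

```python
from collections import defaultdict

class InMemoryEventStore:
    """Toy model of the diagram: one ordered list per stream, plus a
    single global log that interleaves events from every stream."""

    def __init__(self):
        self.streams = defaultdict(list)  # stream_id -> events in version order
        self.global_log = []              # every event in append order

    def append(self, stream_id: str, event: dict) -> dict:
        stored = {
            **event,
            "stream_id": stream_id,
            "version": len(self.streams[stream_id]) + 1,   # per-stream order
            "global_position": len(self.global_log) + 1,   # store-wide order
        }
        self.streams[stream_id].append(stored)
        self.global_log.append(stored)
        return stored

store = InMemoryEventStore()
store.append("Order-1", {"type": "OrderPlaced"})
store.append("Order-2", {"type": "OrderPlaced"})
e = store.append("Order-1", {"type": "OrderShipped"})
assert e["version"] == 2 and e["global_position"] == 3
```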

### 2. Event Store Requirements

| Requirement       | Description                        |
| ----------------- | ---------------------------------- |
| **Append-only**   | Events are immutable, only appends |
| **Ordered**       | Per-stream and global ordering     |
| **Versioned**     | Optimistic concurrency control     |
| **Subscriptions** | Real-time event notifications      |
| **Idempotent**    | Handle duplicate writes safely     |
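
Optimistic concurrency (the **Versioned** row) is enforced at append time: the writer states the stream version it last saw, and the store rejects the write if the stream has moved on. A caller-side retry sketch, written against the `append_events`/`read_stream`/`ConcurrencyError` interface defined in Template 2 below (for brevity it re-reads the whole stream; a real caller would fetch just the head version):

```python
async def append_with_retry(store, stream_id, stream_type, events, retries=3):
    """Retry an append when a concurrent writer lands first (sketch)."""
    for _ in range(retries):
        # Re-read so expected_version reflects any concurrent append
        existing = await store.read_stream(stream_id)
        expected = existing[-1].version if existing else 0
        try:
            return await store.append_events(stream_id, stream_type, events, expected)
        except ConcurrencyError:
            # A real handler would also re-derive the new events from
            # the reloaded aggregate state before retrying
            continue
    raise ConcurrencyError(f"gave up after {retries} attempts on {stream_id}")
```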

## Technology Comparison

| Technology       | Best For                  | Limitations                      |
| ---------------- | ------------------------- | -------------------------------- |
| **EventStoreDB** | Pure event sourcing       | Single-purpose                   |
| **PostgreSQL**   | Existing Postgres stack   | Manual implementation            |
| **Kafka**        | High-throughput streaming | Not ideal for per-stream queries |
| **DynamoDB**     | Serverless, AWS-native    | Query limitations                |
| **Marten**       | .NET ecosystems           | .NET specific                    |

## Templates

### Template 1: PostgreSQL Event Store Schema

```sql
-- Events table
CREATE TABLE events (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id       VARCHAR(255) NOT NULL,
    stream_type     VARCHAR(255) NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    event_data      JSONB NOT NULL,
    metadata        JSONB DEFAULT '{}',
    version         BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at      TIMESTAMPTZ DEFAULT NOW(),

    -- Rejects concurrent writers racing to the same stream version
    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- Index for stream queries (optional: the unique constraint above
-- already provides an index on (stream_id, version))
CREATE INDEX idx_events_stream_id ON events(stream_id, version);

-- Index for global subscriptions
CREATE INDEX idx_events_global_position ON events(global_position);

-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);

-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);

-- Snapshots table
CREATE TABLE snapshots (
    stream_id     VARCHAR(255) PRIMARY KEY,
    stream_type   VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version       BIGINT NOT NULL,
    created_at    TIMESTAMPTZ DEFAULT NOW()
);

-- Subscription checkpoints table
CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position   BIGINT NOT NULL DEFAULT 0,
    updated_at      TIMESTAMPTZ DEFAULT NOW()
);
```

### Template 2: Python Event Store Implementation

```python
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
from uuid import UUID, uuid4

import asyncpg


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: Optional[int] = None
    global_position: Optional[int] = None
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class ConcurrencyError(Exception):
    """Raised when an optimistic concurrency check fails."""


class EventStore:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append_events(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: Optional[int] = None
    ) -> List[Event]:
        """Append events to a stream with optimistic concurrency."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Optimistic concurrency check against the current head version
                if expected_version is not None:
                    current = await conn.fetchval(
                        "SELECT MAX(version) FROM events WHERE stream_id = $1",
                        stream_id
                    )
                    current = current or 0
                    if current != expected_version:
                        raise ConcurrencyError(
                            f"Expected version {expected_version}, got {current}"
                        )

                # Next version for this stream
                start_version = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1",
                    stream_id
                )

                # Insert events; the unique (stream_id, version) constraint
                # backstops the version check under concurrent writers
                saved_events = []
                for i, event in enumerate(events):
                    event.version = start_version + i
                    row = await conn.fetchrow(
                        """
                        INSERT INTO events (id, stream_id, stream_type, event_type,
                                            event_data, metadata, version, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        RETURNING global_position
                        """,
                        event.event_id,
                        stream_id,
                        stream_type,
                        event.event_type,
                        json.dumps(event.data),
                        json.dumps(event.metadata),
                        event.version,
                        event.created_at
                    )
                    event.global_position = row['global_position']
                    saved_events.append(event)

                return saved_events

    async def read_stream(
        self,
        stream_id: str,
        from_version: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read events from a stream in version order."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE stream_id = $1 AND version >= $2
                ORDER BY version
                LIMIT $3
                """,
                stream_id, from_version, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def read_all(
        self,
        from_position: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read all events in global order."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE global_position > $1
                ORDER BY global_position
                LIMIT $2
                """,
                from_position, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def subscribe(
        self,
        subscription_id: str,
        handler,
        from_position: int = 0,
        batch_size: int = 100
    ):
        """Subscribe to all events from a position, polling in batches."""
        # Resume from the saved checkpoint if one exists
        async with self.pool.acquire() as conn:
            checkpoint = await conn.fetchval(
                """
                SELECT last_position FROM subscription_checkpoints
                WHERE subscription_id = $1
                """,
                subscription_id
            )
        position = checkpoint if checkpoint is not None else from_position

        while True:
            events = await self.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)  # Poll interval
                continue

            for event in events:
                await handler(event)
                position = event.global_position

            # Save checkpoint after each batch
            async with self.pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO subscription_checkpoints (subscription_id, last_position)
                    VALUES ($1, $2)
                    ON CONFLICT (subscription_id)
                    DO UPDATE SET last_position = $2, updated_at = NOW()
                    """,
                    subscription_id, position
                )

    def _row_to_event(self, row) -> Event:
        return Event(
            event_id=row['id'],
            stream_id=row['stream_id'],
            event_type=row['event_type'],
            data=json.loads(row['event_data']),
            metadata=json.loads(row['metadata']),
            version=row['version'],
            global_position=row['global_position'],
            created_at=row['created_at']
        )
```
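
A minimal usage sketch for the store above; the DSN, stream name, and payload are illustrative, and it assumes a database initialized with the Template 1 schema:

```python
import asyncio

import asyncpg

async def main():
    pool = await asyncpg.create_pool("postgresql://app:app@localhost:5432/eventstore")
    store = EventStore(pool)

    stream_id = "Order-123"
    await store.append_events(
        stream_id,
        stream_type="Order",
        events=[Event(stream_id=stream_id, event_type="OrderPlaced",
                      data={"total": 99.5})],
        expected_version=0,  # 0 means: we expect a brand-new stream
    )

    for event in await store.read_stream(stream_id):
        print(event.version, event.event_type, event.data)

    await pool.close()

asyncio.run(main())
```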

### Template 3: EventStoreDB Usage

```python
import json

from esdbclient import EventStoreDBClient, NewEvent, StreamState

# Connect (the synchronous client)
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")

# Append events
def append_events(stream_name: str, events: list, expected_revision=None):
    new_events = [
        NewEvent(
            type=event['type'],
            data=json.dumps(event['data']).encode(),
            metadata=json.dumps(event.get('metadata', {})).encode()
        )
        for event in events
    ]

    if expected_revision is None:
        state = StreamState.ANY
    elif expected_revision == -1:
        state = StreamState.NO_STREAM
    else:
        state = expected_revision

    return client.append_to_stream(
        stream_name=stream_name,
        events=new_events,
        current_version=state
    )

# Read stream
def read_stream(stream_name: str, from_revision: int = 0):
    events = client.get_stream(
        stream_name=stream_name,
        stream_position=from_revision
    )
    return [
        {
            'type': event.type,
            'data': json.loads(event.data),
            'metadata': json.loads(event.metadata) if event.metadata else {},
            'stream_position': event.stream_position,
            'commit_position': event.commit_position
        }
        for event in events
    ]

# Subscribe to all events; the synchronous client returns a plain
# iterator, so this loop blocks its thread until the subscription drops
def subscribe_to_all(handler, from_position=None):
    subscription = client.subscribe_to_all(commit_position=from_position)
    for event in subscription:
        handler({
            'type': event.type,
            'data': json.loads(event.data),
            'stream_id': event.stream_name,
            'position': event.commit_position
        })

# Category projection ($ce-Category)
def read_category(category: str):
    """Read a category stream via the $ce system projection.

    Category streams hold link events; pass resolve_links=True to
    client.get_stream when you need the original events they point to.
    """
    return read_stream(f"$ce-{category}")
```
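
A short usage sketch of the helpers above, showing how the `expected_revision` convention maps to EventStoreDB's stream state (stream name and payloads are illustrative):

```python
# Create a new stream, failing if it already exists
# (expected_revision=-1 maps to StreamState.NO_STREAM above)
append_events(
    "Order-123",
    [{"type": "OrderPlaced", "data": {"total": 99.5}}],
    expected_revision=-1,
)

# Append only if the stream is still at revision 0 (one event so far)
append_events(
    "Order-123",
    [{"type": "OrderShipped", "data": {"carrier": "UPS"}}],
    expected_revision=0,
)

for event in read_stream("Order-123"):
    print(event["stream_position"], event["type"], event["data"])
```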

### Template 4: DynamoDB Event Store

```python
import json
import uuid
from datetime import datetime, timezone

import boto3
from boto3.dynamodb.conditions import Key


class DynamoEventStore:
    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def append_events(self, stream_id: str, events: list, expected_version: int = 0):
        """Append events with conditional writes for optimistic concurrency.

        batch_writer() cannot carry condition expressions, so each event is
        written with put_item: a concurrent writer racing to the same
        stream/version fails the attribute_not_exists check instead of
        silently overwriting.
        """
        now = datetime.now(timezone.utc).isoformat()
        for i, event in enumerate(events):
            version = expected_version + i + 1
            self.table.put_item(
                Item={
                    'PK': f"STREAM#{stream_id}",
                    'SK': f"VERSION#{version:020d}",
                    'GSI1PK': 'EVENTS',
                    'GSI1SK': now,
                    'event_id': str(uuid.uuid4()),
                    'stream_id': stream_id,
                    'event_type': event['type'],
                    'event_data': json.dumps(event['data']),
                    'version': version,
                    'created_at': now
                },
                # Fail if an item with this stream/version key already exists
                ConditionExpression='attribute_not_exists(PK)'
            )
        return events

    def read_stream(self, stream_id: str, from_version: int = 0):
        """Read events from a stream in version order."""
        response = self.table.query(
            KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") &
                                   Key('SK').gte(f"VERSION#{from_version:020d}")
        )
        return [
            {
                'event_type': item['event_type'],
                'data': json.loads(item['event_data']),
                'version': int(item['version'])  # DynamoDB returns numbers as Decimal
            }
            for item in response['Items']
        ]


# Table definition (CloudFormation/Terraform)
"""
DynamoDB Table:
- PK (Partition Key): String
- SK (Sort Key): String
- GSI1PK, GSI1SK for global ordering (timestamp-based, so global
  order is approximate across concurrent writers)

Capacity: on-demand, or provisioned based on throughput needs
"""
```
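
A usage sketch for the DynamoDB store; the table name and payloads are illustrative, and AWS credentials plus the key schema from the docstring above are assumed:

```python
store = DynamoEventStore("events")

# First append to a new stream starts at version 1
store.append_events("Order-123", [{"type": "OrderPlaced", "data": {"total": 99.5}}])

for event in store.read_stream("Order-123"):
    print(event["version"], event["event_type"], event["data"])
```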

## Best Practices

### Do's

- **Use stream IDs that include the aggregate type** - e.g. `Order-{uuid}`
- **Include correlation/causation IDs** - For tracing (see the sketch after this list)
- **Version events from day one** - Plan for schema evolution
- **Implement idempotency** - Use event IDs for deduplication
- **Index for your query patterns** - Streams, event types, and time ranges
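
For correlation and causation IDs, a common convention is to stamp every new event's metadata from the command (or event) that triggered it. A hedged sketch; the field names are conventional rather than mandated by any store:

```python
import uuid

def build_metadata(trigger_metadata: dict) -> dict:
    """Propagate tracing IDs from the triggering message to a new event.

    correlation_id ties together everything caused by one original request;
    causation_id points at the immediate trigger (a command or prior event).
    """
    return {
        "correlation_id": trigger_metadata.get("correlation_id") or str(uuid.uuid4()),
        "causation_id": trigger_metadata.get("message_id"),
        "schema_version": 1,  # version events from day one
    }
```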

### Don'ts

- **Don't update or delete events** - They're immutable facts
- **Don't store large payloads** - Keep events small; store blobs elsewhere and reference them
- **Don't skip optimistic concurrency** - It prevents lost updates from concurrent writers
- **Don't ignore backpressure** - Handle slow consumers
## Resources
|
|
|
|
- [EventStoreDB](https://www.eventstore.com/)
|
|
- [Marten Events](https://martendb.io/events/)
|
|
- [Event Sourcing Pattern](https://docs.microsoft.com/en-us/azure/architecture/patterns/event-sourcing)
|