mirror of
https://github.com/wshobson/agents.git
synced 2026-03-18 17:47:16 +00:00
Add domain expert agents with comprehensive skill sets: - service-mesh-expert (cloud-infrastructure): Istio/Linkerd patterns, mTLS, observability - event-sourcing-architect (backend-development): CQRS, event stores, projections, sagas - vector-database-engineer (llm-application-dev): embeddings, similarity search, hybrid search - monorepo-architect (developer-essentials): Nx, Turborepo, Bazel, pnpm workspaces - threat-modeling-expert (security-scanning): STRIDE, attack trees, security requirements Update all documentation to reflect correct counts: - 67 plugins, 99 agents, 107 skills, 71 commands
483 lines
15 KiB
Markdown
483 lines
15 KiB
Markdown
---
|
|
name: saga-orchestration
|
|
description: Implement saga patterns for distributed transactions and cross-aggregate workflows. Use when coordinating multi-step business processes, handling compensating transactions, or managing long-running workflows.
|
|
---
|
|
|
|
# Saga Orchestration
|
|
|
|
Patterns for managing distributed transactions and long-running business processes.
|
|
|
|
## When to Use This Skill
|
|
|
|
- Coordinating multi-service transactions
|
|
- Implementing compensating transactions
|
|
- Managing long-running business workflows
|
|
- Handling failures in distributed systems
|
|
- Building order fulfillment processes
|
|
- Implementing approval workflows
|
|
|
|
## Core Concepts
|
|
|
|
### 1. Saga Types
|
|
|
|
```
|
|
Choreography                    Orchestration
┌─────┐  ┌─────┐  ┌─────┐       ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│       │ Orchestrator│
└─────┘  └─────┘  └─────┘       └──────┬──────┘
   │        │        │                 │
   ▼        ▼        ▼           ┌─────┼─────┐
 Event    Event    Event         ▼     ▼     ▼
                               ┌────┐┌────┐┌────┐
                               │Svc1││Svc2││Svc3│
                               └────┘└────┘└────┘
|
|
```
|
|
|
|
### 2. Saga Execution States
|
|
|
|
| State | Description |
|-------|-------------|
| **Started** | Saga initiated |
| **Pending** | Waiting for step completion |
| **Compensating** | Rolling back due to failure |
| **Completed** | All steps succeeded |
| **Failed** | Saga failed after compensation |
|
|
|
|
## Templates
|
|
|
|
### Template 1: Saga Orchestrator Base
|
|
|
|
```python
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
import uuid
|
|
|
|
class SagaState(Enum):
    """Lifecycle states of a saga.

    Each member's value is its lowercase name.
    """

    # Saga initiated.
    STARTED = "started"
    # Waiting for a step to complete.
    PENDING = "pending"
    # Rolling back previously completed steps after a failure.
    COMPENSATING = "compensating"
    # All steps succeeded.
    COMPLETED = "completed"
    # Saga failed after compensation.
    FAILED = "failed"
|
|
|
|
|
|
@dataclass
class SagaStep:
    """One step of a saga: a forward action plus the compensation that undoes it.

    ``action`` and ``compensation`` are the message names the orchestrator
    publishes to run, respectively roll back, the step.
    """

    name: str
    action: str
    compensation: str
    # One of "pending", "executing", "completed", "failed", "compensating",
    # "compensated" (the values the orchestrator in this file assigns).
    status: str = "pending"
    # Payload reported by the service when the step completed.
    result: Optional[Dict] = None
    # Error text reported when the step failed.
    error: Optional[str] = None
    executed_at: Optional[datetime] = None
    compensated_at: Optional[datetime] = None
    # Deadline set by timeout-aware orchestrators. Declared as a real field so
    # that dataclass-based serialization (asdict/fields) does not drop it.
    timeout_at: Optional[datetime] = None
|
|
|
|
|
|
@dataclass
class Saga:
    """Record of a single saga run: its identity, state, payload and steps."""

    # Identifier of this run (the orchestrator in this file assigns a UUID4 string).
    saga_id: str
    # Name of the saga definition this run belongs to.
    saga_type: str
    state: SagaState
    # Business payload; it is merged into every command the orchestrator publishes.
    data: Dict[str, Any]
    steps: List[SagaStep]
    # Index into ``steps`` of the step currently being executed.
    current_step: int = 0
    # Naive UTC timestamps; each default is taken independently at construction.
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
|
|
|
|
|
|
class SagaOrchestrator(ABC):
    """Base class for saga orchestrators.

    Subclasses provide ``saga_type`` and ``define_steps``. The orchestrator
    publishes each step's ``action`` through ``event_publisher`` and moves on
    when one of the ``handle_*`` callbacks reports the outcome. Saga state is
    persisted through ``saga_store`` (awaited ``save(saga)`` / ``get(saga_id)``).
    """

    # Saga states in which forward step outcomes are still accepted.
    _ACTIVE_STATES = (SagaState.STARTED, SagaState.PENDING)
    # Step statuses that have no compensation outstanding.
    _SETTLED_STATUSES = ("compensated", "pending", "failed")

    def __init__(self, saga_store, event_publisher):
        self.saga_store = saga_store
        self.event_publisher = event_publisher

    @abstractmethod
    def define_steps(self, data: Dict) -> List[SagaStep]:
        """Define the saga steps."""

    @property
    @abstractmethod
    def saga_type(self) -> str:
        """Unique saga type identifier."""

    async def start(self, data: Dict) -> Saga:
        """Create, persist and start a new saga for ``data``.

        Returns the created saga. A saga with no steps is completed
        immediately instead of being left in STARTED forever.
        """
        saga = Saga(
            saga_id=str(uuid.uuid4()),
            saga_type=self.saga_type,
            state=SagaState.STARTED,
            data=data,
            steps=self.define_steps(data),
        )
        await self.saga_store.save(saga)

        if not saga.steps:
            # Nothing will ever report completion, so finish right away.
            saga.state = SagaState.COMPLETED
            saga.updated_at = datetime.utcnow()
            await self.saga_store.save(saga)
            await self._on_saga_completed(saga)
            return saga

        await self._execute_next_step(saga)
        return saga

    async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
        """Record a successful step and advance (or complete) the saga.

        Messages that do not refer to the step currently in flight -- duplicates,
        late arrivals after a failure, unknown step names -- are ignored so
        that redelivery cannot skip a step or revive a compensating saga.
        """
        saga = await self.saga_store.get(saga_id)
        step = self._current_step_named(saga, step_name)
        if step is None:
            return

        step.status = "completed"
        step.result = result
        step.executed_at = datetime.utcnow()

        saga.current_step += 1
        saga.updated_at = datetime.utcnow()

        if saga.current_step >= len(saga.steps):
            saga.state = SagaState.COMPLETED
            await self.saga_store.save(saga)
            await self._on_saga_completed(saga)
        else:
            saga.state = SagaState.PENDING
            await self.saga_store.save(saga)
            await self._execute_next_step(saga)

    async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
        """Record a step failure and start compensating completed steps.

        As with completions, a failure report is only honoured for the step
        currently in flight of a saga that is still moving forward.
        """
        saga = await self.saga_store.get(saga_id)
        step = self._current_step_named(saga, step_name)
        if step is None:
            return

        step.status = "failed"
        step.error = error

        saga.state = SagaState.COMPENSATING
        saga.updated_at = datetime.utcnow()
        await self.saga_store.save(saga)

        # Roll back everything that completed before the failed step.
        await self._compensate(saga)

    def _current_step_named(self, saga: Saga, step_name: str) -> Optional[SagaStep]:
        """Return the in-flight step if it is called ``step_name``, else ``None``."""
        if saga.state not in self._ACTIVE_STATES:
            return None
        if saga.current_step >= len(saga.steps):
            return None
        step = saga.steps[saga.current_step]
        return step if step.name == step_name else None

    async def _execute_next_step(self, saga: Saga):
        """Mark the current step as executing and publish its action command."""
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        await self.saga_store.save(saga)

        # saga.data is spread last, so its keys win over saga_id/step_name on clash.
        await self.event_publisher.publish(
            step.action,
            {
                "saga_id": saga.saga_id,
                "step_name": step.name,
                **saga.data,
            },
        )

    async def _compensate(self, saga: Saga):
        """Publish compensation commands for completed steps, newest first."""
        for step in reversed(saga.steps[:saga.current_step]):
            if step.status != "completed":
                continue
            step.status = "compensating"
            await self.saga_store.save(saga)

            await self.event_publisher.publish(
                step.compensation,
                {
                    "saga_id": saga.saga_id,
                    "step_name": step.name,
                    "original_result": step.result,
                    **saga.data,
                },
            )

        # If there was nothing to compensate (e.g. the first step failed), no
        # compensation-completed message will ever arrive; finish here.
        await self._fail_if_fully_compensated(saga)

    async def handle_compensation_completed(self, saga_id: str, step_name: str):
        """Record a finished compensation; fail the saga once all are done."""
        saga = await self.saga_store.get(saga_id)

        for step in saga.steps:
            if step.name == step_name:
                step.status = "compensated"
                step.compensated_at = datetime.utcnow()
                break

        if not await self._fail_if_fully_compensated(saga):
            await self.saga_store.save(saga)

    async def _fail_if_fully_compensated(self, saga: Saga) -> bool:
        """Move a compensating saga to FAILED once no compensation is outstanding.

        Returns ``True`` when the transition happened (the saga was saved and
        the failure event published), ``False`` otherwise. The state check keeps
        the failure event from being published more than once.
        """
        if saga.state is not SagaState.COMPENSATING:
            return False
        if any(s.status not in self._SETTLED_STATUSES for s in saga.steps):
            return False

        saga.state = SagaState.FAILED
        saga.updated_at = datetime.utcnow()
        # Persist before publishing, mirroring the success path.
        await self.saga_store.save(saga)
        await self._on_saga_failed(saga)
        return True

    async def _on_saga_completed(self, saga: Saga):
        """Called when saga completes successfully."""
        await self.event_publisher.publish(
            f"{self.saga_type}Completed",
            {"saga_id": saga.saga_id, **saga.data},
        )

    async def _on_saga_failed(self, saga: Saga):
        """Called when saga fails after compensation."""
        await self.event_publisher.publish(
            f"{self.saga_type}Failed",
            {"saga_id": saga.saga_id, "error": "Saga failed", **saga.data},
        )
|
|
```
|
|
|
|
### Template 2: Order Fulfillment Saga
|
|
|
|
```python
|
|
class OrderFulfillmentSaga(SagaOrchestrator):
    """Orchestrates order fulfillment across services."""

    @property
    def saga_type(self) -> str:
        """Saga type identifier used for this workflow."""
        return "OrderFulfillment"

    def define_steps(self, data: Dict) -> List[SagaStep]:
        """Return the four fulfillment steps in execution order.

        ``data`` is not consulted; the plan is the same for every order.
        """
        # (step name, forward action, compensating action)
        plan = (
            (
                "reserve_inventory",
                "InventoryService.ReserveItems",
                "InventoryService.ReleaseReservation",
            ),
            (
                "process_payment",
                "PaymentService.ProcessPayment",
                "PaymentService.RefundPayment",
            ),
            (
                "create_shipment",
                "ShippingService.CreateShipment",
                "ShippingService.CancelShipment",
            ),
            (
                "send_confirmation",
                "NotificationService.SendOrderConfirmation",
                "NotificationService.SendCancellationNotice",
            ),
        )
        return [
            SagaStep(name=step_name, action=forward, compensation=undo)
            for step_name, forward, undo in plan
        ]
|
|
|
|
|
|
# Usage
|
|
async def create_order(order_data: Dict):
    """Start an order-fulfillment saga for ``order_data`` and return the saga.

    Only the listed order fields are forwarded to the saga; a missing field
    raises ``KeyError``.
    """
    orchestrator = OrderFulfillmentSaga(saga_store, event_publisher)
    forwarded_fields = (
        "order_id",
        "customer_id",
        "items",
        "payment_method",
        "shipping_address",
    )
    payload = {name: order_data[name] for name in forwarded_fields}
    return await orchestrator.start(payload)
|
|
|
|
|
|
# Event handlers in each service
|
|
class InventoryService:
    """Example participant: handles the saga's inventory commands.

    Expects ``self.event_publisher``, ``self.reserve`` and
    ``self.release_reservation`` to be provided elsewhere.
    """

    async def handle_reserve_items(self, command: Dict):
        """Reserve the ordered items and report the outcome to the saga."""
        try:
            held = await self.reserve(command["items"], command["order_id"])
            success = {
                "saga_id": command["saga_id"],
                "step_name": "reserve_inventory",
                "result": {"reservation_id": held.id},
            }
            await self.event_publisher.publish("SagaStepCompleted", success)
        except InsufficientInventoryError as exc:
            failure = {
                "saga_id": command["saga_id"],
                "step_name": "reserve_inventory",
                "error": str(exc),
            }
            await self.event_publisher.publish("SagaStepFailed", failure)

    async def handle_release_reservation(self, command: Dict):
        """Compensating action: release the reservation made by the forward step."""
        reservation_id = command["original_result"]["reservation_id"]
        await self.release_reservation(reservation_id)

        acknowledgement = {
            "saga_id": command["saga_id"],
            "step_name": "reserve_inventory",
        }
        await self.event_publisher.publish(
            "SagaCompensationCompleted", acknowledgement
        )
|
|
```
|
|
|
|
### Template 3: Choreography-Based Saga
|
|
|
|
```python
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Any
|
|
import asyncio
|
|
|
|
@dataclass
class SagaContext:
    """Context passed along with the events of a choreographed saga."""

    # Identifier of the saga run.
    saga_id: str
    # Numeric step marker carried with the event.
    step: int
    # Payload carried with the event.
    data: Dict[str, Any]
    # Record of the steps already completed.
    completed_steps: list
|
|
|
|
|
|
class OrderChoreographySaga:
    """Choreography-based saga using events.

    There is no central state: each handler reacts to one event by publishing
    the next command on ``event_bus``. Failure handlers publish the
    compensating commands and then an ``OrderFailed`` event.
    """

    def __init__(self, event_bus):
        """Store ``event_bus`` and subscribe all handlers on it."""
        self.event_bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        """Subscribe the forward and compensation handlers."""
        self.event_bus.subscribe("OrderCreated", self._on_order_created)
        self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
        self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
        self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)

        # Compensation handlers
        self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
        self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)

    async def _on_order_created(self, event: Dict):
        """Step 1: Order created, reserve inventory."""
        # The order id doubles as the saga id for the rest of the flow.
        await self.event_bus.publish("ReserveInventory", {
            "saga_id": event["order_id"],
            "order_id": event["order_id"],
            "items": event["items"],
        })

    async def _on_inventory_reserved(self, event: Dict):
        """Step 2: Inventory reserved, process payment."""
        await self.event_bus.publish("ProcessPayment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "amount": event["total_amount"],
            "reservation_id": event["reservation_id"],
        })

    async def _on_payment_processed(self, event: Dict):
        """Step 3: Payment done, create shipment."""
        command = {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "payment_id": event["payment_id"],
        }
        # Carry the reservation id forward when the payment event provides it:
        # the shipment-failure compensation needs it to release the inventory.
        if "reservation_id" in event:
            command["reservation_id"] = event["reservation_id"]
        await self.event_bus.publish("CreateShipment", command)

    async def _on_shipment_created(self, event: Dict):
        """Step 4: Complete - send confirmation."""
        await self.event_bus.publish("OrderFulfilled", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "tracking_number": event["tracking_number"],
        })

    # Compensation handlers
    async def _on_payment_failed(self, event: Dict):
        """Payment failed - release inventory."""
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        await self.event_bus.publish("OrderFailed", {
            "order_id": event["order_id"],
            "reason": "Payment failed",
        })

    async def _on_shipment_failed(self, event: Dict):
        """Shipment failed - refund payment and release inventory."""
        await self.event_bus.publish("RefundPayment", {
            "saga_id": event["saga_id"],
            "payment_id": event["payment_id"],
        })
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        # Report the failure like the payment path does; published last so the
        # compensating commands are already out if the event lacks order_id.
        await self.event_bus.publish("OrderFailed", {
            "order_id": event["order_id"],
            "reason": "Shipment failed",
        })
|
|
```
|
|
|
|
### Template 4: Saga with Timeouts
|
|
|
|
```python
|
|
class TimeoutSagaOrchestrator(SagaOrchestrator):
    """Saga orchestrator with step timeouts.

    Every dispatched step gets a deadline; a scheduled check fails the step
    (triggering compensation) if it is still executing at that time.
    """

    def __init__(self, saga_store, event_publisher, scheduler, step_timeout=None):
        """Set up the orchestrator.

        Args:
            saga_store: Persistence for sagas (see ``SagaOrchestrator``).
            event_publisher: Publisher used to dispatch step commands.
            scheduler: Object with an awaitable
                ``schedule(job_id, callback, data, run_at=...)``.
            step_timeout: ``timedelta`` allowed per step; defaults to 5 minutes.
        """
        # Local import: the templates in this document only import `datetime`.
        from datetime import timedelta

        super().__init__(saga_store, event_publisher)
        self.scheduler = scheduler
        self.step_timeout = (
            timedelta(minutes=5) if step_timeout is None else step_timeout
        )

    async def _execute_next_step(self, saga: Saga):
        """Dispatch the current step and schedule its timeout check."""
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        step.timeout_at = datetime.utcnow() + self.step_timeout
        await self.saga_store.save(saga)

        # Schedule timeout check
        await self.scheduler.schedule(
            f"saga_timeout_{saga.saga_id}_{step.name}",
            self._check_timeout,
            {"saga_id": saga.saga_id, "step_name": step.name},
            run_at=step.timeout_at,
        )

        await self.event_publisher.publish(
            step.action,
            {"saga_id": saga.saga_id, "step_name": step.name, **saga.data},
        )

    async def _check_timeout(self, data: Dict):
        """Fail the named step if it is still executing when the check runs."""
        saga = await self.saga_store.get(data["saga_id"])
        # A default avoids StopIteration when the step name is not in the saga.
        step = next((s for s in saga.steps if s.name == data["step_name"]), None)

        # Steps that already finished or failed are left alone.
        if step is not None and step.status == "executing":
            await self.handle_step_failed(
                data["saga_id"],
                data["step_name"],
                "Step timed out",
            )
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### Do's
|
|
- **Make steps idempotent** - Safe to retry
|
|
- **Design compensations carefully** - They must work
|
|
- **Use correlation IDs** - For tracing across services
|
|
- **Implement timeouts** - Don't wait forever
|
|
- **Log everything** - For debugging failures
|
|
|
|
### Don'ts
|
|
- **Don't assume instant completion** - Sagas take time
|
|
- **Don't skip compensation testing** - Most critical part
|
|
- **Don't couple services** - Use async messaging
|
|
- **Don't ignore partial failures** - Handle gracefully
|
|
|
|
## Resources
|
|
|
|
- [Saga Pattern](https://microservices.io/patterns/data/saga.html)
|
|
- [Designing Data-Intensive Applications](https://dataintensive.net/)
|