feat: add Temporal workflow orchestration to backend-development plugin (#125)

* docs: enhance payment-integration agent with critical security guidance

Add evidence-based security requirements from Stripe, PayPal, OWASP:
- Webhook security (signature verification, idempotency, quick response, server validation)
- PCI compliance essentials (tokenization, server-side validation, environment separation)
- Real-world failure examples (processor collapse, Lambda failures, malicious price manipulation)

Minimal expansion: 32 to 57 lines (25 lines added)

* feat: add Temporal workflow orchestration to backend-development plugin

Add comprehensive Temporal workflow orchestration support with 1 agent and 2 skills:

**Agent:**
- temporal-python-pro: Python SDK expert for durable workflows, saga patterns,
  async/await patterns, error handling, and production deployment

**Skills:**
- workflow-orchestration-patterns: Language-agnostic patterns for workflows vs
  activities, saga compensation, entity workflows, and determinism constraints
- temporal-python-testing: Progressive disclosure testing guide with unit testing,
  integration testing, replay testing, and local development setup

**Changes:**
- Add agent: plugins/backend-development/agents/temporal-python-pro.md (311 lines)
- Add skill: plugins/backend-development/skills/workflow-orchestration-patterns/ (286 lines)
- Add skill: plugins/backend-development/skills/temporal-python-testing/ (SKILL.md + 4 resource files)
- Update marketplace.json: backend-development plugin v1.2.2 → v1.2.3
- Update docs/agents.md: 85 → 86 agents
- Update docs/agent-skills.md: 55 → 57 skills

**Content Sources:**
- Official Temporal documentation (docs.temporal.io)
- Temporal Python SDK guide (python.temporal.io)
- Temporal architecture docs (github.com/temporalio/temporal)
- OWASP best practices for distributed systems

Addresses #124

---------

Co-authored-by: Kiran Eshwarappa <kiran.eshwarapa@gmail.com>
Authored by Kiri on 2025-11-16 19:45:36 -06:00, committed by GitHub.
Parent: 493e2ea399 · Commit: ddbd034ca3
10 changed files with 2537 additions and 8 deletions


@@ -101,8 +101,8 @@
{
"name": "backend-development",
"source": "./plugins/backend-development",
"description": "Backend API design, GraphQL architecture, and test-driven backend development",
"version": "1.2.2",
"description": "Backend API design, GraphQL architecture, workflow orchestration with Temporal, and test-driven backend development",
"version": "1.2.3",
"author": {
"name": "Seth Hobson",
"url": "https://github.com/wshobson"
@@ -115,7 +115,10 @@
"api-design",
"graphql",
"tdd",
"architecture"
"architecture",
"temporal",
"workflow-orchestration",
"distributed-systems"
],
"category": "development",
"strict": false,
@@ -125,12 +128,15 @@
"agents": [
"./agents/backend-architect.md",
"./agents/graphql-architect.md",
"./agents/tdd-orchestrator.md"
"./agents/tdd-orchestrator.md",
"./agents/temporal-python-pro.md"
],
"skills": [
"./skills/api-design-principles",
"./skills/architecture-patterns",
"./skills/microservices-patterns"
"./skills/microservices-patterns",
"./skills/workflow-orchestration-patterns",
"./skills/temporal-python-testing"
]
},
{


@@ -1,6 +1,6 @@
# Agent Skills
Agent Skills are modular packages that extend Claude's capabilities with specialized domain knowledge, following Anthropic's [Agent Skills Specification](https://github.com/anthropics/skills/blob/main/agent_skills_spec.md). This plugin ecosystem includes **55 specialized skills** across 15 plugins, enabling progressive disclosure and efficient token usage.
Agent Skills are modular packages that extend Claude's capabilities with specialized domain knowledge, following Anthropic's [Agent Skills Specification](https://github.com/anthropics/skills/blob/main/agent_skills_spec.md). This plugin ecosystem includes **57 specialized skills** across 15 plugins, enabling progressive disclosure and efficient token usage.
## Overview
@@ -30,13 +30,15 @@ Skills provide Claude with deep expertise in specific domains without loading ev
| **rag-implementation** | Build Retrieval-Augmented Generation systems with vector databases and semantic search |
| **llm-evaluation** | Implement comprehensive evaluation strategies with automated metrics and benchmarking |
### Backend Development (3 skills)
### Backend Development (5 skills)
| Skill | Description |
|-------|-------------|
| **api-design-principles** | Master REST and GraphQL API design for intuitive, scalable, and maintainable APIs |
| **architecture-patterns** | Implement Clean Architecture, Hexagonal Architecture, and Domain-Driven Design |
| **microservices-patterns** | Design microservices with service boundaries, event-driven communication, and resilience |
| **workflow-orchestration-patterns** | Design durable workflows with Temporal for distributed systems, saga patterns, and state management |
| **temporal-python-testing** | Test Temporal workflows with pytest, time-skipping, and mocking strategies for comprehensive coverage |
### Developer Essentials (8 skills)


@@ -1,6 +1,6 @@
# Agent Reference
Complete reference for all **85 specialized AI agents** organized by category with model assignments.
Complete reference for all **86 specialized AI agents** organized by category with model assignments.
## Agent Categories
@@ -46,6 +46,7 @@ Complete reference for all **85 specialized AI agents** organized by category wi
| [javascript-pro](../plugins/javascript-typescript/agents/javascript-pro.md) | sonnet | Modern JavaScript with ES6+, async patterns, Node.js |
| [typescript-pro](../plugins/javascript-typescript/agents/typescript-pro.md) | sonnet | Advanced TypeScript with type systems and generics |
| [python-pro](../plugins/python-development/agents/python-pro.md) | sonnet | Python development with advanced features and optimization |
| [temporal-python-pro](../plugins/backend-development/agents/temporal-python-pro.md) | sonnet | Temporal workflow orchestration with Python SDK, durable workflows, saga patterns |
| [ruby-pro](../plugins/web-scripting/agents/ruby-pro.md) | sonnet | Ruby with metaprogramming, Rails patterns, gem development |
| [php-pro](../plugins/web-scripting/agents/php-pro.md) | sonnet | Modern PHP with frameworks and performance optimization |


@@ -0,0 +1,311 @@
---
name: temporal-python-pro
description: Master Temporal workflow orchestration with Python SDK. Implements durable workflows, saga patterns, and distributed transactions. Covers async/await, testing strategies, and production deployment. Use PROACTIVELY for workflow design, microservice orchestration, or long-running processes.
model: sonnet
---
You are an expert Temporal workflow developer specializing in Python SDK implementation, durable workflow design, and production-ready distributed systems.
## Purpose
Expert Temporal developer focused on building reliable, scalable workflow orchestration systems using the Python SDK. Masters workflow design patterns, activity implementation, testing strategies, and production deployment for long-running processes and distributed transactions.
## Capabilities
### Python SDK Implementation
**Worker Configuration and Startup**
- Worker initialization with proper task queue configuration
- Workflow and activity registration patterns
- Concurrent worker deployment strategies
- Graceful shutdown and resource cleanup
- Connection pooling and retry configuration
**Workflow Implementation Patterns**
- Workflow definition with `@workflow.defn` decorator
- Async/await workflow entry points with `@workflow.run`
- Workflow-safe time operations with `workflow.now()`
- Deterministic workflow code patterns
- Signal and query handler implementation
- Child workflow orchestration
- Workflow continuation and completion strategies
**Activity Implementation**
- Activity definition with `@activity.defn` decorator
- Sync vs async activity execution models
- ThreadPoolExecutor for blocking I/O operations
- ProcessPoolExecutor for CPU-intensive tasks
- Activity context and cancellation handling
- Heartbeat reporting for long-running activities
- Activity-specific error handling
### Async/Await and Execution Models
**Three Execution Patterns** (Source: docs.temporal.io):
1. **Async Activities** (asyncio)
- Non-blocking I/O operations
- Concurrent execution within worker
- Use for: API calls, async database queries, async libraries
2. **Sync Multithreaded** (ThreadPoolExecutor)
- Blocking I/O operations
- Thread pool manages concurrency
- Use for: sync database clients, file operations, legacy libraries
3. **Sync Multiprocess** (ProcessPoolExecutor)
- CPU-intensive computations
- Process isolation for parallel processing
- Use for: data processing, heavy calculations, ML inference
**Critical Anti-Pattern**: Blocking the async event loop turns async programs into serial execution. Always use sync activities for blocking operations.
### Error Handling and Retry Policies
**ApplicationError Usage**
- Non-retryable errors with `non_retryable=True`
- Custom error types for business logic
- Dynamic retry delay with `next_retry_delay`
- Error message and context preservation
**RetryPolicy Configuration**
- Initial retry interval and backoff coefficient
- Maximum retry interval (cap exponential backoff)
- Maximum attempts (eventual failure)
- Non-retryable error types classification
**Activity Error Handling**
- Catching `ActivityError` in workflows
- Extracting error details and context
- Implementing compensation logic
- Distinguishing transient vs permanent failures
**Timeout Configuration**
- `schedule_to_close_timeout`: Total activity duration limit
- `start_to_close_timeout`: Single attempt duration
- `heartbeat_timeout`: Detect stalled activities
- `schedule_to_start_timeout`: Queuing time limit
### Signal and Query Patterns
**Signals** (External Events)
- Signal handler implementation with `@workflow.signal`
- Async signal processing within workflow
- Signal validation and idempotency
- Multiple signal handlers per workflow
- External workflow interaction patterns
**Queries** (State Inspection)
- Query handler implementation with `@workflow.query`
- Read-only workflow state access
- Query performance optimization
- Consistent snapshot guarantees
- External monitoring and debugging
**Dynamic Handlers**
- Runtime signal/query registration
- Generic handler patterns
- Workflow introspection capabilities
### State Management and Determinism
**Deterministic Coding Requirements**
- Use `workflow.now()` instead of `datetime.now()`
- Use `workflow.random()` instead of `random.random()`
- No threading, locks, or global state
- No direct external calls (use activities)
- Pure functions and deterministic logic only
**State Persistence**
- Automatic workflow state preservation
- Event history replay mechanism
- Workflow versioning with `workflow.patched()` and `workflow.deprecate_patch()`
- Safe code evolution strategies
- Backward compatibility patterns
**Workflow Variables**
- Workflow-scoped variable persistence
- Signal-based state updates
- Query-based state inspection
- Mutable state handling patterns
### Type Hints and Data Classes
**Python Type Annotations**
- Workflow input/output type hints
- Activity parameter and return types
- Data classes for structured data
- Pydantic models for validation
- Type-safe signal and query handlers
**Serialization Patterns**
- JSON serialization (default)
- Custom data converters
- Protobuf integration
- Payload encryption
- Size limit management (2MB per argument)
### Testing Strategies
**WorkflowEnvironment Testing**
- Time-skipping test environment setup
- Instant execution of durable timers (`asyncio.sleep()` in workflow code)
- Fast testing of month-long workflows
- Workflow execution validation
- Mock activity injection
**Activity Testing**
- ActivityEnvironment for unit tests
- Heartbeat validation
- Timeout simulation
- Error injection testing
- Idempotency verification
**Integration Testing**
- Full workflow with real activities
- Local Temporal server with Docker
- End-to-end workflow validation
- Multi-workflow coordination testing
**Replay Testing**
- Determinism validation against production histories
- Code change compatibility verification
- Continuous integration replay testing
### Production Deployment
**Worker Deployment Patterns**
- Containerized worker deployment (Docker/Kubernetes)
- Horizontal scaling strategies
- Task queue partitioning
- Worker versioning and gradual rollout
- Blue-green deployment for workers
**Monitoring and Observability**
- Workflow execution metrics
- Activity success/failure rates
- Worker health monitoring
- Queue depth and lag metrics
- Custom metric emission
- Distributed tracing integration
**Performance Optimization**
- Worker concurrency tuning
- Connection pool sizing
- Activity batching strategies
- Workflow decomposition for scalability
- Memory and CPU optimization
**Operational Patterns**
- Graceful worker shutdown
- Workflow execution queries
- Manual workflow intervention
- Workflow history export
- Namespace configuration and isolation
## When to Use Temporal Python
**Ideal Scenarios**:
- Distributed transactions across microservices
- Long-running business processes (hours to years)
- Saga pattern implementation with compensation
- Entity workflow management (carts, accounts, inventory)
- Human-in-the-loop approval workflows
- Multi-step data processing pipelines
- Infrastructure automation and orchestration
**Key Benefits**:
- Automatic state persistence and recovery
- Built-in retry and timeout handling
- Deterministic execution guarantees
- Time-travel debugging with replay
- Horizontal scalability with workers
- Language-agnostic interoperability
## Common Pitfalls
**Determinism Violations**:
- Using `datetime.now()` instead of `workflow.now()`
- Random number generation with `random.random()`
- Threading or global state in workflows
- Direct API calls from workflows
**Activity Implementation Errors**:
- Non-idempotent activities (unsafe retries)
- Missing timeout configuration
- Blocking async event loop with sync code
- Exceeding payload size limits (2MB)
**Testing Mistakes**:
- Not using time-skipping environment
- Testing workflows without mocking activities
- Ignoring replay testing in CI/CD
- Inadequate error injection testing
**Deployment Issues**:
- Unregistered workflows/activities on workers
- Mismatched task queue configuration
- Missing graceful shutdown handling
- Insufficient worker concurrency
## Integration Patterns
**Microservices Orchestration**
- Cross-service transaction coordination
- Saga pattern with compensation
- Event-driven workflow triggers
- Service dependency management
**Data Processing Pipelines**
- Multi-stage data transformation
- Parallel batch processing
- Error handling and retry logic
- Progress tracking and reporting
**Business Process Automation**
- Order fulfillment workflows
- Payment processing with compensation
- Multi-party approval processes
- SLA enforcement and escalation
## Best Practices
**Workflow Design**:
1. Keep workflows focused and single-purpose
2. Use child workflows for scalability
3. Implement idempotent activities
4. Configure appropriate timeouts
5. Design for failure and recovery
**Testing**:
1. Use time-skipping for fast feedback
2. Mock activities in workflow tests
3. Validate replay with production histories
4. Test error scenarios and compensation
5. Achieve high coverage (≥80% target)
**Production**:
1. Deploy workers with graceful shutdown
2. Monitor workflow and activity metrics
3. Implement distributed tracing
4. Version workflows carefully
5. Use workflow queries for debugging
## Resources
**Official Documentation**:
- Python SDK: python.temporal.io
- Core Concepts: docs.temporal.io/workflows
- Testing Guide: docs.temporal.io/develop/python/testing-suite
- Best Practices: docs.temporal.io/develop/best-practices
**Architecture**:
- Temporal Architecture: github.com/temporalio/temporal/blob/main/docs/architecture/README.md
- Testing Patterns: github.com/temporalio/temporal/blob/main/docs/development/testing.md
**Key Takeaways**:
1. Workflows = orchestration, Activities = external calls
2. Determinism is mandatory for workflows
3. Idempotency is critical for activities
4. Test with time-skipping for fast feedback
5. Monitor and observe in production


@@ -0,0 +1,146 @@
---
name: temporal-python-testing
description: Test Temporal workflows with pytest, time-skipping, and mocking strategies. Covers unit testing, integration testing, replay testing, and local development setup. Use when implementing Temporal workflow tests or debugging test failures.
---
# Temporal Python Testing Strategies
Comprehensive testing approaches for Temporal workflows using pytest, with progressive disclosure resources for specific testing scenarios.
## When to Use This Skill
- **Unit testing workflows** - Fast tests with time-skipping
- **Integration testing** - Workflows with mocked activities
- **Replay testing** - Validate determinism against production histories
- **Local development** - Set up Temporal server and pytest
- **CI/CD integration** - Automated testing pipelines
- **Coverage strategies** - Achieve ≥80% test coverage
## Testing Philosophy
**Recommended Approach** (Source: docs.temporal.io/develop/python/testing-suite):
- Write majority as integration tests
- Use pytest with async fixtures
- Time-skipping enables fast feedback (month-long workflows → seconds)
- Mock activities to isolate workflow logic
- Validate determinism with replay testing
**Three Test Types**:
1. **Unit**: Workflows with time-skipping, activities with ActivityEnvironment
2. **Integration**: Workers with mocked activities
3. **End-to-end**: Full Temporal server with real activities (use sparingly)
## Available Resources
This skill provides detailed guidance through progressive disclosure. Load specific resources based on your testing needs:
### Unit Testing Resources
**File**: `resources/unit-testing.md`
**When to load**: Testing individual workflows or activities in isolation
**Contains**:
- WorkflowEnvironment with time-skipping
- ActivityEnvironment for activity testing
- Fast execution of long-running workflows
- Manual time advancement patterns
- pytest fixtures and patterns
### Integration Testing Resources
**File**: `resources/integration-testing.md`
**When to load**: Testing workflows with mocked external dependencies
**Contains**:
- Activity mocking strategies
- Error injection patterns
- Multi-activity workflow testing
- Signal and query testing
- Coverage strategies
### Replay Testing Resources
**File**: `resources/replay-testing.md`
**When to load**: Validating determinism or deploying workflow changes
**Contains**:
- Determinism validation
- Production history replay
- CI/CD integration patterns
- Version compatibility testing
### Local Development Resources
**File**: `resources/local-setup.md`
**When to load**: Setting up development environment
**Contains**:
- Docker Compose configuration
- pytest setup and configuration
- Coverage tool integration
- Development workflow
## Quick Start Guide
### Basic Workflow Test
```python
import pytest
import pytest_asyncio
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker


# pytest_asyncio.fixture is required for async fixtures in strict mode
@pytest_asyncio.fixture
async def workflow_env():
    env = await WorkflowEnvironment.start_time_skipping()
    yield env
    await env.shutdown()


@pytest.mark.asyncio
async def test_workflow(workflow_env):
    async with Worker(
        workflow_env.client,
        task_queue="test-queue",
        workflows=[YourWorkflow],
        activities=[your_activity],
    ):
        result = await workflow_env.client.execute_workflow(
            YourWorkflow.run,
            args,
            id="test-wf-id",
            task_queue="test-queue",
        )
        assert result == expected
```
### Basic Activity Test
```python
import pytest
from temporalio.testing import ActivityEnvironment


@pytest.mark.asyncio
async def test_activity():
    env = ActivityEnvironment()
    result = await env.run(your_activity, "test-input")
    assert result == expected_output
```
## Coverage Targets
**Recommended Coverage** (Source: docs.temporal.io best practices):
- **Workflows**: ≥80% logic coverage
- **Activities**: ≥80% logic coverage
- **Integration**: Critical paths with mocked activities
- **Replay**: All workflow versions before deployment
## Key Testing Principles
1. **Time-Skipping** - Month-long workflows test in seconds
2. **Mock Activities** - Isolate workflow logic from external dependencies
3. **Replay Testing** - Validate determinism before deployment
4. **High Coverage** - ≥80% target for production workflows
5. **Fast Feedback** - Unit tests run in milliseconds
## How to Use Resources
**Load specific resource when needed**:
- "Show me unit testing patterns" → Load `resources/unit-testing.md`
- "How do I mock activities?" → Load `resources/integration-testing.md`
- "Setup local Temporal server" → Load `resources/local-setup.md`
- "Validate determinism" → Load `resources/replay-testing.md`
## Additional References
- Python SDK Testing: docs.temporal.io/develop/python/testing-suite
- Testing Patterns: github.com/temporalio/temporal/blob/main/docs/development/testing.md
- Python Samples: github.com/temporalio/samples-python


@@ -0,0 +1,452 @@
# Integration Testing with Mocked Activities
Comprehensive patterns for testing workflows with mocked external dependencies, error injection, and complex scenarios.
## Activity Mocking Strategy
**Purpose**: Test workflow orchestration logic without calling real external services
### Basic Mock Pattern
```python
import pytest
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.worker import Worker

# Real activity lives in application code, e.g.:
# from myapp.activities import process_external_data


@workflow.defn
class WorkflowWithActivity:
    @workflow.run
    async def run(self, input: str) -> str:
        result = await workflow.execute_activity(
            "process_external_data",  # activity referenced by name
            input,
            start_to_close_timeout=timedelta(seconds=10),
        )
        return f"processed: {result}"


@pytest.mark.asyncio
async def test_workflow_with_mocked_activity(workflow_env):
    """Mock the activity to test workflow logic in isolation"""
    calls = []

    # The mock must register under the real activity's name
    @activity.defn(name="process_external_data")
    async def mock_process_external_data(input: str) -> str:
        calls.append(input)
        return "mocked-result"

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[WorkflowWithActivity],
        activities=[mock_process_external_data],  # mock instead of real activity
    ):
        result = await workflow_env.client.execute_workflow(
            WorkflowWithActivity.run,
            "test-input",
            id="wf-mock",
            task_queue="test",
        )
        assert result == "processed: mocked-result"
        assert calls == ["test-input"]
```
### Dynamic Mock Responses
**Scenario-Based Mocking**:
```python
from datetime import timedelta

import pytest
from temporalio import activity, workflow
from temporalio.exceptions import ActivityError, ApplicationError
from temporalio.worker import Worker


@pytest.mark.asyncio
async def test_workflow_multiple_mock_scenarios(workflow_env):
    """Test different workflow paths with a scenario-driven activity"""

    # Returns different values (or fails) based on input
    @activity.defn
    async def dynamic_activity(input: str) -> str:
        if input == "error-case":
            raise ApplicationError("Validation failed", non_retryable=True)
        return f"processed-{input}"

    @workflow.defn
    class DynamicWorkflow:
        @workflow.run
        async def run(self, input: str) -> str:
            try:
                result = await workflow.execute_activity(
                    dynamic_activity,
                    input,
                    start_to_close_timeout=timedelta(seconds=10),
                )
                return f"success: {result}"
            except ActivityError as e:
                # Activity failures surface as ActivityError wrapping the cause
                return f"error: {e.cause.message}"

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[DynamicWorkflow],
        activities=[dynamic_activity],
    ):
        # Test success path
        result_success = await workflow_env.client.execute_workflow(
            DynamicWorkflow.run,
            "valid-input",
            id="wf-success",
            task_queue="test",
        )
        assert result_success == "success: processed-valid-input"

        # Test error path
        result_error = await workflow_env.client.execute_workflow(
            DynamicWorkflow.run,
            "error-case",
            id="wf-error",
            task_queue="test",
        )
        assert "Validation failed" in result_error
```
## Error Injection Patterns
### Testing Transient Failures
**Retry Behavior**:
```python
from datetime import timedelta

import pytest
from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.worker import Worker


@pytest.mark.asyncio
async def test_workflow_transient_errors(workflow_env):
    """Test retry logic with controlled failures"""
    attempt_count = 0

    @activity.defn
    async def transient_activity() -> str:
        nonlocal attempt_count
        attempt_count += 1
        if attempt_count < 3:
            raise Exception(f"Transient error {attempt_count}")
        return "success-after-retries"

    @workflow.defn
    class RetryWorkflow:
        @workflow.run
        async def run(self) -> str:
            return await workflow.execute_activity(
                transient_activity,
                start_to_close_timeout=timedelta(seconds=10),
                retry_policy=RetryPolicy(
                    initial_interval=timedelta(milliseconds=10),
                    maximum_attempts=5,
                    backoff_coefficient=1.0,
                ),
            )

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[RetryWorkflow],
        activities=[transient_activity],
    ):
        result = await workflow_env.client.execute_workflow(
            RetryWorkflow.run,
            id="retry-wf",
            task_queue="test",
        )
        assert result == "success-after-retries"
        assert attempt_count == 3
```
### Testing Non-Retryable Errors
**Business Validation Failures**:
```python
from datetime import timedelta

import pytest
from temporalio import activity, workflow
from temporalio.exceptions import ActivityError, ApplicationError
from temporalio.worker import Worker


@pytest.mark.asyncio
async def test_workflow_non_retryable_error(workflow_env):
    """Test handling of permanent failures"""

    @activity.defn
    async def validation_activity(input: dict) -> str:
        if not input.get("valid"):
            raise ApplicationError(
                "Invalid input",
                non_retryable=True,  # don't retry validation errors
            )
        return "validated"

    @workflow.defn
    class ValidationWorkflow:
        @workflow.run
        async def run(self, input: dict) -> str:
            try:
                return await workflow.execute_activity(
                    validation_activity,
                    input,
                    start_to_close_timeout=timedelta(seconds=10),
                )
            except ActivityError as e:
                # The non-retryable ApplicationError arrives as the cause
                return f"validation-failed: {e.cause.message}"

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[ValidationWorkflow],
        activities=[validation_activity],
    ):
        result = await workflow_env.client.execute_workflow(
            ValidationWorkflow.run,
            {"valid": False},
            id="validation-wf",
            task_queue="test",
        )
        assert "validation-failed" in result
```
## Multi-Activity Workflow Testing
### Sequential Activity Pattern
```python
@pytest.mark.asyncio
async def test_workflow_sequential_activities(workflow_env):
    """Test workflow orchestrating multiple activities"""
    activity_calls = []

    @activity.defn
    async def step_1(input: str) -> str:
        activity_calls.append("step_1")
        return f"{input}-step1"

    @activity.defn
    async def step_2(input: str) -> str:
        activity_calls.append("step_2")
        return f"{input}-step2"

    @activity.defn
    async def step_3(input: str) -> str:
        activity_calls.append("step_3")
        return f"{input}-step3"

    @workflow.defn
    class SequentialWorkflow:
        @workflow.run
        async def run(self, input: str) -> str:
            result_1 = await workflow.execute_activity(
                step_1,
                input,
                start_to_close_timeout=timedelta(seconds=10),
            )
            result_2 = await workflow.execute_activity(
                step_2,
                result_1,
                start_to_close_timeout=timedelta(seconds=10),
            )
            result_3 = await workflow.execute_activity(
                step_3,
                result_2,
                start_to_close_timeout=timedelta(seconds=10),
            )
            return result_3

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[SequentialWorkflow],
        activities=[step_1, step_2, step_3],
    ):
        result = await workflow_env.client.execute_workflow(
            SequentialWorkflow.run,
            "start",
            id="seq-wf",
            task_queue="test",
        )
        assert result == "start-step1-step2-step3"
        assert activity_calls == ["step_1", "step_2", "step_3"]
```
### Parallel Activity Pattern
```python
import asyncio


@pytest.mark.asyncio
async def test_workflow_parallel_activities(workflow_env):
    """Test concurrent activity execution"""

    @activity.defn
    async def parallel_task(task_id: int) -> str:
        return f"task-{task_id}"

    @workflow.defn
    class ParallelWorkflow:
        @workflow.run
        async def run(self, task_count: int) -> list[str]:
            # Execute activities in parallel
            tasks = [
                workflow.execute_activity(
                    parallel_task,
                    i,
                    start_to_close_timeout=timedelta(seconds=10),
                )
                for i in range(task_count)
            ]
            return await asyncio.gather(*tasks)

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[ParallelWorkflow],
        activities=[parallel_task],
    ):
        result = await workflow_env.client.execute_workflow(
            ParallelWorkflow.run,
            3,
            id="parallel-wf",
            task_queue="test",
        )
        assert result == ["task-0", "task-1", "task-2"]
```
## Signal and Query Testing
### Signal Handlers
```python
@pytest.mark.asyncio
async def test_workflow_signals(workflow_env):
    """Test workflow signal handling"""

    @workflow.defn
    class SignalWorkflow:
        def __init__(self) -> None:
            self._status = "initialized"

        @workflow.run
        async def run(self) -> str:
            # Wait for completion signal
            await workflow.wait_condition(lambda: self._status == "completed")
            return self._status

        @workflow.signal
        async def update_status(self, new_status: str) -> None:
            self._status = new_status

        @workflow.query
        def get_status(self) -> str:
            return self._status

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[SignalWorkflow],
    ):
        # Start workflow
        handle = await workflow_env.client.start_workflow(
            SignalWorkflow.run,
            id="signal-wf",
            task_queue="test",
        )

        # Verify initial state via query
        initial_status = await handle.query(SignalWorkflow.get_status)
        assert initial_status == "initialized"

        # Send signal
        await handle.signal(SignalWorkflow.update_status, "processing")

        # Verify updated state
        updated_status = await handle.query(SignalWorkflow.get_status)
        assert updated_status == "processing"

        # Complete workflow
        await handle.signal(SignalWorkflow.update_status, "completed")
        result = await handle.result()
        assert result == "completed"
```
## Coverage Strategies
### Workflow Logic Coverage
**Target**: ≥80% coverage of workflow decision logic
```python
# Test all branches
@pytest.mark.parametrize("condition,expected", [
    (True, "branch-a"),
    (False, "branch-b"),
])
@pytest.mark.asyncio
async def test_workflow_branches(workflow_env, condition, expected):
    """Ensure all code paths are tested"""
    # Test implementation
    pass
```
### Activity Coverage
**Target**: ≥80% coverage of activity logic
```python
# Test activity edge cases
@pytest.mark.parametrize("input,expected", [
    ("valid", "success"),
    ("", "empty-input-error"),
    (None, "null-input-error"),
])
@pytest.mark.asyncio
async def test_activity_edge_cases(activity_env, input, expected):
    """Test activity error handling"""
    # Test implementation
    pass
```
## Integration Test Organization
### Test Structure
```
tests/
├── integration/
│ ├── conftest.py # Shared fixtures
│ ├── test_order_workflow.py # Order processing tests
│ ├── test_payment_workflow.py # Payment tests
│ └── test_fulfillment_workflow.py
├── unit/
│ ├── test_order_activities.py
│ └── test_payment_activities.py
└── fixtures/
└── test_data.py # Test data builders
```
### Shared Fixtures
```python
# conftest.py
import pytest
import pytest_asyncio
from unittest.mock import Mock
from temporalio.testing import WorkflowEnvironment


@pytest_asyncio.fixture(scope="session")
async def workflow_env():
    """Session-scoped environment for integration tests"""
    env = await WorkflowEnvironment.start_time_skipping()
    yield env
    await env.shutdown()


@pytest.fixture
def mock_payment_service():
    """Mock external payment service"""
    return Mock()


@pytest.fixture
def mock_inventory_service():
    """Mock external inventory service"""
    return Mock()
```
## Best Practices
1. **Mock External Dependencies**: Never call real APIs in tests
2. **Test Error Scenarios**: Verify compensation and retry logic
3. **Parallel Testing**: Use pytest-xdist for faster test runs
4. **Isolated Tests**: Each test should be independent
5. **Clear Assertions**: Verify both results and side effects
6. **Coverage Target**: ≥80% for critical workflows
7. **Fast Execution**: Use time-skipping, avoid real delays
## Additional Resources
- Mocking Strategies: docs.temporal.io/develop/python/testing-suite
- pytest Best Practices: docs.pytest.org/en/stable/goodpractices.html
- Python SDK Samples: github.com/temporalio/samples-python


@@ -0,0 +1,550 @@
# Local Development Setup for Temporal Python Testing
Comprehensive guide for setting up local Temporal development environment with pytest integration and coverage tracking.
## Temporal Server Setup with Docker Compose
### Basic Docker Compose Configuration
```yaml
# docker-compose.yml
version: "3.8"

services:
  temporal:
    image: temporalio/auto-setup:latest
    container_name: temporal-dev
    ports:
      - "7233:7233"  # Temporal gRPC frontend
    environment:
      - DB=postgresql
      - POSTGRES_USER=temporal
      - POSTGRES_PWD=temporal
      - POSTGRES_SEEDS=postgresql
      - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
    depends_on:
      - postgresql

  postgresql:
    image: postgres:14-alpine
    container_name: temporal-postgres
    environment:
      - POSTGRES_USER=temporal
      - POSTGRES_PASSWORD=temporal
      - POSTGRES_DB=temporal
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  temporal-ui:
    image: temporalio/ui:latest
    container_name: temporal-ui
    depends_on:
      - temporal
    environment:
      - TEMPORAL_ADDRESS=temporal:7233
      - TEMPORAL_CORS_ORIGINS=http://localhost:3000
    ports:
      - "8080:8080"  # Web UI (http://localhost:8080)

volumes:
  postgres_data:
```
### Starting Local Server
```bash
# Start Temporal server
docker-compose up -d
# Verify server is running
docker-compose ps
# View logs
docker-compose logs -f temporal
# Access Temporal Web UI
open http://localhost:8080
# Stop server
docker-compose down
# Reset data (clean slate)
docker-compose down -v
```
### Health Check Script
```python
# scripts/health_check.py
import asyncio

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

@workflow.defn
class HealthCheckWorkflow:
    @workflow.run
    async def run(self) -> str:
        return "healthy"

async def check_temporal_health() -> bool:
    """Verify Temporal server is accessible"""
    try:
        client = await Client.connect("localhost:7233")
        print("✓ Connected to Temporal server")

        # Test workflow execution
        async with Worker(
            client,
            task_queue="health-check",
            workflows=[HealthCheckWorkflow],
        ):
            result = await client.execute_workflow(
                HealthCheckWorkflow.run,
                id="health-check",
                task_queue="health-check",
            )
            print(f"✓ Workflow execution successful: {result}")
        return True
    except Exception as e:
        print(f"✗ Health check failed: {e}")
        return False

if __name__ == "__main__":
    asyncio.run(check_temporal_health())
```
## pytest Configuration
### Project Structure
```
temporal-project/
├── docker-compose.yml
├── pyproject.toml
├── pytest.ini
├── requirements.txt
├── src/
│ ├── workflows/
│ │ ├── __init__.py
│ │ ├── order_workflow.py
│ │ └── payment_workflow.py
│ └── activities/
│ ├── __init__.py
│ ├── payment_activities.py
│ └── inventory_activities.py
├── tests/
│ ├── conftest.py
│ ├── unit/
│ │ ├── test_workflows.py
│ │ └── test_activities.py
│ ├── integration/
│ │ └── test_order_flow.py
│ └── replay/
│ └── test_workflow_replay.py
└── scripts/
├── health_check.py
└── export_histories.py
```
### pytest Configuration
```ini
# pytest.ini
[pytest]
asyncio_mode = auto
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*

# Markers for test categorization
markers =
    unit: Unit tests (fast, isolated)
    integration: Integration tests (require Temporal server)
    replay: Replay tests (require production histories)
    slow: Slow running tests

# Coverage settings
addopts =
    --verbose
    --strict-markers
    --cov=src
    --cov-report=term-missing
    --cov-report=html
    --cov-fail-under=80

# Async fixture event loop scope
asyncio_default_fixture_loop_scope = function
```
### Shared Test Fixtures
```python
# tests/conftest.py
import asyncio

import pytest
from temporalio.client import Client
from temporalio.testing import ActivityEnvironment, WorkflowEnvironment
from temporalio.worker import Worker

@pytest.fixture(scope="session")
def event_loop():
    """Provide event loop for async fixtures"""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

@pytest.fixture(scope="session")
async def temporal_client():
    """Provide Temporal client connected to local server"""
    client = await Client.connect("localhost:7233")
    yield client

@pytest.fixture(scope="module")
async def workflow_env():
    """Module-scoped time-skipping environment"""
    env = await WorkflowEnvironment.start_time_skipping()
    yield env
    await env.shutdown()

@pytest.fixture
def activity_env():
    """Function-scoped activity environment"""
    return ActivityEnvironment()

@pytest.fixture
async def test_worker(workflow_env):
    """Pre-configured test worker"""
    from src.workflows import OrderWorkflow, PaymentWorkflow
    from src.activities import process_payment, update_inventory

    return Worker(
        workflow_env.client,
        task_queue="test-queue",
        workflows=[OrderWorkflow, PaymentWorkflow],
        activities=[process_payment, update_inventory],
    )
```
### Dependencies
```txt
# requirements.txt
temporalio>=1.5.0
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0
pytest-xdist>=3.3.0 # Parallel test execution
```
```toml
# pyproject.toml
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

[project]
name = "temporal-project"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
    "temporalio>=1.5.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.4.0",
    "pytest-asyncio>=0.21.0",
    "pytest-cov>=4.1.0",
    "pytest-xdist>=3.3.0",
]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
```
## Coverage Configuration
### Coverage Settings
```ini
# .coveragerc
[run]
source = src
omit =
    */tests/*
    */venv/*
    */__pycache__/*

[report]
exclude_lines =
    # Exclude type checking blocks
    if TYPE_CHECKING:
    # Exclude debug code
    def __repr__
    # Exclude abstract methods
    @abstractmethod
    # Exclude pass statements
    pass

[html]
directory = htmlcov
```
### Running Tests with Coverage
```bash
# Run all tests with coverage
pytest --cov=src --cov-report=term-missing
# Generate HTML coverage report
pytest --cov=src --cov-report=html
open htmlcov/index.html
# Run specific test categories
pytest -m unit # Unit tests only
pytest -m integration # Integration tests only
pytest -m "not slow" # Skip slow tests
# Parallel execution (faster)
pytest -n auto # Use all CPU cores
# Fail if coverage below threshold
pytest --cov=src --cov-fail-under=80
```
### Coverage Report Example
```
---------- coverage: platform darwin, python 3.11.5 -----------
Name                                Stmts   Miss  Cover   Missing
-----------------------------------------------------------------
src/__init__.py                         0      0   100%
src/activities/__init__.py              2      0   100%
src/activities/inventory.py            45      3    93%   78-80
src/activities/payment.py              38      0   100%
src/workflows/__init__.py               2      0   100%
src/workflows/order_workflow.py        67      5    93%   45-49
src/workflows/payment_workflow.py      52      0   100%
-----------------------------------------------------------------
TOTAL                                 206      8    96%
10 files skipped due to complete coverage.
```
## Development Workflow
### Daily Development Flow
```bash
# 1. Start Temporal server
docker-compose up -d
# 2. Verify server health
python scripts/health_check.py
# 3. Run tests during development
pytest tests/unit/ --verbose
# 4. Run full test suite before commit
pytest --cov=src --cov-report=term-missing
# 5. Check coverage
open htmlcov/index.html
# 6. Stop server
docker-compose down
```
### Pre-Commit Hook
```bash
#!/bin/bash
# .git/hooks/pre-commit (make executable with chmod +x)
echo "Running tests..."
pytest --cov=src --cov-fail-under=80
if [ $? -ne 0 ]; then
    echo "Tests failed. Commit aborted."
    exit 1
fi
echo "All tests passed!"
```
### Makefile for Common Tasks
```makefile
# Makefile
.PHONY: setup test test-unit test-integration test-replay test-parallel coverage clean ci

setup:
	docker-compose up -d
	pip install -r requirements.txt
	python scripts/health_check.py

test:
	pytest --cov=src --cov-report=term-missing

test-unit:
	pytest -m unit --verbose

test-integration:
	pytest -m integration --verbose

test-replay:
	pytest -m replay --verbose

test-parallel:
	pytest -n auto --cov=src

coverage:
	pytest --cov=src --cov-report=html
	open htmlcov/index.html

clean:
	docker-compose down -v
	rm -rf .pytest_cache htmlcov .coverage

ci:
	docker-compose up -d
	sleep 10  # Wait for Temporal to start
	pytest --cov=src --cov-fail-under=80
	docker-compose down
```
### CI/CD Example
```yaml
# .github/workflows/test.yml
name: Tests

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.11"
      - name: Start Temporal server
        run: docker-compose up -d
      - name: Wait for Temporal
        run: sleep 10
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run tests with coverage
        run: pytest --cov=src --cov-report=xml --cov-fail-under=80
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml
      - name: Cleanup
        if: always()
        run: docker-compose down
```
## Debugging Tips
### Enable Temporal SDK Logging
```python
import logging
# Enable debug logging for Temporal SDK
logging.basicConfig(level=logging.DEBUG)
temporal_logger = logging.getLogger("temporalio")
temporal_logger.setLevel(logging.DEBUG)
```
### Interactive Debugging
```python
# Add breakpoint in test
@pytest.mark.asyncio
async def test_workflow_with_breakpoint(workflow_env):
    import pdb; pdb.set_trace()  # Debug here
    async with Worker(...):
        result = await workflow_env.client.execute_workflow(...)
```
### Temporal Web UI
```bash
# Access Web UI at http://localhost:8080
# - View workflow executions
# - Inspect event history
# - Replay workflows
# - Monitor workers
```
## Best Practices
1. **Isolated Environment**: Use Docker Compose for reproducible local setup
2. **Health Checks**: Always verify Temporal server before running tests
3. **Fast Feedback**: Use pytest markers to run unit tests quickly
4. **Coverage Targets**: Maintain ≥80% code coverage
5. **Parallel Testing**: Use pytest-xdist for faster test runs
6. **CI/CD Integration**: Automated testing on every commit
7. **Cleanup**: Clear Docker volumes between test runs if needed
## Troubleshooting
**Issue: Temporal server not starting**
```bash
# Check logs
docker-compose logs temporal
# Reset database
docker-compose down -v
docker-compose up -d
```
**Issue: Tests timing out**
```ini
# pytest.ini (requires the pytest-timeout plugin)
timeout = 30
```
**Issue: Port already in use**
```bash
# Find process using port 7233
lsof -i :7233
# Kill process or change port in docker-compose.yml
```
## Additional Resources
- Temporal Local Development: docs.temporal.io/develop/python/local-dev
- pytest Documentation: docs.pytest.org
- Docker Compose: docs.docker.com/compose
- pytest-asyncio: github.com/pytest-dev/pytest-asyncio


@@ -0,0 +1,455 @@
# Replay Testing for Determinism and Compatibility
Comprehensive guide for validating workflow determinism and ensuring safe code changes using replay testing.
## What is Replay Testing?
**Purpose**: Verify that workflow code changes are backward-compatible with existing workflow executions
**How it works**:
1. Temporal records every workflow decision as Event History
2. Replay testing re-executes workflow code against recorded history
3. If new code makes same decisions → deterministic (safe to deploy)
4. If decisions differ → non-deterministic (breaking change)
**Critical Use Cases**:
- Deploying workflow code changes to production
- Validating refactoring doesn't break running workflows
- CI/CD automated compatibility checks
- Version migration validation
## Basic Replay Testing
### Replayer Setup
```python
from temporalio.client import Client
from temporalio.worker import Replayer

async def test_workflow_replay():
    """Test workflow against production history"""
    # Connect to Temporal server
    client = await Client.connect("localhost:7233")

    # Create replayer with current workflow code
    replayer = Replayer(
        workflows=[OrderWorkflow, PaymentWorkflow]
    )

    # Fetch workflow history from production
    handle = client.get_workflow_handle("order-123")
    history = await handle.fetch_history()

    # Replay history with current code
    await replayer.replay_workflow(history)
    # Success = deterministic, Exception = breaking change
```
### Testing Against Multiple Histories
```python
import pytest
from temporalio.client import Client
from temporalio.worker import Replayer

@pytest.mark.asyncio
async def test_replay_multiple_workflows():
    """Replay against multiple production histories"""
    client = await Client.connect("localhost:7233")
    replayer = Replayer(workflows=[OrderWorkflow])

    # Test against different workflow executions
    workflow_ids = [
        "order-success-123",
        "order-cancelled-456",
        "order-retry-789",
    ]
    for workflow_id in workflow_ids:
        handle = client.get_workflow_handle(workflow_id)
        history = await handle.fetch_history()
        # Replay should succeed for all variants
        await replayer.replay_workflow(history)
```
## Determinism Validation
### Common Non-Deterministic Patterns
**Problem: Random Number Generation**
```python
# ❌ Non-deterministic (breaks replay)
@workflow.defn
class BadWorkflow:
@workflow.run
async def run(self) -> int:
return random.randint(1, 100) # Different on replay!
# ✅ Deterministic (safe for replay)
@workflow.defn
class GoodWorkflow:
@workflow.run
async def run(self) -> int:
return workflow.random().randint(1, 100) # Deterministic random
```
**Problem: Current Time**
```python
# ❌ Non-deterministic
@workflow.defn
class BadWorkflow:
    @workflow.run
    async def run(self) -> str:
        now = datetime.now()  # Different on replay!
        return now.isoformat()

# ✅ Deterministic
@workflow.defn
class GoodWorkflow:
    @workflow.run
    async def run(self) -> str:
        now = workflow.now()  # Deterministic time
        return now.isoformat()
```
**Problem: Direct External Calls**
```python
# ❌ Non-deterministic
@workflow.defn
class BadWorkflow:
    @workflow.run
    async def run(self) -> dict:
        response = requests.get("https://api.example.com/data")  # External call!
        return response.json()

# ✅ Deterministic
@workflow.defn
class GoodWorkflow:
    @workflow.run
    async def run(self) -> dict:
        # Use activity for external calls
        return await workflow.execute_activity(
            fetch_external_data,
            start_to_close_timeout=timedelta(seconds=30),
        )
```
### Testing Determinism
```python
@pytest.mark.asyncio
async def test_workflow_determinism():
    """Verify workflow produces same output on multiple runs"""

    @workflow.defn
    class DeterministicWorkflow:
        @workflow.run
        async def run(self, seed: int) -> list[int]:
            # Use workflow.random() for determinism
            rng = workflow.random()
            rng.seed(seed)
            return [rng.randint(1, 100) for _ in range(10)]

    env = await WorkflowEnvironment.start_time_skipping()

    # Run workflow twice with same input
    results = []
    for i in range(2):
        async with Worker(
            env.client,
            task_queue="test",
            workflows=[DeterministicWorkflow],
        ):
            result = await env.client.execute_workflow(
                DeterministicWorkflow.run,
                42,  # Same seed
                id=f"determinism-test-{i}",
                task_queue="test",
            )
            results.append(result)
    await env.shutdown()

    # Verify identical outputs
    assert results[0] == results[1]
```
## Production History Replay
### Exporting Workflow History
```python
from temporalio.client import Client

async def export_workflow_history(workflow_id: str, output_file: str):
    """Export workflow history for replay testing"""
    client = await Client.connect("production.temporal.io:7233")

    # Fetch workflow history
    handle = client.get_workflow_handle(workflow_id)
    history = await handle.fetch_history()

    # Save to file for replay testing
    with open(output_file, "wb") as f:
        f.write(history.SerializeToString())
    print(f"Exported history to {output_file}")
```
### Replaying from File
```python
from temporalio.worker import Replayer
from temporalio.api.history.v1 import History

async def test_replay_from_file():
    """Replay workflow from exported history file"""
    # Load history from file
    with open("workflow_histories/order-123.pb", "rb") as f:
        history = History.FromString(f.read())

    # Replay with current workflow code
    replayer = Replayer(workflows=[OrderWorkflow])
    await replayer.replay_workflow(history)
    # Success = safe to deploy
```
## CI/CD Integration Patterns
### GitHub Actions Example
```yaml
# .github/workflows/replay-tests.yml
name: Replay Tests

on:
  pull_request:
    branches: [main]

jobs:
  replay-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-asyncio
      - name: Download production histories
        run: |
          # Fetch recent workflow histories from production
          python scripts/export_histories.py
      - name: Run replay tests
        run: pytest tests/replay/ --verbose
      - name: Upload results
        if: failure()
        uses: actions/upload-artifact@v3
        with:
          name: replay-failures
          path: replay-failures/
```
### Automated History Export
```python
# scripts/export_histories.py
import asyncio

from temporalio.client import Client

async def export_recent_histories():
    """Export recent production workflow histories"""
    client = await Client.connect("production.temporal.io:7233")

    # Query recent completed workflows
    workflows = client.list_workflows(
        query="WorkflowType='OrderWorkflow' AND CloseTime > '7 days ago'"
    )

    count = 0
    async for wf in workflows:
        # Export history via a handle to the specific run
        handle = client.get_workflow_handle(wf.id, run_id=wf.run_id)
        history = await handle.fetch_history()

        # Save to file
        filename = f"workflow_histories/{wf.id}.pb"
        with open(filename, "wb") as f:
            f.write(history.SerializeToString())

        count += 1
        if count >= 100:  # Limit to 100 most recent
            break

    print(f"Exported {count} workflow histories")

if __name__ == "__main__":
    asyncio.run(export_recent_histories())
```
### Replay Test Suite
```python
# tests/replay/test_workflow_replay.py
import glob

import pytest
from temporalio.worker import Replayer
from temporalio.api.history.v1 import History

from workflows import OrderWorkflow, PaymentWorkflow

@pytest.mark.asyncio
async def test_replay_all_histories():
    """Replay all production histories"""
    replayer = Replayer(
        workflows=[OrderWorkflow, PaymentWorkflow]
    )

    # Load all history files
    history_files = glob.glob("workflow_histories/*.pb")
    failures = []
    for history_file in history_files:
        try:
            with open(history_file, "rb") as f:
                history = History.FromString(f.read())
            await replayer.replay_workflow(history)
            print(f"✓ {history_file}")
        except Exception as e:
            failures.append((history_file, str(e)))
            print(f"✗ {history_file}: {e}")

    # Report failures
    if failures:
        pytest.fail(
            f"Replay failed for {len(failures)} workflows:\n"
            + "\n".join(f"  {file}: {error}" for file, error in failures)
        )
```
## Version Compatibility Testing
### Testing Code Evolution
```python
@pytest.mark.asyncio
async def test_workflow_version_compatibility():
    """Test workflow with version changes (Python SDK uses patching)"""

    @workflow.defn
    class EvolvingWorkflow:
        @workflow.run
        async def run(self) -> str:
            # workflow.patched() is the Python SDK's versioning primitive:
            # new executions take the patched branch, while replays of
            # histories recorded before the patch take the old branch.
            if workflow.patched("feature-flag"):
                # New behavior
                return "version-2"
            # Old behavior (kept for replays of pre-patch histories)
            return "version-1"

    env = await WorkflowEnvironment.start_time_skipping()
    async with Worker(
        env.client,
        task_queue="test",
        workflows=[EvolvingWorkflow],
    ):
        # Fresh executions always take the patched (new) branch
        result = await env.client.execute_workflow(
            EvolvingWorkflow.run,
            id="evolving-v2",
            task_queue="test",
        )
        assert result == "version-2"
    await env.shutdown()
```
### Migration Strategy
```python
# Phase 1: Add patch check
@workflow.defn
class MigratingWorkflow:
    @workflow.run
    async def run(self) -> dict:
        if workflow.patched("new-logic"):
            # New logic (new workflows)
            return await self._new_implementation()
        # Old logic (existing workflows replaying old histories)
        return await self._old_implementation()

# Phase 2: Once all pre-patch workflows complete, deprecate the patch
@workflow.defn
class DeprecatingWorkflow:
    @workflow.run
    async def run(self) -> dict:
        workflow.deprecate_patch("new-logic")
        return await self._new_implementation()

# Phase 3: Once all patched workflows complete, remove the patch calls
@workflow.defn
class MigratedWorkflow:
    @workflow.run
    async def run(self) -> dict:
        # Only new logic remains
        return await self._new_implementation()
```
## Best Practices
1. **Replay Before Deploy**: Always run replay tests before deploying workflow changes
2. **Export Regularly**: Continuously export production histories for testing
3. **CI/CD Integration**: Automated replay testing in pull request checks
4. **Version Tracking**: Use workflow.patched() for safe code evolution
5. **History Retention**: Keep representative workflow histories for regression testing
6. **Determinism**: Never use random(), datetime.now(), or direct external calls
7. **Comprehensive Testing**: Test against various workflow execution paths
## Common Replay Errors
**Non-Deterministic Error**:
```
WorkflowNonDeterministicError: Workflow command mismatch at position 5
Expected: ScheduleActivityTask(activity_id='activity-1')
Got: ScheduleActivityTask(activity_id='activity-2')
```
**Solution**: Code change altered workflow decision sequence
**Patch Mismatch Error**: the replayer raises a nondeterminism error when patch markers recorded in history no longer match the current code
**Solution**: Use workflow.patched() and workflow.deprecate_patch() for backward-compatible changes
## Additional Resources
- Replay Testing: docs.temporal.io/develop/python/testing-suite#replay-testing
- Workflow Versioning: docs.temporal.io/workflows#versioning
- Determinism Guide: docs.temporal.io/workflows#deterministic-constraints
- CI/CD Integration: github.com/temporalio/samples-python/tree/main/.github/workflows


@@ -0,0 +1,320 @@
# Unit Testing Temporal Workflows and Activities
Focused guide for testing individual workflows and activities in isolation using WorkflowEnvironment and ActivityEnvironment.
## WorkflowEnvironment with Time-Skipping
**Purpose**: Test workflows in isolation with instant time progression (month-long workflows → seconds)
### Basic Setup Pattern
```python
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

@pytest.fixture
async def workflow_env():
    """Reusable time-skipping test environment"""
    env = await WorkflowEnvironment.start_time_skipping()
    yield env
    await env.shutdown()

@pytest.mark.asyncio
async def test_workflow_execution(workflow_env):
    """Test workflow with time-skipping"""
    async with Worker(
        workflow_env.client,
        task_queue="test-queue",
        workflows=[YourWorkflow],
        activities=[your_activity],
    ):
        result = await workflow_env.client.execute_workflow(
            YourWorkflow.run,
            "test-input",
            id="test-wf-id",
            task_queue="test-queue",
        )
        assert result == "expected-output"
```
**Key Benefits**:
- `workflow.sleep(timedelta(days=30))` completes instantly
- Fast feedback loop (milliseconds vs hours)
- Deterministic test execution
### Time-Skipping Examples
**Sleep Advancement**:
```python
@pytest.mark.asyncio
async def test_workflow_with_delays(workflow_env):
    """Workflow sleeps are instant in time-skipping mode"""

    @workflow.defn
    class DelayedWorkflow:
        @workflow.run
        async def run(self) -> str:
            await workflow.sleep(timedelta(hours=24))  # Instant in tests
            return "completed"

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[DelayedWorkflow],
    ):
        result = await workflow_env.client.execute_workflow(
            DelayedWorkflow.run,
            id="delayed-wf",
            task_queue="test",
        )
        assert result == "completed"
```
**Manual Time Control**:
```python
@pytest.mark.asyncio
async def test_workflow_manual_time(workflow_env):
    """Manually advance time for precise control"""
    handle = await workflow_env.client.start_workflow(
        TimeBasedWorkflow.run,
        id="time-wf",
        task_queue="test",
    )

    # Advance time by specific amount
    await workflow_env.sleep(timedelta(hours=1))

    # Verify intermediate state via query
    state = await handle.query(TimeBasedWorkflow.get_state)
    assert state == "processing"

    # Advance to completion
    await workflow_env.sleep(timedelta(hours=23))
    result = await handle.result()
    assert result == "completed"
```
### Testing Workflow Logic
**Decision Testing**:
```python
@pytest.mark.asyncio
async def test_workflow_branching(workflow_env):
    """Test different execution paths"""

    @workflow.defn
    class ConditionalWorkflow:
        @workflow.run
        async def run(self, condition: bool) -> str:
            if condition:
                return "path-a"
            return "path-b"

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[ConditionalWorkflow],
    ):
        # Test true path
        result_a = await workflow_env.client.execute_workflow(
            ConditionalWorkflow.run,
            True,
            id="cond-wf-true",
            task_queue="test",
        )
        assert result_a == "path-a"

        # Test false path
        result_b = await workflow_env.client.execute_workflow(
            ConditionalWorkflow.run,
            False,
            id="cond-wf-false",
            task_queue="test",
        )
        assert result_b == "path-b"
```
## ActivityEnvironment Testing
**Purpose**: Test activities in isolation without workflows or Temporal server
### Basic Activity Test
```python
from temporalio import activity
from temporalio.testing import ActivityEnvironment

async def test_activity_basic():
    """Test activity without workflow context"""

    @activity.defn
    async def process_data(input: str) -> str:
        return input.upper()

    env = ActivityEnvironment()
    result = await env.run(process_data, "test")
    assert result == "TEST"
```
### Testing Activity Context
**Heartbeat Testing**:
```python
import asyncio

from temporalio import activity
from temporalio.testing import ActivityEnvironment

async def test_activity_heartbeat():
    """Verify heartbeat calls"""

    @activity.defn
    async def long_running_activity(total_items: int) -> int:
        for i in range(total_items):
            activity.heartbeat(i)  # Report progress
            await asyncio.sleep(0.1)
        return total_items

    env = ActivityEnvironment()
    result = await env.run(long_running_activity, 10)
    assert result == 10
```
**Cancellation Testing**:
```python
import asyncio

from temporalio import activity
from temporalio.testing import ActivityEnvironment

async def test_activity_cancellation():
    """Test activity cancellation handling"""

    @activity.defn
    async def cancellable_activity() -> str:
        try:
            while True:
                if activity.is_cancelled():
                    return "cancelled"
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            return "cancelled"

    env = ActivityEnvironment()
    task = asyncio.create_task(env.run(cancellable_activity))
    await asyncio.sleep(0.2)
    env.cancel()  # Request cancellation of the running activity
    result = await task
    assert result == "cancelled"
```
### Testing Error Handling
**Exception Propagation**:
```python
import pytest
from temporalio import activity
from temporalio.exceptions import ApplicationError
from temporalio.testing import ActivityEnvironment

async def test_activity_error():
    """Test activity error handling"""

    @activity.defn
    async def failing_activity(should_fail: bool) -> str:
        if should_fail:
            raise ApplicationError("Validation failed", non_retryable=True)
        return "success"

    env = ActivityEnvironment()

    # Test success path
    result = await env.run(failing_activity, False)
    assert result == "success"

    # Test error path
    with pytest.raises(ApplicationError) as exc_info:
        await env.run(failing_activity, True)
    assert "Validation failed" in str(exc_info.value)
```
## Pytest Integration Patterns
### Shared Fixtures
```python
# conftest.py
import pytest
from temporalio.testing import ActivityEnvironment, WorkflowEnvironment

@pytest.fixture(scope="module")
async def workflow_env():
    """Module-scoped environment (reused across tests)"""
    env = await WorkflowEnvironment.start_time_skipping()
    yield env
    await env.shutdown()

@pytest.fixture
def activity_env():
    """Function-scoped environment (fresh per test)"""
    return ActivityEnvironment()
```
### Parameterized Tests
```python
@pytest.mark.parametrize("input,expected", [
    ("test", "TEST"),
    ("hello", "HELLO"),
    ("123", "123"),
])
async def test_activity_parameterized(activity_env, input, expected):
    """Test multiple input scenarios"""
    result = await activity_env.run(process_data, input)
    assert result == expected
```
## Best Practices
1. **Fast Execution**: Use time-skipping for all workflow tests
2. **Isolation**: Test workflows and activities separately
3. **Shared Fixtures**: Reuse WorkflowEnvironment across related tests
4. **Coverage Target**: ≥80% for workflow logic
5. **Mock Activities**: Use ActivityEnvironment for activity-specific logic
6. **Determinism**: Ensure test results are consistent across runs
7. **Error Cases**: Test both success and failure scenarios
## Common Patterns
**Testing Retry Logic**:
```python
@pytest.mark.asyncio
async def test_workflow_with_retries(workflow_env):
    """Test activity retry behavior"""
    call_count = 0

    @activity.defn
    async def flaky_activity() -> str:
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise Exception("Transient error")
        return "success"

    @workflow.defn
    class RetryWorkflow:
        @workflow.run
        async def run(self) -> str:
            return await workflow.execute_activity(
                flaky_activity,
                start_to_close_timeout=timedelta(seconds=10),
                retry_policy=RetryPolicy(
                    initial_interval=timedelta(milliseconds=1),
                    maximum_attempts=5,
                ),
            )

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[RetryWorkflow],
        activities=[flaky_activity],
    ):
        result = await workflow_env.client.execute_workflow(
            RetryWorkflow.run,
            id="retry-wf",
            task_queue="test",
        )
        assert result == "success"
        assert call_count == 3  # Verify retry attempts
```
## Additional Resources
- Python SDK Testing: docs.temporal.io/develop/python/testing-suite
- pytest Documentation: docs.pytest.org
- Temporal Samples: github.com/temporalio/samples-python


@@ -0,0 +1,286 @@
---
name: workflow-orchestration-patterns
description: Design durable workflows with Temporal for distributed systems. Covers workflow vs activity separation, saga patterns, state management, and determinism constraints. Use when building long-running processes, distributed transactions, or microservice orchestration.
---
# Workflow Orchestration Patterns
Master workflow orchestration architecture with Temporal, covering fundamental design decisions, resilience patterns, and best practices for building reliable distributed systems.
## When to Use Workflow Orchestration
### Ideal Use Cases (Source: docs.temporal.io)
- **Multi-step processes** spanning machines/services/databases
- **Distributed transactions** requiring all-or-nothing semantics
- **Long-running workflows** (hours to years) with automatic state persistence
- **Failure recovery** that must resume from last successful step
- **Business processes**: bookings, orders, campaigns, approvals
- **Entity lifecycle management**: inventory tracking, account management, cart workflows
- **Infrastructure automation**: CI/CD pipelines, provisioning, deployments
- **Human-in-the-loop** systems requiring timeouts and escalations
### When NOT to Use
- Simple CRUD operations (use direct API calls)
- Pure data processing pipelines (use Airflow, batch processing)
- Stateless request/response (use standard APIs)
- Real-time streaming (use Kafka, event processors)
## Critical Design Decision: Workflows vs Activities
**The Fundamental Rule** (Source: temporal.io/blog/workflow-engine-principles):
- **Workflows** = Orchestration logic and decision-making
- **Activities** = External interactions (APIs, databases, network calls)
### Workflows (Orchestration)
**Characteristics:**
- Contain business logic and coordination
- **MUST be deterministic** (same inputs → same outputs)
- **Cannot** perform direct external calls
- State automatically preserved across failures
- Can run for years despite infrastructure failures
**Example workflow tasks:**
- Decide which steps to execute
- Handle compensation logic
- Manage timeouts and retries
- Coordinate child workflows
### Activities (External Interactions)
**Characteristics:**
- Handle all external system interactions
- Can be non-deterministic (API calls, DB writes)
- Include built-in timeouts and retry logic
- **Must be idempotent** (calling N times = calling once)
- Short-lived (seconds to minutes typically)
**Example activity tasks:**
- Call payment gateway API
- Write to database
- Send emails or notifications
- Query external services
### Design Decision Framework
```
Does it touch external systems? → Activity
Is it orchestration/decision logic? → Workflow
```
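The split can be sketched in plain Python (no Temporal SDK), with hypothetical activity and step names — the orchestrator only sequences calls and makes decisions, while the activity functions own all side effects:

```python
import asyncio

# Activities: all external interaction lives here (hypothetical helpers;
# in real code these would call a payment gateway or email service).
async def charge_payment(order_id: str) -> str:
    return f"charge-{order_id}"

async def send_receipt(order_id: str) -> str:
    return f"receipt-{order_id}"

# Workflow: pure orchestration — decides the order of steps and handles
# coordination, but performs no I/O itself.
async def order_workflow(order_id: str) -> list[str]:
    results = [await charge_payment(order_id)]
    results.append(await send_receipt(order_id))
    return results
```

In Temporal the orchestrator would be a `@workflow.defn` class and each helper a `@activity.defn` function invoked via `workflow.execute_activity`.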
## Core Workflow Patterns
### 1. Saga Pattern with Compensation
**Purpose**: Implement distributed transactions with rollback capability
**Pattern** (Source: temporal.io/blog/compensating-actions-part-of-a-complete-breakfast-with-sagas):
```
For each step:
1. Register compensation BEFORE executing
2. Execute the step (via activity)
3. On failure, run all compensations in reverse order (LIFO)
```
**Example: Payment Workflow**
1. Reserve inventory (compensation: release inventory)
2. Charge payment (compensation: refund payment)
3. Fulfill order (compensation: cancel fulfillment)
**Critical Requirements:**
- Compensations must be idempotent
- Register compensation BEFORE executing step
- Run compensations in reverse order
- Handle partial failures gracefully
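The register-then-execute-then-unwind flow can be sketched in plain Python with illustrative step names (the Temporal version would execute each step and compensation as an activity):

```python
def run_saga() -> list[str]:
    """Minimal saga sketch: register each compensation BEFORE its step,
    then unwind in reverse (LIFO) order on failure."""
    log: list[str] = []
    compensations: list[str] = []
    steps = [
        ("reserve_inventory", "release_inventory"),
        ("charge_payment", "refund_payment"),
        ("fulfill_order", "cancel_fulfillment"),
    ]
    try:
        for step, compensation in steps:
            compensations.append(compensation)  # register before executing
            if step == "fulfill_order":
                raise RuntimeError("fulfillment failed")  # simulated failure
            log.append(step)
    except RuntimeError:
        for compensation in reversed(compensations):  # LIFO unwind
            log.append(compensation)
    return log
```

Registering before execution matters: if the step itself fails partway through, its compensation still runs and (being idempotent) safely no-ops on work that never happened.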
### 2. Entity Workflows (Actor Model)
**Purpose**: Long-lived workflow representing single entity instance
**Pattern** (Source: docs.temporal.io/evaluate/use-cases-design-patterns):
- One workflow execution = one entity (cart, account, inventory item)
- Workflow persists for entity lifetime
- Receives signals for state changes
- Supports queries for current state
**Example Use Cases:**
- Shopping cart (add items, checkout, expiration)
- Bank account (deposits, withdrawals, balance checks)
- Product inventory (stock updates, reservations)
**Benefits:**
- Encapsulates entity behavior
- Guarantees consistency per entity
- Natural event sourcing
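The entity shape can be sketched as a plain Python class (illustrative only): one long-lived instance per entity, with methods standing in for signal handlers that mutate state and a query handler that reads it. In the real SDK these would be `@workflow.signal` and `@workflow.query` methods on a `@workflow.defn` class.

```python
# Entity-workflow sketch: one instance = one cart; signals mutate state,
# queries return a read-only view.

class CartWorkflow:
    def __init__(self, cart_id):
        self.cart_id = cart_id
        self.items = {}

    # Signal handlers: external callers mutate entity state.
    def add_item(self, sku, qty):
        self.items[sku] = self.items.get(sku, 0) + qty

    def remove_item(self, sku):
        self.items.pop(sku, None)

    # Query handler: read-only snapshot of current state.
    def get_items(self):
        return dict(self.items)

cart = CartWorkflow("cart-42")
cart.add_item("sku-1", 2)
cart.add_item("sku-1", 1)
```

Since all updates for one cart flow through one workflow execution, per-entity consistency falls out of the model rather than needing locks.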
### 3. Fan-Out/Fan-In (Parallel Execution)
**Purpose**: Execute multiple tasks in parallel, aggregate results
**Pattern:**
- Spawn child workflows or parallel activities
- Wait for all to complete
- Aggregate results
- Handle partial failures
**Scaling Rule** (Source: temporal.io/blog/workflow-engine-principles):
- Don't scale individual workflows
- For 1M tasks: spawn 1K child workflows × 1K tasks each
- Keep each workflow bounded
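A minimal fan-out/fan-in sketch using `asyncio` as a stand-in for parallel activities or child workflows: spawn the tasks, gather with `return_exceptions=True` so one failure does not discard the other results, then aggregate successes and failures separately.

```python
import asyncio

# Fan-out/fan-in sketch: run tasks in parallel, tolerate partial failure,
# aggregate results. Task logic here is hypothetical.

async def process(task_id):
    if task_id == 2:
        raise RuntimeError(f"task {task_id} failed")
    return task_id * 10

async def fan_out_fan_in(task_ids):
    results = await asyncio.gather(
        *(process(t) for t in task_ids), return_exceptions=True
    )
    succeeded = [r for r in results if not isinstance(r, Exception)]
    failed = [t for t, r in zip(task_ids, results) if isinstance(r, Exception)]
    return succeeded, failed

succeeded, failed = asyncio.run(fan_out_fan_in([1, 2, 3]))
```

In Temporal the same shape applies with `asyncio.gather` over activity or child-workflow handles inside a workflow, with the scaling rule above bounding how many each parent fans out to.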
### 4. Async Callback Pattern
**Purpose**: Wait for external event or human approval
**Pattern:**
- Workflow sends request and waits for signal
- External system processes asynchronously
- Sends signal to resume workflow
- Workflow continues with response
**Use Cases:**
- Human approval workflows
- Webhook callbacks
- Long-running external processes
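The callback pattern can be sketched with an `asyncio.Event` standing in for a Temporal signal: the workflow parks on the wait, and the external system resumes it by delivering the signal with the response.

```python
import asyncio

# Async callback sketch: workflow waits for an external "signal", then
# continues with the delivered decision. Names are illustrative.

class ApprovalWorkflow:
    def __init__(self):
        self._approved = asyncio.Event()
        self.decision = None

    def approve(self, decision):          # signal handler (external caller)
        self.decision = decision
        self._approved.set()

    async def run(self):
        await self._approved.wait()       # workflow parks here
        return f"order {self.decision}"

async def main():
    wf = ApprovalWorkflow()
    task = asyncio.create_task(wf.run())
    await asyncio.sleep(0)                # let the workflow reach the wait
    wf.approve("approved")                # external system sends the signal
    return await task

result = asyncio.run(main())
```

In real Temporal code the wait would be `workflow.wait_condition(...)` after a `@workflow.signal` handler fires, and the parked state survives worker restarts.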
## State Management and Determinism
### Automatic State Preservation
**How Temporal Works** (Source: docs.temporal.io/workflows):
- Complete program state preserved automatically
- Event History records every command and event
- Seamless recovery from crashes
- Applications restore pre-failure state
### Determinism Constraints
**Workflows Execute as State Machines**:
- Replay behavior must be consistent
- Same inputs → identical outputs every time
**Prohibited in Workflows** (Source: docs.temporal.io/workflows):
- ❌ Threading, locks, synchronization primitives
- ❌ Random number generation (`random()`)
- ❌ Global state or static variables
- ❌ System time (`datetime.now()`)
- ❌ Direct file I/O or network calls
- ❌ Non-deterministic libraries
**Allowed in Workflows**:
- ✅ `workflow.now()` (deterministic time)
- ✅ `workflow.random()` (deterministic random)
- ✅ Pure functions and calculations
- ✅ Calling activities (non-deterministic operations)
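Why these constraints exist can be sketched in plain Python (a simplified model, not the SDK's actual replay machinery): on replay, Temporal re-runs the workflow code but substitutes recorded activity results from Event History, so deterministic orchestration logic must reach the same state both times.

```python
# Replay sketch: all non-determinism lives behind get_result, so the
# first run (live activities) and the replay (recorded history) agree.

def workflow_logic(get_result):
    """Pure orchestration: same inputs always produce the same output."""
    total = 0
    for step in ("fetch_a", "fetch_b"):
        total += get_result(step)   # activity call (recorded, then replayed)
    return total

history = {}

def live_activity(step):            # first execution: do work, record result
    value = {"fetch_a": 3, "fetch_b": 4}[step]
    history[step] = value
    return value

first = workflow_logic(live_activity)
replayed = workflow_logic(history.__getitem__)  # replay from Event History
```

If `workflow_logic` consulted `datetime.now()` or `random()`, the replay could branch differently from the recorded history, which is exactly the non-determinism error Temporal guards against.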
### Versioning Strategies
**Challenge**: Changing workflow code while old executions still running
**Solutions**:
1. **Versioning API**: Use the SDK's versioning primitives for safe changes (`workflow.patched()` in the Python SDK; `workflow.GetVersion()` in Go)
2. **New Workflow Type**: Create new workflow, route new executions to it
3. **Backward Compatibility**: Ensure old events replay correctly
## Resilience and Error Handling
### Retry Policies
**Default Behavior**: Temporal retries failed activities indefinitely (unlimited attempts), bounded only by the activity's schedule-to-close timeout
**Configure Retry**:
- Initial retry interval
- Backoff coefficient (exponential backoff)
- Maximum interval (cap retry delay)
- Maximum attempts (eventually fail)
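The schedule these four knobs produce can be sketched as follows (delay values are illustrative, not Temporal's defaults): exponential growth from the initial interval, capped at the maximum interval, stopping after the maximum attempt count.

```python
# Retry-schedule sketch: compute the delay before each retry given the
# four policy knobs above.

def retry_delays(initial, backoff, max_interval, max_attempts):
    delays = []
    delay = initial
    for _ in range(max_attempts - 1):       # no delay follows the final attempt
        delays.append(min(delay, max_interval))  # cap at maximum interval
        delay *= backoff                         # exponential backoff
    return delays

# e.g. 1s initial, 2.0 coefficient, capped at 5s, 5 attempts total:
schedule = retry_delays(1.0, 2.0, 5.0, 5)
# -> delays grow 1s, 2s, 4s, then hit the 5s cap
```

In the Python SDK these map onto `temporalio.common.RetryPolicy` fields (`initial_interval`, `backoff_coefficient`, `maximum_interval`, `maximum_attempts`) passed to `workflow.execute_activity()`.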
**Non-Retryable Errors**:
- Invalid input (validation failures)
- Business rule violations
- Permanent failures (resource not found)
### Idempotency Requirements
**Why Critical** (Source: docs.temporal.io/activities):
- Activities may execute multiple times
- Network failures trigger retries
- Duplicate execution must be safe
**Implementation Strategies**:
- Idempotency keys (deduplication)
- Check-then-act with unique constraints
- Upsert operations instead of insert
- Track processed request IDs
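The idempotency-key strategy can be sketched in plain Python (names hypothetical): cache results by request ID so a retried call returns the cached outcome instead of repeating the side effect.

```python
# Idempotency-key sketch: a retried charge with the same key is a no-op
# that returns the original result, never a double charge.

class PaymentProcessor:
    def __init__(self):
        self._processed = {}   # idempotency_key -> cached result
        self.charges = []      # real side effects, for illustration

    def charge(self, idempotency_key, amount):
        if idempotency_key in self._processed:   # duplicate: return cached
            return self._processed[idempotency_key]
        self.charges.append(amount)              # side effect happens once
        result = f"charged {amount}"
        self._processed[idempotency_key] = result
        return result

p = PaymentProcessor()
first = p.charge("req-1", 100)
retry = p.charge("req-1", 100)   # an activity retry replays the same key
```

In production the key store would be the database itself (e.g. a unique constraint or upsert), so deduplication survives process restarts.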
### Activity Heartbeats
**Purpose**: Detect stalled long-running activities
**Pattern**:
- Activity sends periodic heartbeat
- Includes progress information
- Timeout if no heartbeat received
- Enables progress-based retry
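The detection side of this pattern can be sketched as follows (a simplified model with an injected clock, not the server's actual implementation): the activity reports progress periodically, and the monitor flags a stall once no heartbeat arrives within the timeout.

```python
# Heartbeat sketch: track the last beat and flag the activity as stalled
# when the gap exceeds the heartbeat timeout. Clock is injected so the
# example is deterministic.

class HeartbeatMonitor:
    def __init__(self, timeout, clock):
        self.timeout = timeout
        self.clock = clock
        self.last_beat = clock()
        self.progress = None

    def heartbeat(self, progress):        # called by the running activity
        self.last_beat = self.clock()
        self.progress = progress

    def is_stalled(self):
        return self.clock() - self.last_beat > self.timeout

now = [0.0]
monitor = HeartbeatMonitor(timeout=10.0, clock=lambda: now[0])
monitor.heartbeat({"percent_done": 40})
now[0] = 5.0
stalled_early = monitor.is_stalled()   # within the timeout window
now[0] = 20.0
stalled_late = monitor.is_stalled()    # 20s silence > 10s timeout
```

In the Python SDK the activity side is just `activity.heartbeat(details)`; the Temporal server performs the timeout check and can hand the recorded progress details to the retry attempt.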
## Best Practices
### Workflow Design
1. **Keep workflows focused** - Single responsibility per workflow
2. **Small workflows** - Use child workflows for scalability
3. **Clear boundaries** - Workflow orchestrates, activities execute
4. **Test locally** - Use time-skipping test environment
### Activity Design
1. **Idempotent operations** - Safe to retry
2. **Short-lived** - Seconds to minutes, not hours
3. **Timeout configuration** - Always set timeouts
4. **Heartbeat for long tasks** - Report progress
5. **Error handling** - Distinguish retryable vs non-retryable
### Common Pitfalls
**Workflow Violations**:
- Using `datetime.now()` instead of `workflow.now()`
- Spawning threads or scheduling your own asyncio tasks in workflow code
- Calling external APIs directly from workflow
- Non-deterministic logic in workflows
**Activity Mistakes**:
- Non-idempotent operations (can't handle retries)
- Missing timeouts (activities run forever)
- No error classification (retry validation errors)
- Ignoring payload limits (2MB per argument)
### Operational Considerations
**Monitoring**:
- Workflow execution duration
- Activity failure rates
- Retry attempts and backoff
- Pending workflow counts
**Scalability**:
- Horizontal scaling with workers
- Task queue partitioning
- Child workflow decomposition
- Activity batching when appropriate
## Additional Resources
**Official Documentation**:
- Temporal Core Concepts: docs.temporal.io/workflows
- Workflow Patterns: docs.temporal.io/evaluate/use-cases-design-patterns
- Best Practices: docs.temporal.io/develop/best-practices
- Saga Pattern: temporal.io/blog/saga-pattern-made-easy
**Key Principles**:
1. Workflows = orchestration, Activities = external calls
2. Determinism is non-negotiable for workflows
3. Idempotency is critical for activities
4. State preservation is automatic
5. Design for failure and recovery