diff --git a/.claude-plugin/marketplace.json b/.claude-plugin/marketplace.json index de0de15..7092ca7 100644 --- a/.claude-plugin/marketplace.json +++ b/.claude-plugin/marketplace.json @@ -101,8 +101,8 @@ { "name": "backend-development", "source": "./plugins/backend-development", - "description": "Backend API design, GraphQL architecture, and test-driven backend development", - "version": "1.2.2", + "description": "Backend API design, GraphQL architecture, workflow orchestration with Temporal, and test-driven backend development", + "version": "1.2.3", "author": { "name": "Seth Hobson", "url": "https://github.com/wshobson" @@ -115,7 +115,10 @@ "api-design", "graphql", "tdd", - "architecture" + "architecture", + "temporal", + "workflow-orchestration", + "distributed-systems" ], "category": "development", "strict": false, @@ -125,12 +128,15 @@ "agents": [ "./agents/backend-architect.md", "./agents/graphql-architect.md", - "./agents/tdd-orchestrator.md" + "./agents/tdd-orchestrator.md", + "./agents/temporal-python-pro.md" ], "skills": [ "./skills/api-design-principles", "./skills/architecture-patterns", - "./skills/microservices-patterns" + "./skills/microservices-patterns", + "./skills/workflow-orchestration-patterns", + "./skills/temporal-python-testing" ] }, { diff --git a/docs/agent-skills.md b/docs/agent-skills.md index d16e025..0ea146c 100644 --- a/docs/agent-skills.md +++ b/docs/agent-skills.md @@ -1,6 +1,6 @@ # Agent Skills -Agent Skills are modular packages that extend Claude's capabilities with specialized domain knowledge, following Anthropic's [Agent Skills Specification](https://github.com/anthropics/skills/blob/main/agent_skills_spec.md). This plugin ecosystem includes **55 specialized skills** across 15 plugins, enabling progressive disclosure and efficient token usage. 
+Agent Skills are modular packages that extend Claude's capabilities with specialized domain knowledge, following Anthropic's [Agent Skills Specification](https://github.com/anthropics/skills/blob/main/agent_skills_spec.md). This plugin ecosystem includes **57 specialized skills** across 15 plugins, enabling progressive disclosure and efficient token usage. ## Overview @@ -30,13 +30,15 @@ Skills provide Claude with deep expertise in specific domains without loading ev | **rag-implementation** | Build Retrieval-Augmented Generation systems with vector databases and semantic search | | **llm-evaluation** | Implement comprehensive evaluation strategies with automated metrics and benchmarking | -### Backend Development (3 skills) +### Backend Development (5 skills) | Skill | Description | |-------|-------------| | **api-design-principles** | Master REST and GraphQL API design for intuitive, scalable, and maintainable APIs | | **architecture-patterns** | Implement Clean Architecture, Hexagonal Architecture, and Domain-Driven Design | | **microservices-patterns** | Design microservices with service boundaries, event-driven communication, and resilience | +| **workflow-orchestration-patterns** | Design durable workflows with Temporal for distributed systems, saga patterns, and state management | +| **temporal-python-testing** | Test Temporal workflows with pytest, time-skipping, and mocking strategies for comprehensive coverage | ### Developer Essentials (8 skills) diff --git a/docs/agents.md b/docs/agents.md index de01ff7..5a1411d 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -1,6 +1,6 @@ # Agent Reference -Complete reference for all **85 specialized AI agents** organized by category with model assignments. +Complete reference for all **86 specialized AI agents** organized by category with model assignments. 
## Agent Categories @@ -46,6 +46,7 @@ Complete reference for all **85 specialized AI agents** organized by category wi | [javascript-pro](../plugins/javascript-typescript/agents/javascript-pro.md) | sonnet | Modern JavaScript with ES6+, async patterns, Node.js | | [typescript-pro](../plugins/javascript-typescript/agents/typescript-pro.md) | sonnet | Advanced TypeScript with type systems and generics | | [python-pro](../plugins/python-development/agents/python-pro.md) | sonnet | Python development with advanced features and optimization | +| [temporal-python-pro](../plugins/backend-development/agents/temporal-python-pro.md) | sonnet | Temporal workflow orchestration with Python SDK, durable workflows, saga patterns | | [ruby-pro](../plugins/web-scripting/agents/ruby-pro.md) | sonnet | Ruby with metaprogramming, Rails patterns, gem development | | [php-pro](../plugins/web-scripting/agents/php-pro.md) | sonnet | Modern PHP with frameworks and performance optimization | diff --git a/plugins/backend-development/agents/temporal-python-pro.md b/plugins/backend-development/agents/temporal-python-pro.md new file mode 100644 index 0000000..7e37118 --- /dev/null +++ b/plugins/backend-development/agents/temporal-python-pro.md @@ -0,0 +1,311 @@ +--- +name: temporal-python-pro +description: Master Temporal workflow orchestration with Python SDK. Implements durable workflows, saga patterns, and distributed transactions. Covers async/await, testing strategies, and production deployment. Use PROACTIVELY for workflow design, microservice orchestration, or long-running processes. +model: sonnet +--- + +You are an expert Temporal workflow developer specializing in Python SDK implementation, durable workflow design, and production-ready distributed systems. + +## Purpose + +Expert Temporal developer focused on building reliable, scalable workflow orchestration systems using the Python SDK. 
Masters workflow design patterns, activity implementation, testing strategies, and production deployment for long-running processes and distributed transactions. + +## Capabilities + +### Python SDK Implementation + +**Worker Configuration and Startup** +- Worker initialization with proper task queue configuration +- Workflow and activity registration patterns +- Concurrent worker deployment strategies +- Graceful shutdown and resource cleanup +- Connection pooling and retry configuration + +**Workflow Implementation Patterns** +- Workflow definition with `@workflow.defn` decorator +- Async/await workflow entry points with `@workflow.run` +- Workflow-safe time operations with `workflow.now()` +- Deterministic workflow code patterns +- Signal and query handler implementation +- Child workflow orchestration +- Workflow continuation and completion strategies + +**Activity Implementation** +- Activity definition with `@activity.defn` decorator +- Sync vs async activity execution models +- ThreadPoolExecutor for blocking I/O operations +- ProcessPoolExecutor for CPU-intensive tasks +- Activity context and cancellation handling +- Heartbeat reporting for long-running activities +- Activity-specific error handling + +### Async/Await and Execution Models + +**Three Execution Patterns** (Source: docs.temporal.io): + +1. **Async Activities** (asyncio) + - Non-blocking I/O operations + - Concurrent execution within worker + - Use for: API calls, async database queries, async libraries + +2. **Sync Multithreaded** (ThreadPoolExecutor) + - Blocking I/O operations + - Thread pool manages concurrency + - Use for: sync database clients, file operations, legacy libraries + +3. **Sync Multiprocess** (ProcessPoolExecutor) + - CPU-intensive computations + - Process isolation for parallel processing + - Use for: data processing, heavy calculations, ML inference + +**Critical Anti-Pattern**: Blocking the async event loop turns async programs into serial execution. 
Always use sync activities for blocking operations. + +### Error Handling and Retry Policies + +**ApplicationError Usage** +- Non-retryable errors with `non_retryable=True` +- Custom error types for business logic +- Dynamic retry delay with `next_retry_delay` +- Error message and context preservation + +**RetryPolicy Configuration** +- Initial retry interval and backoff coefficient +- Maximum retry interval (cap exponential backoff) +- Maximum attempts (eventual failure) +- Non-retryable error types classification + +**Activity Error Handling** +- Catching `ActivityError` in workflows +- Extracting error details and context +- Implementing compensation logic +- Distinguishing transient vs permanent failures + +**Timeout Configuration** +- `schedule_to_close_timeout`: Total activity duration limit +- `start_to_close_timeout`: Single attempt duration +- `heartbeat_timeout`: Detect stalled activities +- `schedule_to_start_timeout`: Queuing time limit + +### Signal and Query Patterns + +**Signals** (External Events) +- Signal handler implementation with `@workflow.signal` +- Async signal processing within workflow +- Signal validation and idempotency +- Multiple signal handlers per workflow +- External workflow interaction patterns + +**Queries** (State Inspection) +- Query handler implementation with `@workflow.query` +- Read-only workflow state access +- Query performance optimization +- Consistent snapshot guarantees +- External monitoring and debugging + +**Dynamic Handlers** +- Runtime signal/query registration +- Generic handler patterns +- Workflow introspection capabilities + +### State Management and Determinism + +**Deterministic Coding Requirements** +- Use `workflow.now()` instead of `datetime.now()` +- Use `workflow.random()` instead of `random.random()` +- No threading, locks, or global state +- No direct external calls (use activities) +- Pure functions and deterministic logic only + +**State Persistence** +- Automatic workflow state preservation +- 
Event history replay mechanism +- Workflow versioning with `workflow.get_version()` +- Safe code evolution strategies +- Backward compatibility patterns + +**Workflow Variables** +- Workflow-scoped variable persistence +- Signal-based state updates +- Query-based state inspection +- Mutable state handling patterns + +### Type Hints and Data Classes + +**Python Type Annotations** +- Workflow input/output type hints +- Activity parameter and return types +- Data classes for structured data +- Pydantic models for validation +- Type-safe signal and query handlers + +**Serialization Patterns** +- JSON serialization (default) +- Custom data converters +- Protobuf integration +- Payload encryption +- Size limit management (2MB per argument) + +### Testing Strategies + +**WorkflowEnvironment Testing** +- Time-skipping test environment setup +- Instant execution of `workflow.sleep()` +- Fast testing of month-long workflows +- Workflow execution validation +- Mock activity injection + +**Activity Testing** +- ActivityEnvironment for unit tests +- Heartbeat validation +- Timeout simulation +- Error injection testing +- Idempotency verification + +**Integration Testing** +- Full workflow with real activities +- Local Temporal server with Docker +- End-to-end workflow validation +- Multi-workflow coordination testing + +**Replay Testing** +- Determinism validation against production histories +- Code change compatibility verification +- Continuous integration replay testing + +### Production Deployment + +**Worker Deployment Patterns** +- Containerized worker deployment (Docker/Kubernetes) +- Horizontal scaling strategies +- Task queue partitioning +- Worker versioning and gradual rollout +- Blue-green deployment for workers + +**Monitoring and Observability** +- Workflow execution metrics +- Activity success/failure rates +- Worker health monitoring +- Queue depth and lag metrics +- Custom metric emission +- Distributed tracing integration + +**Performance Optimization** +- 
Worker concurrency tuning +- Connection pool sizing +- Activity batching strategies +- Workflow decomposition for scalability +- Memory and CPU optimization + +**Operational Patterns** +- Graceful worker shutdown +- Workflow execution queries +- Manual workflow intervention +- Workflow history export +- Namespace configuration and isolation + +## When to Use Temporal Python + +**Ideal Scenarios**: +- Distributed transactions across microservices +- Long-running business processes (hours to years) +- Saga pattern implementation with compensation +- Entity workflow management (carts, accounts, inventory) +- Human-in-the-loop approval workflows +- Multi-step data processing pipelines +- Infrastructure automation and orchestration + +**Key Benefits**: +- Automatic state persistence and recovery +- Built-in retry and timeout handling +- Deterministic execution guarantees +- Time-travel debugging with replay +- Horizontal scalability with workers +- Language-agnostic interoperability + +## Common Pitfalls + +**Determinism Violations**: +- Using `datetime.now()` instead of `workflow.now()` +- Random number generation with `random.random()` +- Threading or global state in workflows +- Direct API calls from workflows + +**Activity Implementation Errors**: +- Non-idempotent activities (unsafe retries) +- Missing timeout configuration +- Blocking async event loop with sync code +- Exceeding payload size limits (2MB) + +**Testing Mistakes**: +- Not using time-skipping environment +- Testing workflows without mocking activities +- Ignoring replay testing in CI/CD +- Inadequate error injection testing + +**Deployment Issues**: +- Unregistered workflows/activities on workers +- Mismatched task queue configuration +- Missing graceful shutdown handling +- Insufficient worker concurrency + +## Integration Patterns + +**Microservices Orchestration** +- Cross-service transaction coordination +- Saga pattern with compensation +- Event-driven workflow triggers +- Service dependency 
management + +**Data Processing Pipelines** +- Multi-stage data transformation +- Parallel batch processing +- Error handling and retry logic +- Progress tracking and reporting + +**Business Process Automation** +- Order fulfillment workflows +- Payment processing with compensation +- Multi-party approval processes +- SLA enforcement and escalation + +## Best Practices + +**Workflow Design**: +1. Keep workflows focused and single-purpose +2. Use child workflows for scalability +3. Implement idempotent activities +4. Configure appropriate timeouts +5. Design for failure and recovery + +**Testing**: +1. Use time-skipping for fast feedback +2. Mock activities in workflow tests +3. Validate replay with production histories +4. Test error scenarios and compensation +5. Achieve high coverage (≥80% target) + +**Production**: +1. Deploy workers with graceful shutdown +2. Monitor workflow and activity metrics +3. Implement distributed tracing +4. Version workflows carefully +5. Use workflow queries for debugging + +## Resources + +**Official Documentation**: +- Python SDK: python.temporal.io +- Core Concepts: docs.temporal.io/workflows +- Testing Guide: docs.temporal.io/develop/python/testing-suite +- Best Practices: docs.temporal.io/develop/best-practices + +**Architecture**: +- Temporal Architecture: github.com/temporalio/temporal/blob/main/docs/architecture/README.md +- Testing Patterns: github.com/temporalio/temporal/blob/main/docs/development/testing.md + +**Key Takeaways**: +1. Workflows = orchestration, Activities = external calls +2. Determinism is mandatory for workflows +3. Idempotency is critical for activities +4. Test with time-skipping for fast feedback +5. 
Monitor and observe in production diff --git a/plugins/backend-development/skills/temporal-python-testing/SKILL.md b/plugins/backend-development/skills/temporal-python-testing/SKILL.md new file mode 100644 index 0000000..369cffe --- /dev/null +++ b/plugins/backend-development/skills/temporal-python-testing/SKILL.md @@ -0,0 +1,146 @@ +--- +name: temporal-python-testing +description: Test Temporal workflows with pytest, time-skipping, and mocking strategies. Covers unit testing, integration testing, replay testing, and local development setup. Use when implementing Temporal workflow tests or debugging test failures. +--- + +# Temporal Python Testing Strategies + +Comprehensive testing approaches for Temporal workflows using pytest, progressive disclosure resources for specific testing scenarios. + +## When to Use This Skill + +- **Unit testing workflows** - Fast tests with time-skipping +- **Integration testing** - Workflows with mocked activities +- **Replay testing** - Validate determinism against production histories +- **Local development** - Set up Temporal server and pytest +- **CI/CD integration** - Automated testing pipelines +- **Coverage strategies** - Achieve ≥80% test coverage + +## Testing Philosophy + +**Recommended Approach** (Source: docs.temporal.io/develop/python/testing-suite): +- Write majority as integration tests +- Use pytest with async fixtures +- Time-skipping enables fast feedback (month-long workflows → seconds) +- Mock activities to isolate workflow logic +- Validate determinism with replay testing + +**Three Test Types**: +1. **Unit**: Workflows with time-skipping, activities with ActivityEnvironment +2. **Integration**: Workers with mocked activities +3. **End-to-end**: Full Temporal server with real activities (use sparingly) + +## Available Resources + +This skill provides detailed guidance through progressive disclosure. 
Load specific resources based on your testing needs: + +### Unit Testing Resources +**File**: `resources/unit-testing.md` +**When to load**: Testing individual workflows or activities in isolation +**Contains**: +- WorkflowEnvironment with time-skipping +- ActivityEnvironment for activity testing +- Fast execution of long-running workflows +- Manual time advancement patterns +- pytest fixtures and patterns + +### Integration Testing Resources +**File**: `resources/integration-testing.md` +**When to load**: Testing workflows with mocked external dependencies +**Contains**: +- Activity mocking strategies +- Error injection patterns +- Multi-activity workflow testing +- Signal and query testing +- Coverage strategies + +### Replay Testing Resources +**File**: `resources/replay-testing.md` +**When to load**: Validating determinism or deploying workflow changes +**Contains**: +- Determinism validation +- Production history replay +- CI/CD integration patterns +- Version compatibility testing + +### Local Development Resources +**File**: `resources/local-setup.md` +**When to load**: Setting up development environment +**Contains**: +- Docker Compose configuration +- pytest setup and configuration +- Coverage tool integration +- Development workflow + +## Quick Start Guide + +### Basic Workflow Test + +```python +import pytest +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +@pytest.fixture +async def workflow_env(): + env = await WorkflowEnvironment.start_time_skipping() + yield env + await env.shutdown() + +@pytest.mark.asyncio +async def test_workflow(workflow_env): + async with Worker( + workflow_env.client, + task_queue="test-queue", + workflows=[YourWorkflow], + activities=[your_activity], + ): + result = await workflow_env.client.execute_workflow( + YourWorkflow.run, + args, + id="test-wf-id", + task_queue="test-queue", + ) + assert result == expected +``` + +### Basic Activity Test + +```python +from temporalio.testing 
import ActivityEnvironment + +async def test_activity(): + env = ActivityEnvironment() + result = await env.run(your_activity, "test-input") + assert result == expected_output +``` + +## Coverage Targets + +**Recommended Coverage** (Source: docs.temporal.io best practices): +- **Workflows**: ≥80% logic coverage +- **Activities**: ≥80% logic coverage +- **Integration**: Critical paths with mocked activities +- **Replay**: All workflow versions before deployment + +## Key Testing Principles + +1. **Time-Skipping** - Month-long workflows test in seconds +2. **Mock Activities** - Isolate workflow logic from external dependencies +3. **Replay Testing** - Validate determinism before deployment +4. **High Coverage** - ≥80% target for production workflows +5. **Fast Feedback** - Unit tests run in milliseconds + +## How to Use Resources + +**Load specific resource when needed**: +- "Show me unit testing patterns" → Load `resources/unit-testing.md` +- "How do I mock activities?" → Load `resources/integration-testing.md` +- "Setup local Temporal server" → Load `resources/local-setup.md` +- "Validate determinism" → Load `resources/replay-testing.md` + +## Additional References + +- Python SDK Testing: docs.temporal.io/develop/python/testing-suite +- Testing Patterns: github.com/temporalio/temporal/blob/main/docs/development/testing.md +- Python Samples: github.com/temporalio/samples-python diff --git a/plugins/backend-development/skills/temporal-python-testing/resources/integration-testing.md b/plugins/backend-development/skills/temporal-python-testing/resources/integration-testing.md new file mode 100644 index 0000000..63b1058 --- /dev/null +++ b/plugins/backend-development/skills/temporal-python-testing/resources/integration-testing.md @@ -0,0 +1,452 @@ +# Integration Testing with Mocked Activities + +Comprehensive patterns for testing workflows with mocked external dependencies, error injection, and complex scenarios. 
+ +## Activity Mocking Strategy + +**Purpose**: Test workflow orchestration logic without calling real external services + +### Basic Mock Pattern + +```python +import pytest +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker +from unittest.mock import Mock + +@pytest.mark.asyncio +async def test_workflow_with_mocked_activity(workflow_env): + """Mock activity to test workflow logic""" + + # Create mock activity + mock_activity = Mock(return_value="mocked-result") + + @workflow.defn + class WorkflowWithActivity: + @workflow.run + async def run(self, input: str) -> str: + result = await workflow.execute_activity( + process_external_data, + input, + start_to_close_timeout=timedelta(seconds=10), + ) + return f"processed: {result}" + + async with Worker( + workflow_env.client, + task_queue="test", + workflows=[WorkflowWithActivity], + activities=[mock_activity], # Use mock instead of real activity + ): + result = await workflow_env.client.execute_workflow( + WorkflowWithActivity.run, + "test-input", + id="wf-mock", + task_queue="test", + ) + assert result == "processed: mocked-result" + mock_activity.assert_called_once() +``` + +### Dynamic Mock Responses + +**Scenario-Based Mocking**: +```python +@pytest.mark.asyncio +async def test_workflow_multiple_mock_scenarios(workflow_env): + """Test different workflow paths with dynamic mocks""" + + # Mock returns different values based on input + def dynamic_activity(input: str) -> str: + if input == "error-case": + raise ApplicationError("Validation failed", non_retryable=True) + return f"processed-{input}" + + @workflow.defn + class DynamicWorkflow: + @workflow.run + async def run(self, input: str) -> str: + try: + result = await workflow.execute_activity( + dynamic_activity, + input, + start_to_close_timeout=timedelta(seconds=10), + ) + return f"success: {result}" + except ApplicationError as e: + return f"error: {e.message}" + + async with Worker( + workflow_env.client, + 
task_queue="test", + workflows=[DynamicWorkflow], + activities=[dynamic_activity], + ): + # Test success path + result_success = await workflow_env.client.execute_workflow( + DynamicWorkflow.run, + "valid-input", + id="wf-success", + task_queue="test", + ) + assert result_success == "success: processed-valid-input" + + # Test error path + result_error = await workflow_env.client.execute_workflow( + DynamicWorkflow.run, + "error-case", + id="wf-error", + task_queue="test", + ) + assert "Validation failed" in result_error +``` + +## Error Injection Patterns + +### Testing Transient Failures + +**Retry Behavior**: +```python +@pytest.mark.asyncio +async def test_workflow_transient_errors(workflow_env): + """Test retry logic with controlled failures""" + + attempt_count = 0 + + @activity.defn + async def transient_activity() -> str: + nonlocal attempt_count + attempt_count += 1 + + if attempt_count < 3: + raise Exception(f"Transient error {attempt_count}") + return "success-after-retries" + + @workflow.defn + class RetryWorkflow: + @workflow.run + async def run(self) -> str: + return await workflow.execute_activity( + transient_activity, + start_to_close_timeout=timedelta(seconds=10), + retry_policy=RetryPolicy( + initial_interval=timedelta(milliseconds=10), + maximum_attempts=5, + backoff_coefficient=1.0, + ), + ) + + async with Worker( + workflow_env.client, + task_queue="test", + workflows=[RetryWorkflow], + activities=[transient_activity], + ): + result = await workflow_env.client.execute_workflow( + RetryWorkflow.run, + id="retry-wf", + task_queue="test", + ) + assert result == "success-after-retries" + assert attempt_count == 3 +``` + +### Testing Non-Retryable Errors + +**Business Validation Failures**: +```python +@pytest.mark.asyncio +async def test_workflow_non_retryable_error(workflow_env): + """Test handling of permanent failures""" + + @activity.defn + async def validation_activity(input: dict) -> str: + if not input.get("valid"): + raise ApplicationError( 
+ "Invalid input", + non_retryable=True, # Don't retry validation errors + ) + return "validated" + + @workflow.defn + class ValidationWorkflow: + @workflow.run + async def run(self, input: dict) -> str: + try: + return await workflow.execute_activity( + validation_activity, + input, + start_to_close_timeout=timedelta(seconds=10), + ) + except ApplicationError as e: + return f"validation-failed: {e.message}" + + async with Worker( + workflow_env.client, + task_queue="test", + workflows=[ValidationWorkflow], + activities=[validation_activity], + ): + result = await workflow_env.client.execute_workflow( + ValidationWorkflow.run, + {"valid": False}, + id="validation-wf", + task_queue="test", + ) + assert "validation-failed" in result +``` + +## Multi-Activity Workflow Testing + +### Sequential Activity Pattern + +```python +@pytest.mark.asyncio +async def test_workflow_sequential_activities(workflow_env): + """Test workflow orchestrating multiple activities""" + + activity_calls = [] + + @activity.defn + async def step_1(input: str) -> str: + activity_calls.append("step_1") + return f"{input}-step1" + + @activity.defn + async def step_2(input: str) -> str: + activity_calls.append("step_2") + return f"{input}-step2" + + @activity.defn + async def step_3(input: str) -> str: + activity_calls.append("step_3") + return f"{input}-step3" + + @workflow.defn + class SequentialWorkflow: + @workflow.run + async def run(self, input: str) -> str: + result_1 = await workflow.execute_activity( + step_1, + input, + start_to_close_timeout=timedelta(seconds=10), + ) + result_2 = await workflow.execute_activity( + step_2, + result_1, + start_to_close_timeout=timedelta(seconds=10), + ) + result_3 = await workflow.execute_activity( + step_3, + result_2, + start_to_close_timeout=timedelta(seconds=10), + ) + return result_3 + + async with Worker( + workflow_env.client, + task_queue="test", + workflows=[SequentialWorkflow], + activities=[step_1, step_2, step_3], + ): + result = await 
workflow_env.client.execute_workflow( + SequentialWorkflow.run, + "start", + id="seq-wf", + task_queue="test", + ) + assert result == "start-step1-step2-step3" + assert activity_calls == ["step_1", "step_2", "step_3"] +``` + +### Parallel Activity Pattern + +```python +@pytest.mark.asyncio +async def test_workflow_parallel_activities(workflow_env): + """Test concurrent activity execution""" + + @activity.defn + async def parallel_task(task_id: int) -> str: + return f"task-{task_id}" + + @workflow.defn + class ParallelWorkflow: + @workflow.run + async def run(self, task_count: int) -> list[str]: + # Execute activities in parallel + tasks = [ + workflow.execute_activity( + parallel_task, + i, + start_to_close_timeout=timedelta(seconds=10), + ) + for i in range(task_count) + ] + return await asyncio.gather(*tasks) + + async with Worker( + workflow_env.client, + task_queue="test", + workflows=[ParallelWorkflow], + activities=[parallel_task], + ): + result = await workflow_env.client.execute_workflow( + ParallelWorkflow.run, + 3, + id="parallel-wf", + task_queue="test", + ) + assert result == ["task-0", "task-1", "task-2"] +``` + +## Signal and Query Testing + +### Signal Handlers + +```python +@pytest.mark.asyncio +async def test_workflow_signals(workflow_env): + """Test workflow signal handling""" + + @workflow.defn + class SignalWorkflow: + def __init__(self) -> None: + self._status = "initialized" + + @workflow.run + async def run(self) -> str: + # Wait for completion signal + await workflow.wait_condition(lambda: self._status == "completed") + return self._status + + @workflow.signal + async def update_status(self, new_status: str) -> None: + self._status = new_status + + @workflow.query + def get_status(self) -> str: + return self._status + + async with Worker( + workflow_env.client, + task_queue="test", + workflows=[SignalWorkflow], + ): + # Start workflow + handle = await workflow_env.client.start_workflow( + SignalWorkflow.run, + id="signal-wf", + 
task_queue="test", + ) + + # Verify initial state via query + initial_status = await handle.query(SignalWorkflow.get_status) + assert initial_status == "initialized" + + # Send signal + await handle.signal(SignalWorkflow.update_status, "processing") + + # Verify updated state + updated_status = await handle.query(SignalWorkflow.get_status) + assert updated_status == "processing" + + # Complete workflow + await handle.signal(SignalWorkflow.update_status, "completed") + result = await handle.result() + assert result == "completed" +``` + +## Coverage Strategies + +### Workflow Logic Coverage + +**Target**: ≥80% coverage of workflow decision logic + +```python +# Test all branches +@pytest.mark.parametrize("condition,expected", [ + (True, "branch-a"), + (False, "branch-b"), +]) +async def test_workflow_branches(workflow_env, condition, expected): + """Ensure all code paths are tested""" + # Test implementation + pass +``` + +### Activity Coverage + +**Target**: ≥80% coverage of activity logic + +```python +# Test activity edge cases +@pytest.mark.parametrize("input,expected", [ + ("valid", "success"), + ("", "empty-input-error"), + (None, "null-input-error"), +]) +async def test_activity_edge_cases(activity_env, input, expected): + """Test activity error handling""" + # Test implementation + pass +``` + +## Integration Test Organization + +### Test Structure + +``` +tests/ +├── integration/ +│ ├── conftest.py # Shared fixtures +│ ├── test_order_workflow.py # Order processing tests +│ ├── test_payment_workflow.py # Payment tests +│ └── test_fulfillment_workflow.py +├── unit/ +│ ├── test_order_activities.py +│ └── test_payment_activities.py +└── fixtures/ + └── test_data.py # Test data builders +``` + +### Shared Fixtures + +```python +# conftest.py +import pytest +from temporalio.testing import WorkflowEnvironment + +@pytest.fixture(scope="session") +async def workflow_env(): + """Session-scoped environment for integration tests""" + env = await 
WorkflowEnvironment.start_time_skipping() + yield env + await env.shutdown() + +@pytest.fixture +def mock_payment_service(): + """Mock external payment service""" + return Mock() + +@pytest.fixture +def mock_inventory_service(): + """Mock external inventory service""" + return Mock() +``` + +## Best Practices + +1. **Mock External Dependencies**: Never call real APIs in tests +2. **Test Error Scenarios**: Verify compensation and retry logic +3. **Parallel Testing**: Use pytest-xdist for faster test runs +4. **Isolated Tests**: Each test should be independent +5. **Clear Assertions**: Verify both results and side effects +6. **Coverage Target**: ≥80% for critical workflows +7. **Fast Execution**: Use time-skipping, avoid real delays + +## Additional Resources + +- Mocking Strategies: docs.temporal.io/develop/python/testing-suite +- pytest Best Practices: docs.pytest.org/en/stable/goodpractices.html +- Python SDK Samples: github.com/temporalio/samples-python diff --git a/plugins/backend-development/skills/temporal-python-testing/resources/local-setup.md b/plugins/backend-development/skills/temporal-python-testing/resources/local-setup.md new file mode 100644 index 0000000..5939376 --- /dev/null +++ b/plugins/backend-development/skills/temporal-python-testing/resources/local-setup.md @@ -0,0 +1,550 @@ +# Local Development Setup for Temporal Python Testing + +Comprehensive guide for setting up local Temporal development environment with pytest integration and coverage tracking. 
+
+## Temporal Server Setup with Docker Compose
+
+### Basic Docker Compose Configuration
+
+```yaml
+# docker-compose.yml
+version: "3.8"
+
+services:
+  temporal:
+    image: temporalio/auto-setup:latest
+    container_name: temporal-dev
+    ports:
+      - "7233:7233" # Temporal server
+      - "8233:8233" # Web UI
+    environment:
+      - DB=postgresql
+      - POSTGRES_USER=temporal
+      - POSTGRES_PWD=temporal
+      - POSTGRES_SEEDS=postgresql
+      - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
+    depends_on:
+      - postgresql
+
+  postgresql:
+    image: postgres:14-alpine
+    container_name: temporal-postgres
+    environment:
+      - POSTGRES_USER=temporal
+      - POSTGRES_PASSWORD=temporal
+      - POSTGRES_DB=temporal
+    ports:
+      - "5432:5432"
+    volumes:
+      - postgres_data:/var/lib/postgresql/data
+
+  temporal-ui:
+    image: temporalio/ui:latest
+    container_name: temporal-ui
+    depends_on:
+      - temporal
+    environment:
+      - TEMPORAL_ADDRESS=temporal:7233
+      - TEMPORAL_CORS_ORIGINS=http://localhost:3000
+    ports:
+      - "8080:8080"
+
+volumes:
+  postgres_data:
+```
+
+### Starting Local Server
+
+```bash
+# Start Temporal server
+docker-compose up -d
+
+# Verify server is running
+docker-compose ps
+
+# View logs
+docker-compose logs -f temporal
+
+# Access Temporal Web UI
+open http://localhost:8080
+
+# Stop server
+docker-compose down
+
+# Reset data (clean slate)
+docker-compose down -v
+```
+
+### Health Check Script
+
+```python
+# scripts/health_check.py
+import asyncio
+from temporalio import workflow
+from temporalio.client import Client
+
+async def check_temporal_health():
+    """Verify Temporal server is accessible"""
+    try:
+        client = await Client.connect("localhost:7233")
+        print("✓ Connected to Temporal server")
+
+        # Test workflow execution
+        from temporalio.worker import Worker
+
+        @workflow.defn
+        class HealthCheckWorkflow:
+            @workflow.run
+            async def run(self) -> str:
+                return "healthy"
+
+        async with Worker(
+            client,
+            task_queue="health-check",
+            workflows=[HealthCheckWorkflow],
+        ):
+            result = await 
client.execute_workflow( + HealthCheckWorkflow.run, + id="health-check", + task_queue="health-check", + ) + print(f"✓ Workflow execution successful: {result}") + + return True + + except Exception as e: + print(f"✗ Health check failed: {e}") + return False + +if __name__ == "__main__": + asyncio.run(check_temporal_health()) +``` + +## pytest Configuration + +### Project Structure + +``` +temporal-project/ +├── docker-compose.yml +├── pyproject.toml +├── pytest.ini +├── requirements.txt +├── src/ +│ ├── workflows/ +│ │ ├── __init__.py +│ │ ├── order_workflow.py +│ │ └── payment_workflow.py +│ └── activities/ +│ ├── __init__.py +│ ├── payment_activities.py +│ └── inventory_activities.py +├── tests/ +│ ├── conftest.py +│ ├── unit/ +│ │ ├── test_workflows.py +│ │ └── test_activities.py +│ ├── integration/ +│ │ └── test_order_flow.py +│ └── replay/ +│ └── test_workflow_replay.py +└── scripts/ + ├── health_check.py + └── export_histories.py +``` + +### pytest Configuration + +```ini +# pytest.ini +[pytest] +asyncio_mode = auto +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* + +# Markers for test categorization +markers = + unit: Unit tests (fast, isolated) + integration: Integration tests (require Temporal server) + replay: Replay tests (require production histories) + slow: Slow running tests + +# Coverage settings +addopts = + --verbose + --strict-markers + --cov=src + --cov-report=term-missing + --cov-report=html + --cov-fail-under=80 + +# Async test timeout +asyncio_default_fixture_loop_scope = function +``` + +### Shared Test Fixtures + +```python +# tests/conftest.py +import pytest +from temporalio.testing import WorkflowEnvironment +from temporalio.client import Client + +@pytest.fixture(scope="session") +def event_loop(): + """Provide event loop for async fixtures""" + import asyncio + loop = asyncio.get_event_loop_policy().new_event_loop() + yield loop + loop.close() + +@pytest.fixture(scope="session") +async def 
temporal_client():
+    """Provide Temporal client connected to local server"""
+    client = await Client.connect("localhost:7233")
+    yield client
+    await client.close()
+
+@pytest.fixture(scope="module")
+async def workflow_env():
+    """Module-scoped time-skipping environment"""
+    env = await WorkflowEnvironment.start_time_skipping()
+    yield env
+    await env.shutdown()
+
+@pytest.fixture
+def activity_env():
+    """Function-scoped activity environment"""
+    from temporalio.testing import ActivityEnvironment
+    return ActivityEnvironment()
+
+@pytest.fixture
+async def test_worker(temporal_client, workflow_env):
+    """Pre-configured test worker"""
+    from temporalio.worker import Worker
+    from src.workflows import OrderWorkflow, PaymentWorkflow
+    from src.activities import process_payment, update_inventory
+
+    return Worker(
+        workflow_env.client,
+        task_queue="test-queue",
+        workflows=[OrderWorkflow, PaymentWorkflow],
+        activities=[process_payment, update_inventory],
+    )
+```
+
+### Dependencies
+
+```txt
+# requirements.txt
+temporalio>=1.5.0
+pytest>=7.4.0
+pytest-asyncio>=0.21.0
+pytest-cov>=4.1.0
+pytest-xdist>=3.3.0 # Parallel test execution
+```
+
+```toml
+# pyproject.toml
+[build-system]
+requires = ["setuptools>=61.0"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "temporal-project"
+version = "0.1.0"
+requires-python = ">=3.10"
+dependencies = [
+    "temporalio>=1.5.0",
+]
+
+[project.optional-dependencies]
+dev = [
+    "pytest>=7.4.0",
+    "pytest-asyncio>=0.21.0",
+    "pytest-cov>=4.1.0",
+    "pytest-xdist>=3.3.0",
+]
+
+[tool.pytest.ini_options]
+asyncio_mode = "auto"
+testpaths = ["tests"]
+```
+
+## Coverage Configuration
+
+### Coverage Settings
+
+```ini
+# .coveragerc
+[run]
+source = src
+omit =
+    */tests/*
+    */venv/*
+    */__pycache__/*
+
+[report]
+exclude_lines =
+    # Exclude type checking blocks
+    if TYPE_CHECKING:
+    # Exclude debug code
+    def __repr__
+    # Exclude abstract methods
+    @abstractmethod
+    # Exclude pass statements
+    pass
+
+[html]
+directory = htmlcov +``` + +### Running Tests with Coverage + +```bash +# Run all tests with coverage +pytest --cov=src --cov-report=term-missing + +# Generate HTML coverage report +pytest --cov=src --cov-report=html +open htmlcov/index.html + +# Run specific test categories +pytest -m unit # Unit tests only +pytest -m integration # Integration tests only +pytest -m "not slow" # Skip slow tests + +# Parallel execution (faster) +pytest -n auto # Use all CPU cores + +# Fail if coverage below threshold +pytest --cov=src --cov-fail-under=80 +``` + +### Coverage Report Example + +``` +---------- coverage: platform darwin, python 3.11.5 ----------- +Name Stmts Miss Cover Missing +----------------------------------------------------------------- +src/__init__.py 0 0 100% +src/activities/__init__.py 2 0 100% +src/activities/inventory.py 45 3 93% 78-80 +src/activities/payment.py 38 0 100% +src/workflows/__init__.py 2 0 100% +src/workflows/order_workflow.py 67 5 93% 45-49 +src/workflows/payment_workflow.py 52 0 100% +----------------------------------------------------------------- +TOTAL 206 8 96% + +10 files skipped due to complete coverage. +``` + +## Development Workflow + +### Daily Development Flow + +```bash +# 1. Start Temporal server +docker-compose up -d + +# 2. Verify server health +python scripts/health_check.py + +# 3. Run tests during development +pytest tests/unit/ --verbose + +# 4. Run full test suite before commit +pytest --cov=src --cov-report=term-missing + +# 5. Check coverage +open htmlcov/index.html + +# 6. Stop server +docker-compose down +``` + +### Pre-Commit Hook + +```bash +# .git/hooks/pre-commit +#!/bin/bash + +echo "Running tests..." +pytest --cov=src --cov-fail-under=80 + +if [ $? -ne 0 ]; then + echo "Tests failed. Commit aborted." + exit 1 +fi + +echo "All tests passed!" 
+``` + +### Makefile for Common Tasks + +```makefile +# Makefile +.PHONY: setup test test-unit test-integration coverage clean + +setup: + docker-compose up -d + pip install -r requirements.txt + python scripts/health_check.py + +test: + pytest --cov=src --cov-report=term-missing + +test-unit: + pytest -m unit --verbose + +test-integration: + pytest -m integration --verbose + +test-replay: + pytest -m replay --verbose + +test-parallel: + pytest -n auto --cov=src + +coverage: + pytest --cov=src --cov-report=html + open htmlcov/index.html + +clean: + docker-compose down -v + rm -rf .pytest_cache htmlcov .coverage + +ci: + docker-compose up -d + sleep 10 # Wait for Temporal to start + pytest --cov=src --cov-fail-under=80 + docker-compose down +``` + +### CI/CD Example + +```yaml +# .github/workflows/test.yml +name: Tests + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.11" + + - name: Start Temporal server + run: docker-compose up -d + + - name: Wait for Temporal + run: sleep 10 + + - name: Install dependencies + run: | + pip install -r requirements.txt + + - name: Run tests with coverage + run: | + pytest --cov=src --cov-report=xml --cov-fail-under=80 + + - name: Upload coverage + uses: codecov/codecov-action@v3 + with: + file: ./coverage.xml + + - name: Cleanup + if: always() + run: docker-compose down +``` + +## Debugging Tips + +### Enable Temporal SDK Logging + +```python +import logging + +# Enable debug logging for Temporal SDK +logging.basicConfig(level=logging.DEBUG) +temporal_logger = logging.getLogger("temporalio") +temporal_logger.setLevel(logging.DEBUG) +``` + +### Interactive Debugging + +```python +# Add breakpoint in test +@pytest.mark.asyncio +async def test_workflow_with_breakpoint(workflow_env): + import pdb; pdb.set_trace() # Debug here + + async with 
Worker(...): + result = await workflow_env.client.execute_workflow(...) +``` + +### Temporal Web UI + +```bash +# Access Web UI at http://localhost:8080 +# - View workflow executions +# - Inspect event history +# - Replay workflows +# - Monitor workers +``` + +## Best Practices + +1. **Isolated Environment**: Use Docker Compose for reproducible local setup +2. **Health Checks**: Always verify Temporal server before running tests +3. **Fast Feedback**: Use pytest markers to run unit tests quickly +4. **Coverage Targets**: Maintain ≥80% code coverage +5. **Parallel Testing**: Use pytest-xdist for faster test runs +6. **CI/CD Integration**: Automated testing on every commit +7. **Cleanup**: Clear Docker volumes between test runs if needed + +## Troubleshooting + +**Issue: Temporal server not starting** +```bash +# Check logs +docker-compose logs temporal + +# Reset database +docker-compose down -v +docker-compose up -d +``` + +**Issue: Tests timing out** +```python +# Increase timeout in pytest.ini +asyncio_default_timeout = 30 +``` + +**Issue: Port already in use** +```bash +# Find process using port 7233 +lsof -i :7233 + +# Kill process or change port in docker-compose.yml +``` + +## Additional Resources + +- Temporal Local Development: docs.temporal.io/develop/python/local-dev +- pytest Documentation: docs.pytest.org +- Docker Compose: docs.docker.com/compose +- pytest-asyncio: github.com/pytest-dev/pytest-asyncio diff --git a/plugins/backend-development/skills/temporal-python-testing/resources/replay-testing.md b/plugins/backend-development/skills/temporal-python-testing/resources/replay-testing.md new file mode 100644 index 0000000..c65d3b0 --- /dev/null +++ b/plugins/backend-development/skills/temporal-python-testing/resources/replay-testing.md @@ -0,0 +1,455 @@ +# Replay Testing for Determinism and Compatibility + +Comprehensive guide for validating workflow determinism and ensuring safe code changes using replay testing. + +## What is Replay Testing? 
+ +**Purpose**: Verify that workflow code changes are backward-compatible with existing workflow executions + +**How it works**: +1. Temporal records every workflow decision as Event History +2. Replay testing re-executes workflow code against recorded history +3. If new code makes same decisions → deterministic (safe to deploy) +4. If decisions differ → non-deterministic (breaking change) + +**Critical Use Cases**: +- Deploying workflow code changes to production +- Validating refactoring doesn't break running workflows +- CI/CD automated compatibility checks +- Version migration validation + +## Basic Replay Testing + +### Replayer Setup + +```python +from temporalio.worker import Replayer +from temporalio.client import Client + +async def test_workflow_replay(): + """Test workflow against production history""" + + # Connect to Temporal server + client = await Client.connect("localhost:7233") + + # Create replayer with current workflow code + replayer = Replayer( + workflows=[OrderWorkflow, PaymentWorkflow] + ) + + # Fetch workflow history from production + handle = client.get_workflow_handle("order-123") + history = await handle.fetch_history() + + # Replay history with current code + await replayer.replay_workflow(history) + # Success = deterministic, Exception = breaking change +``` + +### Testing Against Multiple Histories + +```python +import pytest +from temporalio.worker import Replayer + +@pytest.mark.asyncio +async def test_replay_multiple_workflows(): + """Replay against multiple production histories""" + + replayer = Replayer(workflows=[OrderWorkflow]) + + # Test against different workflow executions + workflow_ids = [ + "order-success-123", + "order-cancelled-456", + "order-retry-789", + ] + + for workflow_id in workflow_ids: + handle = client.get_workflow_handle(workflow_id) + history = await handle.fetch_history() + + # Replay should succeed for all variants + await replayer.replay_workflow(history) +``` + +## Determinism Validation + +### Common 
Non-Deterministic Patterns + +**Problem: Random Number Generation** +```python +# ❌ Non-deterministic (breaks replay) +@workflow.defn +class BadWorkflow: + @workflow.run + async def run(self) -> int: + return random.randint(1, 100) # Different on replay! + +# ✅ Deterministic (safe for replay) +@workflow.defn +class GoodWorkflow: + @workflow.run + async def run(self) -> int: + return workflow.random().randint(1, 100) # Deterministic random +``` + +**Problem: Current Time** +```python +# ❌ Non-deterministic +@workflow.defn +class BadWorkflow: + @workflow.run + async def run(self) -> str: + now = datetime.now() # Different on replay! + return now.isoformat() + +# ✅ Deterministic +@workflow.defn +class GoodWorkflow: + @workflow.run + async def run(self) -> str: + now = workflow.now() # Deterministic time + return now.isoformat() +``` + +**Problem: Direct External Calls** +```python +# ❌ Non-deterministic +@workflow.defn +class BadWorkflow: + @workflow.run + async def run(self) -> dict: + response = requests.get("https://api.example.com/data") # External call! 
+ return response.json() + +# ✅ Deterministic +@workflow.defn +class GoodWorkflow: + @workflow.run + async def run(self) -> dict: + # Use activity for external calls + return await workflow.execute_activity( + fetch_external_data, + start_to_close_timeout=timedelta(seconds=30), + ) +``` + +### Testing Determinism + +```python +@pytest.mark.asyncio +async def test_workflow_determinism(): + """Verify workflow produces same output on multiple runs""" + + @workflow.defn + class DeterministicWorkflow: + @workflow.run + async def run(self, seed: int) -> list[int]: + # Use workflow.random() for determinism + rng = workflow.random() + rng.seed(seed) + return [rng.randint(1, 100) for _ in range(10)] + + env = await WorkflowEnvironment.start_time_skipping() + + # Run workflow twice with same input + results = [] + for i in range(2): + async with Worker( + env.client, + task_queue="test", + workflows=[DeterministicWorkflow], + ): + result = await env.client.execute_workflow( + DeterministicWorkflow.run, + 42, # Same seed + id=f"determinism-test-{i}", + task_queue="test", + ) + results.append(result) + + await env.shutdown() + + # Verify identical outputs + assert results[0] == results[1] +``` + +## Production History Replay + +### Exporting Workflow History + +```python +from temporalio.client import Client + +async def export_workflow_history(workflow_id: str, output_file: str): + """Export workflow history for replay testing""" + + client = await Client.connect("production.temporal.io:7233") + + # Fetch workflow history + handle = client.get_workflow_handle(workflow_id) + history = await handle.fetch_history() + + # Save to file for replay testing + with open(output_file, "wb") as f: + f.write(history.SerializeToString()) + + print(f"Exported history to {output_file}") +``` + +### Replaying from File + +```python +from temporalio.worker import Replayer +from temporalio.api.history.v1 import History + +async def test_replay_from_file(): + """Replay workflow from exported 
history file""" + + # Load history from file + with open("workflow_histories/order-123.pb", "rb") as f: + history = History.FromString(f.read()) + + # Replay with current workflow code + replayer = Replayer(workflows=[OrderWorkflow]) + await replayer.replay_workflow(history) + # Success = safe to deploy +``` + +## CI/CD Integration Patterns + +### GitHub Actions Example + +```yaml +# .github/workflows/replay-tests.yml +name: Replay Tests + +on: + pull_request: + branches: [main] + +jobs: + replay-tests: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.11" + + - name: Install dependencies + run: | + pip install -r requirements.txt + pip install pytest pytest-asyncio + + - name: Download production histories + run: | + # Fetch recent workflow histories from production + python scripts/export_histories.py + + - name: Run replay tests + run: | + pytest tests/replay/ --verbose + + - name: Upload results + if: failure() + uses: actions/upload-artifact@v3 + with: + name: replay-failures + path: replay-failures/ +``` + +### Automated History Export + +```python +# scripts/export_histories.py +import asyncio +from temporalio.client import Client +from datetime import datetime, timedelta + +async def export_recent_histories(): + """Export recent production workflow histories""" + + client = await Client.connect("production.temporal.io:7233") + + # Query recent completed workflows + workflows = client.list_workflows( + query="WorkflowType='OrderWorkflow' AND CloseTime > '7 days ago'" + ) + + count = 0 + async for workflow in workflows: + # Export history + history = await workflow.fetch_history() + + # Save to file + filename = f"workflow_histories/{workflow.id}.pb" + with open(filename, "wb") as f: + f.write(history.SerializeToString()) + + count += 1 + if count >= 100: # Limit to 100 most recent + break + + print(f"Exported {count} workflow histories") + +if __name__ == 
"__main__": + asyncio.run(export_recent_histories()) +``` + +### Replay Test Suite + +```python +# tests/replay/test_workflow_replay.py +import pytest +import glob +from temporalio.worker import Replayer +from temporalio.api.history.v1 import History +from workflows import OrderWorkflow, PaymentWorkflow + +@pytest.mark.asyncio +async def test_replay_all_histories(): + """Replay all production histories""" + + replayer = Replayer( + workflows=[OrderWorkflow, PaymentWorkflow] + ) + + # Load all history files + history_files = glob.glob("workflow_histories/*.pb") + + failures = [] + for history_file in history_files: + try: + with open(history_file, "rb") as f: + history = History.FromString(f.read()) + + await replayer.replay_workflow(history) + print(f"✓ {history_file}") + + except Exception as e: + failures.append((history_file, str(e))) + print(f"✗ {history_file}: {e}") + + # Report failures + if failures: + pytest.fail( + f"Replay failed for {len(failures)} workflows:\n" + + "\n".join(f" {file}: {error}" for file, error in failures) + ) +``` + +## Version Compatibility Testing + +### Testing Code Evolution + +```python +@pytest.mark.asyncio +async def test_workflow_version_compatibility(): + """Test workflow with version changes""" + + @workflow.defn + class EvolvingWorkflow: + @workflow.run + async def run(self) -> str: + # Use versioning for safe code evolution + version = workflow.get_version("feature-flag", 1, 2) + + if version == 1: + # Old behavior + return "version-1" + else: + # New behavior + return "version-2" + + env = await WorkflowEnvironment.start_time_skipping() + + # Test version 1 behavior + async with Worker( + env.client, + task_queue="test", + workflows=[EvolvingWorkflow], + ): + result_v1 = await env.client.execute_workflow( + EvolvingWorkflow.run, + id="evolving-v1", + task_queue="test", + ) + assert result_v1 == "version-1" + + # Simulate workflow executing again with version 2 + result_v2 = await env.client.execute_workflow( + 
EvolvingWorkflow.run, + id="evolving-v2", + task_queue="test", + ) + # New workflows use version 2 + assert result_v2 == "version-2" + + await env.shutdown() +``` + +### Migration Strategy + +```python +# Phase 1: Add version check +@workflow.defn +class MigratingWorkflow: + @workflow.run + async def run(self) -> dict: + version = workflow.get_version("new-logic", 1, 2) + + if version == 1: + # Old logic (existing workflows) + return await self._old_implementation() + else: + # New logic (new workflows) + return await self._new_implementation() + +# Phase 2: After all old workflows complete, remove old code +@workflow.defn +class MigratedWorkflow: + @workflow.run + async def run(self) -> dict: + # Only new logic remains + return await self._new_implementation() +``` + +## Best Practices + +1. **Replay Before Deploy**: Always run replay tests before deploying workflow changes +2. **Export Regularly**: Continuously export production histories for testing +3. **CI/CD Integration**: Automated replay testing in pull request checks +4. **Version Tracking**: Use workflow.get_version() for safe code evolution +5. **History Retention**: Keep representative workflow histories for regression testing +6. **Determinism**: Never use random(), datetime.now(), or direct external calls +7. 
**Comprehensive Testing**: Test against various workflow execution paths + +## Common Replay Errors + +**Non-Deterministic Error**: +``` +WorkflowNonDeterministicError: Workflow command mismatch at position 5 +Expected: ScheduleActivityTask(activity_id='activity-1') +Got: ScheduleActivityTask(activity_id='activity-2') +``` + +**Solution**: Code change altered workflow decision sequence + +**Version Mismatch Error**: +``` +WorkflowVersionError: Workflow version changed from 1 to 2 without using get_version() +``` + +**Solution**: Use workflow.get_version() for backward-compatible changes + +## Additional Resources + +- Replay Testing: docs.temporal.io/develop/python/testing-suite#replay-testing +- Workflow Versioning: docs.temporal.io/workflows#versioning +- Determinism Guide: docs.temporal.io/workflows#deterministic-constraints +- CI/CD Integration: github.com/temporalio/samples-python/tree/main/.github/workflows diff --git a/plugins/backend-development/skills/temporal-python-testing/resources/unit-testing.md b/plugins/backend-development/skills/temporal-python-testing/resources/unit-testing.md new file mode 100644 index 0000000..ad6c2f0 --- /dev/null +++ b/plugins/backend-development/skills/temporal-python-testing/resources/unit-testing.md @@ -0,0 +1,320 @@ +# Unit Testing Temporal Workflows and Activities + +Focused guide for testing individual workflows and activities in isolation using WorkflowEnvironment and ActivityEnvironment. 
+ +## WorkflowEnvironment with Time-Skipping + +**Purpose**: Test workflows in isolation with instant time progression (month-long workflows → seconds) + +### Basic Setup Pattern + +```python +import pytest +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +@pytest.fixture +async def workflow_env(): + """Reusable time-skipping test environment""" + env = await WorkflowEnvironment.start_time_skipping() + yield env + await env.shutdown() + +@pytest.mark.asyncio +async def test_workflow_execution(workflow_env): + """Test workflow with time-skipping""" + async with Worker( + workflow_env.client, + task_queue="test-queue", + workflows=[YourWorkflow], + activities=[your_activity], + ): + result = await workflow_env.client.execute_workflow( + YourWorkflow.run, + "test-input", + id="test-wf-id", + task_queue="test-queue", + ) + assert result == "expected-output" +``` + +**Key Benefits**: +- `workflow.sleep(timedelta(days=30))` completes instantly +- Fast feedback loop (milliseconds vs hours) +- Deterministic test execution + +### Time-Skipping Examples + +**Sleep Advancement**: +```python +@pytest.mark.asyncio +async def test_workflow_with_delays(workflow_env): + """Workflow sleeps are instant in time-skipping mode""" + + @workflow.defn + class DelayedWorkflow: + @workflow.run + async def run(self) -> str: + await workflow.sleep(timedelta(hours=24)) # Instant in tests + return "completed" + + async with Worker( + workflow_env.client, + task_queue="test", + workflows=[DelayedWorkflow], + ): + result = await workflow_env.client.execute_workflow( + DelayedWorkflow.run, + id="delayed-wf", + task_queue="test", + ) + assert result == "completed" +``` + +**Manual Time Control**: +```python +@pytest.mark.asyncio +async def test_workflow_manual_time(workflow_env): + """Manually advance time for precise control""" + + handle = await workflow_env.client.start_workflow( + TimeBasedWorkflow.run, + id="time-wf", + task_queue="test", + ) + + # 
Advance time by specific amount + await workflow_env.sleep(timedelta(hours=1)) + + # Verify intermediate state via query + state = await handle.query(TimeBasedWorkflow.get_state) + assert state == "processing" + + # Advance to completion + await workflow_env.sleep(timedelta(hours=23)) + result = await handle.result() + assert result == "completed" +``` + +### Testing Workflow Logic + +**Decision Testing**: +```python +@pytest.mark.asyncio +async def test_workflow_branching(workflow_env): + """Test different execution paths""" + + @workflow.defn + class ConditionalWorkflow: + @workflow.run + async def run(self, condition: bool) -> str: + if condition: + return "path-a" + return "path-b" + + async with Worker( + workflow_env.client, + task_queue="test", + workflows=[ConditionalWorkflow], + ): + # Test true path + result_a = await workflow_env.client.execute_workflow( + ConditionalWorkflow.run, + True, + id="cond-wf-true", + task_queue="test", + ) + assert result_a == "path-a" + + # Test false path + result_b = await workflow_env.client.execute_workflow( + ConditionalWorkflow.run, + False, + id="cond-wf-false", + task_queue="test", + ) + assert result_b == "path-b" +``` + +## ActivityEnvironment Testing + +**Purpose**: Test activities in isolation without workflows or Temporal server + +### Basic Activity Test + +```python +from temporalio.testing import ActivityEnvironment + +async def test_activity_basic(): + """Test activity without workflow context""" + + @activity.defn + async def process_data(input: str) -> str: + return input.upper() + + env = ActivityEnvironment() + result = await env.run(process_data, "test") + assert result == "TEST" +``` + +### Testing Activity Context + +**Heartbeat Testing**: +```python +async def test_activity_heartbeat(): + """Verify heartbeat calls""" + + @activity.defn + async def long_running_activity(total_items: int) -> int: + for i in range(total_items): + activity.heartbeat(i) # Report progress + await asyncio.sleep(0.1) + return 
total_items + + env = ActivityEnvironment() + result = await env.run(long_running_activity, 10) + assert result == 10 +``` + +**Cancellation Testing**: +```python +async def test_activity_cancellation(): + """Test activity cancellation handling""" + + @activity.defn + async def cancellable_activity() -> str: + try: + while True: + if activity.is_cancelled(): + return "cancelled" + await asyncio.sleep(0.1) + except asyncio.CancelledError: + return "cancelled" + + env = ActivityEnvironment(cancellation_reason="test-cancel") + result = await env.run(cancellable_activity) + assert result == "cancelled" +``` + +### Testing Error Handling + +**Exception Propagation**: +```python +async def test_activity_error(): + """Test activity error handling""" + + @activity.defn + async def failing_activity(should_fail: bool) -> str: + if should_fail: + raise ApplicationError("Validation failed", non_retryable=True) + return "success" + + env = ActivityEnvironment() + + # Test success path + result = await env.run(failing_activity, False) + assert result == "success" + + # Test error path + with pytest.raises(ApplicationError) as exc_info: + await env.run(failing_activity, True) + assert "Validation failed" in str(exc_info.value) +``` + +## Pytest Integration Patterns + +### Shared Fixtures + +```python +# conftest.py +import pytest +from temporalio.testing import WorkflowEnvironment + +@pytest.fixture(scope="module") +async def workflow_env(): + """Module-scoped environment (reused across tests)""" + env = await WorkflowEnvironment.start_time_skipping() + yield env + await env.shutdown() + +@pytest.fixture +def activity_env(): + """Function-scoped environment (fresh per test)""" + return ActivityEnvironment() +``` + +### Parameterized Tests + +```python +@pytest.mark.parametrize("input,expected", [ + ("test", "TEST"), + ("hello", "HELLO"), + ("123", "123"), +]) +async def test_activity_parameterized(activity_env, input, expected): + """Test multiple input scenarios""" + result = 
await activity_env.run(process_data, input) + assert result == expected +``` + +## Best Practices + +1. **Fast Execution**: Use time-skipping for all workflow tests +2. **Isolation**: Test workflows and activities separately +3. **Shared Fixtures**: Reuse WorkflowEnvironment across related tests +4. **Coverage Target**: ≥80% for workflow logic +5. **Mock Activities**: Use ActivityEnvironment for activity-specific logic +6. **Determinism**: Ensure test results are consistent across runs +7. **Error Cases**: Test both success and failure scenarios + +## Common Patterns + +**Testing Retry Logic**: +```python +@pytest.mark.asyncio +async def test_workflow_with_retries(workflow_env): + """Test activity retry behavior""" + + call_count = 0 + + @activity.defn + async def flaky_activity() -> str: + nonlocal call_count + call_count += 1 + if call_count < 3: + raise Exception("Transient error") + return "success" + + @workflow.defn + class RetryWorkflow: + @workflow.run + async def run(self) -> str: + return await workflow.execute_activity( + flaky_activity, + start_to_close_timeout=timedelta(seconds=10), + retry_policy=RetryPolicy( + initial_interval=timedelta(milliseconds=1), + maximum_attempts=5, + ), + ) + + async with Worker( + workflow_env.client, + task_queue="test", + workflows=[RetryWorkflow], + activities=[flaky_activity], + ): + result = await workflow_env.client.execute_workflow( + RetryWorkflow.run, + id="retry-wf", + task_queue="test", + ) + assert result == "success" + assert call_count == 3 # Verify retry attempts +``` + +## Additional Resources + +- Python SDK Testing: docs.temporal.io/develop/python/testing-suite +- pytest Documentation: docs.pytest.org +- Temporal Samples: github.com/temporalio/samples-python diff --git a/plugins/backend-development/skills/workflow-orchestration-patterns/SKILL.md b/plugins/backend-development/skills/workflow-orchestration-patterns/SKILL.md new file mode 100644 index 0000000..5010bf5 --- /dev/null +++ 
b/plugins/backend-development/skills/workflow-orchestration-patterns/SKILL.md @@ -0,0 +1,286 @@ +--- +name: workflow-orchestration-patterns +description: Design durable workflows with Temporal for distributed systems. Covers workflow vs activity separation, saga patterns, state management, and determinism constraints. Use when building long-running processes, distributed transactions, or microservice orchestration. +--- + +# Workflow Orchestration Patterns + +Master workflow orchestration architecture with Temporal, covering fundamental design decisions, resilience patterns, and best practices for building reliable distributed systems. + +## When to Use Workflow Orchestration + +### Ideal Use Cases (Source: docs.temporal.io) + +- **Multi-step processes** spanning machines/services/databases +- **Distributed transactions** requiring all-or-nothing semantics +- **Long-running workflows** (hours to years) with automatic state persistence +- **Failure recovery** that must resume from last successful step +- **Business processes**: bookings, orders, campaigns, approvals +- **Entity lifecycle management**: inventory tracking, account management, cart workflows +- **Infrastructure automation**: CI/CD pipelines, provisioning, deployments +- **Human-in-the-loop** systems requiring timeouts and escalations + +### When NOT to Use + +- Simple CRUD operations (use direct API calls) +- Pure data processing pipelines (use Airflow, batch processing) +- Stateless request/response (use standard APIs) +- Real-time streaming (use Kafka, event processors) + +## Critical Design Decision: Workflows vs Activities + +**The Fundamental Rule** (Source: temporal.io/blog/workflow-engine-principles): +- **Workflows** = Orchestration logic and decision-making +- **Activities** = External interactions (APIs, databases, network calls) + +### Workflows (Orchestration) + +**Characteristics:** +- Contain business logic and coordination +- **MUST be deterministic** (same inputs → same outputs) +- 
**Cannot** perform direct external calls +- State automatically preserved across failures +- Can run for years despite infrastructure failures + +**Example workflow tasks:** +- Decide which steps to execute +- Handle compensation logic +- Manage timeouts and retries +- Coordinate child workflows + +### Activities (External Interactions) + +**Characteristics:** +- Handle all external system interactions +- Can be non-deterministic (API calls, DB writes) +- Include built-in timeouts and retry logic +- **Must be idempotent** (calling N times = calling once) +- Short-lived (seconds to minutes typically) + +**Example activity tasks:** +- Call payment gateway API +- Write to database +- Send emails or notifications +- Query external services + +### Design Decision Framework + +``` +Does it touch external systems? → Activity +Is it orchestration/decision logic? → Workflow +``` + +## Core Workflow Patterns + +### 1. Saga Pattern with Compensation + +**Purpose**: Implement distributed transactions with rollback capability + +**Pattern** (Source: temporal.io/blog/compensating-actions-part-of-a-complete-breakfast-with-sagas): + +``` +For each step: + 1. Register compensation BEFORE executing + 2. Execute the step (via activity) + 3. On failure, run all compensations in reverse order (LIFO) +``` + +**Example: Payment Workflow** +1. Reserve inventory (compensation: release inventory) +2. Charge payment (compensation: refund payment) +3. Fulfill order (compensation: cancel fulfillment) + +**Critical Requirements:** +- Compensations must be idempotent +- Register compensation BEFORE executing step +- Run compensations in reverse order +- Handle partial failures gracefully + +### 2. 
Entity Workflows (Actor Model) + +**Purpose**: Long-lived workflow representing single entity instance + +**Pattern** (Source: docs.temporal.io/evaluate/use-cases-design-patterns): +- One workflow execution = one entity (cart, account, inventory item) +- Workflow persists for entity lifetime +- Receives signals for state changes +- Supports queries for current state + +**Example Use Cases:** +- Shopping cart (add items, checkout, expiration) +- Bank account (deposits, withdrawals, balance checks) +- Product inventory (stock updates, reservations) + +**Benefits:** +- Encapsulates entity behavior +- Guarantees consistency per entity +- Natural event sourcing + +### 3. Fan-Out/Fan-In (Parallel Execution) + +**Purpose**: Execute multiple tasks in parallel, aggregate results + +**Pattern:** +- Spawn child workflows or parallel activities +- Wait for all to complete +- Aggregate results +- Handle partial failures + +**Scaling Rule** (Source: temporal.io/blog/workflow-engine-principles): +- Don't scale individual workflows +- For 1M tasks: spawn 1K child workflows × 1K tasks each +- Keep each workflow bounded + +### 4. 
Async Callback Pattern + +**Purpose**: Wait for external event or human approval + +**Pattern:** +- Workflow sends request and waits for signal +- External system processes asynchronously +- Sends signal to resume workflow +- Workflow continues with response + +**Use Cases:** +- Human approval workflows +- Webhook callbacks +- Long-running external processes + +## State Management and Determinism + +### Automatic State Preservation + +**How Temporal Works** (Source: docs.temporal.io/workflows): +- Complete program state preserved automatically +- Event History records every command and event +- Seamless recovery from crashes +- Applications restore pre-failure state + +### Determinism Constraints + +**Workflows Execute as State Machines**: +- Replay behavior must be consistent +- Same inputs → identical outputs every time + +**Prohibited in Workflows** (Source: docs.temporal.io/workflows): +- ❌ Threading, locks, synchronization primitives +- ❌ Random number generation (`random()`) +- ❌ Global state or static variables +- ❌ System time (`datetime.now()`) +- ❌ Direct file I/O or network calls +- ❌ Non-deterministic libraries + +**Allowed in Workflows**: +- ✅ `workflow.now()` (deterministic time) +- ✅ `workflow.random()` (deterministic random) +- ✅ Pure functions and calculations +- ✅ Calling activities (non-deterministic operations) + +### Versioning Strategies + +**Challenge**: Changing workflow code while old executions still running + +**Solutions**: +1. **Patching API**: Use `workflow.patched()` for safe changes (Python SDK; `workflow.GetVersion` in Go/Java) +2. **New Workflow Type**: Create new workflow, route new executions to it +3.
**Backward Compatibility**: Ensure old events replay correctly + +## Resilience and Error Handling + +### Retry Policies + +**Default Behavior**: Temporal retries activities forever + +**Configure Retry**: +- Initial retry interval +- Backoff coefficient (exponential backoff) +- Maximum interval (cap retry delay) +- Maximum attempts (eventually fail) + +**Non-Retryable Errors**: +- Invalid input (validation failures) +- Business rule violations +- Permanent failures (resource not found) + +### Idempotency Requirements + +**Why Critical** (Source: docs.temporal.io/activities): +- Activities may execute multiple times +- Network failures trigger retries +- Duplicate execution must be safe + +**Implementation Strategies**: +- Idempotency keys (deduplication) +- Check-then-act with unique constraints +- Upsert operations instead of insert +- Track processed request IDs + +### Activity Heartbeats + +**Purpose**: Detect stalled long-running activities + +**Pattern**: +- Activity sends periodic heartbeat +- Includes progress information +- Timeout if no heartbeat received +- Enables progress-based retry + +## Best Practices + +### Workflow Design + +1. **Keep workflows focused** - Single responsibility per workflow +2. **Small workflows** - Use child workflows for scalability +3. **Clear boundaries** - Workflow orchestrates, activities execute +4. **Test locally** - Use time-skipping test environment + +### Activity Design + +1. **Idempotent operations** - Safe to retry +2. **Short-lived** - Seconds to minutes, not hours +3. **Timeout configuration** - Always set timeouts +4. **Heartbeat for long tasks** - Report progress +5. 
**Error handling** - Distinguish retryable vs non-retryable + +### Common Pitfalls + +**Workflow Violations**: +- Using `datetime.now()` instead of `workflow.now()` +- Threading or async operations in workflow code +- Calling external APIs directly from workflow +- Non-deterministic logic in workflows + +**Activity Mistakes**: +- Non-idempotent operations (can't handle retries) +- Missing timeouts (activities run forever) +- No error classification (retry validation errors) +- Ignoring payload limits (2MB per argument) + +### Operational Considerations + +**Monitoring**: +- Workflow execution duration +- Activity failure rates +- Retry attempts and backoff +- Pending workflow counts + +**Scalability**: +- Horizontal scaling with workers +- Task queue partitioning +- Child workflow decomposition +- Activity batching when appropriate + +## Additional Resources + +**Official Documentation**: +- Temporal Core Concepts: docs.temporal.io/workflows +- Workflow Patterns: docs.temporal.io/evaluate/use-cases-design-patterns +- Best Practices: docs.temporal.io/develop/best-practices +- Saga Pattern: temporal.io/blog/saga-pattern-made-easy + +**Key Principles**: +1. Workflows = orchestration, Activities = external calls +2. Determinism is non-negotiable for workflows +3. Idempotency is critical for activities +4. State preservation is automatic +5. Design for failure and recovery