# Integration Testing with Mocked Activities
Comprehensive patterns for testing workflows with mocked external dependencies, error injection, and complex scenarios.
## Activity Mocking Strategy

**Purpose:** Test workflow orchestration logic without calling real external services.

### Basic Mock Pattern
```python
from datetime import timedelta

import pytest
from temporalio import activity, workflow
from temporalio.worker import Worker

# The workflow_env fixture is defined in conftest.py (see Shared Fixtures below)


# Real activity referenced by the workflow; in production it calls an
# external service, so tests must never invoke it directly
@activity.defn
async def process_external_data(input: str) -> str:
    raise NotImplementedError("external call; mocked in tests")


@workflow.defn
class WorkflowWithActivity:
    @workflow.run
    async def run(self, input: str) -> str:
        result = await workflow.execute_activity(
            process_external_data,
            input,
            start_to_close_timeout=timedelta(seconds=10),
        )
        return f"processed: {result}"


@pytest.mark.asyncio
async def test_workflow_with_mocked_activity(workflow_env):
    """Mock activity to test workflow logic"""
    calls: list[str] = []

    # Register the mock under the real activity's name so the workflow's
    # call to process_external_data resolves to this stub instead
    @activity.defn(name="process_external_data")
    async def mock_activity(input: str) -> str:
        calls.append(input)
        return "mocked-result"

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[WorkflowWithActivity],
        activities=[mock_activity],  # Use mock instead of real activity
    ):
        result = await workflow_env.client.execute_workflow(
            WorkflowWithActivity.run,
            "test-input",
            id="wf-mock",
            task_queue="test",
        )

    assert result == "processed: mocked-result"
    assert calls == ["test-input"]
```
### Dynamic Mock Responses

**Scenario-Based Mocking:**
```python
from temporalio.exceptions import ActivityError, ApplicationError


@pytest.mark.asyncio
async def test_workflow_multiple_mock_scenarios(workflow_env):
    """Test different workflow paths with dynamic mocks"""

    # Mock returns different values based on input
    @activity.defn
    async def dynamic_activity(input: str) -> str:
        if input == "error-case":
            raise ApplicationError("Validation failed", non_retryable=True)
        return f"processed-{input}"

    @workflow.defn
    class DynamicWorkflow:
        @workflow.run
        async def run(self, input: str) -> str:
            try:
                result = await workflow.execute_activity(
                    dynamic_activity,
                    input,
                    start_to_close_timeout=timedelta(seconds=10),
                )
                return f"success: {result}"
            except ActivityError as e:
                # The activity's ApplicationError arrives wrapped in an
                # ActivityError; the original failure is its cause
                assert isinstance(e.cause, ApplicationError)
                return f"error: {e.cause.message}"

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[DynamicWorkflow],
        activities=[dynamic_activity],
    ):
        # Test success path
        result_success = await workflow_env.client.execute_workflow(
            DynamicWorkflow.run,
            "valid-input",
            id="wf-success",
            task_queue="test",
        )
        assert result_success == "success: processed-valid-input"

        # Test error path
        result_error = await workflow_env.client.execute_workflow(
            DynamicWorkflow.run,
            "error-case",
            id="wf-error",
            task_queue="test",
        )
        assert "Validation failed" in result_error
```
## Error Injection Patterns

### Testing Transient Failures

**Retry Behavior:**
```python
from temporalio.common import RetryPolicy


@pytest.mark.asyncio
async def test_workflow_transient_errors(workflow_env):
    """Test retry logic with controlled failures"""
    attempt_count = 0

    @activity.defn
    async def transient_activity() -> str:
        nonlocal attempt_count
        attempt_count += 1
        # Fail the first two attempts, then succeed
        if attempt_count < 3:
            raise RuntimeError(f"Transient error {attempt_count}")
        return "success-after-retries"

    @workflow.defn
    class RetryWorkflow:
        @workflow.run
        async def run(self) -> str:
            return await workflow.execute_activity(
                transient_activity,
                start_to_close_timeout=timedelta(seconds=10),
                retry_policy=RetryPolicy(
                    initial_interval=timedelta(milliseconds=10),
                    maximum_attempts=5,
                    backoff_coefficient=1.0,
                ),
            )

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[RetryWorkflow],
        activities=[transient_activity],
    ):
        result = await workflow_env.client.execute_workflow(
            RetryWorkflow.run,
            id="retry-wf",
            task_queue="test",
        )

    assert result == "success-after-retries"
    assert attempt_count == 3
```
### Testing Non-Retryable Errors

**Business Validation Failures:**
```python
@pytest.mark.asyncio
async def test_workflow_non_retryable_error(workflow_env):
    """Test handling of permanent failures"""

    @activity.defn
    async def validation_activity(input: dict) -> str:
        if not input.get("valid"):
            raise ApplicationError(
                "Invalid input",
                non_retryable=True,  # Don't retry validation errors
            )
        return "validated"

    @workflow.defn
    class ValidationWorkflow:
        @workflow.run
        async def run(self, input: dict) -> str:
            try:
                return await workflow.execute_activity(
                    validation_activity,
                    input,
                    start_to_close_timeout=timedelta(seconds=10),
                )
            except ActivityError as e:
                # Unwrap the ApplicationError raised inside the activity
                return f"validation-failed: {e.cause.message}"

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[ValidationWorkflow],
        activities=[validation_activity],
    ):
        result = await workflow_env.client.execute_workflow(
            ValidationWorkflow.run,
            {"valid": False},
            id="validation-wf",
            task_queue="test",
        )

    assert "validation-failed" in result
```
## Multi-Activity Workflow Testing

### Sequential Activity Pattern
```python
@pytest.mark.asyncio
async def test_workflow_sequential_activities(workflow_env):
    """Test workflow orchestrating multiple activities"""
    activity_calls = []

    @activity.defn
    async def step_1(input: str) -> str:
        activity_calls.append("step_1")
        return f"{input}-step1"

    @activity.defn
    async def step_2(input: str) -> str:
        activity_calls.append("step_2")
        return f"{input}-step2"

    @activity.defn
    async def step_3(input: str) -> str:
        activity_calls.append("step_3")
        return f"{input}-step3"

    @workflow.defn
    class SequentialWorkflow:
        @workflow.run
        async def run(self, input: str) -> str:
            result_1 = await workflow.execute_activity(
                step_1,
                input,
                start_to_close_timeout=timedelta(seconds=10),
            )
            result_2 = await workflow.execute_activity(
                step_2,
                result_1,
                start_to_close_timeout=timedelta(seconds=10),
            )
            result_3 = await workflow.execute_activity(
                step_3,
                result_2,
                start_to_close_timeout=timedelta(seconds=10),
            )
            return result_3

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[SequentialWorkflow],
        activities=[step_1, step_2, step_3],
    ):
        result = await workflow_env.client.execute_workflow(
            SequentialWorkflow.run,
            "start",
            id="seq-wf",
            task_queue="test",
        )

    assert result == "start-step1-step2-step3"
    assert activity_calls == ["step_1", "step_2", "step_3"]
```
### Parallel Activity Pattern
```python
import asyncio


@pytest.mark.asyncio
async def test_workflow_parallel_activities(workflow_env):
    """Test concurrent activity execution"""

    @activity.defn
    async def parallel_task(task_id: int) -> str:
        return f"task-{task_id}"

    @workflow.defn
    class ParallelWorkflow:
        @workflow.run
        async def run(self, task_count: int) -> list[str]:
            # Execute activities in parallel; asyncio.gather is
            # deterministic and safe inside Temporal workflows
            tasks = [
                workflow.execute_activity(
                    parallel_task,
                    i,
                    start_to_close_timeout=timedelta(seconds=10),
                )
                for i in range(task_count)
            ]
            return await asyncio.gather(*tasks)

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[ParallelWorkflow],
        activities=[parallel_task],
    ):
        result = await workflow_env.client.execute_workflow(
            ParallelWorkflow.run,
            3,
            id="parallel-wf",
            task_queue="test",
        )

    assert result == ["task-0", "task-1", "task-2"]
```
## Signal and Query Testing

### Signal Handlers
```python
@pytest.mark.asyncio
async def test_workflow_signals(workflow_env):
    """Test workflow signal handling"""

    @workflow.defn
    class SignalWorkflow:
        def __init__(self) -> None:
            self._status = "initialized"

        @workflow.run
        async def run(self) -> str:
            # Wait for completion signal
            await workflow.wait_condition(lambda: self._status == "completed")
            return self._status

        @workflow.signal
        async def update_status(self, new_status: str) -> None:
            self._status = new_status

        @workflow.query
        def get_status(self) -> str:
            return self._status

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[SignalWorkflow],
    ):
        # Start workflow
        handle = await workflow_env.client.start_workflow(
            SignalWorkflow.run,
            id="signal-wf",
            task_queue="test",
        )

        # Verify initial state via query
        initial_status = await handle.query(SignalWorkflow.get_status)
        assert initial_status == "initialized"

        # Send signal
        await handle.signal(SignalWorkflow.update_status, "processing")

        # Verify updated state
        updated_status = await handle.query(SignalWorkflow.get_status)
        assert updated_status == "processing"

        # Complete workflow
        await handle.signal(SignalWorkflow.update_status, "completed")
        result = await handle.result()

    assert result == "completed"
```
## Coverage Strategies

### Workflow Logic Coverage

**Target:** ≥80% coverage of workflow decision logic.
```python
# Test all branches
@pytest.mark.asyncio
@pytest.mark.parametrize("condition,expected", [
    (True, "branch-a"),
    (False, "branch-b"),
])
async def test_workflow_branches(workflow_env, condition, expected):
    """Ensure all code paths are tested"""
    # Test implementation
    pass
```
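Filled in, that skeleton might look like the following sketch; the `BranchWorkflow` class and its boolean input are illustrative, not part of an existing suite:

```python
# Hypothetical workflow with two decision branches (illustrative)
@workflow.defn
class BranchWorkflow:
    @workflow.run
    async def run(self, condition: bool) -> str:
        return "branch-a" if condition else "branch-b"


@pytest.mark.asyncio
@pytest.mark.parametrize("condition,expected", [
    (True, "branch-a"),
    (False, "branch-b"),
])
async def test_workflow_branches(workflow_env, condition, expected):
    """Each parametrized case exercises one decision branch"""
    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[BranchWorkflow],
    ):
        result = await workflow_env.client.execute_workflow(
            BranchWorkflow.run,
            condition,
            id=f"branch-wf-{condition}",  # Unique workflow ID per case
            task_queue="test",
        )
    assert result == expected
```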
### Activity Coverage

**Target:** ≥80% coverage of activity logic.
```python
# Test activity edge cases
@pytest.mark.asyncio
@pytest.mark.parametrize("input,expected", [
    ("valid", "success"),
    ("", "empty-input-error"),
    (None, "null-input-error"),
])
async def test_activity_edge_cases(activity_env, input, expected):
    """Test activity error handling"""
    # Test implementation
    pass
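```

For activity coverage, the SDK's `ActivityEnvironment` runs an activity function directly, with no worker or server; a minimal sketch, assuming a hypothetical `normalize_input` activity:

```python
from temporalio.testing import ActivityEnvironment


# Hypothetical activity under test (illustrative)
@activity.defn
async def normalize_input(input: str | None) -> str:
    if input is None:
        raise ApplicationError("null-input-error", non_retryable=True)
    if not input:
        raise ApplicationError("empty-input-error", non_retryable=True)
    return "success"


@pytest.mark.asyncio
async def test_normalize_input_edge_cases():
    env = ActivityEnvironment()
    # Valid input passes through
    assert await env.run(normalize_input, "valid") == "success"
    # Edge cases raise the expected non-retryable errors
    with pytest.raises(ApplicationError, match="empty-input-error"):
        await env.run(normalize_input, "")
    with pytest.raises(ApplicationError, match="null-input-error"):
        await env.run(normalize_input, None)
```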
## Integration Test Organization

### Test Structure
```text
tests/
├── integration/
│   ├── conftest.py                   # Shared fixtures
│   ├── test_order_workflow.py        # Order processing tests
│   ├── test_payment_workflow.py      # Payment tests
│   └── test_fulfillment_workflow.py
├── unit/
│   ├── test_order_activities.py
│   └── test_payment_activities.py
└── fixtures/
    └── test_data.py                  # Test data builders
```
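The test data builders in `fixtures/test_data.py` might follow this minimal sketch; the `Order` shape is illustrative:

```python
# fixtures/test_data.py (illustrative Order shape)
from dataclasses import dataclass, field


@dataclass
class Order:
    order_id: str
    items: list[str] = field(default_factory=list)
    total_cents: int = 0


def make_order(**overrides) -> Order:
    """Build a valid default order, overriding only what a test cares about."""
    defaults = {"order_id": "order-123", "items": ["sku-1"], "total_cents": 1000}
    defaults.update(overrides)
    return Order(**defaults)
```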
### Shared Fixtures
```python
# conftest.py
from unittest.mock import AsyncMock

import pytest
import pytest_asyncio
from temporalio.testing import WorkflowEnvironment


# Async fixtures need pytest_asyncio.fixture under pytest-asyncio strict mode
@pytest_asyncio.fixture(scope="session")
async def workflow_env():
    """Session-scoped environment for integration tests"""
    env = await WorkflowEnvironment.start_time_skipping()
    yield env
    await env.shutdown()


@pytest.fixture
def mock_payment_service():
    """Mock external payment service (AsyncMock so awaited calls work)"""
    return AsyncMock()


@pytest.fixture
def mock_inventory_service():
    """Mock external inventory service"""
    return AsyncMock()
```
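One way to consume these service mocks is through class-based activities that take the service as a constructor argument, so the worker registers bound methods backed by the mock; the `PaymentActivities` class below is illustrative:

```python
from temporalio import activity


class PaymentActivities:
    """Activities that delegate to an injected payment service (illustrative)."""

    def __init__(self, payment_service) -> None:
        self._payments = payment_service

    @activity.defn
    async def charge(self, amount_cents: int) -> str:
        # In production this is a real client; in tests it is the AsyncMock fixture
        return await self._payments.charge(amount_cents)
```

In a test, register the bound method with `activities=[PaymentActivities(mock_payment_service).charge]`, then assert on `mock_payment_service.charge.await_args` after the workflow completes.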
## Best Practices

- **Mock External Dependencies:** Never call real APIs in tests
- **Test Error Scenarios:** Verify compensation and retry logic
- **Parallel Testing:** Use pytest-xdist for faster test runs
- **Isolated Tests:** Each test should be independent
- **Clear Assertions:** Verify both results and side effects
- **Coverage Target:** ≥80% for critical workflows
- **Fast Execution:** Use time-skipping, avoid real delays (see the sketch after this list)
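To illustrate the time-skipping point: under `WorkflowEnvironment.start_time_skipping()`, timers are auto-advanced while the test awaits the result, so a workflow that sleeps for an hour completes in milliseconds of real time. A minimal sketch, with an illustrative `DelayedWorkflow`:

```python
import asyncio
from temporalio import workflow


@workflow.defn
class DelayedWorkflow:
    @workflow.run
    async def run(self) -> str:
        # One-hour timer: fires immediately under time skipping
        await asyncio.sleep(3600)
        return "done"


@pytest.mark.asyncio
async def test_delayed_workflow_completes_fast(workflow_env):
    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[DelayedWorkflow],
    ):
        # Completes in real milliseconds, not an hour
        result = await workflow_env.client.execute_workflow(
            DelayedWorkflow.run,
            id="delayed-wf",
            task_queue="test",
        )
    assert result == "done"
```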
## Additional Resources

- Mocking Strategies: [docs.temporal.io/develop/python/testing-suite](https://docs.temporal.io/develop/python/testing-suite)
- pytest Best Practices: [docs.pytest.org/en/stable/goodpractices.html](https://docs.pytest.org/en/stable/goodpractices.html)
- Python SDK Samples: [github.com/temporalio/samples-python](https://github.com/temporalio/samples-python)