mirror of
https://github.com/wshobson/agents.git
synced 2026-03-18 17:47:16 +00:00
Add comprehensive Conductor plugin implementing Context-Driven Development methodology with tracks, specs, and phased implementation plans. Components: - 5 commands: setup, new-track, implement, status, revert - 1 agent: conductor-validator - 3 skills: context-driven-development, track-management, workflow-patterns - 18 templates for project artifacts Documentation updates: - README.md: Updated counts (68 plugins, 100 agents, 110 skills, 76 tools) - docs/plugins.md: Added Conductor to Workflows section - docs/agents.md: Added conductor-validator agent - docs/agent-skills.md: Added Conductor skills section Also includes Prettier formatting across all project files.
8.5 KiB
8.5 KiB
Unit Testing Temporal Workflows and Activities
Focused guide for testing individual workflows and activities in isolation using WorkflowEnvironment and ActivityEnvironment.
WorkflowEnvironment with Time-Skipping
Purpose: Test workflows in isolation with instant time progression (month-long workflows → seconds)
Basic Setup Pattern
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
@pytest.fixture
async def workflow_env():
    """Provide a time-skipping WorkflowEnvironment for a single test.

    The environment is started before the test body runs and shut down
    once the test has finished with it.
    """
    # Leaving the async context manager shuts the environment down.
    async with await WorkflowEnvironment.start_time_skipping() as env:
        yield env
@pytest.mark.asyncio
async def test_workflow_execution(workflow_env):
    """Run YourWorkflow to completion on the time-skipping environment."""
    client = workflow_env.client
    worker = Worker(
        client,
        task_queue="test-queue",
        workflows=[YourWorkflow],
        activities=[your_activity],
    )

    # The worker only polls the task queue while this context is active.
    async with worker:
        result = await client.execute_workflow(
            YourWorkflow.run,
            "test-input",
            id="test-wf-id",
            task_queue="test-queue",
        )

    assert result == "expected-output"
Key Benefits:
- `workflow.sleep(timedelta(days=30))` completes instantly
- Fast feedback loop (milliseconds vs hours)
- Deterministic test execution
Time-Skipping Examples
Sleep Advancement:
# Kept at module level so the sandboxed workflow runner can import the class by
# module and name; a class nested inside the test function is not importable.
@workflow.defn
class DelayedWorkflow:
    """Workflow that waits 24 hours of workflow time before finishing."""

    @workflow.run
    async def run(self) -> str:
        """Sleep for 24 hours, then return "completed"."""
        await workflow.sleep(timedelta(hours=24))  # Instant in tests
        return "completed"


@pytest.mark.asyncio
async def test_workflow_with_delays(workflow_env):
    """Workflow sleeps are instant in time-skipping mode"""
    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[DelayedWorkflow],
    ):
        result = await workflow_env.client.execute_workflow(
            DelayedWorkflow.run,
            id="delayed-wf",
            task_queue="test",
        )

    assert result == "completed"
Manual Time Control:
@pytest.mark.asyncio
async def test_workflow_manual_time(workflow_env):
    """Manually advance time for precise control.

    A worker has to poll the task queue for the whole test: without one the
    started workflow never executes, so the query and the final result would
    never be answered.
    """
    # NOTE(review): add activities=[...] here if TimeBasedWorkflow calls any.
    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[TimeBasedWorkflow],
    ):
        handle = await workflow_env.client.start_workflow(
            TimeBasedWorkflow.run,
            id="time-wf",
            task_queue="test",
        )

        # Advance time by specific amount
        await workflow_env.sleep(timedelta(hours=1))

        # Verify intermediate state via query
        state = await handle.query(TimeBasedWorkflow.get_state)
        assert state == "processing"

        # Advance to completion
        await workflow_env.sleep(timedelta(hours=23))
        result = await handle.result()

    assert result == "completed"
Testing Workflow Logic
Decision Testing:
# Kept at module level so the sandboxed workflow runner can import the class by
# module and name; a class nested inside the test function is not importable.
@workflow.defn
class ConditionalWorkflow:
    """Workflow whose result depends only on its boolean input."""

    @workflow.run
    async def run(self, condition: bool) -> str:
        """Return "path-a" when condition is true, otherwise "path-b"."""
        if condition:
            return "path-a"
        return "path-b"


@pytest.mark.asyncio
async def test_workflow_branching(workflow_env):
    """Test different execution paths"""
    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[ConditionalWorkflow],
    ):
        # Test true path
        result_a = await workflow_env.client.execute_workflow(
            ConditionalWorkflow.run,
            True,
            id="cond-wf-true",
            task_queue="test",
        )
        assert result_a == "path-a"

        # Test false path
        result_b = await workflow_env.client.execute_workflow(
            ConditionalWorkflow.run,
            False,
            id="cond-wf-false",
            task_queue="test",
        )
        assert result_b == "path-b"
ActivityEnvironment Testing
Purpose: Test activities in isolation, without workflows or a Temporal server
Basic Activity Test
from temporalio.testing import ActivityEnvironment
@pytest.mark.asyncio  # async tests need the asyncio marker, as in the workflow tests
async def test_activity_basic():
    """Test activity without workflow context"""

    @activity.defn
    async def process_data(value: str) -> str:
        """Return the input upper-cased."""
        return value.upper()

    env = ActivityEnvironment()
    result = await env.run(process_data, "test")

    assert result == "TEST"
Testing Activity Context
Heartbeat Testing:
@pytest.mark.asyncio  # async tests need the asyncio marker, as in the workflow tests
async def test_activity_heartbeat():
    """Verify that the activity reports one heartbeat per processed item."""

    @activity.defn
    async def long_running_activity(total_items: int) -> int:
        """Heartbeat the index of each item, then return the item count."""
        for i in range(total_items):
            activity.heartbeat(i)  # Report progress
            await asyncio.sleep(0)  # Yield to the event loop between items
        return total_items

    env = ActivityEnvironment()
    # Heartbeats are only observable through the on_heartbeat callback, so
    # record the first detail of every call to assert on it afterwards.
    heartbeats = []
    env.on_heartbeat = lambda *details: heartbeats.append(details[0])

    result = await env.run(long_running_activity, 10)

    assert result == 10
    assert heartbeats == list(range(10))
Cancellation Testing:
@pytest.mark.asyncio  # async tests need the asyncio marker, as in the workflow tests
async def test_activity_cancellation():
    """Test activity cancellation handling.

    Cancellation is requested through ``ActivityEnvironment.cancel()`` while
    the activity is running; the environment constructor takes no
    cancellation argument.
    """

    @activity.defn
    async def cancellable_activity() -> str:
        """Loop until cancelled, then report "cancelled"."""
        try:
            while True:
                if activity.is_cancelled():
                    return "cancelled"
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            return "cancelled"

    env = ActivityEnvironment()
    # ensure_future accepts both a coroutine and an already-scheduled task,
    # so this works whichever awaitable env.run() hands back.
    pending = asyncio.ensure_future(env.run(cancellable_activity))

    # Let the activity enter its loop first; cancelling before it starts would
    # bypass the handler under test.
    await asyncio.sleep(0.05)
    env.cancel()

    assert await pending == "cancelled"
Testing Error Handling
Exception Propagation:
@pytest.mark.asyncio  # async tests need the asyncio marker, as in the workflow tests
async def test_activity_error():
    """Test activity error handling"""

    @activity.defn
    async def failing_activity(should_fail: bool) -> str:
        """Raise a non-retryable ApplicationError on demand, else succeed."""
        if should_fail:
            raise ApplicationError("Validation failed", non_retryable=True)
        return "success"

    env = ActivityEnvironment()

    # Test success path
    result = await env.run(failing_activity, False)
    assert result == "success"

    # Test error path
    with pytest.raises(ApplicationError) as exc_info:
        await env.run(failing_activity, True)

    # Checked after the block: statements following the raising call inside
    # the `with` body would never execute.
    assert "Validation failed" in str(exc_info.value)
Pytest Integration Patterns
Shared Fixtures
# conftest.py
import pytest
from temporalio.testing import ActivityEnvironment, WorkflowEnvironment
@pytest.fixture(scope="module")
async def workflow_env():
    """Provide one time-skipping WorkflowEnvironment per test module.

    Every test in the module receives the same environment, which is shut
    down after the last of them has run.
    """
    # NOTE(review): a module-scoped async fixture presumably needs
    # pytest-asyncio set up with a matching event-loop scope; confirm against
    # the project's pytest-asyncio version and configuration.
    async with await WorkflowEnvironment.start_time_skipping() as env:
        yield env
@pytest.fixture
def activity_env():
    """Hand each test its own freshly constructed ActivityEnvironment."""
    env = ActivityEnvironment()
    return env
Parameterized Tests
# Defined at module level so the parameterized test below can reference it; an
# activity declared inside another test function is not visible here.
@activity.defn
async def process_data(value: str) -> str:
    """Return the input upper-cased."""
    return value.upper()


@pytest.mark.asyncio  # async tests need the asyncio marker, as in the workflow tests
@pytest.mark.parametrize(
    "value,expected",
    [
        ("test", "TEST"),
        ("hello", "HELLO"),
        ("123", "123"),
    ],
)
async def test_activity_parameterized(activity_env, value, expected):
    """Test multiple input scenarios"""
    result = await activity_env.run(process_data, value)
    assert result == expected
Best Practices
- Fast Execution: Use time-skipping for all workflow tests
- Isolation: Test workflows and activities separately
- Shared Fixtures: Reuse WorkflowEnvironment across related tests
- Coverage Target: ≥80% for workflow logic
- Mock Activities: Register stub activities on the test Worker for workflow tests; use ActivityEnvironment to test real activity logic in isolation
- Determinism: Ensure test results are consistent across runs
- Error Cases: Test both success and failure scenarios
Common Patterns
Testing Retry Logic:
# Kept at module level so the sandboxed workflow runner can import the class by
# module and name; a class nested inside the test function is not importable.
@workflow.defn
class RetryWorkflow:
    """Workflow that runs a single activity under a short retry policy."""

    @workflow.run
    async def run(self) -> str:
        """Execute the activity registered as "flaky_activity" and return its result."""
        # The activity is a closure created inside the test, so it is referenced
        # by its registered name (the function name) rather than by object.
        return await workflow.execute_activity(
            "flaky_activity",
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(milliseconds=1),
                maximum_attempts=5,
            ),
        )


@pytest.mark.asyncio
async def test_workflow_with_retries(workflow_env):
    """Test activity retry behavior"""
    call_count = 0

    @activity.defn
    async def flaky_activity() -> str:
        """Fail on the first two invocations, succeed on the third."""
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise RuntimeError("Transient error")
        return "success"

    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[RetryWorkflow],
        activities=[flaky_activity],
    ):
        result = await workflow_env.client.execute_workflow(
            RetryWorkflow.run,
            id="retry-wf",
            task_queue="test",
        )

    assert result == "success"
    assert call_count == 3  # Verify retry attempts
Additional Resources
- Python SDK Testing: docs.temporal.io/develop/python/testing-suite
- pytest Documentation: docs.pytest.org
- Temporal Samples: github.com/temporalio/samples-python