Compare commits

...

2 Commits

Author SHA1 Message Date
M. A.
cbb60494b1 Add Comprehensive Python Development Skills (#419)
* Add extra python skills covering code style, design patterns, resilience, resource management, testing patterns, and type safety ...etc

* fix: correct code examples in Python skills

- Clarify Python version requirements for type statement (3.10+ vs 3.12+)
- Add missing ValidationError import in configuration example
- Add missing httpx import and url parameter in async example

---------

Co-authored-by: Seth Hobson <wshobson@gmail.com>
2026-01-30 11:52:14 -05:00
Daniel
f9e9598241 Revise event sourcing architect metadata and description (#417)
Add header with the event sourcing architect's description and name format.
2026-01-30 11:34:59 -05:00
16 changed files with 4317 additions and 20 deletions

View File

@@ -6,7 +6,7 @@
"url": "https://github.com/wshobson"
},
"metadata": {
"description": "Production-ready workflow orchestration with 72 focused plugins, 108 specialized agents, and 129 skills - optimized for granular installation and minimal token usage",
"description": "Production-ready workflow orchestration with 72 focused plugins, 108 specialized agents, and 140 skills - optimized for granular installation and minimal token usage",
"version": "1.3.7"
},
"plugins": [
@@ -1611,7 +1611,18 @@
"./skills/python-testing-patterns",
"./skills/python-packaging",
"./skills/python-performance-optimization",
"./skills/uv-package-manager"
"./skills/uv-package-manager",
"./skills/python-type-safety",
"./skills/python-code-style",
"./skills/python-observability",
"./skills/python-project-structure",
"./skills/python-design-patterns",
"./skills/python-error-handling",
"./skills/python-configuration",
"./skills/python-resilience",
"./skills/python-resource-management",
"./skills/python-background-jobs",
"./skills/python-anti-patterns"
]
},
{

View File

@@ -37,7 +37,7 @@ Each plugin is completely isolated with its own agents, commands, and skills:
- **Clear boundaries** - Each plugin has a single, focused purpose
- **Progressive disclosure** - Skills load knowledge only when activated
**Example**: Installing `python-development` loads 3 Python agents, 1 scaffolding tool, and makes 5 skills available (~300 tokens), not the entire marketplace.
**Example**: Installing `python-development` loads 3 Python agents, 1 scaffolding tool, and makes 16 skills available (~1000 tokens), not the entire marketplace.
## Quick Start
@@ -63,7 +63,7 @@ Install the plugins you need:
```bash
# Essential development plugins
/plugin install python-development # Python with 5 specialized skills
/plugin install python-development # Python with 16 specialized skills
/plugin install javascript-typescript # JS/TS with 4 specialized skills
/plugin install backend-development # Backend APIs with 3 architecture skills
@@ -130,7 +130,7 @@ rm -rf ~/.claude/plugins/cache/claude-code-workflows && rm ~/.claude/plugins/ins
## What's New
### Agent Skills (129 skills across 20 plugins)
### Agent Skills (140 skills across 20 plugins)
Specialized knowledge packages following Anthropic's progressive disclosure architecture:

View File

@@ -1,6 +1,10 @@
# Event Sourcing Architect
---
name: event-sourcing-architect
description: Expert in event sourcing, CQRS, and event-driven architecture patterns. Masters event store design, projection building, saga orchestration, and eventual consistency patterns. Use PROACTIVELY for event-sourced systems, audit trail requirements, or complex domain modeling with temporal queries.
model: inherit
---
Expert in event sourcing, CQRS, and event-driven architecture patterns. Masters event store design, projection building, saga orchestration, and eventual consistency patterns. Use PROACTIVELY for event-sourced systems, audit trail requirements, or complex domain modeling with temporal queries.
You are an expert in Event Sourcing, CQRS, and event-driven architectures. Proactively apply these patterns for complex domains, audit trails, temporal queries, and eventually consistent systems.
## Capabilities

View File

@@ -18,6 +18,20 @@ Comprehensive guidance for implementing asynchronous Python applications using a
- Optimizing I/O-bound workloads
- Implementing async background tasks and queues
## Sync vs Async Decision Guide
Before adopting async, consider whether it's the right choice for your use case.
| Use Case | Recommended Approach |
|----------|---------------------|
| Many concurrent network/DB calls | `asyncio` |
| CPU-bound computation | `multiprocessing` or thread pool |
| Mixed I/O + CPU | Offload CPU work with `asyncio.to_thread()` |
| Simple scripts, few connections | Sync (simpler, easier to debug) |
| Web APIs with high concurrency | Async frameworks (FastAPI, aiohttp) |
**Key Rule:** Stay fully sync or fully async within a call path. Mixing creates hidden blocking and complexity.
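For the CPU-bound row in the table above, a minimal sketch of offloading to a process pool (the `fib` workload and pool usage are illustrative):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def fib(n: int) -> int:
    """CPU-bound stand-in: naive recursive Fibonacci."""
    return n if n < 2 else fib(n - 1) + fib(n - 2)

async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    # Offload CPU-bound work to separate processes so the
    # event loop (and other coroutines) stays responsive.
    with ProcessPoolExecutor() as pool:
        return list(await asyncio.gather(
            *(loop.run_in_executor(pool, fib, n) for n in (20, 25))
        ))

if __name__ == "__main__":
    print(asyncio.run(main()))  # [6765, 75025]
```

Unlike `asyncio.to_thread()`, a process pool sidesteps the GIL, so CPU-heavy work actually runs in parallel.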
## Core Concepts
### 1. Event Loop
@@ -583,6 +597,46 @@ async def process_item(item: str):
### 3. Avoid Blocking Operations
Never block the event loop with synchronous operations. A single blocking call stalls all concurrent tasks.
```python
# BAD - blocks the entire event loop
import time
import requests

async def fetch_data_bad(url: str):
    time.sleep(1)  # Blocks!
    response = requests.get(url)  # Also blocks!

# GOOD - use async-native libraries (e.g., httpx for async HTTP)
import httpx

async def fetch_data_good(url: str):
    await asyncio.sleep(1)
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
```
**Wrapping Blocking Code with `asyncio.to_thread()` (Python 3.9+):**
When you must use synchronous libraries, offload to a thread pool:
```python
import asyncio
from pathlib import Path

async def read_file_async(path: str) -> str:
    """Read file without blocking event loop."""
    # asyncio.to_thread() runs sync code in a thread pool
    return await asyncio.to_thread(Path(path).read_text)

async def call_sync_library(data: dict) -> dict:
    """Wrap a synchronous library call."""
    # Useful for sync database drivers, file I/O, CPU work
    return await asyncio.to_thread(sync_library.process, data)
```
**Lower-level approach with `run_in_executor()`:**
```python
import asyncio
import concurrent.futures
@@ -596,7 +650,7 @@ def blocking_operation(data: Any) -> Any:
async def run_in_executor(data: Any) -> Any:
    """Run blocking operation in thread pool."""
    loop = asyncio.get_event_loop()
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_operation, data)
    return result
@@ -692,11 +746,12 @@ async def test_with_timeout():
1. **Use asyncio.run()** for entry point (Python 3.7+)
2. **Always await coroutines** to execute them
3. **Use gather() for concurrent execution** of multiple tasks
3. **Limit concurrency with semaphores** - unbounded `gather()` can exhaust resources
4. **Implement proper error handling** with try/except
5. **Use timeouts** to prevent hanging operations
6. **Pool connections** for better performance
7. **Avoid blocking operations** in async code
8. **Use semaphores** for rate limiting
9. **Handle task cancellation** properly
7. **Never block the event loop** - use `asyncio.to_thread()` for sync code
8. **Use semaphores** for rate limiting external API calls
9. **Handle task cancellation** properly - always re-raise `CancelledError`
10. **Test async code** with pytest-asyncio
11. **Stay consistent** - fully sync or fully async, avoid mixing
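The semaphore guidance above can be sketched as follows (the `fetch` coroutine is a stand-in for a real network or database call):

```python
import asyncio

async def fetch(i: int) -> int:
    """Stand-in for a network/DB call."""
    await asyncio.sleep(0.01)
    return i * 2

async def fetch_all(items: list[int], limit: int = 5) -> list[int]:
    """Run fetches concurrently, but never more than `limit` at once."""
    sem = asyncio.Semaphore(limit)

    async def bounded(i: int) -> int:
        async with sem:  # waits when `limit` tasks are already in flight
            return await fetch(i)

    # gather() preserves input order in its results
    return list(await asyncio.gather(*(bounded(i) for i in items)))

if __name__ == "__main__":
    print(asyncio.run(fetch_all(list(range(10)))))
```

With an unbounded `gather()` over thousands of items, every coroutine opens its connection at once; the semaphore caps resource usage without giving up concurrency.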

View File

@@ -0,0 +1,349 @@
---
name: python-anti-patterns
description: Common Python anti-patterns to avoid. Use as a checklist when reviewing code, before finalizing implementations, or when debugging issues that might stem from known bad practices.
---
# Python Anti-Patterns Checklist
A reference checklist of common mistakes and anti-patterns in Python code. Review this before finalizing implementations to catch issues early.
## When to Use This Skill
- Reviewing code before merge
- Debugging mysterious issues
- Teaching or learning Python best practices
- Establishing team coding standards
- Refactoring legacy code
**Note:** This skill focuses on what to avoid. For guidance on positive patterns and architecture, see the `python-design-patterns` skill.
## Infrastructure Anti-Patterns
### Scattered Timeout/Retry Logic
```python
# BAD: Timeout logic duplicated everywhere
def fetch_user(user_id):
    try:
        return requests.get(url, timeout=30)
    except Timeout:
        logger.warning("Timeout fetching user")
        return None

def fetch_orders(user_id):
    try:
        return requests.get(url, timeout=30)
    except Timeout:
        logger.warning("Timeout fetching orders")
        return None
```
**Fix:** Centralize in decorators or client wrappers.
```python
# GOOD: Centralized retry logic
@retry(stop=stop_after_attempt(3), wait=wait_exponential())
def http_get(url: str) -> Response:
    return requests.get(url, timeout=30)
```
### Double Retry
```python
# BAD: Retrying at multiple layers
@retry(max_attempts=3) # Application retry
def call_service():
    return client.request()  # Client also has retry configured!
```
**Fix:** Retry at one layer only. Know your infrastructure's retry behavior.
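A dependency-free sketch of the rule: keep the retry loop in exactly one wrapper, and make sure the wrapped client has its own retries disabled (names here are illustrative):

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retry(fn: Callable[..., T], attempts: int = 3, base_delay: float = 0.0) -> Callable[..., T]:
    """The single retry layer. The wrapped call must NOT retry on its own."""
    def wrapper(*args, **kwargs) -> T:
        for attempt in range(attempts):
            try:
                return fn(*args, **kwargs)
            except ConnectionError:
                if attempt == attempts - 1:
                    raise  # exhausted: surface the error
                time.sleep(base_delay * 2 ** attempt)  # exponential backoff
        raise AssertionError("unreachable")
    return wrapper
```

If both this wrapper and the underlying client retried three times each, a single failing call could fan out into nine attempts.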
### Hard-Coded Configuration
```python
# BAD: Secrets and config in code
DB_HOST = "prod-db.example.com"
API_KEY = "sk-12345"
def connect():
    return psycopg.connect(f"host={DB_HOST}...")
```
**Fix:** Use environment variables with typed settings.
```python
# GOOD
from pydantic import Field
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    db_host: str = Field(alias="DB_HOST")
    api_key: str = Field(alias="API_KEY")

settings = Settings()
```
## Architecture Anti-Patterns
### Exposed Internal Types
```python
# BAD: Leaking ORM model to API
@app.get("/users/{id}")
def get_user(id: str) -> UserModel:  # SQLAlchemy model
    return db.query(UserModel).get(id)
```
**Fix:** Use DTOs/response models.
```python
# GOOD
@app.get("/users/{id}")
def get_user(id: str) -> UserResponse:
    user = db.query(UserModel).get(id)
    return UserResponse.from_orm(user)
```
### Mixed I/O and Business Logic
```python
# BAD: SQL embedded in business logic
def calculate_discount(user_id: str) -> float:
    user = db.query("SELECT * FROM users WHERE id = ?", user_id)
    orders = db.query("SELECT * FROM orders WHERE user_id = ?", user_id)
    # Business logic mixed with data access
    if len(orders) > 10:
        return 0.15
    return 0.0
```
**Fix:** Repository pattern. Keep business logic pure.
```python
# GOOD
def calculate_discount(user: User, orders: list[Order]) -> float:
    # Pure business logic, easily testable
    if len(orders) > 10:
        return 0.15
    return 0.0
```
## Error Handling Anti-Patterns
### Bare Exception Handling
```python
# BAD: Swallowing all exceptions
try:
    process()
except Exception:
    pass  # Silent failure - bugs hidden forever
```
**Fix:** Catch specific exceptions. Log or handle appropriately.
```python
# GOOD
try:
    process()
except ConnectionError as e:
    logger.warning("Connection failed, will retry", error=str(e))
    raise
except ValueError as e:
    logger.error("Invalid input", error=str(e))
    raise BadRequestError(str(e))
```
### Ignored Partial Failures
```python
# BAD: Stops on first error
def process_batch(items):
    results = []
    for item in items:
        result = process(item)  # Raises on error - batch aborted
        results.append(result)
    return results
```
**Fix:** Capture both successes and failures.
```python
# GOOD
def process_batch(items) -> BatchResult:
    succeeded = {}
    failed = {}
    for idx, item in enumerate(items):
        try:
            succeeded[idx] = process(item)
        except Exception as e:
            failed[idx] = e
    return BatchResult(succeeded, failed)
```
### Missing Input Validation
```python
# BAD: No validation
def create_user(data: dict):
    return User(**data)  # Crashes deep in code on bad input
```
**Fix:** Validate early at API boundaries.
```python
# GOOD
def create_user(data: dict) -> User:
    validated = CreateUserInput.model_validate(data)
    return User.from_input(validated)
```
## Resource Anti-Patterns
### Unclosed Resources
```python
# BAD: File never closed
def read_file(path):
    f = open(path)
    return f.read()  # What if this raises?
```
**Fix:** Use context managers.
```python
# GOOD
def read_file(path):
    with open(path) as f:
        return f.read()
```
### Blocking in Async
```python
# BAD: Blocks the entire event loop
async def fetch_data():
    time.sleep(1)  # Blocks everything!
    response = requests.get(url)  # Also blocks!
```
**Fix:** Use async-native libraries.
```python
# GOOD
async def fetch_data():
    await asyncio.sleep(1)
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
```
## Type Safety Anti-Patterns
### Missing Type Hints
```python
# BAD: No types
def process(data):
    return data["value"] * 2
```
**Fix:** Annotate all public functions.
```python
# GOOD
def process(data: dict[str, int]) -> int:
    return data["value"] * 2
```
### Untyped Collections
```python
# BAD: Generic list without type parameter
def get_users() -> list:
    ...
```
**Fix:** Use type parameters.
```python
# GOOD
def get_users() -> list[User]:
    ...
```
## Testing Anti-Patterns
### Only Testing Happy Paths
```python
# BAD: Only tests success case
def test_create_user():
    user = service.create_user(valid_data)
    assert user.id is not None
```
**Fix:** Test error conditions and edge cases.
```python
# GOOD
def test_create_user_success():
    user = service.create_user(valid_data)
    assert user.id is not None

def test_create_user_invalid_email():
    with pytest.raises(ValueError, match="Invalid email"):
        service.create_user(invalid_email_data)

def test_create_user_duplicate_email():
    service.create_user(valid_data)
    with pytest.raises(ConflictError):
        service.create_user(valid_data)
```
### Over-Mocking
```python
# BAD: Mocking everything
def test_user_service():
    mock_repo = Mock()
    mock_cache = Mock()
    mock_logger = Mock()
    mock_metrics = Mock()
    # Test doesn't verify real behavior
```
**Fix:** Use integration tests for critical paths. Mock only external services.
## Quick Review Checklist
Before finalizing code, verify:
- [ ] No scattered timeout/retry logic (centralized)
- [ ] No double retry (app + infrastructure)
- [ ] No hard-coded configuration or secrets
- [ ] No exposed internal types (ORM models, protobufs)
- [ ] No mixed I/O and business logic
- [ ] No bare `except Exception: pass`
- [ ] No ignored partial failures in batches
- [ ] No missing input validation
- [ ] No unclosed resources (using context managers)
- [ ] No blocking calls in async code
- [ ] All public functions have type hints
- [ ] Collections have type parameters
- [ ] Error paths are tested
- [ ] Edge cases are covered
## Common Fixes Summary
| Anti-Pattern | Fix |
|-------------|-----|
| Scattered retry logic | Centralized decorators |
| Hard-coded config | Environment variables + pydantic-settings |
| Exposed ORM models | DTO/response schemas |
| Mixed I/O + logic | Repository pattern |
| Bare except | Catch specific exceptions |
| Batch stops on error | Return BatchResult with successes/failures |
| No validation | Validate at boundaries with Pydantic |
| Unclosed resources | Context managers |
| Blocking in async | Async-native libraries |
| Missing types | Type annotations on all public APIs |
| Only happy path tests | Test errors and edge cases |

View File

@@ -0,0 +1,364 @@
---
name: python-background-jobs
description: Python background job patterns including task queues, workers, and event-driven architecture. Use when implementing async task processing, job queues, long-running operations, or decoupling work from request/response cycles.
---
# Python Background Jobs & Task Queues
Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously.
## When to Use This Skill
- Processing tasks that take longer than a few seconds
- Sending emails, notifications, or webhooks
- Generating reports or exporting data
- Processing uploads or media transformations
- Integrating with unreliable external services
- Building event-driven architectures
## Core Concepts
### 1. Task Queue Pattern
API accepts request, enqueues a job, returns immediately with a job ID. Workers process jobs asynchronously.
### 2. Idempotency
Tasks may be retried on failure. Design for safe re-execution.
### 3. Job State Machine
Jobs transition through states: pending → running → succeeded/failed.
### 4. At-Least-Once Delivery
Most queues guarantee at-least-once delivery. Your code must handle duplicates.
## Quick Start
Examples in this skill use Celery, a widely adopted task queue. Alternatives such as RQ, Dramatiq, and cloud-native services (AWS SQS, Google Cloud Tasks) are equally valid choices.
```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

@app.task
def send_email(to: str, subject: str, body: str) -> None:
    # This runs in a background worker
    email_client.send(to, subject, body)

# In your API handler
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
```
## Fundamental Patterns
### Pattern 1: Return Job ID Immediately
For operations exceeding a few seconds, return a job ID and process asynchronously.
```python
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

@dataclass
class Job:
    id: str
    status: JobStatus
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    result: dict | None = None
    error: str | None = None

# API endpoint
async def start_export(request: ExportRequest) -> JobResponse:
    """Start export job and return job ID."""
    job_id = str(uuid4())

    # Persist job record
    await jobs_repo.create(Job(
        id=job_id,
        status=JobStatus.PENDING,
        created_at=datetime.utcnow(),
    ))

    # Enqueue task for background processing
    await task_queue.enqueue(
        "export_data",
        job_id=job_id,
        params=request.model_dump(),
    )

    # Return immediately with job ID
    return JobResponse(
        job_id=job_id,
        status="pending",
        poll_url=f"/jobs/{job_id}",
    )
```
### Pattern 2: Celery Task Configuration
Configure Celery tasks with proper retry and timeout settings.
```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

# Global configuration
app.conf.update(
    task_time_limit=3600,          # Hard limit: 1 hour
    task_soft_time_limit=3000,     # Soft limit: 50 minutes
    task_acks_late=True,           # Acknowledge after completion
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,  # Don't prefetch too many tasks
)

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
    """Process payment with automatic retry on transient errors."""
    try:
        result = payment_gateway.charge(payment_id)
        return {"status": "success", "transaction_id": result.id}
    except PaymentDeclinedError as e:
        # Don't retry permanent failures
        return {"status": "declined", "reason": str(e)}
    except TransientError as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
```
### Pattern 3: Make Tasks Idempotent
Workers may retry on crash or timeout. Design for safe re-execution.
```python
@app.task(bind=True)
def process_order(self, order_id: str) -> None:
    """Process order idempotently."""
    order = orders_repo.get(order_id)

    # Already processed? Return early
    if order.status == OrderStatus.COMPLETED:
        logger.info("Order already processed", order_id=order_id)
        return

    # Already in progress? Check if we should continue
    if order.status == OrderStatus.PROCESSING:
        # Use idempotency key to avoid double-charging
        pass

    # Process with idempotency key
    result = payment_provider.charge(
        amount=order.total,
        idempotency_key=f"order-{order_id}",  # Critical!
    )
    orders_repo.update(order_id, status=OrderStatus.COMPLETED)
```
**Idempotency Strategies:**
1. **Check-before-write**: Verify state before action
2. **Idempotency keys**: Use unique tokens with external services
3. **Upsert patterns**: `INSERT ... ON CONFLICT UPDATE`
4. **Deduplication window**: Track processed IDs for N hours
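Strategy 4 can be sketched with an in-memory window; in production this would typically live in Redis (e.g. `SET NX EX`) so all workers share it. Names and the TTL are illustrative:

```python
import time

class DedupWindow:
    """Remember processed IDs for `ttl` seconds to drop duplicate deliveries."""

    def __init__(self, ttl: float = 3600.0):
        self.ttl = ttl
        self._seen: dict[str, float] = {}

    def first_seen(self, message_id: str) -> bool:
        now = time.monotonic()
        # Evict entries older than the window
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.ttl}
        if message_id in self._seen:
            return False  # duplicate delivery: skip processing
        self._seen[message_id] = now
        return True
```

A worker would call `first_seen(msg.id)` before processing and acknowledge duplicates without re-running side effects.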
### Pattern 4: Job State Management
Persist job state transitions for visibility and debugging.
```python
class JobRepository:
    """Repository for managing job state."""

    async def create(self, job: Job) -> Job:
        """Create new job record."""
        await self._db.execute(
            """INSERT INTO jobs (id, status, created_at)
               VALUES ($1, $2, $3)""",
            job.id, job.status.value, job.created_at,
        )
        return job

    async def update_status(
        self,
        job_id: str,
        status: JobStatus,
        **fields,
    ) -> None:
        """Update job status with timestamp."""
        updates = {"status": status.value, **fields}
        if status == JobStatus.RUNNING:
            updates["started_at"] = datetime.utcnow()
        elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
            updates["completed_at"] = datetime.utcnow()

        await self._db.execute(
            "UPDATE jobs SET status = $1, ... WHERE id = $2",
            updates, job_id,
        )
        logger.info(
            "Job status updated",
            job_id=job_id,
            status=status.value,
        )
```
## Advanced Patterns
### Pattern 5: Dead Letter Queue
Handle permanently failed tasks for manual inspection.
```python
@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
    """Process webhook with DLQ for failures."""
    try:
        result = send_webhook(payload)
        if not result.success:
            raise WebhookFailedError(result.error)
    except Exception as e:
        if self.request.retries >= self.max_retries:
            # Move to dead letter queue for manual inspection
            dead_letter_queue.send({
                "task": "process_webhook",
                "webhook_id": webhook_id,
                "payload": payload,
                "error": str(e),
                "attempts": self.request.retries + 1,
                "failed_at": datetime.utcnow().isoformat(),
            })
            logger.error(
                "Webhook moved to DLQ after max retries",
                webhook_id=webhook_id,
                error=str(e),
            )
            return

        # Exponential backoff retry
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
```
### Pattern 6: Status Polling Endpoint
Provide an endpoint for clients to check job status.
```python
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JobStatusResponse:
    """Get current status of a background job."""
    job = await jobs_repo.get(job_id)
    if job is None:
        raise HTTPException(404, f"Job {job_id} not found")

    return JobStatusResponse(
        job_id=job.id,
        status=job.status.value,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at,
        result=job.result if job.status == JobStatus.SUCCEEDED else None,
        error=job.error if job.status == JobStatus.FAILED else None,
        # Helpful for clients
        is_terminal=job.status in (JobStatus.SUCCEEDED, JobStatus.FAILED),
    )
```
### Pattern 7: Task Chaining and Workflows
Compose complex workflows from simple tasks.
```python
from celery import chain, group, chord

# Simple chain: A → B → C
workflow = chain(
    extract_data.s(source_id),
    transform_data.s(),
    load_data.s(destination_id),
)

# Parallel execution: A, B, C all at once
parallel = group(
    send_email.s(user_email),
    send_sms.s(user_phone),
    update_analytics.s(event_data),
)

# Chord: Run tasks in parallel, then a callback
# Process all items, then send completion notification
workflow = chord(
    [process_item.s(item_id) for item_id in item_ids],
    send_completion_notification.s(batch_id),
)
workflow.apply_async()
```
### Pattern 8: Alternative Task Queues
Choose the right tool for your needs.
**RQ (Redis Queue)**: Simple, Redis-based
```python
from rq import Queue
from redis import Redis
queue = Queue(connection=Redis())
job = queue.enqueue(send_email, "user@example.com", "Subject", "Body")
```
**Dramatiq**: Modern Celery alternative
```python
import dramatiq
from dramatiq.brokers.redis import RedisBroker
dramatiq.set_broker(RedisBroker())
@dramatiq.actor
def send_email(to: str, subject: str, body: str) -> None:
    email_client.send(to, subject, body)
```
**Cloud-native options:**
- AWS SQS + Lambda
- Google Cloud Tasks
- Azure Functions
## Best Practices Summary
1. **Return immediately** - Don't block requests for long operations
2. **Persist job state** - Enable status polling and debugging
3. **Make tasks idempotent** - Safe to retry on any failure
4. **Use idempotency keys** - For external service calls
5. **Set timeouts** - Both soft and hard limits
6. **Implement DLQ** - Capture permanently failed tasks
7. **Log transitions** - Track job state changes
8. **Retry appropriately** - Exponential backoff for transient errors
9. **Don't retry permanent failures** - Validation errors, invalid credentials
10. **Monitor queue depth** - Alert on backlog growth

View File

@@ -0,0 +1,360 @@
---
name: python-code-style
description: Python code style, linting, formatting, naming conventions, and documentation standards. Use when writing new code, reviewing style, configuring linters, writing docstrings, or establishing project standards.
---
# Python Code Style & Documentation
Consistent code style and clear documentation make codebases maintainable and collaborative. This skill covers modern Python tooling, naming conventions, and documentation standards.
## When to Use This Skill
- Setting up linting and formatting for a new project
- Writing or reviewing docstrings
- Establishing team coding standards
- Configuring ruff, mypy, or pyright
- Reviewing code for style consistency
- Creating project documentation
## Core Concepts
### 1. Automated Formatting
Let tools handle formatting debates. Configure once, enforce automatically.
### 2. Consistent Naming
Follow PEP 8 conventions with meaningful, descriptive names.
### 3. Documentation as Code
Docstrings should be maintained alongside the code they describe.
### 4. Type Annotations
Modern Python code should include type hints for all public APIs.
## Quick Start
```bash
# Install modern tooling
pip install ruff mypy
```

```toml
# Configure in pyproject.toml
[tool.ruff]
line-length = 120
target-version = "py312"  # Adjust based on your project's minimum Python version

[tool.mypy]
strict = true
```
## Fundamental Patterns
### Pattern 1: Modern Python Tooling
Use `ruff` as an all-in-one linter and formatter. It replaces flake8, isort, and black with a single fast tool.
```toml
# pyproject.toml
[tool.ruff]
line-length = 120
target-version = "py312" # Adjust based on your project's minimum Python version
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
"SIM", # flake8-simplify
]
ignore = ["E501"] # Line length handled by formatter
[tool.ruff.format]
quote-style = "double"
indent-style = "space"
```
Run with:
```bash
ruff check --fix . # Lint and auto-fix
ruff format . # Format code
```
### Pattern 2: Type Checking Configuration
Configure strict type checking for production code.
```toml
# pyproject.toml
[tool.mypy]
python_version = "3.12"
strict = true
warn_return_any = true
warn_unused_ignores = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_defs = false
```
Alternative: Use `pyright` for faster checking.
```toml
[tool.pyright]
pythonVersion = "3.12"
typeCheckingMode = "strict"
```
### Pattern 3: Naming Conventions
Follow PEP 8 with emphasis on clarity over brevity.
**Files and Modules:**
```python
# Good: Descriptive snake_case
user_repository.py
order_processing.py
http_client.py
# Avoid: Abbreviations
usr_repo.py
ord_proc.py
http_cli.py
```
**Classes and Functions:**
```python
# Classes: PascalCase
class UserRepository:
    pass

class HTTPClientFactory:  # Acronyms stay uppercase
    pass

# Functions and variables: snake_case
def get_user_by_email(email: str) -> User | None:
    ...

retry_count = 3
max_connections = 100
```
**Constants:**
```python
# Module-level constants: SCREAMING_SNAKE_CASE
MAX_RETRY_ATTEMPTS = 3
DEFAULT_TIMEOUT_SECONDS = 30
API_BASE_URL = "https://api.example.com"
```
### Pattern 4: Import Organization
Group imports in a consistent order: standard library, third-party, local.
```python
# Standard library
import os
from collections.abc import Callable
from typing import Any
# Third-party packages
import httpx
from pydantic import BaseModel
from sqlalchemy import Column
# Local imports
from myproject.models import User
from myproject.services import UserService
```
Use absolute imports exclusively:
```python
# Preferred
from myproject.utils import retry_decorator
# Avoid relative imports
from ..utils import retry_decorator
```
## Advanced Patterns
### Pattern 5: Google-Style Docstrings
Write docstrings for all public classes, methods, and functions.
**Simple Function:**
```python
def get_user(user_id: str) -> User:
    """Retrieve a user by their unique identifier."""
    ...
```
**Complex Function:**
```python
def process_batch(
    items: list[Item],
    max_workers: int = 4,
    on_progress: Callable[[int, int], None] | None = None,
) -> BatchResult:
    """Process items concurrently using a worker pool.

    Processes each item in the batch using the configured number of
    workers. Progress can be monitored via the optional callback.

    Args:
        items: The items to process. Must not be empty.
        max_workers: Maximum concurrent workers. Defaults to 4.
        on_progress: Optional callback receiving (completed, total) counts.

    Returns:
        BatchResult containing succeeded items and any failures with
        their associated exceptions.

    Raises:
        ValueError: If items is empty.
        ProcessingError: If the batch cannot be processed.

    Example:
        >>> result = process_batch(items, max_workers=8)
        >>> print(f"Processed {len(result.succeeded)} items")
    """
    ...
```
**Class Docstring:**
```python
class UserService:
    """Service for managing user operations.

    Provides methods for creating, retrieving, updating, and
    deleting users with proper validation and error handling.

    Attributes:
        repository: The data access layer for user persistence.
        logger: Logger instance for operation tracking.

    Example:
        >>> service = UserService(repository, logger)
        >>> user = service.create_user(CreateUserInput(...))
    """

    def __init__(self, repository: UserRepository, logger: Logger) -> None:
        """Initialize the user service.

        Args:
            repository: Data access layer for users.
            logger: Logger for tracking operations.
        """
        self.repository = repository
        self.logger = logger
```
### Pattern 6: Line Length and Formatting
Set line length to 120 characters for modern displays while maintaining readability.
```python
# Good: Readable line breaks
def create_user(
    email: str,
    name: str,
    role: UserRole = UserRole.MEMBER,
    notify: bool = True,
) -> User:
    ...

# Good: Chain method calls clearly
result = (
    db.query(User)
    .filter(User.active == True)
    .order_by(User.created_at.desc())
    .limit(10)
    .all()
)

# Good: Format long strings
error_message = (
    f"Failed to process user {user_id}: "
    f"received status {response.status_code} "
    f"with body {response.text[:100]}"
)
```
### Pattern 7: Project Documentation
**README Structure:**
```markdown
# Project Name
Brief description of what the project does.
## Installation
\`\`\`bash
pip install myproject
\`\`\`
## Quick Start
\`\`\`python
from myproject import Client
client = Client(api_key="...")
result = client.process(data)
\`\`\`
## Configuration
Document environment variables and configuration options.
## Development
\`\`\`bash
pip install -e ".[dev]"
pytest
\`\`\`
```
**CHANGELOG Format (Keep a Changelog):**
```markdown
# Changelog
## [Unreleased]
### Added
- New feature X
### Changed
- Modified behavior of Y
### Fixed
- Bug in Z
```
## Best Practices Summary
1. **Use ruff** - Single tool for linting and formatting
2. **Enable strict mypy** - Catch type errors before runtime
3. **120 character lines** - Modern standard for readability
4. **Descriptive names** - Clarity over brevity
5. **Absolute imports** - More maintainable than relative
6. **Google-style docstrings** - Consistent, readable documentation
7. **Document public APIs** - Every public function needs a docstring
8. **Keep docs updated** - Treat documentation as code
9. **Automate in CI** - Run linters on every commit
10. **Target Python 3.10+** - For new projects, Python 3.12+ is recommended for modern language features
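Point 9 can be enforced with pre-commit; a minimal config sketch using the ruff pre-commit hooks (the `rev` shown is illustrative — pin it to the release you actually use):

```yaml
# .pre-commit-config.yaml
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.6.9  # pin to a current release
    hooks:
      - id: ruff
        args: [--fix]
      - id: ruff-format
```

Running the same hooks in CI guarantees that what passes locally passes on every commit.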


@@ -0,0 +1,368 @@
---
name: python-configuration
description: Python configuration management via environment variables and typed settings. Use when externalizing config, setting up pydantic-settings, managing secrets, or implementing environment-specific behavior.
---
# Python Configuration Management
Externalize configuration from code using environment variables and typed settings. Well-managed configuration enables the same code to run in any environment without modification.
## When to Use This Skill
- Setting up a new project's configuration system
- Migrating from hardcoded values to environment variables
- Implementing pydantic-settings for typed configuration
- Managing secrets and sensitive values
- Creating environment-specific settings (dev/staging/prod)
- Validating configuration at application startup
## Core Concepts
### 1. Externalized Configuration
All environment-specific values (URLs, secrets, feature flags) come from environment variables, not code.
### 2. Typed Settings
Parse and validate configuration into typed objects at startup, not scattered throughout code.
### 3. Fail Fast
Validate all required configuration at application boot. Missing config should crash immediately with a clear message.
### 4. Sensible Defaults
Provide reasonable defaults for local development while requiring explicit values for sensitive settings.
## Quick Start
```python
from pydantic_settings import BaseSettings
from pydantic import Field
class Settings(BaseSettings):
database_url: str = Field(alias="DATABASE_URL")
api_key: str = Field(alias="API_KEY")
debug: bool = Field(default=False, alias="DEBUG")
settings = Settings() # Loads from environment
```
## Fundamental Patterns
### Pattern 1: Typed Settings with Pydantic
Create a central settings class that loads and validates all configuration.
```python
from pydantic_settings import BaseSettings
from pydantic import Field, ValidationError
import sys
class Settings(BaseSettings):
"""Application configuration loaded from environment variables."""
# Database
db_host: str = Field(alias="DB_HOST")
db_port: int = Field(default=5432, alias="DB_PORT")
db_name: str = Field(alias="DB_NAME")
db_user: str = Field(alias="DB_USER")
db_password: str = Field(alias="DB_PASSWORD")
# Redis
redis_url: str = Field(default="redis://localhost:6379", alias="REDIS_URL")
# API Keys
api_secret_key: str = Field(alias="API_SECRET_KEY")
# Feature flags
enable_new_feature: bool = Field(default=False, alias="ENABLE_NEW_FEATURE")
model_config = {
"env_file": ".env",
"env_file_encoding": "utf-8",
}
# Create singleton instance at module load
try:
settings = Settings()
except ValidationError as e:
print(f"Configuration error:\n{e}")
sys.exit(1)
```
Import `settings` throughout your application:
```python
from myapp.config import settings
def get_database_connection():
return connect(
host=settings.db_host,
port=settings.db_port,
database=settings.db_name,
)
```
### Pattern 2: Fail Fast on Missing Configuration
Required settings should crash the application immediately with a clear error.
```python
from pydantic_settings import BaseSettings
from pydantic import Field, ValidationError
import sys
class Settings(BaseSettings):
# Required - no default means it must be set
api_key: str = Field(alias="API_KEY")
database_url: str = Field(alias="DATABASE_URL")
# Optional with defaults
log_level: str = Field(default="INFO", alias="LOG_LEVEL")
try:
settings = Settings()
except ValidationError as e:
print("=" * 60)
print("CONFIGURATION ERROR")
print("=" * 60)
for error in e.errors():
field = error["loc"][0]
print(f" - {field}: {error['msg']}")
print("\nPlease set the required environment variables.")
sys.exit(1)
```
A clear error at startup is better than a cryptic `None` failure mid-request.
### Pattern 3: Local Development Defaults
Provide sensible defaults for local development while requiring explicit values for secrets.
```python
class Settings(BaseSettings):
# Has local default, but prod will override
db_host: str = Field(default="localhost", alias="DB_HOST")
db_port: int = Field(default=5432, alias="DB_PORT")
# Always required - no default for secrets
db_password: str = Field(alias="DB_PASSWORD")
api_secret_key: str = Field(alias="API_SECRET_KEY")
# Development convenience
debug: bool = Field(default=False, alias="DEBUG")
model_config = {"env_file": ".env"}
```
Create a `.env` file for local development (never commit this):
```bash
# .env (add to .gitignore)
DB_PASSWORD=local_dev_password
API_SECRET_KEY=dev-secret-key
DEBUG=true
```
### Pattern 4: Namespaced Environment Variables
Prefix related variables for clarity and easy debugging.
```bash
# Database configuration
DB_HOST=localhost
DB_PORT=5432
DB_NAME=myapp
DB_USER=admin
DB_PASSWORD=secret
# Redis configuration
REDIS_URL=redis://localhost:6379
REDIS_MAX_CONNECTIONS=10
# Authentication
AUTH_SECRET_KEY=your-secret-key
AUTH_TOKEN_EXPIRY_SECONDS=3600
AUTH_ALGORITHM=HS256
# Feature flags
FEATURE_NEW_CHECKOUT=true
FEATURE_BETA_UI=false
```
This makes `env | grep DB_` useful for debugging.
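The convention also pays off in code. A stdlib-only sketch (the helper name is hypothetical) that collects one namespace into a dict:

```python
import os

def namespaced_config(prefix: str) -> dict[str, str]:
    """Collect environment variables that share a prefix, stripping it."""
    return {
        key.removeprefix(prefix).lower(): value
        for key, value in os.environ.items()
        if key.startswith(prefix)
    }

os.environ.update({"DB_HOST": "localhost", "DB_PORT": "5432"})
db_config = namespaced_config("DB_")
# db_config now maps "host" -> "localhost" and "port" -> "5432"
```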
## Advanced Patterns
### Pattern 5: Type Coercion
Pydantic handles common conversions automatically.
```python
from pydantic_settings import BaseSettings
from pydantic import Field, field_validator
class Settings(BaseSettings):
# Automatically converts "true", "1", "yes" to True
debug: bool = False
# Automatically converts string to int
max_connections: int = 100
# Parse comma-separated string to list
allowed_hosts: list[str] = Field(default_factory=list)
@field_validator("allowed_hosts", mode="before")
@classmethod
def parse_allowed_hosts(cls, v: str | list[str]) -> list[str]:
if isinstance(v, str):
return [host.strip() for host in v.split(",") if host.strip()]
return v
```
Usage:
```bash
ALLOWED_HOSTS=example.com,api.example.com,localhost
MAX_CONNECTIONS=50
DEBUG=true
```
### Pattern 6: Environment-Specific Configuration
Use an environment enum to switch behavior.
```python
from enum import Enum
from pydantic_settings import BaseSettings
from pydantic import Field, computed_field
class Environment(str, Enum):
LOCAL = "local"
STAGING = "staging"
PRODUCTION = "production"
class Settings(BaseSettings):
environment: Environment = Field(
default=Environment.LOCAL,
alias="ENVIRONMENT",
)
# Settings that vary by environment
log_level: str = Field(default="DEBUG", alias="LOG_LEVEL")
@computed_field
@property
def is_production(self) -> bool:
return self.environment == Environment.PRODUCTION
@computed_field
@property
def is_local(self) -> bool:
return self.environment == Environment.LOCAL
# Usage
if settings.is_production:
configure_production_logging()
else:
configure_debug_logging()
```
### Pattern 7: Nested Configuration Groups
Organize related settings into nested models.
```python
from pydantic import BaseModel
from pydantic_settings import BaseSettings
class DatabaseSettings(BaseModel):
host: str = "localhost"
port: int = 5432
name: str
user: str
password: str
class RedisSettings(BaseModel):
url: str = "redis://localhost:6379"
max_connections: int = 10
class Settings(BaseSettings):
database: DatabaseSettings
redis: RedisSettings
debug: bool = False
model_config = {
"env_nested_delimiter": "__",
"env_file": ".env",
}
```
Environment variables use double underscore for nesting:
```bash
DATABASE__HOST=db.example.com
DATABASE__PORT=5432
DATABASE__NAME=myapp
DATABASE__USER=admin
DATABASE__PASSWORD=secret
REDIS__URL=redis://redis.example.com:6379
```
### Pattern 8: Secrets from Files
For container environments, read secrets from mounted files.
```python
from pydantic_settings import BaseSettings
from pydantic import Field
from pathlib import Path
class Settings(BaseSettings):
# Read from environment variable or file
db_password: str = Field(alias="DB_PASSWORD")
model_config = {
"secrets_dir": "/run/secrets", # Docker secrets location
}
```
Pydantic will look for `/run/secrets/db_password` if the env var isn't set.
### Pattern 9: Configuration Validation
Add custom validation for complex requirements.
```python
from pydantic_settings import BaseSettings
from pydantic import Field, model_validator
class Settings(BaseSettings):
db_host: str = Field(alias="DB_HOST")
db_port: int = Field(alias="DB_PORT")
read_replica_host: str | None = Field(default=None, alias="READ_REPLICA_HOST")
read_replica_port: int = Field(default=5432, alias="READ_REPLICA_PORT")
@model_validator(mode="after")
def validate_replica_settings(self):
if self.read_replica_host and self.read_replica_port == self.db_port:
if self.read_replica_host == self.db_host:
raise ValueError(
"Read replica cannot be the same as primary database"
)
return self
```
## Best Practices Summary
1. **Never hardcode config** - All environment-specific values from env vars
2. **Use typed settings** - Pydantic-settings with validation
3. **Fail fast** - Crash on missing required config at startup
4. **Provide dev defaults** - Make local development easy
5. **Never commit secrets** - Use `.env` files (gitignored) or secret managers
6. **Namespace variables** - `DB_HOST`, `REDIS_URL` for clarity
7. **Import settings singleton** - Don't call `os.getenv()` throughout code
8. **Document all variables** - README should list required env vars
9. **Validate early** - Check config correctness at boot time
10. **Use secrets_dir** - Support mounted secrets in containers


@@ -0,0 +1,411 @@
---
name: python-design-patterns
description: Python design patterns including KISS, Separation of Concerns, Single Responsibility, and composition over inheritance. Use when making architecture decisions, refactoring code structure, or evaluating when abstractions are appropriate.
---
# Python Design Patterns
Write maintainable Python code using fundamental design principles. These patterns help you build systems that are easy to understand, test, and modify.
## When to Use This Skill
- Designing new components or services
- Refactoring complex or tangled code
- Deciding whether to create an abstraction
- Choosing between inheritance and composition
- Evaluating code complexity and coupling
- Planning modular architectures
## Core Concepts
### 1. KISS (Keep It Simple)
Choose the simplest solution that works. Complexity must be justified by concrete requirements.
### 2. Single Responsibility (SRP)
Each unit should have one reason to change. Separate concerns into focused components.
### 3. Composition Over Inheritance
Build behavior by combining objects, not extending classes.
### 4. Rule of Three
Wait until you have three instances before abstracting. Duplication is often better than premature abstraction.
## Quick Start
```python
# Simple beats clever
# Instead of a factory/registry pattern:
FORMATTERS = {"json": JsonFormatter, "csv": CsvFormatter}
def get_formatter(name: str) -> Formatter:
return FORMATTERS[name]()
```
## Fundamental Patterns
### Pattern 1: KISS - Keep It Simple
Before adding complexity, ask: does a simpler solution work?
```python
# Over-engineered: Factory with registration
class OutputFormatterFactory:
_formatters: dict[str, type[Formatter]] = {}
@classmethod
def register(cls, name: str):
def decorator(formatter_cls):
cls._formatters[name] = formatter_cls
return formatter_cls
return decorator
@classmethod
def create(cls, name: str) -> Formatter:
return cls._formatters[name]()
@OutputFormatterFactory.register("json")
class JsonFormatter(Formatter):
...
# Simple: Just use a dictionary
FORMATTERS = {
"json": JsonFormatter,
"csv": CsvFormatter,
"xml": XmlFormatter,
}
def get_formatter(name: str) -> Formatter:
"""Get formatter by name."""
if name not in FORMATTERS:
raise ValueError(f"Unknown format: {name}")
return FORMATTERS[name]()
```
The factory pattern adds code without adding value here. Save patterns for when they solve real problems.
### Pattern 2: Single Responsibility Principle
Each class or function should have one reason to change.
```python
# BAD: Handler does everything
class UserHandler:
async def create_user(self, request: Request) -> Response:
# HTTP parsing
data = await request.json()
# Validation
if not data.get("email"):
return Response({"error": "email required"}, status=400)
# Database access
user = await db.execute(
"INSERT INTO users (email, name) VALUES ($1, $2) RETURNING *",
data["email"], data["name"]
)
# Response formatting
return Response({"id": user.id, "email": user.email}, status=201)
# GOOD: Separated concerns
class UserService:
"""Business logic only."""
def __init__(self, repo: UserRepository) -> None:
self._repo = repo
async def create_user(self, data: CreateUserInput) -> User:
# Only business rules here
user = User(email=data.email, name=data.name)
return await self._repo.save(user)
class UserHandler:
"""HTTP concerns only."""
def __init__(self, service: UserService) -> None:
self._service = service
async def create_user(self, request: Request) -> Response:
data = CreateUserInput(**(await request.json()))
user = await self._service.create_user(data)
return Response(user.to_dict(), status=201)
```
Now HTTP changes don't affect business logic, and vice versa.
### Pattern 3: Separation of Concerns
Organize code into distinct layers with clear responsibilities.
```
┌─────────────────────────────────────────────────────┐
│ API Layer (handlers) │
│ - Parse requests │
│ - Call services │
│ - Format responses │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ Service Layer (business logic) │
│ - Domain rules and validation │
│ - Orchestrate operations │
│ - Pure functions where possible │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ Repository Layer (data access) │
│ - SQL queries │
│ - External API calls │
│ - Cache operations │
└─────────────────────────────────────────────────────┘
```
Each layer depends only on layers below it:
```python
# Repository: Data access
class UserRepository:
async def get_by_id(self, user_id: str) -> User | None:
row = await self._db.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
return User(**row) if row else None
# Service: Business logic
class UserService:
def __init__(self, repo: UserRepository) -> None:
self._repo = repo
async def get_user(self, user_id: str) -> User:
user = await self._repo.get_by_id(user_id)
if user is None:
raise UserNotFoundError(user_id)
return user
# Handler: HTTP concerns
@app.get("/users/{user_id}")
async def get_user(user_id: str) -> UserResponse:
user = await user_service.get_user(user_id)
return UserResponse.from_user(user)
```
### Pattern 4: Composition Over Inheritance
Build behavior by combining objects rather than inheriting.
```python
# Inheritance: Rigid and hard to test
class EmailNotificationService(NotificationService):
def __init__(self):
super().__init__()
self._smtp = SmtpClient() # Hard to mock
def notify(self, user: User, message: str) -> None:
self._smtp.send(user.email, message)
# Composition: Flexible and testable
class NotificationService:
"""Send notifications via multiple channels."""
def __init__(
self,
email_sender: EmailSender,
sms_sender: SmsSender | None = None,
push_sender: PushSender | None = None,
) -> None:
self._email = email_sender
self._sms = sms_sender
self._push = push_sender
async def notify(
self,
user: User,
message: str,
channels: set[str] | None = None,
) -> None:
channels = channels or {"email"}
if "email" in channels:
await self._email.send(user.email, message)
if "sms" in channels and self._sms and user.phone:
await self._sms.send(user.phone, message)
if "push" in channels and self._push and user.device_token:
await self._push.send(user.device_token, message)
# Easy to test with fakes
service = NotificationService(
email_sender=FakeEmailSender(),
sms_sender=FakeSmsSender(),
)
```
## Advanced Patterns
### Pattern 5: Rule of Three
Wait until you have three instances before abstracting.
```python
# Two similar functions? Don't abstract yet
def process_orders(orders: list[Order]) -> list[Result]:
results = []
for order in orders:
validated = validate_order(order)
result = process_validated_order(validated)
results.append(result)
return results
def process_returns(returns: list[Return]) -> list[Result]:
results = []
for ret in returns:
validated = validate_return(ret)
result = process_validated_return(validated)
results.append(result)
return results
# These look similar, but wait! Are they actually the same?
# Different validation, different processing, different errors...
# Duplication is often better than the wrong abstraction
# Only after a third case, consider if there's a real pattern
# But even then, sometimes explicit is better than abstract
```
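If a third case did arrive and all three genuinely shared the same shape, the extraction might look like this (a sketch; `process_all` is hypothetical, and the callables stand in for the per-type functions above):

```python
from collections.abc import Callable, Iterable
from typing import TypeVar

TIn = TypeVar("TIn")
TOut = TypeVar("TOut")

def process_all(
    items: Iterable[TIn],
    validate: Callable[[TIn], TIn],
    handle: Callable[[TIn], TOut],
) -> list[TOut]:
    """Shared pipeline, extracted only after three identical call sites."""
    return [handle(validate(item)) for item in items]

# process_orders would become:
# process_all(orders, validate_order, process_validated_order)
```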
### Pattern 6: Function Size Guidelines
Keep functions focused. Extract when a function:
- Exceeds 20-50 lines (varies by complexity)
- Serves multiple distinct purposes
- Has deeply nested logic (3+ levels)
```python
# Too long, multiple concerns mixed
def process_order(order: Order) -> Result:
# 50 lines of validation...
# 30 lines of inventory check...
# 40 lines of payment processing...
# 20 lines of notification...
pass
# Better: Composed from focused functions
def process_order(order: Order) -> Result:
"""Process a customer order through the complete workflow."""
validate_order(order)
reserve_inventory(order)
payment_result = charge_payment(order)
send_confirmation(order, payment_result)
return Result(success=True, order_id=order.id)
```
### Pattern 7: Dependency Injection
Pass dependencies through constructors for testability.
```python
from typing import Protocol
class Logger(Protocol):
    def info(self, msg: str, **kwargs: object) -> None: ...
    def error(self, msg: str, **kwargs: object) -> None: ...
class Cache(Protocol):
async def get(self, key: str) -> str | None: ...
async def set(self, key: str, value: str, ttl: int) -> None: ...
class UserService:
"""Service with injected dependencies."""
def __init__(
self,
repository: UserRepository,
cache: Cache,
logger: Logger,
) -> None:
self._repo = repository
self._cache = cache
self._logger = logger
async def get_user(self, user_id: str) -> User:
# Check cache first
cached = await self._cache.get(f"user:{user_id}")
if cached:
self._logger.info("Cache hit", user_id=user_id)
return User.from_json(cached)
# Fetch from database
user = await self._repo.get_by_id(user_id)
if user:
await self._cache.set(f"user:{user_id}", user.to_json(), ttl=300)
return user
# Production
service = UserService(
repository=PostgresUserRepository(db),
cache=RedisCache(redis),
logger=StructlogLogger(),
)
# Testing
service = UserService(
repository=InMemoryUserRepository(),
cache=FakeCache(),
logger=NullLogger(),
)
```
### Pattern 8: Avoiding Common Anti-Patterns
**Don't expose internal types:**
```python
# BAD: Leaking ORM model to API
@app.get("/users/{id}")
def get_user(id: str) -> UserModel: # SQLAlchemy model
return db.query(UserModel).get(id)
# GOOD: Use response schemas
@app.get("/users/{id}")
def get_user(id: str) -> UserResponse:
user = db.query(UserModel).get(id)
return UserResponse.from_orm(user)
```
**Don't mix I/O with business logic:**
```python
# BAD: SQL embedded in business logic
def calculate_discount(user_id: str) -> float:
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
orders = db.query("SELECT * FROM orders WHERE user_id = ?", user_id)
# Business logic mixed with data access
# GOOD: Repository pattern
def calculate_discount(user: User, order_history: list[Order]) -> float:
# Pure business logic, easily testable
if len(order_history) > 10:
return 0.15
return 0.0
```
## Best Practices Summary
1. **Keep it simple** - Choose the simplest solution that works
2. **Single responsibility** - Each unit has one reason to change
3. **Separate concerns** - Distinct layers with clear purposes
4. **Compose, don't inherit** - Combine objects for flexibility
5. **Rule of three** - Wait before abstracting
6. **Keep functions small** - 20-50 lines (varies by complexity), one purpose
7. **Inject dependencies** - Constructor injection for testability
8. **Delete before abstracting** - Remove dead code, then consider patterns
9. **Test each layer** - Isolated tests for each concern
10. **Explicit over clever** - Readable code beats elegant code


@@ -0,0 +1,359 @@
---
name: python-error-handling
description: Python error handling patterns including input validation, exception hierarchies, and partial failure handling. Use when implementing validation logic, designing exception strategies, handling batch processing failures, or building robust APIs.
---
# Python Error Handling
Build robust Python applications with proper input validation, meaningful exceptions, and graceful failure handling. Good error handling makes debugging easier and systems more reliable.
## When to Use This Skill
- Validating user input and API parameters
- Designing exception hierarchies for applications
- Handling partial failures in batch operations
- Converting external data to domain types
- Building user-friendly error messages
- Implementing fail-fast validation patterns
## Core Concepts
### 1. Fail Fast
Validate inputs early, before expensive operations. Report all validation errors at once when possible.
### 2. Meaningful Exceptions
Use appropriate exception types with context. Messages should explain what failed, why, and how to fix it.
### 3. Partial Failures
In batch operations, don't let one failure abort everything. Track successes and failures separately.
### 4. Preserve Context
Chain exceptions to maintain the full error trail for debugging.
## Quick Start
```python
def fetch_page(url: str, page_size: int) -> Page:
if not url:
raise ValueError("'url' is required")
if not 1 <= page_size <= 100:
raise ValueError(f"'page_size' must be 1-100, got {page_size}")
# Now safe to proceed...
```
## Fundamental Patterns
### Pattern 1: Early Input Validation
Validate all inputs at API boundaries before any processing begins.
```python
def process_order(
order_id: str,
quantity: int,
discount_percent: float,
) -> OrderResult:
"""Process an order with validation."""
# Validate required fields
if not order_id:
raise ValueError("'order_id' is required")
# Validate ranges
if quantity <= 0:
raise ValueError(f"'quantity' must be positive, got {quantity}")
if not 0 <= discount_percent <= 100:
raise ValueError(
f"'discount_percent' must be 0-100, got {discount_percent}"
)
# Validation passed, proceed with processing
return _process_validated_order(order_id, quantity, discount_percent)
```
### Pattern 2: Convert to Domain Types Early
Parse strings and external data into typed domain objects at system boundaries.
```python
from enum import Enum
class OutputFormat(Enum):
JSON = "json"
CSV = "csv"
PARQUET = "parquet"
def parse_output_format(value: str) -> OutputFormat:
"""Parse string to OutputFormat enum.
Args:
value: Format string from user input.
Returns:
Validated OutputFormat enum member.
Raises:
ValueError: If format is not recognized.
"""
try:
return OutputFormat(value.lower())
except ValueError:
valid_formats = [f.value for f in OutputFormat]
raise ValueError(
f"Invalid format '{value}'. "
f"Valid options: {', '.join(valid_formats)}"
)
# Usage at API boundary
def export_data(data: list[dict], format_str: str) -> bytes:
output_format = parse_output_format(format_str) # Fail fast
# Rest of function uses typed OutputFormat
...
```
### Pattern 3: Pydantic for Complex Validation
Use Pydantic models for structured input validation with automatic error messages.
```python
from pydantic import BaseModel, Field, field_validator
class CreateUserInput(BaseModel):
"""Input model for user creation."""
email: str = Field(..., min_length=5, max_length=255)
name: str = Field(..., min_length=1, max_length=100)
age: int = Field(ge=0, le=150)
@field_validator("email")
@classmethod
def validate_email_format(cls, v: str) -> str:
if "@" not in v or "." not in v.split("@")[-1]:
raise ValueError("Invalid email format")
return v.lower()
@field_validator("name")
@classmethod
def normalize_name(cls, v: str) -> str:
return v.strip().title()
# Usage
try:
user_input = CreateUserInput(
email="user@example.com",
name="john doe",
age=25,
)
except ValidationError as e:
# Pydantic provides detailed error information
print(e.errors())
```
### Pattern 4: Map Errors to Standard Exceptions
Use Python's built-in exception types appropriately, adding context as needed.
| Failure Type | Exception | Example |
|--------------|-----------|---------|
| Invalid input | `ValueError` | Bad parameter values |
| Wrong type | `TypeError` | Expected string, got int |
| Missing item | `KeyError` | Dict key not found |
| Operational failure | `RuntimeError` | Service unavailable |
| Timeout | `TimeoutError` | Operation took too long |
| File not found | `FileNotFoundError` | Path doesn't exist |
| Permission denied | `PermissionError` | Access forbidden |
```python
# Good: Specific exception with context
raise ValueError(f"'page_size' must be 1-100, got {page_size}")
# Avoid: Generic exception, no context
raise Exception("Invalid parameter")
```
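When re-raising a standard exception type with better context, keep the type and improve the message. A small sketch (`get_required` is a hypothetical helper):

```python
def get_required(config: dict[str, str], key: str) -> str:
    """Fetch a required key, failing with an actionable message."""
    try:
        return config[key]
    except KeyError:
        # Same exception type, but now the message says what to fix
        raise KeyError(
            f"Missing required config key '{key}'; "
            f"available keys: {sorted(config)}"
        ) from None
```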
## Advanced Patterns
### Pattern 5: Custom Exceptions with Context
Create domain-specific exceptions that carry structured information.
```python
class ApiError(Exception):
"""Base exception for API errors."""
def __init__(
self,
message: str,
status_code: int,
response_body: str | None = None,
) -> None:
self.status_code = status_code
self.response_body = response_body
super().__init__(message)
class RateLimitError(ApiError):
"""Raised when rate limit is exceeded."""
def __init__(self, retry_after: int) -> None:
self.retry_after = retry_after
super().__init__(
f"Rate limit exceeded. Retry after {retry_after}s",
status_code=429,
)
# Usage
def handle_response(response: Response) -> dict:
match response.status_code:
case 200:
return response.json()
case 401:
raise ApiError("Invalid credentials", 401)
case 404:
raise ApiError(f"Resource not found: {response.url}", 404)
case 429:
retry_after = int(response.headers.get("Retry-After", 60))
raise RateLimitError(retry_after)
case code if 400 <= code < 500:
raise ApiError(f"Client error: {response.text}", code)
case code if code >= 500:
raise ApiError(f"Server error: {response.text}", code)
```
### Pattern 6: Exception Chaining
Preserve the original exception when re-raising to maintain the debug trail.
```python
import httpx
class ServiceError(Exception):
"""High-level service operation failed."""
pass
def upload_file(path: str) -> str:
"""Upload file and return URL."""
try:
with open(path, "rb") as f:
response = httpx.post("https://upload.example.com", files={"file": f})
response.raise_for_status()
return response.json()["url"]
except FileNotFoundError as e:
raise ServiceError(f"Upload failed: file not found at '{path}'") from e
except httpx.HTTPStatusError as e:
raise ServiceError(
f"Upload failed: server returned {e.response.status_code}"
) from e
except httpx.RequestError as e:
        raise ServiceError("Upload failed: network error") from e
```
### Pattern 7: Batch Processing with Partial Failures
Never let one bad item abort an entire batch. Track results per item.
```python
from dataclasses import dataclass
@dataclass
class BatchResult[T]:  # PEP 695 type parameter syntax, requires Python 3.12+
"""Results from batch processing."""
succeeded: dict[int, T] # index -> result
failed: dict[int, Exception] # index -> error
@property
def success_count(self) -> int:
return len(self.succeeded)
@property
def failure_count(self) -> int:
return len(self.failed)
@property
def all_succeeded(self) -> bool:
return len(self.failed) == 0
def process_batch(items: list[Item]) -> BatchResult[ProcessedItem]:
"""Process items, capturing individual failures.
Args:
items: Items to process.
Returns:
BatchResult with succeeded and failed items by index.
"""
succeeded: dict[int, ProcessedItem] = {}
failed: dict[int, Exception] = {}
for idx, item in enumerate(items):
try:
result = process_single_item(item)
succeeded[idx] = result
except Exception as e:
failed[idx] = e
return BatchResult(succeeded=succeeded, failed=failed)
# Caller handles partial results
result = process_batch(items)
if not result.all_succeeded:
logger.warning(
f"Batch completed with {result.failure_count} failures",
failed_indices=list(result.failed.keys()),
)
```
### Pattern 8: Progress Reporting for Long Operations
Provide visibility into batch progress without coupling business logic to UI.
```python
from collections.abc import Callable
ProgressCallback = Callable[[int, int, str], None] # current, total, status
def process_large_batch(
items: list[Item],
on_progress: ProgressCallback | None = None,
) -> BatchResult:
"""Process batch with optional progress reporting.
Args:
items: Items to process.
on_progress: Optional callback receiving (current, total, status).
"""
total = len(items)
succeeded = {}
failed = {}
for idx, item in enumerate(items):
if on_progress:
on_progress(idx, total, f"Processing {item.id}")
try:
succeeded[idx] = process_single_item(item)
except Exception as e:
failed[idx] = e
if on_progress:
on_progress(total, total, "Complete")
return BatchResult(succeeded=succeeded, failed=failed)
```
## Best Practices Summary
1. **Validate early** - Check inputs before expensive operations
2. **Use specific exceptions** - `ValueError`, `TypeError`, not generic `Exception`
3. **Include context** - Messages should explain what, why, and how to fix
4. **Convert types at boundaries** - Parse strings to enums/domain types early
5. **Chain exceptions** - Use `raise ... from e` to preserve debug info
6. **Handle partial failures** - Don't abort batches on single item errors
7. **Use Pydantic** - For complex input validation with structured errors
8. **Document failure modes** - Docstrings should list possible exceptions
9. **Log with context** - Include IDs, counts, and other debugging info
10. **Test error paths** - Verify exceptions are raised correctly
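Point 10 in practice: pytest's `raises` verifies both the exception type and the message (this assumes pytest is installed; `parse_page_size` is a hypothetical example function):

```python
import pytest

def parse_page_size(value: str) -> int:
    """Parse and range-check a page size string."""
    size = int(value)
    if not 1 <= size <= 100:
        raise ValueError(f"'page_size' must be 1-100, got {size}")
    return size

def test_rejects_out_of_range() -> None:
    with pytest.raises(ValueError, match="must be 1-100"):
        parse_page_size("500")

def test_accepts_valid_value() -> None:
    assert parse_page_size("25") == 25
```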


@@ -0,0 +1,400 @@
---
name: python-observability
description: Python observability patterns including structured logging, metrics, and distributed tracing. Use when adding logging, implementing metrics collection, setting up tracing, or debugging production systems.
---
# Python Observability
Instrument Python applications with structured logs, metrics, and traces. When something breaks in production, you need to answer "what, where, and why" without deploying new code.
## When to Use This Skill
- Adding structured logging to applications
- Implementing metrics collection with Prometheus
- Setting up distributed tracing across services
- Propagating correlation IDs through request chains
- Debugging production issues
- Building observability dashboards
## Core Concepts
### 1. Structured Logging
Emit logs as JSON with consistent fields for production environments. Machine-readable logs enable powerful queries and alerts. For local development, consider human-readable formats.
### 2. The Four Golden Signals
Track latency, traffic, errors, and saturation for every service boundary.
### 3. Correlation IDs
Thread a unique ID through all logs and spans for a single request, enabling end-to-end tracing.
### 4. Bounded Cardinality
Keep metric label values bounded. Unbounded labels (like user IDs) explode storage costs.
## Quick Start
```python
import structlog
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
)
logger = structlog.get_logger()
logger.info("Request processed", user_id="123", duration_ms=45)
```
## Fundamental Patterns
### Pattern 1: Structured Logging with Structlog
Configure structlog for JSON output with consistent fields.
```python
import logging
import structlog
def configure_logging(log_level: str = "INFO") -> None:
"""Configure structured logging for the application."""
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, log_level.upper())
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
# Initialize at application startup
configure_logging("INFO")
logger = structlog.get_logger()
```
### Pattern 2: Consistent Log Fields
Every log entry should include standard fields for filtering and correlation.
```python
import time
import structlog
from contextvars import ContextVar
# Store correlation ID in context
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
logger = structlog.get_logger()
def process_request(request: Request) -> Response:
    """Process request with structured logging (Request/Response are your framework's types)."""
    start = time.perf_counter()
    logger.info(
        "Request received",
        correlation_id=correlation_id.get(),
        method=request.method,
        path=request.path,
        user_id=request.user_id,
    )
    try:
        result = handle_request(request)
        logger.info(
            "Request completed",
            correlation_id=correlation_id.get(),
            status_code=200,
            duration_ms=round((time.perf_counter() - start) * 1000, 2),
)
return result
except Exception as e:
logger.error(
"Request failed",
correlation_id=correlation_id.get(),
error_type=type(e).__name__,
error_message=str(e),
)
raise
```
### Pattern 3: Semantic Log Levels
Use log levels consistently across the application.
| Level | Purpose | Examples |
|-------|---------|----------|
| `DEBUG` | Development diagnostics | Variable values, internal state |
| `INFO` | Request lifecycle, operations | Request start/end, job completion |
| `WARNING` | Recoverable anomalies | Retry attempts, fallback used |
| `ERROR` | Failures needing attention | Exceptions, service unavailable |
```python
# DEBUG: Detailed internal information
logger.debug("Cache lookup", key=cache_key, hit=cache_hit)
# INFO: Normal operational events
logger.info("Order created", order_id=order.id, total=order.total)
# WARNING: Abnormal but handled situations
logger.warning(
"Rate limit approaching",
current_rate=950,
limit=1000,
reset_seconds=30,
)
# ERROR: Failures requiring investigation
logger.error(
"Payment processing failed",
order_id=order.id,
error=str(e),
payment_provider="stripe",
)
```
Never log expected behavior at `ERROR`. A user entering a wrong password is `INFO`, not `ERROR`.
### Pattern 4: Correlation ID Propagation
Generate a unique ID at ingress and thread it through all operations.
```python
from contextvars import ContextVar
import uuid
import structlog
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
def set_correlation_id(cid: str | None = None) -> str:
"""Set correlation ID for current context."""
cid = cid or str(uuid.uuid4())
correlation_id.set(cid)
structlog.contextvars.bind_contextvars(correlation_id=cid)
return cid
# FastAPI middleware example
from fastapi import Request
async def correlation_middleware(request: Request, call_next):
"""Middleware to set and propagate correlation ID."""
# Use incoming header or generate new
cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
set_correlation_id(cid)
response = await call_next(request)
response.headers["X-Correlation-ID"] = cid
return response
```
Propagate to outbound requests:
```python
import httpx
async def call_downstream_service(endpoint: str, data: dict) -> dict:
"""Call downstream service with correlation ID."""
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json=data,
headers={"X-Correlation-ID": correlation_id.get()},
)
return response.json()
```
## Advanced Patterns
### Pattern 5: The Four Golden Signals with Prometheus
Track these metrics for every service boundary:
```python
from prometheus_client import Counter, Histogram, Gauge
# Latency: How long requests take
REQUEST_LATENCY = Histogram(
"http_request_duration_seconds",
"Request latency in seconds",
["method", "endpoint", "status"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)
# Traffic: Request rate
REQUEST_COUNT = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status"],
)
# Errors: Error rate
ERROR_COUNT = Counter(
"http_errors_total",
"Total HTTP errors",
["method", "endpoint", "error_type"],
)
# Saturation: Resource utilization
DB_POOL_USAGE = Gauge(
"db_connection_pool_used",
"Number of database connections in use",
)
```
Instrument your endpoints:
```python
import time
from functools import wraps
def track_request(func):
"""Decorator to track request metrics."""
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
method = request.method
endpoint = request.url.path
start = time.perf_counter()
try:
response = await func(request, *args, **kwargs)
status = str(response.status_code)
return response
except Exception as e:
status = "500"
ERROR_COUNT.labels(
method=method,
endpoint=endpoint,
error_type=type(e).__name__,
).inc()
raise
finally:
duration = time.perf_counter() - start
REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)
return wrapper
```
### Pattern 6: Bounded Cardinality
Avoid labels with unbounded values to prevent metric explosion.
```python
# BAD: User ID has potentially millions of values
REQUEST_COUNT.labels(method="GET", user_id=user.id) # Don't do this!
# GOOD: Bounded values only
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")
# If you need per-user metrics, use a different approach:
# - Log the user_id and query logs
# - Use a separate analytics system
# - Bucket users by type/tier, using a counter declared with that label
TIERED_REQUEST_COUNT = Counter(
    "http_tiered_requests_total",
    "Total HTTP requests by user tier",
    ["method", "endpoint", "user_tier"],
)
TIERED_REQUEST_COUNT.labels(
    method="GET",
    endpoint="/users",
    user_tier="premium",  # Bounded set of values
).inc()
```
### Pattern 7: Timed Operations with Context Manager
Create a reusable timing context manager for operations.
```python
from contextlib import contextmanager
import time
import structlog
logger = structlog.get_logger()
@contextmanager
def timed_operation(name: str, **extra_fields):
"""Context manager for timing and logging operations."""
start = time.perf_counter()
logger.debug("Operation started", operation=name, **extra_fields)
try:
yield
except Exception as e:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.error(
"Operation failed",
operation=name,
duration_ms=round(elapsed_ms, 2),
error=str(e),
**extra_fields,
)
raise
else:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.info(
"Operation completed",
operation=name,
duration_ms=round(elapsed_ms, 2),
**extra_fields,
)
# Usage
with timed_operation("fetch_user_orders", user_id=user.id):
orders = await order_repository.get_by_user(user.id)
```
### Pattern 8: OpenTelemetry Tracing
Set up distributed tracing with OpenTelemetry.
**Note:** OpenTelemetry is actively evolving. Check the [official Python documentation](https://opentelemetry.io/docs/languages/python/) for the latest API patterns and best practices.
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
"""Configure OpenTelemetry tracing."""
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
async def process_order(order_id: str) -> Order:
    """Process order with tracing (validate_order etc. are app-specific helpers)."""
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        with tracer.start_as_current_span("validate_order"):
            order = validate_order(order_id)
        with tracer.start_as_current_span("charge_payment"):
            charge_payment(order_id)
        with tracer.start_as_current_span("send_confirmation"):
            send_confirmation(order_id)
        return order
```
## Best Practices Summary
1. **Use structured logging** - JSON logs with consistent fields
2. **Propagate correlation IDs** - Thread through all requests and logs
3. **Track the four golden signals** - Latency, traffic, errors, saturation
4. **Bound label cardinality** - Never use unbounded values as metric labels
5. **Log at appropriate levels** - Don't cry wolf with ERROR
6. **Include context** - User ID, request ID, operation name in logs
7. **Use context managers** - Consistent timing and error handling
8. **Separate concerns** - Observability code shouldn't pollute business logic
9. **Test your observability** - Verify logs and metrics in integration tests
10. **Set up alerts** - Metrics are useless without alerting
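Best practice 9 deserves a concrete shape; a minimal sketch using only the standard library (the `CapturingHandler` and `create_order` names are illustrative — structlog users can do the same with `structlog.testing.capture_logs`):
```python
import logging

class CapturingHandler(logging.Handler):
    """Collect log records in memory so tests can assert on them."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)

def create_order(logger: logging.Logger, order_id: str) -> None:
    """Business code under test: must emit the order event with its fields."""
    logger.info("Order created", extra={"order_id": order_id})
```
A test attaches the handler, runs the code, and asserts on the captured event name and fields rather than on formatted strings.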


@@ -0,0 +1,252 @@
---
name: python-project-structure
description: Python project organization, module architecture, and public API design. Use when setting up new projects, organizing modules, defining public interfaces with __all__, or planning directory layouts.
---
# Python Project Structure & Module Architecture
Design well-organized Python projects with clear module boundaries, explicit public interfaces, and maintainable directory structures. Good organization makes code discoverable and changes predictable.
## When to Use This Skill
- Starting a new Python project from scratch
- Reorganizing an existing codebase for clarity
- Defining module public APIs with `__all__`
- Deciding between flat and nested directory structures
- Determining test file placement strategies
- Creating reusable library packages
## Core Concepts
### 1. Module Cohesion
Group related code that changes together. A module should have a single, clear purpose.
### 2. Explicit Interfaces
Define what's public with `__all__`. Everything not listed is an internal implementation detail.
### 3. Flat Hierarchies
Prefer shallow directory structures. Add depth only for genuine sub-domains.
### 4. Consistent Conventions
Apply naming and organization patterns uniformly across the project.
## Quick Start
```
myproject/
├── src/
│ └── myproject/
│ ├── __init__.py
│ ├── services/
│ ├── models/
│ └── api/
├── tests/
├── pyproject.toml
└── README.md
```
## Fundamental Patterns
### Pattern 1: One Concept Per File
Each file should focus on a single concept or closely related set of functions. Consider splitting when a file:
- Handles multiple unrelated responsibilities
- Grows beyond 300-500 lines (varies by complexity)
- Contains classes that change for different reasons
```python
# Good: Focused files
# user_service.py - User business logic
# user_repository.py - User data access
# user_models.py - User data structures
# Avoid: Kitchen sink files
# user.py - Contains service, repository, models, utilities...
```
### Pattern 2: Explicit Public APIs with `__all__`
Define the public interface for every module. Unlisted members are internal implementation details.
```python
# mypackage/services/__init__.py
from .user_service import UserService
from .order_service import OrderService
from .exceptions import ServiceError, ValidationError
__all__ = [
"UserService",
"OrderService",
"ServiceError",
"ValidationError",
]
# Internal helpers remain private by omission
# from .internal_helpers import _validate_input # Not exported
```
### Pattern 3: Flat Directory Structure
Prefer minimal nesting. Deep hierarchies make imports verbose and navigation difficult.
```
# Preferred: Flat structure
project/
├── api/
│ ├── routes.py
│ └── middleware.py
├── services/
│ ├── user_service.py
│ └── order_service.py
├── models/
│ ├── user.py
│ └── order.py
└── utils/
└── validation.py
# Avoid: Deep nesting
project/core/internal/services/impl/user/
```
Add sub-packages only when there's a genuine sub-domain requiring isolation.
### Pattern 4: Test File Organization
Choose one approach and apply it consistently throughout the project.
**Option A: Colocated Tests**
```
src/
├── user_service.py
├── test_user_service.py
├── order_service.py
└── test_order_service.py
```
Benefits: Tests live next to the code they verify. Easy to see coverage gaps.
**Option B: Parallel Test Directory**
```
src/
├── services/
│ ├── user_service.py
│ └── order_service.py
tests/
├── services/
│ ├── test_user_service.py
│ └── test_order_service.py
```
Benefits: Clean separation between production and test code. Standard for larger projects.
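With Option B and a `src/` layout, pytest can be pointed at the parallel tree from `pyproject.toml` (a sketch; the `pythonpath` option requires pytest 7+, and the paths assume the layout above):
```toml
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["src"]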
## Advanced Patterns
### Pattern 5: Package Initialization
Use `__init__.py` to provide a clean public interface for package consumers.
```python
# mypackage/__init__.py
"""MyPackage - A library for doing useful things."""
from .core import MainClass, HelperClass
from .exceptions import PackageError, ConfigError
from .config import Settings
__all__ = [
"MainClass",
"HelperClass",
"PackageError",
"ConfigError",
"Settings",
]
__version__ = "1.0.0"
```
Consumers can then import directly from the package:
```python
from mypackage import MainClass, Settings
```
### Pattern 6: Layered Architecture
Organize code by architectural layer for clear separation of concerns.
```
myapp/
├── api/ # HTTP handlers, request/response
│ ├── routes/
│ └── middleware/
├── services/ # Business logic
├── repositories/ # Data access
├── models/ # Domain entities
├── schemas/ # API schemas (Pydantic)
└── config/ # Configuration
```
Each layer should only depend on layers below it, never above.
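The "depend only on layers below" rule can be enforced mechanically; a sketch using the import-linter tool (package and layer names are illustrative):
```ini
# .importlinter
[importlinter]
root_package = myapp

[importlinter:contract:layers]
name = Layered architecture
type = layers
layers =
    api
    services
    repositories
    models
```
Running `lint-imports` in CI then fails the build if, say, a repository imports from `services`.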
### Pattern 7: Domain-Driven Structure
For complex applications, organize by business domain rather than technical layer.
```
ecommerce/
├── users/
│ ├── models.py
│ ├── services.py
│ ├── repository.py
│ └── api.py
├── orders/
│ ├── models.py
│ ├── services.py
│ ├── repository.py
│ └── api.py
└── shared/
├── database.py
└── exceptions.py
```
## File and Module Naming
### Conventions
- Use `snake_case` for all file and module names: `user_repository.py`
- Avoid abbreviations that obscure meaning: `user_repository.py` not `usr_repo.py`
- Match class names to file names: `UserService` in `user_service.py`
### Import Style
Use absolute imports for clarity and reliability:
```python
# Preferred: Absolute imports
from myproject.services import UserService
from myproject.models import User
# Avoid: Relative imports
from ..services import UserService
from . import models
```
Relative imports can break when modules are moved or reorganized.
## Best Practices Summary
1. **Keep files focused** - One concept per file, consider splitting at 300-500 lines (varies by complexity)
2. **Define `__all__` explicitly** - Make public interfaces clear
3. **Prefer flat structures** - Add depth only for genuine sub-domains
4. **Use absolute imports** - More reliable and clearer
5. **Be consistent** - Apply patterns uniformly across the project
6. **Match names to content** - File names should describe their purpose
7. **Separate concerns** - Keep layers distinct and dependencies flowing one direction
8. **Document your structure** - Include a README explaining the organization


@@ -0,0 +1,376 @@
---
name: python-resilience
description: Python resilience patterns including automatic retries, exponential backoff, timeouts, and fault-tolerant decorators. Use when adding retry logic, implementing timeouts, building fault-tolerant services, or handling transient failures.
---
# Python Resilience Patterns
Build fault-tolerant Python applications that gracefully handle transient failures, network issues, and service outages. Resilience patterns keep systems running when dependencies are unreliable.
## When to Use This Skill
- Adding retry logic to external service calls
- Implementing timeouts for network operations
- Building fault-tolerant microservices
- Handling rate limiting and backpressure
- Creating infrastructure decorators
- Designing circuit breakers
## Core Concepts
### 1. Transient vs Permanent Failures
Retry transient errors (network timeouts, temporary service issues). Don't retry permanent errors (invalid credentials, bad requests).
### 2. Exponential Backoff
Increase wait time between retries to avoid overwhelming recovering services.
### 3. Jitter
Add randomness to backoff to prevent thundering herd when many clients retry simultaneously.
### 4. Bounded Retries
Cap both attempt count and total duration to prevent infinite retry loops.
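Concepts 2 and 3 combine into a single delay formula; a minimal "full jitter" sketch (the function name and defaults are illustrative — tenacity's `wait_exponential_jitter` packages the same idea):
```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter.

    The ceiling grows as base * 2**attempt and is capped; the actual delay
    is drawn uniformly from [0, ceiling] so synchronized clients spread out.
    """
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0, ceiling)
```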
## Quick Start
```python
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
import httpx
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=10),
)
def call_external_service(request: dict) -> dict:
return httpx.post("https://api.example.com", json=request).json()
```
## Fundamental Patterns
### Pattern 1: Basic Retry with Tenacity
Use the `tenacity` library for production-grade retry logic. For simpler cases, a client's built-in retries (for example, `httpx.HTTPTransport(retries=...)` for connection failures) or a lightweight custom loop may be enough.
```python
from tenacity import (
retry,
stop_after_attempt,
stop_after_delay,
wait_exponential_jitter,
retry_if_exception_type,
)
import httpx
TRANSIENT_ERRORS = (ConnectionError, TimeoutError, OSError)
@retry(
retry=retry_if_exception_type(TRANSIENT_ERRORS),
stop=stop_after_attempt(5) | stop_after_delay(60),
wait=wait_exponential_jitter(initial=1, max=30),
)
def fetch_data(url: str) -> dict:
"""Fetch data with automatic retry on transient failures."""
response = httpx.get(url, timeout=30)
response.raise_for_status()
return response.json()
```
### Pattern 2: Retry Only Appropriate Errors
Whitelist specific transient exceptions. Never retry:
- `ValueError`, `TypeError` - These are bugs, not transient issues
- `AuthenticationError` - Invalid credentials won't become valid
- HTTP 4xx errors (except 429) - Client errors are permanent
```python
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential_jitter,
)
import httpx
# Define what's retryable
RETRYABLE_EXCEPTIONS = (
ConnectionError,
TimeoutError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
)
@retry(
retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=10),
)
def resilient_api_call(endpoint: str) -> dict:
"""Make API call with retry on network issues."""
return httpx.get(endpoint, timeout=10).json()
```
### Pattern 3: HTTP Status Code Retries
Retry specific HTTP status codes that indicate transient issues.
```python
from tenacity import retry, retry_if_result, stop_after_attempt, wait_exponential_jitter
import httpx
RETRY_STATUS_CODES = {429, 502, 503, 504}
def should_retry_response(response: httpx.Response) -> bool:
"""Check if response indicates a retryable error."""
return response.status_code in RETRY_STATUS_CODES
@retry(
retry=retry_if_result(should_retry_response),
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=10),
)
def http_request(method: str, url: str, **kwargs) -> httpx.Response:
"""Make HTTP request with retry on transient status codes."""
return httpx.request(method, url, timeout=30, **kwargs)
```
### Pattern 4: Combined Exception and Status Retry
Handle both network exceptions and HTTP status codes.
```python
from tenacity import (
retry,
retry_if_exception_type,
retry_if_result,
stop_after_attempt,
wait_exponential_jitter,
before_sleep_log,
)
import logging
import httpx
logger = logging.getLogger(__name__)
TRANSIENT_EXCEPTIONS = (
ConnectionError,
TimeoutError,
httpx.ConnectError,
httpx.ReadTimeout,
)
RETRY_STATUS_CODES = {429, 500, 502, 503, 504}
def is_retryable_response(response: httpx.Response) -> bool:
return response.status_code in RETRY_STATUS_CODES
@retry(
retry=(
retry_if_exception_type(TRANSIENT_EXCEPTIONS) |
retry_if_result(is_retryable_response)
),
stop=stop_after_attempt(5),
wait=wait_exponential_jitter(initial=1, max=30),
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def robust_http_call(
method: str,
url: str,
**kwargs,
) -> httpx.Response:
"""HTTP call with comprehensive retry handling."""
return httpx.request(method, url, timeout=30, **kwargs)
```
## Advanced Patterns
### Pattern 5: Logging Retry Attempts
Track retry behavior for debugging and alerting.
```python
from tenacity import retry, stop_after_attempt, wait_exponential
import structlog
logger = structlog.get_logger()
def log_retry_attempt(retry_state):
"""Log detailed retry information."""
exception = retry_state.outcome.exception()
logger.warning(
"Retrying operation",
attempt=retry_state.attempt_number,
exception_type=type(exception).__name__,
exception_message=str(exception),
next_wait_seconds=retry_state.next_action.sleep if retry_state.next_action else None,
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, max=10),
before_sleep=log_retry_attempt,
)
def call_with_logging(request: dict) -> dict:
"""External call with retry logging."""
...
```
### Pattern 6: Timeout Decorator
Create reusable timeout decorators for consistent timeout handling.
```python
import asyncio
from functools import wraps
from typing import TypeVar, Callable
import httpx
T = TypeVar("T")
def with_timeout(seconds: float):
"""Decorator to add timeout to async functions."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=seconds,
)
return wrapper
return decorator
@with_timeout(30)
async def fetch_with_timeout(url: str) -> dict:
"""Fetch URL with 30 second timeout."""
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
```
### Pattern 7: Cross-Cutting Concerns via Decorators
Stack decorators to separate infrastructure from business logic.
```python
from functools import wraps
from typing import TypeVar, Callable
import structlog
logger = structlog.get_logger()
T = TypeVar("T")
def traced(name: str | None = None):
"""Add tracing to function calls."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
span_name = name or func.__name__
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
logger.info("Operation started", operation=span_name)
try:
result = await func(*args, **kwargs)
logger.info("Operation completed", operation=span_name)
return result
except Exception as e:
logger.error("Operation failed", operation=span_name, error=str(e))
raise
return wrapper
return decorator
# Stack multiple concerns (with_timeout from Pattern 6; retry helpers imported from tenacity)
@traced("fetch_user_data")
@with_timeout(30)
@retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter())
async def fetch_user_data(user_id: str) -> dict:
"""Fetch user with tracing, timeout, and retry."""
...
```
### Pattern 8: Dependency Injection for Testability
Pass infrastructure components through constructors for easy testing.
```python
from dataclasses import dataclass
from typing import Protocol
import time
class Logger(Protocol):
def info(self, msg: str, **kwargs) -> None: ...
def error(self, msg: str, **kwargs) -> None: ...
class MetricsClient(Protocol):
def increment(self, metric: str, tags: dict | None = None) -> None: ...
def timing(self, metric: str, value: float) -> None: ...
@dataclass
class UserService:
"""Service with injected infrastructure."""
repository: UserRepository
logger: Logger
metrics: MetricsClient
async def get_user(self, user_id: str) -> User:
self.logger.info("Fetching user", user_id=user_id)
start = time.perf_counter()
try:
user = await self.repository.get(user_id)
self.metrics.increment("user.fetch.success")
return user
except Exception as e:
self.metrics.increment("user.fetch.error")
self.logger.error("Failed to fetch user", user_id=user_id, error=str(e))
raise
finally:
elapsed = time.perf_counter() - start
self.metrics.timing("user.fetch.duration", elapsed)
# Easy to test with fakes
service = UserService(
repository=FakeRepository(),
logger=FakeLogger(),
metrics=FakeMetrics(),
)
```
### Pattern 9: Fail-Safe Defaults
Degrade gracefully when non-critical operations fail.
```python
from collections.abc import Callable
from functools import wraps
from typing import TypeVar
import structlog
logger = structlog.get_logger()
T = TypeVar("T")
def fail_safe(default: T, log_failure: bool = True):
"""Return default value on failure instead of raising."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
try:
return await func(*args, **kwargs)
except Exception as e:
if log_failure:
logger.warning(
"Operation failed, using default",
function=func.__name__,
error=str(e),
)
return default
return wrapper
return decorator
@fail_safe(default=[])
async def get_recommendations(user_id: str) -> list[str]:
"""Get recommendations, return empty list on failure."""
...
```
## Best Practices Summary
1. **Retry only transient errors** - Don't retry bugs or authentication failures
2. **Use exponential backoff** - Give services time to recover
3. **Add jitter** - Prevent thundering herd from synchronized retries
4. **Cap total duration** - `stop_after_attempt(5) | stop_after_delay(60)`
5. **Log every retry** - Silent retries hide systemic problems
6. **Use decorators** - Keep retry logic separate from business logic
7. **Inject dependencies** - Make infrastructure testable
8. **Set timeouts everywhere** - Every network call needs a timeout
9. **Fail gracefully** - Return cached/default values for non-critical paths
10. **Monitor retry rates** - High retry rates indicate underlying issues
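The "Designing circuit breakers" use case above deserves a sketch of its own; a minimal synchronous breaker (class name, thresholds, and the half-open policy are illustrative — libraries such as pybreaker provide production implementations):
```python
import time

class CircuitOpenError(Exception):
    """Raised when the breaker is open and calls fail fast."""

class CircuitBreaker:
    """Open after N consecutive failures; allow one trial call after a cooldown."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0) -> None:
        self._failure_threshold = failure_threshold
        self._reset_timeout = reset_timeout
        self._failures = 0
        self._opened_at: float | None = None

    def call(self, func, *args, **kwargs):
        if self._opened_at is not None:
            if time.monotonic() - self._opened_at < self._reset_timeout:
                raise CircuitOpenError("circuit open: failing fast")
            self._opened_at = None  # half-open: let one trial call through
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self._failure_threshold:
                self._opened_at = time.monotonic()
            raise
        self._failures = 0
        return result
```
Failing fast while the breaker is open gives the downstream service time to recover instead of hammering it with retries.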


@@ -0,0 +1,421 @@
---
name: python-resource-management
description: Python resource management with context managers, cleanup patterns, and streaming. Use when managing connections, file handles, implementing cleanup logic, or building streaming responses with accumulated state.
---
# Python Resource Management
Manage resources deterministically using context managers. Resources like database connections, file handles, and network sockets should be released reliably, even when exceptions occur.
## When to Use This Skill
- Managing database connections and connection pools
- Working with file handles and I/O
- Implementing custom context managers
- Building streaming responses with state
- Handling nested resource cleanup
- Creating async context managers
## Core Concepts
### 1. Context Managers
The `with` statement ensures resources are released automatically, even on exceptions.
### 2. Protocol Methods
`__enter__`/`__exit__` for sync, `__aenter__`/`__aexit__` for async resource management.
### 3. Unconditional Cleanup
`__exit__` always runs, regardless of whether an exception occurred.
### 4. Exception Handling
Return `True` from `__exit__` to suppress exceptions, `False` to propagate them.
## Quick Start
```python
from contextlib import contextmanager
@contextmanager
def managed_resource():
resource = acquire_resource()
try:
yield resource
finally:
resource.cleanup()
with managed_resource() as r:
r.do_work()
```
## Fundamental Patterns
### Pattern 1: Class-Based Context Manager
Implement the context manager protocol for complex resources.
```python
from types import TracebackType
import psycopg
from psycopg import Connection
class DatabaseConnection:
"""Database connection with automatic cleanup."""
def __init__(self, dsn: str) -> None:
self._dsn = dsn
self._conn: Connection | None = None
def connect(self) -> None:
"""Establish database connection."""
self._conn = psycopg.connect(self._dsn)
def close(self) -> None:
"""Close connection if open."""
if self._conn is not None:
self._conn.close()
self._conn = None
def __enter__(self) -> "DatabaseConnection":
"""Enter context: connect and return self."""
self.connect()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Exit context: always close connection."""
self.close()
# Usage with context manager (preferred)
with DatabaseConnection(dsn) as db:
result = db.execute(query)
# Manual management when needed
db = DatabaseConnection(dsn)
db.connect()
try:
result = db.execute(query)
finally:
db.close()
```
### Pattern 2: Async Context Manager
For async resources, implement the async protocol.
```python
from types import TracebackType
import asyncpg
class AsyncDatabasePool:
"""Async database connection pool."""
def __init__(self, dsn: str, min_size: int = 1, max_size: int = 10) -> None:
self._dsn = dsn
self._min_size = min_size
self._max_size = max_size
self._pool: asyncpg.Pool | None = None
async def __aenter__(self) -> "AsyncDatabasePool":
"""Create connection pool."""
self._pool = await asyncpg.create_pool(
self._dsn,
min_size=self._min_size,
max_size=self._max_size,
)
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Close all connections in pool."""
if self._pool is not None:
await self._pool.close()
async def execute(self, query: str, *args) -> list[dict]:
"""Execute query using pooled connection."""
async with self._pool.acquire() as conn:
return await conn.fetch(query, *args)
# Usage
async with AsyncDatabasePool(dsn) as pool:
users = await pool.execute("SELECT * FROM users WHERE active = $1", True)
```
### Pattern 3: Using @contextmanager Decorator
Simplify context managers with the decorator for straightforward cases.
```python
from contextlib import contextmanager, asynccontextmanager
import time
import structlog
logger = structlog.get_logger()
@contextmanager
def timed_block(name: str):
"""Time a block of code."""
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
        logger.info("Timed block completed", block=name, duration_seconds=round(elapsed, 3))
# Usage
with timed_block("data_processing"):
process_large_dataset()
@asynccontextmanager
async def database_transaction(conn: AsyncConnection):
"""Manage database transaction."""
await conn.execute("BEGIN")
try:
yield conn
await conn.execute("COMMIT")
except Exception:
await conn.execute("ROLLBACK")
raise
# Usage
async with database_transaction(conn) as tx:
await tx.execute("INSERT INTO users ...")
await tx.execute("INSERT INTO audit_log ...")
```
### Pattern 4: Unconditional Resource Release
Always clean up resources in `__exit__`, regardless of exceptions.
```python
from pathlib import Path
from types import TracebackType
from typing import IO
class FileProcessor:
"""Process file with guaranteed cleanup."""
def __init__(self, path: str) -> None:
self._path = path
self._file: IO | None = None
self._temp_files: list[Path] = []
def __enter__(self) -> "FileProcessor":
self._file = open(self._path, "r")
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Clean up all resources unconditionally."""
# Close main file
if self._file is not None:
self._file.close()
# Clean up any temporary files
for temp_file in self._temp_files:
try:
temp_file.unlink()
except OSError:
pass # Best effort cleanup
# Return None/False to propagate any exception
```
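The best-effort loop in `__exit__` can also be written with `contextlib.suppress`, which states the intent directly (a sketch; `cleanup_temp_files` is an illustrative name):
```python
import contextlib
from pathlib import Path

def cleanup_temp_files(paths: list[Path]) -> None:
    """Delete files best-effort: missing or locked files are ignored."""
    for path in paths:
        with contextlib.suppress(OSError):
            path.unlink()
```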
## Advanced Patterns
### Pattern 5: Selective Exception Suppression
Only suppress specific, documented exceptions.
```python
from types import TracebackType
class StreamWriter:
"""Writer that handles broken pipe gracefully."""
def __init__(self, stream) -> None:
self._stream = stream
def __enter__(self) -> "StreamWriter":
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
"""Clean up, suppressing BrokenPipeError on shutdown."""
self._stream.close()
# Suppress BrokenPipeError (client disconnected)
# This is expected behavior, not an error
if exc_type is BrokenPipeError:
return True # Exception suppressed
return False # Propagate all other exceptions
```
### Pattern 6: Streaming with Accumulated State
Maintain both incremental chunks and accumulated state during streaming.
```python
from collections.abc import Generator
from dataclasses import dataclass, field
@dataclass
class StreamingResult:
"""Accumulated streaming result."""
chunks: list[str] = field(default_factory=list)
_finalized: bool = False
@property
def content(self) -> str:
"""Get accumulated content."""
return "".join(self.chunks)
def add_chunk(self, chunk: str) -> None:
"""Add chunk to accumulator."""
if self._finalized:
raise RuntimeError("Cannot add to finalized result")
self.chunks.append(chunk)
def finalize(self) -> str:
"""Mark stream complete and return content."""
self._finalized = True
return self.content
def stream_with_accumulation(
response: StreamingResponse,
) -> Generator[tuple[str, str], None, str]:
"""Stream response while accumulating content.
Yields:
Tuple of (accumulated_content, new_chunk) for each chunk.
Returns:
Final accumulated content.
"""
result = StreamingResult()
for chunk in response.iter_content():
result.add_chunk(chunk)
yield result.content, chunk
return result.finalize()
```
### Pattern 7: Efficient String Accumulation
Avoid O(n²) string concatenation when accumulating.
```python
def accumulate_stream(stream) -> str:
"""Efficiently accumulate stream content."""
# BAD: O(n²) due to string immutability
# content = ""
# for chunk in stream:
# content += chunk # Creates new string each time
# GOOD: O(n) with list and join
chunks: list[str] = []
for chunk in stream:
chunks.append(chunk)
return "".join(chunks) # Single allocation
```
### Pattern 8: Tracking Stream Metrics
Measure time-to-first-byte and total streaming time.
```python
import time
from collections.abc import Generator
def stream_with_metrics(
response: StreamingResponse,
) -> Generator[str, None, dict]:
"""Stream response while collecting metrics.
Yields:
Content chunks.
Returns:
Metrics dictionary.
"""
start = time.perf_counter()
first_chunk_time: float | None = None
chunk_count = 0
total_bytes = 0
for chunk in response.iter_content():
if first_chunk_time is None:
first_chunk_time = time.perf_counter() - start
chunk_count += 1
total_bytes += len(chunk.encode())
yield chunk
total_time = time.perf_counter() - start
return {
"time_to_first_byte_ms": round((first_chunk_time or 0) * 1000, 2),
"total_time_ms": round(total_time * 1000, 2),
"chunk_count": chunk_count,
"total_bytes": total_bytes,
}
```
### Pattern 9: Managing Multiple Resources with ExitStack
Handle a dynamic number of resources cleanly.
```python
from contextlib import ExitStack, AsyncExitStack
from pathlib import Path
def process_files(paths: list[Path]) -> list[str]:
"""Process multiple files with automatic cleanup."""
results = []
with ExitStack() as stack:
# Open all files - they'll all be closed when block exits
files = [stack.enter_context(open(p)) for p in paths]
for f in files:
results.append(f.read())
return results
async def process_connections(hosts: list[str]) -> list[dict]:
"""Process multiple async connections."""
results = []
async with AsyncExitStack() as stack:
connections = [
await stack.enter_async_context(connect_to_host(host))
for host in hosts
]
for conn in connections:
results.append(await conn.fetch_data())
return results
```
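Beyond context managers, `ExitStack` can register arbitrary cleanup callbacks via `stack.callback`; they run in LIFO order when the block exits:

```python
from contextlib import ExitStack

def run_with_cleanup() -> list[str]:
    """Register cleanup callbacks that fire in LIFO order on exit."""
    events: list[str] = []
    with ExitStack() as stack:
        stack.callback(events.append, "first-registered")
        stack.callback(events.append, "second-registered")
        events.append("work")
    return events
# Callbacks fire in reverse registration order after "work"
```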
## Best Practices Summary
1. **Always use context managers** - For any resource that needs cleanup
2. **Clean up unconditionally** - `__exit__` runs even on exception
3. **Don't suppress unexpectedly** - Return `False` unless suppression is intentional
4. **Use @contextmanager** - For simple resource patterns
5. **Implement both protocols** - Support `with` and manual management
6. **Use ExitStack** - For dynamic numbers of resources
7. **Accumulate efficiently** - List + join, not string concatenation
8. **Track metrics** - Time-to-first-byte matters for streaming
9. **Document behavior** - Especially exception suppression
10. **Test cleanup paths** - Verify resources are released on errors
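Point 10 can be verified directly: raise inside the `with` block and assert the resource was still released. The `FakeStream`/`Writer` pair below is a hypothetical stand-in for any managed resource:

```python
class FakeStream:
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

class Writer:
    """Minimal context manager that always closes its stream."""
    def __init__(self, stream: FakeStream) -> None:
        self._stream = stream

    def __enter__(self) -> "Writer":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self._stream.close()  # runs even when the body raised
        return False  # propagate the exception

def test_stream_closed_even_on_error() -> None:
    stream = FakeStream()
    try:
        with Writer(stream):
            raise RuntimeError("boom")
    except RuntimeError:
        pass
    assert stream.closed
```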


@@ -618,6 +618,52 @@ def test_sorted_list_properties(lst):
assert sorted_lst[i] <= sorted_lst[i + 1]
```
## Test Design Principles
### One Behavior Per Test
Each test should verify exactly one behavior. This makes failures easy to diagnose and tests easy to maintain.
```python
# BAD - testing multiple behaviors
def test_user_service():
user = service.create_user(data)
assert user.id is not None
assert user.email == data["email"]
updated = service.update_user(user.id, {"name": "New"})
assert updated.name == "New"
# GOOD - focused tests
def test_create_user_assigns_id():
user = service.create_user(data)
assert user.id is not None
def test_create_user_stores_email():
user = service.create_user(data)
assert user.email == data["email"]
def test_update_user_changes_name():
user = service.create_user(data)
updated = service.update_user(user.id, {"name": "New"})
assert updated.name == "New"
```
### Test Error Paths
Always test failure cases, not just happy paths.
```python
import pytest

def test_get_user_raises_not_found():
with pytest.raises(UserNotFoundError) as exc_info:
service.get_user("nonexistent-id")
assert "nonexistent-id" in str(exc_info.value)
def test_create_user_rejects_invalid_email():
with pytest.raises(ValueError, match="Invalid email format"):
service.create_user({"email": "not-an-email"})
```
## Testing Best Practices
### Test Organization
@@ -636,38 +682,131 @@ def test_sorted_list_properties(lst):
# test_workflows.py
```
### Test Naming Convention
A common pattern: `test_<unit>_<scenario>_<expected_outcome>`. Adapt to your team's preferences.
```python
# Good test names - clear and descriptive
def test_user_creation_with_valid_data():
"""Clear name describes what is being tested."""
pass
def test_login_fails_with_invalid_password():
"""Name describes expected behavior."""
pass
def test_api_returns_404_for_missing_resource():
"""Specific about inputs and expected outcomes."""
pass
# Bad test names - avoid these
def test_1(): # Not descriptive
pass
def test_user(): # Too vague
pass
def test_function(): # Doesn't explain what's tested
pass
```
### Testing Retry Behavior
Verify that retry logic works correctly using mock side effects.
```python
import pytest
from unittest.mock import Mock
def test_retries_on_transient_error():
"""Test that service retries on transient failures."""
client = Mock()
# Fail twice, then succeed
client.request.side_effect = [
ConnectionError("Failed"),
ConnectionError("Failed"),
{"status": "ok"},
]
service = ServiceWithRetry(client, max_retries=3)
result = service.fetch()
assert result == {"status": "ok"}
assert client.request.call_count == 3
def test_gives_up_after_max_retries():
"""Test that service stops retrying after max attempts."""
client = Mock()
client.request.side_effect = ConnectionError("Failed")
service = ServiceWithRetry(client, max_retries=3)
with pytest.raises(ConnectionError):
service.fetch()
assert client.request.call_count == 3
def test_does_not_retry_on_permanent_error():
"""Test that permanent errors are not retried."""
client = Mock()
client.request.side_effect = ValueError("Invalid input")
service = ServiceWithRetry(client, max_retries=3)
with pytest.raises(ValueError):
service.fetch()
# Only called once - no retry for ValueError
assert client.request.call_count == 1
```
### Mocking Time with Freezegun
Use freezegun to control time in tests for predictable time-dependent behavior.
```python
from freezegun import freeze_time
from datetime import datetime, timedelta
@freeze_time("2026-01-15 10:00:00")
def test_token_expiry():
"""Test token expires at correct time."""
token = create_token(expires_in_seconds=3600)
assert token.expires_at == datetime(2026, 1, 15, 11, 0, 0)
@freeze_time("2026-01-15 10:00:00")
def test_is_expired_returns_false_before_expiry():
"""Test token is not expired when within validity period."""
token = create_token(expires_in_seconds=3600)
assert not token.is_expired()
@freeze_time("2026-01-15 12:00:00")
def test_is_expired_returns_true_after_expiry():
"""Test token is expired after validity period."""
token = Token(expires_at=datetime(2026, 1, 15, 11, 30, 0))
assert token.is_expired()
def test_with_time_travel():
"""Test behavior across time using freeze_time context."""
with freeze_time("2026-01-01") as frozen_time:
item = create_item()
assert item.created_at == datetime(2026, 1, 1)
# Move forward in time
frozen_time.move_to("2026-01-15")
assert item.age_days == 14
```
### Test Markers
```python


@@ -0,0 +1,428 @@
---
name: python-type-safety
description: Python type safety with type hints, generics, protocols, and strict type checking. Use when adding type annotations, implementing generic classes, defining structural interfaces, or configuring mypy/pyright.
---
# Python Type Safety
Leverage Python's type system to catch errors at static analysis time. Type annotations serve as enforced documentation that tooling validates automatically.
## When to Use This Skill
- Adding type hints to existing code
- Creating generic, reusable classes
- Defining structural interfaces with protocols
- Configuring mypy or pyright for strict checking
- Understanding type narrowing and guards
- Building type-safe APIs and libraries
## Core Concepts
### 1. Type Annotations
Declare expected types for function parameters, return values, and variables.
### 2. Generics
Write reusable code that preserves type information across different types.
### 3. Protocols
Define structural interfaces without inheritance (duck typing with type safety).
### 4. Type Narrowing
Use guards and conditionals to narrow types within code blocks.
## Quick Start
```python
def get_user(user_id: str) -> User | None:
"""Return type makes 'might not exist' explicit."""
...
# Type checker enforces handling None case
user = get_user("123")
if user is None:
raise UserNotFoundError("123")
print(user.name) # Type checker knows user is User here
```
## Fundamental Patterns
### Pattern 1: Annotate All Public Signatures
Every public function, method, and class should have type annotations.
```python
def get_user(user_id: str) -> User:
"""Retrieve user by ID."""
...
def process_batch(
items: list[Item],
max_workers: int = 4,
) -> BatchResult[ProcessedItem]:
"""Process items concurrently."""
...
class UserRepository:
def __init__(self, db: Database) -> None:
self._db = db
async def find_by_id(self, user_id: str) -> User | None:
"""Return User if found, None otherwise."""
...
async def find_by_email(self, email: str) -> User | None:
...
async def save(self, user: User) -> User:
"""Save and return user with generated ID."""
...
```
Use `mypy --strict` or `pyright` in CI to catch type errors early. For existing projects, enable strict mode incrementally using per-module overrides.
### Pattern 2: Use Modern Union Syntax
Python 3.10+ provides cleaner union syntax.
```python
# Preferred (3.10+)
def find_user(user_id: str) -> User | None:
...
def parse_value(v: str) -> int | float | str:
...
# Older style (still valid, needed for 3.9)
from typing import Optional, Union
def find_user(user_id: str) -> Optional[User]:
...
```
### Pattern 3: Type Narrowing with Guards
Use conditionals to narrow types for the type checker.
```python
def process_user(user_id: str) -> UserData:
user = find_user(user_id)
if user is None:
raise UserNotFoundError(f"User {user_id} not found")
# Type checker knows user is User here, not User | None
return UserData(
name=user.name,
email=user.email,
)
def process_items(items: list[Item | None]) -> list[ProcessedItem]:
# Filter and narrow types
valid_items = [item for item in items if item is not None]
# valid_items is now list[Item]
return [process(item) for item in valid_items]
```
### Pattern 4: Generic Classes
Create type-safe reusable containers.
```python
from typing import TypeVar, Generic
T = TypeVar("T")
E = TypeVar("E", bound=Exception)
class Result(Generic[T, E]):
"""Represents either a success value or an error."""
def __init__(
self,
value: T | None = None,
error: E | None = None,
) -> None:
if (value is None) == (error is None):
raise ValueError("Exactly one of value or error must be set")
self._value = value
self._error = error
@property
def is_success(self) -> bool:
return self._error is None
@property
def is_failure(self) -> bool:
return self._error is not None
def unwrap(self) -> T:
"""Get value or raise the error."""
if self._error is not None:
raise self._error
return self._value # type: ignore[return-value]
def unwrap_or(self, default: T) -> T:
"""Get value or return default."""
if self._error is not None:
return default
return self._value # type: ignore[return-value]
# Usage preserves types
def parse_config(path: str) -> Result[Config, ConfigError]:
try:
return Result(value=Config.from_file(path))
except ConfigError as e:
return Result(error=e)
result = parse_config("config.yaml")
if result.is_success:
config = result.unwrap() # Type: Config
```
## Advanced Patterns
### Pattern 5: Generic Repository
Create type-safe data access patterns.
```python
from typing import TypeVar, Generic
from abc import ABC, abstractmethod
T = TypeVar("T")
ID = TypeVar("ID")
class Repository(ABC, Generic[T, ID]):
"""Generic repository interface."""
@abstractmethod
async def get(self, id: ID) -> T | None:
"""Get entity by ID."""
...
@abstractmethod
async def save(self, entity: T) -> T:
"""Save and return entity."""
...
@abstractmethod
async def delete(self, id: ID) -> bool:
"""Delete entity, return True if existed."""
...
class UserRepository(Repository[User, str]):
"""Concrete repository for Users with string IDs."""
async def get(self, id: str) -> User | None:
row = await self._db.fetchrow(
"SELECT * FROM users WHERE id = $1", id
)
return User(**row) if row else None
async def save(self, entity: User) -> User:
...
async def delete(self, id: str) -> bool:
...
```
### Pattern 6: TypeVar with Bounds
Restrict generic parameters to specific types.
```python
from typing import TypeVar
from pydantic import BaseModel
ModelT = TypeVar("ModelT", bound=BaseModel)
def validate_and_create(model_cls: type[ModelT], data: dict) -> ModelT:
"""Create a validated Pydantic model from dict."""
return model_cls.model_validate(data)
# Works with any BaseModel subclass
class User(BaseModel):
name: str
email: str
user = validate_and_create(User, {"name": "Alice", "email": "a@b.com"})
# user is typed as User
# Type error: str is not a BaseModel subclass
# validate_and_create(str, {"name": "Alice"})  # rejected by the type checker
```
### Pattern 7: Protocols for Structural Typing
Define interfaces without requiring inheritance.
```python
import json
from typing import Protocol, runtime_checkable
@runtime_checkable
class Serializable(Protocol):
"""Any class that can be serialized to/from dict."""
def to_dict(self) -> dict:
...
@classmethod
def from_dict(cls, data: dict) -> "Serializable":
...
# User satisfies Serializable without inheriting from it
class User:
def __init__(self, id: str, name: str) -> None:
self.id = id
self.name = name
def to_dict(self) -> dict:
return {"id": self.id, "name": self.name}
@classmethod
def from_dict(cls, data: dict) -> "User":
return cls(id=data["id"], name=data["name"])
def serialize(obj: Serializable) -> str:
"""Works with any Serializable object."""
return json.dumps(obj.to_dict())
# Works - User matches the protocol
serialize(User("1", "Alice"))
# Runtime checking with @runtime_checkable
isinstance(User("1", "Alice"), Serializable) # True
```
### Pattern 8: Common Protocol Patterns
Define reusable structural interfaces.
```python
from typing import Protocol
class Closeable(Protocol):
"""Resource that can be closed."""
def close(self) -> None: ...
class AsyncCloseable(Protocol):
"""Async resource that can be closed."""
async def close(self) -> None: ...
class Readable(Protocol):
"""Object that can be read from."""
def read(self, n: int = -1) -> bytes: ...
class HasId(Protocol):
"""Object with an ID property."""
@property
def id(self) -> str: ...
class Comparable(Protocol):
"""Object that supports comparison."""
def __lt__(self, other: "Comparable") -> bool: ...
def __le__(self, other: "Comparable") -> bool: ...
```
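A function typed against one of these protocols accepts any structurally conforming object, with no inheritance required. A sketch with a hypothetical `close_all` helper and test double:

```python
from collections.abc import Iterable
from typing import Protocol

class Closeable(Protocol):
    def close(self) -> None: ...

def close_all(resources: Iterable[Closeable]) -> int:
    """Close every resource; return how many were closed."""
    count = 0
    for resource in resources:
        resource.close()
        count += 1
    return count

class FakeConnection:  # satisfies Closeable structurally
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True
```

Files, sockets, and test doubles all satisfy `Closeable` without declaring it.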
### Pattern 9: Type Aliases
Create meaningful type names.
**Note:** The `type` statement (PEP 695) requires Python 3.12+, for both simple and generic aliases. On Python 3.10-3.11, use `typing.TypeAlias` instead; on 3.9, a plain assignment still works.
```python
# Python 3.12+ type statement for simple aliases
type UserId = str
type UserDict = dict[str, Any]
# Python 3.12+ type statement with generics
type Handler[T] = Callable[[Request], T]
type AsyncHandler[T] = Callable[[Request], Awaitable[T]]
# Python 3.10-3.11 style with typing.TypeAlias
from typing import TypeAlias
from collections.abc import Callable, Awaitable
UserId: TypeAlias = str
Handler: TypeAlias = Callable[[Request], Response]
# Usage
def register_handler(path: str, handler: Handler[Response]) -> None:
...
```
### Pattern 10: Callable Types
Type function parameters and callbacks.
```python
from collections.abc import Callable, Awaitable
from typing import Protocol
# Sync callback
ProgressCallback = Callable[[int, int], None] # (current, total)
# Async callback
AsyncHandler = Callable[[Request], Awaitable[Response]]
# With named parameters (using Protocol)
class OnProgress(Protocol):
def __call__(
self,
current: int,
total: int,
*,
message: str = "",
) -> None: ...
def process_items(
items: list[Item],
on_progress: ProgressCallback | None = None,
) -> list[Result]:
for i, item in enumerate(items):
if on_progress:
on_progress(i, len(items))
...
```
## Configuration
### Strict Mode Checklist
For `mypy --strict` compliance:
```toml
# pyproject.toml
[tool.mypy]
python_version = "3.12"
strict = true
# The flags below are implied by strict = true; listed for visibility
warn_return_any = true
warn_unused_ignores = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
no_implicit_optional = true
```
Incremental adoption goals:
- All function parameters annotated
- All return types annotated
- Class attributes annotated
- Minimize `Any` usage (acceptable for truly dynamic data)
- Generic collections use type parameters (`list[str]` not `list`)
For existing codebases, tighten checking module by module using inline configuration comments (e.g. `# mypy: disallow-untyped-defs`) or per-module overrides in `pyproject.toml`; the composite `strict` flag is global-only, so overrides enable individual strict flags instead.
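Per-module overrides let stricter checking roll out one package at a time. A sketch of the `pyproject.toml` shape, with placeholder module names:

```toml
[tool.mypy]
python_version = "3.12"
# Lenient global defaults while migrating
disallow_untyped_defs = false

[[tool.mypy.overrides]]
module = "myapp.core.*"      # already fully annotated
disallow_untyped_defs = true
disallow_incomplete_defs = true
warn_return_any = true

[[tool.mypy.overrides]]
module = "myapp.legacy.*"    # not yet migrated
ignore_errors = true
```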
## Best Practices Summary
1. **Annotate all public APIs** - Functions, methods, class attributes
2. **Use `T | None`** - Modern union syntax over `Optional[T]`
3. **Run strict type checking** - `mypy --strict` in CI
4. **Use generics** - Preserve type info in reusable code
5. **Define protocols** - Structural typing for interfaces
6. **Narrow types** - Use guards to help the type checker
7. **Bound type vars** - Restrict generics to meaningful types
8. **Create type aliases** - Meaningful names for complex types
9. **Minimize `Any`** - Use specific types or generics. `Any` is acceptable for truly dynamic data or when interfacing with untyped third-party code
10. **Document with types** - Types are enforceable documentation