mirror of
https://github.com/wshobson/agents.git
synced 2026-03-18 17:47:16 +00:00
Add Comprehensive Python Development Skills (#419)
* Add extra Python skills covering code style, design patterns, resilience, resource management, testing patterns, type safety, etc. * fix: correct code examples in Python skills - Clarify Python version requirements for type statement (3.10+ vs 3.12+) - Add missing ValidationError import in configuration example - Add missing httpx import and url parameter in async example --------- Co-authored-by: Seth Hobson <wshobson@gmail.com>
This commit is contained in:
@@ -18,6 +18,20 @@ Comprehensive guidance for implementing asynchronous Python applications using a
- Optimizing I/O-bound workloads
- Implementing async background tasks and queues

## Sync vs Async Decision Guide

Before adopting async, consider whether it's the right choice for your use case.

| Use Case | Recommended Approach |
|----------|---------------------|
| Many concurrent network/DB calls | `asyncio` |
| CPU-bound computation | `multiprocessing` or thread pool |
| Mixed I/O + CPU | Offload CPU work with `asyncio.to_thread()` |
| Simple scripts, few connections | Sync (simpler, easier to debug) |
| Web APIs with high concurrency | Async frameworks (FastAPI, aiohttp) |

**Key Rule:** Stay fully sync or fully async within a call path. Mixing creates hidden blocking and complexity.
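The "Mixed I/O + CPU" row can be sketched concretely. A minimal example (function names are hypothetical): the async call path stays fully async, and the CPU-bound work crosses the sync/async boundary exactly once via `asyncio.to_thread()`.

```python
import asyncio
import hashlib


def checksum(blob: bytes) -> str:
    # CPU-bound work stays synchronous and is never awaited directly
    return hashlib.sha256(blob).hexdigest()


async def handle(blob: bytes) -> str:
    # The async path offloads CPU work at a single boundary
    # instead of sprinkling blocking calls throughout
    return await asyncio.to_thread(checksum, blob)


digest = asyncio.run(handle(b"payload"))
```

Keeping the boundary in one place makes it obvious where the event loop can stall, and trivial to swap the executor later.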
## Core Concepts

### 1. Event Loop
@@ -583,6 +597,46 @@ async def process_item(item: str):

### 3. Avoid Blocking Operations

Never block the event loop with synchronous operations. A single blocking call stalls all concurrent tasks.

```python
# BAD - blocks the entire event loop
async def fetch_data_bad(url: str):
    import time
    import requests

    time.sleep(1)  # Blocks!
    response = requests.get(url)  # Also blocks!


# GOOD - use async-native libraries (e.g., httpx for async HTTP)
import httpx


async def fetch_data_good(url: str):
    await asyncio.sleep(1)
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
```
**Wrapping Blocking Code with `asyncio.to_thread()` (Python 3.9+):**

When you must use synchronous libraries, offload to a thread pool:

```python
import asyncio
from pathlib import Path


async def read_file_async(path: str) -> str:
    """Read file without blocking the event loop."""
    # asyncio.to_thread() runs sync code in a thread pool
    return await asyncio.to_thread(Path(path).read_text)


async def call_sync_library(data: dict) -> dict:
    """Wrap a synchronous library call."""
    # Useful for sync database drivers, file I/O, CPU work
    return await asyncio.to_thread(sync_library.process, data)
```
**Lower-level approach with `run_in_executor()`:**

```python
import asyncio
import concurrent.futures

@@ -596,7 +650,7 @@ def blocking_operation(data: Any) -> Any:

async def run_in_executor(data: Any) -> Any:
    """Run blocking operation in thread pool."""
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_operation, data)
    return result
```
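One wrinkle worth noting: `run_in_executor()` accepts only positional arguments, so keyword arguments must be bound with `functools.partial` before submitting. A small self-contained sketch (the `resize` function is hypothetical):

```python
import asyncio
import functools


def resize(data: bytes, *, scale: int) -> bytes:
    # Stand-in for a sync library call that requires keyword arguments
    return data * scale


async def main() -> bytes:
    loop = asyncio.get_running_loop()
    # run_in_executor() forwards positional args only; bind kwargs first
    return await loop.run_in_executor(
        None,  # None selects the loop's default ThreadPoolExecutor
        functools.partial(resize, b"ab", scale=3),
    )


result = asyncio.run(main())
```

Passing `None` as the executor reuses the loop's default thread pool, which avoids creating a fresh `ThreadPoolExecutor` per call.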
@@ -692,11 +746,12 @@ async def test_with_timeout():

1. **Use asyncio.run()** for entry point (Python 3.7+)
2. **Always await coroutines** to execute them
3. **Use gather() for concurrent execution** of multiple tasks
4. **Limit concurrency with semaphores** - unbounded `gather()` can exhaust resources
5. **Implement proper error handling** with try/except
6. **Use timeouts** to prevent hanging operations
7. **Pool connections** for better performance
8. **Never block the event loop** - use `asyncio.to_thread()` for sync code
9. **Use semaphores** for rate limiting external API calls
10. **Handle task cancellation** properly - always re-raise `CancelledError`
11. **Test async code** with pytest-asyncio
12. **Stay consistent** - fully sync or fully async, avoid mixing
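The semaphore items above can be sketched in a few lines. A minimal bound on `gather()` (names and the limit of 5 are illustrative; the `sleep(0)` stands in for a network call):

```python
import asyncio


async def fetch(i: int, sem: asyncio.Semaphore) -> int:
    async with sem:  # at most 5 coroutines inside this block at once
        await asyncio.sleep(0)  # stand-in for a real network call
        return i * 2


async def main() -> list[int]:
    sem = asyncio.Semaphore(5)
    # gather() still schedules all 20 tasks, but the semaphore
    # caps how many do real work concurrently
    return await asyncio.gather(*(fetch(i, sem) for i in range(20)))


results = asyncio.run(main())
```

`gather()` preserves input order in its results, so bounded concurrency does not reorder the output.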
349
plugins/python-development/skills/python-anti-patterns/SKILL.md
Normal file
@@ -0,0 +1,349 @@
---
name: python-anti-patterns
description: Common Python anti-patterns to avoid. Use as a checklist when reviewing code, before finalizing implementations, or when debugging issues that might stem from known bad practices.
---

# Python Anti-Patterns Checklist

A reference checklist of common mistakes and anti-patterns in Python code. Review this before finalizing implementations to catch issues early.

## When to Use This Skill

- Reviewing code before merge
- Debugging mysterious issues
- Teaching or learning Python best practices
- Establishing team coding standards
- Refactoring legacy code

**Note:** This skill focuses on what to avoid. For guidance on positive patterns and architecture, see the `python-design-patterns` skill.
## Infrastructure Anti-Patterns

### Scattered Timeout/Retry Logic

```python
# BAD: Timeout logic duplicated everywhere
def fetch_user(user_id):
    try:
        return requests.get(url, timeout=30)
    except Timeout:
        logger.warning("Timeout fetching user")
        return None

def fetch_orders(user_id):
    try:
        return requests.get(url, timeout=30)
    except Timeout:
        logger.warning("Timeout fetching orders")
        return None
```

**Fix:** Centralize in decorators or client wrappers.

```python
# GOOD: Centralized retry logic (using tenacity)
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential())
def http_get(url: str) -> Response:
    return requests.get(url, timeout=30)
```
### Double Retry

```python
# BAD: Retrying at multiple layers
@retry(max_attempts=3)  # Application retry
def call_service():
    return client.request()  # Client also has retry configured!
```

**Fix:** Retry at one layer only. Know your infrastructure's retry behavior.
### Hard-Coded Configuration

```python
# BAD: Secrets and config in code
DB_HOST = "prod-db.example.com"
API_KEY = "sk-12345"

def connect():
    return psycopg.connect(f"host={DB_HOST}...")
```

**Fix:** Use environment variables with typed settings.

```python
# GOOD
from pydantic import Field
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    db_host: str = Field(alias="DB_HOST")
    api_key: str = Field(alias="API_KEY")

settings = Settings()
```
## Architecture Anti-Patterns

### Exposed Internal Types

```python
# BAD: Leaking ORM model to API
@app.get("/users/{id}")
def get_user(id: str) -> UserModel:  # SQLAlchemy model
    return db.query(UserModel).get(id)
```

**Fix:** Use DTOs/response models.

```python
# GOOD
@app.get("/users/{id}")
def get_user(id: str) -> UserResponse:
    user = db.query(UserModel).get(id)
    return UserResponse.from_orm(user)
```
### Mixed I/O and Business Logic

```python
# BAD: SQL embedded in business logic
def calculate_discount(user_id: str) -> float:
    user = db.query("SELECT * FROM users WHERE id = ?", user_id)
    orders = db.query("SELECT * FROM orders WHERE user_id = ?", user_id)
    # Business logic mixed with data access
    if len(orders) > 10:
        return 0.15
    return 0.0
```

**Fix:** Repository pattern. Keep business logic pure.

```python
# GOOD
def calculate_discount(user: User, orders: list[Order]) -> float:
    # Pure business logic, easily testable
    if len(orders) > 10:
        return 0.15
    return 0.0
```
## Error Handling Anti-Patterns

### Bare Exception Handling

```python
# BAD: Swallowing all exceptions
try:
    process()
except Exception:
    pass  # Silent failure - bugs hidden forever
```

**Fix:** Catch specific exceptions. Log or handle appropriately.

```python
# GOOD
try:
    process()
except ConnectionError as e:
    logger.warning("Connection failed, will retry", error=str(e))
    raise
except ValueError as e:
    logger.error("Invalid input", error=str(e))
    raise BadRequestError(str(e))
```
### Ignored Partial Failures

```python
# BAD: Stops on first error
def process_batch(items):
    results = []
    for item in items:
        result = process(item)  # Raises on error - batch aborted
        results.append(result)
    return results
```

**Fix:** Capture both successes and failures.

```python
# GOOD
def process_batch(items) -> BatchResult:
    succeeded = {}
    failed = {}
    for idx, item in enumerate(items):
        try:
            succeeded[idx] = process(item)
        except Exception as e:
            failed[idx] = e
    return BatchResult(succeeded, failed)
```
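The fixed example constructs a `BatchResult` that is not defined in the snippet; one hypothetical shape for it, as a small dataclass:

```python
from dataclasses import dataclass, field


@dataclass
class BatchResult:
    # index -> result for items that processed cleanly
    succeeded: dict[int, object] = field(default_factory=dict)
    # index -> exception for items that failed
    failed: dict[int, Exception] = field(default_factory=dict)

    @property
    def success_rate(self) -> float:
        total = len(self.succeeded) + len(self.failed)
        return len(self.succeeded) / total if total else 0.0


result = BatchResult({0: "ok", 1: "ok"}, {2: ValueError("bad")})
```

Keeping failures keyed by index lets callers retry or report exactly the items that broke, rather than re-running the whole batch.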
### Missing Input Validation

```python
# BAD: No validation
def create_user(data: dict):
    return User(**data)  # Crashes deep in code on bad input
```

**Fix:** Validate early at API boundaries.

```python
# GOOD
def create_user(data: dict) -> User:
    validated = CreateUserInput.model_validate(data)
    return User.from_input(validated)
```
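If Pydantic is not available, the same boundary check can be approximated with the standard library alone. A minimal sketch (the field names and rules are hypothetical):

```python
from dataclasses import dataclass


@dataclass
class CreateUserInput:
    email: str
    name: str

    def __post_init__(self) -> None:
        # Reject bad input at the boundary, before it reaches domain code
        if "@" not in self.email:
            raise ValueError(f"Invalid email: {self.email!r}")
        if not self.name.strip():
            raise ValueError("Name must not be empty")


ok = CreateUserInput("a@example.com", "Ada")
```

The key property is the same as with Pydantic: invalid data fails loudly at construction time, with a message naming the bad field.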
## Resource Anti-Patterns

### Unclosed Resources

```python
# BAD: File never closed
def read_file(path):
    f = open(path)
    return f.read()  # What if this raises?
```

**Fix:** Use context managers.

```python
# GOOD
def read_file(path):
    with open(path) as f:
        return f.read()
```
### Blocking in Async

```python
# BAD: Blocks the entire event loop
async def fetch_data():
    time.sleep(1)  # Blocks everything!
    response = requests.get(url)  # Also blocks!
```

**Fix:** Use async-native libraries.

```python
# GOOD
async def fetch_data():
    await asyncio.sleep(1)
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
```
## Type Safety Anti-Patterns

### Missing Type Hints

```python
# BAD: No types
def process(data):
    return data["value"] * 2
```

**Fix:** Annotate all public functions.

```python
# GOOD
def process(data: dict[str, int]) -> int:
    return data["value"] * 2
```
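When the dict has a known, fixed shape, `typing.TypedDict` is usually a tighter annotation than `dict[str, int]`. A short sketch (the `Payload` name is illustrative):

```python
from typing import TypedDict


class Payload(TypedDict):
    value: int


def process(data: Payload) -> int:
    # Type checkers now flag missing or misspelled keys at call sites
    return data["value"] * 2


doubled = process({"value": 21})
```

At runtime a `TypedDict` is just a plain dict, so this adds static safety with zero overhead.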
### Untyped Collections

```python
# BAD: Generic list without type parameter
def get_users() -> list:
    ...
```

**Fix:** Use type parameters.

```python
# GOOD
def get_users() -> list[User]:
    ...
```
## Testing Anti-Patterns

### Only Testing Happy Paths

```python
# BAD: Only tests success case
def test_create_user():
    user = service.create_user(valid_data)
    assert user.id is not None
```

**Fix:** Test error conditions and edge cases.

```python
# GOOD
def test_create_user_success():
    user = service.create_user(valid_data)
    assert user.id is not None


def test_create_user_invalid_email():
    with pytest.raises(ValueError, match="Invalid email"):
        service.create_user(invalid_email_data)


def test_create_user_duplicate_email():
    service.create_user(valid_data)
    with pytest.raises(ConflictError):
        service.create_user(valid_data)
```

### Over-Mocking

```python
# BAD: Mocking everything
def test_user_service():
    mock_repo = Mock()
    mock_cache = Mock()
    mock_logger = Mock()
    mock_metrics = Mock()
    # Test doesn't verify real behavior
```

**Fix:** Use integration tests for critical paths. Mock only external services.
## Quick Review Checklist

Before finalizing code, verify:

- [ ] No scattered timeout/retry logic (centralized)
- [ ] No double retry (app + infrastructure)
- [ ] No hard-coded configuration or secrets
- [ ] No exposed internal types (ORM models, protobufs)
- [ ] No mixed I/O and business logic
- [ ] No bare `except Exception: pass`
- [ ] No ignored partial failures in batches
- [ ] No missing input validation
- [ ] No unclosed resources (using context managers)
- [ ] No blocking calls in async code
- [ ] All public functions have type hints
- [ ] Collections have type parameters
- [ ] Error paths are tested
- [ ] Edge cases are covered

## Common Fixes Summary

| Anti-Pattern | Fix |
|-------------|-----|
| Scattered retry logic | Centralized decorators |
| Hard-coded config | Environment variables + pydantic-settings |
| Exposed ORM models | DTO/response schemas |
| Mixed I/O + logic | Repository pattern |
| Bare except | Catch specific exceptions |
| Batch stops on error | Return BatchResult with successes/failures |
| No validation | Validate at boundaries with Pydantic |
| Unclosed resources | Context managers |
| Blocking in async | Async-native libraries |
| Missing types | Type annotations on all public APIs |
| Only happy path tests | Test errors and edge cases |
@@ -0,0 +1,364 @@
---
name: python-background-jobs
description: Python background job patterns including task queues, workers, and event-driven architecture. Use when implementing async task processing, job queues, long-running operations, or decoupling work from request/response cycles.
---

# Python Background Jobs & Task Queues

Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously.

## When to Use This Skill

- Processing tasks that take longer than a few seconds
- Sending emails, notifications, or webhooks
- Generating reports or exporting data
- Processing uploads or media transformations
- Integrating with unreliable external services
- Building event-driven architectures

## Core Concepts

### 1. Task Queue Pattern

API accepts a request, enqueues a job, and returns immediately with a job ID. Workers process jobs asynchronously.

### 2. Idempotency

Tasks may be retried on failure. Design for safe re-execution.

### 3. Job State Machine

Jobs transition through states: pending → running → succeeded/failed.

### 4. At-Least-Once Delivery

Most queues guarantee at-least-once delivery. Your code must handle duplicates.
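The state machine in concept 3 can be made explicit so illegal transitions (e.g. reviving a failed job) fail fast. A minimal sketch of one way to enforce it (the `ALLOWED` table is an assumption, not part of any queue library):

```python
from enum import Enum


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


# Legal transitions; terminal states allow none
ALLOWED = {
    JobStatus.PENDING: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.SUCCEEDED, JobStatus.FAILED},
    JobStatus.SUCCEEDED: set(),
    JobStatus.FAILED: set(),
}


def transition(current: JobStatus, new: JobStatus) -> JobStatus:
    if new not in ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.value} -> {new.value}")
    return new
```

Centralizing the table means a duplicate "job finished" message from an at-least-once queue is rejected by the guard rather than silently flipping state.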
## Quick Start

Examples in this skill use Celery, a widely adopted task queue. Alternatives like RQ, Dramatiq, and cloud-native solutions (AWS SQS, GCP Tasks) are equally valid choices.

```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")


@app.task
def send_email(to: str, subject: str, body: str) -> None:
    # This runs in a background worker
    email_client.send(to, subject, body)


# In your API handler
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
```
## Fundamental Patterns

### Pattern 1: Return Job ID Immediately

For operations exceeding a few seconds, return a job ID and process asynchronously.

```python
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
from datetime import datetime


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


@dataclass
class Job:
    id: str
    status: JobStatus
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    result: dict | None = None
    error: str | None = None


# API endpoint
async def start_export(request: ExportRequest) -> JobResponse:
    """Start export job and return job ID."""
    job_id = str(uuid4())

    # Persist job record
    await jobs_repo.create(Job(
        id=job_id,
        status=JobStatus.PENDING,
        created_at=datetime.utcnow(),
    ))

    # Enqueue task for background processing
    await task_queue.enqueue(
        "export_data",
        job_id=job_id,
        params=request.model_dump(),
    )

    # Return immediately with job ID
    return JobResponse(
        job_id=job_id,
        status="pending",
        poll_url=f"/jobs/{job_id}",
    )
```
### Pattern 2: Celery Task Configuration

Configure Celery tasks with proper retry and timeout settings.

```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

# Global configuration
app.conf.update(
    task_time_limit=3600,          # Hard limit: 1 hour
    task_soft_time_limit=3000,     # Soft limit: 50 minutes
    task_acks_late=True,           # Acknowledge after completion
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,  # Don't prefetch too many tasks
)


@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
    """Process payment with automatic retry on transient errors."""
    try:
        result = payment_gateway.charge(payment_id)
        return {"status": "success", "transaction_id": result.id}
    except PaymentDeclinedError as e:
        # Don't retry permanent failures
        return {"status": "declined", "reason": str(e)}
    except TransientError as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
```
### Pattern 3: Make Tasks Idempotent

Workers may retry on crash or timeout. Design for safe re-execution.

```python
@app.task(bind=True)
def process_order(self, order_id: str) -> None:
    """Process order idempotently."""
    order = orders_repo.get(order_id)

    # Already processed? Return early
    if order.status == OrderStatus.COMPLETED:
        logger.info("Order already processed", order_id=order_id)
        return

    # Already in progress? Check if we should continue
    if order.status == OrderStatus.PROCESSING:
        # Use idempotency key to avoid double-charging
        pass

    # Process with idempotency key
    result = payment_provider.charge(
        amount=order.total,
        idempotency_key=f"order-{order_id}",  # Critical!
    )

    orders_repo.update(order_id, status=OrderStatus.COMPLETED)
```

**Idempotency Strategies:**

1. **Check-before-write**: Verify state before action
2. **Idempotency keys**: Use unique tokens with external services
3. **Upsert patterns**: `INSERT ... ON CONFLICT UPDATE`
4. **Deduplication window**: Track processed IDs for N hours
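Strategy 3 can be demonstrated end-to-end with SQLite, which supports the same `ON CONFLICT` clause. A self-contained sketch (table and task names are illustrative): re-running the task for the same ID updates its row instead of inserting a duplicate.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE processed (task_id TEXT PRIMARY KEY, attempts INTEGER)"
)


def record(task_id: str) -> None:
    # A retried task bumps its attempt counter instead of inserting twice
    conn.execute(
        "INSERT INTO processed (task_id, attempts) VALUES (?, 1) "
        "ON CONFLICT(task_id) DO UPDATE SET attempts = attempts + 1",
        (task_id,),
    )


record("order-42")
record("order-42")  # safe re-execution on retry
```

The primary key is what makes the upsert idempotent; the attempt counter comes along for free as an audit trail.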
### Pattern 4: Job State Management

Persist job state transitions for visibility and debugging.

```python
class JobRepository:
    """Repository for managing job state."""

    async def create(self, job: Job) -> Job:
        """Create new job record."""
        await self._db.execute(
            """INSERT INTO jobs (id, status, created_at)
               VALUES ($1, $2, $3)""",
            job.id, job.status.value, job.created_at,
        )
        return job

    async def update_status(
        self,
        job_id: str,
        status: JobStatus,
        **fields,
    ) -> None:
        """Update job status with timestamp."""
        updates = {"status": status.value, **fields}

        if status == JobStatus.RUNNING:
            updates["started_at"] = datetime.utcnow()
        elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
            updates["completed_at"] = datetime.utcnow()

        await self._db.execute(
            "UPDATE jobs SET status = $1, ... WHERE id = $2",
            updates, job_id,
        )

        logger.info(
            "Job status updated",
            job_id=job_id,
            status=status.value,
        )
```
## Advanced Patterns

### Pattern 5: Dead Letter Queue

Handle permanently failed tasks for manual inspection.

```python
@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
    """Process webhook with DLQ for failures."""
    try:
        result = send_webhook(payload)
        if not result.success:
            raise WebhookFailedError(result.error)
    except Exception as e:
        if self.request.retries >= self.max_retries:
            # Move to dead letter queue for manual inspection
            dead_letter_queue.send({
                "task": "process_webhook",
                "webhook_id": webhook_id,
                "payload": payload,
                "error": str(e),
                "attempts": self.request.retries + 1,
                "failed_at": datetime.utcnow().isoformat(),
            })
            logger.error(
                "Webhook moved to DLQ after max retries",
                webhook_id=webhook_id,
                error=str(e),
            )
            return

        # Exponential backoff retry
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
```
### Pattern 6: Status Polling Endpoint

Provide an endpoint for clients to check job status.

```python
from fastapi import FastAPI, HTTPException

app = FastAPI()


@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JobStatusResponse:
    """Get current status of a background job."""
    job = await jobs_repo.get(job_id)

    if job is None:
        raise HTTPException(404, f"Job {job_id} not found")

    return JobStatusResponse(
        job_id=job.id,
        status=job.status.value,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at,
        result=job.result if job.status == JobStatus.SUCCEEDED else None,
        error=job.error if job.status == JobStatus.FAILED else None,
        # Helpful for clients
        is_terminal=job.status in (JobStatus.SUCCEEDED, JobStatus.FAILED),
    )
```
### Pattern 7: Task Chaining and Workflows

Compose complex workflows from simple tasks.

```python
from celery import chain, group, chord

# Simple chain: A → B → C
workflow = chain(
    extract_data.s(source_id),
    transform_data.s(),
    load_data.s(destination_id),
)

# Parallel execution: A, B, C all at once
parallel = group(
    send_email.s(user_email),
    send_sms.s(user_phone),
    update_analytics.s(event_data),
)

# Chord: Run tasks in parallel, then a callback
# Process all items, then send completion notification
workflow = chord(
    [process_item.s(item_id) for item_id in item_ids],
    send_completion_notification.s(batch_id),
)

workflow.apply_async()
```
### Pattern 8: Alternative Task Queues

Choose the right tool for your needs.

**RQ (Redis Queue)**: Simple, Redis-based

```python
from rq import Queue
from redis import Redis

queue = Queue(connection=Redis())
job = queue.enqueue(send_email, "user@example.com", "Subject", "Body")
```

**Dramatiq**: Modern Celery alternative

```python
import dramatiq
from dramatiq.brokers.redis import RedisBroker

dramatiq.set_broker(RedisBroker())


@dramatiq.actor
def send_email(to: str, subject: str, body: str) -> None:
    email_client.send(to, subject, body)
```

**Cloud-native options:**

- AWS SQS + Lambda
- Google Cloud Tasks
- Azure Functions
## Best Practices Summary

1. **Return immediately** - Don't block requests for long operations
2. **Persist job state** - Enable status polling and debugging
3. **Make tasks idempotent** - Safe to retry on any failure
4. **Use idempotency keys** - For external service calls
5. **Set timeouts** - Both soft and hard limits
6. **Implement DLQ** - Capture permanently failed tasks
7. **Log transitions** - Track job state changes
8. **Retry appropriately** - Exponential backoff for transient errors
9. **Don't retry permanent failures** - Validation errors, invalid credentials
10. **Monitor queue depth** - Alert on backlog growth
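The exponential backoff from item 8 benefits from an upper bound, which the Celery examples' bare `countdown=2 ** retries * 60` lacks. A tiny capped-backoff helper (the 60-second base and 900-second cap are illustrative defaults):

```python
def backoff_seconds(retries: int, base: int = 60, cap: int = 900) -> int:
    # Same growth as countdown=2 ** retries * 60, but bounded
    # so a long-failing task never waits hours between attempts
    return min(2**retries * base, cap)


delays = [backoff_seconds(n) for n in range(5)]  # 60, 120, 240, 480, 900
```

In production you would typically also add jitter so a burst of failures does not retry in lockstep.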
360
plugins/python-development/skills/python-code-style/SKILL.md
Normal file
@@ -0,0 +1,360 @@
---
name: python-code-style
description: Python code style, linting, formatting, naming conventions, and documentation standards. Use when writing new code, reviewing style, configuring linters, writing docstrings, or establishing project standards.
---

# Python Code Style & Documentation

Consistent code style and clear documentation make codebases maintainable and collaborative. This skill covers modern Python tooling, naming conventions, and documentation standards.

## When to Use This Skill

- Setting up linting and formatting for a new project
- Writing or reviewing docstrings
- Establishing team coding standards
- Configuring ruff, mypy, or pyright
- Reviewing code for style consistency
- Creating project documentation

## Core Concepts

### 1. Automated Formatting

Let tools handle formatting debates. Configure once, enforce automatically.

### 2. Consistent Naming

Follow PEP 8 conventions with meaningful, descriptive names.

### 3. Documentation as Code

Docstrings should be maintained alongside the code they describe.

### 4. Type Annotations

Modern Python code should include type hints for all public APIs.
## Quick Start

```bash
# Install modern tooling
pip install ruff mypy
```

```toml
# Configure in pyproject.toml
[tool.ruff]
line-length = 120
target-version = "py312"  # Adjust based on your project's minimum Python version

[tool.mypy]
strict = true
```
## Fundamental Patterns

### Pattern 1: Modern Python Tooling

Use `ruff` as an all-in-one linter and formatter. It replaces flake8, isort, and black with a single fast tool.

```toml
# pyproject.toml
[tool.ruff]
line-length = 120
target-version = "py312"  # Adjust based on your project's minimum Python version

[tool.ruff.lint]
select = [
    "E",    # pycodestyle errors
    "W",    # pycodestyle warnings
    "F",    # pyflakes
    "I",    # isort
    "B",    # flake8-bugbear
    "C4",   # flake8-comprehensions
    "UP",   # pyupgrade
    "SIM",  # flake8-simplify
]
ignore = ["E501"]  # Line length handled by formatter

[tool.ruff.format]
quote-style = "double"
indent-style = "space"
```

Run with:

```bash
ruff check --fix .  # Lint and auto-fix
ruff format .       # Format code
```
### Pattern 2: Type Checking Configuration

Configure strict type checking for production code.

```toml
# pyproject.toml
[tool.mypy]
python_version = "3.12"
strict = true
warn_return_any = true
warn_unused_ignores = true
disallow_untyped_defs = true
disallow_incomplete_defs = true

[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_defs = false
```

Alternative: Use `pyright` for faster checking.

```toml
[tool.pyright]
pythonVersion = "3.12"
typeCheckingMode = "strict"
```
### Pattern 3: Naming Conventions

Follow PEP 8 with emphasis on clarity over brevity.

**Files and Modules:**

```python
# Good: Descriptive snake_case
user_repository.py
order_processing.py
http_client.py

# Avoid: Abbreviations
usr_repo.py
ord_proc.py
http_cli.py
```

**Classes and Functions:**

```python
# Classes: PascalCase
class UserRepository:
    pass

class HTTPClientFactory:  # Acronyms stay uppercase
    pass

# Functions and variables: snake_case
def get_user_by_email(email: str) -> User | None:
    ...

retry_count = 3
max_connections = 100
```
**Constants:**

```python
# Module-level constants: SCREAMING_SNAKE_CASE
MAX_RETRY_ATTEMPTS = 3
DEFAULT_TIMEOUT_SECONDS = 30
API_BASE_URL = "https://api.example.com"
```
### Pattern 4: Import Organization
|
||||
|
||||
Group imports in a consistent order: standard library, third-party, local.
|
||||
|
||||
```python
|
||||
# Standard library
|
||||
import os
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
# Third-party packages
|
||||
import httpx
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import Column
|
||||
|
||||
# Local imports
|
||||
from myproject.models import User
|
||||
from myproject.services import UserService
|
||||
```
|
||||
|
||||
Use absolute imports exclusively:
|
||||
|
||||
```python
|
||||
# Preferred
|
||||
from myproject.utils import retry_decorator
|
||||
|
||||
# Avoid relative imports
|
||||
from ..utils import retry_decorator
|
||||
```
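
Ruff can enforce this grouping automatically via its isort rules; a minimal sketch of the relevant config (`myproject` is a placeholder for your package name):

```toml
[tool.ruff.lint]
select = ["E", "F", "I"]  # "I" enables import-sorting rules

[tool.ruff.lint.isort]
known-first-party = ["myproject"]
```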

## Advanced Patterns

### Pattern 5: Google-Style Docstrings

Write docstrings for all public classes, methods, and functions.

**Simple Function:**

```python
def get_user(user_id: str) -> User:
    """Retrieve a user by their unique identifier."""
    ...
```

**Complex Function:**

```python
def process_batch(
    items: list[Item],
    max_workers: int = 4,
    on_progress: Callable[[int, int], None] | None = None,
) -> BatchResult:
    """Process items concurrently using a worker pool.

    Processes each item in the batch using the configured number of
    workers. Progress can be monitored via the optional callback.

    Args:
        items: The items to process. Must not be empty.
        max_workers: Maximum concurrent workers. Defaults to 4.
        on_progress: Optional callback receiving (completed, total) counts.

    Returns:
        BatchResult containing succeeded items and any failures with
        their associated exceptions.

    Raises:
        ValueError: If items is empty.
        ProcessingError: If the batch cannot be processed.

    Example:
        >>> result = process_batch(items, max_workers=8)
        >>> print(f"Processed {len(result.succeeded)} items")
    """
    ...
```

**Class Docstring:**

```python
class UserService:
    """Service for managing user operations.

    Provides methods for creating, retrieving, updating, and
    deleting users with proper validation and error handling.

    Attributes:
        repository: The data access layer for user persistence.
        logger: Logger instance for operation tracking.

    Example:
        >>> service = UserService(repository, logger)
        >>> user = service.create_user(CreateUserInput(...))
    """

    def __init__(self, repository: UserRepository, logger: Logger) -> None:
        """Initialize the user service.

        Args:
            repository: Data access layer for users.
            logger: Logger for tracking operations.
        """
        self.repository = repository
        self.logger = logger
```

### Pattern 6: Line Length and Formatting

Set line length to 120 characters for modern displays while maintaining readability.

```python
# Good: Readable line breaks
def create_user(
    email: str,
    name: str,
    role: UserRole = UserRole.MEMBER,
    notify: bool = True,
) -> User:
    ...

# Good: Chain method calls clearly
result = (
    db.query(User)
    .filter(User.active.is_(True))
    .order_by(User.created_at.desc())
    .limit(10)
    .all()
)

# Good: Format long strings
error_message = (
    f"Failed to process user {user_id}: "
    f"received status {response.status_code} "
    f"with body {response.text[:100]}"
)
```

### Pattern 7: Project Documentation

**README Structure:**

```markdown
# Project Name

Brief description of what the project does.

## Installation

\`\`\`bash
pip install myproject
\`\`\`

## Quick Start

\`\`\`python
from myproject import Client

client = Client(api_key="...")
result = client.process(data)
\`\`\`

## Configuration

Document environment variables and configuration options.

## Development

\`\`\`bash
pip install -e ".[dev]"
pytest
\`\`\`
```

**CHANGELOG Format (Keep a Changelog):**

```markdown
# Changelog

## [Unreleased]

### Added
- New feature X

### Changed
- Modified behavior of Y

### Fixed
- Bug in Z
```

## Best Practices Summary

1. **Use ruff** - Single tool for linting and formatting
2. **Enable strict mypy** - Catch type errors before runtime
3. **120 character lines** - Modern standard for readability
4. **Descriptive names** - Clarity over brevity
5. **Absolute imports** - More maintainable than relative
6. **Google-style docstrings** - Consistent, readable documentation
7. **Document public APIs** - Every public function needs a docstring
8. **Keep docs updated** - Treat documentation as code
9. **Automate in CI** - Run linters on every commit
10. **Target Python 3.10+** - For new projects, Python 3.12+ is recommended for modern language features
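
Item 9 can be wired up with pre-commit so the same checks run locally and in CI; a minimal sketch (the `rev` values below are placeholders — pin them to current releases):

```yaml
# .pre-commit-config.yaml (revs are placeholders)
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.6.0
    hooks:
      - id: ruff
      - id: ruff-format
  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.11.0
    hooks:
      - id: mypy
```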

plugins/python-development/skills/python-configuration/SKILL.md

---
name: python-configuration
description: Python configuration management via environment variables and typed settings. Use when externalizing config, setting up pydantic-settings, managing secrets, or implementing environment-specific behavior.
---

# Python Configuration Management

Externalize configuration from code using environment variables and typed settings. Well-managed configuration enables the same code to run in any environment without modification.

## When to Use This Skill

- Setting up a new project's configuration system
- Migrating from hardcoded values to environment variables
- Implementing pydantic-settings for typed configuration
- Managing secrets and sensitive values
- Creating environment-specific settings (dev/staging/prod)
- Validating configuration at application startup

## Core Concepts

### 1. Externalized Configuration

All environment-specific values (URLs, secrets, feature flags) come from environment variables, not code.

### 2. Typed Settings

Parse and validate configuration into typed objects at startup, not scattered throughout code.

### 3. Fail Fast

Validate all required configuration at application boot. Missing config should crash immediately with a clear message.

### 4. Sensible Defaults

Provide reasonable defaults for local development while requiring explicit values for sensitive settings.

## Quick Start

```python
from pydantic_settings import BaseSettings
from pydantic import Field

class Settings(BaseSettings):
    database_url: str = Field(alias="DATABASE_URL")
    api_key: str = Field(alias="API_KEY")
    debug: bool = Field(default=False, alias="DEBUG")

settings = Settings()  # Loads from environment
```

## Fundamental Patterns

### Pattern 1: Typed Settings with Pydantic

Create a central settings class that loads and validates all configuration.

```python
from pydantic_settings import BaseSettings
from pydantic import Field, ValidationError
import sys

class Settings(BaseSettings):
    """Application configuration loaded from environment variables."""

    # Database
    db_host: str = Field(alias="DB_HOST")
    db_port: int = Field(default=5432, alias="DB_PORT")
    db_name: str = Field(alias="DB_NAME")
    db_user: str = Field(alias="DB_USER")
    db_password: str = Field(alias="DB_PASSWORD")

    # Redis
    redis_url: str = Field(default="redis://localhost:6379", alias="REDIS_URL")

    # API Keys
    api_secret_key: str = Field(alias="API_SECRET_KEY")

    # Feature flags
    enable_new_feature: bool = Field(default=False, alias="ENABLE_NEW_FEATURE")

    model_config = {
        "env_file": ".env",
        "env_file_encoding": "utf-8",
    }

# Create singleton instance at module load
try:
    settings = Settings()
except ValidationError as e:
    print(f"Configuration error:\n{e}")
    sys.exit(1)
```

Import `settings` throughout your application:

```python
from myapp.config import settings

def get_database_connection():
    return connect(
        host=settings.db_host,
        port=settings.db_port,
        database=settings.db_name,
    )
```

### Pattern 2: Fail Fast on Missing Configuration

Required settings should crash the application immediately with a clear error.

```python
from pydantic_settings import BaseSettings
from pydantic import Field, ValidationError
import sys

class Settings(BaseSettings):
    # Required - no default means it must be set
    api_key: str = Field(alias="API_KEY")
    database_url: str = Field(alias="DATABASE_URL")

    # Optional with defaults
    log_level: str = Field(default="INFO", alias="LOG_LEVEL")

try:
    settings = Settings()
except ValidationError as e:
    print("=" * 60)
    print("CONFIGURATION ERROR")
    print("=" * 60)
    for error in e.errors():
        field = error["loc"][0]
        print(f"  - {field}: {error['msg']}")
    print("\nPlease set the required environment variables.")
    sys.exit(1)
```

A clear error at startup is better than a cryptic `None` failure mid-request.

### Pattern 3: Local Development Defaults

Provide sensible defaults for local development while requiring explicit values for secrets.

```python
class Settings(BaseSettings):
    # Has local default, but prod will override
    db_host: str = Field(default="localhost", alias="DB_HOST")
    db_port: int = Field(default=5432, alias="DB_PORT")

    # Always required - no default for secrets
    db_password: str = Field(alias="DB_PASSWORD")
    api_secret_key: str = Field(alias="API_SECRET_KEY")

    # Development convenience
    debug: bool = Field(default=False, alias="DEBUG")

    model_config = {"env_file": ".env"}
```

Create a `.env` file for local development (never commit this):

```bash
# .env (add to .gitignore)
DB_PASSWORD=local_dev_password
API_SECRET_KEY=dev-secret-key
DEBUG=true
```

### Pattern 4: Namespaced Environment Variables

Prefix related variables for clarity and easy debugging.

```bash
# Database configuration
DB_HOST=localhost
DB_PORT=5432
DB_NAME=myapp
DB_USER=admin
DB_PASSWORD=secret

# Redis configuration
REDIS_URL=redis://localhost:6379
REDIS_MAX_CONNECTIONS=10

# Authentication
AUTH_SECRET_KEY=your-secret-key
AUTH_TOKEN_EXPIRY_SECONDS=3600
AUTH_ALGORITHM=HS256

# Feature flags
FEATURE_NEW_CHECKOUT=true
FEATURE_BETA_UI=false
```

Makes `env | grep DB_` useful for debugging.
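
The same namespacing pays off in code, too; a minimal stdlib sketch of grouping variables by prefix (`env_group` is a hypothetical helper, not part of any library):

```python
import os

def env_group(prefix: str) -> dict[str, str]:
    """Collect environment variables sharing a prefix, keyed without it."""
    return {
        key.removeprefix(prefix).lower(): value
        for key, value in os.environ.items()
        if key.startswith(prefix)
    }

os.environ["DB_HOST"] = "localhost"
os.environ["DB_PORT"] = "5432"
db_config = env_group("DB_")  # {"host": "localhost", "port": "5432", ...}
```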

## Advanced Patterns

### Pattern 5: Type Coercion

Pydantic handles common conversions automatically.

```python
from pydantic_settings import BaseSettings
from pydantic import Field, field_validator

class Settings(BaseSettings):
    # Automatically converts "true", "1", "yes" to True
    debug: bool = False

    # Automatically converts string to int
    max_connections: int = 100

    # Parse comma-separated string to list
    allowed_hosts: list[str] = Field(default_factory=list)

    @field_validator("allowed_hosts", mode="before")
    @classmethod
    def parse_allowed_hosts(cls, v: str | list[str]) -> list[str]:
        if isinstance(v, str):
            return [host.strip() for host in v.split(",") if host.strip()]
        return v
```

Usage:

```bash
ALLOWED_HOSTS=example.com,api.example.com,localhost
MAX_CONNECTIONS=50
DEBUG=true
```

### Pattern 6: Environment-Specific Configuration

Use an environment enum to switch behavior.

```python
from enum import Enum
from pydantic_settings import BaseSettings
from pydantic import Field, computed_field

class Environment(str, Enum):
    LOCAL = "local"
    STAGING = "staging"
    PRODUCTION = "production"

class Settings(BaseSettings):
    environment: Environment = Field(
        default=Environment.LOCAL,
        alias="ENVIRONMENT",
    )

    # Settings that vary by environment
    log_level: str = Field(default="DEBUG", alias="LOG_LEVEL")

    @computed_field
    @property
    def is_production(self) -> bool:
        return self.environment == Environment.PRODUCTION

    @computed_field
    @property
    def is_local(self) -> bool:
        return self.environment == Environment.LOCAL

# Usage
if settings.is_production:
    configure_production_logging()
else:
    configure_debug_logging()
```

### Pattern 7: Nested Configuration Groups

Organize related settings into nested models.

```python
from pydantic import BaseModel
from pydantic_settings import BaseSettings

class DatabaseSettings(BaseModel):
    host: str = "localhost"
    port: int = 5432
    name: str
    user: str
    password: str

class RedisSettings(BaseModel):
    url: str = "redis://localhost:6379"
    max_connections: int = 10

class Settings(BaseSettings):
    database: DatabaseSettings
    redis: RedisSettings
    debug: bool = False

    model_config = {
        "env_nested_delimiter": "__",
        "env_file": ".env",
    }
```

Environment variables use double underscore for nesting:

```bash
DATABASE__HOST=db.example.com
DATABASE__PORT=5432
DATABASE__NAME=myapp
DATABASE__USER=admin
DATABASE__PASSWORD=secret
REDIS__URL=redis://redis.example.com:6379
```

### Pattern 8: Secrets from Files

For container environments, read secrets from mounted files.

```python
from pydantic_settings import BaseSettings
from pydantic import Field

class Settings(BaseSettings):
    # Read from environment variable or file
    db_password: str = Field(alias="DB_PASSWORD")

    model_config = {
        "secrets_dir": "/run/secrets",  # Docker secrets location
    }
```

Pydantic will look for `/run/secrets/db_password` if the env var isn't set.

### Pattern 9: Configuration Validation

Add custom validation for complex requirements.

```python
from pydantic_settings import BaseSettings
from pydantic import Field, model_validator

class Settings(BaseSettings):
    db_host: str = Field(alias="DB_HOST")
    db_port: int = Field(alias="DB_PORT")
    read_replica_host: str | None = Field(default=None, alias="READ_REPLICA_HOST")
    read_replica_port: int = Field(default=5432, alias="READ_REPLICA_PORT")

    @model_validator(mode="after")
    def validate_replica_settings(self) -> "Settings":
        if self.read_replica_host and self.read_replica_port == self.db_port:
            if self.read_replica_host == self.db_host:
                raise ValueError(
                    "Read replica cannot be the same as primary database"
                )
        return self
```

## Best Practices Summary

1. **Never hardcode config** - All environment-specific values from env vars
2. **Use typed settings** - Pydantic-settings with validation
3. **Fail fast** - Crash on missing required config at startup
4. **Provide dev defaults** - Make local development easy
5. **Never commit secrets** - Use `.env` files (gitignored) or secret managers
6. **Namespace variables** - `DB_HOST`, `REDIS_URL` for clarity
7. **Import settings singleton** - Don't call `os.getenv()` throughout code
8. **Document all variables** - README should list required env vars
9. **Validate early** - Check config correctness at boot time
10. **Use secrets_dir** - Support mounted secrets in containers
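
Item 7's singleton approach works even without pydantic; a minimal stdlib sketch (`AppSettings` and its fields are illustrative names):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class AppSettings:
    """Minimal typed settings without pydantic (illustrative only)."""
    database_url: str
    debug: bool

    @classmethod
    def from_env(cls) -> "AppSettings":
        try:
            database_url = os.environ["DATABASE_URL"]
        except KeyError:
            raise RuntimeError("DATABASE_URL environment variable is required") from None
        return cls(
            database_url=database_url,
            debug=os.environ.get("DEBUG", "false").lower() in {"1", "true", "yes"},
        )

os.environ["DATABASE_URL"] = "postgresql://localhost/dev"  # demo only
settings = AppSettings.from_env()  # import this singleton instead of calling os.getenv()
```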

---
name: python-design-patterns
description: Python design patterns including KISS, Separation of Concerns, Single Responsibility, and composition over inheritance. Use when making architecture decisions, refactoring code structure, or evaluating when abstractions are appropriate.
---

# Python Design Patterns

Write maintainable Python code using fundamental design principles. These patterns help you build systems that are easy to understand, test, and modify.

## When to Use This Skill

- Designing new components or services
- Refactoring complex or tangled code
- Deciding whether to create an abstraction
- Choosing between inheritance and composition
- Evaluating code complexity and coupling
- Planning modular architectures

## Core Concepts

### 1. KISS (Keep It Simple)

Choose the simplest solution that works. Complexity must be justified by concrete requirements.

### 2. Single Responsibility (SRP)

Each unit should have one reason to change. Separate concerns into focused components.

### 3. Composition Over Inheritance

Build behavior by combining objects, not extending classes.

### 4. Rule of Three

Wait until you have three instances before abstracting. Duplication is often better than premature abstraction.

## Quick Start

```python
# Simple beats clever
# Instead of a factory/registry pattern:
FORMATTERS = {"json": JsonFormatter, "csv": CsvFormatter}

def get_formatter(name: str) -> Formatter:
    return FORMATTERS[name]()
```

## Fundamental Patterns

### Pattern 1: KISS - Keep It Simple

Before adding complexity, ask: does a simpler solution work?

```python
# Over-engineered: Factory with registration
class OutputFormatterFactory:
    _formatters: dict[str, type[Formatter]] = {}

    @classmethod
    def register(cls, name: str):
        def decorator(formatter_cls):
            cls._formatters[name] = formatter_cls
            return formatter_cls
        return decorator

    @classmethod
    def create(cls, name: str) -> Formatter:
        return cls._formatters[name]()

@OutputFormatterFactory.register("json")
class JsonFormatter(Formatter):
    ...

# Simple: Just use a dictionary
FORMATTERS = {
    "json": JsonFormatter,
    "csv": CsvFormatter,
    "xml": XmlFormatter,
}

def get_formatter(name: str) -> Formatter:
    """Get formatter by name."""
    if name not in FORMATTERS:
        raise ValueError(f"Unknown format: {name}")
    return FORMATTERS[name]()
```

The factory pattern adds code without adding value here. Save patterns for when they solve real problems.

### Pattern 2: Single Responsibility Principle

Each class or function should have one reason to change.

```python
# BAD: Handler does everything
class UserHandler:
    async def create_user(self, request: Request) -> Response:
        # HTTP parsing
        data = await request.json()

        # Validation
        if not data.get("email"):
            return Response({"error": "email required"}, status=400)

        # Database access
        user = await db.execute(
            "INSERT INTO users (email, name) VALUES ($1, $2) RETURNING *",
            data["email"], data["name"]
        )

        # Response formatting
        return Response({"id": user.id, "email": user.email}, status=201)

# GOOD: Separated concerns
class UserService:
    """Business logic only."""

    def __init__(self, repo: UserRepository) -> None:
        self._repo = repo

    async def create_user(self, data: CreateUserInput) -> User:
        # Only business rules here
        user = User(email=data.email, name=data.name)
        return await self._repo.save(user)

class UserHandler:
    """HTTP concerns only."""

    def __init__(self, service: UserService) -> None:
        self._service = service

    async def create_user(self, request: Request) -> Response:
        data = CreateUserInput(**(await request.json()))
        user = await self._service.create_user(data)
        return Response(user.to_dict(), status=201)
```

Now HTTP changes don't affect business logic, and vice versa.

### Pattern 3: Separation of Concerns

Organize code into distinct layers with clear responsibilities.

```
┌─────────────────────────────────────────────────────┐
│  API Layer (handlers)                               │
│  - Parse requests                                   │
│  - Call services                                    │
│  - Format responses                                 │
└─────────────────────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────┐
│  Service Layer (business logic)                     │
│  - Domain rules and validation                      │
│  - Orchestrate operations                           │
│  - Pure functions where possible                    │
└─────────────────────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────┐
│  Repository Layer (data access)                     │
│  - SQL queries                                      │
│  - External API calls                               │
│  - Cache operations                                 │
└─────────────────────────────────────────────────────┘
```

Each layer depends only on layers below it:

```python
# Repository: Data access
class UserRepository:
    async def get_by_id(self, user_id: str) -> User | None:
        row = await self._db.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
        return User(**row) if row else None

# Service: Business logic
class UserService:
    def __init__(self, repo: UserRepository) -> None:
        self._repo = repo

    async def get_user(self, user_id: str) -> User:
        user = await self._repo.get_by_id(user_id)
        if user is None:
            raise UserNotFoundError(user_id)
        return user

# Handler: HTTP concerns
@app.get("/users/{user_id}")
async def get_user(user_id: str) -> UserResponse:
    user = await user_service.get_user(user_id)
    return UserResponse.from_user(user)
```

### Pattern 4: Composition Over Inheritance

Build behavior by combining objects rather than inheriting.

```python
# Inheritance: Rigid and hard to test
class EmailNotificationService(NotificationService):
    def __init__(self):
        super().__init__()
        self._smtp = SmtpClient()  # Hard to mock

    def notify(self, user: User, message: str) -> None:
        self._smtp.send(user.email, message)

# Composition: Flexible and testable
class NotificationService:
    """Send notifications via multiple channels."""

    def __init__(
        self,
        email_sender: EmailSender,
        sms_sender: SmsSender | None = None,
        push_sender: PushSender | None = None,
    ) -> None:
        self._email = email_sender
        self._sms = sms_sender
        self._push = push_sender

    async def notify(
        self,
        user: User,
        message: str,
        channels: set[str] | None = None,
    ) -> None:
        channels = channels or {"email"}

        if "email" in channels:
            await self._email.send(user.email, message)

        if "sms" in channels and self._sms and user.phone:
            await self._sms.send(user.phone, message)

        if "push" in channels and self._push and user.device_token:
            await self._push.send(user.device_token, message)

# Easy to test with fakes
service = NotificationService(
    email_sender=FakeEmailSender(),
    sms_sender=FakeSmsSender(),
)
```

## Advanced Patterns

### Pattern 5: Rule of Three

Wait until you have three instances before abstracting.

```python
# Two similar functions? Don't abstract yet
def process_orders(orders: list[Order]) -> list[Result]:
    results = []
    for order in orders:
        validated = validate_order(order)
        result = process_validated_order(validated)
        results.append(result)
    return results

def process_returns(returns: list[Return]) -> list[Result]:
    results = []
    for ret in returns:
        validated = validate_return(ret)
        result = process_validated_return(validated)
        results.append(result)
    return results

# These look similar, but wait! Are they actually the same?
# Different validation, different processing, different errors...
# Duplication is often better than the wrong abstraction

# Only after a third case, consider if there's a real pattern
# But even then, sometimes explicit is better than abstract
```

### Pattern 6: Function Size Guidelines

Keep functions focused. Extract when a function:

- Exceeds 20-50 lines (varies by complexity)
- Serves multiple distinct purposes
- Has deeply nested logic (3+ levels)

```python
# Too long, multiple concerns mixed
def process_order(order: Order) -> Result:
    # 50 lines of validation...
    # 30 lines of inventory check...
    # 40 lines of payment processing...
    # 20 lines of notification...
    pass

# Better: Composed from focused functions
def process_order(order: Order) -> Result:
    """Process a customer order through the complete workflow."""
    validate_order(order)
    reserve_inventory(order)
    payment_result = charge_payment(order)
    send_confirmation(order, payment_result)
    return Result(success=True, order_id=order.id)
```

### Pattern 7: Dependency Injection

Pass dependencies through constructors for testability.

```python
from typing import Protocol

class Logger(Protocol):
    def info(self, msg: str, **kwargs) -> None: ...
    def error(self, msg: str, **kwargs) -> None: ...

class Cache(Protocol):
    async def get(self, key: str) -> str | None: ...
    async def set(self, key: str, value: str, ttl: int) -> None: ...

class UserService:
    """Service with injected dependencies."""

    def __init__(
        self,
        repository: UserRepository,
        cache: Cache,
        logger: Logger,
    ) -> None:
        self._repo = repository
        self._cache = cache
        self._logger = logger

    async def get_user(self, user_id: str) -> User | None:
        # Check cache first
        cached = await self._cache.get(f"user:{user_id}")
        if cached:
            self._logger.info("Cache hit", user_id=user_id)
            return User.from_json(cached)

        # Fetch from database
        user = await self._repo.get_by_id(user_id)
        if user:
            await self._cache.set(f"user:{user_id}", user.to_json(), ttl=300)

        return user

# Production
service = UserService(
    repository=PostgresUserRepository(db),
    cache=RedisCache(redis),
    logger=StructlogLogger(),
)

# Testing
service = UserService(
    repository=InMemoryUserRepository(),
    cache=FakeCache(),
    logger=NullLogger(),
)
```

### Pattern 8: Avoiding Common Anti-Patterns

**Don't expose internal types:**

```python
# BAD: Leaking ORM model to API
@app.get("/users/{id}")
def get_user(id: str) -> UserModel:  # SQLAlchemy model
    return db.query(UserModel).get(id)

# GOOD: Use response schemas
@app.get("/users/{id}")
def get_user(id: str) -> UserResponse:
    user = db.query(UserModel).get(id)
    return UserResponse.from_orm(user)
```

**Don't mix I/O with business logic:**

```python
# BAD: SQL embedded in business logic
def calculate_discount(user_id: str) -> float:
    user = db.query("SELECT * FROM users WHERE id = ?", user_id)
    orders = db.query("SELECT * FROM orders WHERE user_id = ?", user_id)
    # Business logic mixed with data access

# GOOD: Repository pattern
def calculate_discount(user: User, order_history: list[Order]) -> float:
    # Pure business logic, easily testable
    if len(order_history) > 10:
        return 0.15
    return 0.0
```

## Best Practices Summary

1. **Keep it simple** - Choose the simplest solution that works
2. **Single responsibility** - Each unit has one reason to change
3. **Separate concerns** - Distinct layers with clear purposes
4. **Compose, don't inherit** - Combine objects for flexibility
5. **Rule of three** - Wait before abstracting
6. **Keep functions small** - 20-50 lines (varies by complexity), one purpose
7. **Inject dependencies** - Constructor injection for testability
8. **Delete before abstracting** - Remove dead code, then consider patterns
9. **Test each layer** - Isolated tests for each concern
10. **Explicit over clever** - Readable code beats elegant code

plugins/python-development/skills/python-error-handling/SKILL.md

---
name: python-error-handling
description: Python error handling patterns including input validation, exception hierarchies, and partial failure handling. Use when implementing validation logic, designing exception strategies, handling batch processing failures, or building robust APIs.
---

# Python Error Handling

Build robust Python applications with proper input validation, meaningful exceptions, and graceful failure handling. Good error handling makes debugging easier and systems more reliable.

## When to Use This Skill

- Validating user input and API parameters
- Designing exception hierarchies for applications
- Handling partial failures in batch operations
- Converting external data to domain types
- Building user-friendly error messages
- Implementing fail-fast validation patterns

## Core Concepts

### 1. Fail Fast

Validate inputs early, before expensive operations. Report all validation errors at once when possible.

### 2. Meaningful Exceptions

Use appropriate exception types with context. Messages should explain what failed, why, and how to fix it.

### 3. Partial Failures

In batch operations, don't let one failure abort everything. Track successes and failures separately.
|
||||
|
||||
### 4. Preserve Context
|
||||
|
||||
Chain exceptions to maintain the full error trail for debugging.
|
||||
|
||||
## Quick Start
|
||||
|
||||
```python
|
||||
def fetch_page(url: str, page_size: int) -> Page:
|
||||
if not url:
|
||||
raise ValueError("'url' is required")
|
||||
if not 1 <= page_size <= 100:
|
||||
raise ValueError(f"'page_size' must be 1-100, got {page_size}")
|
||||
# Now safe to proceed...
|
||||
```
|

## Fundamental Patterns

### Pattern 1: Early Input Validation

Validate all inputs at API boundaries before any processing begins.

```python
def process_order(
    order_id: str,
    quantity: int,
    discount_percent: float,
) -> OrderResult:
    """Process an order with validation."""
    # Validate required fields
    if not order_id:
        raise ValueError("'order_id' is required")

    # Validate ranges
    if quantity <= 0:
        raise ValueError(f"'quantity' must be positive, got {quantity}")

    if not 0 <= discount_percent <= 100:
        raise ValueError(
            f"'discount_percent' must be 0-100, got {discount_percent}"
        )

    # Validation passed, proceed with processing
    return _process_validated_order(order_id, quantity, discount_percent)
```

### Pattern 2: Convert to Domain Types Early

Parse strings and external data into typed domain objects at system boundaries.

```python
from enum import Enum

class OutputFormat(Enum):
    JSON = "json"
    CSV = "csv"
    PARQUET = "parquet"

def parse_output_format(value: str) -> OutputFormat:
    """Parse string to OutputFormat enum.

    Args:
        value: Format string from user input.

    Returns:
        Validated OutputFormat enum member.

    Raises:
        ValueError: If format is not recognized.
    """
    try:
        return OutputFormat(value.lower())
    except ValueError:
        valid_formats = [f.value for f in OutputFormat]
        raise ValueError(
            f"Invalid format '{value}'. "
            f"Valid options: {', '.join(valid_formats)}"
        )

# Usage at API boundary
def export_data(data: list[dict], format_str: str) -> bytes:
    output_format = parse_output_format(format_str)  # Fail fast
    # Rest of function uses typed OutputFormat
    ...
```

### Pattern 3: Pydantic for Complex Validation

Use Pydantic models for structured input validation with automatic error messages.

```python
from pydantic import BaseModel, Field, ValidationError, field_validator

class CreateUserInput(BaseModel):
    """Input model for user creation."""

    email: str = Field(..., min_length=5, max_length=255)
    name: str = Field(..., min_length=1, max_length=100)
    age: int = Field(ge=0, le=150)

    @field_validator("email")
    @classmethod
    def validate_email_format(cls, v: str) -> str:
        if "@" not in v or "." not in v.split("@")[-1]:
            raise ValueError("Invalid email format")
        return v.lower()

    @field_validator("name")
    @classmethod
    def normalize_name(cls, v: str) -> str:
        return v.strip().title()

# Usage
try:
    user_input = CreateUserInput(
        email="user@example.com",
        name="john doe",
        age=25,
    )
except ValidationError as e:
    # Pydantic provides detailed error information
    print(e.errors())
```

### Pattern 4: Map Errors to Standard Exceptions

Use Python's built-in exception types appropriately, adding context as needed.

| Failure Type | Exception | Example |
|--------------|-----------|---------|
| Invalid input | `ValueError` | Bad parameter values |
| Wrong type | `TypeError` | Expected string, got int |
| Missing item | `KeyError` | Dict key not found |
| Operational failure | `RuntimeError` | Service unavailable |
| Timeout | `TimeoutError` | Operation took too long |
| File not found | `FileNotFoundError` | Path doesn't exist |
| Permission denied | `PermissionError` | Access forbidden |

```python
# Good: Specific exception with context
raise ValueError(f"'page_size' must be 1-100, got {page_size}")

# Avoid: Generic exception, no context
raise Exception("Invalid parameter")
```

## Advanced Patterns

### Pattern 5: Custom Exceptions with Context

Create domain-specific exceptions that carry structured information.

```python
class ApiError(Exception):
    """Base exception for API errors."""

    def __init__(
        self,
        message: str,
        status_code: int,
        response_body: str | None = None,
    ) -> None:
        self.status_code = status_code
        self.response_body = response_body
        super().__init__(message)

class RateLimitError(ApiError):
    """Raised when rate limit is exceeded."""

    def __init__(self, retry_after: int) -> None:
        self.retry_after = retry_after
        super().__init__(
            f"Rate limit exceeded. Retry after {retry_after}s",
            status_code=429,
        )

# Usage
def handle_response(response: Response) -> dict:
    match response.status_code:
        case 200:
            return response.json()
        case 401:
            raise ApiError("Invalid credentials", 401)
        case 404:
            raise ApiError(f"Resource not found: {response.url}", 404)
        case 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            raise RateLimitError(retry_after)
        case code if 400 <= code < 500:
            raise ApiError(f"Client error: {response.text}", code)
        case code if code >= 500:
            raise ApiError(f"Server error: {response.text}", code)
```

### Pattern 6: Exception Chaining

Preserve the original exception when re-raising to maintain the debug trail.

```python
import httpx

class ServiceError(Exception):
    """High-level service operation failed."""
    pass

def upload_file(path: str) -> str:
    """Upload file and return URL."""
    try:
        with open(path, "rb") as f:
            response = httpx.post("https://upload.example.com", files={"file": f})
            response.raise_for_status()
            return response.json()["url"]
    except FileNotFoundError as e:
        raise ServiceError(f"Upload failed: file not found at '{path}'") from e
    except httpx.HTTPStatusError as e:
        raise ServiceError(
            f"Upload failed: server returned {e.response.status_code}"
        ) from e
    except httpx.RequestError as e:
        raise ServiceError("Upload failed: network error") from e
```

### Pattern 7: Batch Processing with Partial Failures

Never let one bad item abort an entire batch. Track results per item.

```python
from dataclasses import dataclass

@dataclass
class BatchResult[T]:  # PEP 695 generic syntax; requires Python 3.12+
    """Results from batch processing."""

    succeeded: dict[int, T]  # index -> result
    failed: dict[int, Exception]  # index -> error

    @property
    def success_count(self) -> int:
        return len(self.succeeded)

    @property
    def failure_count(self) -> int:
        return len(self.failed)

    @property
    def all_succeeded(self) -> bool:
        return len(self.failed) == 0

def process_batch(items: list[Item]) -> BatchResult[ProcessedItem]:
    """Process items, capturing individual failures.

    Args:
        items: Items to process.

    Returns:
        BatchResult with succeeded and failed items by index.
    """
    succeeded: dict[int, ProcessedItem] = {}
    failed: dict[int, Exception] = {}

    for idx, item in enumerate(items):
        try:
            result = process_single_item(item)
            succeeded[idx] = result
        except Exception as e:
            failed[idx] = e

    return BatchResult(succeeded=succeeded, failed=failed)

# Caller handles partial results
result = process_batch(items)
if not result.all_succeeded:
    logger.warning(
        f"Batch completed with {result.failure_count} failures",
        failed_indices=list(result.failed.keys()),
    )
```

### Pattern 8: Progress Reporting for Long Operations

Provide visibility into batch progress without coupling business logic to UI.

```python
from collections.abc import Callable

ProgressCallback = Callable[[int, int, str], None]  # current, total, status

def process_large_batch(
    items: list[Item],
    on_progress: ProgressCallback | None = None,
) -> BatchResult:
    """Process batch with optional progress reporting.

    Args:
        items: Items to process.
        on_progress: Optional callback receiving (current, total, status).
    """
    total = len(items)
    succeeded = {}
    failed = {}

    for idx, item in enumerate(items):
        if on_progress:
            on_progress(idx, total, f"Processing {item.id}")

        try:
            succeeded[idx] = process_single_item(item)
        except Exception as e:
            failed[idx] = e

    if on_progress:
        on_progress(total, total, "Complete")

    return BatchResult(succeeded=succeeded, failed=failed)
```

## Best Practices Summary

1. **Validate early** - Check inputs before expensive operations
2. **Use specific exceptions** - `ValueError`, `TypeError`, not generic `Exception`
3. **Include context** - Messages should explain what, why, and how to fix
4. **Convert types at boundaries** - Parse strings to enums/domain types early
5. **Chain exceptions** - Use `raise ... from e` to preserve debug info
6. **Handle partial failures** - Don't abort batches on single item errors
7. **Use Pydantic** - For complex input validation with structured errors
8. **Document failure modes** - Docstrings should list possible exceptions
9. **Log with context** - Include IDs, counts, and other debugging info
10. **Test error paths** - Verify exceptions are raised correctly
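
Practice 10 can be exercised directly with `pytest.raises`; a minimal sketch (`parse_port` is a hypothetical helper, and pytest is assumed to be available):

```python
import pytest

def parse_port(value: str) -> int:
    """Parse and validate a TCP port number (illustrative helper)."""
    port = int(value)
    if not 1 <= port <= 65535:
        raise ValueError(f"'port' must be 1-65535, got {port}")
    return port

def test_parse_port_rejects_out_of_range() -> None:
    # Assert both the exception type and the message content
    with pytest.raises(ValueError, match="must be 1-65535"):
        parse_port("70000")
```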
400 plugins/python-development/skills/python-observability/SKILL.md Normal file
@@ -0,0 +1,400 @@
---
name: python-observability
description: Python observability patterns including structured logging, metrics, and distributed tracing. Use when adding logging, implementing metrics collection, setting up tracing, or debugging production systems.
---

# Python Observability

Instrument Python applications with structured logs, metrics, and traces. When something breaks in production, you need to answer "what, where, and why" without deploying new code.

## When to Use This Skill

- Adding structured logging to applications
- Implementing metrics collection with Prometheus
- Setting up distributed tracing across services
- Propagating correlation IDs through request chains
- Debugging production issues
- Building observability dashboards

## Core Concepts

### 1. Structured Logging

Emit logs as JSON with consistent fields for production environments. Machine-readable logs enable powerful queries and alerts. For local development, consider human-readable formats.

### 2. The Four Golden Signals

Track latency, traffic, errors, and saturation for every service boundary.

### 3. Correlation IDs

Thread a unique ID through all logs and spans for a single request, enabling end-to-end tracing.

### 4. Bounded Cardinality

Keep metric label values bounded. Unbounded labels (like user IDs) explode storage costs.

## Quick Start

```python
import structlog

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

logger = structlog.get_logger()
logger.info("Request processed", user_id="123", duration_ms=45)
```

## Fundamental Patterns

### Pattern 1: Structured Logging with Structlog

Configure structlog for JSON output with consistent fields.

```python
import logging
import structlog

def configure_logging(log_level: str = "INFO") -> None:
    """Configure structured logging for the application."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper())
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

# Initialize at application startup
configure_logging("INFO")
logger = structlog.get_logger()
```

### Pattern 2: Consistent Log Fields

Every log entry should include standard fields for filtering and correlation.

```python
import time
from contextvars import ContextVar

import structlog

# Store correlation ID in context
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

logger = structlog.get_logger()

def process_request(request: Request) -> Response:
    """Process request with structured logging."""
    start = time.perf_counter()
    logger.info(
        "Request received",
        correlation_id=correlation_id.get(),
        method=request.method,
        path=request.path,
        user_id=request.user_id,
    )

    try:
        result = handle_request(request)
        elapsed = (time.perf_counter() - start) * 1000
        logger.info(
            "Request completed",
            correlation_id=correlation_id.get(),
            status_code=200,
            duration_ms=elapsed,
        )
        return result
    except Exception as e:
        logger.error(
            "Request failed",
            correlation_id=correlation_id.get(),
            error_type=type(e).__name__,
            error_message=str(e),
        )
        raise
```

### Pattern 3: Semantic Log Levels

Use log levels consistently across the application.

| Level | Purpose | Examples |
|-------|---------|----------|
| `DEBUG` | Development diagnostics | Variable values, internal state |
| `INFO` | Request lifecycle, operations | Request start/end, job completion |
| `WARNING` | Recoverable anomalies | Retry attempts, fallback used |
| `ERROR` | Failures needing attention | Exceptions, service unavailable |

```python
# DEBUG: Detailed internal information
logger.debug("Cache lookup", key=cache_key, hit=cache_hit)

# INFO: Normal operational events
logger.info("Order created", order_id=order.id, total=order.total)

# WARNING: Abnormal but handled situations
logger.warning(
    "Rate limit approaching",
    current_rate=950,
    limit=1000,
    reset_seconds=30,
)

# ERROR: Failures requiring investigation
logger.error(
    "Payment processing failed",
    order_id=order.id,
    error=str(e),
    payment_provider="stripe",
)
```

Never log expected behavior at `ERROR`. A user entering a wrong password is `INFO`, not `ERROR`.

### Pattern 4: Correlation ID Propagation

Generate a unique ID at ingress and thread it through all operations.

```python
from contextvars import ContextVar
import uuid
import structlog

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

def set_correlation_id(cid: str | None = None) -> str:
    """Set correlation ID for current context."""
    cid = cid or str(uuid.uuid4())
    correlation_id.set(cid)
    structlog.contextvars.bind_contextvars(correlation_id=cid)
    return cid

# FastAPI middleware example
from fastapi import Request

async def correlation_middleware(request: Request, call_next):
    """Middleware to set and propagate correlation ID."""
    # Use incoming header or generate new
    cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
    set_correlation_id(cid)

    response = await call_next(request)
    response.headers["X-Correlation-ID"] = cid
    return response
```

Propagate to outbound requests:

```python
import httpx

async def call_downstream_service(endpoint: str, data: dict) -> dict:
    """Call downstream service with correlation ID."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json=data,
            headers={"X-Correlation-ID": correlation_id.get()},
        )
        return response.json()
```

## Advanced Patterns

### Pattern 5: The Four Golden Signals with Prometheus

Track these metrics for every service boundary:

```python
from prometheus_client import Counter, Histogram, Gauge

# Latency: How long requests take
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "Request latency in seconds",
    ["method", "endpoint", "status"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)

# Traffic: Request rate
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)

# Errors: Error rate
ERROR_COUNT = Counter(
    "http_errors_total",
    "Total HTTP errors",
    ["method", "endpoint", "error_type"],
)

# Saturation: Resource utilization
DB_POOL_USAGE = Gauge(
    "db_connection_pool_used",
    "Number of database connections in use",
)
```

Instrument your endpoints:

```python
import time
from functools import wraps

def track_request(func):
    """Decorator to track request metrics."""
    @wraps(func)
    async def wrapper(request: Request, *args, **kwargs):
        method = request.method
        endpoint = request.url.path
        start = time.perf_counter()

        try:
            response = await func(request, *args, **kwargs)
            status = str(response.status_code)
            return response
        except Exception as e:
            status = "500"
            ERROR_COUNT.labels(
                method=method,
                endpoint=endpoint,
                error_type=type(e).__name__,
            ).inc()
            raise
        finally:
            duration = time.perf_counter() - start
            REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
            REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)

    return wrapper
```

### Pattern 6: Bounded Cardinality

Avoid labels with unbounded values to prevent metric explosion.

```python
# BAD: User ID has potentially millions of values
REQUEST_COUNT.labels(method="GET", user_id=user.id)  # Don't do this!

# GOOD: Bounded values only
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")

# If you need per-user metrics, use a different approach:
# - Log the user_id and query logs
# - Use a separate analytics system
# - Bucket users by type/tier
REQUEST_COUNT.labels(
    method="GET",
    endpoint="/users",
    user_tier="premium",  # Bounded set of values
)
```

### Pattern 7: Timed Operations with Context Manager

Create a reusable timing context manager for operations.

```python
from contextlib import contextmanager
import time
import structlog

logger = structlog.get_logger()

@contextmanager
def timed_operation(name: str, **extra_fields):
    """Context manager for timing and logging operations."""
    start = time.perf_counter()
    logger.debug("Operation started", operation=name, **extra_fields)

    try:
        yield
    except Exception as e:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.error(
            "Operation failed",
            operation=name,
            duration_ms=round(elapsed_ms, 2),
            error=str(e),
            **extra_fields,
        )
        raise
    else:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.info(
            "Operation completed",
            operation=name,
            duration_ms=round(elapsed_ms, 2),
            **extra_fields,
        )

# Usage
with timed_operation("fetch_user_orders", user_id=user.id):
    orders = await order_repository.get_by_user(user.id)
```

### Pattern 8: OpenTelemetry Tracing

Set up distributed tracing with OpenTelemetry.

**Note:** OpenTelemetry is actively evolving. Check the [official Python documentation](https://opentelemetry.io/docs/languages/python/) for the latest API patterns and best practices.

```python
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
    """Configure OpenTelemetry tracing."""
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

async def process_order(order_id: str) -> Order:
    """Process order with tracing."""
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        with tracer.start_as_current_span("validate_order"):
            validate_order(order_id)

        with tracer.start_as_current_span("charge_payment"):
            charge_payment(order_id)

        with tracer.start_as_current_span("send_confirmation"):
            send_confirmation(order_id)

        return order
```

## Best Practices Summary

1. **Use structured logging** - JSON logs with consistent fields
2. **Propagate correlation IDs** - Thread through all requests and logs
3. **Track the four golden signals** - Latency, traffic, errors, saturation
4. **Bound label cardinality** - Never use unbounded values as metric labels
5. **Log at appropriate levels** - Don't cry wolf with ERROR
6. **Include context** - User ID, request ID, operation name in logs
7. **Use context managers** - Consistent timing and error handling
8. **Separate concerns** - Observability code shouldn't pollute business logic
9. **Test your observability** - Verify logs and metrics in integration tests
10. **Set up alerts** - Metrics are useless without alerting
@@ -0,0 +1,252 @@
---
name: python-project-structure
description: Python project organization, module architecture, and public API design. Use when setting up new projects, organizing modules, defining public interfaces with __all__, or planning directory layouts.
---

# Python Project Structure & Module Architecture

Design well-organized Python projects with clear module boundaries, explicit public interfaces, and maintainable directory structures. Good organization makes code discoverable and changes predictable.

## When to Use This Skill

- Starting a new Python project from scratch
- Reorganizing an existing codebase for clarity
- Defining module public APIs with `__all__`
- Deciding between flat and nested directory structures
- Determining test file placement strategies
- Creating reusable library packages

## Core Concepts

### 1. Module Cohesion

Group related code that changes together. A module should have a single, clear purpose.

### 2. Explicit Interfaces

Define what's public with `__all__`. Everything not listed is an internal implementation detail.

### 3. Flat Hierarchies

Prefer shallow directory structures. Add depth only for genuine sub-domains.

### 4. Consistent Conventions

Apply naming and organization patterns uniformly across the project.

## Quick Start

```
myproject/
├── src/
│   └── myproject/
│       ├── __init__.py
│       ├── services/
│       ├── models/
│       └── api/
├── tests/
├── pyproject.toml
└── README.md
```

## Fundamental Patterns

### Pattern 1: One Concept Per File

Each file should focus on a single concept or closely related set of functions. Consider splitting when a file:

- Handles multiple unrelated responsibilities
- Grows beyond 300-500 lines (varies by complexity)
- Contains classes that change for different reasons

```python
# Good: Focused files
# user_service.py - User business logic
# user_repository.py - User data access
# user_models.py - User data structures

# Avoid: Kitchen sink files
# user.py - Contains service, repository, models, utilities...
```

### Pattern 2: Explicit Public APIs with `__all__`

Define the public interface for every module. Unlisted members are internal implementation details.

```python
# mypackage/services/__init__.py
from .user_service import UserService
from .order_service import OrderService
from .exceptions import ServiceError, ValidationError

__all__ = [
    "UserService",
    "OrderService",
    "ServiceError",
    "ValidationError",
]

# Internal helpers remain private by omission
# from .internal_helpers import _validate_input  # Not exported
```

### Pattern 3: Flat Directory Structure

Prefer minimal nesting. Deep hierarchies make imports verbose and navigation difficult.

```
# Preferred: Flat structure
project/
├── api/
│   ├── routes.py
│   └── middleware.py
├── services/
│   ├── user_service.py
│   └── order_service.py
├── models/
│   ├── user.py
│   └── order.py
└── utils/
    └── validation.py

# Avoid: Deep nesting
project/core/internal/services/impl/user/
```

Add sub-packages only when there's a genuine sub-domain requiring isolation.

### Pattern 4: Test File Organization

Choose one approach and apply it consistently throughout the project.

**Option A: Colocated Tests**

```
src/
├── user_service.py
├── test_user_service.py
├── order_service.py
└── test_order_service.py
```

Benefits: Tests live next to the code they verify. Easy to see coverage gaps.

**Option B: Parallel Test Directory**

```
src/
├── services/
│   ├── user_service.py
│   └── order_service.py
tests/
├── services/
│   ├── test_user_service.py
│   └── test_order_service.py
```

Benefits: Clean separation between production and test code. Standard for larger projects.
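
With Option B and a `src/` layout, pytest needs to be told where tests and sources live. A minimal `pyproject.toml` fragment might look like this (the `pythonpath` ini option requires pytest 7+; the values shown are illustrative):

```toml
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["src"]
```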

## Advanced Patterns

### Pattern 5: Package Initialization

Use `__init__.py` to provide a clean public interface for package consumers.

```python
# mypackage/__init__.py
"""MyPackage - A library for doing useful things."""

from .core import MainClass, HelperClass
from .exceptions import PackageError, ConfigError
from .config import Settings

__all__ = [
    "MainClass",
    "HelperClass",
    "PackageError",
    "ConfigError",
    "Settings",
]

__version__ = "1.0.0"
```

Consumers can then import directly from the package:

```python
from mypackage import MainClass, Settings
```

### Pattern 6: Layered Architecture

Organize code by architectural layer for clear separation of concerns.

```
myapp/
├── api/            # HTTP handlers, request/response
│   ├── routes/
│   └── middleware/
├── services/       # Business logic
├── repositories/   # Data access
├── models/         # Domain entities
├── schemas/        # API schemas (Pydantic)
└── config/         # Configuration
```

Each layer should only depend on layers below it, never above.
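
The downward-only dependency rule can be sketched with minimal stand-in classes (all names here are illustrative, not from a real project):

```python
from dataclasses import dataclass

@dataclass
class User:  # models layer: depends on nothing above it
    id: int
    name: str

class UserRepository:  # repositories layer: depends only on models
    def get(self, user_id: int) -> User:
        return User(id=user_id, name="demo")

class UserService:  # services layer: depends on repositories and models
    def __init__(self, repo: UserRepository) -> None:
        self._repo = repo

    def display_name(self, user_id: int) -> str:
        return self._repo.get(user_id).name.title()

# The api layer would depend on UserService, completing the top-down chain;
# UserService never imports anything from api.
print(UserService(UserRepository()).display_name(1))  # prints Demo
```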

### Pattern 7: Domain-Driven Structure

For complex applications, organize by business domain rather than technical layer.

```
ecommerce/
├── users/
│   ├── models.py
│   ├── services.py
│   ├── repository.py
│   └── api.py
├── orders/
│   ├── models.py
│   ├── services.py
│   ├── repository.py
│   └── api.py
└── shared/
    ├── database.py
    └── exceptions.py
```
## File and Module Naming

### Conventions

- Use `snake_case` for all file and module names: `user_repository.py`
- Avoid abbreviations that obscure meaning: `user_repository.py` not `usr_repo.py`
- Match class names to file names: `UserService` in `user_service.py`

### Import Style

Use absolute imports for clarity and reliability:

```python
# Preferred: Absolute imports
from myproject.services import UserService
from myproject.models import User

# Avoid: Relative imports
from ..services import UserService
from . import models
```

Relative imports can break when modules are moved or reorganized.

## Best Practices Summary

1. **Keep files focused** - One concept per file, consider splitting at 300-500 lines (varies by complexity)
2. **Define `__all__` explicitly** - Make public interfaces clear
3. **Prefer flat structures** - Add depth only for genuine sub-domains
4. **Use absolute imports** - More reliable and clearer
5. **Be consistent** - Apply patterns uniformly across the project
6. **Match names to content** - File names should describe their purpose
7. **Separate concerns** - Keep layers distinct and dependencies flowing one direction
8. **Document your structure** - Include a README explaining the organization
376
plugins/python-development/skills/python-resilience/SKILL.md
Normal file
@@ -0,0 +1,376 @@
---
name: python-resilience
description: Python resilience patterns including automatic retries, exponential backoff, timeouts, and fault-tolerant decorators. Use when adding retry logic, implementing timeouts, building fault-tolerant services, or handling transient failures.
---

# Python Resilience Patterns

Build fault-tolerant Python applications that gracefully handle transient failures, network issues, and service outages. Resilience patterns keep systems running when dependencies are unreliable.

## When to Use This Skill

- Adding retry logic to external service calls
- Implementing timeouts for network operations
- Building fault-tolerant microservices
- Handling rate limiting and backpressure
- Creating infrastructure decorators
- Designing circuit breakers

## Core Concepts

### 1. Transient vs Permanent Failures

Retry transient errors (network timeouts, temporary service issues). Don't retry permanent errors (invalid credentials, bad requests).

### 2. Exponential Backoff

Increase wait time between retries to avoid overwhelming recovering services.

### 3. Jitter

Add randomness to backoff to prevent thundering herd when many clients retry simultaneously.

### 4. Bounded Retries

Cap both attempt count and total duration to prevent infinite retry loops.

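Backoff and jitter combine into a small delay formula. This is a minimal sketch of the "full jitter" variant, not tied to any particular library:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# attempt 0 -> up to 1s, attempt 3 -> up to 8s, attempt 10 -> capped at 30s
delays = [backoff_delay(n) for n in range(5)]
```

Libraries like `tenacity` package this same idea behind their `wait_*` strategies.
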
## Quick Start

```python
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential_jitter

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=10),
)
def call_external_service(request: dict) -> dict:
    return httpx.post("https://api.example.com", json=request).json()
```

## Fundamental Patterns

### Pattern 1: Basic Retry with Tenacity

Use the `tenacity` library for production-grade retry logic. For simpler cases, consider built-in retry functionality or a lightweight custom implementation.

```python
import httpx
from tenacity import (
    retry,
    stop_after_attempt,
    stop_after_delay,
    wait_exponential_jitter,
    retry_if_exception_type,
)

TRANSIENT_ERRORS = (ConnectionError, TimeoutError, OSError)

@retry(
    retry=retry_if_exception_type(TRANSIENT_ERRORS),
    stop=stop_after_attempt(5) | stop_after_delay(60),
    wait=wait_exponential_jitter(initial=1, max=30),
)
def fetch_data(url: str) -> dict:
    """Fetch data with automatic retry on transient failures."""
    response = httpx.get(url, timeout=30)
    response.raise_for_status()
    return response.json()
```

### Pattern 2: Retry Only Appropriate Errors

Whitelist specific transient exceptions. Never retry:

- `ValueError`, `TypeError` - These are bugs, not transient issues
- `AuthenticationError` - Invalid credentials won't become valid
- HTTP 4xx errors (except 429) - Client errors are permanent

```python
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential_jitter,
)
import httpx

# Define what's retryable
RETRYABLE_EXCEPTIONS = (
    ConnectionError,
    TimeoutError,
    httpx.ConnectTimeout,
    httpx.ReadTimeout,
)

@retry(
    retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=10),
)
def resilient_api_call(endpoint: str) -> dict:
    """Make API call with retry on network issues."""
    return httpx.get(endpoint, timeout=10).json()
```

### Pattern 3: HTTP Status Code Retries

Retry specific HTTP status codes that indicate transient issues.

```python
from tenacity import (
    retry,
    retry_if_result,
    stop_after_attempt,
    wait_exponential_jitter,
)
import httpx

RETRY_STATUS_CODES = {429, 502, 503, 504}

def should_retry_response(response: httpx.Response) -> bool:
    """Check if response indicates a retryable error."""
    return response.status_code in RETRY_STATUS_CODES

@retry(
    retry=retry_if_result(should_retry_response),
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=10),
)
def http_request(method: str, url: str, **kwargs) -> httpx.Response:
    """Make HTTP request with retry on transient status codes."""
    return httpx.request(method, url, timeout=30, **kwargs)
```

### Pattern 4: Combined Exception and Status Retry

Handle both network exceptions and HTTP status codes.

```python
from tenacity import (
    retry,
    retry_if_exception_type,
    retry_if_result,
    stop_after_attempt,
    wait_exponential_jitter,
    before_sleep_log,
)
import logging
import httpx

logger = logging.getLogger(__name__)

TRANSIENT_EXCEPTIONS = (
    ConnectionError,
    TimeoutError,
    httpx.ConnectError,
    httpx.ReadTimeout,
)
RETRY_STATUS_CODES = {429, 500, 502, 503, 504}

def is_retryable_response(response: httpx.Response) -> bool:
    return response.status_code in RETRY_STATUS_CODES

@retry(
    retry=(
        retry_if_exception_type(TRANSIENT_EXCEPTIONS)
        | retry_if_result(is_retryable_response)
    ),
    stop=stop_after_attempt(5),
    wait=wait_exponential_jitter(initial=1, max=30),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
def robust_http_call(
    method: str,
    url: str,
    **kwargs,
) -> httpx.Response:
    """HTTP call with comprehensive retry handling."""
    return httpx.request(method, url, timeout=30, **kwargs)
```

## Advanced Patterns

### Pattern 5: Logging Retry Attempts

Track retry behavior for debugging and alerting.

```python
from tenacity import retry, stop_after_attempt, wait_exponential
import structlog

logger = structlog.get_logger()

def log_retry_attempt(retry_state):
    """Log detailed retry information."""
    exception = retry_state.outcome.exception()
    logger.warning(
        "Retrying operation",
        attempt=retry_state.attempt_number,
        exception_type=type(exception).__name__,
        exception_message=str(exception),
        next_wait_seconds=retry_state.next_action.sleep if retry_state.next_action else None,
    )

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, max=10),
    before_sleep=log_retry_attempt,
)
def call_with_logging(request: dict) -> dict:
    """External call with retry logging."""
    ...
```

### Pattern 6: Timeout Decorator

Create reusable timeout decorators for consistent timeout handling.

```python
import asyncio
from functools import wraps
from typing import TypeVar, Callable

import httpx

T = TypeVar("T")

def with_timeout(seconds: float):
    """Decorator to add timeout to async functions."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            return await asyncio.wait_for(
                func(*args, **kwargs),
                timeout=seconds,
            )
        return wrapper
    return decorator

@with_timeout(30)
async def fetch_with_timeout(url: str) -> dict:
    """Fetch URL with 30 second timeout."""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()
```

### Pattern 7: Cross-Cutting Concerns via Decorators

Stack decorators to separate infrastructure from business logic.

```python
from functools import wraps
from typing import TypeVar, Callable

import structlog
from tenacity import retry, stop_after_attempt, wait_exponential_jitter

logger = structlog.get_logger()
T = TypeVar("T")

def traced(name: str | None = None):
    """Add tracing to function calls."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        span_name = name or func.__name__

        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            logger.info("Operation started", operation=span_name)
            try:
                result = await func(*args, **kwargs)
                logger.info("Operation completed", operation=span_name)
                return result
            except Exception as e:
                logger.error("Operation failed", operation=span_name, error=str(e))
                raise
        return wrapper
    return decorator

# Stack multiple concerns (with_timeout is defined in Pattern 6)
@traced("fetch_user_data")
@with_timeout(30)
@retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter())
async def fetch_user_data(user_id: str) -> dict:
    """Fetch user with tracing, timeout, and retry."""
    ...
```

### Pattern 8: Dependency Injection for Testability

Pass infrastructure components through constructors for easy testing.

```python
import time
from dataclasses import dataclass
from typing import Protocol

class Logger(Protocol):
    def info(self, msg: str, **kwargs) -> None: ...
    def error(self, msg: str, **kwargs) -> None: ...

class MetricsClient(Protocol):
    def increment(self, metric: str, tags: dict | None = None) -> None: ...
    def timing(self, metric: str, value: float) -> None: ...

@dataclass
class UserService:
    """Service with injected infrastructure."""

    repository: UserRepository  # User/UserRepository defined elsewhere in the app
    logger: Logger
    metrics: MetricsClient

    async def get_user(self, user_id: str) -> User:
        self.logger.info("Fetching user", user_id=user_id)
        start = time.perf_counter()

        try:
            user = await self.repository.get(user_id)
            self.metrics.increment("user.fetch.success")
            return user
        except Exception as e:
            self.metrics.increment("user.fetch.error")
            self.logger.error("Failed to fetch user", user_id=user_id, error=str(e))
            raise
        finally:
            elapsed = time.perf_counter() - start
            self.metrics.timing("user.fetch.duration", elapsed)

# Easy to test with fakes
service = UserService(
    repository=FakeRepository(),
    logger=FakeLogger(),
    metrics=FakeMetrics(),
)
```

### Pattern 9: Fail-Safe Defaults

Degrade gracefully when non-critical operations fail.

```python
from collections.abc import Callable
from functools import wraps
from typing import TypeVar

import structlog

logger = structlog.get_logger()
T = TypeVar("T")

def fail_safe(default: T, log_failure: bool = True):
    """Return default value on failure instead of raising."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if log_failure:
                    logger.warning(
                        "Operation failed, using default",
                        function=func.__name__,
                        error=str(e),
                    )
                return default
        return wrapper
    return decorator

@fail_safe(default=[])
async def get_recommendations(user_id: str) -> list[str]:
    """Get recommendations, return empty list on failure."""
    ...
```

## Best Practices Summary

1. **Retry only transient errors** - Don't retry bugs or authentication failures
2. **Use exponential backoff** - Give services time to recover
3. **Add jitter** - Prevent thundering herd from synchronized retries
4. **Cap total duration** - `stop_after_attempt(5) | stop_after_delay(60)`
5. **Log every retry** - Silent retries hide systemic problems
6. **Use decorators** - Keep retry logic separate from business logic
7. **Inject dependencies** - Make infrastructure testable
8. **Set timeouts everywhere** - Every network call needs a timeout
9. **Fail gracefully** - Return cached/default values for non-critical paths
10. **Monitor retry rates** - High retry rates indicate underlying issues

@@ -0,0 +1,421 @@
---
name: python-resource-management
description: Python resource management with context managers, cleanup patterns, and streaming. Use when managing connections, file handles, implementing cleanup logic, or building streaming responses with accumulated state.
---

# Python Resource Management

Manage resources deterministically using context managers. Resources like database connections, file handles, and network sockets should be released reliably, even when exceptions occur.

## When to Use This Skill

- Managing database connections and connection pools
- Working with file handles and I/O
- Implementing custom context managers
- Building streaming responses with state
- Handling nested resource cleanup
- Creating async context managers

## Core Concepts

### 1. Context Managers

The `with` statement ensures resources are released automatically, even on exceptions.

### 2. Protocol Methods

`__enter__`/`__exit__` for sync, `__aenter__`/`__aexit__` for async resource management.

### 3. Unconditional Cleanup

`__exit__` always runs, regardless of whether an exception occurred.

### 4. Exception Handling

Return `True` from `__exit__` to suppress exceptions, `False` to propagate them.

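Concept 4 can be demonstrated with a tiny context manager that suppresses exactly one exception type (a toy example, not a recommended API):

```python
class IgnoreKeyError:
    """Suppresses KeyError only; everything else propagates."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        # Returning True swallows the exception; False re-raises it
        return exc_type is KeyError

with IgnoreKeyError():
    {}["missing"]  # the KeyError raised here is suppressed

print("execution continues")
```

The standard library's `contextlib.suppress(KeyError)` provides the same behavior ready-made.
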
## Quick Start

```python
from contextlib import contextmanager

@contextmanager
def managed_resource():
    resource = acquire_resource()
    try:
        yield resource
    finally:
        resource.cleanup()

with managed_resource() as r:
    r.do_work()
```

## Fundamental Patterns

### Pattern 1: Class-Based Context Manager

Implement the context manager protocol for complex resources.

```python
from types import TracebackType

import psycopg

class DatabaseConnection:
    """Database connection with automatic cleanup."""

    def __init__(self, dsn: str) -> None:
        self._dsn = dsn
        self._conn: psycopg.Connection | None = None

    def connect(self) -> None:
        """Establish database connection."""
        self._conn = psycopg.connect(self._dsn)

    def close(self) -> None:
        """Close connection if open."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "DatabaseConnection":
        """Enter context: connect and return self."""
        self.connect()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context: always close connection."""
        self.close()

# Usage with context manager (preferred)
with DatabaseConnection(dsn) as db:
    result = db.execute(query)

# Manual management when needed
db = DatabaseConnection(dsn)
db.connect()
try:
    result = db.execute(query)
finally:
    db.close()
```

### Pattern 2: Async Context Manager

For async resources, implement the async protocol.

```python
from types import TracebackType

import asyncpg

class AsyncDatabasePool:
    """Async database connection pool."""

    def __init__(self, dsn: str, min_size: int = 1, max_size: int = 10) -> None:
        self._dsn = dsn
        self._min_size = min_size
        self._max_size = max_size
        self._pool: asyncpg.Pool | None = None

    async def __aenter__(self) -> "AsyncDatabasePool":
        """Create connection pool."""
        self._pool = await asyncpg.create_pool(
            self._dsn,
            min_size=self._min_size,
            max_size=self._max_size,
        )
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close all connections in pool."""
        if self._pool is not None:
            await self._pool.close()

    async def execute(self, query: str, *args) -> list[dict]:
        """Execute query using pooled connection."""
        async with self._pool.acquire() as conn:
            return await conn.fetch(query, *args)

# Usage
async with AsyncDatabasePool(dsn) as pool:
    users = await pool.execute("SELECT * FROM users WHERE active = $1", True)
```

### Pattern 3: Using @contextmanager Decorator

Simplify context managers with the decorator for straightforward cases.

```python
from contextlib import contextmanager, asynccontextmanager
import time

import structlog

logger = structlog.get_logger()

@contextmanager
def timed_block(name: str):
    """Time a block of code."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.info(f"{name} completed", duration_seconds=round(elapsed, 3))

# Usage
with timed_block("data_processing"):
    process_large_dataset()

@asynccontextmanager
async def database_transaction(conn: AsyncConnection):
    """Manage database transaction."""
    await conn.execute("BEGIN")
    try:
        yield conn
        await conn.execute("COMMIT")
    except Exception:
        await conn.execute("ROLLBACK")
        raise

# Usage
async with database_transaction(conn) as tx:
    await tx.execute("INSERT INTO users ...")
    await tx.execute("INSERT INTO audit_log ...")
```

### Pattern 4: Unconditional Resource Release

Always clean up resources in `__exit__`, regardless of exceptions.

```python
from pathlib import Path
from types import TracebackType
from typing import IO

class FileProcessor:
    """Process file with guaranteed cleanup."""

    def __init__(self, path: str) -> None:
        self._path = path
        self._file: IO | None = None
        self._temp_files: list[Path] = []

    def __enter__(self) -> "FileProcessor":
        self._file = open(self._path, "r")
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Clean up all resources unconditionally."""
        # Close main file
        if self._file is not None:
            self._file.close()

        # Clean up any temporary files
        for temp_file in self._temp_files:
            try:
                temp_file.unlink()
            except OSError:
                pass  # Best effort cleanup

        # Return None/False to propagate any exception
```

## Advanced Patterns

### Pattern 5: Selective Exception Suppression

Only suppress specific, documented exceptions.

```python
from types import TracebackType

class StreamWriter:
    """Writer that handles broken pipe gracefully."""

    def __init__(self, stream) -> None:
        self._stream = stream

    def __enter__(self) -> "StreamWriter":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """Clean up, suppressing BrokenPipeError on shutdown."""
        self._stream.close()

        # Suppress BrokenPipeError (client disconnected)
        # This is expected behavior, not an error
        if exc_type is BrokenPipeError:
            return True  # Exception suppressed

        return False  # Propagate all other exceptions
```

### Pattern 6: Streaming with Accumulated State

Maintain both incremental chunks and accumulated state during streaming.

```python
from collections.abc import Generator
from dataclasses import dataclass, field

@dataclass
class StreamingResult:
    """Accumulated streaming result."""

    chunks: list[str] = field(default_factory=list)
    _finalized: bool = False

    @property
    def content(self) -> str:
        """Get accumulated content."""
        return "".join(self.chunks)

    def add_chunk(self, chunk: str) -> None:
        """Add chunk to accumulator."""
        if self._finalized:
            raise RuntimeError("Cannot add to finalized result")
        self.chunks.append(chunk)

    def finalize(self) -> str:
        """Mark stream complete and return content."""
        self._finalized = True
        return self.content

def stream_with_accumulation(
    response: StreamingResponse,
) -> Generator[tuple[str, str], None, str]:
    """Stream response while accumulating content.

    Yields:
        Tuple of (accumulated_content, new_chunk) for each chunk.

    Returns:
        Final accumulated content.
    """
    result = StreamingResult()

    for chunk in response.iter_content():
        result.add_chunk(chunk)
        yield result.content, chunk

    return result.finalize()
```

### Pattern 7: Efficient String Accumulation

Avoid O(n²) string concatenation when accumulating.

```python
def accumulate_stream(stream) -> str:
    """Efficiently accumulate stream content."""
    # BAD: O(n²) due to string immutability
    # content = ""
    # for chunk in stream:
    #     content += chunk  # Creates new string each time

    # GOOD: O(n) with list and join
    chunks: list[str] = []
    for chunk in stream:
        chunks.append(chunk)
    return "".join(chunks)  # Single allocation
```

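An equivalent O(n) approach uses the standard library's `io.StringIO` as a growable in-memory buffer:

```python
import io

def accumulate_with_buffer(stream) -> str:
    """Accumulate chunks by writing into an in-memory buffer."""
    buf = io.StringIO()
    for chunk in stream:
        buf.write(chunk)
    return buf.getvalue()

accumulate_with_buffer(["a", "b", "c"])  # -> "abc"
```

Both approaches avoid the repeated reallocation of `content += chunk`; list-plus-join is the more common idiom, while `StringIO` is convenient when an API expects a file-like object.
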
### Pattern 8: Tracking Stream Metrics

Measure time-to-first-byte and total streaming time.

```python
import time
from collections.abc import Generator

def stream_with_metrics(
    response: StreamingResponse,
) -> Generator[str, None, dict]:
    """Stream response while collecting metrics.

    Yields:
        Content chunks.

    Returns:
        Metrics dictionary.
    """
    start = time.perf_counter()
    first_chunk_time: float | None = None
    chunk_count = 0
    total_bytes = 0

    for chunk in response.iter_content():
        if first_chunk_time is None:
            first_chunk_time = time.perf_counter() - start

        chunk_count += 1
        total_bytes += len(chunk.encode())
        yield chunk

    total_time = time.perf_counter() - start

    return {
        "time_to_first_byte_ms": round((first_chunk_time or 0) * 1000, 2),
        "total_time_ms": round(total_time * 1000, 2),
        "chunk_count": chunk_count,
        "total_bytes": total_bytes,
    }
```

### Pattern 9: Managing Multiple Resources with ExitStack

Handle a dynamic number of resources cleanly.

```python
from contextlib import ExitStack, AsyncExitStack
from pathlib import Path

def process_files(paths: list[Path]) -> list[str]:
    """Process multiple files with automatic cleanup."""
    results = []

    with ExitStack() as stack:
        # Open all files - they'll all be closed when block exits
        files = [stack.enter_context(open(p)) for p in paths]

        for f in files:
            results.append(f.read())

    return results

async def process_connections(hosts: list[str]) -> list[dict]:
    """Process multiple async connections."""
    results = []

    async with AsyncExitStack() as stack:
        connections = [
            await stack.enter_async_context(connect_to_host(host))
            for host in hosts
        ]

        for conn in connections:
            results.append(await conn.fetch_data())

    return results
```

## Best Practices Summary

1. **Always use context managers** - For any resource that needs cleanup
2. **Clean up unconditionally** - `__exit__` runs even on exception
3. **Don't suppress unexpectedly** - Return `False` unless suppression is intentional
4. **Use @contextmanager** - For simple resource patterns
5. **Implement both protocols** - Support `with` and manual management
6. **Use ExitStack** - For dynamic numbers of resources
7. **Accumulate efficiently** - List + join, not string concatenation
8. **Track metrics** - Time-to-first-byte matters for streaming
9. **Document behavior** - Especially exception suppression
10. **Test cleanup paths** - Verify resources are released on errors

@@ -618,6 +618,52 @@ def test_sorted_list_properties(lst):
        assert sorted_lst[i] <= sorted_lst[i + 1]
```

## Test Design Principles

### One Behavior Per Test

Each test should verify exactly one behavior. This makes failures easy to diagnose and tests easy to maintain.

```python
# BAD - testing multiple behaviors
def test_user_service():
    user = service.create_user(data)
    assert user.id is not None
    assert user.email == data["email"]
    updated = service.update_user(user.id, {"name": "New"})
    assert updated.name == "New"

# GOOD - focused tests
def test_create_user_assigns_id():
    user = service.create_user(data)
    assert user.id is not None

def test_create_user_stores_email():
    user = service.create_user(data)
    assert user.email == data["email"]

def test_update_user_changes_name():
    user = service.create_user(data)
    updated = service.update_user(user.id, {"name": "New"})
    assert updated.name == "New"
```

### Test Error Paths

Always test failure cases, not just happy paths.

```python
import pytest

def test_get_user_raises_not_found():
    with pytest.raises(UserNotFoundError) as exc_info:
        service.get_user("nonexistent-id")

    assert "nonexistent-id" in str(exc_info.value)

def test_create_user_rejects_invalid_email():
    with pytest.raises(ValueError, match="Invalid email format"):
        service.create_user({"email": "not-an-email"})
```

## Testing Best Practices

### Test Organization

@@ -636,38 +682,131 @@ def test_sorted_list_properties(lst):
# test_workflows.py
```

### Test Naming
### Test Naming Convention

A common pattern: `test_<unit>_<scenario>_<expected_outcome>`. Adapt to your team's preferences.

```python
# Good test names
# Pattern: test_<unit>_<scenario>_<expected>
def test_create_user_with_valid_data_returns_user():
    ...

def test_create_user_with_duplicate_email_raises_conflict():
    ...

def test_get_user_with_unknown_id_returns_none():
    ...

# Good test names - clear and descriptive
def test_user_creation_with_valid_data():
    """Clear name describes what is being tested."""
    pass


def test_login_fails_with_invalid_password():
    """Name describes expected behavior."""
    pass


def test_api_returns_404_for_missing_resource():
    """Specific about inputs and expected outcomes."""
    pass


# Bad test names
# Bad test names - avoid these
def test_1():  # Not descriptive
    pass


def test_user():  # Too vague
    pass


def test_function():  # Doesn't explain what's tested
    pass
```

### Testing Retry Behavior

Verify that retry logic works correctly using mock side effects.

```python
from unittest.mock import Mock

import pytest

def test_retries_on_transient_error():
    """Test that service retries on transient failures."""
    client = Mock()
    # Fail twice, then succeed
    client.request.side_effect = [
        ConnectionError("Failed"),
        ConnectionError("Failed"),
        {"status": "ok"},
    ]

    service = ServiceWithRetry(client, max_retries=3)
    result = service.fetch()

    assert result == {"status": "ok"}
    assert client.request.call_count == 3

def test_gives_up_after_max_retries():
    """Test that service stops retrying after max attempts."""
    client = Mock()
    client.request.side_effect = ConnectionError("Failed")

    service = ServiceWithRetry(client, max_retries=3)

    with pytest.raises(ConnectionError):
        service.fetch()

    assert client.request.call_count == 3

def test_does_not_retry_on_permanent_error():
    """Test that permanent errors are not retried."""
    client = Mock()
    client.request.side_effect = ValueError("Invalid input")

    service = ServiceWithRetry(client, max_retries=3)

    with pytest.raises(ValueError):
        service.fetch()

    # Only called once - no retry for ValueError
    assert client.request.call_count == 1
```

### Mocking Time with Freezegun
|
||||
|
||||
Use freezegun to control time in tests for predictable time-dependent behavior.
|
||||
|
||||
```python
|
||||
from freezegun import freeze_time
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
@freeze_time("2026-01-15 10:00:00")
|
||||
def test_token_expiry():
|
||||
"""Test token expires at correct time."""
|
||||
token = create_token(expires_in_seconds=3600)
|
||||
assert token.expires_at == datetime(2026, 1, 15, 11, 0, 0)
|
||||
|
||||
@freeze_time("2026-01-15 10:00:00")
|
||||
def test_is_expired_returns_false_before_expiry():
|
||||
"""Test token is not expired when within validity period."""
|
||||
token = create_token(expires_in_seconds=3600)
|
||||
assert not token.is_expired()
|
||||
|
||||
@freeze_time("2026-01-15 12:00:00")
|
||||
def test_is_expired_returns_true_after_expiry():
|
||||
"""Test token is expired after validity period."""
|
||||
token = Token(expires_at=datetime(2026, 1, 15, 11, 30, 0))
|
||||
assert token.is_expired()
|
||||
|
||||
def test_with_time_travel():
|
||||
"""Test behavior across time using freeze_time context."""
|
||||
with freeze_time("2026-01-01") as frozen_time:
|
||||
item = create_item()
|
||||
assert item.created_at == datetime(2026, 1, 1)
|
||||
|
||||
# Move forward in time
|
||||
frozen_time.move_to("2026-01-15")
|
||||
assert item.age_days == 14
|
||||
```

### Test Markers

Use markers to categorize tests and control which ones run. A minimal sketch (marker and helper names here are illustrative):

```python
import sys

import pytest


@pytest.mark.slow
def test_full_pipeline():
    """Expensive end-to-end test; deselect with -m 'not slow'."""
    ...


@pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only behavior")
def test_unix_permissions():
    ...


@pytest.mark.parametrize("value,expected", [(1, 2), (2, 4)])
def test_doubling(value, expected):
    assert double(value) == expected
```

`plugins/python-development/skills/python-type-safety/SKILL.md`

---
name: python-type-safety
description: Python type safety with type hints, generics, protocols, and strict type checking. Use when adding type annotations, implementing generic classes, defining structural interfaces, or configuring mypy/pyright.
---

# Python Type Safety

Leverage Python's type system to catch errors at static analysis time. Type annotations serve as enforced documentation that tooling validates automatically.

## When to Use This Skill

- Adding type hints to existing code
- Creating generic, reusable classes
- Defining structural interfaces with protocols
- Configuring mypy or pyright for strict checking
- Understanding type narrowing and guards
- Building type-safe APIs and libraries

## Core Concepts

### 1. Type Annotations

Declare expected types for function parameters, return values, and variables.

### 2. Generics

Write reusable code that preserves type information across different types.

### 3. Protocols

Define structural interfaces without inheritance (duck typing with type safety).

### 4. Type Narrowing

Use guards and conditionals to narrow types within code blocks.

## Quick Start

```python
def get_user(user_id: str) -> User | None:
    """Return type makes 'might not exist' explicit."""
    ...


# Type checker enforces handling None case
user = get_user("123")
if user is None:
    raise UserNotFoundError("123")
print(user.name)  # Type checker knows user is User here
```

## Fundamental Patterns

### Pattern 1: Annotate All Public Signatures

Every public function, method, and class should have type annotations.

```python
def get_user(user_id: str) -> User:
    """Retrieve user by ID."""
    ...


def process_batch(
    items: list[Item],
    max_workers: int = 4,
) -> BatchResult[ProcessedItem]:
    """Process items concurrently."""
    ...


class UserRepository:
    def __init__(self, db: Database) -> None:
        self._db = db

    async def find_by_id(self, user_id: str) -> User | None:
        """Return User if found, None otherwise."""
        ...

    async def find_by_email(self, email: str) -> User | None:
        ...

    async def save(self, user: User) -> User:
        """Save and return user with generated ID."""
        ...
```

Use `mypy --strict` or `pyright` in CI to catch type errors early. For existing projects, enable strict mode incrementally using per-module overrides.

### Pattern 2: Use Modern Union Syntax

Python 3.10+ provides cleaner union syntax.

```python
# Preferred (3.10+)
def find_user(user_id: str) -> User | None:
    ...


def parse_value(v: str) -> int | float | str:
    ...


# Older style (still valid, needed for 3.9)
from typing import Optional

def find_user(user_id: str) -> Optional[User]:
    ...
```

### Pattern 3: Type Narrowing with Guards

Use conditionals to narrow types for the type checker.

```python
def process_user(user_id: str) -> UserData:
    user = find_user(user_id)

    if user is None:
        raise UserNotFoundError(f"User {user_id} not found")

    # Type checker knows user is User here, not User | None
    return UserData(
        name=user.name,
        email=user.email,
    )


def process_items(items: list[Item | None]) -> list[ProcessedItem]:
    # Filter and narrow types
    valid_items = [item for item in items if item is not None]
    # valid_items is now list[Item]
    return [process(item) for item in valid_items]
```

### Pattern 4: Generic Classes

Create type-safe reusable containers.

```python
from typing import TypeVar, Generic

T = TypeVar("T")
E = TypeVar("E", bound=Exception)


class Result(Generic[T, E]):
    """Represents either a success value or an error."""

    def __init__(
        self,
        value: T | None = None,
        error: E | None = None,
    ) -> None:
        if (value is None) == (error is None):
            raise ValueError("Exactly one of value or error must be set")
        self._value = value
        self._error = error

    @property
    def is_success(self) -> bool:
        return self._error is None

    @property
    def is_failure(self) -> bool:
        return self._error is not None

    def unwrap(self) -> T:
        """Get value or raise the error."""
        if self._error is not None:
            raise self._error
        return self._value  # type: ignore[return-value]

    def unwrap_or(self, default: T) -> T:
        """Get value or return default."""
        if self._error is not None:
            return default
        return self._value  # type: ignore[return-value]


# Usage preserves types
def parse_config(path: str) -> Result[Config, ConfigError]:
    try:
        return Result(value=Config.from_file(path))
    except ConfigError as e:
        return Result(error=e)


result = parse_config("config.yaml")
if result.is_success:
    config = result.unwrap()  # Type: Config
```

## Advanced Patterns

### Pattern 5: Generic Repository

Create type-safe data access patterns.

```python
from typing import TypeVar, Generic
from abc import ABC, abstractmethod

T = TypeVar("T")
ID = TypeVar("ID")


class Repository(ABC, Generic[T, ID]):
    """Generic repository interface."""

    @abstractmethod
    async def get(self, id: ID) -> T | None:
        """Get entity by ID."""
        ...

    @abstractmethod
    async def save(self, entity: T) -> T:
        """Save and return entity."""
        ...

    @abstractmethod
    async def delete(self, id: ID) -> bool:
        """Delete entity, return True if existed."""
        ...


class UserRepository(Repository[User, str]):
    """Concrete repository for Users with string IDs."""

    async def get(self, id: str) -> User | None:
        row = await self._db.fetchrow(
            "SELECT * FROM users WHERE id = $1", id
        )
        return User(**row) if row else None

    async def save(self, entity: User) -> User:
        ...

    async def delete(self, id: str) -> bool:
        ...
```

### Pattern 6: TypeVar with Bounds

Restrict generic parameters to specific types.

```python
from typing import TypeVar
from pydantic import BaseModel

ModelT = TypeVar("ModelT", bound=BaseModel)


def validate_and_create(model_cls: type[ModelT], data: dict) -> ModelT:
    """Create a validated Pydantic model from dict."""
    return model_cls.model_validate(data)


# Works with any BaseModel subclass
class User(BaseModel):
    name: str
    email: str


user = validate_and_create(User, {"name": "Alice", "email": "a@b.com"})
# user is typed as User

# Type error: str is not a BaseModel subclass
result = validate_and_create(str, {"name": "Alice"})  # Error!
```

### Pattern 7: Protocols for Structural Typing

Define interfaces without requiring inheritance.

```python
import json
from typing import Protocol, runtime_checkable


@runtime_checkable
class Serializable(Protocol):
    """Any class that can be serialized to/from dict."""

    def to_dict(self) -> dict:
        ...

    @classmethod
    def from_dict(cls, data: dict) -> "Serializable":
        ...


# User satisfies Serializable without inheriting from it
class User:
    def __init__(self, id: str, name: str) -> None:
        self.id = id
        self.name = name

    def to_dict(self) -> dict:
        return {"id": self.id, "name": self.name}

    @classmethod
    def from_dict(cls, data: dict) -> "User":
        return cls(id=data["id"], name=data["name"])


def serialize(obj: Serializable) -> str:
    """Works with any Serializable object."""
    return json.dumps(obj.to_dict())


# Works - User matches the protocol
serialize(User("1", "Alice"))

# Runtime checking with @runtime_checkable
isinstance(User("1", "Alice"), Serializable)  # True
```

### Pattern 8: Common Protocol Patterns

Define reusable structural interfaces.

```python
from typing import Protocol


class Closeable(Protocol):
    """Resource that can be closed."""
    def close(self) -> None: ...


class AsyncCloseable(Protocol):
    """Async resource that can be closed."""
    async def close(self) -> None: ...


class Readable(Protocol):
    """Object that can be read from."""
    def read(self, n: int = -1) -> bytes: ...


class HasId(Protocol):
    """Object with an ID property."""
    @property
    def id(self) -> str: ...


class Comparable(Protocol):
    """Object that supports comparison."""
    def __lt__(self, other: "Comparable") -> bool: ...
    def __le__(self, other: "Comparable") -> bool: ...
```
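
A protocol such as `Comparable` can then bound a type variable, giving generic functions a precise contract. A minimal sketch (the `smallest` helper is illustrative):

```python
from typing import Protocol, TypeVar


class Comparable(Protocol):
    def __lt__(self, other: "Comparable") -> bool: ...


CT = TypeVar("CT", bound=Comparable)


def smallest(items: list[CT]) -> CT:
    """Return the minimum element of any type that defines __lt__."""
    if not items:
        raise ValueError("cannot take smallest of an empty list")
    result = items[0]
    for item in items[1:]:
        if item < result:
            result = item
    return result
```

Because the bound is structural, `smallest` accepts `int`, `str`, dates, or any user-defined class with `__lt__`, and the return type matches the input element type.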

### Pattern 9: Type Aliases

Create meaningful type names.

**Note:** The `type` statement (PEP 695) requires Python 3.12+. On Python 3.9-3.11, use `typing.TypeAlias` instead.

```python
from typing import Any
from collections.abc import Callable, Awaitable

# Python 3.12+ type statement
type UserId = str
type UserDict = dict[str, Any]

# Python 3.12+ type statement with generics
type Handler[T] = Callable[[Request], T]
type AsyncHandler[T] = Callable[[Request], Awaitable[T]]

# Python 3.9-3.11 style (needed for broader compatibility)
from typing import TypeAlias

UserId: TypeAlias = str
Handler: TypeAlias = Callable[[Request], Response]

# Usage
def register_handler(path: str, handler: Handler[Response]) -> None:
    ...
```

### Pattern 10: Callable Types

Type function parameters and callbacks.

```python
from collections.abc import Callable, Awaitable
from typing import Protocol

# Sync callback
ProgressCallback = Callable[[int, int], None]  # (current, total)

# Async callback
AsyncHandler = Callable[[Request], Awaitable[Response]]


# With named parameters (using Protocol)
class OnProgress(Protocol):
    def __call__(
        self,
        current: int,
        total: int,
        *,
        message: str = "",
    ) -> None: ...


def process_items(
    items: list[Item],
    on_progress: ProgressCallback | None = None,
) -> list[Result]:
    for i, item in enumerate(items):
        if on_progress:
            on_progress(i, len(items))
        ...
```

## Configuration

### Strict Mode Checklist

For `mypy --strict` compliance:

```toml
# pyproject.toml
[tool.mypy]
python_version = "3.12"
strict = true
warn_return_any = true
warn_unused_ignores = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
no_implicit_optional = true
```

Incremental adoption goals:
- All function parameters annotated
- All return types annotated
- Class attributes annotated
- Minimize `Any` usage (acceptable for truly dynamic data)
- Generic collections use type parameters (`list[str]` not `list`)

For existing codebases, enable strict mode incrementally by configuring per-module overrides in `pyproject.toml`.
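
Per-module overrides might look like the following sketch (the module names are placeholders for your own packages):

```toml
[tool.mypy]
python_version = "3.12"

# Lenient default so legacy code still passes
disallow_untyped_defs = false

# Fully annotated packages get stricter per-module flags
[[tool.mypy.overrides]]
module = "myproject.core.*"
disallow_untyped_defs = true
disallow_incomplete_defs = true
warn_return_any = true
```

Tighten the overrides package by package as annotations land, until the strict flags can move to the top-level section.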

## Best Practices Summary

1. **Annotate all public APIs** - Functions, methods, class attributes
2. **Use `T | None`** - Modern union syntax over `Optional[T]`
3. **Run strict type checking** - `mypy --strict` in CI
4. **Use generics** - Preserve type info in reusable code
5. **Define protocols** - Structural typing for interfaces
6. **Narrow types** - Use guards to help the type checker
7. **Bound type vars** - Restrict generics to meaningful types
8. **Create type aliases** - Meaningful names for complex types
9. **Minimize `Any`** - Use specific types or generics. `Any` is acceptable for truly dynamic data or when interfacing with untyped third-party code
10. **Document with types** - Types are enforceable documentation