From cbb60494b1df88ff43bff46821df5e71af6883c7 Mon Sep 17 00:00:00 2001 From: "M. A." <49915873+mohammadaffaneh@users.noreply.github.com> Date: Fri, 30 Jan 2026 17:52:14 +0100 Subject: [PATCH] Add Comprehensive Python Development Skills (#419) * Add extra python skills covering code style, design patterns, resilience, resource management, testing patterns, and type safety ...etc * fix: correct code examples in Python skills - Clarify Python version requirements for type statement (3.10+ vs 3.12+) - Add missing ValidationError import in configuration example - Add missing httpx import and url parameter in async example --------- Co-authored-by: Seth Hobson --- .claude-plugin/marketplace.json | 15 +- README.md | 6 +- .../skills/async-python-patterns/SKILL.md | 65 ++- .../skills/python-anti-patterns/SKILL.md | 349 ++++++++++++++ .../skills/python-background-jobs/SKILL.md | 364 +++++++++++++++ .../skills/python-code-style/SKILL.md | 360 +++++++++++++++ .../skills/python-configuration/SKILL.md | 368 +++++++++++++++ .../skills/python-design-patterns/SKILL.md | 411 +++++++++++++++++ .../skills/python-error-handling/SKILL.md | 359 +++++++++++++++ .../skills/python-observability/SKILL.md | 400 ++++++++++++++++ .../skills/python-project-structure/SKILL.md | 252 +++++++++++ .../skills/python-resilience/SKILL.md | 376 +++++++++++++++ .../python-resource-management/SKILL.md | 421 +++++++++++++++++ .../skills/python-testing-patterns/SKILL.md | 155 ++++++- .../skills/python-type-safety/SKILL.md | 428 ++++++++++++++++++ 15 files changed, 4311 insertions(+), 18 deletions(-) create mode 100644 plugins/python-development/skills/python-anti-patterns/SKILL.md create mode 100644 plugins/python-development/skills/python-background-jobs/SKILL.md create mode 100644 plugins/python-development/skills/python-code-style/SKILL.md create mode 100644 plugins/python-development/skills/python-configuration/SKILL.md create mode 100644 plugins/python-development/skills/python-design-patterns/SKILL.md 
create mode 100644 plugins/python-development/skills/python-error-handling/SKILL.md create mode 100644 plugins/python-development/skills/python-observability/SKILL.md create mode 100644 plugins/python-development/skills/python-project-structure/SKILL.md create mode 100644 plugins/python-development/skills/python-resilience/SKILL.md create mode 100644 plugins/python-development/skills/python-resource-management/SKILL.md create mode 100644 plugins/python-development/skills/python-type-safety/SKILL.md diff --git a/.claude-plugin/marketplace.json b/.claude-plugin/marketplace.json index c114ed9..dc553f4 100644 --- a/.claude-plugin/marketplace.json +++ b/.claude-plugin/marketplace.json @@ -6,7 +6,7 @@ "url": "https://github.com/wshobson" }, "metadata": { - "description": "Production-ready workflow orchestration with 72 focused plugins, 108 specialized agents, and 129 skills - optimized for granular installation and minimal token usage", + "description": "Production-ready workflow orchestration with 72 focused plugins, 108 specialized agents, and 140 skills - optimized for granular installation and minimal token usage", "version": "1.3.7" }, "plugins": [ @@ -1611,7 +1611,18 @@ "./skills/python-testing-patterns", "./skills/python-packaging", "./skills/python-performance-optimization", - "./skills/uv-package-manager" + "./skills/uv-package-manager", + "./skills/python-type-safety", + "./skills/python-code-style", + "./skills/python-observability", + "./skills/python-project-structure", + "./skills/python-design-patterns", + "./skills/python-error-handling", + "./skills/python-configuration", + "./skills/python-resilience", + "./skills/python-resource-management", + "./skills/python-background-jobs", + "./skills/python-anti-patterns" ] }, { diff --git a/README.md b/README.md index f41ab9a..e51ecab 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ Each plugin is completely isolated with its own agents, commands, and skills: - **Clear boundaries** - Each plugin has a 
single, focused purpose - **Progressive disclosure** - Skills load knowledge only when activated -**Example**: Installing `python-development` loads 3 Python agents, 1 scaffolding tool, and makes 5 skills available (~300 tokens), not the entire marketplace. +**Example**: Installing `python-development` loads 3 Python agents, 1 scaffolding tool, and makes 16 skills available (~1000 tokens), not the entire marketplace. ## Quick Start @@ -63,7 +63,7 @@ Install the plugins you need: ```bash # Essential development plugins -/plugin install python-development # Python with 5 specialized skills +/plugin install python-development # Python with 16 specialized skills /plugin install javascript-typescript # JS/TS with 4 specialized skills /plugin install backend-development # Backend APIs with 3 architecture skills @@ -130,7 +130,7 @@ rm -rf ~/.claude/plugins/cache/claude-code-workflows && rm ~/.claude/plugins/ins ## What's New -### Agent Skills (129 skills across 20 plugins) +### Agent Skills (140 skills across 20 plugins) Specialized knowledge packages following Anthropic's progressive disclosure architecture: diff --git a/plugins/python-development/skills/async-python-patterns/SKILL.md b/plugins/python-development/skills/async-python-patterns/SKILL.md index 9504ce4..78c8f1e 100644 --- a/plugins/python-development/skills/async-python-patterns/SKILL.md +++ b/plugins/python-development/skills/async-python-patterns/SKILL.md @@ -18,6 +18,20 @@ Comprehensive guidance for implementing asynchronous Python applications using a - Optimizing I/O-bound workloads - Implementing async background tasks and queues +## Sync vs Async Decision Guide + +Before adopting async, consider whether it's the right choice for your use case. 
+ +| Use Case | Recommended Approach | +|----------|---------------------| +| Many concurrent network/DB calls | `asyncio` | +| CPU-bound computation | `multiprocessing` or thread pool | +| Mixed I/O + CPU | Offload CPU work with `asyncio.to_thread()` | +| Simple scripts, few connections | Sync (simpler, easier to debug) | +| Web APIs with high concurrency | Async frameworks (FastAPI, aiohttp) | + +**Key Rule:** Stay fully sync or fully async within a call path. Mixing creates hidden blocking and complexity. + ## Core Concepts ### 1. Event Loop @@ -583,6 +597,46 @@ async def process_item(item: str): ### 3. Avoid Blocking Operations +Never block the event loop with synchronous operations. A single blocking call stalls all concurrent tasks. + +```python +# BAD - blocks the entire event loop +async def fetch_data_bad(): + import time + import requests + time.sleep(1) # Blocks! + response = requests.get(url) # Also blocks! + +# GOOD - use async-native libraries (e.g., httpx for async HTTP) +import httpx + +async def fetch_data_good(url: str): + await asyncio.sleep(1) + async with httpx.AsyncClient() as client: + response = await client.get(url) +``` + +**Wrapping Blocking Code with `asyncio.to_thread()` (Python 3.9+):** + +When you must use synchronous libraries, offload to a thread pool: + +```python +import asyncio +from pathlib import Path + +async def read_file_async(path: str) -> str: + """Read file without blocking event loop.""" + # asyncio.to_thread() runs sync code in a thread pool + return await asyncio.to_thread(Path(path).read_text) + +async def call_sync_library(data: dict) -> dict: + """Wrap a synchronous library call.""" + # Useful for sync database drivers, file I/O, CPU work + return await asyncio.to_thread(sync_library.process, data) +``` + +**Lower-level approach with `run_in_executor()`:** + ```python import asyncio import concurrent.futures @@ -596,7 +650,7 @@ def blocking_operation(data: Any) -> Any: async def run_in_executor(data: Any) -> Any: 
"""Run blocking operation in thread pool.""" - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor(pool, blocking_operation, data) return result @@ -692,11 +746,12 @@ async def test_with_timeout(): 1. **Use asyncio.run()** for entry point (Python 3.7+) 2. **Always await coroutines** to execute them -3. **Use gather() for concurrent execution** of multiple tasks +3. **Limit concurrency with semaphores** - unbounded `gather()` can exhaust resources 4. **Implement proper error handling** with try/except 5. **Use timeouts** to prevent hanging operations 6. **Pool connections** for better performance -7. **Avoid blocking operations** in async code -8. **Use semaphores** for rate limiting -9. **Handle task cancellation** properly +7. **Never block the event loop** - use `asyncio.to_thread()` for sync code +8. **Use semaphores** for rate limiting external API calls +9. **Handle task cancellation** properly - always re-raise `CancelledError` 10. **Test async code** with pytest-asyncio +11. **Stay consistent** - fully sync or fully async, avoid mixing diff --git a/plugins/python-development/skills/python-anti-patterns/SKILL.md b/plugins/python-development/skills/python-anti-patterns/SKILL.md new file mode 100644 index 0000000..963a67d --- /dev/null +++ b/plugins/python-development/skills/python-anti-patterns/SKILL.md @@ -0,0 +1,349 @@ +--- +name: python-anti-patterns +description: Common Python anti-patterns to avoid. Use as a checklist when reviewing code, before finalizing implementations, or when debugging issues that might stem from known bad practices. +--- + +# Python Anti-Patterns Checklist + +A reference checklist of common mistakes and anti-patterns in Python code. Review this before finalizing implementations to catch issues early. 
+ +## When to Use This Skill + +- Reviewing code before merge +- Debugging mysterious issues +- Teaching or learning Python best practices +- Establishing team coding standards +- Refactoring legacy code + +**Note:** This skill focuses on what to avoid. For guidance on positive patterns and architecture, see the `python-design-patterns` skill. + +## Infrastructure Anti-Patterns + +### Scattered Timeout/Retry Logic + +```python +# BAD: Timeout logic duplicated everywhere +def fetch_user(user_id): + try: + return requests.get(url, timeout=30) + except Timeout: + logger.warning("Timeout fetching user") + return None + +def fetch_orders(user_id): + try: + return requests.get(url, timeout=30) + except Timeout: + logger.warning("Timeout fetching orders") + return None +``` + +**Fix:** Centralize in decorators or client wrappers. + +```python +# GOOD: Centralized retry logic +@retry(stop=stop_after_attempt(3), wait=wait_exponential()) +def http_get(url: str) -> Response: + return requests.get(url, timeout=30) +``` + +### Double Retry + +```python +# BAD: Retrying at multiple layers +@retry(max_attempts=3) # Application retry +def call_service(): + return client.request() # Client also has retry configured! +``` + +**Fix:** Retry at one layer only. Know your infrastructure's retry behavior. + +### Hard-Coded Configuration + +```python +# BAD: Secrets and config in code +DB_HOST = "prod-db.example.com" +API_KEY = "sk-12345" + +def connect(): + return psycopg.connect(f"host={DB_HOST}...") +``` + +**Fix:** Use environment variables with typed settings. 
+ +```python +# GOOD +from pydantic_settings import BaseSettings + +class Settings(BaseSettings): + db_host: str = Field(alias="DB_HOST") + api_key: str = Field(alias="API_KEY") + +settings = Settings() +``` + +## Architecture Anti-Patterns + +### Exposed Internal Types + +```python +# BAD: Leaking ORM model to API +@app.get("/users/{id}") +def get_user(id: str) -> UserModel: # SQLAlchemy model + return db.query(UserModel).get(id) +``` + +**Fix:** Use DTOs/response models. + +```python +# GOOD +@app.get("/users/{id}") +def get_user(id: str) -> UserResponse: + user = db.query(UserModel).get(id) + return UserResponse.from_orm(user) +``` + +### Mixed I/O and Business Logic + +```python +# BAD: SQL embedded in business logic +def calculate_discount(user_id: str) -> float: + user = db.query("SELECT * FROM users WHERE id = ?", user_id) + orders = db.query("SELECT * FROM orders WHERE user_id = ?", user_id) + # Business logic mixed with data access + if len(orders) > 10: + return 0.15 + return 0.0 +``` + +**Fix:** Repository pattern. Keep business logic pure. + +```python +# GOOD +def calculate_discount(user: User, orders: list[Order]) -> float: + # Pure business logic, easily testable + if len(orders) > 10: + return 0.15 + return 0.0 +``` + +## Error Handling Anti-Patterns + +### Bare Exception Handling + +```python +# BAD: Swallowing all exceptions +try: + process() +except Exception: + pass # Silent failure - bugs hidden forever +``` + +**Fix:** Catch specific exceptions. Log or handle appropriately. 
+ +```python +# GOOD +try: + process() +except ConnectionError as e: + logger.warning("Connection failed, will retry", error=str(e)) + raise +except ValueError as e: + logger.error("Invalid input", error=str(e)) + raise BadRequestError(str(e)) +``` + +### Ignored Partial Failures + +```python +# BAD: Stops on first error +def process_batch(items): + results = [] + for item in items: + result = process(item) # Raises on error - batch aborted + results.append(result) + return results +``` + +**Fix:** Capture both successes and failures. + +```python +# GOOD +def process_batch(items) -> BatchResult: + succeeded = {} + failed = {} + for idx, item in enumerate(items): + try: + succeeded[idx] = process(item) + except Exception as e: + failed[idx] = e + return BatchResult(succeeded, failed) +``` + +### Missing Input Validation + +```python +# BAD: No validation +def create_user(data: dict): + return User(**data) # Crashes deep in code on bad input +``` + +**Fix:** Validate early at API boundaries. + +```python +# GOOD +def create_user(data: dict) -> User: + validated = CreateUserInput.model_validate(data) + return User.from_input(validated) +``` + +## Resource Anti-Patterns + +### Unclosed Resources + +```python +# BAD: File never closed +def read_file(path): + f = open(path) + return f.read() # What if this raises? +``` + +**Fix:** Use context managers. + +```python +# GOOD +def read_file(path): + with open(path) as f: + return f.read() +``` + +### Blocking in Async + +```python +# BAD: Blocks the entire event loop +async def fetch_data(): + time.sleep(1) # Blocks everything! + response = requests.get(url) # Also blocks! +``` + +**Fix:** Use async-native libraries. 
+ +```python +# GOOD +async def fetch_data(): + await asyncio.sleep(1) + async with httpx.AsyncClient() as client: + response = await client.get(url) +``` + +## Type Safety Anti-Patterns + +### Missing Type Hints + +```python +# BAD: No types +def process(data): + return data["value"] * 2 +``` + +**Fix:** Annotate all public functions. + +```python +# GOOD +def process(data: dict[str, int]) -> int: + return data["value"] * 2 +``` + +### Untyped Collections + +```python +# BAD: Generic list without type parameter +def get_users() -> list: + ... +``` + +**Fix:** Use type parameters. + +```python +# GOOD +def get_users() -> list[User]: + ... +``` + +## Testing Anti-Patterns + +### Only Testing Happy Paths + +```python +# BAD: Only tests success case +def test_create_user(): + user = service.create_user(valid_data) + assert user.id is not None +``` + +**Fix:** Test error conditions and edge cases. + +```python +# GOOD +def test_create_user_success(): + user = service.create_user(valid_data) + assert user.id is not None + +def test_create_user_invalid_email(): + with pytest.raises(ValueError, match="Invalid email"): + service.create_user(invalid_email_data) + +def test_create_user_duplicate_email(): + service.create_user(valid_data) + with pytest.raises(ConflictError): + service.create_user(valid_data) +``` + +### Over-Mocking + +```python +# BAD: Mocking everything +def test_user_service(): + mock_repo = Mock() + mock_cache = Mock() + mock_logger = Mock() + mock_metrics = Mock() + # Test doesn't verify real behavior +``` + +**Fix:** Use integration tests for critical paths. Mock only external services. 
+ +## Quick Review Checklist + +Before finalizing code, verify: + +- [ ] No scattered timeout/retry logic (centralized) +- [ ] No double retry (app + infrastructure) +- [ ] No hard-coded configuration or secrets +- [ ] No exposed internal types (ORM models, protobufs) +- [ ] No mixed I/O and business logic +- [ ] No bare `except Exception: pass` +- [ ] No ignored partial failures in batches +- [ ] No missing input validation +- [ ] No unclosed resources (using context managers) +- [ ] No blocking calls in async code +- [ ] All public functions have type hints +- [ ] Collections have type parameters +- [ ] Error paths are tested +- [ ] Edge cases are covered + +## Common Fixes Summary + +| Anti-Pattern | Fix | +|-------------|-----| +| Scattered retry logic | Centralized decorators | +| Hard-coded config | Environment variables + pydantic-settings | +| Exposed ORM models | DTO/response schemas | +| Mixed I/O + logic | Repository pattern | +| Bare except | Catch specific exceptions | +| Batch stops on error | Return BatchResult with successes/failures | +| No validation | Validate at boundaries with Pydantic | +| Unclosed resources | Context managers | +| Blocking in async | Async-native libraries | +| Missing types | Type annotations on all public APIs | +| Only happy path tests | Test errors and edge cases | diff --git a/plugins/python-development/skills/python-background-jobs/SKILL.md b/plugins/python-development/skills/python-background-jobs/SKILL.md new file mode 100644 index 0000000..87218ee --- /dev/null +++ b/plugins/python-development/skills/python-background-jobs/SKILL.md @@ -0,0 +1,364 @@ +--- +name: python-background-jobs +description: Python background job patterns including task queues, workers, and event-driven architecture. Use when implementing async task processing, job queues, long-running operations, or decoupling work from request/response cycles. 
+--- + +# Python Background Jobs & Task Queues + +Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously. + +## When to Use This Skill + +- Processing tasks that take longer than a few seconds +- Sending emails, notifications, or webhooks +- Generating reports or exporting data +- Processing uploads or media transformations +- Integrating with unreliable external services +- Building event-driven architectures + +## Core Concepts + +### 1. Task Queue Pattern + +API accepts request, enqueues a job, returns immediately with a job ID. Workers process jobs asynchronously. + +### 2. Idempotency + +Tasks may be retried on failure. Design for safe re-execution. + +### 3. Job State Machine + +Jobs transition through states: pending → running → succeeded/failed. + +### 4. At-Least-Once Delivery + +Most queues guarantee at-least-once delivery. Your code must handle duplicates. + +## Quick Start + +This skill uses Celery for examples, a widely adopted task queue. Alternatives like RQ, Dramatiq, and cloud-native solutions (AWS SQS, GCP Tasks) are equally valid choices. + +```python +from celery import Celery + +app = Celery("tasks", broker="redis://localhost:6379") + +@app.task +def send_email(to: str, subject: str, body: str) -> None: + # This runs in a background worker + email_client.send(to, subject, body) + +# In your API handler +send_email.delay("user@example.com", "Welcome!", "Thanks for signing up") +``` + +## Fundamental Patterns + +### Pattern 1: Return Job ID Immediately + +For operations exceeding a few seconds, return a job ID and process asynchronously. 
+ +```python +from uuid import uuid4 +from dataclasses import dataclass +from enum import Enum +from datetime import datetime + +class JobStatus(Enum): + PENDING = "pending" + RUNNING = "running" + SUCCEEDED = "succeeded" + FAILED = "failed" + +@dataclass +class Job: + id: str + status: JobStatus + created_at: datetime + started_at: datetime | None = None + completed_at: datetime | None = None + result: dict | None = None + error: str | None = None + +# API endpoint +async def start_export(request: ExportRequest) -> JobResponse: + """Start export job and return job ID.""" + job_id = str(uuid4()) + + # Persist job record + await jobs_repo.create(Job( + id=job_id, + status=JobStatus.PENDING, + created_at=datetime.utcnow(), + )) + + # Enqueue task for background processing + await task_queue.enqueue( + "export_data", + job_id=job_id, + params=request.model_dump(), + ) + + # Return immediately with job ID + return JobResponse( + job_id=job_id, + status="pending", + poll_url=f"/jobs/{job_id}", + ) +``` + +### Pattern 2: Celery Task Configuration + +Configure Celery tasks with proper retry and timeout settings. 
+ +```python +from celery import Celery + +app = Celery("tasks", broker="redis://localhost:6379") + +# Global configuration +app.conf.update( + task_time_limit=3600, # Hard limit: 1 hour + task_soft_time_limit=3000, # Soft limit: 50 minutes + task_acks_late=True, # Acknowledge after completion + task_reject_on_worker_lost=True, + worker_prefetch_multiplier=1, # Don't prefetch too many tasks +) + +@app.task( + bind=True, + max_retries=3, + default_retry_delay=60, + autoretry_for=(ConnectionError, TimeoutError), +) +def process_payment(self, payment_id: str) -> dict: + """Process payment with automatic retry on transient errors.""" + try: + result = payment_gateway.charge(payment_id) + return {"status": "success", "transaction_id": result.id} + except PaymentDeclinedError as e: + # Don't retry permanent failures + return {"status": "declined", "reason": str(e)} + except TransientError as e: + # Retry with exponential backoff + raise self.retry(exc=e, countdown=2 ** self.request.retries * 60) +``` + +### Pattern 3: Make Tasks Idempotent + +Workers may retry on crash or timeout. Design for safe re-execution. + +```python +@app.task(bind=True) +def process_order(self, order_id: str) -> None: + """Process order idempotently.""" + order = orders_repo.get(order_id) + + # Already processed? Return early + if order.status == OrderStatus.COMPLETED: + logger.info("Order already processed", order_id=order_id) + return + + # Already in progress? Check if we should continue + if order.status == OrderStatus.PROCESSING: + # Use idempotency key to avoid double-charging + pass + + # Process with idempotency key + result = payment_provider.charge( + amount=order.total, + idempotency_key=f"order-{order_id}", # Critical! + ) + + orders_repo.update(order_id, status=OrderStatus.COMPLETED) +``` + +**Idempotency Strategies:** + +1. **Check-before-write**: Verify state before action +2. **Idempotency keys**: Use unique tokens with external services +3. **Upsert patterns**: `INSERT ... 
ON CONFLICT UPDATE` +4. **Deduplication window**: Track processed IDs for N hours + +### Pattern 4: Job State Management + +Persist job state transitions for visibility and debugging. + +```python +class JobRepository: + """Repository for managing job state.""" + + async def create(self, job: Job) -> Job: + """Create new job record.""" + await self._db.execute( + """INSERT INTO jobs (id, status, created_at) + VALUES ($1, $2, $3)""", + job.id, job.status.value, job.created_at, + ) + return job + + async def update_status( + self, + job_id: str, + status: JobStatus, + **fields, + ) -> None: + """Update job status with timestamp.""" + updates = {"status": status.value, **fields} + + if status == JobStatus.RUNNING: + updates["started_at"] = datetime.utcnow() + elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED): + updates["completed_at"] = datetime.utcnow() + + await self._db.execute( + "UPDATE jobs SET status = $1, ... WHERE id = $2", + updates, job_id, + ) + + logger.info( + "Job status updated", + job_id=job_id, + status=status.value, + ) +``` + +## Advanced Patterns + +### Pattern 5: Dead Letter Queue + +Handle permanently failed tasks for manual inspection. 
+ +```python +@app.task(bind=True, max_retries=3) +def process_webhook(self, webhook_id: str, payload: dict) -> None: + """Process webhook with DLQ for failures.""" + try: + result = send_webhook(payload) + if not result.success: + raise WebhookFailedError(result.error) + except Exception as e: + if self.request.retries >= self.max_retries: + # Move to dead letter queue for manual inspection + dead_letter_queue.send({ + "task": "process_webhook", + "webhook_id": webhook_id, + "payload": payload, + "error": str(e), + "attempts": self.request.retries + 1, + "failed_at": datetime.utcnow().isoformat(), + }) + logger.error( + "Webhook moved to DLQ after max retries", + webhook_id=webhook_id, + error=str(e), + ) + return + + # Exponential backoff retry + raise self.retry(exc=e, countdown=2 ** self.request.retries * 60) +``` + +### Pattern 6: Status Polling Endpoint + +Provide an endpoint for clients to check job status. + +```python +from fastapi import FastAPI, HTTPException + +app = FastAPI() + +@app.get("/jobs/{job_id}") +async def get_job_status(job_id: str) -> JobStatusResponse: + """Get current status of a background job.""" + job = await jobs_repo.get(job_id) + + if job is None: + raise HTTPException(404, f"Job {job_id} not found") + + return JobStatusResponse( + job_id=job.id, + status=job.status.value, + created_at=job.created_at, + started_at=job.started_at, + completed_at=job.completed_at, + result=job.result if job.status == JobStatus.SUCCEEDED else None, + error=job.error if job.status == JobStatus.FAILED else None, + # Helpful for clients + is_terminal=job.status in (JobStatus.SUCCEEDED, JobStatus.FAILED), + ) +``` + +### Pattern 7: Task Chaining and Workflows + +Compose complex workflows from simple tasks. 
+ +```python +from celery import chain, group, chord + +# Simple chain: A → B → C +workflow = chain( + extract_data.s(source_id), + transform_data.s(), + load_data.s(destination_id), +) + +# Parallel execution: A, B, C all at once +parallel = group( + send_email.s(user_email), + send_sms.s(user_phone), + update_analytics.s(event_data), +) + +# Chord: Run tasks in parallel, then a callback +# Process all items, then send completion notification +workflow = chord( + [process_item.s(item_id) for item_id in item_ids], + send_completion_notification.s(batch_id), +) + +workflow.apply_async() +``` + +### Pattern 8: Alternative Task Queues + +Choose the right tool for your needs. + +**RQ (Redis Queue)**: Simple, Redis-based +```python +from rq import Queue +from redis import Redis + +queue = Queue(connection=Redis()) +job = queue.enqueue(send_email, "user@example.com", "Subject", "Body") +``` + +**Dramatiq**: Modern Celery alternative +```python +import dramatiq +from dramatiq.brokers.redis import RedisBroker + +dramatiq.set_broker(RedisBroker()) + +@dramatiq.actor +def send_email(to: str, subject: str, body: str) -> None: + email_client.send(to, subject, body) +``` + +**Cloud-native options:** +- AWS SQS + Lambda +- Google Cloud Tasks +- Azure Functions + +## Best Practices Summary + +1. **Return immediately** - Don't block requests for long operations +2. **Persist job state** - Enable status polling and debugging +3. **Make tasks idempotent** - Safe to retry on any failure +4. **Use idempotency keys** - For external service calls +5. **Set timeouts** - Both soft and hard limits +6. **Implement DLQ** - Capture permanently failed tasks +7. **Log transitions** - Track job state changes +8. **Retry appropriately** - Exponential backoff for transient errors +9. **Don't retry permanent failures** - Validation errors, invalid credentials +10. 
**Monitor queue depth** - Alert on backlog growth diff --git a/plugins/python-development/skills/python-code-style/SKILL.md b/plugins/python-development/skills/python-code-style/SKILL.md new file mode 100644 index 0000000..61cf0ba --- /dev/null +++ b/plugins/python-development/skills/python-code-style/SKILL.md @@ -0,0 +1,360 @@ +--- +name: python-code-style +description: Python code style, linting, formatting, naming conventions, and documentation standards. Use when writing new code, reviewing style, configuring linters, writing docstrings, or establishing project standards. +--- + +# Python Code Style & Documentation + +Consistent code style and clear documentation make codebases maintainable and collaborative. This skill covers modern Python tooling, naming conventions, and documentation standards. + +## When to Use This Skill + +- Setting up linting and formatting for a new project +- Writing or reviewing docstrings +- Establishing team coding standards +- Configuring ruff, mypy, or pyright +- Reviewing code for style consistency +- Creating project documentation + +## Core Concepts + +### 1. Automated Formatting + +Let tools handle formatting debates. Configure once, enforce automatically. + +### 2. Consistent Naming + +Follow PEP 8 conventions with meaningful, descriptive names. + +### 3. Documentation as Code + +Docstrings should be maintained alongside the code they describe. + +### 4. Type Annotations + +Modern Python code should include type hints for all public APIs. + +## Quick Start + +```bash +# Install modern tooling +pip install ruff mypy + +# Configure in pyproject.toml +[tool.ruff] +line-length = 120 +target-version = "py312" # Adjust based on your project's minimum Python version + +[tool.mypy] +strict = true +``` + +## Fundamental Patterns + +### Pattern 1: Modern Python Tooling + +Use `ruff` as an all-in-one linter and formatter. It replaces flake8, isort, and black with a single fast tool. 
+ +```toml +# pyproject.toml +[tool.ruff] +line-length = 120 +target-version = "py312" # Adjust based on your project's minimum Python version + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "I", # isort + "B", # flake8-bugbear + "C4", # flake8-comprehensions + "UP", # pyupgrade + "SIM", # flake8-simplify +] +ignore = ["E501"] # Line length handled by formatter + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" +``` + +Run with: + +```bash +ruff check --fix . # Lint and auto-fix +ruff format . # Format code +``` + +### Pattern 2: Type Checking Configuration + +Configure strict type checking for production code. + +```toml +# pyproject.toml +[tool.mypy] +python_version = "3.12" +strict = true +warn_return_any = true +warn_unused_ignores = true +disallow_untyped_defs = true +disallow_incomplete_defs = true + +[[tool.mypy.overrides]] +module = "tests.*" +disallow_untyped_defs = false +``` + +Alternative: Use `pyright` for faster checking. + +```toml +[tool.pyright] +pythonVersion = "3.12" +typeCheckingMode = "strict" +``` + +### Pattern 3: Naming Conventions + +Follow PEP 8 with emphasis on clarity over brevity. + +**Files and Modules:** + +```python +# Good: Descriptive snake_case +user_repository.py +order_processing.py +http_client.py + +# Avoid: Abbreviations +usr_repo.py +ord_proc.py +http_cli.py +``` + +**Classes and Functions:** + +```python +# Classes: PascalCase +class UserRepository: + pass + +class HTTPClientFactory: # Acronyms stay uppercase + pass + +# Functions and variables: snake_case +def get_user_by_email(email: str) -> User | None: + retry_count = 3 + max_connections = 100 +``` + +**Constants:** + +```python +# Module-level constants: SCREAMING_SNAKE_CASE +MAX_RETRY_ATTEMPTS = 3 +DEFAULT_TIMEOUT_SECONDS = 30 +API_BASE_URL = "https://api.example.com" +``` + +### Pattern 4: Import Organization + +Group imports in a consistent order: standard library, third-party, local. 
+ +```python +# Standard library +import os +from collections.abc import Callable +from typing import Any + +# Third-party packages +import httpx +from pydantic import BaseModel +from sqlalchemy import Column + +# Local imports +from myproject.models import User +from myproject.services import UserService +``` + +Use absolute imports exclusively: + +```python +# Preferred +from myproject.utils import retry_decorator + +# Avoid relative imports +from ..utils import retry_decorator +``` + +## Advanced Patterns + +### Pattern 5: Google-Style Docstrings + +Write docstrings for all public classes, methods, and functions. + +**Simple Function:** + +```python +def get_user(user_id: str) -> User: + """Retrieve a user by their unique identifier.""" + ... +``` + +**Complex Function:** + +```python +def process_batch( + items: list[Item], + max_workers: int = 4, + on_progress: Callable[[int, int], None] | None = None, +) -> BatchResult: + """Process items concurrently using a worker pool. + + Processes each item in the batch using the configured number of + workers. Progress can be monitored via the optional callback. + + Args: + items: The items to process. Must not be empty. + max_workers: Maximum concurrent workers. Defaults to 4. + on_progress: Optional callback receiving (completed, total) counts. + + Returns: + BatchResult containing succeeded items and any failures with + their associated exceptions. + + Raises: + ValueError: If items is empty. + ProcessingError: If the batch cannot be processed. + + Example: + >>> result = process_batch(items, max_workers=8) + >>> print(f"Processed {len(result.succeeded)} items") + """ + ... +``` + +**Class Docstring:** + +```python +class UserService: + """Service for managing user operations. + + Provides methods for creating, retrieving, updating, and + deleting users with proper validation and error handling. + + Attributes: + repository: The data access layer for user persistence. + logger: Logger instance for operation tracking. 
+ + Example: + >>> service = UserService(repository, logger) + >>> user = service.create_user(CreateUserInput(...)) + """ + + def __init__(self, repository: UserRepository, logger: Logger) -> None: + """Initialize the user service. + + Args: + repository: Data access layer for users. + logger: Logger for tracking operations. + """ + self.repository = repository + self.logger = logger +``` + +### Pattern 6: Line Length and Formatting + +Set line length to 120 characters for modern displays while maintaining readability. + +```python +# Good: Readable line breaks +def create_user( + email: str, + name: str, + role: UserRole = UserRole.MEMBER, + notify: bool = True, +) -> User: + ... + +# Good: Chain method calls clearly +result = ( + db.query(User) + .filter(User.active == True) + .order_by(User.created_at.desc()) + .limit(10) + .all() +) + +# Good: Format long strings +error_message = ( + f"Failed to process user {user_id}: " + f"received status {response.status_code} " + f"with body {response.text[:100]}" +) +``` + +### Pattern 7: Project Documentation + +**README Structure:** + +```markdown +# Project Name + +Brief description of what the project does. + +## Installation + +\`\`\`bash +pip install myproject +\`\`\` + +## Quick Start + +\`\`\`python +from myproject import Client + +client = Client(api_key="...") +result = client.process(data) +\`\`\` + +## Configuration + +Document environment variables and configuration options. + +## Development + +\`\`\`bash +pip install -e ".[dev]" +pytest +\`\`\` +``` + +**CHANGELOG Format (Keep a Changelog):** + +```markdown +# Changelog + +## [Unreleased] + +### Added +- New feature X + +### Changed +- Modified behavior of Y + +### Fixed +- Bug in Z +``` + +## Best Practices Summary + +1. **Use ruff** - Single tool for linting and formatting +2. **Enable strict mypy** - Catch type errors before runtime +3. **120 character lines** - Modern standard for readability +4. **Descriptive names** - Clarity over brevity +5. 
**Absolute imports** - More maintainable than relative +6. **Google-style docstrings** - Consistent, readable documentation +7. **Document public APIs** - Every public function needs a docstring +8. **Keep docs updated** - Treat documentation as code +9. **Automate in CI** - Run linters on every commit +10. **Target Python 3.10+** - For new projects, Python 3.12+ is recommended for modern language features diff --git a/plugins/python-development/skills/python-configuration/SKILL.md b/plugins/python-development/skills/python-configuration/SKILL.md new file mode 100644 index 0000000..3dd049f --- /dev/null +++ b/plugins/python-development/skills/python-configuration/SKILL.md @@ -0,0 +1,368 @@ +--- +name: python-configuration +description: Python configuration management via environment variables and typed settings. Use when externalizing config, setting up pydantic-settings, managing secrets, or implementing environment-specific behavior. +--- + +# Python Configuration Management + +Externalize configuration from code using environment variables and typed settings. Well-managed configuration enables the same code to run in any environment without modification. + +## When to Use This Skill + +- Setting up a new project's configuration system +- Migrating from hardcoded values to environment variables +- Implementing pydantic-settings for typed configuration +- Managing secrets and sensitive values +- Creating environment-specific settings (dev/staging/prod) +- Validating configuration at application startup + +## Core Concepts + +### 1. Externalized Configuration + +All environment-specific values (URLs, secrets, feature flags) come from environment variables, not code. + +### 2. Typed Settings + +Parse and validate configuration into typed objects at startup, not scattered throughout code. + +### 3. Fail Fast + +Validate all required configuration at application boot. Missing config should crash immediately with a clear message. + +### 4. 
Sensible Defaults + +Provide reasonable defaults for local development while requiring explicit values for sensitive settings. + +## Quick Start + +```python +from pydantic_settings import BaseSettings +from pydantic import Field + +class Settings(BaseSettings): + database_url: str = Field(alias="DATABASE_URL") + api_key: str = Field(alias="API_KEY") + debug: bool = Field(default=False, alias="DEBUG") + +settings = Settings() # Loads from environment +``` + +## Fundamental Patterns + +### Pattern 1: Typed Settings with Pydantic + +Create a central settings class that loads and validates all configuration. + +```python +from pydantic_settings import BaseSettings +from pydantic import Field, PostgresDsn, ValidationError +import sys + +class Settings(BaseSettings): + """Application configuration loaded from environment variables.""" + + # Database + db_host: str = Field(alias="DB_HOST") + db_port: int = Field(default=5432, alias="DB_PORT") + db_name: str = Field(alias="DB_NAME") + db_user: str = Field(alias="DB_USER") + db_password: str = Field(alias="DB_PASSWORD") + + # Redis + redis_url: str = Field(default="redis://localhost:6379", alias="REDIS_URL") + + # API Keys + api_secret_key: str = Field(alias="API_SECRET_KEY") + + # Feature flags + enable_new_feature: bool = Field(default=False, alias="ENABLE_NEW_FEATURE") + + model_config = { + "env_file": ".env", + "env_file_encoding": "utf-8", + } + +# Create singleton instance at module load +try: + settings = Settings() +except ValidationError as e: + print(f"Configuration error:\n{e}") + sys.exit(1) +``` + +Import `settings` throughout your application: + +```python +from myapp.config import settings + +def get_database_connection(): + return connect( + host=settings.db_host, + port=settings.db_port, + database=settings.db_name, + ) +``` + +### Pattern 2: Fail Fast on Missing Configuration + +Required settings should crash the application immediately with a clear error. 
+ +```python +from pydantic_settings import BaseSettings +from pydantic import Field, ValidationError +import sys + +class Settings(BaseSettings): + # Required - no default means it must be set + api_key: str = Field(alias="API_KEY") + database_url: str = Field(alias="DATABASE_URL") + + # Optional with defaults + log_level: str = Field(default="INFO", alias="LOG_LEVEL") + +try: + settings = Settings() +except ValidationError as e: + print("=" * 60) + print("CONFIGURATION ERROR") + print("=" * 60) + for error in e.errors(): + field = error["loc"][0] + print(f" - {field}: {error['msg']}") + print("\nPlease set the required environment variables.") + sys.exit(1) +``` + +A clear error at startup is better than a cryptic `None` failure mid-request. + +### Pattern 3: Local Development Defaults + +Provide sensible defaults for local development while requiring explicit values for secrets. + +```python +class Settings(BaseSettings): + # Has local default, but prod will override + db_host: str = Field(default="localhost", alias="DB_HOST") + db_port: int = Field(default=5432, alias="DB_PORT") + + # Always required - no default for secrets + db_password: str = Field(alias="DB_PASSWORD") + api_secret_key: str = Field(alias="API_SECRET_KEY") + + # Development convenience + debug: bool = Field(default=False, alias="DEBUG") + + model_config = {"env_file": ".env"} +``` + +Create a `.env` file for local development (never commit this): + +```bash +# .env (add to .gitignore) +DB_PASSWORD=local_dev_password +API_SECRET_KEY=dev-secret-key +DEBUG=true +``` + +### Pattern 4: Namespaced Environment Variables + +Prefix related variables for clarity and easy debugging. 
+ +```bash +# Database configuration +DB_HOST=localhost +DB_PORT=5432 +DB_NAME=myapp +DB_USER=admin +DB_PASSWORD=secret + +# Redis configuration +REDIS_URL=redis://localhost:6379 +REDIS_MAX_CONNECTIONS=10 + +# Authentication +AUTH_SECRET_KEY=your-secret-key +AUTH_TOKEN_EXPIRY_SECONDS=3600 +AUTH_ALGORITHM=HS256 + +# Feature flags +FEATURE_NEW_CHECKOUT=true +FEATURE_BETA_UI=false +``` + +Makes `env | grep DB_` useful for debugging. + +## Advanced Patterns + +### Pattern 5: Type Coercion + +Pydantic handles common conversions automatically. + +```python +from pydantic_settings import BaseSettings +from pydantic import Field, field_validator + +class Settings(BaseSettings): + # Automatically converts "true", "1", "yes" to True + debug: bool = False + + # Automatically converts string to int + max_connections: int = 100 + + # Parse comma-separated string to list + allowed_hosts: list[str] = Field(default_factory=list) + + @field_validator("allowed_hosts", mode="before") + @classmethod + def parse_allowed_hosts(cls, v: str | list[str]) -> list[str]: + if isinstance(v, str): + return [host.strip() for host in v.split(",") if host.strip()] + return v +``` + +Usage: + +```bash +ALLOWED_HOSTS=example.com,api.example.com,localhost +MAX_CONNECTIONS=50 +DEBUG=true +``` + +### Pattern 6: Environment-Specific Configuration + +Use an environment enum to switch behavior. 
+ +```python +from enum import Enum +from pydantic_settings import BaseSettings +from pydantic import Field, computed_field + +class Environment(str, Enum): + LOCAL = "local" + STAGING = "staging" + PRODUCTION = "production" + +class Settings(BaseSettings): + environment: Environment = Field( + default=Environment.LOCAL, + alias="ENVIRONMENT", + ) + + # Settings that vary by environment + log_level: str = Field(default="DEBUG", alias="LOG_LEVEL") + + @computed_field + @property + def is_production(self) -> bool: + return self.environment == Environment.PRODUCTION + + @computed_field + @property + def is_local(self) -> bool: + return self.environment == Environment.LOCAL + +# Usage +if settings.is_production: + configure_production_logging() +else: + configure_debug_logging() +``` + +### Pattern 7: Nested Configuration Groups + +Organize related settings into nested models. + +```python +from pydantic import BaseModel +from pydantic_settings import BaseSettings + +class DatabaseSettings(BaseModel): + host: str = "localhost" + port: int = 5432 + name: str + user: str + password: str + +class RedisSettings(BaseModel): + url: str = "redis://localhost:6379" + max_connections: int = 10 + +class Settings(BaseSettings): + database: DatabaseSettings + redis: RedisSettings + debug: bool = False + + model_config = { + "env_nested_delimiter": "__", + "env_file": ".env", + } +``` + +Environment variables use double underscore for nesting: + +```bash +DATABASE__HOST=db.example.com +DATABASE__PORT=5432 +DATABASE__NAME=myapp +DATABASE__USER=admin +DATABASE__PASSWORD=secret +REDIS__URL=redis://redis.example.com:6379 +``` + +### Pattern 8: Secrets from Files + +For container environments, read secrets from mounted files. 
+ +```python +from pydantic_settings import BaseSettings +from pydantic import Field +from pathlib import Path + +class Settings(BaseSettings): + # Read from environment variable or file + db_password: str = Field(alias="DB_PASSWORD") + + model_config = { + "secrets_dir": "/run/secrets", # Docker secrets location + } +``` + +Pydantic will look for `/run/secrets/db_password` if the env var isn't set. + +### Pattern 9: Configuration Validation + +Add custom validation for complex requirements. + +```python +from pydantic_settings import BaseSettings +from pydantic import Field, model_validator + +class Settings(BaseSettings): + db_host: str = Field(alias="DB_HOST") + db_port: int = Field(alias="DB_PORT") + read_replica_host: str | None = Field(default=None, alias="READ_REPLICA_HOST") + read_replica_port: int = Field(default=5432, alias="READ_REPLICA_PORT") + + @model_validator(mode="after") + def validate_replica_settings(self): + if self.read_replica_host and self.read_replica_port == self.db_port: + if self.read_replica_host == self.db_host: + raise ValueError( + "Read replica cannot be the same as primary database" + ) + return self +``` + +## Best Practices Summary + +1. **Never hardcode config** - All environment-specific values from env vars +2. **Use typed settings** - Pydantic-settings with validation +3. **Fail fast** - Crash on missing required config at startup +4. **Provide dev defaults** - Make local development easy +5. **Never commit secrets** - Use `.env` files (gitignored) or secret managers +6. **Namespace variables** - `DB_HOST`, `REDIS_URL` for clarity +7. **Import settings singleton** - Don't call `os.getenv()` throughout code +8. **Document all variables** - README should list required env vars +9. **Validate early** - Check config correctness at boot time +10. 
**Use secrets_dir** - Support mounted secrets in containers diff --git a/plugins/python-development/skills/python-design-patterns/SKILL.md b/plugins/python-development/skills/python-design-patterns/SKILL.md new file mode 100644 index 0000000..22603b6 --- /dev/null +++ b/plugins/python-development/skills/python-design-patterns/SKILL.md @@ -0,0 +1,411 @@ +--- +name: python-design-patterns +description: Python design patterns including KISS, Separation of Concerns, Single Responsibility, and composition over inheritance. Use when making architecture decisions, refactoring code structure, or evaluating when abstractions are appropriate. +--- + +# Python Design Patterns + +Write maintainable Python code using fundamental design principles. These patterns help you build systems that are easy to understand, test, and modify. + +## When to Use This Skill + +- Designing new components or services +- Refactoring complex or tangled code +- Deciding whether to create an abstraction +- Choosing between inheritance and composition +- Evaluating code complexity and coupling +- Planning modular architectures + +## Core Concepts + +### 1. KISS (Keep It Simple) + +Choose the simplest solution that works. Complexity must be justified by concrete requirements. + +### 2. Single Responsibility (SRP) + +Each unit should have one reason to change. Separate concerns into focused components. + +### 3. Composition Over Inheritance + +Build behavior by combining objects, not extending classes. + +### 4. Rule of Three + +Wait until you have three instances before abstracting. Duplication is often better than premature abstraction. + +## Quick Start + +```python +# Simple beats clever +# Instead of a factory/registry pattern: +FORMATTERS = {"json": JsonFormatter, "csv": CsvFormatter} + +def get_formatter(name: str) -> Formatter: + return FORMATTERS[name]() +``` + +## Fundamental Patterns + +### Pattern 1: KISS - Keep It Simple + +Before adding complexity, ask: does a simpler solution work? 
+ +```python +# Over-engineered: Factory with registration +class OutputFormatterFactory: + _formatters: dict[str, type[Formatter]] = {} + + @classmethod + def register(cls, name: str): + def decorator(formatter_cls): + cls._formatters[name] = formatter_cls + return formatter_cls + return decorator + + @classmethod + def create(cls, name: str) -> Formatter: + return cls._formatters[name]() + +@OutputFormatterFactory.register("json") +class JsonFormatter(Formatter): + ... + +# Simple: Just use a dictionary +FORMATTERS = { + "json": JsonFormatter, + "csv": CsvFormatter, + "xml": XmlFormatter, +} + +def get_formatter(name: str) -> Formatter: + """Get formatter by name.""" + if name not in FORMATTERS: + raise ValueError(f"Unknown format: {name}") + return FORMATTERS[name]() +``` + +The factory pattern adds code without adding value here. Save patterns for when they solve real problems. + +### Pattern 2: Single Responsibility Principle + +Each class or function should have one reason to change. 
+ +```python +# BAD: Handler does everything +class UserHandler: + async def create_user(self, request: Request) -> Response: + # HTTP parsing + data = await request.json() + + # Validation + if not data.get("email"): + return Response({"error": "email required"}, status=400) + + # Database access + user = await db.execute( + "INSERT INTO users (email, name) VALUES ($1, $2) RETURNING *", + data["email"], data["name"] + ) + + # Response formatting + return Response({"id": user.id, "email": user.email}, status=201) + +# GOOD: Separated concerns +class UserService: + """Business logic only.""" + + def __init__(self, repo: UserRepository) -> None: + self._repo = repo + + async def create_user(self, data: CreateUserInput) -> User: + # Only business rules here + user = User(email=data.email, name=data.name) + return await self._repo.save(user) + +class UserHandler: + """HTTP concerns only.""" + + def __init__(self, service: UserService) -> None: + self._service = service + + async def create_user(self, request: Request) -> Response: + data = CreateUserInput(**(await request.json())) + user = await self._service.create_user(data) + return Response(user.to_dict(), status=201) +``` + +Now HTTP changes don't affect business logic, and vice versa. + +### Pattern 3: Separation of Concerns + +Organize code into distinct layers with clear responsibilities. 
+ +``` +┌─────────────────────────────────────────────────────┐ +│ API Layer (handlers) │ +│ - Parse requests │ +│ - Call services │ +│ - Format responses │ +└─────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────┐ +│ Service Layer (business logic) │ +│ - Domain rules and validation │ +│ - Orchestrate operations │ +│ - Pure functions where possible │ +└─────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────┐ +│ Repository Layer (data access) │ +│ - SQL queries │ +│ - External API calls │ +│ - Cache operations │ +└─────────────────────────────────────────────────────┘ +``` + +Each layer depends only on layers below it: + +```python +# Repository: Data access +class UserRepository: + async def get_by_id(self, user_id: str) -> User | None: + row = await self._db.fetchrow( + "SELECT * FROM users WHERE id = $1", user_id + ) + return User(**row) if row else None + +# Service: Business logic +class UserService: + def __init__(self, repo: UserRepository) -> None: + self._repo = repo + + async def get_user(self, user_id: str) -> User: + user = await self._repo.get_by_id(user_id) + if user is None: + raise UserNotFoundError(user_id) + return user + +# Handler: HTTP concerns +@app.get("/users/{user_id}") +async def get_user(user_id: str) -> UserResponse: + user = await user_service.get_user(user_id) + return UserResponse.from_user(user) +``` + +### Pattern 4: Composition Over Inheritance + +Build behavior by combining objects rather than inheriting. 
+ +```python +# Inheritance: Rigid and hard to test +class EmailNotificationService(NotificationService): + def __init__(self): + super().__init__() + self._smtp = SmtpClient() # Hard to mock + + def notify(self, user: User, message: str) -> None: + self._smtp.send(user.email, message) + +# Composition: Flexible and testable +class NotificationService: + """Send notifications via multiple channels.""" + + def __init__( + self, + email_sender: EmailSender, + sms_sender: SmsSender | None = None, + push_sender: PushSender | None = None, + ) -> None: + self._email = email_sender + self._sms = sms_sender + self._push = push_sender + + async def notify( + self, + user: User, + message: str, + channels: set[str] | None = None, + ) -> None: + channels = channels or {"email"} + + if "email" in channels: + await self._email.send(user.email, message) + + if "sms" in channels and self._sms and user.phone: + await self._sms.send(user.phone, message) + + if "push" in channels and self._push and user.device_token: + await self._push.send(user.device_token, message) + +# Easy to test with fakes +service = NotificationService( + email_sender=FakeEmailSender(), + sms_sender=FakeSmsSender(), +) +``` + +## Advanced Patterns + +### Pattern 5: Rule of Three + +Wait until you have three instances before abstracting. + +```python +# Two similar functions? Don't abstract yet +def process_orders(orders: list[Order]) -> list[Result]: + results = [] + for order in orders: + validated = validate_order(order) + result = process_validated_order(validated) + results.append(result) + return results + +def process_returns(returns: list[Return]) -> list[Result]: + results = [] + for ret in returns: + validated = validate_return(ret) + result = process_validated_return(validated) + results.append(result) + return results + +# These look similar, but wait! Are they actually the same? +# Different validation, different processing, different errors... 
+# Duplication is often better than the wrong abstraction + +# Only after a third case, consider if there's a real pattern +# But even then, sometimes explicit is better than abstract +``` + +### Pattern 6: Function Size Guidelines + +Keep functions focused. Extract when a function: + +- Exceeds 20-50 lines (varies by complexity) +- Serves multiple distinct purposes +- Has deeply nested logic (3+ levels) + +```python +# Too long, multiple concerns mixed +def process_order(order: Order) -> Result: + # 50 lines of validation... + # 30 lines of inventory check... + # 40 lines of payment processing... + # 20 lines of notification... + pass + +# Better: Composed from focused functions +def process_order(order: Order) -> Result: + """Process a customer order through the complete workflow.""" + validate_order(order) + reserve_inventory(order) + payment_result = charge_payment(order) + send_confirmation(order, payment_result) + return Result(success=True, order_id=order.id) +``` + +### Pattern 7: Dependency Injection + +Pass dependencies through constructors for testability. + +```python +from typing import Protocol + +class Logger(Protocol): + def info(self, msg: str, **kwargs) -> None: ... + def error(self, msg: str, **kwargs) -> None: ... + +class Cache(Protocol): + async def get(self, key: str) -> str | None: ... + async def set(self, key: str, value: str, ttl: int) -> None: ... 
+ +class UserService: + """Service with injected dependencies.""" + + def __init__( + self, + repository: UserRepository, + cache: Cache, + logger: Logger, + ) -> None: + self._repo = repository + self._cache = cache + self._logger = logger + + async def get_user(self, user_id: str) -> User: + # Check cache first + cached = await self._cache.get(f"user:{user_id}") + if cached: + self._logger.info("Cache hit", user_id=user_id) + return User.from_json(cached) + + # Fetch from database + user = await self._repo.get_by_id(user_id) + if user: + await self._cache.set(f"user:{user_id}", user.to_json(), ttl=300) + + return user + +# Production +service = UserService( + repository=PostgresUserRepository(db), + cache=RedisCache(redis), + logger=StructlogLogger(), +) + +# Testing +service = UserService( + repository=InMemoryUserRepository(), + cache=FakeCache(), + logger=NullLogger(), +) +``` + +### Pattern 8: Avoiding Common Anti-Patterns + +**Don't expose internal types:** + +```python +# BAD: Leaking ORM model to API +@app.get("/users/{id}") +def get_user(id: str) -> UserModel: # SQLAlchemy model + return db.query(UserModel).get(id) + +# GOOD: Use response schemas +@app.get("/users/{id}") +def get_user(id: str) -> UserResponse: + user = db.query(UserModel).get(id) + return UserResponse.from_orm(user) +``` + +**Don't mix I/O with business logic:** + +```python +# BAD: SQL embedded in business logic +def calculate_discount(user_id: str) -> float: + user = db.query("SELECT * FROM users WHERE id = ?", user_id) + orders = db.query("SELECT * FROM orders WHERE user_id = ?", user_id) + # Business logic mixed with data access + +# GOOD: Repository pattern +def calculate_discount(user: User, order_history: list[Order]) -> float: + # Pure business logic, easily testable + if len(order_history) > 10: + return 0.15 + return 0.0 +``` + +## Best Practices Summary + +1. **Keep it simple** - Choose the simplest solution that works +2. 
**Single responsibility** - Each unit has one reason to change +3. **Separate concerns** - Distinct layers with clear purposes +4. **Compose, don't inherit** - Combine objects for flexibility +5. **Rule of three** - Wait before abstracting +6. **Keep functions small** - 20-50 lines (varies by complexity), one purpose +7. **Inject dependencies** - Constructor injection for testability +8. **Delete before abstracting** - Remove dead code, then consider patterns +9. **Test each layer** - Isolated tests for each concern +10. **Explicit over clever** - Readable code beats elegant code diff --git a/plugins/python-development/skills/python-error-handling/SKILL.md b/plugins/python-development/skills/python-error-handling/SKILL.md new file mode 100644 index 0000000..aa6d97c --- /dev/null +++ b/plugins/python-development/skills/python-error-handling/SKILL.md @@ -0,0 +1,359 @@ +--- +name: python-error-handling +description: Python error handling patterns including input validation, exception hierarchies, and partial failure handling. Use when implementing validation logic, designing exception strategies, handling batch processing failures, or building robust APIs. +--- + +# Python Error Handling + +Build robust Python applications with proper input validation, meaningful exceptions, and graceful failure handling. Good error handling makes debugging easier and systems more reliable. + +## When to Use This Skill + +- Validating user input and API parameters +- Designing exception hierarchies for applications +- Handling partial failures in batch operations +- Converting external data to domain types +- Building user-friendly error messages +- Implementing fail-fast validation patterns + +## Core Concepts + +### 1. Fail Fast + +Validate inputs early, before expensive operations. Report all validation errors at once when possible. + +### 2. Meaningful Exceptions + +Use appropriate exception types with context. Messages should explain what failed, why, and how to fix it. + +### 3. 
Partial Failures + +In batch operations, don't let one failure abort everything. Track successes and failures separately. + +### 4. Preserve Context + +Chain exceptions to maintain the full error trail for debugging. + +## Quick Start + +```python +def fetch_page(url: str, page_size: int) -> Page: + if not url: + raise ValueError("'url' is required") + if not 1 <= page_size <= 100: + raise ValueError(f"'page_size' must be 1-100, got {page_size}") + # Now safe to proceed... +``` + +## Fundamental Patterns + +### Pattern 1: Early Input Validation + +Validate all inputs at API boundaries before any processing begins. + +```python +def process_order( + order_id: str, + quantity: int, + discount_percent: float, +) -> OrderResult: + """Process an order with validation.""" + # Validate required fields + if not order_id: + raise ValueError("'order_id' is required") + + # Validate ranges + if quantity <= 0: + raise ValueError(f"'quantity' must be positive, got {quantity}") + + if not 0 <= discount_percent <= 100: + raise ValueError( + f"'discount_percent' must be 0-100, got {discount_percent}" + ) + + # Validation passed, proceed with processing + return _process_validated_order(order_id, quantity, discount_percent) +``` + +### Pattern 2: Convert to Domain Types Early + +Parse strings and external data into typed domain objects at system boundaries. + +```python +from enum import Enum + +class OutputFormat(Enum): + JSON = "json" + CSV = "csv" + PARQUET = "parquet" + +def parse_output_format(value: str) -> OutputFormat: + """Parse string to OutputFormat enum. + + Args: + value: Format string from user input. + + Returns: + Validated OutputFormat enum member. + + Raises: + ValueError: If format is not recognized. + """ + try: + return OutputFormat(value.lower()) + except ValueError: + valid_formats = [f.value for f in OutputFormat] + raise ValueError( + f"Invalid format '{value}'. 
"
+            f"Valid options: {', '.join(valid_formats)}"
+        )
+
+# Usage at API boundary
+def export_data(data: list[dict], format_str: str) -> bytes:
+    output_format = parse_output_format(format_str)  # Fail fast
+    # Rest of function uses typed OutputFormat
+    ...
+```
+
+### Pattern 3: Pydantic for Complex Validation
+
+Use Pydantic models for structured input validation with automatic error messages.
+
+```python
+from pydantic import BaseModel, Field, ValidationError, field_validator
+
+class CreateUserInput(BaseModel):
+    """Input model for user creation."""
+
+    email: str = Field(..., min_length=5, max_length=255)
+    name: str = Field(..., min_length=1, max_length=100)
+    age: int = Field(ge=0, le=150)
+
+    @field_validator("email")
+    @classmethod
+    def validate_email_format(cls, v: str) -> str:
+        if "@" not in v or "." not in v.split("@")[-1]:
+            raise ValueError("Invalid email format")
+        return v.lower()
+
+    @field_validator("name")
+    @classmethod
+    def normalize_name(cls, v: str) -> str:
+        return v.strip().title()
+
+# Usage
+try:
+    user_input = CreateUserInput(
+        email="user@example.com",
+        name="john doe",
+        age=25,
+    )
+except ValidationError as e:
+    # Pydantic provides detailed error information
+    print(e.errors())
+```
+
+### Pattern 4: Map Errors to Standard Exceptions
+
+Use Python's built-in exception types appropriately, adding context as needed.
+ +| Failure Type | Exception | Example | +|--------------|-----------|---------| +| Invalid input | `ValueError` | Bad parameter values | +| Wrong type | `TypeError` | Expected string, got int | +| Missing item | `KeyError` | Dict key not found | +| Operational failure | `RuntimeError` | Service unavailable | +| Timeout | `TimeoutError` | Operation took too long | +| File not found | `FileNotFoundError` | Path doesn't exist | +| Permission denied | `PermissionError` | Access forbidden | + +```python +# Good: Specific exception with context +raise ValueError(f"'page_size' must be 1-100, got {page_size}") + +# Avoid: Generic exception, no context +raise Exception("Invalid parameter") +``` + +## Advanced Patterns + +### Pattern 5: Custom Exceptions with Context + +Create domain-specific exceptions that carry structured information. + +```python +class ApiError(Exception): + """Base exception for API errors.""" + + def __init__( + self, + message: str, + status_code: int, + response_body: str | None = None, + ) -> None: + self.status_code = status_code + self.response_body = response_body + super().__init__(message) + +class RateLimitError(ApiError): + """Raised when rate limit is exceeded.""" + + def __init__(self, retry_after: int) -> None: + self.retry_after = retry_after + super().__init__( + f"Rate limit exceeded. 
Retry after {retry_after}s",
+            status_code=429,
+        )
+
+# Usage
+def handle_response(response: Response) -> dict:
+    match response.status_code:
+        case 200:
+            return response.json()
+        case 401:
+            raise ApiError("Invalid credentials", 401)
+        case 404:
+            raise ApiError(f"Resource not found: {response.url}", 404)
+        case 429:
+            retry_after = int(response.headers.get("Retry-After", 60))
+            raise RateLimitError(retry_after)
+        case code if 400 <= code < 500:
+            raise ApiError(f"Client error: {response.text}", code)
+        case code if code >= 500:
+            raise ApiError(f"Server error: {response.text}", code)
+```
+
+### Pattern 6: Exception Chaining
+
+Preserve the original exception when re-raising to maintain the debug trail.
+
+```python
+import httpx
+
+class ServiceError(Exception):
+    """High-level service operation failed."""
+    pass
+
+def upload_file(path: str) -> str:
+    """Upload file and return URL."""
+    try:
+        with open(path, "rb") as f:
+            response = httpx.post("https://upload.example.com", files={"file": f})
+            response.raise_for_status()
+            return response.json()["url"]
+    except FileNotFoundError as e:
+        raise ServiceError(f"Upload failed: file not found at '{path}'") from e
+    except httpx.HTTPStatusError as e:
+        raise ServiceError(
+            f"Upload failed: server returned {e.response.status_code}"
+        ) from e
+    except httpx.RequestError as e:
+        raise ServiceError(f"Upload failed: network error: {e}") from e
+```
+
+### Pattern 7: Batch Processing with Partial Failures
+
+Never let one bad item abort an entire batch. Track results per item.
+ +```python +from dataclasses import dataclass + +@dataclass +class BatchResult[T]: + """Results from batch processing.""" + + succeeded: dict[int, T] # index -> result + failed: dict[int, Exception] # index -> error + + @property + def success_count(self) -> int: + return len(self.succeeded) + + @property + def failure_count(self) -> int: + return len(self.failed) + + @property + def all_succeeded(self) -> bool: + return len(self.failed) == 0 + +def process_batch(items: list[Item]) -> BatchResult[ProcessedItem]: + """Process items, capturing individual failures. + + Args: + items: Items to process. + + Returns: + BatchResult with succeeded and failed items by index. + """ + succeeded: dict[int, ProcessedItem] = {} + failed: dict[int, Exception] = {} + + for idx, item in enumerate(items): + try: + result = process_single_item(item) + succeeded[idx] = result + except Exception as e: + failed[idx] = e + + return BatchResult(succeeded=succeeded, failed=failed) + +# Caller handles partial results +result = process_batch(items) +if not result.all_succeeded: + logger.warning( + f"Batch completed with {result.failure_count} failures", + failed_indices=list(result.failed.keys()), + ) +``` + +### Pattern 8: Progress Reporting for Long Operations + +Provide visibility into batch progress without coupling business logic to UI. + +```python +from collections.abc import Callable + +ProgressCallback = Callable[[int, int, str], None] # current, total, status + +def process_large_batch( + items: list[Item], + on_progress: ProgressCallback | None = None, +) -> BatchResult: + """Process batch with optional progress reporting. + + Args: + items: Items to process. + on_progress: Optional callback receiving (current, total, status). 
+ """ + total = len(items) + succeeded = {} + failed = {} + + for idx, item in enumerate(items): + if on_progress: + on_progress(idx, total, f"Processing {item.id}") + + try: + succeeded[idx] = process_single_item(item) + except Exception as e: + failed[idx] = e + + if on_progress: + on_progress(total, total, "Complete") + + return BatchResult(succeeded=succeeded, failed=failed) +``` + +## Best Practices Summary + +1. **Validate early** - Check inputs before expensive operations +2. **Use specific exceptions** - `ValueError`, `TypeError`, not generic `Exception` +3. **Include context** - Messages should explain what, why, and how to fix +4. **Convert types at boundaries** - Parse strings to enums/domain types early +5. **Chain exceptions** - Use `raise ... from e` to preserve debug info +6. **Handle partial failures** - Don't abort batches on single item errors +7. **Use Pydantic** - For complex input validation with structured errors +8. **Document failure modes** - Docstrings should list possible exceptions +9. **Log with context** - Include IDs, counts, and other debugging info +10. **Test error paths** - Verify exceptions are raised correctly diff --git a/plugins/python-development/skills/python-observability/SKILL.md b/plugins/python-development/skills/python-observability/SKILL.md new file mode 100644 index 0000000..ce3f233 --- /dev/null +++ b/plugins/python-development/skills/python-observability/SKILL.md @@ -0,0 +1,400 @@ +--- +name: python-observability +description: Python observability patterns including structured logging, metrics, and distributed tracing. Use when adding logging, implementing metrics collection, setting up tracing, or debugging production systems. +--- + +# Python Observability + +Instrument Python applications with structured logs, metrics, and traces. When something breaks in production, you need to answer "what, where, and why" without deploying new code. 
+ +## When to Use This Skill + +- Adding structured logging to applications +- Implementing metrics collection with Prometheus +- Setting up distributed tracing across services +- Propagating correlation IDs through request chains +- Debugging production issues +- Building observability dashboards + +## Core Concepts + +### 1. Structured Logging + +Emit logs as JSON with consistent fields for production environments. Machine-readable logs enable powerful queries and alerts. For local development, consider human-readable formats. + +### 2. The Four Golden Signals + +Track latency, traffic, errors, and saturation for every service boundary. + +### 3. Correlation IDs + +Thread a unique ID through all logs and spans for a single request, enabling end-to-end tracing. + +### 4. Bounded Cardinality + +Keep metric label values bounded. Unbounded labels (like user IDs) explode storage costs. + +## Quick Start + +```python +import structlog + +structlog.configure( + processors=[ + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.JSONRenderer(), + ], +) + +logger = structlog.get_logger() +logger.info("Request processed", user_id="123", duration_ms=45) +``` + +## Fundamental Patterns + +### Pattern 1: Structured Logging with Structlog + +Configure structlog for JSON output with consistent fields. 

+
+```python
+import logging
+import structlog
+
+def configure_logging(log_level: str = "INFO") -> None:
+    """Configure structured logging for the application."""
+    structlog.configure(
+        processors=[
+            structlog.contextvars.merge_contextvars,
+            structlog.processors.add_log_level,
+            structlog.processors.TimeStamper(fmt="iso"),
+            structlog.processors.StackInfoRenderer(),
+            structlog.processors.format_exc_info,
+            structlog.processors.JSONRenderer(),
+        ],
+        wrapper_class=structlog.make_filtering_bound_logger(
+            getattr(logging, log_level.upper())
+        ),
+        context_class=dict,
+        logger_factory=structlog.PrintLoggerFactory(),
+        cache_logger_on_first_use=True,
+    )
+
+# Initialize at application startup
+configure_logging("INFO")
+logger = structlog.get_logger()
+```
+
+### Pattern 2: Consistent Log Fields
+
+Every log entry should include standard fields for filtering and correlation.
+
+```python
+import time
+import structlog
+from contextvars import ContextVar
+# Store correlation ID in context
+correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
+
+logger = structlog.get_logger()
+
+def process_request(request: Request) -> Response:
+    """Process request with structured logging."""
+    logger.info(
+        "Request received",
+        correlation_id=correlation_id.get(),
+        method=request.method,
+        path=request.path,
+        user_id=request.user_id,
+    )
+    start = time.perf_counter()
+    try:
+        result = handle_request(request)
+        logger.info(
+            "Request completed",
+            correlation_id=correlation_id.get(),
+            status_code=200,
+            duration_ms=(time.perf_counter() - start) * 1000,
+        )
+        return result
+    except Exception as e:
+        logger.error(
+            "Request failed",
+            correlation_id=correlation_id.get(),
+            error_type=type(e).__name__,
+            error_message=str(e),
+        )
+        raise
+```
+
+### Pattern 3: Semantic Log Levels
+
+Use log levels consistently across the application.
+ +| Level | Purpose | Examples | +|-------|---------|----------| +| `DEBUG` | Development diagnostics | Variable values, internal state | +| `INFO` | Request lifecycle, operations | Request start/end, job completion | +| `WARNING` | Recoverable anomalies | Retry attempts, fallback used | +| `ERROR` | Failures needing attention | Exceptions, service unavailable | + +```python +# DEBUG: Detailed internal information +logger.debug("Cache lookup", key=cache_key, hit=cache_hit) + +# INFO: Normal operational events +logger.info("Order created", order_id=order.id, total=order.total) + +# WARNING: Abnormal but handled situations +logger.warning( + "Rate limit approaching", + current_rate=950, + limit=1000, + reset_seconds=30, +) + +# ERROR: Failures requiring investigation +logger.error( + "Payment processing failed", + order_id=order.id, + error=str(e), + payment_provider="stripe", +) +``` + +Never log expected behavior at `ERROR`. A user entering a wrong password is `INFO`, not `ERROR`. + +### Pattern 4: Correlation ID Propagation + +Generate a unique ID at ingress and thread it through all operations. 
+ +```python +from contextvars import ContextVar +import uuid +import structlog + +correlation_id: ContextVar[str] = ContextVar("correlation_id", default="") + +def set_correlation_id(cid: str | None = None) -> str: + """Set correlation ID for current context.""" + cid = cid or str(uuid.uuid4()) + correlation_id.set(cid) + structlog.contextvars.bind_contextvars(correlation_id=cid) + return cid + +# FastAPI middleware example +from fastapi import Request + +async def correlation_middleware(request: Request, call_next): + """Middleware to set and propagate correlation ID.""" + # Use incoming header or generate new + cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4()) + set_correlation_id(cid) + + response = await call_next(request) + response.headers["X-Correlation-ID"] = cid + return response +``` + +Propagate to outbound requests: + +```python +import httpx + +async def call_downstream_service(endpoint: str, data: dict) -> dict: + """Call downstream service with correlation ID.""" + async with httpx.AsyncClient() as client: + response = await client.post( + endpoint, + json=data, + headers={"X-Correlation-ID": correlation_id.get()}, + ) + return response.json() +``` + +## Advanced Patterns + +### Pattern 5: The Four Golden Signals with Prometheus + +Track these metrics for every service boundary: + +```python +from prometheus_client import Counter, Histogram, Gauge + +# Latency: How long requests take +REQUEST_LATENCY = Histogram( + "http_request_duration_seconds", + "Request latency in seconds", + ["method", "endpoint", "status"], + buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10], +) + +# Traffic: Request rate +REQUEST_COUNT = Counter( + "http_requests_total", + "Total HTTP requests", + ["method", "endpoint", "status"], +) + +# Errors: Error rate +ERROR_COUNT = Counter( + "http_errors_total", + "Total HTTP errors", + ["method", "endpoint", "error_type"], +) + +# Saturation: Resource utilization +DB_POOL_USAGE = Gauge( + 
"db_connection_pool_used", + "Number of database connections in use", +) +``` + +Instrument your endpoints: + +```python +import time +from functools import wraps + +def track_request(func): + """Decorator to track request metrics.""" + @wraps(func) + async def wrapper(request: Request, *args, **kwargs): + method = request.method + endpoint = request.url.path + start = time.perf_counter() + + try: + response = await func(request, *args, **kwargs) + status = str(response.status_code) + return response + except Exception as e: + status = "500" + ERROR_COUNT.labels( + method=method, + endpoint=endpoint, + error_type=type(e).__name__, + ).inc() + raise + finally: + duration = time.perf_counter() - start + REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc() + REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration) + + return wrapper +``` + +### Pattern 6: Bounded Cardinality + +Avoid labels with unbounded values to prevent metric explosion. + +```python +# BAD: User ID has potentially millions of values +REQUEST_COUNT.labels(method="GET", user_id=user.id) # Don't do this! + +# GOOD: Bounded values only +REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200") + +# If you need per-user metrics, use a different approach: +# - Log the user_id and query logs +# - Use a separate analytics system +# - Bucket users by type/tier +REQUEST_COUNT.labels( + method="GET", + endpoint="/users", + user_tier="premium", # Bounded set of values +) +``` + +### Pattern 7: Timed Operations with Context Manager + +Create a reusable timing context manager for operations. 
+ +```python +from contextlib import contextmanager +import time +import structlog + +logger = structlog.get_logger() + +@contextmanager +def timed_operation(name: str, **extra_fields): + """Context manager for timing and logging operations.""" + start = time.perf_counter() + logger.debug("Operation started", operation=name, **extra_fields) + + try: + yield + except Exception as e: + elapsed_ms = (time.perf_counter() - start) * 1000 + logger.error( + "Operation failed", + operation=name, + duration_ms=round(elapsed_ms, 2), + error=str(e), + **extra_fields, + ) + raise + else: + elapsed_ms = (time.perf_counter() - start) * 1000 + logger.info( + "Operation completed", + operation=name, + duration_ms=round(elapsed_ms, 2), + **extra_fields, + ) + +# Usage +with timed_operation("fetch_user_orders", user_id=user.id): + orders = await order_repository.get_by_user(user.id) +``` + +### Pattern 8: OpenTelemetry Tracing + +Set up distributed tracing with OpenTelemetry. + +**Note:** OpenTelemetry is actively evolving. Check the [official Python documentation](https://opentelemetry.io/docs/languages/python/) for the latest API patterns and best practices. 
+ +```python +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + +def configure_tracing(service_name: str, otlp_endpoint: str) -> None: + """Configure OpenTelemetry tracing.""" + provider = TracerProvider() + processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint)) + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + +tracer = trace.get_tracer(__name__) + +async def process_order(order_id: str) -> Order: + """Process order with tracing.""" + with tracer.start_as_current_span("process_order") as span: + span.set_attribute("order.id", order_id) + + with tracer.start_as_current_span("validate_order"): + validate_order(order_id) + + with tracer.start_as_current_span("charge_payment"): + charge_payment(order_id) + + with tracer.start_as_current_span("send_confirmation"): + send_confirmation(order_id) + + return order +``` + +## Best Practices Summary + +1. **Use structured logging** - JSON logs with consistent fields +2. **Propagate correlation IDs** - Thread through all requests and logs +3. **Track the four golden signals** - Latency, traffic, errors, saturation +4. **Bound label cardinality** - Never use unbounded values as metric labels +5. **Log at appropriate levels** - Don't cry wolf with ERROR +6. **Include context** - User ID, request ID, operation name in logs +7. **Use context managers** - Consistent timing and error handling +8. **Separate concerns** - Observability code shouldn't pollute business logic +9. **Test your observability** - Verify logs and metrics in integration tests +10. 
**Set up alerts** - Metrics are useless without alerting diff --git a/plugins/python-development/skills/python-project-structure/SKILL.md b/plugins/python-development/skills/python-project-structure/SKILL.md new file mode 100644 index 0000000..504df63 --- /dev/null +++ b/plugins/python-development/skills/python-project-structure/SKILL.md @@ -0,0 +1,252 @@ +--- +name: python-project-structure +description: Python project organization, module architecture, and public API design. Use when setting up new projects, organizing modules, defining public interfaces with __all__, or planning directory layouts. +--- + +# Python Project Structure & Module Architecture + +Design well-organized Python projects with clear module boundaries, explicit public interfaces, and maintainable directory structures. Good organization makes code discoverable and changes predictable. + +## When to Use This Skill + +- Starting a new Python project from scratch +- Reorganizing an existing codebase for clarity +- Defining module public APIs with `__all__` +- Deciding between flat and nested directory structures +- Determining test file placement strategies +- Creating reusable library packages + +## Core Concepts + +### 1. Module Cohesion + +Group related code that changes together. A module should have a single, clear purpose. + +### 2. Explicit Interfaces + +Define what's public with `__all__`. Everything not listed is an internal implementation detail. + +### 3. Flat Hierarchies + +Prefer shallow directory structures. Add depth only for genuine sub-domains. + +### 4. Consistent Conventions + +Apply naming and organization patterns uniformly across the project. + +## Quick Start + +``` +myproject/ +├── src/ +│ └── myproject/ +│ ├── __init__.py +│ ├── services/ +│ ├── models/ +│ └── api/ +├── tests/ +├── pyproject.toml +└── README.md +``` + +## Fundamental Patterns + +### Pattern 1: One Concept Per File + +Each file should focus on a single concept or closely related set of functions. 
Consider splitting when a file: + +- Handles multiple unrelated responsibilities +- Grows beyond 300-500 lines (varies by complexity) +- Contains classes that change for different reasons + +```python +# Good: Focused files +# user_service.py - User business logic +# user_repository.py - User data access +# user_models.py - User data structures + +# Avoid: Kitchen sink files +# user.py - Contains service, repository, models, utilities... +``` + +### Pattern 2: Explicit Public APIs with `__all__` + +Define the public interface for every module. Unlisted members are internal implementation details. + +```python +# mypackage/services/__init__.py +from .user_service import UserService +from .order_service import OrderService +from .exceptions import ServiceError, ValidationError + +__all__ = [ + "UserService", + "OrderService", + "ServiceError", + "ValidationError", +] + +# Internal helpers remain private by omission +# from .internal_helpers import _validate_input # Not exported +``` + +### Pattern 3: Flat Directory Structure + +Prefer minimal nesting. Deep hierarchies make imports verbose and navigation difficult. + +``` +# Preferred: Flat structure +project/ +├── api/ +│ ├── routes.py +│ └── middleware.py +├── services/ +│ ├── user_service.py +│ └── order_service.py +├── models/ +│ ├── user.py +│ └── order.py +└── utils/ + └── validation.py + +# Avoid: Deep nesting +project/core/internal/services/impl/user/ +``` + +Add sub-packages only when there's a genuine sub-domain requiring isolation. + +### Pattern 4: Test File Organization + +Choose one approach and apply it consistently throughout the project. + +**Option A: Colocated Tests** + +``` +src/ +├── user_service.py +├── test_user_service.py +├── order_service.py +└── test_order_service.py +``` + +Benefits: Tests live next to the code they verify. Easy to see coverage gaps. 
+ +**Option B: Parallel Test Directory** + +``` +src/ +├── services/ +│ ├── user_service.py +│ └── order_service.py +tests/ +├── services/ +│ ├── test_user_service.py +│ └── test_order_service.py +``` + +Benefits: Clean separation between production and test code. Standard for larger projects. + +## Advanced Patterns + +### Pattern 5: Package Initialization + +Use `__init__.py` to provide a clean public interface for package consumers. + +```python +# mypackage/__init__.py +"""MyPackage - A library for doing useful things.""" + +from .core import MainClass, HelperClass +from .exceptions import PackageError, ConfigError +from .config import Settings + +__all__ = [ + "MainClass", + "HelperClass", + "PackageError", + "ConfigError", + "Settings", +] + +__version__ = "1.0.0" +``` + +Consumers can then import directly from the package: + +```python +from mypackage import MainClass, Settings +``` + +### Pattern 6: Layered Architecture + +Organize code by architectural layer for clear separation of concerns. + +``` +myapp/ +├── api/ # HTTP handlers, request/response +│ ├── routes/ +│ └── middleware/ +├── services/ # Business logic +├── repositories/ # Data access +├── models/ # Domain entities +├── schemas/ # API schemas (Pydantic) +└── config/ # Configuration +``` + +Each layer should only depend on layers below it, never above. + +### Pattern 7: Domain-Driven Structure + +For complex applications, organize by business domain rather than technical layer. 
+ +``` +ecommerce/ +├── users/ +│ ├── models.py +│ ├── services.py +│ ├── repository.py +│ └── api.py +├── orders/ +│ ├── models.py +│ ├── services.py +│ ├── repository.py +│ └── api.py +└── shared/ + ├── database.py + └── exceptions.py +``` + +## File and Module Naming + +### Conventions + +- Use `snake_case` for all file and module names: `user_repository.py` +- Avoid abbreviations that obscure meaning: `user_repository.py` not `usr_repo.py` +- Match class names to file names: `UserService` in `user_service.py` + +### Import Style + +Use absolute imports for clarity and reliability: + +```python +# Preferred: Absolute imports +from myproject.services import UserService +from myproject.models import User + +# Avoid: Relative imports +from ..services import UserService +from . import models +``` + +Relative imports can break when modules are moved or reorganized. + +## Best Practices Summary + +1. **Keep files focused** - One concept per file, consider splitting at 300-500 lines (varies by complexity) +2. **Define `__all__` explicitly** - Make public interfaces clear +3. **Prefer flat structures** - Add depth only for genuine sub-domains +4. **Use absolute imports** - More reliable and clearer +5. **Be consistent** - Apply patterns uniformly across the project +6. **Match names to content** - File names should describe their purpose +7. **Separate concerns** - Keep layers distinct and dependencies flowing one direction +8. **Document your structure** - Include a README explaining the organization diff --git a/plugins/python-development/skills/python-resilience/SKILL.md b/plugins/python-development/skills/python-resilience/SKILL.md new file mode 100644 index 0000000..27c5111 --- /dev/null +++ b/plugins/python-development/skills/python-resilience/SKILL.md @@ -0,0 +1,376 @@ +--- +name: python-resilience +description: Python resilience patterns including automatic retries, exponential backoff, timeouts, and fault-tolerant decorators. 
Use when adding retry logic, implementing timeouts, building fault-tolerant services, or handling transient failures. +--- + +# Python Resilience Patterns + +Build fault-tolerant Python applications that gracefully handle transient failures, network issues, and service outages. Resilience patterns keep systems running when dependencies are unreliable. + +## When to Use This Skill + +- Adding retry logic to external service calls +- Implementing timeouts for network operations +- Building fault-tolerant microservices +- Handling rate limiting and backpressure +- Creating infrastructure decorators +- Designing circuit breakers + +## Core Concepts + +### 1. Transient vs Permanent Failures + +Retry transient errors (network timeouts, temporary service issues). Don't retry permanent errors (invalid credentials, bad requests). + +### 2. Exponential Backoff + +Increase wait time between retries to avoid overwhelming recovering services. + +### 3. Jitter + +Add randomness to backoff to prevent thundering herd when many clients retry simultaneously. + +### 4. Bounded Retries + +Cap both attempt count and total duration to prevent infinite retry loops. + +## Quick Start + +```python +from tenacity import retry, stop_after_attempt, wait_exponential_jitter + +@retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=10), +) +def call_external_service(request: dict) -> dict: + return httpx.post("https://api.example.com", json=request).json() +``` + +## Fundamental Patterns + +### Pattern 1: Basic Retry with Tenacity + +Use the `tenacity` library for production-grade retry logic. For simpler cases, consider built-in retry functionality or a lightweight custom implementation. 

+
+```python
+from tenacity import (
+    retry,
+    stop_after_attempt,
+    stop_after_delay,
+    wait_exponential_jitter,
+    retry_if_exception_type,
+)
+import httpx
+TRANSIENT_ERRORS = (ConnectionError, TimeoutError, OSError)
+
+@retry(
+    retry=retry_if_exception_type(TRANSIENT_ERRORS),
+    stop=stop_after_attempt(5) | stop_after_delay(60),
+    wait=wait_exponential_jitter(initial=1, max=30),
+)
+def fetch_data(url: str) -> dict:
+    """Fetch data with automatic retry on transient failures."""
+    response = httpx.get(url, timeout=30)
+    response.raise_for_status()
+    return response.json()
+```
+
+### Pattern 2: Retry Only Appropriate Errors
+
+Whitelist specific transient exceptions. Never retry:
+
+- `ValueError`, `TypeError` - These are bugs, not transient issues
+- `AuthenticationError` - Invalid credentials won't become valid
+- HTTP 4xx errors (except 429) - Client errors are permanent
+
+```python
+from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential_jitter
+import httpx
+
+# Define what's retryable
+RETRYABLE_EXCEPTIONS = (
+    ConnectionError,
+    TimeoutError,
+    httpx.ConnectTimeout,
+    httpx.ReadTimeout,
+)
+
+@retry(
+    retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
+    stop=stop_after_attempt(3),
+    wait=wait_exponential_jitter(initial=1, max=10),
+)
+def resilient_api_call(endpoint: str) -> dict:
+    """Make API call with retry on network issues."""
+    return httpx.get(endpoint, timeout=10).json()
+```
+
+### Pattern 3: HTTP Status Code Retries
+
+Retry specific HTTP status codes that indicate transient issues.

+
+```python
+from tenacity import retry, retry_if_result, stop_after_attempt, wait_exponential_jitter
+import httpx
+
+RETRY_STATUS_CODES = {429, 502, 503, 504}
+
+def should_retry_response(response: httpx.Response) -> bool:
+    """Check if response indicates a retryable error."""
+    return response.status_code in RETRY_STATUS_CODES
+
+@retry(
+    retry=retry_if_result(should_retry_response),
+    stop=stop_after_attempt(3),
+    wait=wait_exponential_jitter(initial=1, max=10),
+)
+def http_request(method: str, url: str, **kwargs) -> httpx.Response:
+    """Make HTTP request with retry on transient status codes."""
+    return httpx.request(method, url, timeout=30, **kwargs)
+```
+
+### Pattern 4: Combined Exception and Status Retry
+
+Handle both network exceptions and HTTP status codes.
+
+```python
+from tenacity import (
+    retry,
+    retry_if_exception_type,
+    retry_if_result,
+    stop_after_attempt,
+    wait_exponential_jitter,
+    before_sleep_log,
+)
+import logging
+import httpx
+
+logger = logging.getLogger(__name__)
+
+TRANSIENT_EXCEPTIONS = (
+    ConnectionError,
+    TimeoutError,
+    httpx.ConnectError,
+    httpx.ReadTimeout,
+)
+RETRY_STATUS_CODES = {429, 500, 502, 503, 504}
+
+def is_retryable_response(response: httpx.Response) -> bool:
+    return response.status_code in RETRY_STATUS_CODES
+
+@retry(
+    retry=(
+        retry_if_exception_type(TRANSIENT_EXCEPTIONS) |
+        retry_if_result(is_retryable_response)
+    ),
+    stop=stop_after_attempt(5),
+    wait=wait_exponential_jitter(initial=1, max=30),
+    before_sleep=before_sleep_log(logger, logging.WARNING),
+)
+def robust_http_call(
+    method: str,
+    url: str,
+    **kwargs,
+) -> httpx.Response:
+    """HTTP call with comprehensive retry handling."""
+    return httpx.request(method, url, timeout=30, **kwargs)
+```
+
+## Advanced Patterns
+
+### Pattern 5: Logging Retry Attempts
+
+Track retry behavior for debugging and alerting.
+ +```python +from tenacity import retry, stop_after_attempt, wait_exponential +import structlog + +logger = structlog.get_logger() + +def log_retry_attempt(retry_state): + """Log detailed retry information.""" + exception = retry_state.outcome.exception() + logger.warning( + "Retrying operation", + attempt=retry_state.attempt_number, + exception_type=type(exception).__name__, + exception_message=str(exception), + next_wait_seconds=retry_state.next_action.sleep if retry_state.next_action else None, + ) + +@retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, max=10), + before_sleep=log_retry_attempt, +) +def call_with_logging(request: dict) -> dict: + """External call with retry logging.""" + ... +``` + +### Pattern 6: Timeout Decorator + +Create reusable timeout decorators for consistent timeout handling. + +```python +import asyncio +from functools import wraps +from typing import TypeVar, Callable + +T = TypeVar("T") + +def with_timeout(seconds: float): + """Decorator to add timeout to async functions.""" + def decorator(func: Callable[..., T]) -> Callable[..., T]: + @wraps(func) + async def wrapper(*args, **kwargs) -> T: + return await asyncio.wait_for( + func(*args, **kwargs), + timeout=seconds, + ) + return wrapper + return decorator + +@with_timeout(30) +async def fetch_with_timeout(url: str) -> dict: + """Fetch URL with 30 second timeout.""" + async with httpx.AsyncClient() as client: + response = await client.get(url) + return response.json() +``` + +### Pattern 7: Cross-Cutting Concerns via Decorators + +Stack decorators to separate infrastructure from business logic. 
+ +```python +from functools import wraps +from typing import TypeVar, Callable +import structlog + +logger = structlog.get_logger() +T = TypeVar("T") + +def traced(name: str | None = None): + """Add tracing to function calls.""" + def decorator(func: Callable[..., T]) -> Callable[..., T]: + span_name = name or func.__name__ + + @wraps(func) + async def wrapper(*args, **kwargs) -> T: + logger.info("Operation started", operation=span_name) + try: + result = await func(*args, **kwargs) + logger.info("Operation completed", operation=span_name) + return result + except Exception as e: + logger.error("Operation failed", operation=span_name, error=str(e)) + raise + return wrapper + return decorator + +# Stack multiple concerns +@traced("fetch_user_data") +@with_timeout(30) +@retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter()) +async def fetch_user_data(user_id: str) -> dict: + """Fetch user with tracing, timeout, and retry.""" + ... +``` + +### Pattern 8: Dependency Injection for Testability + +Pass infrastructure components through constructors for easy testing. + +```python +from dataclasses import dataclass +from typing import Protocol + +class Logger(Protocol): + def info(self, msg: str, **kwargs) -> None: ... + def error(self, msg: str, **kwargs) -> None: ... + +class MetricsClient(Protocol): + def increment(self, metric: str, tags: dict | None = None) -> None: ... + def timing(self, metric: str, value: float) -> None: ... 

+
+@dataclass
+class UserService:
+    """Service with injected infrastructure."""
+
+    repository: UserRepository
+    logger: Logger
+    metrics: MetricsClient
+
+    async def get_user(self, user_id: str) -> User:
+        self.logger.info("Fetching user", user_id=user_id)
+        start = time.perf_counter()
+
+        try:
+            user = await self.repository.get(user_id)
+            self.metrics.increment("user.fetch.success")
+            return user
+        except Exception as e:
+            self.metrics.increment("user.fetch.error")
+            self.logger.error("Failed to fetch user", user_id=user_id, error=str(e))
+            raise
+        finally:
+            elapsed = time.perf_counter() - start
+            self.metrics.timing("user.fetch.duration", elapsed)
+
+# Easy to test with fakes
+service = UserService(
+    repository=FakeRepository(),
+    logger=FakeLogger(),
+    metrics=FakeMetrics(),
+)
+```
+
+### Pattern 9: Fail-Safe Defaults
+
+Degrade gracefully when non-critical operations fail.
+
+```python
+from functools import wraps
+from typing import TypeVar
+from collections.abc import Callable
+T = TypeVar("T")
+
+def fail_safe(default: T, log_failure: bool = True):
+    """Return default value on failure instead of raising."""
+    def decorator(func: Callable[..., T]) -> Callable[..., T]:
+        @wraps(func)
+        async def wrapper(*args, **kwargs) -> T:
+            try:
+                return await func(*args, **kwargs)
+            except Exception as e:
+                if log_failure:
+                    logger.warning(
+                        "Operation failed, using default",
+                        function=func.__name__,
+                        error=str(e),
+                    )
+                return default
+        return wrapper
+    return decorator
+
+@fail_safe(default=[])
+async def get_recommendations(user_id: str) -> list[str]:
+    """Get recommendations, return empty list on failure."""
+    ...
+```
+
+## Best Practices Summary
+
+1. **Retry only transient errors** - Don't retry bugs or authentication failures
+2. **Use exponential backoff** - Give services time to recover
+3. **Add jitter** - Prevent thundering herd from synchronized retries
+4. **Cap total duration** - `stop_after_attempt(5) | stop_after_delay(60)`
+5. 
**Log every retry** - Silent retries hide systemic problems +6. **Use decorators** - Keep retry logic separate from business logic +7. **Inject dependencies** - Make infrastructure testable +8. **Set timeouts everywhere** - Every network call needs a timeout +9. **Fail gracefully** - Return cached/default values for non-critical paths +10. **Monitor retry rates** - High retry rates indicate underlying issues diff --git a/plugins/python-development/skills/python-resource-management/SKILL.md b/plugins/python-development/skills/python-resource-management/SKILL.md new file mode 100644 index 0000000..dd9ce93 --- /dev/null +++ b/plugins/python-development/skills/python-resource-management/SKILL.md @@ -0,0 +1,421 @@ +--- +name: python-resource-management +description: Python resource management with context managers, cleanup patterns, and streaming. Use when managing connections, file handles, implementing cleanup logic, or building streaming responses with accumulated state. +--- + +# Python Resource Management + +Manage resources deterministically using context managers. Resources like database connections, file handles, and network sockets should be released reliably, even when exceptions occur. + +## When to Use This Skill + +- Managing database connections and connection pools +- Working with file handles and I/O +- Implementing custom context managers +- Building streaming responses with state +- Handling nested resource cleanup +- Creating async context managers + +## Core Concepts + +### 1. Context Managers + +The `with` statement ensures resources are released automatically, even on exceptions. + +### 2. Protocol Methods + +`__enter__`/`__exit__` for sync, `__aenter__`/`__aexit__` for async resource management. + +### 3. Unconditional Cleanup + +`__exit__` always runs, regardless of whether an exception occurred. + +### 4. Exception Handling + +Return `True` from `__exit__` to suppress exceptions, `False` to propagate them. 
+ +## Quick Start + +```python +from contextlib import contextmanager + +@contextmanager +def managed_resource(): + resource = acquire_resource() + try: + yield resource + finally: + resource.cleanup() + +with managed_resource() as r: + r.do_work() +``` + +## Fundamental Patterns + +### Pattern 1: Class-Based Context Manager + +Implement the context manager protocol for complex resources. + +```python +class DatabaseConnection: + """Database connection with automatic cleanup.""" + + def __init__(self, dsn: str) -> None: + self._dsn = dsn + self._conn: Connection | None = None + + def connect(self) -> None: + """Establish database connection.""" + self._conn = psycopg.connect(self._dsn) + + def close(self) -> None: + """Close connection if open.""" + if self._conn is not None: + self._conn.close() + self._conn = None + + def __enter__(self) -> "DatabaseConnection": + """Enter context: connect and return self.""" + self.connect() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Exit context: always close connection.""" + self.close() + +# Usage with context manager (preferred) +with DatabaseConnection(dsn) as db: + result = db.execute(query) + +# Manual management when needed +db = DatabaseConnection(dsn) +db.connect() +try: + result = db.execute(query) +finally: + db.close() +``` + +### Pattern 2: Async Context Manager + +For async resources, implement the async protocol. 
+ +```python +class AsyncDatabasePool: + """Async database connection pool.""" + + def __init__(self, dsn: str, min_size: int = 1, max_size: int = 10) -> None: + self._dsn = dsn + self._min_size = min_size + self._max_size = max_size + self._pool: asyncpg.Pool | None = None + + async def __aenter__(self) -> "AsyncDatabasePool": + """Create connection pool.""" + self._pool = await asyncpg.create_pool( + self._dsn, + min_size=self._min_size, + max_size=self._max_size, + ) + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Close all connections in pool.""" + if self._pool is not None: + await self._pool.close() + + async def execute(self, query: str, *args) -> list[dict]: + """Execute query using pooled connection.""" + async with self._pool.acquire() as conn: + return await conn.fetch(query, *args) + +# Usage +async with AsyncDatabasePool(dsn) as pool: + users = await pool.execute("SELECT * FROM users WHERE active = $1", True) +``` + +### Pattern 3: Using @contextmanager Decorator + +Simplify context managers with the decorator for straightforward cases. 
+ +```python +from contextlib import contextmanager, asynccontextmanager +import time +import structlog + +logger = structlog.get_logger() + +@contextmanager +def timed_block(name: str): + """Time a block of code.""" + start = time.perf_counter() + try: + yield + finally: + elapsed = time.perf_counter() - start + logger.info(f"{name} completed", duration_seconds=round(elapsed, 3)) + +# Usage +with timed_block("data_processing"): + process_large_dataset() + +@asynccontextmanager +async def database_transaction(conn: AsyncConnection): + """Manage database transaction.""" + await conn.execute("BEGIN") + try: + yield conn + await conn.execute("COMMIT") + except Exception: + await conn.execute("ROLLBACK") + raise + +# Usage +async with database_transaction(conn) as tx: + await tx.execute("INSERT INTO users ...") + await tx.execute("INSERT INTO audit_log ...") +``` + +### Pattern 4: Unconditional Resource Release + +Always clean up resources in `__exit__`, regardless of exceptions. + +```python +class FileProcessor: + """Process file with guaranteed cleanup.""" + + def __init__(self, path: str) -> None: + self._path = path + self._file: IO | None = None + self._temp_files: list[Path] = [] + + def __enter__(self) -> "FileProcessor": + self._file = open(self._path, "r") + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Clean up all resources unconditionally.""" + # Close main file + if self._file is not None: + self._file.close() + + # Clean up any temporary files + for temp_file in self._temp_files: + try: + temp_file.unlink() + except OSError: + pass # Best effort cleanup + + # Return None/False to propagate any exception +``` + +## Advanced Patterns + +### Pattern 5: Selective Exception Suppression + +Only suppress specific, documented exceptions. 
+ +```python +class StreamWriter: + """Writer that handles broken pipe gracefully.""" + + def __init__(self, stream) -> None: + self._stream = stream + + def __enter__(self) -> "StreamWriter": + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool: + """Clean up, suppressing BrokenPipeError on shutdown.""" + self._stream.close() + + # Suppress BrokenPipeError (client disconnected) + # This is expected behavior, not an error + if exc_type is BrokenPipeError: + return True # Exception suppressed + + return False # Propagate all other exceptions +``` + +### Pattern 6: Streaming with Accumulated State + +Maintain both incremental chunks and accumulated state during streaming. + +```python +from collections.abc import Generator +from dataclasses import dataclass, field + +@dataclass +class StreamingResult: + """Accumulated streaming result.""" + + chunks: list[str] = field(default_factory=list) + _finalized: bool = False + + @property + def content(self) -> str: + """Get accumulated content.""" + return "".join(self.chunks) + + def add_chunk(self, chunk: str) -> None: + """Add chunk to accumulator.""" + if self._finalized: + raise RuntimeError("Cannot add to finalized result") + self.chunks.append(chunk) + + def finalize(self) -> str: + """Mark stream complete and return content.""" + self._finalized = True + return self.content + +def stream_with_accumulation( + response: StreamingResponse, +) -> Generator[tuple[str, str], None, str]: + """Stream response while accumulating content. + + Yields: + Tuple of (accumulated_content, new_chunk) for each chunk. + + Returns: + Final accumulated content. + """ + result = StreamingResult() + + for chunk in response.iter_content(): + result.add_chunk(chunk) + yield result.content, chunk + + return result.finalize() +``` + +### Pattern 7: Efficient String Accumulation + +Avoid O(n²) string concatenation when accumulating. 
+ +```python +def accumulate_stream(stream) -> str: + """Efficiently accumulate stream content.""" + # BAD: O(n²) due to string immutability + # content = "" + # for chunk in stream: + # content += chunk # Creates new string each time + + # GOOD: O(n) with list and join + chunks: list[str] = [] + for chunk in stream: + chunks.append(chunk) + return "".join(chunks) # Single allocation +``` + +### Pattern 8: Tracking Stream Metrics + +Measure time-to-first-byte and total streaming time. + +```python +import time +from collections.abc import Generator + +def stream_with_metrics( + response: StreamingResponse, +) -> Generator[str, None, dict]: + """Stream response while collecting metrics. + + Yields: + Content chunks. + + Returns: + Metrics dictionary. + """ + start = time.perf_counter() + first_chunk_time: float | None = None + chunk_count = 0 + total_bytes = 0 + + for chunk in response.iter_content(): + if first_chunk_time is None: + first_chunk_time = time.perf_counter() - start + + chunk_count += 1 + total_bytes += len(chunk.encode()) + yield chunk + + total_time = time.perf_counter() - start + + return { + "time_to_first_byte_ms": round((first_chunk_time or 0) * 1000, 2), + "total_time_ms": round(total_time * 1000, 2), + "chunk_count": chunk_count, + "total_bytes": total_bytes, + } +``` + +### Pattern 9: Managing Multiple Resources with ExitStack + +Handle a dynamic number of resources cleanly. 
+ +```python +from contextlib import ExitStack, AsyncExitStack +from pathlib import Path + +def process_files(paths: list[Path]) -> list[str]: + """Process multiple files with automatic cleanup.""" + results = [] + + with ExitStack() as stack: + # Open all files - they'll all be closed when block exits + files = [stack.enter_context(open(p)) for p in paths] + + for f in files: + results.append(f.read()) + + return results + +async def process_connections(hosts: list[str]) -> list[dict]: + """Process multiple async connections.""" + results = [] + + async with AsyncExitStack() as stack: + connections = [ + await stack.enter_async_context(connect_to_host(host)) + for host in hosts + ] + + for conn in connections: + results.append(await conn.fetch_data()) + + return results +``` + +## Best Practices Summary + +1. **Always use context managers** - For any resource that needs cleanup +2. **Clean up unconditionally** - `__exit__` runs even on exception +3. **Don't suppress unexpectedly** - Return `False` unless suppression is intentional +4. **Use @contextmanager** - For simple resource patterns +5. **Implement both protocols** - Support `with` and manual management +6. **Use ExitStack** - For dynamic numbers of resources +7. **Accumulate efficiently** - List + join, not string concatenation +8. **Track metrics** - Time-to-first-byte matters for streaming +9. **Document behavior** - Especially exception suppression +10. 
**Test cleanup paths** - Verify resources are released on errors diff --git a/plugins/python-development/skills/python-testing-patterns/SKILL.md b/plugins/python-development/skills/python-testing-patterns/SKILL.md index 23ade50..6693894 100644 --- a/plugins/python-development/skills/python-testing-patterns/SKILL.md +++ b/plugins/python-development/skills/python-testing-patterns/SKILL.md @@ -618,6 +618,52 @@ def test_sorted_list_properties(lst): assert sorted_lst[i] <= sorted_lst[i + 1] ``` +## Test Design Principles + +### One Behavior Per Test + +Each test should verify exactly one behavior. This makes failures easy to diagnose and tests easy to maintain. + +```python +# BAD - testing multiple behaviors +def test_user_service(): + user = service.create_user(data) + assert user.id is not None + assert user.email == data["email"] + updated = service.update_user(user.id, {"name": "New"}) + assert updated.name == "New" + +# GOOD - focused tests +def test_create_user_assigns_id(): + user = service.create_user(data) + assert user.id is not None + +def test_create_user_stores_email(): + user = service.create_user(data) + assert user.email == data["email"] + +def test_update_user_changes_name(): + user = service.create_user(data) + updated = service.update_user(user.id, {"name": "New"}) + assert updated.name == "New" +``` + +### Test Error Paths + +Always test failure cases, not just happy paths. + +```python +def test_get_user_raises_not_found(): + with pytest.raises(UserNotFoundError) as exc_info: + service.get_user("nonexistent-id") + + assert "nonexistent-id" in str(exc_info.value) + +def test_create_user_rejects_invalid_email(): + with pytest.raises(ValueError, match="Invalid email format"): + service.create_user({"email": "not-an-email"}) +``` + ## Testing Best Practices ### Test Organization @@ -636,38 +682,131 @@ def test_sorted_list_properties(lst): # test_workflows.py ``` -### Test Naming +### Test Naming Convention + +A common pattern: `test___`. 
Adapt to your team's preferences. ```python -# Good test names +# Pattern: test_<method>_<condition>_<expected_result> +def test_create_user_with_valid_data_returns_user(): + ... + +def test_create_user_with_duplicate_email_raises_conflict(): + ... + +def test_get_user_with_unknown_id_returns_none(): + ... + +# Good test names - clear and descriptive def test_user_creation_with_valid_data(): """Clear name describes what is being tested.""" pass - def test_login_fails_with_invalid_password(): """Name describes expected behavior.""" pass - def test_api_returns_404_for_missing_resource(): """Specific about inputs and expected outcomes.""" pass - -# Bad test names +# Bad test names - avoid these def test_1(): # Not descriptive pass - def test_user(): # Too vague pass - def test_function(): # Doesn't explain what's tested pass ``` +### Testing Retry Behavior + +Verify that retry logic works correctly using mock side effects. + +```python +from unittest.mock import Mock + +def test_retries_on_transient_error(): + """Test that service retries on transient failures.""" + client = Mock() + # Fail twice, then succeed + client.request.side_effect = [ + ConnectionError("Failed"), + ConnectionError("Failed"), + {"status": "ok"}, + ] + + service = ServiceWithRetry(client, max_retries=3) + result = service.fetch() + + assert result == {"status": "ok"} + assert client.request.call_count == 3 + +def test_gives_up_after_max_retries(): + """Test that service stops retrying after max attempts.""" + client = Mock() + client.request.side_effect = ConnectionError("Failed") + + service = ServiceWithRetry(client, max_retries=3) + + with pytest.raises(ConnectionError): + service.fetch() + + assert client.request.call_count == 3 + +def test_does_not_retry_on_permanent_error(): + """Test that permanent errors are not retried.""" + client = Mock() + client.request.side_effect = ValueError("Invalid input") + + service = ServiceWithRetry(client, max_retries=3) + + with pytest.raises(ValueError): + service.fetch() + + # Only called 
once - no retry for ValueError + assert client.request.call_count == 1 +``` + +### Mocking Time with Freezegun + +Use freezegun to control time in tests for predictable time-dependent behavior. + +```python +from freezegun import freeze_time +from datetime import datetime, timedelta + +@freeze_time("2026-01-15 10:00:00") +def test_token_expiry(): + """Test token expires at correct time.""" + token = create_token(expires_in_seconds=3600) + assert token.expires_at == datetime(2026, 1, 15, 11, 0, 0) + +@freeze_time("2026-01-15 10:00:00") +def test_is_expired_returns_false_before_expiry(): + """Test token is not expired when within validity period.""" + token = create_token(expires_in_seconds=3600) + assert not token.is_expired() + +@freeze_time("2026-01-15 12:00:00") +def test_is_expired_returns_true_after_expiry(): + """Test token is expired after validity period.""" + token = Token(expires_at=datetime(2026, 1, 15, 11, 30, 0)) + assert token.is_expired() + +def test_with_time_travel(): + """Test behavior across time using freeze_time context.""" + with freeze_time("2026-01-01") as frozen_time: + item = create_item() + assert item.created_at == datetime(2026, 1, 1) + + # Move forward in time + frozen_time.move_to("2026-01-15") + assert item.age_days == 14 +``` + ### Test Markers ```python diff --git a/plugins/python-development/skills/python-type-safety/SKILL.md b/plugins/python-development/skills/python-type-safety/SKILL.md new file mode 100644 index 0000000..99ad437 --- /dev/null +++ b/plugins/python-development/skills/python-type-safety/SKILL.md @@ -0,0 +1,428 @@ +--- +name: python-type-safety +description: Python type safety with type hints, generics, protocols, and strict type checking. Use when adding type annotations, implementing generic classes, defining structural interfaces, or configuring mypy/pyright. +--- + +# Python Type Safety + +Leverage Python's type system to catch errors at static analysis time. 
Type annotations serve as enforced documentation that tooling validates automatically. + +## When to Use This Skill + +- Adding type hints to existing code +- Creating generic, reusable classes +- Defining structural interfaces with protocols +- Configuring mypy or pyright for strict checking +- Understanding type narrowing and guards +- Building type-safe APIs and libraries + +## Core Concepts + +### 1. Type Annotations + +Declare expected types for function parameters, return values, and variables. + +### 2. Generics + +Write reusable code that preserves type information across different types. + +### 3. Protocols + +Define structural interfaces without inheritance (duck typing with type safety). + +### 4. Type Narrowing + +Use guards and conditionals to narrow types within code blocks. + +## Quick Start + +```python +def get_user(user_id: str) -> User | None: + """Return type makes 'might not exist' explicit.""" + ... + +# Type checker enforces handling None case +user = get_user("123") +if user is None: + raise UserNotFoundError("123") +print(user.name) # Type checker knows user is User here +``` + +## Fundamental Patterns + +### Pattern 1: Annotate All Public Signatures + +Every public function, method, and class should have type annotations. + +```python +def get_user(user_id: str) -> User: + """Retrieve user by ID.""" + ... + +def process_batch( + items: list[Item], + max_workers: int = 4, +) -> BatchResult[ProcessedItem]: + """Process items concurrently.""" + ... + +class UserRepository: + def __init__(self, db: Database) -> None: + self._db = db + + async def find_by_id(self, user_id: str) -> User | None: + """Return User if found, None otherwise.""" + ... + + async def find_by_email(self, email: str) -> User | None: + ... + + async def save(self, user: User) -> User: + """Save and return user with generated ID.""" + ... +``` + +Use `mypy --strict` or `pyright` in CI to catch type errors early. 
For existing projects, enable strict mode incrementally using per-module overrides. + +### Pattern 2: Use Modern Union Syntax + +Python 3.10+ provides cleaner union syntax. + +```python +# Preferred (3.10+) +def find_user(user_id: str) -> User | None: + ... + +def parse_value(v: str) -> int | float | str: + ... + +# Older style (still valid, needed for 3.9) +from typing import Optional, Union + +def find_user(user_id: str) -> Optional[User]: + ... +``` + +### Pattern 3: Type Narrowing with Guards + +Use conditionals to narrow types for the type checker. + +```python +def process_user(user_id: str) -> UserData: + user = find_user(user_id) + + if user is None: + raise UserNotFoundError(f"User {user_id} not found") + + # Type checker knows user is User here, not User | None + return UserData( + name=user.name, + email=user.email, + ) + +def process_items(items: list[Item | None]) -> list[ProcessedItem]: + # Filter and narrow types + valid_items = [item for item in items if item is not None] + # valid_items is now list[Item] + return [process(item) for item in valid_items] +``` + +### Pattern 4: Generic Classes + +Create type-safe reusable containers. 
+ +```python +from typing import TypeVar, Generic + +T = TypeVar("T") +E = TypeVar("E", bound=Exception) + +class Result(Generic[T, E]): + """Represents either a success value or an error.""" + + def __init__( + self, + value: T | None = None, + error: E | None = None, + ) -> None: + if (value is None) == (error is None): + raise ValueError("Exactly one of value or error must be set") + self._value = value + self._error = error + + @property + def is_success(self) -> bool: + return self._error is None + + @property + def is_failure(self) -> bool: + return self._error is not None + + def unwrap(self) -> T: + """Get value or raise the error.""" + if self._error is not None: + raise self._error + return self._value # type: ignore[return-value] + + def unwrap_or(self, default: T) -> T: + """Get value or return default.""" + if self._error is not None: + return default + return self._value # type: ignore[return-value] + +# Usage preserves types +def parse_config(path: str) -> Result[Config, ConfigError]: + try: + return Result(value=Config.from_file(path)) + except ConfigError as e: + return Result(error=e) + +result = parse_config("config.yaml") +if result.is_success: + config = result.unwrap() # Type: Config +``` + +## Advanced Patterns + +### Pattern 5: Generic Repository + +Create type-safe data access patterns. + +```python +from typing import TypeVar, Generic +from abc import ABC, abstractmethod + +T = TypeVar("T") +ID = TypeVar("ID") + +class Repository(ABC, Generic[T, ID]): + """Generic repository interface.""" + + @abstractmethod + async def get(self, id: ID) -> T | None: + """Get entity by ID.""" + ... + + @abstractmethod + async def save(self, entity: T) -> T: + """Save and return entity.""" + ... + + @abstractmethod + async def delete(self, id: ID) -> bool: + """Delete entity, return True if existed.""" + ... 
+ +class UserRepository(Repository[User, str]): + """Concrete repository for Users with string IDs.""" + + async def get(self, id: str) -> User | None: + row = await self._db.fetchrow( + "SELECT * FROM users WHERE id = $1", id + ) + return User(**row) if row else None + + async def save(self, entity: User) -> User: + ... + + async def delete(self, id: str) -> bool: + ... +``` + +### Pattern 6: TypeVar with Bounds + +Restrict generic parameters to specific types. + +```python +from typing import TypeVar +from pydantic import BaseModel + +ModelT = TypeVar("ModelT", bound=BaseModel) + +def validate_and_create(model_cls: type[ModelT], data: dict) -> ModelT: + """Create a validated Pydantic model from dict.""" + return model_cls.model_validate(data) + +# Works with any BaseModel subclass +class User(BaseModel): + name: str + email: str + +user = validate_and_create(User, {"name": "Alice", "email": "a@b.com"}) +# user is typed as User + +# Type error: str is not a BaseModel subclass +result = validate_and_create(str, {"name": "Alice"}) # Error! +``` + +### Pattern 7: Protocols for Structural Typing + +Define interfaces without requiring inheritance. + +```python +from typing import Protocol, runtime_checkable + +@runtime_checkable +class Serializable(Protocol): + """Any class that can be serialized to/from dict.""" + + def to_dict(self) -> dict: + ... + + @classmethod + def from_dict(cls, data: dict) -> "Serializable": + ... 
+ +# User satisfies Serializable without inheriting from it +class User: + def __init__(self, id: str, name: str) -> None: + self.id = id + self.name = name + + def to_dict(self) -> dict: + return {"id": self.id, "name": self.name} + + @classmethod + def from_dict(cls, data: dict) -> "User": + return cls(id=data["id"], name=data["name"]) + +def serialize(obj: Serializable) -> str: + """Works with any Serializable object.""" + return json.dumps(obj.to_dict()) + +# Works - User matches the protocol +serialize(User("1", "Alice")) + +# Runtime checking with @runtime_checkable +isinstance(User("1", "Alice"), Serializable) # True +``` + +### Pattern 8: Common Protocol Patterns + +Define reusable structural interfaces. + +```python +from typing import Protocol + +class Closeable(Protocol): + """Resource that can be closed.""" + def close(self) -> None: ... + +class AsyncCloseable(Protocol): + """Async resource that can be closed.""" + async def close(self) -> None: ... + +class Readable(Protocol): + """Object that can be read from.""" + def read(self, n: int = -1) -> bytes: ... + +class HasId(Protocol): + """Object with an ID property.""" + @property + def id(self) -> str: ... + +class Comparable(Protocol): + """Object that supports comparison.""" + def __lt__(self, other: "Comparable") -> bool: ... + def __le__(self, other: "Comparable") -> bool: ... +``` + +### Pattern 9: Type Aliases + +Create meaningful type names. + +**Note:** The `type` statement (PEP 695) was introduced in Python 3.12 and covers both simple and generic aliases; it is not available in earlier versions. On Python 3.10-3.11, use `typing.TypeAlias` (PEP 613) instead. 
+ +```python +# Python 3.12+ type statement (PEP 695) for simple aliases +type UserId = str +type UserDict = dict[str, Any] + +# Python 3.12+ type statement with generics +type Handler[T] = Callable[[Request], T] +type AsyncHandler[T] = Callable[[Request], Awaitable[T]] + +# Python 3.10-3.11 style via typing.TypeAlias (on 3.9, use a plain assignment) +from typing import TypeAlias +from collections.abc import Callable, Awaitable + +UserId: TypeAlias = str +Handler: TypeAlias = Callable[[Request], Response] + +# Usage +def register_handler(path: str, handler: Handler[Response]) -> None: + ... +``` + +### Pattern 10: Callable Types + +Type function parameters and callbacks. + +```python +from collections.abc import Callable, Awaitable + +# Sync callback +ProgressCallback = Callable[[int, int], None] # (current, total) + +# Async callback +AsyncHandler = Callable[[Request], Awaitable[Response]] + +# With named parameters (using Protocol) +class OnProgress(Protocol): + def __call__( + self, + current: int, + total: int, + *, + message: str = "", + ) -> None: ... + +def process_items( + items: list[Item], + on_progress: ProgressCallback | None = None, +) -> list[Result]: + for i, item in enumerate(items): + if on_progress: + on_progress(i, len(items)) + ... +``` + +## Configuration + +### Strict Mode Checklist + +For `mypy --strict` compliance: + +```toml +# pyproject.toml +[tool.mypy] +python_version = "3.12" +strict = true +warn_return_any = true +warn_unused_ignores = true +disallow_untyped_defs = true +disallow_incomplete_defs = true +no_implicit_optional = true +``` + +Incremental adoption goals: +- All function parameters annotated +- All return types annotated +- Class attributes annotated +- Minimize `Any` usage (acceptable for truly dynamic data) +- Generic collections use type parameters (`list[str]` not `list`) + +For existing codebases, enable strict mode incrementally with per-module overrides in `pyproject.toml` (note: mypy's inline `# mypy:` comments cannot set `strict` itself, only its individual component flags). + +## Best Practices Summary + +1. 
**Annotate all public APIs** - Functions, methods, class attributes +2. **Use `T | None`** - Modern union syntax over `Optional[T]` +3. **Run strict type checking** - `mypy --strict` in CI +4. **Use generics** - Preserve type info in reusable code +5. **Define protocols** - Structural typing for interfaces +6. **Narrow types** - Use guards to help the type checker +7. **Bound type vars** - Restrict generics to meaningful types +8. **Create type aliases** - Meaningful names for complex types +9. **Minimize `Any`** - Use specific types or generics. `Any` is acceptable for truly dynamic data or when interfacing with untyped third-party code +10. **Document with types** - Types are enforceable documentation