mirror of
https://github.com/wshobson/agents.git
synced 2026-03-18 17:47:16 +00:00
Remove references to non-existent resource files (references/, assets/, scripts/, examples/) from 115 skill SKILL.md files. These sections pointed to directories and files that were never created, causing confusion when users install skills. Also fix broken Code of Conduct links in issue templates to use absolute GitHub URLs instead of relative paths that 404.
---
name: data-quality-frameworks
description: Implement data quality validation with Great Expectations, dbt tests, and data contracts. Use when building data quality pipelines, implementing validation rules, or establishing data contracts.
---

# Data Quality Frameworks

Production patterns for implementing data quality with Great Expectations, dbt tests, and data contracts to ensure reliable data pipelines.
## When to Use This Skill

- Implementing data quality checks in pipelines
- Setting up Great Expectations validation
- Building comprehensive dbt test suites
- Establishing data contracts between teams
- Monitoring data quality metrics
- Automating data validation in CI/CD
## Core Concepts

### 1. Data Quality Dimensions

| Dimension        | Description              | Example Check                                      |
| ---------------- | ------------------------ | -------------------------------------------------- |
| **Completeness** | No missing values        | `expect_column_values_to_not_be_null`              |
| **Uniqueness**   | No duplicates            | `expect_column_values_to_be_unique`                |
| **Validity**     | Values in expected range | `expect_column_values_to_be_in_set`                |
| **Accuracy**     | Data matches reality     | Cross-reference validation                         |
| **Consistency**  | No contradictions        | `expect_column_pair_values_A_to_be_greater_than_B` |
| **Timeliness**   | Data is recent           | `expect_column_max_to_be_between`                  |
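
These dimensions can be measured directly before reaching for any framework. A minimal plain-Python sketch (the `orders` records are invented) computing completeness and uniqueness scores:

```python
def completeness(rows: list[dict], column: str) -> float:
    """Fraction of rows with a non-null value in `column`."""
    if not rows:
        return 0.0
    present = sum(1 for r in rows if r.get(column) is not None)
    return present / len(rows)


def uniqueness(rows: list[dict], column: str) -> float:
    """Fraction of non-null values in `column` that are distinct."""
    values = [r[column] for r in rows if r.get(column) is not None]
    if not values:
        return 0.0
    return len(set(values)) / len(values)


orders = [
    {"order_id": "a1", "status": "pending"},
    {"order_id": "a2", "status": "shipped"},
    {"order_id": "a2", "status": None},  # duplicate id, missing status
]
print(completeness(orders, "status"))   # 2 of 3 rows populated
print(uniqueness(orders, "order_id"))   # 2 distinct of 3 values
```

The same ratios are what framework expectations like `expect_column_values_to_not_be_null` compute under the hood, reported as `observed_value`.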

### 2. Testing Pyramid for Data

```
      /\
     /  \      Integration Tests (cross-table)
    /────\
   /      \    Unit Tests (single column)
  /────────\
 /          \  Schema Tests (structure)
/────────────\
```
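
The pyramid also implies an execution order: run cheap structural checks first and skip deeper layers when a foundation fails. A sketch of that layering (the check functions and sample rows are hypothetical stand-ins):

```python
def run_layered(layers: dict[str, list]) -> list[str]:
    """Run layers in order; stop after the first layer with a failure."""
    failures = []
    for layer_name, checks in layers.items():
        for name, check in checks:
            if not check():
                failures.append(f"{layer_name}: {name}")
        if failures:
            break  # don't run costlier layers on a broken foundation
    return failures


rows = [{"order_id": 1, "amount": 10.0}, {"order_id": 2, "amount": -5.0}]
layers = {
    "schema": [("has order_id", lambda: all("order_id" in r for r in rows))],
    "unit": [("amount >= 0", lambda: all(r["amount"] >= 0 for r in rows))],
    "integration": [("cross-table check, skipped here", lambda: True)],
}
print(run_layered(layers))  # ['unit: amount >= 0']
```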
## Quick Start

### Great Expectations Setup

```bash
# Install
pip install great_expectations

# Initialize project
great_expectations init

# Create datasource
great_expectations datasource new
```

```python
# create_orders_suite.py
import great_expectations as gx

# Create context
context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_suite")

# Add expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

# Validate
results = context.run_checkpoint(checkpoint_name="daily_orders")
```
|
|
|
|
## Patterns

### Pattern 1: Great Expectations Suite

```python
# expectations/orders_suite.py
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration


def build_orders_suite() -> ExpectationSuite:
    """Build comprehensive orders expectation suite"""

    suite = ExpectationSuite(expectation_suite_name="orders_suite")

    # Schema expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={
            "column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
            "exact_match": False  # Allow additional columns
        }
    ))

    # Primary key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"}
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"}
    ))

    # Foreign key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"}
    ))

    # Categorical values
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "processing", "shipped", "delivered", "cancelled"]
        }
    ))

    # Numeric ranges
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 0,
            "max_value": 100000,
            "strict_min": True  # amount > 0
        }
    ))

    # Date validity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "created_at"}
    ))

    # Freshness - data should be recent
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_max_to_be_between",
        kwargs={
            "column": "created_at",
            "min_value": {"$PARAMETER": "now - timedelta(days=1)"},
            "max_value": {"$PARAMETER": "now"}
        }
    ))

    # Row count sanity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1000,  # Expect at least 1000 rows
            "max_value": 10000000
        }
    ))

    # Statistical expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_mean_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 50,
            "max_value": 500
        }
    ))

    return suite
```

### Pattern 2: Great Expectations Checkpoint

```yaml
# great_expectations/checkpoints/orders_checkpoint.yml
name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-orders-validation"

validations:
  - batch_request:
      datasource_name: warehouse
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: orders
      data_connector_query:
        index: -1 # Latest batch
    expectation_suite_name: orders_suite

action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction

  - name: store_evaluation_parameters
    action:
      class_name: StoreEvaluationParametersAction

  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction

  # Slack notification on failure
  - name: send_slack_notification
    action:
      class_name: SlackNotificationAction
      slack_webhook: ${SLACK_WEBHOOK}
      notify_on: failure
      renderer:
        module_name: great_expectations.render.renderer.slack_renderer
        class_name: SlackRenderer
```

```python
# Run checkpoint
import great_expectations as gx

context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")

if not result.success:
    failed_expectations = [
        r for r in result.run_results.values()
        if not r.success
    ]
    raise ValueError(f"Data quality check failed: {failed_expectations}")
```

### Pattern 3: dbt Data Tests

```yaml
# models/marts/core/_core__models.yml
version: 2

models:
  - name: fct_orders
    description: Order fact table
    tests:
      # Table-level tests
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
      - dbt_utils.at_least_one
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"

    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null

      - name: customer_id
        description: Foreign key to dim_customers
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id

      - name: order_status
        tests:
          - accepted_values:
              values:
                ["pending", "processing", "shipped", "delivered", "cancelled"]

      - name: total_amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"

      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp"

  - name: dim_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null

      - name: email
        tests:
          - unique
          - not_null
          # Custom regex test
          - dbt_utils.expression_is_true:
              expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"
```
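
The regex in the last test can be exercised outside the warehouse before shipping it. A quick Python sketch applying the same pattern (the sample addresses are made up):

```python
import re

# Same pattern the dbt expression_is_true test applies via the SQL `~` operator
EMAIL_RE = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")


def invalid_emails(emails: list[str]) -> list[str]:
    """Return addresses the pattern rejects (the rows the dbt test would flag)."""
    return [e for e in emails if not EMAIL_RE.match(e)]


sample = ["ada@example.com", "no-at-sign.example.com", "bob@mail.co"]
print(invalid_emails(sample))  # ['no-at-sign.example.com']
```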

### Pattern 4: Custom dbt Tests

```sql
-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}

with row_count as (
    select count(*) as cnt from {{ model }}
)

select cnt
from row_count
where cnt < {{ min_count }} or cnt > {{ max_count }}

{% endtest %}

-- Usage in schema.yml:
-- tests:
--   - row_count_in_range:
--       min_count: 1000
--       max_count: 10000000
```

```sql
-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}

with lagged as (
    select
        {{ column_name }},
        lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
    from {{ model }}
)

select *
from lagged
where {{ column_name }} - prev_value != {{ interval }}
  and prev_value is not null

{% endtest %}
```
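
The same gap check is easy to prototype in Python before wiring it into dbt. A sketch (the `ids` data is illustrative):

```python
def sequence_gaps(values: list[int], interval: int = 1) -> list[tuple[int, int]]:
    """Return (prev, current) pairs whose step differs from `interval` --
    the rows the dbt test above would surface as failures."""
    ordered = sorted(values)
    return [
        (prev, cur)
        for prev, cur in zip(ordered, ordered[1:])
        if cur - prev != interval
    ]


ids = [1, 2, 3, 5, 6, 9]
print(sequence_gaps(ids))  # [(3, 5), (6, 9)]
```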

```sql
-- tests/singular/assert_orders_customers_match.sql
-- Singular test: specific business rule

with orders_customers as (
    select distinct customer_id from {{ ref('fct_orders') }}
),

dim_customers as (
    select customer_id from {{ ref('dim_customers') }}
),

orphaned_orders as (
    select o.customer_id
    from orders_customers o
    left join dim_customers c using (customer_id)
    where c.customer_id is null
)

select * from orphaned_orders
-- Test passes if this returns 0 rows
```
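
The anti-join at the heart of this test reduces to a set difference. A plain-Python sketch (the order and customer ids below are made up):

```python
def orphaned_orders(order_customer_ids: set, known_customer_ids: set) -> set:
    """Customer ids referenced by orders but missing from the dimension --
    equivalent to the left join / is null pattern in the SQL above."""
    return order_customer_ids - known_customer_ids


orders = {"c1", "c2", "c7"}
customers = {"c1", "c2", "c3"}
print(sorted(orphaned_orders(orders, customers)))  # ['c7']
```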

### Pattern 5: Data Contracts

```yaml
# contracts/orders_contract.yaml
apiVersion: datacontract.com/v1.0.0
kind: DataContract
metadata:
  name: orders
  version: 1.0.0
  owner: data-platform-team
  contact: data-team@company.com

info:
  title: Orders Data Contract
  description: Contract for order event data from the ecommerce platform
  purpose: Analytics, reporting, and ML features

servers:
  production:
    type: snowflake
    account: company.us-east-1
    database: ANALYTICS
    schema: CORE

terms:
  usage: Internal analytics only
  limitations: PII must not be exposed in downstream marts
  billing: Charged per query TB scanned

schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: Unique order identifier
      required: true
      unique: true
      pii: false

    customer_id:
      type: string
      format: uuid
      description: Customer identifier
      required: true
      pii: true
      piiClassification: indirect

    total_amount:
      type: number
      minimum: 0
      maximum: 100000
      description: Order total in USD

    created_at:
      type: string
      format: date-time
      description: Order creation timestamp
      required: true

    status:
      type: string
      enum: [pending, processing, shipped, delivered, cancelled]
      description: Current order status

quality:
  type: SodaCL
  specification:
    checks for orders:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [pending, processing, shipped, delivered, cancelled]
      - freshness(created_at) < 24h

sla:
  availability: 99.9%
  freshness: 1 hour
  latency: 5 minutes
```
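
A contract only helps if producers enforce it before publishing. A minimal Python sketch checking a record against a few of the rules above (the rule set is hand-written here for illustration, not parsed from the YAML):

```python
def check_record(record: dict) -> list[str]:
    """Return violations of a few orders-contract rules for one record."""
    errors = []
    for field in ("order_id", "customer_id", "created_at"):  # required fields
        if record.get(field) is None:
            errors.append(f"{field} is required")
    amount = record.get("total_amount")
    if amount is not None and not (0 <= amount <= 100000):
        errors.append("total_amount out of range [0, 100000]")
    allowed = {"pending", "processing", "shipped", "delivered", "cancelled"}
    if record.get("status") not in allowed:
        errors.append(f"status {record.get('status')!r} not in contract enum")
    return errors


bad = {"order_id": "o1", "total_amount": -5, "status": "returned"}
for err in check_record(bad):
    print(err)
```

In practice the schema section would be loaded from the contract file and compiled into checks, so the contract stays the single source of truth.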

### Pattern 6: Automated Quality Pipeline

```python
# quality_pipeline.py
from dataclasses import dataclass
from typing import List, Dict, Any
import great_expectations as gx
from datetime import datetime


@dataclass
class QualityResult:
    table: str
    passed: bool
    total_expectations: int
    failed_expectations: int
    details: List[Dict[str, Any]]
    timestamp: datetime


class DataQualityPipeline:
    """Orchestrate data quality checks across tables"""

    def __init__(self, context: gx.DataContext):
        self.context = context
        self.results: List[QualityResult] = []

    def validate_table(self, table: str, suite: str) -> QualityResult:
        """Validate a single table against expectation suite"""

        checkpoint_config = {
            "name": f"{table}_validation",
            "config_version": 1.0,
            "class_name": "Checkpoint",
            "validations": [{
                "batch_request": {
                    "datasource_name": "warehouse",
                    "data_asset_name": table,
                },
                "expectation_suite_name": suite,
            }],
        }

        result = self.context.run_checkpoint(**checkpoint_config)

        # Parse results
        validation_result = list(result.run_results.values())[0]
        results = validation_result.results

        failed = [r for r in results if not r.success]

        return QualityResult(
            table=table,
            passed=result.success,
            total_expectations=len(results),
            failed_expectations=len(failed),
            details=[{
                "expectation": r.expectation_config.expectation_type,
                "success": r.success,
                "observed_value": r.result.get("observed_value"),
            } for r in results],
            timestamp=datetime.now()
        )

    def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
        """Run validation for all tables"""
        results = {}

        for table, suite in tables.items():
            print(f"Validating {table}...")
            results[table] = self.validate_table(table, suite)

        return results

    def generate_report(self, results: Dict[str, QualityResult]) -> str:
        """Generate quality report"""
        report = ["# Data Quality Report", f"Generated: {datetime.now()}", ""]

        total_passed = sum(1 for r in results.values() if r.passed)
        total_tables = len(results)

        report.append(f"## Summary: {total_passed}/{total_tables} tables passed")
        report.append("")

        for table, result in results.items():
            status = "✅" if result.passed else "❌"
            report.append(f"### {status} {table}")
            report.append(f"- Expectations: {result.total_expectations}")
            report.append(f"- Failed: {result.failed_expectations}")

            if not result.passed:
                report.append("- Failed checks:")
                for detail in result.details:
                    if not detail["success"]:
                        report.append(f"  - {detail['expectation']}: {detail['observed_value']}")
            report.append("")

        return "\n".join(report)


# Usage
context = gx.get_context()
pipeline = DataQualityPipeline(context)

tables_to_validate = {
    "orders": "orders_suite",
    "customers": "customers_suite",
    "products": "products_suite",
}

results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)

# Fail pipeline if any table failed
if not all(r.passed for r in results.values()):
    print(report)
    raise ValueError("Data quality checks failed!")
```

## Best Practices

### Do's

- **Test early** - Validate source data before transformations
- **Test incrementally** - Add tests as you find issues
- **Document expectations** - Clear descriptions for each test
- **Alert on failures** - Integrate with monitoring
- **Version contracts** - Track schema changes

### Don'ts

- **Don't test everything** - Focus on critical columns
- **Don't ignore warnings** - They often precede failures
- **Don't skip freshness** - Stale data is bad data
- **Don't hardcode thresholds** - Use dynamic baselines
- **Don't test in isolation** - Test relationships too