mirror of
https://github.com/wshobson/agents.git
synced 2026-03-18 17:47:16 +00:00
Add domain expert agents with comprehensive skill sets: - service-mesh-expert (cloud-infrastructure): Istio/Linkerd patterns, mTLS, observability - event-sourcing-architect (backend-development): CQRS, event stores, projections, sagas - vector-database-engineer (llm-application-dev): embeddings, similarity search, hybrid search - monorepo-architect (developer-essentials): Nx, Turborepo, Bazel, pnpm workspaces - threat-modeling-expert (security-scanning): STRIDE, attack trees, security requirements Update all documentation to reflect correct counts: - 67 plugins, 99 agents, 107 skills, 71 commands
562 lines
13 KiB
Markdown
562 lines
13 KiB
Markdown
---
|
|
name: dbt-transformation-patterns
|
|
description: Master dbt (data build tool) for analytics engineering with model organization, testing, documentation, and incremental strategies. Use when building data transformations, creating data models, or implementing analytics engineering best practices.
|
|
---
|
|
|
|
# dbt Transformation Patterns
|
|
|
|
Production-ready patterns for dbt (data build tool) including model organization, testing strategies, documentation, and incremental processing.
|
|
|
|
## When to Use This Skill
|
|
|
|
- Building data transformation pipelines with dbt
|
|
- Organizing models into staging, intermediate, and marts layers
|
|
- Implementing data quality tests
|
|
- Creating incremental models for large datasets
|
|
- Documenting data models and lineage
|
|
- Setting up dbt project structure
|
|
|
|
## Core Concepts
|
|
|
|
### 1. Model Layers (Medallion Architecture)
|
|
|
|
```
|
|
sources/ Raw data definitions
|
|
↓
|
|
staging/ 1:1 with source, light cleaning
|
|
↓
|
|
intermediate/ Business logic, joins, aggregations
|
|
↓
|
|
marts/ Final analytics tables
|
|
```
|
|
|
|
### 2. Naming Conventions
|
|
|
|
| Layer | Prefix | Example |
|
|
|-------|--------|---------|
|
|
| Staging | `stg_` | `stg_stripe__payments` |
|
|
| Intermediate | `int_` | `int_payments_pivoted` |
|
|
| Marts | `dim_`, `fct_` | `dim_customers`, `fct_orders` |
|
|
|
|
## Quick Start
|
|
|
|
```yaml
|
|
# dbt_project.yml
|
|
name: 'analytics'
|
|
version: '1.0.0'
|
|
profile: 'analytics'
|
|
|
|
model-paths: ["models"]
|
|
analysis-paths: ["analyses"]
|
|
test-paths: ["tests"]
|
|
seed-paths: ["seeds"]
|
|
macro-paths: ["macros"]
|
|
|
|
vars:
|
|
start_date: '2020-01-01'
|
|
|
|
models:
|
|
analytics:
|
|
staging:
|
|
+materialized: view
|
|
+schema: staging
|
|
intermediate:
|
|
+materialized: ephemeral
|
|
marts:
|
|
+materialized: table
|
|
+schema: analytics
|
|
```
|
|
|
|
```
|
|
# Project structure
|
|
models/
|
|
├── staging/
|
|
│ ├── stripe/
|
|
│ │ ├── _stripe__sources.yml
|
|
│ │ ├── _stripe__models.yml
|
|
│ │ ├── stg_stripe__customers.sql
|
|
│ │ └── stg_stripe__payments.sql
|
|
│ └── shopify/
|
|
│ ├── _shopify__sources.yml
|
|
│ └── stg_shopify__orders.sql
|
|
├── intermediate/
|
|
│ └── finance/
|
|
│ └── int_payments_pivoted.sql
|
|
└── marts/
|
|
├── core/
|
|
│ ├── _core__models.yml
|
|
│ ├── dim_customers.sql
|
|
│ └── fct_orders.sql
|
|
└── finance/
|
|
└── fct_revenue.sql
|
|
```
|
|
|
|
## Patterns
|
|
|
|
### Pattern 1: Source Definitions
|
|
|
|
```yaml
|
|
# models/staging/stripe/_stripe__sources.yml
|
|
version: 2
|
|
|
|
sources:
|
|
- name: stripe
|
|
description: Raw Stripe data loaded via Fivetran
|
|
database: raw
|
|
schema: stripe
|
|
loader: fivetran
|
|
loaded_at_field: _fivetran_synced
|
|
freshness:
|
|
warn_after: {count: 12, period: hour}
|
|
error_after: {count: 24, period: hour}
|
|
tables:
|
|
- name: customers
|
|
description: Stripe customer records
|
|
columns:
|
|
- name: id
|
|
description: Primary key
|
|
tests:
|
|
- unique
|
|
- not_null
|
|
- name: email
|
|
description: Customer email
|
|
- name: created
|
|
description: Account creation timestamp
|
|
|
|
- name: payments
|
|
description: Stripe payment transactions
|
|
columns:
|
|
- name: id
|
|
tests:
|
|
- unique
|
|
- not_null
|
|
- name: customer_id
|
|
tests:
|
|
- not_null
|
|
- relationships:
|
|
to: source('stripe', 'customers')
|
|
field: id
|
|
```
|
|
|
|
### Pattern 2: Staging Models
|
|
|
|
```sql
|
|
-- models/staging/stripe/stg_stripe__customers.sql
|
|
with source as (
|
|
select * from {{ source('stripe', 'customers') }}
|
|
),
|
|
|
|
renamed as (
|
|
select
|
|
-- ids
|
|
id as customer_id,
|
|
|
|
-- strings
|
|
lower(email) as email,
|
|
name as customer_name,
|
|
|
|
-- timestamps
|
|
created as created_at,
|
|
|
|
-- metadata
|
|
_fivetran_synced as _loaded_at
|
|
|
|
from source
|
|
)
|
|
|
|
select * from renamed
|
|
```
|
|
|
|
```sql
|
|
-- models/staging/stripe/stg_stripe__payments.sql
|
|
{{
|
|
config(
|
|
materialized='incremental',
|
|
unique_key='payment_id',
|
|
on_schema_change='append_new_columns'
|
|
)
|
|
}}
|
|
|
|
with source as (
|
|
select * from {{ source('stripe', 'payments') }}
|
|
|
|
{% if is_incremental() %}
|
|
where _fivetran_synced > (select max(_loaded_at) from {{ this }})
|
|
{% endif %}
|
|
),
|
|
|
|
renamed as (
|
|
select
|
|
-- ids
|
|
id as payment_id,
|
|
customer_id,
|
|
invoice_id,
|
|
|
|
-- amounts (convert cents to dollars)
|
|
amount / 100.0 as amount,
|
|
amount_refunded / 100.0 as amount_refunded,
|
|
|
|
-- status
|
|
status as payment_status,
|
|
|
|
-- timestamps
|
|
created as created_at,
|
|
|
|
-- metadata
|
|
_fivetran_synced as _loaded_at
|
|
|
|
from source
|
|
)
|
|
|
|
select * from renamed
|
|
```
|
|
|
|
### Pattern 3: Intermediate Models
|
|
|
|
```sql
|
|
-- models/intermediate/finance/int_payments_pivoted_to_customer.sql
|
|
with payments as (
|
|
select * from {{ ref('stg_stripe__payments') }}
|
|
),
|
|
|
|
customers as (
|
|
select * from {{ ref('stg_stripe__customers') }}
|
|
),
|
|
|
|
payment_summary as (
|
|
select
|
|
customer_id,
|
|
count(*) as total_payments,
|
|
count(case when payment_status = 'succeeded' then 1 end) as successful_payments,
|
|
sum(case when payment_status = 'succeeded' then amount else 0 end) as total_amount_paid,
|
|
min(created_at) as first_payment_at,
|
|
max(created_at) as last_payment_at
|
|
from payments
|
|
group by customer_id
|
|
)
|
|
|
|
select
|
|
customers.customer_id,
|
|
customers.email,
|
|
customers.created_at as customer_created_at,
|
|
coalesce(payment_summary.total_payments, 0) as total_payments,
|
|
coalesce(payment_summary.successful_payments, 0) as successful_payments,
|
|
coalesce(payment_summary.total_amount_paid, 0) as lifetime_value,
|
|
payment_summary.first_payment_at,
|
|
payment_summary.last_payment_at
|
|
|
|
from customers
|
|
left join payment_summary using (customer_id)
|
|
```
|
|
|
|
### Pattern 4: Mart Models (Dimensions and Facts)
|
|
|
|
```sql
|
|
-- models/marts/core/dim_customers.sql
|
|
{{
|
|
config(
|
|
materialized='table',
|
|
unique_key='customer_id'
|
|
)
|
|
}}
|
|
|
|
with customers as (
|
|
select * from {{ ref('int_payments_pivoted_to_customer') }}
|
|
),
|
|
|
|
orders as (
|
|
select * from {{ ref('stg_shopify__orders') }}
|
|
),
|
|
|
|
order_summary as (
|
|
select
|
|
customer_id,
|
|
count(*) as total_orders,
|
|
sum(total_price) as total_order_value,
|
|
min(created_at) as first_order_at,
|
|
max(created_at) as last_order_at
|
|
from orders
|
|
group by customer_id
|
|
),
|
|
|
|
final as (
|
|
select
|
|
-- surrogate key
|
|
{{ dbt_utils.generate_surrogate_key(['customers.customer_id']) }} as customer_key,
|
|
|
|
-- natural key
|
|
customers.customer_id,
|
|
|
|
-- attributes
|
|
customers.email,
|
|
customers.customer_created_at,
|
|
|
|
-- payment metrics
|
|
customers.total_payments,
|
|
customers.successful_payments,
|
|
customers.lifetime_value,
|
|
customers.first_payment_at,
|
|
customers.last_payment_at,
|
|
|
|
-- order metrics
|
|
coalesce(order_summary.total_orders, 0) as total_orders,
|
|
coalesce(order_summary.total_order_value, 0) as total_order_value,
|
|
order_summary.first_order_at,
|
|
order_summary.last_order_at,
|
|
|
|
-- calculated fields
|
|
case
|
|
when customers.lifetime_value >= 1000 then 'high'
|
|
when customers.lifetime_value >= 100 then 'medium'
|
|
else 'low'
|
|
end as customer_tier,
|
|
|
|
-- timestamps
|
|
current_timestamp as _loaded_at
|
|
|
|
from customers
|
|
left join order_summary using (customer_id)
|
|
)
|
|
|
|
select * from final
|
|
```
|
|
|
|
```sql
|
|
-- models/marts/core/fct_orders.sql
|
|
{{
|
|
config(
|
|
materialized='incremental',
|
|
unique_key='order_id',
|
|
incremental_strategy='merge'
|
|
)
|
|
}}
|
|
|
|
with orders as (
|
|
select * from {{ ref('stg_shopify__orders') }}
|
|
|
|
{% if is_incremental() %}
|
|
where updated_at > (select max(updated_at) from {{ this }})
|
|
{% endif %}
|
|
),
|
|
|
|
customers as (
|
|
select * from {{ ref('dim_customers') }}
|
|
),
|
|
|
|
final as (
|
|
select
|
|
-- keys
|
|
orders.order_id,
|
|
customers.customer_key,
|
|
orders.customer_id,
|
|
|
|
-- dimensions
|
|
orders.order_status,
|
|
orders.fulfillment_status,
|
|
orders.payment_status,
|
|
|
|
-- measures
|
|
orders.subtotal,
|
|
orders.tax,
|
|
orders.shipping,
|
|
orders.total_price,
|
|
orders.total_discount,
|
|
orders.item_count,
|
|
|
|
-- timestamps
|
|
orders.created_at,
|
|
orders.updated_at,
|
|
orders.fulfilled_at,
|
|
|
|
-- metadata
|
|
current_timestamp as _loaded_at
|
|
|
|
from orders
|
|
left join customers on orders.customer_id = customers.customer_id
|
|
)
|
|
|
|
select * from final
|
|
```
|
|
|
|
### Pattern 5: Testing and Documentation
|
|
|
|
```yaml
|
|
# models/marts/core/_core__models.yml
|
|
version: 2
|
|
|
|
models:
|
|
- name: dim_customers
|
|
description: Customer dimension with payment and order metrics
|
|
columns:
|
|
- name: customer_key
|
|
description: Surrogate key for the customer dimension
|
|
tests:
|
|
- unique
|
|
- not_null
|
|
|
|
- name: customer_id
|
|
description: Natural key from source system
|
|
tests:
|
|
- unique
|
|
- not_null
|
|
|
|
- name: email
|
|
description: Customer email address
|
|
tests:
|
|
- not_null
|
|
|
|
- name: customer_tier
|
|
description: Customer value tier based on lifetime value
|
|
tests:
|
|
- accepted_values:
|
|
values: ['high', 'medium', 'low']
|
|
|
|
- name: lifetime_value
|
|
description: Total amount paid by customer
|
|
tests:
|
|
- dbt_utils.expression_is_true:
|
|
expression: ">= 0"
|
|
|
|
- name: fct_orders
|
|
description: Order fact table with all order transactions
|
|
tests:
|
|
- dbt_utils.recency:
|
|
datepart: day
|
|
field: created_at
|
|
interval: 1
|
|
columns:
|
|
- name: order_id
|
|
tests:
|
|
- unique
|
|
- not_null
|
|
- name: customer_key
|
|
tests:
|
|
- not_null
|
|
- relationships:
|
|
to: ref('dim_customers')
|
|
field: customer_key
|
|
```
|
|
|
|
### Pattern 6: Macros and DRY Code
|
|
|
|
```sql
|
|
-- macros/cents_to_dollars.sql
|
|
{% macro cents_to_dollars(column_name, precision=2) %}
|
|
round({{ column_name }} / 100.0, {{ precision }})
|
|
{% endmacro %}
|
|
|
|
-- macros/generate_schema_name.sql
|
|
{% macro generate_schema_name(custom_schema_name, node) %}
|
|
{%- set default_schema = target.schema -%}
|
|
{%- if custom_schema_name is none -%}
|
|
{{ default_schema }}
|
|
{%- else -%}
|
|
{{ default_schema }}_{{ custom_schema_name }}
|
|
{%- endif -%}
|
|
{% endmacro %}
|
|
|
|
-- macros/limit_data_in_dev.sql
|
|
{% macro limit_data_in_dev(column_name, days=3) %}
|
|
{% if target.name == 'dev' %}
|
|
where {{ column_name }} >= dateadd(day, -{{ days }}, current_date)
|
|
{% endif %}
|
|
{% endmacro %}
|
|
|
|
-- Usage in model
|
|
select * from {{ ref('stg_orders') }}
|
|
{{ limit_data_in_dev('created_at') }}
|
|
```
|
|
|
|
### Pattern 7: Incremental Strategies
|
|
|
|
```sql
|
|
-- Delete+Insert (default for most warehouses)
|
|
{{
|
|
config(
|
|
materialized='incremental',
|
|
unique_key='id',
|
|
incremental_strategy='delete+insert'
|
|
)
|
|
}}
|
|
|
|
-- Merge (best for late-arriving data)
|
|
{{
|
|
config(
|
|
materialized='incremental',
|
|
unique_key='id',
|
|
incremental_strategy='merge',
|
|
merge_update_columns=['status', 'amount', 'updated_at']
|
|
)
|
|
}}
|
|
|
|
-- Insert Overwrite (partition-based)
|
|
{{
|
|
config(
|
|
materialized='incremental',
|
|
incremental_strategy='insert_overwrite',
|
|
partition_by={
|
|
"field": "created_date",
|
|
"data_type": "date",
|
|
"granularity": "day"
|
|
}
|
|
)
|
|
}}
|
|
|
|
select
|
|
*,
|
|
date(created_at) as created_date
|
|
from {{ ref('stg_events') }}
|
|
|
|
{% if is_incremental() %}
|
|
where created_date >= dateadd(day, -3, current_date)
|
|
{% endif %}
|
|
```
|
|
|
|
## dbt Commands
|
|
|
|
```bash
|
|
# Development
|
|
dbt run # Run all models
|
|
dbt run --select staging # Run staging models only
|
|
dbt run --select +fct_orders # Run fct_orders and its upstream
|
|
dbt run --select fct_orders+ # Run fct_orders and its downstream
|
|
dbt run --full-refresh # Rebuild incremental models
|
|
|
|
# Testing
|
|
dbt test # Run all tests
|
|
dbt test --select stg_stripe # Test specific models
|
|
dbt build # Run + test in DAG order
|
|
|
|
# Documentation
|
|
dbt docs generate # Generate docs
|
|
dbt docs serve # Serve docs locally
|
|
|
|
# Debugging
|
|
dbt compile # Compile SQL without running
|
|
dbt debug # Test connection
|
|
dbt ls --select tag:critical # List models by tag
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### Do's
|
|
- **Use staging layer** - Clean data once, use everywhere
|
|
- **Test aggressively** - Not null, unique, relationships
|
|
- **Document everything** - Column descriptions, model descriptions
|
|
- **Use incremental** - For tables > 1M rows
|
|
- **Version control** - dbt project in Git
|
|
|
|
### Don'ts
|
|
- **Don't skip staging** - Raw → mart is tech debt
|
|
- **Don't hardcode dates** - Use `{{ var('start_date') }}`
|
|
- **Don't repeat logic** - Extract to macros
|
|
- **Don't test in prod** - Use dev target
|
|
- **Don't ignore freshness** - Monitor source data
|
|
|
|
## Resources
|
|
|
|
- [dbt Documentation](https://docs.getdbt.com/)
|
|
- [dbt Best Practices](https://docs.getdbt.com/guides/best-practices)
|
|
- [dbt-utils Package](https://hub.getdbt.com/dbt-labs/dbt_utils/latest/)
|
|
- [dbt Discourse](https://discourse.getdbt.com/)
|