agents/tools/db-migrate.md
Seth Hobson ce7a5938c1 Consolidate workflows and tools from commands repository
2025-10-08 08:25:17 -04:00


model: claude-sonnet-4-0

Database Migration Strategy and Implementation

You are a database migration expert specializing in zero-downtime deployments, data integrity, and multi-database environments. Create comprehensive migration scripts with rollback strategies, validation checks, and performance optimization.

Context

The user needs help with database migrations that ensure data integrity, minimize downtime, and provide safe rollback options. Focus on production-ready migration strategies that handle edge cases and large datasets.

Requirements

$ARGUMENTS

Instructions

1. Migration Analysis

Analyze the required database changes:

Schema Changes

  • Table Operations

    • Create new tables
    • Drop unused tables
    • Rename tables
    • Alter table engines/options
  • Column Operations

    • Add columns (nullable vs non-nullable)
    • Drop columns (with data preservation)
    • Rename columns
    • Change data types
    • Modify constraints
  • Index Operations

    • Create indexes (online vs offline)
    • Drop indexes
    • Modify index types
    • Add composite indexes
  • Constraint Operations

    • Foreign keys
    • Unique constraints
    • Check constraints
    • Default values

Data Migrations

  • Transformations

    • Data type conversions
    • Normalization/denormalization
    • Calculated fields
    • Data cleaning
  • Relationships

    • Moving data between tables
    • Splitting/merging tables
    • Creating junction tables
    • Handling orphaned records
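
One way to make this analysis actionable is to tag each planned change with the deployment phase in which it can usually ship safely. The sketch below is illustrative only: the `Change` type, the operation names, and the classification rules are assumptions made for this example, not a complete taxonomy.

```python
from dataclasses import dataclass

# Hypothetical operation names; a real planner would cover many more cases.
EXPAND_OPS = {"create_table", "add_nullable_column", "create_index"}
CONTRACT_OPS = {"drop_table", "drop_column", "drop_index"}


@dataclass(frozen=True)
class Change:
    op: str
    target: str


def classify(change: Change) -> str:
    """Return the phase in which a change is usually safe to apply."""
    if change.op in EXPAND_OPS:
        return "expand"    # old application code keeps working
    if change.op in CONTRACT_OPS:
        return "contract"  # only after no deployed code uses the target
    return "review"        # renames, type changes, NOT NULL columns, etc.


plan = [
    Change("add_nullable_column", "users.email_verified"),
    Change("rename_column", "orders.state"),
    Change("drop_column", "users.email_confirmation_token"),
]
phases = [classify(change) for change in plan]
print(phases)
```

Anything that lands in "review" generally needs to be broken into an expand step and a contract step before it can run without downtime.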

2. Zero-Downtime Strategy

Implement migrations without service interruption:

Expand-Contract Pattern

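-- Phase 1: Expand (backward compatible)
-- Add the column as nullable with no default so existing rows stay NULL
-- and the batched backfill in Phase 2 can find them.
ALTER TABLE users ADD COLUMN email_verified BOOLEAN;
-- A default set in a separate statement applies only to rows inserted from now on.
ALTER TABLE users ALTER COLUMN email_verified SET DEFAULT FALSE;
-- CREATE INDEX CONCURRENTLY cannot run inside a transaction block.
CREATE INDEX CONCURRENTLY idx_users_email_verified ON users(email_verified);

-- Phase 2: Migrate data in batches (repeat until the UPDATE affects 0 rows)
UPDATE users 
SET email_verified = (email_confirmation_token IS NOT NULL)
WHERE id IN (
  SELECT id FROM users 
  WHERE email_verified IS NULL 
  LIMIT 10000
);

-- Phase 3: Contract (after code deployment)
ALTER TABLE users DROP COLUMN email_confirmation_token;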
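
The Phase 2 statement only handles one batch, so something has to repeat it until no rows are left. A minimal sketch of such a driver loop follows, using an in-memory SQLite database purely so the example runs on its own; it assumes the new column is nullable, so unmigrated rows can be found with `IS NULL`. The function name and the small batch size are illustrative.

```python
import sqlite3


def backfill_in_batches(conn: sqlite3.Connection, batch_size: int = 2) -> int:
    """Repeat the batched UPDATE until it touches no rows; return rows updated."""
    total = 0
    while True:
        cur = conn.execute(
            """
            UPDATE users
            SET email_verified = (email_confirmation_token IS NOT NULL)
            WHERE id IN (
                SELECT id FROM users
                WHERE email_verified IS NULL
                LIMIT ?
            )
            """,
            (batch_size,),
        )
        conn.commit()          # keep each batch in its own short transaction
        if cur.rowcount == 0:  # nothing left to backfill
            return total
        total += cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users ("
    "id INTEGER PRIMARY KEY, email_confirmation_token TEXT, email_verified BOOLEAN)"
)
conn.executemany(
    "INSERT INTO users (email_confirmation_token) VALUES (?)",
    [("t1",), (None,), ("t3",), (None,), ("t5",)],
)
conn.commit()

updated = backfill_in_batches(conn)
remaining = conn.execute(
    "SELECT COUNT(*) FROM users WHERE email_verified IS NULL"
).fetchone()[0]
print(updated, remaining)
```

Committing after every batch keeps lock duration short, which is the main reason to batch in the first place.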

Blue-Green Schema Migration

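import time  # used by backfill_data() below

# execute() is assumed to be a project-specific helper that runs SQL
# and returns the result rows as a list of dicts.

# Step 1: Create new schema version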
def create_v2_schema():
    """
    Create new tables with v2_ prefix
    """
    execute("""
        CREATE TABLE v2_orders (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            customer_id UUID NOT NULL,
            total_amount DECIMAL(10,2) NOT NULL,
            status VARCHAR(50) NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            metadata JSONB DEFAULT '{}'
        );
        
        CREATE INDEX idx_v2_orders_customer ON v2_orders(customer_id);
        CREATE INDEX idx_v2_orders_status ON v2_orders(status);
    """)

# Step 2: Sync data with dual writes
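def enable_dual_writes():
    """
    Mirror every insert and update on orders into v2_orders with a database
    trigger, so the application does not have to write to both tables itself.
    Deletes are not mirrored; add an AFTER DELETE trigger if rows can be removed.
    """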
    # Trigger-based approach
    execute("""
        CREATE OR REPLACE FUNCTION sync_orders_to_v2() 
        RETURNS TRIGGER AS $$
        BEGIN
            INSERT INTO v2_orders (
                id, customer_id, total_amount, status, created_at
            ) VALUES (
                NEW.id, NEW.customer_id, NEW.amount, NEW.state, NEW.created
            ) ON CONFLICT (id) DO UPDATE SET
                total_amount = EXCLUDED.total_amount,
                status = EXCLUDED.status;
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
        
        CREATE TRIGGER sync_orders_trigger
        AFTER INSERT OR UPDATE ON orders
        FOR EACH ROW EXECUTE FUNCTION sync_orders_to_v2();
    """)

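# Step 3: Backfill historical data
def backfill_data():
    """
    Copy historical data in batches using keyset pagination on id
    """
    batch_size = 10000
    last_id = None
    
    while True:
        # Select the batch first and return its highest id. Relying on
        # INSERT ... RETURNING would omit rows that hit ON CONFLICT, so a batch
        # already synced by the trigger would end the loop too early.
        query = """
            WITH batch AS (
                SELECT id, customer_id, amount, state, created
                FROM orders
                WHERE ($1::uuid IS NULL OR id > $1)
                ORDER BY id
                LIMIT $2
            ), inserted AS (
                INSERT INTO v2_orders (
                    id, customer_id, total_amount, status, created_at
                )
                SELECT id, customer_id, amount, state, created
                FROM batch
                ON CONFLICT (id) DO NOTHING
            )
            SELECT id FROM batch ORDER BY id DESC LIMIT 1
        """
        
        results = execute(query, [last_id, batch_size])
        if not results:
            break
            
        last_id = results[0]['id']
        time.sleep(0.1)  # Prevent overload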

# Step 4: Switch reads
# Step 5: Switch writes  
# Step 6: Drop old schema
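
Steps 4 to 6 are listed without code. One common way to stage them is behind feature flags, so reads and writes can be switched (and switched back) independently. The sketch below assumes that approach; `CutoverFlags`, `orders_table`, and `safe_to_drop_old_schema` are hypothetical names, and a real system would load the flags from configuration rather than mutate them in process.

```python
from dataclasses import dataclass


@dataclass
class CutoverFlags:
    read_from_v2: bool = False   # Step 4: switch reads
    write_to_v2: bool = False    # Step 5: switch writes


def orders_table(flags: CutoverFlags, operation: str) -> str:
    """Pick the table a query should target for the given operation."""
    if operation not in ("read", "write"):
        raise ValueError(f"unknown operation: {operation}")
    use_v2 = flags.read_from_v2 if operation == "read" else flags.write_to_v2
    return "v2_orders" if use_v2 else "orders"


def safe_to_drop_old_schema(flags: CutoverFlags) -> bool:
    """Step 6 is only reasonable once nothing reads or writes the old table."""
    return flags.read_from_v2 and flags.write_to_v2


flags = CutoverFlags()
stages = [(orders_table(flags, "read"), orders_table(flags, "write"))]

flags.read_from_v2 = True    # reads move first; the trigger keeps v2 in sync
stages.append((orders_table(flags, "read"), orders_table(flags, "write")))

flags.write_to_v2 = True     # then writes; the sync trigger should be dropped here
stages.append((orders_table(flags, "read"), orders_table(flags, "write")))

print(stages, safe_to_drop_old_schema(flags))
```

Switching reads before writes means the old table stays the source of truth until the new one has been observed serving production traffic.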

3. Migration Scripts

Generate version-controlled migration files:

SQL Migrations

-- migrations/001_add_user_preferences.up.sql
BEGIN;

-- Add new table
CREATE TABLE user_preferences (
    user_id UUID PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
    theme VARCHAR(20) DEFAULT 'light',
    language VARCHAR(10) DEFAULT 'en',
    notifications JSONB DEFAULT '{"email": true, "push": false}',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Add update trigger
CREATE TRIGGER update_user_preferences_updated_at
    BEFORE UPDATE ON user_preferences
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

-- Add indexes
CREATE INDEX idx_user_preferences_language ON user_preferences(language);

-- Seed default data
INSERT INTO user_preferences (user_id)
SELECT id FROM users
ON CONFLICT DO NOTHING;

COMMIT;

-- migrations/001_add_user_preferences.down.sql
BEGIN;

DROP TABLE IF EXISTS user_preferences CASCADE;

COMMIT;
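
Paired `.up.sql` / `.down.sql` files need something to apply them in order and remember what has run. The following is a minimal sketch of such a runner, not a replacement for an established tool. It uses SQLite and a temporary directory so it can run standalone, and it assumes a `schema_migrations(version)` table keyed by the file name without the `.up.sql` suffix; the function name `apply_pending` is made up for this example.

```python
import sqlite3
import tempfile
from pathlib import Path


def apply_pending(conn: sqlite3.Connection, migrations_dir: Path) -> list:
    """Apply *.up.sql files in name order, skipping versions already recorded."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS schema_migrations (version TEXT PRIMARY KEY)"
    )
    applied = {row[0] for row in conn.execute("SELECT version FROM schema_migrations")}
    newly_applied = []
    for path in sorted(migrations_dir.glob("*.up.sql")):
        version = path.name[: -len(".up.sql")]   # "001_x.up.sql" -> "001_x"
        if version in applied:
            continue
        conn.executescript(path.read_text())     # run the migration file
        conn.execute("INSERT INTO schema_migrations (version) VALUES (?)", (version,))
        conn.commit()
        newly_applied.append(version)
    return newly_applied


with tempfile.TemporaryDirectory() as tmp:
    migrations = Path(tmp)
    (migrations / "001_create_users.up.sql").write_text(
        "CREATE TABLE users (id INTEGER PRIMARY KEY);"
    )
    (migrations / "002_add_theme.up.sql").write_text(
        "ALTER TABLE users ADD COLUMN theme TEXT DEFAULT 'light';"
    )
    conn = sqlite3.connect(":memory:")
    first_run = apply_pending(conn, migrations)
    second_run = apply_pending(conn, migrations)

print(first_run, second_run)
```

Note that in this sketch the migration script and the version insert are not applied atomically; on a database with transactional DDL, both would ideally share one transaction.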

Framework Migrations (Rails/Django/Laravel)

# Django migration
from django.db import migrations, models

class Migration(migrations.Migration):
    dependencies = [
        ('app', '0010_previous_migration'),
    ]

    operations = [
        migrations.CreateModel(
            name='UserPreferences',
            fields=[
                ('user', models.OneToOneField(
                    'User', 
                    on_delete=models.CASCADE, 
                    primary_key=True
                )),
                ('theme', models.CharField(
                    max_length=20, 
                    default='light'
                )),
                ('language', models.CharField(
                    max_length=10, 
                    default='en',
                    db_index=True
                )),
                ('notifications', models.JSONField(
                    default=dict
                )),
                ('created_at', models.DateTimeField(
                    auto_now_add=True
                )),
                ('updated_at', models.DateTimeField(
                    auto_now=True
                )),
            ],
        ),
        
        # Custom SQL for complex operations
        migrations.RunSQL(
            sql=[
                """
                -- Forward migration
                UPDATE products 
                SET price_cents = CAST(price * 100 AS INTEGER)
                WHERE price_cents IS NULL;
                """,
            ],
            reverse_sql=[
                """
                -- Reverse migration
                UPDATE products 
                SET price = CAST(price_cents AS DECIMAL) / 100
                WHERE price IS NULL;
                """,
            ],
        ),
    ]

4. Data Integrity Checks

Implement comprehensive validation:

Pre-Migration Validation

def validate_pre_migration():
    """
    Check data integrity before migration
    """
    checks = []
    
    # Check for NULL values in required fields
    null_check = execute("""
        SELECT COUNT(*) as count
        FROM users
        WHERE email IS NULL OR username IS NULL
    """)[0]['count']
    
    if null_check > 0:
        checks.append({
            'check': 'null_values',
            'status': 'FAILED',
            'message': f'{null_check} users with NULL email/username',
            'action': 'Fix NULL values before migration'
        })
    
    # Check for duplicate values
    duplicate_check = execute("""
        SELECT email, COUNT(*) as count
        FROM users
        GROUP BY email
        HAVING COUNT(*) > 1
    """)
    
    if duplicate_check:
        checks.append({
            'check': 'duplicates',
            'status': 'FAILED', 
            'message': f'{len(duplicate_check)} duplicate emails found',
            'action': 'Resolve duplicates before adding unique constraint'
        })
    
    # Check foreign key integrity
    orphan_check = execute("""
        SELECT COUNT(*) as count
        FROM orders o
        LEFT JOIN users u ON o.user_id = u.id
        WHERE u.id IS NULL
    """)[0]['count']
    
    if orphan_check > 0:
        checks.append({
            'check': 'orphaned_records',
            'status': 'WARNING',
            'message': f'{orphan_check} orders with non-existent users',
            'action': 'Clean up orphaned records'
        })
    
    return checks

Post-Migration Validation

def validate_post_migration():
    """
    Verify migration success
    """
    validations = []
    
    # Row count validation
    old_count = execute("SELECT COUNT(*) FROM orders")[0]['count']
    new_count = execute("SELECT COUNT(*) FROM v2_orders")[0]['count']
    
    validations.append({
        'check': 'row_count',
        'expected': old_count,
        'actual': new_count,
        'status': 'PASS' if old_count == new_count else 'FAILED'  # same value MigrationRunner checks for
    })
    
    # Checksum validation
    old_checksum = execute("""
        SELECT 
            SUM(CAST(amount AS DECIMAL)) as total,
            COUNT(DISTINCT customer_id) as customers
        FROM orders
    """)[0]
    
    new_checksum = execute("""
        SELECT 
            SUM(total_amount) as total,
            COUNT(DISTINCT customer_id) as customers  
        FROM v2_orders
    """)[0]
    
    validations.append({
        'check': 'data_integrity',
        'status': 'PASS' if old_checksum == new_checksum else 'FAILED',  # same value MigrationRunner checks for
        'details': {
            'old': old_checksum,
            'new': new_checksum
        }
    })
    
    return validations
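
Row counts and column sums can match even when individual rows differ (two swapped amounts leave the total unchanged). A stricter option is a row-level fingerprint. The sketch below is one possible approach, assuming rows arrive as `(id, customer_id, amount, status)` tuples and that amounts should be compared at two decimal places; the helper names are hypothetical.

```python
import hashlib
from decimal import Decimal


def normalize(row):
    """Bring a row into a canonical text form so type differences do not matter."""
    order_id, customer_id, amount, status = row
    cents = Decimal(str(amount)).quantize(Decimal("0.01"))
    return (str(order_id), str(customer_id), str(cents), str(status))


def table_fingerprint(rows) -> str:
    """Order-independent SHA-256 over all normalized rows."""
    digest = hashlib.sha256()
    for fields in sorted(normalize(row) for row in rows):
        digest.update("|".join(fields).encode("utf-8"))
        digest.update(b"\n")
    return digest.hexdigest()


old_rows = [(1, "c1", 10.5, "paid"), (2, "c2", "3.00", "new")]
new_rows = [(2, "c2", Decimal("3"), "new"), (1, "c1", Decimal("10.50"), "paid")]
swapped = [(1, "c1", Decimal("3.00"), "paid"), (2, "c2", Decimal("10.50"), "new")]

same = table_fingerprint(old_rows) == table_fingerprint(new_rows)
detects_swap = table_fingerprint(old_rows) != table_fingerprint(swapped)
print(same, detects_swap)
```

For very large tables the same idea can be applied per id range, so a mismatch narrows down to a batch instead of the whole table.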

5. Rollback Procedures

Implement safe rollback strategies:

Automatic Rollback

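import logging

# Assumes a Django project; substitute your driver's transaction API otherwise.
from django.db import transaction

logger = logging.getLogger(__name__)


class MigrationError(Exception):
    """Raised when pre- or post-migration validation fails"""


class MigrationRunner:
    def __init__(self, migration):
        self.migration = migration
        self.checkpoint = None
        
    def run_with_rollback(self):
        """
        Execute migration with automatic rollback on failure
        """
        try:
            # Create restore point. create_checkpoint() and cleanup_checkpoint()
            # are platform-specific (snapshot, backup, ...) and not shown here.
            self.checkpoint = self.create_checkpoint()
            
            # Run pre-checks
            pre_checks = self.migration.validate_pre()
            if any(c['status'] == 'FAILED' for c in pre_checks):
                raise MigrationError("Pre-validation failed", pre_checks)
            
            # Execute migration; an exception raised inside the block
            # rolls the transaction back before it propagates
            with transaction.atomic():
                self.migration.forward()
                
                # Run post-checks
                post_checks = self.migration.validate_post()
                if any(c['status'] == 'FAILED' for c in post_checks):
                    raise MigrationError("Post-validation failed", post_checks)
                    
            # Clean up checkpoint after success
            self.cleanup_checkpoint()
            
        except Exception as e:
            logger.error(f"Migration failed: {e}")
            self.rollback()
            raise
            
    def rollback(self):
        """
        Restore to checkpoint
        """
        if self.checkpoint:
            # Placeholder statement: this is not valid PostgreSQL or MySQL syntax.
            # Substitute the restore mechanism of your platform.
            execute(f"RESTORE DATABASE FROM CHECKPOINT '{self.checkpoint}'")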
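
The core of the automatic rollback is that a failed post-check raises inside the transaction, which undoes the forward step. Here is a small self-contained sketch of just that mechanism, using SQLite's connection context manager in place of `transaction.atomic()`. The table, the migration step, and the check are invented for the demonstration.

```python
import sqlite3


class MigrationError(Exception):
    """Raised when a validation check reports FAILED."""


def run_with_rollback(conn, forward, validate_post) -> bool:
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            forward(conn)
            failed = [c for c in validate_post(conn) if c["status"] == "FAILED"]
            if failed:
                raise MigrationError(failed)
    except MigrationError:
        return False
    return True


def to_cents(conn):
    conn.execute("UPDATE orders SET amount = amount * 100")


def no_negative_amounts(conn):
    bad = conn.execute("SELECT COUNT(*) FROM orders WHERE amount < 0").fetchone()[0]
    return [{"check": "negative_amounts", "status": "FAILED" if bad else "PASS"}]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount INTEGER)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 10), (2, -5)])
conn.commit()

first_ok = run_with_rollback(conn, to_cents, no_negative_amounts)
after_failure = [r[0] for r in conn.execute("SELECT amount FROM orders ORDER BY id")]

conn.execute("DELETE FROM orders WHERE amount < 0")  # fix the offending data
conn.commit()
second_ok = run_with_rollback(conn, to_cents, no_negative_amounts)
after_success = [r[0] for r in conn.execute("SELECT amount FROM orders ORDER BY id")]

print(first_ok, after_failure, second_ok, after_success)
```

A transaction rollback like this covers only what ran inside the transaction; a checkpoint restore is still needed for steps that cannot be wrapped in one.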

Manual Rollback Scripts

#!/bin/bash
# rollback_migration.sh
# Usage: rollback_migration.sh <migration_version> <database>

set -euo pipefail

if [ "$#" -ne 2 ]; then
    echo "Usage: $0 <migration_version> <database>" >&2
    exit 1
fi

MIGRATION_VERSION=$1
DATABASE=$2

echo "Rolling back migration $MIGRATION_VERSION on $DATABASE"

# Check current version (-A removes the padding that -t alone leaves around the value)
CURRENT_VERSION=$(psql -d "$DATABASE" -tA -c "SELECT version FROM schema_migrations ORDER BY version DESC LIMIT 1")

if [ "$CURRENT_VERSION" != "$MIGRATION_VERSION" ]; then
    echo "Error: Current version ($CURRENT_VERSION) doesn't match rollback version ($MIGRATION_VERSION)" >&2
    exit 1
fi

# Execute rollback; ON_ERROR_STOP makes psql exit non-zero if any statement fails
psql -d "$DATABASE" -v ON_ERROR_STOP=1 -f "migrations/${MIGRATION_VERSION}.down.sql"

# Update version table
psql -d "$DATABASE" -c "DELETE FROM schema_migrations WHERE version = '$MIGRATION_VERSION'"

echo "Rollback completed successfully"

6. Performance Optimization

Minimize migration impact:

Batch Processing

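def migrate_large_table(batch_size=10000):
    """
    Migrate large tables in batches.

    OFFSET pagination rescans all skipped rows on every batch, so it slows
    down as the offset grows; for very large tables prefer keyset pagination
    (WHERE id > last_id), as in backfill_data() above.
    """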
    total_rows = execute("SELECT COUNT(*) FROM source_table")[0]['count']
    processed = 0
    
    while processed < total_rows:
        # Process batch
        execute("""
            INSERT INTO target_table (columns...)
            SELECT columns...
            FROM source_table
            ORDER BY id
            OFFSET %s
            LIMIT %s
            ON CONFLICT DO NOTHING
        """, [processed, batch_size])
        
        processed = min(processed + batch_size, total_rows)  # clamp so progress never exceeds 100%
        
        # Progress tracking
        progress = (processed / total_rows) * 100
        logger.info(f"Migration progress: {progress:.1f}%")
        
        # Prevent overload
        time.sleep(0.5)
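
As an alternative to OFFSET, keyset pagination resumes each batch from the last id seen, so every batch costs roughly the same regardless of how far the copy has progressed. The sketch below shows the pattern on SQLite so that it runs standalone; it assumes a positive integer primary key named `id`, and the `payload` column is a stand-in for the real column list.

```python
import sqlite3


def copy_in_batches(conn: sqlite3.Connection, batch_size: int = 2) -> int:
    """Copy source_table into target_table by walking the primary key."""
    last_id = 0   # assumes ids are positive integers
    copied = 0
    while True:
        rows = conn.execute(
            "SELECT id, payload FROM source_table WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, batch_size),
        ).fetchall()
        if not rows:
            return copied
        conn.executemany(
            "INSERT OR IGNORE INTO target_table (id, payload) VALUES (?, ?)", rows
        )
        conn.commit()            # one short transaction per batch
        last_id = rows[-1][0]    # resume after the highest id in this batch
        copied += len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source_table (id INTEGER PRIMARY KEY, payload TEXT)")
conn.execute("CREATE TABLE target_table (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany(
    "INSERT INTO source_table VALUES (?, ?)", [(i, f"row-{i}") for i in range(1, 6)]
)
conn.commit()

copied = copy_in_batches(conn)
target_count = conn.execute("SELECT COUNT(*) FROM target_table").fetchone()[0]
print(copied, target_count)
```

Because the loop is driven by the id rather than a row count, it also tolerates rows being inserted into the source while the copy runs.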

Index Management

-- Drop indexes before bulk insert
-- (PostgreSQL syntax, matching CREATE INDEX CONCURRENTLY below;
-- ALTER TABLE ... DROP INDEX is the MySQL form)
DROP INDEX IF EXISTS idx_column1;
DROP INDEX IF EXISTS idx_column2;

-- Bulk insert
INSERT INTO large_table SELECT * FROM temp_data;

-- Recreate indexes concurrently
CREATE INDEX CONCURRENTLY idx_column1 ON large_table(column1);
CREATE INDEX CONCURRENTLY idx_column2 ON large_table(column2);

7. NoSQL and Cross-Platform Migration Support

Handle modern database migrations across SQL, NoSQL, and hybrid environments:

Advanced Multi-Database Migration Framework

from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
import asyncio
from dataclasses import dataclass

@dataclass
class MigrationOperation:
    operation_type: str
    collection_or_table: str
    data: Dict[str, Any]
    conditions: Optional[Dict[str, Any]] = None
    batch_size: int = 1000

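# Concrete adapters must implement all four abstract methods before they can
# be instantiated; the adapters below show only connect() and execute_migration().
class DatabaseAdapter(ABC):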
    @abstractmethod
    async def connect(self, connection_string: str):
        pass
    
    @abstractmethod
    async def execute_migration(self, operation: MigrationOperation):
        pass
    
    @abstractmethod
    async def validate_migration(self, operation: MigrationOperation) -> bool:
        pass
    
    @abstractmethod
    async def rollback_migration(self, operation: MigrationOperation):
        pass

class MongoDBAdapter(DatabaseAdapter):
    def __init__(self):
        self.client = None
        self.db = None
    
    async def connect(self, connection_string: str):
        from motor.motor_asyncio import AsyncIOMotorClient
        self.client = AsyncIOMotorClient(connection_string)
        self.db = self.client.get_default_database()
    
    async def execute_migration(self, operation: MigrationOperation):
        collection = self.db[operation.collection_or_table]
        
        if operation.operation_type == 'add_field':
            await self._add_field(collection, operation)
        elif operation.operation_type == 'rename_field':
            await self._rename_field(collection, operation)
        elif operation.operation_type == 'migrate_data':
            await self._migrate_data(collection, operation)
        elif operation.operation_type == 'create_index':
            await self._create_index(collection, operation)
        elif operation.operation_type == 'schema_validation':
            await self._add_schema_validation(collection, operation)
    
    async def _add_field(self, collection, operation):
        """Add new field to all documents"""
        field_name = operation.data['field_name']
        default_value = operation.data.get('default_value')
        
        # Add field to documents that don't have it
        result = await collection.update_many(
            {field_name: {"$exists": False}},
            {"$set": {field_name: default_value}}
        )
        
        return {
            'matched_count': result.matched_count,
            'modified_count': result.modified_count
        }
    
    async def _rename_field(self, collection, operation):
        """Rename field across all documents"""
        old_name = operation.data['old_name']
        new_name = operation.data['new_name']
        
        result = await collection.update_many(
            {old_name: {"$exists": True}},
            {"$rename": {old_name: new_name}}
        )
        
        return {
            'matched_count': result.matched_count,
            'modified_count': result.modified_count
        }
    
    async def _migrate_data(self, collection, operation):
        """Transform data during migration"""
        pipeline = operation.data['pipeline']
        
        # Use aggregation pipeline for complex transformations
        cursor = collection.aggregate([
            {"$match": operation.conditions or {}},
            *pipeline,
            {"$merge": {
                "into": operation.collection_or_table,
                "on": "_id",
                "whenMatched": "replace"
            }}
        ])
        
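        # $merge writes its output to the collection, so the cursor yields no documents;
        # iterating it is what actually runs the pipeline
        return [doc async for doc in cursor]
    
    async def _create_index(self, collection, operation):
        """Create an index (data keys 'keys' and 'options' are assumed names)"""
        keys = operation.data['keys']  # e.g. [('email', 1)]
        options = operation.data.get('options', {})
        return await collection.create_index(keys, **options)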
    
    async def _add_schema_validation(self, collection, operation):
        """Add JSON schema validation to collection"""
        schema = operation.data['schema']
        
        await self.db.command({
            "collMod": operation.collection_or_table,
            "validator": {"$jsonSchema": schema},
            "validationLevel": "strict",
            "validationAction": "error"
        })

class DynamoDBAdapter(DatabaseAdapter):
    def __init__(self):
        self.dynamodb = None
    
    async def connect(self, connection_string: str):
        import boto3
        self.dynamodb = boto3.resource('dynamodb')
    
    async def execute_migration(self, operation: MigrationOperation):
        table = self.dynamodb.Table(operation.collection_or_table)
        
        if operation.operation_type == 'add_gsi':
            await self._add_global_secondary_index(table, operation)
        elif operation.operation_type == 'migrate_data':
            await self._migrate_table_data(table, operation)
        elif operation.operation_type == 'update_capacity':
            await self._update_capacity(table, operation)
    
    async def _add_global_secondary_index(self, table, operation):
        """Add Global Secondary Index"""
        gsi_spec = operation.data['gsi_specification']
        
        table.update(
            GlobalSecondaryIndexUpdates=[
                {
                    'Create': gsi_spec
                }
            ]
        )
    
    async def _migrate_table_data(self, table, operation):
        """Migrate data between DynamoDB tables"""
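        # Only pass optional scan parameters that are set; boto3 rejects None values
        scan_kwargs = {}
        if operation.data.get('projection'):
            scan_kwargs['ProjectionExpression'] = operation.data['projection']
        if operation.conditions:
            scan_kwargs['FilterExpression'] = operation.conditions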
        
        target_table = self.dynamodb.Table(operation.data['target_table'])
        
        # Scan source table and write to target
        while True:
            response = table.scan(**scan_kwargs)
            
            # Transform and write items
            with target_table.batch_writer() as batch:
                for item in response['Items']:
                    transformed_item = self._transform_item(item, operation.data['transformation'])
                    batch.put_item(Item=transformed_item)
            
            if 'LastEvaluatedKey' not in response:
                break
            scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

class CassandraAdapter(DatabaseAdapter):
    def __init__(self):
        self.session = None
    
    async def connect(self, connection_string: str):
        from cassandra.cluster import Cluster
        
        # Assumes connection_string is a comma-separated list of contact points
        # (e.g. "10.0.0.1,10.0.0.2"); pass an auth_provider to Cluster if the
        # cluster requires credentials.
        cluster = Cluster(connection_string.split(','))
        self.session = cluster.connect()
    
    async def execute_migration(self, operation: MigrationOperation):
        if operation.operation_type == 'add_column':
            await self._add_column(operation)
        elif operation.operation_type == 'create_materialized_view':
            await self._create_materialized_view(operation)
        elif operation.operation_type == 'migrate_data':
            await self._migrate_data(operation)
    
    async def _add_column(self, operation):
        """Add column to Cassandra table"""
        table = operation.collection_or_table
        column_name = operation.data['column_name']
        column_type = operation.data['column_type']
        
        cql = f"ALTER TABLE {table} ADD {column_name} {column_type}"
        self.session.execute(cql)
    
    async def _create_materialized_view(self, operation):
        """Create materialized view for denormalization"""
        view_spec = operation.data['view_specification']
        self.session.execute(view_spec)

class CrossPlatformMigrator:
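    def __init__(self):
        # PostgreSQLAdapter, MySQLAdapter, RedisAdapter and ElasticsearchAdapter are
        # not defined in this file; each must subclass DatabaseAdapter and handle
        # the 'extract' and 'load' operation types used below.
        self.adapters = {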
            'postgresql': PostgreSQLAdapter(),
            'mysql': MySQLAdapter(),
            'mongodb': MongoDBAdapter(),
            'dynamodb': DynamoDBAdapter(),
            'cassandra': CassandraAdapter(),
            'redis': RedisAdapter(),
            'elasticsearch': ElasticsearchAdapter()
        }
    
    async def migrate_between_platforms(self, source_config, target_config, migration_spec):
        """Migrate data between different database platforms"""
        source_adapter = self.adapters[source_config['type']]
        target_adapter = self.adapters[target_config['type']]
        
        await source_adapter.connect(source_config['connection_string'])
        await target_adapter.connect(target_config['connection_string'])
        
        # Execute migration plan
        data = []  # populated by the first 'extract' step
        for step in migration_spec['steps']:
            if step['type'] == 'extract':
                data = await self._extract_data(source_adapter, step)
            elif step['type'] == 'transform':
                data = await self._transform_data(data, step)
            elif step['type'] == 'load':
                await self._load_data(target_adapter, data, step)
    
    async def _extract_data(self, adapter, step):
        """Extract data from source database"""
        extraction_op = MigrationOperation(
            operation_type='extract',
            collection_or_table=step['source_table'],
            data=step.get('extraction_params', {}),
            conditions=step.get('conditions'),
            batch_size=step.get('batch_size', 1000)
        )
        
        return await adapter.execute_migration(extraction_op)
    
    async def _transform_data(self, data, step):
        """Transform data between formats"""
        transformation_rules = step['transformation_rules']
        
        transformed_data = []
        for record in data:
            transformed_record = {}
            
            for target_field, source_mapping in transformation_rules.items():
                if isinstance(source_mapping, str):
                    # Simple field mapping
                    transformed_record[target_field] = record.get(source_mapping)
                elif isinstance(source_mapping, dict):
                    # Complex transformation
                    if source_mapping['type'] == 'function':
                        func = source_mapping['function']
                        args = [record.get(arg) for arg in source_mapping['args']]
                        transformed_record[target_field] = func(*args)
                    elif source_mapping['type'] == 'concatenate':
                        fields = source_mapping['fields']
                        separator = source_mapping.get('separator', ' ')
                        values = [str(record.get(field, '')) for field in fields]
                        transformed_record[target_field] = separator.join(values)
            
            transformed_data.append(transformed_record)
        
        return transformed_data
    
    async def _load_data(self, adapter, data, step):
        """Load data into target database"""
        load_op = MigrationOperation(
            operation_type='load',
            collection_or_table=step['target_table'],
            data={'records': data},
            batch_size=step.get('batch_size', 1000)
        )
        
        return await adapter.execute_migration(load_op)

# Example usage
async def migrate_sql_to_nosql():
    """Example: Migrate from PostgreSQL to MongoDB"""
    migrator = CrossPlatformMigrator()
    
    source_config = {
        'type': 'postgresql',
        'connection_string': 'postgresql://user:pass@localhost/db'
    }
    
    target_config = {
        'type': 'mongodb',
        'connection_string': 'mongodb://localhost:27017/db'
    }
    
    migration_spec = {
        'steps': [
            {
                'type': 'extract',
                'source_table': 'users',
                'conditions': {'active': True},
                'batch_size': 5000
            },
            {
                'type': 'transform',
                'transformation_rules': {
                    '_id': 'id',
                    'full_name': {
                        'type': 'concatenate',
                        'fields': ['first_name', 'last_name'],
                        'separator': ' '
                    },
                    'metadata': {
                        'type': 'function',
                        'function': lambda created, updated: {
                            'created_at': created,
                            'updated_at': updated
                        },
                        'args': ['created_at', 'updated_at']
                    }
                }
            },
            {
                'type': 'load',
                'target_table': 'users',
                'batch_size': 1000
            }
        ]
    }
    
    await migrator.migrate_between_platforms(source_config, target_config, migration_spec)
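
The transformation rules are the part of this spec that can be checked without any database. The sketch below applies the same three rule kinds (plain field mapping, `concatenate`, and `function`) to a single in-memory record; `apply_rules` is a hypothetical standalone helper that mirrors the logic of `_transform_data`, and the sample record is invented.

```python
def apply_rules(record: dict, rules: dict) -> dict:
    """Apply field-mapping rules to one source record."""
    out = {}
    for target_field, rule in rules.items():
        if isinstance(rule, str):
            # Simple field mapping: copy the source field under a new name
            out[target_field] = record.get(rule)
        elif rule["type"] == "concatenate":
            parts = [str(record.get(field, "")) for field in rule["fields"]]
            out[target_field] = rule.get("separator", " ").join(parts)
        elif rule["type"] == "function":
            args = [record.get(arg) for arg in rule["args"]]
            out[target_field] = rule["function"](*args)
    return out


rules = {
    "_id": "id",
    "full_name": {
        "type": "concatenate",
        "fields": ["first_name", "last_name"],
        "separator": " ",
    },
    "metadata": {
        "type": "function",
        "function": lambda created, updated: {
            "created_at": created,
            "updated_at": updated,
        },
        "args": ["created_at", "updated_at"],
    },
}

row = {
    "id": 7,
    "first_name": "Ada",
    "last_name": "Lovelace",
    "created_at": "2024-01-01",
    "updated_at": "2024-02-01",
}
document = apply_rules(row, rules)
print(document)
```

Running the rules against a handful of representative rows before the real migration is a cheap way to catch mapping mistakes such as a misspelled source field, which would otherwise surface as `None` values in the target.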

8. Modern Migration Tools and Change Data Capture

Integrate with enterprise migration tools and real-time sync:

Atlas Schema Migrations (MongoDB)

// atlas-migration.js
const { MongoClient } = require('mongodb');

class AtlasMigration {
    constructor(connectionString) {
        this.client = new MongoClient(connectionString);
        this.migrations = new Map();
    }
    
    register(version, migration) {
        this.migrations.set(version, migration);
    }
    
    async migrate() {
        await this.client.connect();
        const db = this.client.db();
        
        // Get current version
        const versionsCollection = db.collection('schema_versions');
        const currentVersion = await versionsCollection
            .findOne({}, { sort: { version: -1 } });
        
        const startVersion = currentVersion?.version || 0;
        
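        // Run pending migrations in ascending version order
        // (a Map iterates in insertion order, not key order)
        const pending = [...this.migrations].sort(([a], [b]) => a - b);
        for (const [version, migration] of pending) {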
            if (version > startVersion) {
                console.log(`Running migration ${version}`);
                
                const session = this.client.startSession();
                
                try {
                    await session.withTransaction(async () => {
                        await migration.up(db, session);
                        // Pass the session so the version record is part of the transaction
                        await versionsCollection.insertOne({
                            version,
                            applied_at: new Date(),
                            checksum: migration.checksum
                        }, { session });
                    });
                } catch (error) {
                    console.error(`Migration ${version} failed:`, error);
                    if (migration.down) {
                        await migration.down(db, session);
                    }
                    throw error;
                } finally {
                    await session.endSession();
                }
            }
        }
    }
}

// Example MongoDB schema migration
const migration_001 = {
    checksum: 'sha256:abc123...',
    
    async up(db, session) {
        // Add new field to existing documents
        await db.collection('users').updateMany(
            { email_verified: { $exists: false } },
            { 
                $set: { 
                    email_verified: false,
                    verification_token: null,
                    verification_expires: null
                }
            },
            { session }
        );
        
        // Create new index
        // Index builds on an existing collection and collMod are not allowed
        // inside a multi-document transaction, so both run without the session.
        await db.collection('users').createIndex(
            { email_verified: 1, verification_expires: 1 }
        );
        
        // Add schema validation
        await db.command({
            collMod: 'users',
            validator: {
                $jsonSchema: {
                    bsonType: 'object',
                    required: ['email', 'email_verified'],
                    properties: {
                        email: { bsonType: 'string' },
                        email_verified: { bsonType: 'bool' },
                        verification_token: { 
                            bsonType: ['string', 'null'] 
                        }
                    }
                }
            }
        });
    },
    
    async down(db, session) {
        // Remove schema validation
        // (collMod and dropIndex cannot run inside a transaction, so no session here)
        await db.command({
            collMod: 'users',
            validator: {}
        });
        
        // Drop index by its default generated name
        await db.collection('users').dropIndex(
            'email_verified_1_verification_expires_1'
        );
        
        // Remove fields
        await db.collection('users').updateMany(
            {},
            { 
                $unset: {
                    email_verified: '',
                    verification_token: '',
                    verification_expires: ''
                }
            },
            { session }
        );
    }
};
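
The `checksum` field above is a placeholder. One plausible use for it is detecting that an already applied migration file was edited afterwards. The sketch below shows that idea in Python under stated assumptions: the checksum is a SHA-256 over the migration source with trailing whitespace ignored, and `applied` stands in for the contents of the `schema_versions` collection. Both function names are hypothetical.

```python
import hashlib


def migration_checksum(source: str) -> str:
    """SHA-256 of the migration source, ignoring trailing whitespace."""
    normalized = "\n".join(line.rstrip() for line in source.strip().splitlines())
    return "sha256:" + hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def detect_drift(applied: dict, current_sources: dict) -> list:
    """Return versions whose source no longer matches the recorded checksum."""
    return sorted(
        version
        for version, source in current_sources.items()
        if version in applied and applied[version] != migration_checksum(source)
    )


v1 = "db.users.updateMany({}, {$set: {email_verified: false}})"
v2 = "db.users.createIndex({email_verified: 1})"

# What was recorded when the migrations were first applied
applied = {1: migration_checksum(v1), 2: migration_checksum(v2)}

# Later, someone edits migration 2 in place and adds trailing spaces to migration 1
current = {1: v1 + "   \n", 2: v2.replace("1}", "-1}")}
drifted = detect_drift(applied, current)
print(drifted)
```

A runner could refuse to proceed when drift is detected, since an edited migration means the recorded history no longer describes the actual schema.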

Change Data Capture (CDC) for Real-time Sync

# cdc-migration.py
import asyncio
from kafka import KafkaConsumer, KafkaProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json

class CDCMigrationManager:
    def __init__(self, config):
        self.config = config
        self.consumer = None
        self.producer = None
        self.schema_registry = None
        self.active_migrations = {}
    
    async def setup_cdc_pipeline(self):
        """Setup Change Data Capture pipeline"""
        # Kafka consumer for CDC events
        self.consumer = KafkaConsumer(
            'database.changes',
            bootstrap_servers=self.config['kafka_brokers'],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='migration-consumer',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        
        # Kafka producer for processed events
        self.producer = KafkaProducer(
            bootstrap_servers=self.config['kafka_brokers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        # Schema registry for data validation
        self.schema_registry = SchemaRegistryClient({
            'url': self.config['schema_registry_url']
        })
    
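    async def process_cdc_events(self):
        """Process CDC events and apply to target databases"""
        # NOTE: iterating a kafka-python KafkaConsumer is a blocking call; in a
        # real asyncio service, run this loop in a worker thread or use an
        # async Kafka client so the event loop is not stalled.
        for message in self.consumer: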
            event = message.value
            
            # Parse CDC event
            operation = event['operation']  # INSERT, UPDATE, DELETE
            table = event['table']
            data = event['data']
            
            # Check if this table has active migration
            if table in self.active_migrations:
                migration_config = self.active_migrations[table]
                await self.apply_migration_transformation(event, migration_config)
            else:
                # Standard replication
                await self.replicate_change(event)
    
    async def apply_migration_transformation(self, event, migration_config):
        """Apply data transformation during migration"""
        transformation_rules = migration_config['transformation_rules']
        target_tables = migration_config['target_tables']
        
        # Transform data according to migration rules
        transformed_data = {}
        for target_field, rule in transformation_rules.items():
            if isinstance(rule, str):
                # Simple field mapping
                transformed_data[target_field] = event['data'].get(rule)
            elif isinstance(rule, dict):
                # Complex transformation
                if rule['type'] == 'function':
                    func_name = rule['function']
                    func = getattr(self, f'transform_{func_name}')
                    args = [event['data'].get(arg) for arg in rule['args']]
                    transformed_data[target_field] = func(*args)
        
        # Apply to target tables
        for target_table in target_tables:
            await self.apply_to_target(target_table, event['operation'], transformed_data)
    
    async def setup_debezium_connector(self, source_db_config):
        """Configure Debezium for CDC"""
        connector_config = {
            "name": f"migration-connector-{source_db_config['name']}",
            "config": {
                "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                "database.hostname": source_db_config['host'],
                "database.port": source_db_config['port'],
                "database.user": source_db_config['user'],
                "database.password": source_db_config['password'],
                "database.dbname": source_db_config['database'],
                "database.server.name": source_db_config['name'],
                "table.include.list": ",".join(source_db_config['tables']),
                "plugin.name": "pgoutput",
                "slot.name": f"migration_slot_{source_db_config['name']}",
                "publication.name": f"migration_pub_{source_db_config['name']}",
                "transforms": "route",
                "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
                # Raw string so that "\." reaches Kafka Connect as a literal-dot regex
                "transforms.route.regex": r"([^.]+)\.([^.]+)\.([^.]+)",
                "transforms.route.replacement": "database.changes"
            }
        }
        
        # Submit connector to Kafka Connect
        import requests
        response = requests.post(
            f"{self.config['kafka_connect_url']}/connectors",
            json=connector_config,
            headers={'Content-Type': 'application/json'}
        )
        
        if response.status_code != 201:
            raise Exception(f"Failed to create connector: {response.text}")
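
The transformation rules consumed by apply_migration_transformation are not spelled out above. As a minimal sketch, assuming a rule is either a source column name (plain mapping) or a dict naming a registered function and its argument columns, the mapping step could look like this. The TRANSFORMS registry, the rule names, and the sample row are hypothetical and only illustrate the shape of the data.

```python
from typing import Any, Callable, Dict

# Hypothetical registry of named transformation functions (illustrative only).
TRANSFORMS: Dict[str, Callable[..., Any]] = {
    "concat_name": lambda first, last: f"{first or ''} {last or ''}".strip(),
    "lowercase": lambda value: value.lower() if isinstance(value, str) else value,
}


def apply_rules(row: Dict[str, Any], rules: Dict[str, Any]) -> Dict[str, Any]:
    """Build a target row from a source row using the mapping rules."""
    out: Dict[str, Any] = {}
    for target_field, rule in rules.items():
        if isinstance(rule, str):
            # Simple field mapping: copy the source column as-is.
            out[target_field] = row.get(rule)
        elif isinstance(rule, dict) and rule.get("type") == "function":
            # Function rule: look up the function and pass the named columns.
            func = TRANSFORMS[rule["function"]]
            args = [row.get(name) for name in rule["args"]]
            out[target_field] = func(*args)
        else:
            raise ValueError(f"Unsupported rule for {target_field!r}: {rule!r}")
    return out


rules = {
    "id": "user_id",
    "full_name": {"type": "function", "function": "concat_name",
                  "args": ["first_name", "last_name"]},
    "email": {"type": "function", "function": "lowercase", "args": ["email"]},
}
row = {"user_id": 7, "first_name": "Ada", "last_name": "Lovelace",
       "email": "Ada@Example.com"}
result = apply_rules(row, rules)
```

Rejecting unknown rule shapes with an error, rather than silently skipping them, keeps a misconfigured migration from writing partially populated rows to the target.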

Advanced Monitoring and Observability

class EnterpriseMigrationMonitor:
    def __init__(self, config):
        self.config = config
        self.metrics_client = self.setup_metrics_client()
        self.alerting_client = self.setup_alerting_client()
        self.migration_state = {
            'current_migrations': {},
            'completed_migrations': {},
            'failed_migrations': {}
        }
    
    def setup_metrics_client(self):
        """Setup Prometheus/Datadog metrics client"""
        from prometheus_client import Counter, Gauge, Histogram, CollectorRegistry
        
        registry = CollectorRegistry()
        
        self.metrics = {
            'migration_duration': Histogram(
                'migration_duration_seconds',
                'Time spent on migration',
                ['migration_id', 'source_db', 'target_db'],
                registry=registry
            ),
            'rows_migrated': Counter(
                'migration_rows_total',
                'Total rows migrated',
                ['migration_id', 'table_name'],
                registry=registry
            ),
            'migration_errors': Counter(
                'migration_errors_total',
                'Total migration errors',
                ['migration_id', 'error_type'],
                registry=registry
            ),
            'active_migrations': Gauge(
                'active_migrations_count',
                'Number of active migrations',
                registry=registry
            ),
            'data_lag': Gauge(
                'migration_data_lag_seconds',
                'Data lag between source and target',
                ['migration_id'],
                registry=registry
            )
        }
        
        return registry
    
    async def track_migration_progress(self, migration_id):
        """Real-time migration progress tracking"""
        migration = self.migration_state['current_migrations'][migration_id]
        
        while migration['status'] == 'running':
            # Calculate progress metrics
            progress_stats = await self.calculate_progress_stats(migration)
            
            # Update Prometheus metrics
            self.metrics['rows_migrated'].labels(
                migration_id=migration_id,
                table_name=migration['table']
            ).inc(progress_stats['rows_processed_delta'])
            
            self.metrics['data_lag'].labels(
                migration_id=migration_id
            ).set(progress_stats['lag_seconds'])
            
            # Check for anomalies
            await self.detect_migration_anomalies(migration_id, progress_stats)
            
            # Generate alerts if needed
            await self.check_alert_conditions(migration_id, progress_stats)
            
            await asyncio.sleep(30)  # Check every 30 seconds
    
    async def detect_migration_anomalies(self, migration_id, stats):
        """Threshold-based anomaly detection for migrations"""
        # Fixed thresholds on throughput, error rate, and memory usage
        if stats['rows_per_second'] < stats['expected_rows_per_second'] * 0.5:
            await self.trigger_alert(
                'migration_slow',
                f"Migration {migration_id} is running slower than expected",
                {'stats': stats}
            )
        
        if stats['error_rate'] > 0.01:  # 1% error rate threshold
            await self.trigger_alert(
                'migration_high_error_rate',
                f"Migration {migration_id} has high error rate: {stats['error_rate']}",
                {'stats': stats}
            )
        
        if stats['memory_usage'] > 0.8:  # 80% memory usage
            await self.trigger_alert(
                'migration_high_memory',
                f"Migration {migration_id} is using high memory: {stats['memory_usage']}",
                {'stats': stats}
            )
    
    async def setup_migration_dashboard(self):
        """Setup Grafana dashboard for migration monitoring"""
        dashboard_config = {
            "dashboard": {
                "title": "Database Migration Monitoring",
                "panels": [
                    {
                        "title": "Migration Progress",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": "rate(migration_rows_total[5m])",
                                "legendFormat": "{{migration_id}} - {{table_name}}"
                            }
                        ]
                    },
                    {
                        "title": "Data Lag",
                        "type": "singlestat",
                        "targets": [
                            {
                                "expr": "migration_data_lag_seconds",
                                "legendFormat": "Lag (seconds)"
                            }
                        ]
                    },
                    {
                        "title": "Error Rate",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": "rate(migration_errors_total[5m])",
                                "legendFormat": "{{error_type}}"
                            }
                        ]
                    },
                    {
                        "title": "Migration Duration",
                        "type": "heatmap",
                        "targets": [
                            {
                                "expr": "sum(rate(migration_duration_seconds_bucket[5m])) by (le)",
                                "legendFormat": "{{le}}"
                            }
                        ]
                    }
                ]
            }
        }
        
        # Submit dashboard to Grafana API
        import requests
        response = requests.post(
            f"{self.config['grafana_url']}/api/dashboards/db",
            json=dashboard_config,
            headers={
                'Authorization': f"Bearer {self.config['grafana_token']}",
                'Content-Type': 'application/json'
            }
        )
        
        return response.json()
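
The threshold checks in detect_migration_anomalies are easier to test when separated from the alerting side effects. The following is a sketch under that assumption: a pure function that takes the same stats keys used above and returns the names of the alerts that would fire. The function name and keyword parameters are hypothetical; the default thresholds mirror the values in the monitor.

```python
from typing import Dict, List


def find_anomalies(
    stats: Dict[str, float],
    *,
    min_throughput_ratio: float = 0.5,
    max_error_rate: float = 0.01,
    max_memory_usage: float = 0.8,
) -> List[str]:
    """Return the alert names triggered by one progress sample."""
    alerts: List[str] = []
    expected = stats["expected_rows_per_second"]
    # Throughput below the configured fraction of the expected rate.
    if expected > 0 and stats["rows_per_second"] < expected * min_throughput_ratio:
        alerts.append("migration_slow")
    # Error rate above the threshold (1% by default).
    if stats["error_rate"] > max_error_rate:
        alerts.append("migration_high_error_rate")
    # Memory usage above the threshold (80% by default).
    if stats["memory_usage"] > max_memory_usage:
        alerts.append("migration_high_memory")
    return alerts


healthy = {"rows_per_second": 900, "expected_rows_per_second": 1000,
           "error_rate": 0.001, "memory_usage": 0.4}
degraded = {"rows_per_second": 300, "expected_rows_per_second": 1000,
            "error_rate": 0.05, "memory_usage": 0.4}
healthy_alerts = find_anomalies(healthy)
degraded_alerts = find_anomalies(degraded)
```

Keeping the thresholds as keyword arguments lets each migration tune them without touching the detection logic.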

9. Event Sourcing and CQRS Migrations

Handle event-driven architecture migrations:

Event Store Migration Strategy

class EventStoreMigrator:
    def __init__(self, event_store_config):
        self.event_store = EventStore(event_store_config)
        self.event_transformers = {}
        self.aggregate_rebuilders = {}
    
    def register_event_transformer(self, event_type, transformer):
        """Register transformation for specific event type"""
        self.event_transformers[event_type] = transformer
    
    def register_aggregate_rebuilder(self, aggregate_type, rebuilder):
        """Register rebuilder for aggregate snapshots"""
        self.aggregate_rebuilders[aggregate_type] = rebuilder
    
    async def migrate_events(self, from_version, to_version):
        """Migrate events from one schema version to another"""
        # Get all events that need migration
        events_cursor = self.event_store.get_events_by_version_range(
            from_version, to_version
        )
        
        migrated_events = []
        
        async for event in events_cursor:
            if event.event_type in self.event_transformers:
                transformer = self.event_transformers[event.event_type]
                migrated_event = await transformer.transform(event)
                migrated_events.append(migrated_event)
            else:
                # No transformation needed
                migrated_events.append(event)
        
        # Write migrated events to new stream
        await self.event_store.append_events(
            f"migration-{to_version}",
            migrated_events
        )
        
        # Rebuild aggregates with new events
        await self.rebuild_aggregates(migrated_events)
    
    async def rebuild_aggregates(self, events):
        """Rebuild aggregate snapshots from migrated events"""
        aggregates_to_rebuild = set()
        
        for event in events:
            aggregates_to_rebuild.add(event.aggregate_id)
        
        for aggregate_id in aggregates_to_rebuild:
            aggregate_type = self.get_aggregate_type(aggregate_id)
            
            if aggregate_type in self.aggregate_rebuilders:
                rebuilder = self.aggregate_rebuilders[aggregate_type]
                await rebuilder.rebuild(aggregate_id)

# Example event transformation
class UserEventTransformer:
    async def transform(self, event):
        """Transform UserCreated event from v1 to v2"""
        if event.event_type == 'UserCreated' and event.version == 1:
            # v1 had separate first_name and last_name
            # v2 uses full_name
            old_data = event.data
            new_data = {
                'user_id': old_data['user_id'],
                'full_name': f"{old_data['first_name']} {old_data['last_name']}",
                'email': old_data['email'],
                'created_at': old_data['created_at']
            }
            
            return Event(
                event_id=event.event_id,
                event_type='UserCreated',
                aggregate_id=event.aggregate_id,
                version=2,
                data=new_data,
                metadata=event.metadata
            )
        
        return event
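
When events may be several schema versions behind, a common alternative to one large transformer is a chain of small single-step upcasters. The sketch below assumes a simplified Event dataclass (fewer fields than the Event used above) and a hypothetical UPCASTERS registry keyed by event type and source version; it is illustrative, not the event store's actual API.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Dict, Tuple


@dataclass(frozen=True)
class Event:
    """Simplified stand-in for the event type used above."""
    event_type: str
    version: int
    data: Dict[str, Any]


Upcaster = Callable[[Event], Event]


def user_created_v1_to_v2(event: Event) -> Event:
    """v1 stored first_name and last_name; v2 stores full_name."""
    data = dict(event.data)  # copy so the original event stays untouched
    first = data.pop("first_name", "")
    last = data.pop("last_name", "")
    data["full_name"] = f"{first} {last}".strip()
    return replace(event, version=2, data=data)


# Hypothetical registry: (event type, source version) -> single-step upcaster.
UPCASTERS: Dict[Tuple[str, int], Upcaster] = {
    ("UserCreated", 1): user_created_v1_to_v2,
}


def upcast(event: Event, target_version: int) -> Event:
    """Apply registered upcasters one version at a time."""
    while event.version < target_version:
        step = UPCASTERS.get((event.event_type, event.version))
        if step is None:
            break  # no upcaster registered; return the event as-is
        upgraded = step(event)
        if upgraded.version <= event.version:
            raise ValueError("upcaster must increase the event version")
        event = upgraded
    return event


old = Event("UserCreated", 1, {"user_id": 1, "first_name": "Ada",
                               "last_name": "Lovelace", "email": "ada@example.com"})
new = upcast(old, target_version=2)
```

Because each step only knows about one version jump, adding a v3 schema later means registering one more function rather than rewriting the existing transformation.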

10. Cloud Database Migration Automation

Automate cloud database migrations with infrastructure as code:

AWS Database Migration with CDK

// aws-db-migration.ts
import * as cdk from 'aws-cdk-lib';
import * as dms from 'aws-cdk-lib/aws-dms';
import * as rds from 'aws-cdk-lib/aws-rds';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as stepfunctions from 'aws-cdk-lib/aws-stepfunctions';
import * as sfnTasks from 'aws-cdk-lib/aws-stepfunctions-tasks';

export class DatabaseMigrationStack extends cdk.Stack {
    constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
        super(scope, id, props);
        
        // Create VPC for migration
        const vpc = new ec2.Vpc(this, 'MigrationVPC', {
            maxAzs: 2,
            subnetConfiguration: [
                {
                    cidrMask: 24,
                    name: 'private',
                    subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS
                },
                {
                    cidrMask: 24,
                    name: 'public',
                    subnetType: ec2.SubnetType.PUBLIC
                }
            ]
        });
        
        // DMS Replication Instance
        const replicationInstance = new dms.CfnReplicationInstance(this, 'ReplicationInstance', {
            replicationInstanceClass: 'dms.t3.medium',
            replicationInstanceIdentifier: 'migration-instance',
            allocatedStorage: 100,
            autoMinorVersionUpgrade: true,
            multiAz: false,
            publiclyAccessible: false,
            replicationSubnetGroupIdentifier: this.createSubnetGroup(vpc).ref
        });
        
        // Source and Target Endpoints
        const sourceEndpoint = new dms.CfnEndpoint(this, 'SourceEndpoint', {
            endpointType: 'source',
            engineName: 'postgres',
            serverName: 'source-db.example.com',
            port: 5432,
            databaseName: 'source_db',
            username: 'migration_user',
            password: 'migration_password'
        });
        
        const targetEndpoint = new dms.CfnEndpoint(this, 'TargetEndpoint', {
            endpointType: 'target',
            engineName: 'postgres',
            serverName: 'target-db.example.com',
            port: 5432,
            databaseName: 'target_db',
            username: 'migration_user',
            password: 'migration_password'
        });
        
        // Migration Task
        const migrationTask = new dms.CfnReplicationTask(this, 'MigrationTask', {
            replicationTaskIdentifier: 'full-load-and-cdc',
            sourceEndpointArn: sourceEndpoint.ref,
            targetEndpointArn: targetEndpoint.ref,
            replicationInstanceArn: replicationInstance.ref,
            migrationType: 'full-load-and-cdc',
            tableMappings: JSON.stringify({
                "rules": [
                    {
                        "rule-type": "selection",
                        "rule-id": "1",
                        "rule-name": "1",
                        "object-locator": {
                            "schema-name": "public",
                            "table-name": "%"
                        },
                        "rule-action": "include"
                    }
                ]
            }),
            replicationTaskSettings: JSON.stringify({
                "TargetMetadata": {
                    "TargetSchema": "",
                    "SupportLobs": true,
                    "FullLobMode": false,
                    "LobChunkSize": 0,
                    "LimitedSizeLobMode": true,
                    "LobMaxSize": 32,
                    "LoadMaxFileSize": 0,
                    "ParallelLoadThreads": 0,
                    "ParallelLoadBufferSize": 0,
                    "BatchApplyEnabled": false,
                    "TaskRecoveryTableEnabled": false
                },
                "FullLoadSettings": {
                    "TargetTablePrepMode": "DROP_AND_CREATE",
                    "CreatePkAfterFullLoad": false,
                    "StopTaskCachedChangesApplied": false,
                    "StopTaskCachedChangesNotApplied": false,
                    "MaxFullLoadSubTasks": 8,
                    "TransactionConsistencyTimeout": 600,
                    "CommitRate": 10000
                },
                "Logging": {
                    "EnableLogging": true,
                    "LogComponents": [
                        {
                            "Id": "SOURCE_UNLOAD",
                            "Severity": "LOGGER_SEVERITY_DEFAULT"
                        },
                        {
                            "Id": "TARGET_LOAD",
                            "Severity": "LOGGER_SEVERITY_DEFAULT"
                        }
                    ]
                }
            })
        });
        
        // Migration orchestration with Step Functions
        this.createMigrationOrchestration(migrationTask);
    }
    
    private createSubnetGroup(vpc: ec2.Vpc): dms.CfnReplicationSubnetGroup {
        return new dms.CfnReplicationSubnetGroup(this, 'ReplicationSubnetGroup', {
            replicationSubnetGroupDescription: 'Subnet group for DMS',
            replicationSubnetGroupIdentifier: 'migration-subnet-group',
            subnetIds: vpc.privateSubnets.map(subnet => subnet.subnetId)
        });
    }
    
    private createMigrationOrchestration(migrationTask: dms.CfnReplicationTask): void {
        // Lambda functions for migration steps
        const startMigrationFunction = new lambda.Function(this, 'StartMigration', {
            runtime: lambda.Runtime.PYTHON_3_9,
            handler: 'index.handler',
            code: lambda.Code.fromInline(`
import boto3
import json

def handler(event, context):
    dms = boto3.client('dms')
    task_arn = event['task_arn']
    
    response = dms.start_replication_task(
        ReplicationTaskArn=task_arn,
        StartReplicationTaskType='start-replication'
    )
    
    return {
        'statusCode': 200,
        'task_arn': task_arn,
        'task_status': response['ReplicationTask']['Status']
    }
            `)
        });
        
        const checkMigrationStatusFunction = new lambda.Function(this, 'CheckMigrationStatus', {
            runtime: lambda.Runtime.PYTHON_3_9,
            handler: 'index.handler',
            code: lambda.Code.fromInline(`
import boto3
import json

def handler(event, context):
    dms = boto3.client('dms')
    task_arn = event['task_arn']
    
    response = dms.describe_replication_tasks(
        Filters=[
            {
                'Name': 'replication-task-arn',
                'Values': [task_arn]
            }
        ]
    )
    
    task = response['ReplicationTasks'][0]
    status = task['Status']
    
    return {
        'task_arn': task_arn,
        'task_status': status,
        'is_complete': status in ['stopped', 'failed', 'ready']
    }
            `)
        });
        
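        // Step Function definition
        // LambdaInvoke wraps the function result under "Payload", so select it
        // as the state output; otherwise $.task_arn and $.is_complete are not
        // visible to the following states.
        const startMigrationTask = new sfnTasks.LambdaInvoke(this, 'StartMigrationTask', {
            lambdaFunction: startMigrationFunction,
            inputPath: '$',
            outputPath: '$.Payload'
        });
        
        const checkStatusTask = new sfnTasks.LambdaInvoke(this, 'CheckMigrationStatusTask', {
            lambdaFunction: checkMigrationStatusFunction,
            inputPath: '$',
            outputPath: '$.Payload'
        });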
        
        const waitTask = new stepfunctions.Wait(this, 'WaitForMigration', {
            time: stepfunctions.WaitTime.duration(cdk.Duration.minutes(5))
        });
        
        const migrationComplete = new stepfunctions.Succeed(this, 'MigrationComplete');
        const migrationFailed = new stepfunctions.Fail(this, 'MigrationFailed');
        
        // Define state machine
        const definition = startMigrationTask
            .next(waitTask)
            .next(checkStatusTask)
            .next(new stepfunctions.Choice(this, 'IsMigrationComplete?')
                .when(stepfunctions.Condition.booleanEquals('$.is_complete', true),
                      new stepfunctions.Choice(this, 'MigrationSuccessful?')
                          .when(stepfunctions.Condition.stringEquals('$.task_status', 'stopped'), migrationComplete)
                          .otherwise(migrationFailed))
                .otherwise(waitTask));
        
        new stepfunctions.StateMachine(this, 'MigrationStateMachine', {
            definition: definition,
            timeout: cdk.Duration.hours(24)
        });
    }
}
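
The tableMappings value passed to the replication task is a JSON document of selection rules. As a sketch, assuming the mappings are generated by a small helper instead of being written by hand, the following Python function builds the same rule structure used above for an explicit list of included and excluded tables. The function name and the sample table names are hypothetical.

```python
import json
from typing import Iterable, List


def build_selection_rules(schema: str, include: Iterable[str],
                          exclude: Iterable[str] = ()) -> str:
    """Return a DMS table-mapping JSON string with one selection rule per table."""
    entries = [(table, "include") for table in include]
    entries += [(table, "exclude") for table in exclude]
    rules: List[dict] = []
    for rule_id, (table, action) in enumerate(entries, start=1):
        rules.append({
            "rule-type": "selection",
            "rule-id": str(rule_id),    # must be unique within the mapping
            "rule-name": str(rule_id),
            "object-locator": {"schema-name": schema, "table-name": table},
            "rule-action": action,
        })
    return json.dumps({"rules": rules}, indent=2)


mappings = build_selection_rules("public", include=["%"], exclude=["audit_log"])
parsed = json.loads(mappings)
```

Generating the document this way keeps rule IDs unique and makes the include and exclude lists reviewable as plain data.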

Output Format

  1. Comprehensive Migration Strategy: Multi-database platform support with NoSQL integration
  2. Cross-Platform Migration Tools: SQL to NoSQL, NoSQL to SQL, and hybrid migrations
  3. Modern Tooling Integration: Atlas, Debezium, Flyway, Prisma, and cloud-native solutions
  4. Change Data Capture Pipeline: Real-time synchronization with Kafka and schema registry
  5. Event Sourcing Migrations: Event store transformations and aggregate rebuilding
  6. Cloud Infrastructure Automation: AWS DMS, GCP Database Migration Service, Azure DMS
  7. Enterprise Monitoring Suite: Prometheus metrics, Grafana dashboards, and anomaly detection
  8. Advanced Validation Framework: Multi-database integrity checks and performance benchmarks
  9. Automated Rollback Procedures: Platform-specific recovery strategies
  10. Performance Optimization: Batch processing, parallel execution, and resource management

Focus on zero-downtime migrations with comprehensive validation, automated rollbacks, and enterprise-grade monitoring across all supported database platforms.

Cross-Command Integration

This command can be combined with other development workflow commands to form a database-first development pipeline:

Integration with API Development (/api-scaffold)

# integrated-db-api-config.py
from datetime import datetime

class IntegratedDatabaseApiConfig:
    def __init__(self):
        self.api_config = self.load_api_config()        # From /api-scaffold
        self.db_config = self.load_db_config()          # From /db-migrate
        self.migration_config = self.load_migration_config()
    
    def generate_api_aware_migrations(self):
        """Generate migrations that consider API endpoints and schemas"""
        return {
            # API-aware migration strategy
            'api_migration_strategy': f"""
-- Migration with API endpoint consideration
-- Migration: {datetime.now().strftime('%Y%m%d_%H%M%S')}_api_aware_schema_update.sql

-- Run everything in one transaction so the final COMMIT has a matching BEGIN
BEGIN;

-- Check API dependency before migration
DO $$
BEGIN
    -- Verify API endpoints that depend on this schema
    IF EXISTS (
        SELECT 1 FROM api_endpoints 
        WHERE schema_dependencies @> '["users", "profiles"]'
        AND is_active = true
    ) THEN
        RAISE NOTICE 'Found active API endpoints depending on this schema';
        
        -- Create migration strategy with API versioning
        CREATE TABLE IF NOT EXISTS api_migration_log (
            id SERIAL PRIMARY KEY,
            migration_name VARCHAR(255) NOT NULL,
            api_version VARCHAR(50) NOT NULL,
            schema_changes JSONB,
            rollback_script TEXT,
            created_at TIMESTAMP DEFAULT NOW()
        );
        
        -- Log this migration for API tracking
        INSERT INTO api_migration_log (
            migration_name, 
            api_version, 
            schema_changes
        ) VALUES (
            'api_aware_schema_update',
            '{self.api_config.get("version", "v1")}',
            '{{"tables": ["users", "profiles"], "type": "schema_update"}}'::jsonb
        );
    END IF;
END $$;

-- Backward-compatible schema changes
ALTER TABLE users ADD COLUMN IF NOT EXISTS new_field VARCHAR(255);

-- Create view for API backward compatibility
CREATE OR REPLACE VIEW users_api_v1 AS 
SELECT 
    id,
    username,
    email,
    -- Maintain API compatibility
    COALESCE(new_field, 'default_value') as new_field,
    created_at,
    updated_at
FROM users;

-- Grant API service access
GRANT SELECT ON users_api_v1 TO {self.api_config.get("db_user", "api_service")};

COMMIT;
            """,
            
            # Database connection pool optimization for API
            'connection_pool_config': {
                'fastapi': f"""
# FastAPI with optimized database connections
from fastapi import HTTPException
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool

class DatabaseConfig:
    def __init__(self):
        self.database_url = "{self.db_config.get('url', 'postgresql://localhost/app')}"
        self.api_config = {self.api_config}
        
    def create_engine(self):
        return create_engine(
            self.database_url,
            poolclass=QueuePool,
            pool_size={self.api_config.get('db_pool_size', 20)},
            max_overflow={self.api_config.get('db_max_overflow', 0)},
            pool_pre_ping=True,
            pool_recycle=3600,
            echo={bool(self.api_config.get('debug', False))}
        )
    
    def get_session_maker(self):
        engine = self.create_engine()
        return sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Migration-aware API dependencies
async def get_db_with_migration_check():
    # Check if migrations are running
    async with get_db() as session:
        result = await session.execute(
            text("SELECT COUNT(*) FROM schema_migrations WHERE is_running = true")
        )
        running_migrations = result.scalar()
        
        if running_migrations > 0:
            raise HTTPException(
                status_code=503,
                detail="Database migrations in progress. API temporarily unavailable."
            )
        
        yield session
                """,
                
                'express': f"""
// Express.js with database migration awareness
const {{ Pool }} = require('pg');
const express = require('express');
const app = express();

class DatabaseManager {{
    constructor() {{
        this.pool = new Pool({{
            connectionString: '{self.db_config.get('url', 'postgresql://localhost/app')}',
            max: {self.api_config.get('db_pool_size', 20)},
            idleTimeoutMillis: 30000,
            connectionTimeoutMillis: 2000,
        }});
        
        this.migrationStatus = new Map();
    }}
    
    async checkMigrationStatus() {{
        try {{
            const client = await this.pool.connect();
            const result = await client.query(
                'SELECT COUNT(*) as count FROM schema_migrations WHERE is_running = true'
            );
            client.release();
            
            return result.rows[0].count === '0';
        }} catch (error) {{
            console.error('Failed to check migration status:', error);
            return false;
        }}
    }}
    
    // Middleware to check migration status
    migrationStatusMiddleware() {{
        return async (req, res, next) => {{
            const isSafe = await this.checkMigrationStatus();
            
            if (!isSafe) {{
                return res.status(503).json({{
                    error: 'Database migrations in progress',
                    message: 'API temporarily unavailable during database updates'
                }});
            }}
            
            next();
        }};
    }}
}}

const dbManager = new DatabaseManager();
app.use('/api', dbManager.migrationStatusMiddleware());
                """
            }
        }
    
    def generate_api_schema_sync(self):
        """Generate API schema synchronization with database"""
        return f"""
# API Schema Synchronization
import asyncio
from datetime import datetime
import aiohttp
from sqlalchemy import text

class ApiSchemaSync:
    def __init__(self, api_base_url="{self.api_config.get('base_url', 'http://localhost:8000')}"):
        self.api_base_url = api_base_url
        self.db_config = {self.db_config}
    
    async def notify_api_of_schema_change(self, migration_name, schema_changes):
        '''Notify API service of database schema changes'''
        async with aiohttp.ClientSession() as session:
            payload = {{
                'migration_name': migration_name,
                'schema_changes': schema_changes,
                'timestamp': datetime.now().isoformat()
            }}
            
            try:
                async with session.post(
                    f"{{self.api_base_url}}/internal/schema-update",
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    if response.status == 200:
                        print(f"API notified of schema changes: {{migration_name}}")
                    else:
                        print(f"Failed to notify API: {{response.status}}")
            except Exception as e:
                print(f"Error notifying API: {{e}}")
    
    async def validate_api_compatibility(self, proposed_changes):
        '''Validate that proposed schema changes won't break API'''
        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    f"{{self.api_base_url}}/internal/validate-schema",
                    json={{'proposed_changes': proposed_changes}},
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    result = await response.json()
                    return result.get('compatible', False), result.get('issues', [])
            except Exception as e:
                print(f"Error validating API compatibility: {{e}}")
                return False, [f"Validation service unavailable: {{e}}"]
        """

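The /internal/validate-schema endpoint called above is left undefined. A minimal sketch of what such a check might do, assuming the API service keeps a map of which columns each endpoint reads, is shown below. The ENDPOINT_COLUMNS map, the operation names, and the change format are all hypothetical.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical map of API endpoints to the (table, column) pairs they read.
ENDPOINT_COLUMNS: Dict[str, Set[Tuple[str, str]]] = {
    "GET /users": {("users", "email"), ("users", "username")},
    "GET /profiles": {("profiles", "bio")},
}

# Operations assumed to break readers of the affected column.
BREAKING_OPS = {"drop_column", "rename_column"}


def check_compatibility(changes: List[dict]) -> Tuple[bool, List[str]]:
    """Return (compatible, issues) for a list of proposed schema changes."""
    issues: List[str] = []
    for change in changes:
        if change["op"] not in BREAKING_OPS:
            continue  # additive changes are treated as backward compatible
        key = (change["table"], change["column"])
        for endpoint, columns in sorted(ENDPOINT_COLUMNS.items()):
            if key in columns:
                issues.append(
                    f"{change['op']} on {key[0]}.{key[1]} breaks {endpoint}"
                )
    return (not issues, issues)


safe = check_compatibility([{"op": "add_column", "table": "users", "column": "new_field"}])
unsafe = check_compatibility([{"op": "drop_column", "table": "users", "column": "email"}])
```

The return shape matches the (compatible, issues) tuple that validate_api_compatibility hands back to its caller.
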
Complete Workflow Integration

# complete-database-workflow.py
class CompleteDatabaseWorkflow:
    def __init__(self):
        self.configs = {
            'api': self.load_api_config(),           # From /api-scaffold
            'testing': self.load_test_config(),      # From /test-harness
            'security': self.load_security_config(), # From /security-scan
            'docker': self.load_docker_config(),     # From /docker-optimize
            'k8s': self.load_k8s_config(),          # From /k8s-manifest
            'frontend': self.load_frontend_config(), # From /frontend-optimize
            'database': self.load_db_config()        # From /db-migrate
        }
    
    async def execute_complete_workflow(self):
        print("🚀 Starting complete database migration workflow...")
        
        # 1. Pre-migration Security Scan
        security_scan = await self.run_security_scan()
        print("✅ Database security scan completed")
        
        # 2. API Compatibility Check
        api_compatibility = await self.check_api_compatibility()
        print("✅ API compatibility verified")
        
        # 3. Container-based Migration Testing
        container_tests = await self.run_container_tests()
        print("✅ Container-based migration tests passed")
        
        # 4. Production Migration with Monitoring
        migration_result = await self.run_production_migration()
        print("✅ Production migration completed")
        
        # 5. Frontend Cache Invalidation
        cache_invalidation = await self.invalidate_frontend_caches()
        print("✅ Frontend caches invalidated")
        
        # 6. Kubernetes Deployment Update
        k8s_deployment = await self.update_k8s_deployment()
        print("✅ Kubernetes deployment updated")
        
        # 7. Post-migration Testing Pipeline
        post_migration_tests = await self.run_post_migration_tests()
        print("✅ Post-migration tests completed")
        
        return {
            'status': 'success',
            'workflow_id': self.generate_workflow_id(),
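            # Keyed by step name: a bare {a, b, ...} would be a set literal,
            # which fails if the step results are dicts (unhashable).
            'components': {
                'security_scan': security_scan,
                'api_compatibility': api_compatibility,
                'container_tests': container_tests,
                'migration_result': migration_result,
                'cache_invalidation': cache_invalidation,
                'k8s_deployment': k8s_deployment,
                'post_migration_tests': post_migration_tests
            },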
            'migration_summary': {
                'zero_downtime': True,
                'rollback_plan': 'available',
                'performance_impact': 'minimal',
                'security_validated': True
            }
        }

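This integrated workflow coordinates database changes across every layer of the application stack: security scanning, API compatibility checks, container-based testing, the production migration itself, frontend cache invalidation, the Kubernetes deployment update, and post-migration tests. Running these steps in a fixed order keeps the schema, the services that depend on it, and their caches consistent with one another, which protects data integrity and system reliability.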
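The workflow above always returns `'status': 'success'`, even if an intermediate step raises or reports a problem. As a minimal sketch, assuming each step is an async callable that raises an exception on failure, the steps could be run through a small fail-fast runner that records which step stopped the workflow. The names `run_steps`, `Step`, and the demo steps are hypothetical and not part of the original tooling.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

# A step is a (name, async callable) pair; the callable returns a result dict.
Step = Tuple[str, Callable[[], Awaitable[dict]]]


async def run_steps(steps: List[Step]) -> dict:
    """Run steps in order and stop at the first one that raises."""
    results: Dict[str, dict] = {}
    for name, step in steps:
        try:
            results[name] = await step()
        except Exception as exc:
            # Report the failing step and keep the results gathered so far,
            # so the caller can decide whether a rollback is needed.
            return {
                'status': 'failed',
                'failed_step': name,
                'error': str(exc),
                'components': results,
            }
    return {'status': 'success', 'components': results}


# Hypothetical stand-ins for the real workflow steps.
async def demo_security_scan() -> dict:
    return {'vulnerabilities': 0}


async def demo_api_check() -> dict:
    raise RuntimeError('column users.email is still read by the API')


if __name__ == '__main__':
    outcome = asyncio.run(run_steps([
        ('security_scan', demo_security_scan),
        ('api_compatibility', demo_api_check),
        ('production_migration', demo_security_scan),  # not reached in this demo
    ]))
    print(outcome['status'], outcome.get('failed_step'))
```

With a runner like this, the production migration step is only reached when every earlier check has completed without raising, and the returned `failed_step` gives a rollback routine a place to start.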

Focus on enterprise-grade migrations with zero-downtime deployments, comprehensive monitoring, and platform-agnostic strategies for modern polyglot persistence architectures.