Skip to content
Go back

Data Migrations: Where One Wrong Move Can Ruin Your Weekend

Last month, I was staring at a production database with 1 million+ rows that needed migrating. The schema change looked simple—just restructure some JSON columns and update a few indexes. Four hours later, I was still testing my scripts and preparing rollback plans.

That migration taught me what dozens before hadn’t: data migrations aren’t about moving data. They’re about risk mitigation disguised as engineering work.

Here’s everything I’ve learned painfully from several large data migrations.

Table of contents

Open Table of contents

The 90/10 Rule: Sharpen Your Axe First

There’s an old saying (often misattributed to Lincoln): “If I had six hours to chop down a tree, I’d spend four hours sharpening the axe.”

For data migrations, it’s more extreme. If I have 10 hours for a migration, I spend 9 hours analyzing. Here’s why that last hour becomes trivial when you’ve done the prep work right.

The Analysis Phase That Actually Matters

Before writing a single migration script, I run these queries:

-- Understand your data distribution
SELECT
    COUNT(*) as total_rows,
    COUNT(DISTINCT user_id) as unique_users,
    MIN(created_at) as oldest_record,
    MAX(created_at) as newest_record,
    AVG(LENGTH(data_column)) as avg_data_size,
    MAX(LENGTH(data_column)) as max_data_size
FROM your_table;

-- Find the edge cases that will break your migration
SELECT
    data_column,
    COUNT(*) as occurrences
FROM your_table
WHERE
    data_column IS NULL
    OR data_column = ''
    OR LENGTH(data_column) > 10000
GROUP BY data_column
ORDER BY occurrences DESC;

-- Check for referential integrity issues
SELECT
    t1.id,
    t1.foreign_key_id
FROM table1 t1
LEFT JOIN table2 t2 ON t1.foreign_key_id = t2.id
WHERE t2.id IS NULL;

This is where AI shines. Feed your schema (not your data) to Claude:

Here's my database schema:
[paste schema]

Generate SQL queries to:
1. Find data distribution patterns
2. Identify potential migration blockers
3. Calculate migration time estimates based on row counts
4. Detect anomalies that might cause issues

Claude will generate queries you didn’t think to write. It will give you the tools to succeed. You’re still in the driver’s seat. Claude gives you a backup camera.

Define Success Before You Start

The worst time to figure out if your migration worked is after it’s done. Define these criteria upfront:

# success_criteria.py
class MigrationValidator:
    def __init__(self, connection):
        self.conn = connection
        self.checks = []

    def validate_row_counts(self):
        """Total rows should match within 0.01%"""
        source_count = self.conn.execute("SELECT COUNT(*) FROM old_table")
        target_count = self.conn.execute("SELECT COUNT(*) FROM new_table")

        tolerance = source_count * 0.0001  # 0.01% tolerance
        assert abs(source_count - target_count) <= tolerance

    def validate_checksums(self):
        """Critical columns should have matching checksums"""
        source_checksum = self.conn.execute("""
            SELECT MD5(GROUP_CONCAT(user_id, amount ORDER BY id))
            FROM old_table
        """)
        target_checksum = self.conn.execute("""
            SELECT MD5(GROUP_CONCAT(user_id, amount ORDER BY id))
            FROM new_table
        """)
        assert source_checksum == target_checksum

    def validate_business_logic(self):
        """Business-critical queries should return same results"""
        # Your specific business validations here
        pass

Run these checks immediately after migration.

The Production Playbook: Zero Downtime or Minimal Downtime

Pattern 1: Copy-Modify-Swap (My Default)

Never modify production data in place. Ever.

# The pattern that's saved me countless times
1. Create a copy of your data
2. Run migration on the copy
3. Validate the copy thoroughly
4. Point application to new copy
5. Keep old data as instant rollback

Here’s the actual implementation:

# migration_script.py
import time
from datetime import datetime

class SafeMigration:
    def __init__(self, source_table, target_table):
        self.source = source_table
        self.target = target_table
        self.checkpoint_file = f"migration_checkpoint_{datetime.now()}.json"

    def migrate_with_checkpoints(self, batch_size=1000):
        """Migrate data in batches with checkpoint support"""

        # Load checkpoint if exists
        last_id = self.load_checkpoint()

        while True:
            # Fetch batch
            batch = self.fetch_batch(last_id, batch_size)
            if not batch:
                break

            # Process and insert
            transformed = self.transform_batch(batch)
            self.insert_batch(transformed)

            # Save checkpoint
            last_id = batch[-1]['id']
            self.save_checkpoint(last_id)

            # Rate limiting
            time.sleep(0.1)  # Prevent overwhelming the database

Pattern 2: Synthetic Data Testing

Production is always different from staging. If nothing else, the stakes are higher. You want to test your migration scripts at production scale. Use faker to some fake data to give you a sense of it. If your actual data is in AWS, don’t only test on local. Test on a copy on AWS to get a sense of network throughput and batch sizes.

# generate_test_data.py
from faker import Faker
import random

fake = Faker()

def generate_production_scale_data(row_count):
    """Generate data matching production patterns"""

    # Match your production data distribution
    data = []
    for i in range(row_count):
        record = {
            'id': i,
            'user_id': random.randint(1, row_count // 100),  # ~100 records per user
            'data': fake.json(
                data_columns={
                    'amount': 'pyfloat:2:2:positive',
                    'timestamp': 'date_time',
                    'status': 'random_element:elements=active,inactive,pending'
                }
            ),
            'created_at': fake.date_time_between('-2y', 'now')
        }

        # Add edge cases (5% of data)
        if random.random() < 0.05:
            record['data'] = None  # Test NULL handling
        elif random.random() < 0.02:
            record['data'] = '{}'  # Test empty JSON

        data.append(record)

    return data

# Test with production-scale data
test_data = generate_production_scale_data(1_000_000)
run_migration_dry_run(test_data)

The Security Patterns That Keep You Employed

Principle of Least Privilege

Create a temporary migration user with surgical precision:

-- Create migration user with minimal permissions
CREATE USER 'migration_bot'@'localhost'
IDENTIFIED BY 'temp_password_delete_after_migration';

-- Grant ONLY what's needed
GRANT SELECT ON old_database.* TO 'migration_bot'@'localhost';
GRANT INSERT ON new_database.target_table TO 'migration_bot'@'localhost';
GRANT UPDATE (status_column) ON new_database.target_table TO 'migration_bot'@'localhost';

-- Revoke immediately after migration
DROP USER 'migration_bot'@'localhost';

Handling Encrypted Data

If you must decrypt sensitive data during migration:

# secure_migration.py
import os
from cryptography.fernet import Fernet
from contextlib import contextmanager
import gc

@contextmanager
def temporary_decrypt(encrypted_data, key):
    """Minimize decrypted data lifetime"""
    try:
        cipher = Fernet(key)
        decrypted = cipher.decrypt(encrypted_data)
        yield decrypted
    finally:
        # Overwrite memory
        if 'decrypted' in locals():
            decrypted = b'0' * len(decrypted)
        gc.collect()  # Force garbage collection

# Use it
with temporary_decrypt(encrypted_column, key) as data:
    # Process data quickly
    migrated = transform_data(data)
    # Data is automatically scrubbed when context exits

The Rules I Never Break

1. Always Have a Golden Copy

# Before ANY migration
pg_dump production_db > backup_$(date +%Y%m%d_%H%M%S).sql
aws s3 cp backup_*.sql s3://disaster-recovery/migrations/

# Verify backup is restorable
pg_restore --dbname=test_restore backup_*.sql

This is easier if you’re in a cloud environment and can take snapshots.

2. Build Scripts with Dry Run Mode

Every migration script I write has this structure:

# migrate.py
import argparse
import logging

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--dry-run', action='store_true',
                       help='Show what would be migrated without doing it')
    parser.add_argument('--batch-size', type=int, default=1000)
    parser.add_argument('--limit', type=int,
                       help='Limit number of records (for testing)')

    args = parser.parse_args()

    if args.dry_run:
        logging.info("DRY RUN MODE - No data will be modified")
        connection = get_read_only_connection()
    else:
        connection = get_write_connection()

    # Always show what will happen
    preview_migration(connection, limit=10)

    if not args.dry_run:
        response = input("Proceed with migration? (yes/no): ")
        if response.lower() != 'yes':
            logging.info("Migration cancelled")
            return

    run_migration(connection, args)

3. Checkpoint Everything

# checkpoint_manager.py
import json
import os
from datetime import datetime

class CheckpointManager:
    def __init__(self, migration_id):
        self.checkpoint_file = f"checkpoint_{migration_id}_{datetime.now():%Y%m%d}.json"
        self.stats = {
            'started_at': datetime.now().isoformat(),
            'total_processed': 0,
            'last_successful_id': 0,
            'errors': []
        }

    def save(self):
        """Save after every batch"""
        with open(self.checkpoint_file, 'w') as f:
            json.dump(self.stats, f, indent=2)

    def can_resume(self):
        """Check if we can resume from checkpoint"""
        return os.path.exists(self.checkpoint_file)

    def get_resume_point(self):
        """Get the last successful ID to resume from"""
        if self.can_resume():
            with open(self.checkpoint_file) as f:
                data = json.load(f)
                return data.get('last_successful_id', 0)
        return 0

Real Migration That Almost Went Wrong

A month ago, I was migrating a sensitive table. Simple JSON restructure. The dry run worked perfectly on 10,000 sample rows.

Production had 1 million rows.

What the dry run didn’t catch: 0.003% of records (about 1,400) had malformed JSON from a bug three years ago. The migration crashed at row 847,293.

Because I had checkpoints, I:

  1. Resumed from row 847,000
  2. Added error handling for malformed JSON
  3. Completed the migration
  4. Fixed the bad records separately

Without checkpoints, I would have restarted from zero. That’s 6 hours vs 30 minutes.

What I’m Still Figuring Out

The Validation Window: How long should you run both old and new systems in parallel? I’ve done anywhere from 1 hour to 1 week. Still no perfect answer. Depends on how well the migration goes, and how sensitive the data is for the business.

Partial Rollback Strategies: When 99.9% of data migrates perfectly but 0.1% fails, should you rollback everything or handle the exceptions separately? I lean toward handling exceptions, but it’s case-by-case.

Real-Time Migration Monitoring: I want better dashboards that show migration progress, error rates, and performance impact in real-time. Also for real-time data, doing rollbacks is always a pain. Moving forward is usually easier. Being able to segregate migrated vs not migrated data is key.

The Checklist That Saves Weekends

Before any production migration:

Your Turn

Drop me a note at hello@ashishacharya.com. I’m collecting migration war stories and might do a follow-up post on the really gnarly ones. Let me know if you have some.


Share this post on:

Previous Post
From Shower Thought to Blog Draft: Writing Posts with Claude Code Mobile
Next Post
The Two-Key System: How We Turned House Hunting Into a Team Sport with AI