What is database sharding and how do you implement it?

Answer

Database sharding is a horizontal partitioning technique that splits a large database into smaller, more manageable pieces called shards. Each shard is a separate database that contains a subset of the total data, distributed across multiple servers to improve performance, scalability, and availability.

Sharding Strategies

1. Range-Based Sharding

-- Shard by date ranges: one table per calendar year, each on its own server.

-- Shard 1: Orders from 2023 (stored on Server 1)
CREATE TABLE orders_2023 (
    order_id BIGINT PRIMARY KEY,
    customer_id INT,
    order_date DATE,
    total DECIMAL(10,2)
);

-- Shard 2: Orders from 2024 (stored on Server 2)
CREATE TABLE orders_2024 (
    order_id BIGINT PRIMARY KEY,
    customer_id INT,
    order_date DATE,
    total DECIMAL(10,2)
);

-- Application logic for routing
/**
 * Pick the shard that stores orders for a given order date.
 *
 * @param {string|Date} orderDate - ISO 'YYYY-MM-DD' string (compared
 *   lexicographically) or a Date object.
 * @returns {string} 'shard_2024' for dates on or after 2024-01-01,
 *   'shard_2023' for dates in 2023, otherwise 'shard_archive'.
 */
function getShardForDate(orderDate) {
    // A Date compared with a string is coerced to a number, which makes both
    // comparisons below false and routes every Date to the archive shard.
    // Normalise valid Date objects to their UTC 'YYYY-MM-DD' form first.
    const isValidDate = orderDate instanceof Date && !Number.isNaN(orderDate.getTime());
    const isoDate = isValidDate ? orderDate.toISOString().slice(0, 10) : orderDate;

    if (isoDate >= '2024-01-01') return 'shard_2024';
    if (isoDate >= '2023-01-01') return 'shard_2023';
    return 'shard_archive';
}

2. Hash-Based Sharding

-- Shard by customer ID hash
-- Shard determination: customer_id % 4

-- Shard 0: customer_id % 4 = 0 (Server 1)
CREATE TABLE customers_shard_0 (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(100),
    email VARCHAR(100)
);

-- Shard 1: customer_id % 4 = 1 (Server 2)
CREATE TABLE customers_shard_1 (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(100),
    email VARCHAR(100)
);

-- Application routing logic
/**
 * Map a customer ID to the name of the shard that holds it.
 *
 * @param {number} customerId - Numeric customer identifier.
 * @returns {string} 'shard_' followed by customerId modulo 4.
 */
function getCustomerShard(customerId) {
    const shardIndex = customerId % 4;
    return 'shard_' + shardIndex;
}

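-- Hash the key (MD5) before taking the modulus for a more even distribution.
-- Note: this is still modulo hashing, not true consistent hashing; changing shardCount remaps most keys.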
/**
 * Turn a key into a shard index.
 *
 * Parses the first 8 characters of md5(key) as a hexadecimal integer and
 * reduces it modulo shardCount.
 * NOTE(review): assumes md5() returns a hex string; confirm against the
 * hashing helper in use.
 *
 * @param {*} key - Value passed to md5().
 * @param {number} shardCount - Number of shards to spread keys over.
 * @returns {number} The parsed hash prefix modulo shardCount.
 */
function consistentHash(key, shardCount) {
    const digest = md5(key);
    const prefix = digest.slice(0, 8);
    return Number.parseInt(prefix, 16) % shardCount;
}

3. Directory-Based Sharding

-- Lookup service to map data to shards
-- Directory of which shard owns each entity; one row per (entity_type, entity_id).
CREATE TABLE shard_directory (
    entity_type VARCHAR(50),
    entity_id VARCHAR(100),
    shard_id VARCHAR(50) NOT NULL,  -- a directory row without a shard is meaningless
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (entity_type, entity_id)
);

-- Example entries
-- Columns are listed explicitly so the statement survives schema changes;
-- created_at is filled in by its column default.
INSERT INTO shard_directory (entity_type, entity_id, shard_id) VALUES
('customer', '12345', 'shard_west'),
('customer', '67890', 'shard_east'),
('order', 'ORD001', 'shard_west');

-- Query to find shard
SELECT shard_id
FROM shard_directory
WHERE entity_type = 'customer'
  AND entity_id = '12345';

Implementation Approaches

1. Application-Level Sharding

class ShardManager:
    """Route queries to one of several database shards.

    Shards are named ``shard_<n>`` and a customer is assigned to a shard by
    ``customer_id`` modulo the number of configured shards.
    """

    def __init__(self):
        # NOTE(review): credentials are embedded in the connection strings for
        # illustration; load them from configuration/secrets in real code.
        self.shards = {
            'shard_0': 'mysql://user:pass@db1:3306/shard_0',
            'shard_1': 'mysql://user:pass@db2:3306/shard_1',
            'shard_2': 'mysql://user:pass@db3:3306/shard_2',
            'shard_3': 'mysql://user:pass@db4:3306/shard_3'
        }

    def get_all_shards(self):
        """Return the configured shard ids in definition order."""
        return list(self.shards)

    def get_shard(self, customer_id):
        """Return the connection string of the shard owning ``customer_id``.

        The modulus follows the number of configured shards instead of a
        hard-coded 4, so it stays correct if ``self.shards`` changes.
        """
        shard_key = f"shard_{customer_id % len(self.shards)}"
        return self.shards[shard_key]

    def get_connection(self, shard_id):
        """Open a connection to the shard named ``shard_id``.

        Raises ``KeyError`` for an unknown shard id.
        """
        return connect(self.shards[shard_id])

    def execute_query(self, customer_id, query, params=None):
        """Run ``query`` on the shard owning ``customer_id`` and return the result.

        The connection is left open because the returned result may still
        need it; NOTE(review): confirm who is responsible for closing it.
        """
        conn = connect(self.get_shard(customer_id))
        return conn.execute(query, params)

    def execute_cross_shard_query(self, query, params=None):
        """Run ``query`` on every shard and return all rows in one list."""
        results = []
        for shard_id in self.shards:
            conn = self.get_connection(shard_id)
            try:
                # Rows are copied into the list before the connection closes.
                results.extend(conn.execute(query, params))
            finally:
                # NOTE(review): assumes the connection object exposes close().
                conn.close()
        return results

2. Middleware-Based Sharding

-- Using ProxySQL for MySQL sharding
-- Configure sharding rules
-- This rule sends matching customer lookups to hostgroup 0. The destination
-- is a fixed hostgroup, so spreading customer_ids across shards needs one
-- rule (with its own match_pattern) per hostgroup.
INSERT INTO mysql_query_rules (
    rule_id, active, match_pattern, destination_hostgroup, apply
) VALUES
(1, 1, '^SELECT.*FROM customers WHERE customer_id = ([0-9]+).*',
 0, 1);

-- Hostgroup configuration
INSERT INTO mysql_servers (hostgroup_id, hostname, port) VALUES
(0, 'shard0.example.com', 3306),
(1, 'shard1.example.com', 3306),
(2, 'shard2.example.com', 3306),
(3, 'shard3.example.com', 3306);

-- Admin-table changes take effect only once loaded to runtime;
-- saving to disk keeps them across ProxySQL restarts.
LOAD MYSQL SERVERS TO RUNTIME;
LOAD MYSQL QUERY RULES TO RUNTIME;
SAVE MYSQL SERVERS TO DISK;
SAVE MYSQL QUERY RULES TO DISK;

3. Database-Native Sharding

-- PostgreSQL: Foreign Data Wrappers for sharding
-- IF NOT EXISTS keeps the script re-runnable.
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

-- Create foreign servers
CREATE SERVER shard_1 FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'shard1.example.com', port '5432', dbname 'shard_db');

CREATE SERVER shard_2 FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'shard2.example.com', port '5432', dbname 'shard_db');

-- Create user mappings: every foreign server needs one, otherwise queries
-- against its foreign tables cannot authenticate.
-- NOTE(review): the password is a placeholder; do not commit real credentials.
CREATE USER MAPPING FOR current_user SERVER shard_1
OPTIONS (user 'shard_user', password 'password');

CREATE USER MAPPING FOR current_user SERVER shard_2
OPTIONS (user 'shard_user', password 'password');

-- Create foreign tables
CREATE FOREIGN TABLE customers_shard_1 (
    customer_id INT,
    customer_name VARCHAR(100),
    email VARCHAR(100)
) SERVER shard_1 OPTIONS (schema_name 'public', table_name 'customers');

-- Create partitioned table with foreign table partitions
CREATE TABLE customers (
    customer_id INT,
    customer_name VARCHAR(100),
    email VARCHAR(100)
) PARTITION BY HASH (customer_id);

-- Attach foreign tables as partitions
-- (repeat for the remaining remainders 1..3 with their own foreign tables)
ALTER TABLE customers ATTACH PARTITION customers_shard_1
FOR VALUES WITH (MODULUS 4, REMAINDER 0);

Cross-Shard Operations

1. Cross-Shard Queries

-- Aggregation across shards
-- Application-level aggregation
/**
 * Aggregate 2024 order counts and revenue per region across all shards.
 *
 * Runs the same grouped query on every entry of `shards` and merges the
 * per-shard partial aggregates in the application.
 *
 * @returns {Object<string, {count: number, amount: number}>} Totals keyed by region.
 */
function getTotalOrdersByRegion() {
    const results = {};
    
    for (const shard of shards) {
        const shardResults = shard.query(`
            SELECT region, COUNT(*) as order_count, SUM(total) as total_amount
            FROM orders 
            WHERE order_date >= '2024-01-01'
            GROUP BY region
        `);
        
        // Merge results
        for (const row of shardResults) {
            if (!results[row.region]) {
                results[row.region] = { count: 0, amount: 0 };
            }
            // Some drivers return COUNT/SUM of DECIMAL as strings; without the
            // explicit conversion `+=` would concatenate instead of add.
            results[row.region].count += Number(row.order_count);
            results[row.region].amount += Number(row.total_amount);
        }
    }
    
    return results;
}

-- Using UNION ALL for cross-shard queries
-- Each branch computes one shard's partial aggregate per region;
-- the outer query adds the partials together.
WITH per_shard_totals AS (
    SELECT region, COUNT(*) AS order_count, SUM(total) AS total_amount
    FROM shard_1.orders
    WHERE order_date >= '2024-01-01'
    GROUP BY region

    UNION ALL

    SELECT region, COUNT(*) AS order_count, SUM(total) AS total_amount
    FROM shard_2.orders
    WHERE order_date >= '2024-01-01'
    GROUP BY region

    UNION ALL

    SELECT region, COUNT(*) AS order_count, SUM(total) AS total_amount
    FROM shard_3.orders
    WHERE order_date >= '2024-01-01'
    GROUP BY region
)
SELECT
    region,
    SUM(order_count) AS total_orders,
    SUM(total_amount) AS total_revenue
FROM per_shard_totals
GROUP BY region;

2. Distributed Transactions

# Two-Phase Commit for cross-shard transactions
class DistributedTransaction:
    """Two-phase commit coordinator for a transaction spanning several shards.

    Usage: ``begin_transaction`` -> run statements on the participant
    connections -> ``prepare_phase`` -> ``commit_phase``.
    """

    def __init__(self, shard_manager):
        self.shard_manager = shard_manager
        self.participants = []
        self.transaction_id = self._new_transaction_id()

    @staticmethod
    def _new_transaction_id():
        """Return a fresh global transaction id (hex characters only)."""
        import uuid
        return f"xact_{uuid.uuid4().hex}"

    def begin_transaction(self, shard_ids):
        """Open a transaction on every shard in ``shard_ids``.

        A new global transaction id is generated for each call so that
        concurrent or repeated transactions never reuse a prepared
        transaction name.
        """
        self.participants = []
        self.transaction_id = self._new_transaction_id()
        for shard_id in shard_ids:
            conn = self.shard_manager.get_connection(shard_id)
            conn.execute("BEGIN")
            self.participants.append((shard_id, conn))

    def prepare_phase(self):
        """Phase 1: prepare the transaction on every participant.

        Returns the list of ``(shard_id, conn)`` pairs that were prepared.
        If any participant fails, all participants are rolled back and the
        original exception is re-raised.
        """
        prepared_shards = []
        try:
            for shard_id, conn in self.participants:
                result = conn.execute(
                    f"PREPARE TRANSACTION '{self.transaction_id}'")
                if result.success:
                    prepared_shards.append((shard_id, conn))
                else:
                    raise Exception(f"Prepare failed on shard {shard_id}")
            return prepared_shards
        except Exception:
            self._abort(len(prepared_shards))
            raise

    def _abort(self, prepared_count):
        """Roll back every participant after a failed prepare phase.

        Participants are prepared in order, so the first ``prepared_count``
        need ROLLBACK PREPARED; the rest still hold a plain open transaction.
        """
        for index, (shard_id, conn) in enumerate(self.participants):
            if index < prepared_count:
                statement = f"ROLLBACK PREPARED '{self.transaction_id}'"
            else:
                statement = "ROLLBACK"
            try:
                conn.execute(statement)
            except Exception:
                # Deliberately best-effort: keep cleaning up the remaining
                # shards so the original prepare error is the one reported.
                pass

    def commit_phase(self, prepared_shards):
        """Phase 2: commit the prepared transaction on every prepared shard.

        A failure here leaves the remaining shards prepared but uncommitted;
        they must be resolved with COMMIT PREPARED using ``transaction_id``.
        """
        for shard_id, conn in prepared_shards:
            conn.execute(f"COMMIT PREPARED '{self.transaction_id}'")

Shard Management

1. Shard Splitting

-- Split a shard when it becomes too large
-- Original shard: customer_id % 2 = 0 (customers 0, 2, 4, 6, ...)
-- Split into: 
--   customer_id % 4 = 0 (customers 0, 4, 8, ...)
--   customer_id % 4 = 2 (customers 2, 6, 10, ...)

-- Step 1: Create new shard
CREATE TABLE customers_shard_new (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(100),
    email VARCHAR(100)
);

-- Steps 2 and 3 run in one transaction so a failure part-way through cannot
-- leave rows duplicated in both tables.
BEGIN;

-- Step 2: Migrate data (explicit column lists guard against column-order drift)
INSERT INTO customers_shard_new (customer_id, customer_name, email)
SELECT customer_id, customer_name, email
FROM customers_shard_0
WHERE customer_id % 4 = 2;

-- Step 3: Remove migrated data from original shard
DELETE FROM customers_shard_0
WHERE customer_id % 4 = 2;

COMMIT;

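-- Step 4: Update application routing logic
-- (assumes the odd-ID shard is split the same way, so that every customer_id % 4 value maps to an existing shard)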
/**
 * Map a customer ID to its shard name after the split.
 *
 * @param {number} customerId - Numeric customer identifier.
 * @returns {string} 'shard_' followed by customerId modulo 4.
 */
function getCustomerShard(customerId) {
    // The modulus was 2 before the split and is 4 now.
    const shardIndex = customerId % 4;
    return 'shard_' + shardIndex;
}

2. Shard Rebalancing

class ShardRebalancer:
    """Inspect shard sizes and move customer rows between shards.

    ``shard_manager`` must provide ``get_all_shards()`` and
    ``get_connection(shard_id)``, the same interface the other shard
    utilities in this file rely on.
    """

    def __init__(self, shard_manager):
        self.shard_manager = shard_manager

    def analyze_shard_distribution(self):
        """Return ``{shard_id: stats}`` with row count and table size per shard.

        Each shard is addressed by shard id through ``get_connection``;
        ``execute_query`` routes by customer id and cannot be used here.
        """
        shard_stats = {}
        for shard_id in self.shard_manager.get_all_shards():
            conn = self.shard_manager.get_connection(shard_id)
            stats = conn.execute("""
                SELECT
                    COUNT(*) as row_count,
                    pg_size_pretty(pg_total_relation_size('customers')) as size
                FROM customers
            """)
            shard_stats[shard_id] = stats
        return shard_stats

    def rebalance_shards(self, source_shard, target_shard, migration_criteria):
        """Move the customers matching ``migration_criteria`` between shards.

        ``migration_criteria`` is interpolated into the SQL text as a WHERE
        clause: it MUST come from trusted code, never from user input.
        The copy and the delete run on different servers and are not atomic.
        """
        source = self.shard_manager.get_connection(source_shard)
        target = self.shard_manager.get_connection(target_shard)

        # Step 1: Identify data to migrate (materialised so the same rows
        # drive both the copy and the delete)
        migration_data = list(source.execute(f"""
            SELECT customer_id, customer_name, email
            FROM customers
            WHERE {migration_criteria}
        """))

        # Step 2: Insert into target shard
        for row in migration_data:
            target.execute(
                "INSERT INTO customers (customer_id, customer_name, email) "
                "VALUES (%s, %s, %s)",
                (row.customer_id, row.customer_name, row.email))

        # Step 3: Remove from source shard. Delete by the ids that were
        # actually copied: re-running the criteria could also match rows
        # inserted after step 1 and drop them without a copy.
        for row in migration_data:
            source.execute(
                "DELETE FROM customers WHERE customer_id = %s",
                (row.customer_id,))

Challenges and Solutions

1. Hot Spots

-- Problem: Uneven data distribution
-- Solution: Better shard key selection

-- Bad: Sequential IDs create hot spots
-- All new customers go to the latest shard
CREATE TABLE customers (
    -- Sequential, creates hot spots
    customer_id SERIAL,
    customer_name VARCHAR(100),
    PRIMARY KEY (customer_id)
);

-- Good: Use compound shard key
CREATE TABLE customers (
    customer_id BIGINT,
    region VARCHAR(10),
    customer_name VARCHAR(100),
    PRIMARY KEY (customer_id)
);

-- Shard by region + hash of customer_id
/**
 * Build a shard name from a region bucket and a customer bucket.
 *
 * @param {*} customerId - Customer identifier passed to hash().
 * @param {*} region - Region value passed to hash().
 * @returns {string} 'shard_<regionBucket>_<customerBucket>', where each
 *   bucket is the corresponding hash modulo 2.
 */
function getShardKey(customerId, region) {
    // Region is hashed first, then the customer, each into one of two buckets.
    const regionBucket = hash(region) % 2;
    const customerBucket = hash(customerId) % 2;
    return ['shard', regionBucket, customerBucket].join('_');
}

2. Cross-Shard Joins

-- Challenge: JOINs across shards are expensive
-- Solution 1: Denormalization
-- Order line items with customer and product attributes copied in,
-- so reads need no cross-shard JOIN.
CREATE TABLE order_details_denormalized (
    order_id BIGINT,
    customer_id INT,
    -- Denormalized from customers table
    customer_name VARCHAR(100),
    customer_email VARCHAR(100),
    product_id INT,
    -- Denormalized from products table
    product_name VARCHAR(100),
    quantity INT,
    unit_price DECIMAL(10,2)
);

-- Solution 2: Co-location of related data
-- Shard both customers and orders by customer_id
/**
 * Return the shard that holds all data (customer row and orders) for a customer.
 *
 * @param {number} customerId - Numeric customer identifier.
 * @returns {string} 'shard_' followed by customerId modulo 4.
 */
function getShardForCustomerData(customerId) {
    const shardIndex = customerId % 4;
    return 'shard_' + shardIndex;
}

-- Both tables use same sharding strategy
CREATE TABLE customers_shard_0 AS
SELECT *
FROM customers
WHERE customer_id % 4 = 0;

CREATE TABLE orders_shard_0 AS
SELECT *
FROM orders
WHERE customer_id % 4 = 0;

-- Now JOINs work within each shard
SELECT
    c.customer_name,
    COUNT(o.order_id) AS order_count
FROM customers_shard_0 AS c
LEFT JOIN orders_shard_0 AS o
    ON o.customer_id = c.customer_id
GROUP BY c.customer_id, c.customer_name;

3. Global Constraints

-- Challenge: Unique constraints across shards
-- Solution: Global ID generation service

-- Centralized ID generator
-- One row per named sequence; current_value holds the last id handed out.
CREATE TABLE global_sequences (
    sequence_name VARCHAR(50) PRIMARY KEY,
    current_value BIGINT NOT NULL,
    -- NOT NULL: a NULL step would turn current_value + increment_by into NULL
    -- and make every allocation fail on the current_value constraint.
    increment_by INT NOT NULL DEFAULT 1
);

-- Function to get next global ID
-- Atomically advances the named sequence by its increment_by and returns the
-- new value. Raises an exception when no row exists for seq_name instead of
-- silently returning NULL, which callers would otherwise use as an id.
CREATE OR REPLACE FUNCTION get_next_global_id(seq_name VARCHAR(50))
RETURNS BIGINT AS $$
DECLARE
    next_id BIGINT;
BEGIN
    UPDATE global_sequences
    SET current_value = current_value + increment_by
    WHERE sequence_name = seq_name
    RETURNING current_value INTO next_id;

    -- FOUND is false when the UPDATE matched no row, i.e. unknown sequence.
    IF NOT FOUND THEN
        RAISE EXCEPTION 'Unknown global sequence: %', seq_name;
    END IF;

    RETURN next_id;
END;
$$ LANGUAGE plpgsql;

-- Alternative: UUID-based approach
-- Use UUIDs to ensure global uniqueness without coordination
CREATE TABLE customers (
    customer_id UUID DEFAULT gen_random_uuid(),
    customer_name VARCHAR(100),
    email VARCHAR(100),
    PRIMARY KEY (customer_id),
    -- Still need to check uniqueness across shards
    UNIQUE (email)
);

Monitoring and Maintenance

1. Shard Health Monitoring

class ShardMonitor:
    """Collect a per-shard health report through a shard manager."""

    def __init__(self, shard_manager):
        self.shard_manager = shard_manager

    def check_shard_health(self):
        """Probe every shard and return ``{shard_id: report}``.

        A shard that answers the metrics query is reported as ``'healthy'``
        together with its customer count, order count and database size.
        Any exception raised while connecting or querying marks the shard
        ``'unhealthy'`` and its message is stored under ``'error'``.
        """
        report = {}
        manager = self.shard_manager

        for shard_id in manager.get_all_shards():
            try:
                # Opening the connection doubles as the connectivity check.
                connection = manager.get_connection(shard_id)

                cursor = connection.execute("""
                    SELECT 
                        (SELECT COUNT(*) FROM customers) as customer_count,
                        (SELECT COUNT(*) FROM orders) as order_count,
                        pg_database_size(current_database()) as db_size
                """)
                row = cursor.fetchone()

                entry = dict(
                    status='healthy',
                    customer_count=row.customer_count,
                    order_count=row.order_count,
                    db_size=row.db_size,
                )
            except Exception as exc:
                entry = dict(status='unhealthy', error=str(exc))

            report[shard_id] = entry

        return report

2. Performance Monitoring

-- Monitor query performance across shards
-- One row per logged query execution.
CREATE TABLE shard_performance_log (
    -- Shard the query ran on
    shard_id VARCHAR(50),
    -- Caller-defined query category
    query_type VARCHAR(50),
    -- Duration in milliseconds
    execution_time_ms INT,
    rows_affected INT,
    -- Defaults to the time the row is inserted
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Record one query's execution metrics (the caller decides which queries to log, e.g. only slow ones)
-- Appends one row to shard_performance_log from the given arguments;
-- the timestamp column is filled by its default.
--   p_shard_id       -> shard_id
--   p_query_type     -> query_type
--   p_execution_time -> execution_time_ms
--   p_rows_affected  -> rows_affected
CREATE OR REPLACE FUNCTION log_shard_performance(
    p_shard_id VARCHAR(50),
    p_query_type VARCHAR(50),
    p_execution_time INT,
    p_rows_affected INT
) RETURNS VOID AS $$
BEGIN
    INSERT INTO shard_performance_log (
        shard_id,
        query_type,
        execution_time_ms,
        rows_affected
    )
    SELECT
        p_shard_id,
        p_query_type,
        p_execution_time,
        p_rows_affected;
END;
$$ LANGUAGE plpgsql;

-- Analyze performance trends
-- Restrict to the last hour first, then aggregate per shard and query type;
-- slowest average first.
WITH recent_queries AS (
    SELECT
        shard_id,
        query_type,
        execution_time_ms
    FROM shard_performance_log
    WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
)
SELECT
    shard_id,
    query_type,
    AVG(execution_time_ms) AS avg_execution_time,
    MAX(execution_time_ms) AS max_execution_time,
    COUNT(*) AS query_count
FROM recent_queries
GROUP BY shard_id, query_type
ORDER BY avg_execution_time DESC;

Best Practices

1. Shard Key Selection

  • Choose keys with good distribution (avoid hot spots)
  • Consider query patterns (minimize cross-shard queries)
  • Plan for future scaling (avoid frequent resharding)
  • Use compound keys when necessary

2. Data Consistency

  • Implement eventual consistency where possible
  • Use distributed transactions sparingly
  • Design for idempotency
  • Handle partial failures gracefully

3. Operational Considerations

  • Automate shard provisioning and scaling
  • Implement comprehensive monitoring
  • Plan for disaster recovery across shards
  • Test failover scenarios regularly

Interview Tips

  • Understand when sharding is necessary vs other scaling approaches
  • Know different sharding strategies and their trade-offs
  • Be familiar with challenges like cross-shard queries and global constraints
  • Understand the complexity of distributed transactions
  • Know how to design shard keys for even distribution
  • Practice explaining shard management operations (splitting, rebalancing)
  • Be aware of alternatives like read replicas and vertical partitioning

Test Your Knowledge

Take a quick quiz to test your understanding of this topic.

Test Your SQL Knowledge

Ready to put your skills to the test? Take our interactive SQL quiz and get instant feedback on your answers.