What is database sharding and how do you implement it?

Answer

Database sharding is a horizontal partitioning technique that splits a large database into smaller, more manageable pieces called shards. Each shard is a separate database that contains a subset of the total data, distributed across multiple servers to improve performance, scalability, and availability.

Sharding Strategies

1. Range-Based Sharding

-- Shard by date ranges
-- Shard 1: Orders from 2023 (stored on Server 1)
-- The statement is terminated with a semicolon so it can run as part of a script.
CREATE TABLE orders_2023 (
    order_id BIGINT PRIMARY KEY,
    customer_id INT,
    order_date DATE,        -- shard key: the routing logic picks the shard from this date
    total DECIMAL(10,2)
); -- Stored on Server 1

-- Shard 2: Orders from 2024 (stored on Server 2)
-- Same column layout as the 2023 shard; terminated with a semicolon so it can
-- run as part of a script.
CREATE TABLE orders_2024 (
    order_id BIGINT PRIMARY KEY,
    customer_id INT,
    order_date DATE,        -- shard key: the routing logic picks the shard from this date
    total DECIMAL(10,2)
); -- Stored on Server 2

-- Application logic for routing
/**
 * Pick the shard name for an order date.
 *
 * The date is compared with `>=` against literal 'YYYY-MM-DD' boundaries,
 * newest first. Everything on or after 2024-01-01 (later years included)
 * maps to 'shard_2024'; anything before 2023-01-01 maps to 'shard_archive'.
 * Assumes orderDate is an ISO 'YYYY-MM-DD' string -- TODO confirm with callers.
 *
 * @param {string} orderDate - order date to route.
 * @returns {string} shard name.
 */
function getShardForDate(orderDate) {
    const boundaries = [
        ['2024-01-01', 'shard_2024'],
        ['2023-01-01', 'shard_2023'],
    ];
    for (const [rangeStart, shardName] of boundaries) {
        if (orderDate >= rangeStart) {
            return shardName;
        }
    }
    return 'shard_archive';
}

2. Hash-Based Sharding

-- Shard by customer ID hash
-- Shard determination: customer_id % 4

-- Shard 0: customer_id % 4 = 0 (Server 1)
-- Terminated with a semicolon so the shard definitions can run as one script.
CREATE TABLE customers_shard_0 (
    customer_id INT PRIMARY KEY,   -- shard key: routed by customer_id % 4
    customer_name VARCHAR(100),
    email VARCHAR(100)
); -- Server 1

-- Shard 1: customer_id % 4 = 1 (Server 2)
-- Terminated with a semicolon so the shard definitions can run as one script.
CREATE TABLE customers_shard_1 (
    customer_id INT PRIMARY KEY,   -- shard key: routed by customer_id % 4
    customer_name VARCHAR(100),
    email VARCHAR(100)
); -- Server 2

-- Application routing logic
/**
 * Map a customer id onto one of four shards by modulo.
 *
 * @param {number} customerId - numeric customer id.
 * @returns {string} shard name of the form 'shard_<customerId % 4>'.
 */
function getCustomerShard(customerId) {
    const shardIndex = customerId % 4;
    return 'shard_' + shardIndex;
}

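-- Hash-based shard selection for better distribution
-- (note: this is plain hash-modulo, not true consistent hashing; changing shardCount remaps most keys)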
/**
 * Derive a shard index from a key's MD5 digest.
 *
 * Takes the first 8 hex characters of md5(key), parses them as a base-16
 * integer and reduces the value modulo shardCount.
 * NOTE(review): despite the name this is hash-modulo; the result depends
 * directly on shardCount.
 *
 * @param {*} key - value passed to md5(); md5 is defined outside this block.
 * @param {number} shardCount - number of shards.
 * @returns {number} shard index.
 */
function consistentHash(key, shardCount) {
    const digestHex = md5(key);
    const prefix = digestHex.substring(0, 8);
    const bucket = parseInt(prefix, 16);
    return bucket % shardCount;
}

3. Directory-Based Sharding

-- Lookup service to map data to shards
-- One row per (entity_type, entity_id) pair; shard_id names the shard that
-- holds that entity. The composite primary key means each entity maps to
-- exactly one shard.
CREATE TABLE shard_directory (
    entity_type VARCHAR(50),    -- kind of entity, e.g. 'customer' or 'order'
    entity_id VARCHAR(100),     -- identifier of the entity, stored as text
    shard_id VARCHAR(50),       -- name of the shard holding the entity
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,  -- filled in automatically when omitted
    PRIMARY KEY (entity_type, entity_id)
);

-- Example entries
-- Columns are listed explicitly so the statement keeps working if the table
-- gains or reorders columns; created_at is omitted and falls back to its
-- DEFAULT CURRENT_TIMESTAMP.
INSERT INTO shard_directory (entity_type, entity_id, shard_id)
VALUES
    ('customer', '12345', 'shard_west'),
    ('customer', '67890', 'shard_east'),
    ('order', 'ORD001', 'shard_west');

-- Resolve the shard that holds customer 12345
SELECT shard_id
FROM shard_directory
WHERE entity_type = 'customer'
  AND entity_id = '12345';

Implementation Approaches

1. Application-Level Sharding

class ShardManager:
    """Routes queries to shards by customer id and fans queries out to all shards.

    Shards are named ``shard_<n>`` and selected with ``customer_id % <shard count>``.
    ``connect`` is expected to be provided by the surrounding module.
    """

    def __init__(self):
        # NOTE(review): the connection strings embed placeholder credentials;
        # real deployments should load them from configuration or a secret store.
        self.shards = {
            'shard_0': 'mysql://user:pass@db1:3306/shard_0',
            'shard_1': 'mysql://user:pass@db2:3306/shard_1',
            'shard_2': 'mysql://user:pass@db3:3306/shard_2',
            'shard_3': 'mysql://user:pass@db4:3306/shard_3'
        }

    def get_all_shards(self):
        """Return the list of shard ids (the keys of ``self.shards``)."""
        return list(self.shards)

    def get_shard(self, customer_id):
        """Return the connection string of the shard that owns ``customer_id``.

        The modulus follows the number of configured shards instead of a
        hard-coded 4, so the mapping stays valid when ``self.shards`` changes.
        """
        shard_key = f"shard_{customer_id % len(self.shards)}"
        return self.shards[shard_key]

    def get_connection(self, shard_id):
        """Open a connection to the shard named ``shard_id``.

        Raises KeyError when ``shard_id`` is not a configured shard.
        """
        return connect(self.shards[shard_id])

    def execute_query(self, customer_id, query, params=None):
        """Run ``query`` on the shard that owns ``customer_id`` and return the result.

        The connection is left open because the returned result object may
        still depend on it -- presumably the caller or driver releases it; verify.
        """
        connection_string = self.get_shard(customer_id)
        conn = connect(connection_string)
        return conn.execute(query, params)

    def execute_cross_shard_query(self, query, params=None):
        """Run ``query`` on every shard and return all rows in one list.

        Rows are appended in shard iteration order; no merging or
        de-duplication is performed.
        """
        results = []
        for connection_string in self.shards.values():
            conn = connect(connection_string)
            try:
                # Rows are copied into `results` before the connection is released.
                results.extend(conn.execute(query, params))
            finally:
                # assumes the connection object exposes close() (DB-API style) -- TODO confirm
                conn.close()
        return results

2. Middleware-Based Sharding

-- Using ProxySQL for MySQL sharding
-- Configure sharding rules (run against the ProxySQL admin interface)
-- NOTE: a query rule routes on a regex match only; it cannot evaluate
-- customer_id % 4. This rule sends every matching query to hostgroup 0.
-- Routing to different hostgroups needs one rule per distinguishable pattern
-- (or the shard must be chosen by the application before the query is sent).
INSERT INTO mysql_query_rules (
    rule_id, active, match_pattern, destination_hostgroup, apply
) VALUES
(1, 1, '^SELECT.*FROM customers WHERE customer_id = ([0-9]+).*',
 0, 1);

-- Rules only take effect after being loaded to runtime; save them to disk so
-- they survive a restart.
LOAD MYSQL QUERY RULES TO RUNTIME;
SAVE MYSQL QUERY RULES TO DISK;

-- Hostgroup configuration: one backend server per hostgroup (0-3)
INSERT INTO mysql_servers (
    hostgroup_id,
    hostname,
    port
)
VALUES
    (0, 'shard0.example.com', 3306),
    (1, 'shard1.example.com', 3306),
    (2, 'shard2.example.com', 3306),
    (3, 'shard3.example.com', 3306);

3. Database-Native Sharding

-- PostgreSQL: Foreign Data Wrappers for sharding
-- IF NOT EXISTS keeps the script re-runnable when the extension is already installed.
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

-- Create foreign servers (one per remote shard)
CREATE SERVER shard_1 FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'shard1.example.com', port '5432', dbname 'shard_db');

CREATE SERVER shard_2 FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'shard2.example.com', port '5432', dbname 'shard_db');

-- Create user mappings
-- Every foreign server needs its own mapping; without one, queries against
-- foreign tables on that server fail when the connection is opened.
-- NOTE(review): the password is a placeholder; do not keep real credentials in scripts.
CREATE USER MAPPING FOR current_user SERVER shard_1
OPTIONS (user 'shard_user', password 'password');

CREATE USER MAPPING FOR current_user SERVER shard_2
OPTIONS (user 'shard_user', password 'password');

-- Create foreign tables
-- Local definition of the remote table public.customers on shard_1.
CREATE FOREIGN TABLE customers_shard_1 (
    customer_id INT,
    customer_name VARCHAR(100),
    email VARCHAR(100)
) SERVER shard_1 OPTIONS (schema_name 'public', table_name 'customers');

-- Create partitioned table with foreign table partitions
CREATE TABLE customers (
    customer_id INT,
    customer_name VARCHAR(100),
    email VARCHAR(100)
) PARTITION BY HASH (customer_id);

-- Attach foreign tables as partitions
-- Only remainder 0 of modulus 4 is attached here; the remaining three
-- remainders need their own foreign tables attached the same way.
ALTER TABLE customers ATTACH PARTITION customers_shard_1
FOR VALUES WITH (MODULUS 4, REMAINDER 0);

Cross-Shard Operations

1. Cross-Shard Queries

-- Aggregation across shards
-- Application-level aggregation
/**
 * Aggregate 2024 order counts and amounts per region across all shards.
 *
 * Runs the same grouped query on every entry of `shards` (defined outside
 * this block) and sums the per-shard partial results by region.
 *
 * @returns {Object} map of region -> { count, amount }.
 */
function getTotalOrdersByRegion() {
    const totalsByRegion = {};

    for (const shard of shards) {
        const partialRows = shard.query(`
            SELECT region, COUNT(*) as order_count, SUM(total) as total_amount
            FROM orders 
            WHERE order_date >= '2024-01-01'
            GROUP BY region
        `);

        // Fold this shard's partial aggregates into the running totals.
        for (const partial of partialRows) {
            const regionName = partial.region;
            if (!totalsByRegion[regionName]) {
                totalsByRegion[regionName] = { count: 0, amount: 0 };
            }
            const bucket = totalsByRegion[regionName];
            bucket.count += partial.order_count;
            bucket.amount += partial.total_amount;
        }
    }

    return totalsByRegion;
}

-- Using UNION ALL for cross-shard queries
-- Each shard is aggregated on its own, then the partial results are summed
-- per region. UNION ALL keeps every partial row (no de-duplication).
WITH per_shard_totals AS (
    SELECT region, COUNT(*) AS order_count, SUM(total) AS total_amount
    FROM shard_1.orders
    WHERE order_date >= '2024-01-01'
    GROUP BY region

    UNION ALL

    SELECT region, COUNT(*) AS order_count, SUM(total) AS total_amount
    FROM shard_2.orders
    WHERE order_date >= '2024-01-01'
    GROUP BY region

    UNION ALL

    SELECT region, COUNT(*) AS order_count, SUM(total) AS total_amount
    FROM shard_3.orders
    WHERE order_date >= '2024-01-01'
    GROUP BY region
)
SELECT
    region,
    SUM(order_count) AS total_orders,
    SUM(total_amount) AS total_revenue
FROM per_shard_totals
GROUP BY region;

2. Distributed Transactions

# Two-Phase Commit for cross-shard transactions
class DistributedTransaction:
    """Two-phase commit coordinator across several shards.

    Usage: ``begin_transaction`` -> run statements on the participant
    connections -> ``prepare_phase`` -> ``commit_phase``.

    Each transaction gets its own global identifier, so concurrent or
    repeated transactions do not collide on the prepared-transaction name.
    """

    def __init__(self, shard_manager):
        self.shard_manager = shard_manager
        self.participants = []
        self.transaction_id = self._new_transaction_id()

    @staticmethod
    def _new_transaction_id():
        """Return a fresh identifier for PREPARE TRANSACTION."""
        import uuid  # local import keeps the class self-contained
        return f"xact_{uuid.uuid4().hex}"

    def begin_transaction(self, shard_ids):
        """Open a transaction on every shard in ``shard_ids``.

        Connections come from ``shard_manager.get_connection``; a new global
        transaction id is generated for this run.
        """
        self.transaction_id = self._new_transaction_id()
        self.participants = []
        for shard_id in shard_ids:
            conn = self.shard_manager.get_connection(shard_id)
            conn.execute("BEGIN")
            self.participants.append((shard_id, conn))

    def prepare_phase(self):
        """Phase 1: prepare the transaction on every participant.

        Returns the list of ``(shard_id, conn)`` pairs that were prepared.
        If any participant fails, every participant is cleaned up (prepared
        ones with ROLLBACK PREPARED, the others with ROLLBACK) and the
        original exception is re-raised.
        """
        # The id is generated internally (hex characters only), so embedding it
        # in the statement text does not expose user input.
        gid = self.transaction_id
        prepared_shards = []
        try:
            for shard_id, conn in self.participants:
                # Phase 1: Prepare
                result = conn.execute(f"PREPARE TRANSACTION '{gid}'")
                if result.success:
                    prepared_shards.append((shard_id, conn))
                else:
                    raise Exception(f"Prepare failed on shard {shard_id}")
            return prepared_shards
        except Exception:
            prepared_conns = {id(conn) for _, conn in prepared_shards}
            for shard_id, conn in self.participants:
                try:
                    if id(conn) in prepared_conns:
                        conn.execute(f"ROLLBACK PREPARED '{gid}'")
                    else:
                        # Never prepared (failed or not reached): end its open transaction.
                        conn.execute("ROLLBACK")
                except Exception:
                    # Best effort: keep cleaning up the remaining shards so the
                    # original failure below is the one the caller sees.
                    pass
            raise

    def commit_phase(self, prepared_shards):
        """Phase 2: commit the prepared transaction on each prepared shard.

        An exception from a shard stops the loop and propagates; shards not
        yet committed stay prepared.
        """
        gid = self.transaction_id
        for shard_id, conn in prepared_shards:
            # Phase 2: Commit
            conn.execute(f"COMMIT PREPARED '{gid}'")

Shard Management

1. Shard Splitting

-- Split a shard when it becomes too large
-- Original shard: customer_id % 2 = 0 (customers 0, 2, 4, 6, ...)
-- Split into: 
--   customer_id % 4 = 0 (customers 0, 4, 8, ...)
--   customer_id % 4 = 2 (customers 2, 6, 10, ...)

-- Step 1: Create new shard
CREATE TABLE customers_shard_new (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(100),
    email VARCHAR(100)
);

-- Steps 2 and 3 run in one transaction so a failure between the copy and the
-- delete cannot leave rows duplicated or lost. This works when both tables are
-- reachable from the same database session, as in this example; when the new
-- shard lives on another server, copy first, verify, and only then delete.
BEGIN;

-- Step 2: Migrate data (explicit column lists instead of SELECT *)
INSERT INTO customers_shard_new (customer_id, customer_name, email)
SELECT customer_id, customer_name, email
FROM customers_shard_0
WHERE customer_id % 4 = 2;

-- Step 3: Remove migrated data from original shard
DELETE FROM customers_shard_0
WHERE customer_id % 4 = 2;

COMMIT;

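-- Step 4: Update application routing logic
-- (switch to % 4 only after the other original shard, customer_id % 2 = 1, has been split the same way into % 4 = 1 and % 4 = 3)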
/**
 * Route a customer to its shard after the split.
 *
 * @param {number} customerId - numeric customer id.
 * @returns {string} shard name of the form 'shard_<customerId % 4>'.
 */
function getCustomerShard(customerId) {
    // Modulus changed from 2 to 4 as part of the shard split.
    const shardIndex = customerId % 4;
    return 'shard_' + shardIndex;
}

2. Shard Rebalancing

class ShardRebalancer:
    """Inspects shard sizes and moves customer rows between shards.

    Shards are addressed by shard id through ``shard_manager.get_connection``
    (the same interface the other shard helpers in this file use) rather than
    through ``execute_query``, which routes by customer id.
    """

    def __init__(self, shard_manager):
        self.shard_manager = shard_manager

    def analyze_shard_distribution(self):
        """Return ``{shard_id: stats}`` with row count and table size per shard.

        ``stats`` is whatever the connection's ``execute`` returns for the
        statistics query.
        """
        stats_query = """
            SELECT
                COUNT(*) AS row_count,
                pg_size_pretty(pg_total_relation_size('customers')) AS size
            FROM customers
        """
        shard_stats = {}
        for shard_id in self.shard_manager.get_all_shards():
            conn = self.shard_manager.get_connection(shard_id)
            shard_stats[shard_id] = conn.execute(stats_query)
        return shard_stats

    def rebalance_shards(self, source_shard, target_shard, migration_criteria):
        """Move the customers matching ``migration_criteria`` to ``target_shard``.

        ``migration_criteria`` is a SQL predicate inserted verbatim into the
        SELECT; a predicate cannot be passed as a bound parameter.
        NOTE(review): it must come from trusted code, never from user input.

        The copy and the delete run on different connections and are not
        atomic; a failure part-way can leave rows present on both shards.
        """
        source_conn = self.shard_manager.get_connection(source_shard)
        target_conn = self.shard_manager.get_connection(target_shard)

        # Step 1: Identify data to migrate (materialised so it can be walked twice)
        migration_data = list(source_conn.execute(
            "SELECT customer_id, customer_name, email "
            f"FROM customers WHERE {migration_criteria}"
        ))

        # Step 2: Insert into target shard
        for row in migration_data:
            target_conn.execute(
                "INSERT INTO customers (customer_id, customer_name, email) "
                "VALUES (%s, %s, %s)",
                (row.customer_id, row.customer_name, row.email))

        # Step 3: Remove from source shard -- only the rows that were actually
        # copied, so rows that start matching the predicate after step 1 are
        # not deleted without having been migrated.
        for row in migration_data:
            source_conn.execute(
                "DELETE FROM customers WHERE customer_id = %s",
                (row.customer_id,))

Challenges and Solutions

1. Hot Spots

-- Problem: Uneven data distribution
-- Solution: Better shard key selection

-- Bad: Sequential IDs create hot spots
-- All new customers go to the latest shard
-- NOTE(review): this presumably refers to range-based sharding on the id,
-- where the newest id range receives every insert -- confirm the intended setup.
CREATE TABLE customers (
    customer_id SERIAL PRIMARY KEY,  -- Sequential, creates hot spots
    customer_name VARCHAR(100)
);

-- Good: Use compound shard key
-- Alternative definition of the same table (not meant to be run together with
-- the one above): the id is no longer auto-generated and a region column is
-- added so both values can take part in shard selection.
CREATE TABLE customers (
    customer_id BIGINT PRIMARY KEY,
    region VARCHAR(10),
    customer_name VARCHAR(100)
);

-- Shard by region + hash of customer_id
/**
 * Build a compound shard name from a region bucket and a customer bucket.
 *
 * Both buckets come from hash(value) % 2; hash is defined outside this block.
 *
 * @param {*} customerId - customer identifier passed to hash().
 * @param {*} region - region value passed to hash().
 * @returns {string} name of the form 'shard_<regionBucket>_<customerBucket>'.
 */
function getShardKey(customerId, region) {
    const bucketOf = (value) => hash(value) % 2;
    const regionBucket = bucketOf(region);      // 2 regions per shard group
    const customerBucket = bucketOf(customerId);  // 2 customer groups per region
    return `shard_${regionBucket}_${customerBucket}`;
}

2. Cross-Shard Joins

-- Challenge: JOINs across shards are expensive
-- Solution 1: Denormalization
-- Customer and product attributes are copied into each order-detail row so a
-- read does not need to join against the customers or products tables.
-- The copies must be refreshed by the application when the source rows change.
CREATE TABLE order_details_denormalized (
    order_id BIGINT,
    customer_id INT,
    customer_name VARCHAR(100),  -- Denormalized from customers table
    customer_email VARCHAR(100), -- Denormalized from customers table
    product_id INT,
    product_name VARCHAR(100),   -- Denormalized from products table
    quantity INT,
    unit_price DECIMAL(10,2)
);

-- Solution 2: Co-location of related data
-- Shard both customers and orders by customer_id
/**
 * Shard selector shared by customers and their orders, so related rows are
 * co-located on the same shard.
 *
 * @param {number} customerId - numeric customer id.
 * @returns {string} shard name of the form 'shard_<customerId % 4>'.
 */
function getShardForCustomerData(customerId) {
    const shardIndex = customerId % 4;
    return 'shard_' + shardIndex;
}

-- Both tables use same sharding strategy
-- Each statement copies the rows whose customer_id % 4 = 0 into a shard-local table.
-- NOTE(review): CREATE TABLE ... AS copies data only; primary keys and indexes
-- of the source tables are presumably not carried over -- add them afterwards if needed.
CREATE TABLE customers_shard_0 AS SELECT * FROM customers WHERE customer_id % 4 = 0;
CREATE TABLE orders_shard_0 AS SELECT * FROM orders WHERE customer_id % 4 = 0;

-- Now JOINs work within each shard
-- Order count per customer on shard 0; customers without orders get 0
-- because COUNT(o.order_id) ignores the NULLs produced by the LEFT JOIN.
SELECT
    c.customer_name,
    COUNT(o.order_id) AS order_count
FROM customers_shard_0 AS c
LEFT JOIN orders_shard_0 AS o
    ON c.customer_id = o.customer_id
GROUP BY
    c.customer_id,
    c.customer_name;

3. Global Constraints

-- Challenge: Unique constraints across shards
-- Solution: Global ID generation service

-- Centralized ID generator
-- One row per named sequence; current_value holds the last value handed out
-- and increment_by is the step applied on each request.
CREATE TABLE global_sequences (
    sequence_name VARCHAR(50) PRIMARY KEY,
    current_value BIGINT NOT NULL,
    increment_by INT DEFAULT 1   -- nullable: a NULL step would make the next value NULL
);

-- Function to get next global ID
-- Advances the named sequence by its increment_by and returns the new value.
-- The single UPDATE ... RETURNING makes the read-and-increment one statement.
-- Raises an exception when seq_name has no row in global_sequences instead of
-- silently returning NULL.
CREATE OR REPLACE FUNCTION get_next_global_id(seq_name VARCHAR(50))
RETURNS BIGINT AS $$
DECLARE
    next_id BIGINT;
BEGIN
    UPDATE global_sequences
    SET current_value = current_value + increment_by
    WHERE sequence_name = seq_name
    RETURNING current_value INTO next_id;

    -- FOUND is false when the UPDATE matched no row, i.e. the sequence is unknown.
    IF NOT FOUND THEN
        RAISE EXCEPTION 'unknown global sequence: %', seq_name;
    END IF;

    RETURN next_id;
END;
$$ LANGUAGE plpgsql;

-- Alternative: UUID-based approach
-- Use UUIDs to ensure global uniqueness without coordination
-- The key is generated by the database default, so no central ID service is needed.
-- The UNIQUE constraint on email is enforced per table only; on a sharded
-- deployment it does not prevent the same email on two different shards.
CREATE TABLE customers (
    customer_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_name VARCHAR(100),
    email VARCHAR(100) UNIQUE  -- Still need to check uniqueness across shards
);

Monitoring and Maintenance

1. Shard Health Monitoring

class ShardMonitor:
    """Builds a health snapshot for every shard known to a shard manager."""

    def __init__(self, shard_manager):
        # Only get_all_shards() and get_connection() are used on this object.
        self.shard_manager = shard_manager

    def check_shard_health(self):
        """Probe each shard and return ``{shard_id: report}``.

        A healthy report carries ``status``, ``customer_count``,
        ``order_count`` and ``db_size``. Any exception raised while
        connecting or querying marks the shard ``unhealthy`` and records
        ``str(exception)`` under ``error``.
        """
        manager = self.shard_manager
        report_by_shard = {}

        for shard in manager.get_all_shards():
            try:
                # Connectivity check doubles as the handle for the metrics query.
                connection = manager.get_connection(shard)

                # Row counts and database size in a single round trip.
                row = connection.execute("""
                    SELECT 
                        (SELECT COUNT(*) FROM customers) as customer_count,
                        (SELECT COUNT(*) FROM orders) as order_count,
                        pg_database_size(current_database()) as db_size
                """).fetchone()

                entry = dict(
                    status='healthy',
                    customer_count=row.customer_count,
                    order_count=row.order_count,
                    db_size=row.db_size,
                )
            except Exception as exc:
                entry = dict(status='unhealthy', error=str(exc))

            report_by_shard[shard] = entry

        return report_by_shard

2. Performance Monitoring

-- Monitor query performance across shards
-- One row per logged query execution.
CREATE TABLE shard_performance_log (
    shard_id VARCHAR(50),       -- shard the query ran on
    query_type VARCHAR(50),     -- caller-supplied label for the kind of query
    execution_time_ms INT,      -- duration in milliseconds
    rows_affected INT,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP  -- set automatically at insert time
);

-- Log query performance
-- Inserts one row into shard_performance_log for the given shard and query.
-- The function applies no duration threshold: every call is logged, so
-- "slow query" filtering has to happen in the caller.
-- The timestamp column is not supplied and takes its default.
CREATE OR REPLACE FUNCTION log_shard_performance(
    p_shard_id VARCHAR(50),
    p_query_type VARCHAR(50),
    p_execution_time INT,
    p_rows_affected INT
) RETURNS VOID AS $$
BEGIN
    INSERT INTO shard_performance_log 
    (shard_id, query_type, execution_time_ms, rows_affected)
    VALUES (p_shard_id, p_query_type, p_execution_time, p_rows_affected);
END;
$$ LANGUAGE plpgsql;

-- Analyze performance trends
-- Average and worst execution time per shard and query type over the last
-- hour, slowest average first.
SELECT
    shard_id,
    query_type,
    AVG(execution_time_ms) AS avg_execution_time,
    MAX(execution_time_ms) AS max_execution_time,
    COUNT(*) AS query_count
FROM shard_performance_log
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY
    shard_id,
    query_type
ORDER BY avg_execution_time DESC;

Best Practices

1. Shard Key Selection

  • Choose keys with good distribution (avoid hot spots)
  • Consider query patterns (minimize cross-shard queries)
  • Plan for future scaling (avoid frequent resharding)
  • Use compound keys when necessary

2. Data Consistency

  • Implement eventual consistency where possible
  • Use distributed transactions sparingly
  • Design for idempotency
  • Handle partial failures gracefully

3. Operational Considerations

  • Automate shard provisioning and scaling
  • Implement comprehensive monitoring
  • Plan for disaster recovery across shards
  • Test failover scenarios regularly

Interview Tips

  • Understand when sharding is necessary vs other scaling approaches
  • Know different sharding strategies and their trade-offs
  • Be familiar with challenges like cross-shard queries and global constraints
  • Understand the complexity of distributed transactions
  • Know how to design shard keys for even distribution
  • Practice explaining shard management operations (splitting, rebalancing)
  • Be aware of alternatives like read replicas and vertical partitioning

Test Your Knowledge

Take a quick quiz to test your understanding of this topic.