Sharding
What is Sharding?
Sharding is horizontal partitioning that distributes data across multiple servers (shards) to achieve scalability beyond a single server’s capacity.
MongoDB Sharding
Architecture
     ┌──────────────┐
     │ Query Router │ (mongos)
     └──────┬───────┘
            │
   ┌────────┼────────┐
   │        │        │
┌──▼───┐ ┌──▼───┐ ┌──▼───┐
│Shard1│ │Shard2│ │Shard3│
└──────┘ └──────┘ └──────┘
Enable Sharding
// Connect to mongos
const client = new MongoClient('mongodb://mongos1:27017');
// Enable sharding on database
sh.enableSharding("mydb");
// Shard collection
sh.shardCollection("mydb.users", { userId: "hashed" });
// Check sharding status
sh.status();
Shard Keys
// Hashed shard key (even distribution)
sh.shardCollection("mydb.users", { userId: "hashed" });
// Range-based shard key
sh.shardCollection("mydb.orders", { customerId: 1, orderDate: 1 });
// Compound shard key
sh.shardCollection("mydb.events", { deviceId: 1, timestamp: 1 });
Node.js with Sharded Cluster
const { MongoClient } = require('mongodb');
// Connect to mongos (query router)
const client = new MongoClient('mongodb://mongos1:27017,mongos2:27017/mydb');
await client.connect();
const db = client.db('mydb');
// Operations work transparently
await db.collection('users').insertOne({
userId: '123',
name: 'John Doe',
email: 'john@example.com'
});
// Query routed to appropriate shard
const user = await db.collection('users').findOne({ userId: '123' });
// Targeted query (uses shard key)
const orders = await db.collection('orders').find({
customerId: '123'
}).toArray();
// Scatter-gather query (no shard key)
const allOrders = await db.collection('orders').find({
status: 'pending'
}).toArray();
Chunk Management
// View chunks
db.getSiblingDB('config').chunks.find({ ns: "mydb.users" });
// Split chunk manually
sh.splitAt("mydb.users", { userId: "user-5000" });
// Move chunk
sh.moveChunk("mydb.users", { userId: "user-1000" }, "shard02");
// Balance chunks
sh.startBalancer();
sh.stopBalancer();
sh.isBalancerRunning();
Cassandra Partitioning
Partition Key
-- Partition key determines data distribution
CREATE TABLE users (
user_id UUID,
name TEXT,
email TEXT,
PRIMARY KEY (user_id)
);
-- Compound partition key
CREATE TABLE user_events (
user_id UUID,
event_type TEXT,
timestamp TIMESTAMP,
data TEXT,
PRIMARY KEY ((user_id, event_type), timestamp)
);
-- Partition key: (user_id, event_type)
-- Clustering key: timestamp
Token Ranges
const cassandra = require('cassandra-driver');
const client = new cassandra.Client({
contactPoints: ['node1', 'node2', 'node3'],
localDataCenter: 'dc1',
keyspace: 'myapp'
});
// Data distributed by token hash
// Each node owns token ranges
// Consistent hashing ensures even distribution
// Query by partition key (single node)
await client.execute(
'SELECT * FROM users WHERE user_id = ?',
[userId],
{ prepare: true }
);
// Query without partition key (all nodes)
await client.execute(
'SELECT * FROM users WHERE email = ?',
[email],
{ prepare: true }
);Redis Cluster
Setup
# Create cluster with 6 nodes (3 masters, 3 replicas)
redis-cli --cluster create \
127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
--cluster-replicas 1
Node.js with Redis Cluster
const Redis = require('ioredis');
const cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: 7000 },
{ host: '127.0.0.1', port: 7001 },
{ host: '127.0.0.1', port: 7002 }
]);
// Operations automatically routed to correct node
await cluster.set('user:123', JSON.stringify({ name: 'John' }));
const user = await cluster.get('user:123');
// Hash tags for multi-key operations
await cluster.set('{user:123}:profile', 'data1');
await cluster.set('{user:123}:settings', 'data2');
// Both keys on same node due to {user:123} tag
// Pipeline with hash tags
const pipeline = cluster.pipeline();
pipeline.get('{user:123}:profile');
pipeline.get('{user:123}:settings');
const results = await pipeline.exec();
Hash Slots
// Redis Cluster uses 16384 hash slots
// Each key mapped to slot: CRC16(key) % 16384
// Slots distributed across nodes
// Check key slot
// The ioredis Cluster client exposes no keySlot() helper, so ask the cluster
// itself: CLUSTER KEYSLOT returns the hash slot number for the given key.
const slot = await cluster.cluster('KEYSLOT', 'user:123');
console.log(`Key maps to slot: ${slot}`);
// Cluster info
const info = await cluster.cluster('INFO');
const nodes = await cluster.cluster('NODES');
DynamoDB Partitioning
Partition Key Design
// Good partition key (high cardinality)
const tableSchema = {
TableName: 'Users',
KeySchema: [
{ AttributeName: 'userId', KeyType: 'HASH' } // UUID
]
};
// Bad partition key (low cardinality)
const badSchema = {
TableName: 'Users',
KeySchema: [
{ AttributeName: 'country', KeyType: 'HASH' } // Only ~200 values
]
};
// Composite key for better distribution
const goodSchema = {
TableName: 'Orders',
KeySchema: [
{ AttributeName: 'customerId', KeyType: 'HASH' },
{ AttributeName: 'orderDate', KeyType: 'RANGE' }
]
};
Write Sharding
/**
 * Stores an order under a randomly suffixed partition key so that writes for
 * one customer are spread over several partitions (write sharding).
 *
 * @param {string} customerId - Customer the order belongs to.
 * @param {object} orderData - Remaining order attributes, stored as given.
 * @param {number} [shardCount=10] - Number of suffixes to spread over
 *   (suffix is 0..shardCount-1). Readers must fan out over the same count.
 * @returns {Promise<void>} Resolves once the put has been sent.
 * @throws {RangeError} If shardCount is not a positive integer.
 */
async function createOrder(customerId, orderData, shardCount = 10) {
  if (!Number.isInteger(shardCount) || shardCount < 1) {
    throw new RangeError('shardCount must be a positive integer');
  }

  // Add random suffix for write distribution
  const suffix = Math.floor(Math.random() * shardCount);
  const partitionKey = `${customerId}#${suffix}`;

  await docClient.send(new PutCommand({
    TableName: 'Orders',
    Item: {
      // Spread first: caller-supplied fields must never overwrite the keys,
      // otherwise the item lands in a partition the reader will not query.
      ...orderData,
      partitionKey,
      customerId,
    },
  }));
}
// Query all partitions
async function getCustomerOrders(customerId) {
const promises = [];
for (let i = 0; i < 10; i++) {
const partitionKey = `${customerId}#${i}`;
promises.push(
docClient.send(new QueryCommand({
TableName: 'Orders',
KeyConditionExpression: 'partitionKey = :pk',
ExpressionAttributeValues: {
':pk': partitionKey
}
}))
);
}
const results = await Promise.all(promises);
return results.flatMap(r => r.Items);
}Shard Key Selection
const shardKeyGuidelines = {
goodShardKeys: [
'High cardinality (many unique values)',
'Even distribution of data',
'Even distribution of queries',
'Frequently used in queries',
'Immutable or rarely changes'
],
badShardKeys: [
'Low cardinality (few unique values)',
'Monotonically increasing (timestamps)',
'Rarely queried',
'Frequently updated'
],
examples: {
good: ['userId (UUID)', 'email (hashed)', 'orderId'],
bad: ['status', 'createdAt', 'country']
}
};
Rebalancing
// MongoDB - Automatic balancing
sh.startBalancer();
// Manual chunk migration
sh.moveChunk("mydb.users",
{ userId: "user-1000" },
"shard02"
);
// Cassandra - Add node triggers rebalancing
// nodetool status
// nodetool cleanup
// Redis Cluster - Reshard slots
// redis-cli --cluster reshard 127.0.0.1:7000
Query Patterns
// Targeted query (uses shard key) - Fast
db.users.find({ userId: "123" });
// Scatter-gather query (no shard key) - Slow
db.users.find({ email: "john@example.com" });
// Compound shard key query
db.orders.find({
customerId: "123",
orderDate: { $gte: new Date("2024-01-01") }
});
.NET with Sharding
using MongoDB.Driver;
/// <summary>
/// Data-access helper that talks to a sharded MongoDB deployment through mongos.
/// </summary>
public class ShardingService
{
    private readonly IMongoClient _client;

    /// <summary>Creates the service with a client pointed at the mongos routers.</summary>
    public ShardingService()
    {
        // Connect to mongos
        _client = new MongoClient("mongodb://mongos1:27017,mongos2:27017");
    }

    // Resolves mydb.users on every access, mirroring the per-call lookup
    // the methods previously performed inline.
    private IMongoCollection<User> Users =>
        _client.GetDatabase("mydb").GetCollection<User>("users");

    /// <summary>Fetches the first user whose UserId equals <paramref name="userId"/>.</summary>
    public async Task<User> GetUser(string userId)
    {
        // Targeted query using shard key
        var matches = Users.Find(u => u.UserId == userId);
        return await matches.FirstOrDefaultAsync();
    }

    /// <summary>Fetches all users whose Status equals <paramref name="status"/>.</summary>
    public async Task<List<User>> GetUsersByStatus(string status)
    {
        // Scatter-gather query (no shard key)
        var matches = Users.Find(u => u.Status == status);
        return await matches.ToListAsync();
    }
}
Best Practices
const shardingBestPractices = [
'Choose shard key carefully (hard to change)',
'Use high-cardinality shard keys',
'Avoid monotonically increasing keys',
'Include shard key in queries when possible',
'Monitor shard distribution',
'Plan for growth',
'Use hashed sharding for even distribution',
'Test with production-like data volume'
];
Interview Tips
- Explain sharding: Horizontal partitioning across servers
- Show MongoDB: Shard keys, chunks, mongos
- Demonstrate Cassandra: Partition keys, token ranges
- Discuss Redis: Hash slots, cluster mode
- Mention shard key: Selection criteria
- Show examples: Node.js, .NET implementations
Summary
Sharding distributes data across multiple servers for horizontal scalability. MongoDB uses shard keys to partition data into chunks distributed across shards. Cassandra uses partition keys with consistent hashing. Redis Cluster uses hash slots (16384). DynamoDB automatically partitions by partition key. Choose high-cardinality shard keys for even distribution. Include shard key in queries for targeted operations. Monitor and rebalance as needed. Essential for scaling beyond single-server capacity.
Test Your Knowledge
Take a quick quiz to test your understanding of this topic.