Event Sourcing & CQRS

Event Sourcing

Definition: Store all changes to application state as a sequence of events, rather than storing current state.

Traditional Approach:

Users Table:
| id  | name | email           | balance |
| 123 | John | john@email.com  | 500     |

After an update, only balance = 500 remains; the history of how the balance got there is lost.

Event Sourcing Approach:

Events:
1. UserCreated { id: 123, name: "John", email: "john@email.com" }
2. DepositMade { userId: 123, amount: 1000 }
3. WithdrawalMade { userId: 123, amount: 300 }
4. WithdrawalMade { userId: 123, amount: 200 }

Current balance = 1000 - 300 - 200 = 500 (can see full history)
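To make the arithmetic above concrete, here is a minimal sketch that derives the current state by folding the four events in order. The `{ type, data }` event shape and the `applyEvent` function name are assumptions for illustration; the event names and values come from the example above.

```javascript
// Events from the example above, in a hypothetical { type, data } shape.
const events = [
  { type: 'UserCreated', data: { id: 123, name: 'John', email: 'john@email.com' } },
  { type: 'DepositMade', data: { userId: 123, amount: 1000 } },
  { type: 'WithdrawalMade', data: { userId: 123, amount: 300 } },
  { type: 'WithdrawalMade', data: { userId: 123, amount: 200 } }
];

// Fold one event into the running state; unknown event types leave it unchanged.
function applyEvent(state, event) {
  switch (event.type) {
    case 'UserCreated':
      return { ...event.data, balance: 0 };
    case 'DepositMade':
      return { ...state, balance: state.balance + event.data.amount };
    case 'WithdrawalMade':
      return { ...state, balance: state.balance - event.data.amount };
    default:
      return state;
  }
}

// Current state is never stored; it is computed from the full history.
const currentState = events.reduce(applyEvent, null);
console.log(currentState.balance); // 500
```

Because state is a pure function of the event list, replaying the same events always yields the same result.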

Benefits

Complete Audit Trail: Every change recorded with timestamp and reason

Time Travel: Reconstruct state at any point in time

Event Replay: Rebuild state by replaying events

Debugging: See exactly what happened and when

Business Intelligence: Analyze historical patterns

Undo/Redo: Reverse a change by appending a compensating event, or redo it by replaying
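As an illustration of the time travel benefit, the sketch below replays only the events recorded up to a chosen moment. The timestamps, the flat event shape, and the `balanceAt` helper are hypothetical; it also assumes events are already ordered by time.

```javascript
// Hypothetical timestamped events for one account, oldest first.
const history = [
  { type: 'DepositMade', amount: 1000, timestamp: '2024-01-01T10:00:00Z' },
  { type: 'WithdrawalMade', amount: 300, timestamp: '2024-01-05T09:30:00Z' },
  { type: 'WithdrawalMade', amount: 200, timestamp: '2024-01-09T16:45:00Z' }
];

// Rebuild the balance as it was at `pointInTime` by replaying earlier events only.
function balanceAt(events, pointInTime) {
  const cutoff = new Date(pointInTime).getTime();
  let balance = 0;
  for (const event of events) {
    // Events are ordered, so everything after the cutoff can be skipped.
    if (new Date(event.timestamp).getTime() > cutoff) break;
    if (event.type === 'DepositMade') balance += event.amount;
    if (event.type === 'WithdrawalMade') balance -= event.amount;
  }
  return balance;
}

console.log(balanceAt(history, '2024-01-06T00:00:00Z')); // 700
console.log(balanceAt(history, '2024-01-31T00:00:00Z')); // 500
```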

Challenges

Storage: Events accumulate over time

Performance: Replaying many events can be slow

Schema Evolution: Old events may have different structure

Complexity: More complex than CRUD

Eventual Consistency: Read models may lag behind events
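One common way to handle the schema evolution challenge is "upcasting": converting old events to the newest shape when they are read, so the rest of the code only deals with one version. The sketch below is an assumption-laden illustration; the `schemaVersion` field, the `currency` attribute, and its `'USD'` default are all hypothetical.

```javascript
// Upcasters keyed by "type:version"; each lifts an event exactly one version up.
const upcasters = {
  'DepositMade:1': event => ({
    ...event,
    schemaVersion: 2,
    // Assumed default for events written before the field existed.
    data: { ...event.data, currency: 'USD' }
  })
};

// Apply upcasters repeatedly until no newer version is registered.
function upcast(storedEvent) {
  // Events stored without a version are treated as version 1.
  let event = { ...storedEvent, schemaVersion: storedEvent.schemaVersion ?? 1 };
  let step = upcasters[`${event.type}:${event.schemaVersion}`];
  while (step) {
    event = step(event);
    step = upcasters[`${event.type}:${event.schemaVersion}`];
  }
  return event;
}

const oldEvent = { type: 'DepositMade', data: { userId: 123, amount: 1000 } };
const upgraded = upcast(oldEvent);
console.log(upgraded.schemaVersion, upgraded.data.currency); // 2 USD
```

The stored event is never modified; the conversion happens on a copy at read time, which keeps the event log immutable.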

CQRS (Command Query Responsibility Segregation)

Definition: Separate read and write operations into different models.

Traditional Approach:

Same model for reads and writes:
- User Service handles both queries and commands
- Same database schema for both

CQRS Approach:

Write Model (Commands):
- Handles: CreateUser, UpdateUser, DeleteUser
- Optimized for writes
- Enforces business rules

Read Model (Queries):
- Handles: GetUser, SearchUsers, GetUserOrders
- Optimized for reads
- Denormalized for fast queries
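The split above can be sketched without any event sourcing at all. In this minimal in-memory illustration, the command side enforces rules and owns the normalized records, while the query side reads a denormalized view shaped for searching. The names `userCommands`, `userQueries`, and `userSearchView`, and the validation rules, are hypothetical; the view is updated synchronously here only to keep the example short.

```javascript
// Write side: normalized records, business rules enforced here.
const usersById = new Map();
// Read side: denormalized view shaped for the search query.
const userSearchView = [];

const userCommands = {
  createUser({ id, name, email }) {
    if (!email.includes('@')) throw new Error('Invalid email');
    if (usersById.has(id)) throw new Error('User already exists');
    usersById.set(id, { id, name, email });
    // Keep the read view in sync; real systems often do this asynchronously.
    userSearchView.push({
      id,
      displayName: name,
      searchText: `${name} ${email}`.toLowerCase()
    });
  }
};

const userQueries = {
  // No validation or business logic: just a lookup against the prepared view.
  searchUsers(term) {
    const needle = term.toLowerCase();
    return userSearchView
      .filter(row => row.searchText.includes(needle))
      .map(row => ({ id: row.id, displayName: row.displayName }));
  }
};

userCommands.createUser({ id: 123, name: 'John', email: 'john@email.com' });
console.log(userQueries.searchUsers('john')); // one match: John
```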

Benefits

Independent Scaling: Scale reads and writes separately

Optimized Models: Each optimized for its purpose

Simpler Queries: Read model can be denormalized

Better Performance: No complex joins in read model

Flexibility: Different databases for reads and writes

When to Use

Use CQRS When:

  • Complex domain logic
  • Different read/write patterns
  • High read-to-write ratio
  • Need for different data representations

Don’t Use CQRS When:

  • Simple CRUD applications
  • Small applications
  • Team lacks experience with pattern

Event Sourcing + CQRS Together

Common Pattern: Use event sourcing for the write model, and build the CQRS read models as projections of the stored events

Flow:

  1. Command creates event
  2. Event stored in event store
  3. Event published to event bus
  4. Read model projections update based on events

Command → Event Store → Event Bus → Read Model Projections

Write model: Command → Event Store
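The four steps of the flow can be traced end to end with in-memory stand-ins. This is only a sketch: the array used as an event store, the subscriber list used as an event bus, and the `handleDepositCommand` function are all hypothetical simplifications.

```javascript
const eventStore = [];       // append-only log (in-memory stand-in)
const subscribers = [];      // event bus: list of handler functions
const balances = new Map();  // read model: accountId -> balance

// Event bus: deliver each event to every subscriber.
function publish(event) {
  subscribers.forEach(handler => handler(event));
}

// Step 4: a projection keeps the read model up to date.
subscribers.push(event => {
  if (event.type === 'MoneyDeposited') {
    const current = balances.get(event.accountId) ?? 0;
    balances.set(event.accountId, current + event.amount);
  }
});

function handleDepositCommand({ accountId, amount }) {
  if (amount <= 0) throw new Error('Amount must be positive');
  const event = { type: 'MoneyDeposited', accountId, amount }; // step 1
  eventStore.push(event);                                      // step 2
  publish(event);                                              // step 3
}

handleDepositCommand({ accountId: 'acc-1', amount: 50 });
handleDepositCommand({ accountId: 'acc-1', amount: 25 });
console.log(balances.get('acc-1')); // 75
```

Queries read `balances` directly and never touch the event log, which is what lets the two sides be stored and scaled separately.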

Implementation Example

Event Store

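class EventStore {
  constructor(db, eventBus) {
    this.db = db;
    this.eventBus = eventBus; // assumed: any object exposing publish(event)
  }
  
  // Append event
  async appendEvent(aggregateId, event) {
    // Stored under the same `type`/`data` keys that BankAccount.apply() reads,
    // so events loaded from the store can be replayed as-is.
    const eventRecord = {
      aggregateId,
      type: event.type,
      data: event.data,
      timestamp: new Date(),
      // Read-then-write is not atomic; a unique index on
      // { aggregateId, version } is needed to reject concurrent duplicates.
      version: await this.getNextVersion(aggregateId)
    };
    
    await this.db.collection('events').insertOne(eventRecord);
    
    // Publish the stored record so subscribers also see version and timestamp
    await this.publishEvent(eventRecord);
  }
  
  // Hand the event to the event bus
  async publishEvent(event) {
    await this.eventBus.publish(event);
  }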
  
  // Get all events for aggregate
  async getEvents(aggregateId) {
    return await this.db.collection('events')
      .find({ aggregateId })
      .sort({ version: 1 })
      .toArray();
  }
  
  // Get events since version
  async getEventsSince(aggregateId, version) {
    return await this.db.collection('events')
      .find({ 
        aggregateId,
        version: { $gt: version }
      })
      .sort({ version: 1 })
      .toArray();
  }
  
  async getNextVersion(aggregateId) {
    const lastEvent = await this.db.collection('events')
      .findOne({ aggregateId }, { sort: { version: -1 } });
    
    return lastEvent ? lastEvent.version + 1 : 1;
  }
}
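Note that `getNextVersion` above reads the latest version and then writes, so two concurrent writers could compute the same version number. A common remedy is optimistic concurrency: the caller states which version it loaded, and the append is rejected if the stream has moved on. The sketch below shows the idea with a hypothetical in-memory store (`InMemoryEventStore` and its `append` signature are not part of the code above); with a database, the same guarantee would typically come from a uniqueness constraint on aggregate ID plus version.

```javascript
class InMemoryEventStore {
  constructor() {
    this.streams = new Map(); // aggregateId -> array of stored events
  }

  // Append only if the stream is still at the version the caller loaded.
  append(aggregateId, expectedVersion, events) {
    const stream = this.streams.get(aggregateId) ?? [];
    if (stream.length !== expectedVersion) {
      throw new Error(
        `Concurrency conflict: expected version ${expectedVersion}, found ${stream.length}`
      );
    }
    // Versions are assigned consecutively, starting after the expected one.
    const stored = events.map((event, i) => ({
      ...event,
      aggregateId,
      version: expectedVersion + i + 1
    }));
    this.streams.set(aggregateId, stream.concat(stored));
    return stored;
  }
}

const store = new InMemoryEventStore();
store.append('acc-1', 0, [{ type: 'MoneyDeposited', data: { amount: 100 } }]);

try {
  // A second writer that also loaded version 0 loses the race.
  store.append('acc-1', 0, [{ type: 'MoneyWithdrawn', data: { amount: 80 } }]);
} catch (err) {
  console.log(err.message); // Concurrency conflict: expected version 0, found 1
}
```

On a conflict, the usual response is to reload the aggregate, re-run the command against the fresh state, and try again.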

Aggregate (Write Model)

class BankAccount {
  constructor(accountId) {
    this.accountId = accountId;
    this.balance = 0;
    this.version = 0;
    this.changes = [];
  }
  
  // Commands
  deposit(amount) {
    if (amount <= 0) {
      throw new Error('Amount must be positive');
    }
    
    this.applyChange({
      type: 'MoneyDeposited',
      data: { accountId: this.accountId, amount }
    });
  }
  
  withdraw(amount) {
    if (amount <= 0) {
      throw new Error('Amount must be positive');
    }
    
    if (this.balance < amount) {
      throw new Error('Insufficient funds');
    }
    
    this.applyChange({
      type: 'MoneyWithdrawn',
      data: { accountId: this.accountId, amount }
    });
  }
  
  // Apply event to state
  applyChange(event) {
    this.apply(event);
    this.changes.push(event);
  }
  
  apply(event) {
    switch (event.type) {
      case 'AccountCreated':
        this.balance = 0;
        break;
      case 'MoneyDeposited':
        this.balance += event.data.amount;
        break;
      case 'MoneyWithdrawn':
        this.balance -= event.data.amount;
        break;
    }
    this.version++;
  }
  
  // Load from events
  loadFromHistory(events) {
    events.forEach(event => this.apply(event));
  }
  
  // Get uncommitted changes
  getUncommittedChanges() {
    return this.changes;
  }
  
  // Mark changes as committed
  markChangesAsCommitted() {
    this.changes = [];
  }
}

Repository

class BankAccountRepository {
  constructor(eventStore) {
    this.eventStore = eventStore;
  }
  
  // Load aggregate from events
  async getById(accountId) {
    const events = await this.eventStore.getEvents(accountId);
    
    if (events.length === 0) {
      return null;
    }
    
    const account = new BankAccount(accountId);
    account.loadFromHistory(events);
    
    return account;
  }
  
  // Save aggregate changes
  async save(account) {
    const changes = account.getUncommittedChanges();
    
    for (const event of changes) {
      await this.eventStore.appendEvent(account.accountId, event);
    }
    
    account.markChangesAsCommitted();
  }
}

Command Handler

class BankAccountCommandHandler {
  constructor(repository) {
    this.repository = repository;
  }
  
  async handleDeposit(command) {
    const { accountId, amount } = command;
    
    // Load aggregate
    let account = await this.repository.getById(accountId);
    
    if (!account) {
      account = new BankAccount(accountId);
      account.applyChange({
        type: 'AccountCreated',
        data: { accountId }
      });
    }
    
    // Execute command
    account.deposit(amount);
    
    // Save events
    await this.repository.save(account);
  }
  
  async handleWithdraw(command) {
    const { accountId, amount } = command;
    
    const account = await this.repository.getById(accountId);
    
    if (!account) {
      throw new Error('Account not found');
    }
    
    account.withdraw(amount);
    
    await this.repository.save(account);
  }
}

Read Model Projection

class AccountBalanceProjection {
  constructor(db) {
    this.db = db;
  }
  
  // Handle events and update read model
  async handleEvent(event) {
    switch (event.type) {
      case 'AccountCreated':
        await this.db.collection('account_balances').insertOne({
          accountId: event.data.accountId,
          balance: 0,
          lastUpdated: new Date()
        });
        break;
        
      case 'MoneyDeposited':
        await this.db.collection('account_balances').updateOne(
          { accountId: event.data.accountId },
          { 
            $inc: { balance: event.data.amount },
            $set: { lastUpdated: new Date() }
          }
        );
        break;
        
      case 'MoneyWithdrawn':
        await this.db.collection('account_balances').updateOne(
          { accountId: event.data.accountId },
          { 
            $inc: { balance: -event.data.amount },
            $set: { lastUpdated: new Date() }
          }
        );
        break;
    }
  }
  
  // Query read model
  async getBalance(accountId) {
    return await this.db.collection('account_balances')
      .findOne({ accountId });
  }
}

Query Handler

class AccountQueryHandler {
  constructor(db) {
    this.db = db;
  }
  
  // Fast queries against read model
  async getAccountBalance(accountId) {
    return await this.db.collection('account_balances')
      .findOne({ accountId });
  }
  
  async getAccountsWithLowBalance(threshold) {
    return await this.db.collection('account_balances')
      .find({ balance: { $lt: threshold } })
      .toArray();
  }
  
  // Assumes a separate projection maintains 'account_history'; none is shown here
  async getAccountHistory(accountId) {
    return await this.db.collection('account_history')
      .find({ accountId })
      .sort({ timestamp: -1 })
      .toArray();
  }
}

Snapshots

Problem: Replaying thousands of events is slow

Solution: Periodically save snapshots of aggregate state

class SnapshotStore {
  constructor(db) {
    this.db = db;
  }
  
  async saveSnapshot(aggregateId, state, version) {
    await this.db.collection('snapshots').updateOne(
      { aggregateId },
      { 
        $set: {
          aggregateId,
          state,
          version,
          timestamp: new Date()
        }
      },
      { upsert: true }
    );
  }
  
  async getSnapshot(aggregateId) {
    return await this.db.collection('snapshots')
      .findOne({ aggregateId });
  }
}

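// Repository that loads with a snapshot
class SnapshotBankAccountRepository {
  constructor(eventStore, snapshotStore) {
    this.eventStore = eventStore;
    this.snapshotStore = snapshotStore;
  }
  
  async getById(accountId) {
    // Try to load snapshot
    const snapshot = await this.snapshotStore.getSnapshot(accountId);
    
    const account = new BankAccount(accountId);
    
    if (snapshot) {
      // Load from snapshot (assumes state was saved as { balance })
      account.balance = snapshot.state.balance;
      account.version = snapshot.version;
      
      // Load events since snapshot
      const events = await this.eventStore.getEventsSince(
        accountId,
        snapshot.version
      );
      account.loadFromHistory(events);
    } else {
      // Load all events
      const events = await this.eventStore.getEvents(accountId);
      if (events.length === 0) {
        return null; // same "not found" result as the plain repository
      }
      account.loadFromHistory(events);
    }
    
    return account;
  }
}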
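The text says snapshots are saved "periodically" but leaves the policy open. One simple option, sketched below, is to take a snapshot whenever a fixed number of events has accumulated since the last one. The interval of 100 and the helper names `shouldSnapshot` and `toSnapshot` are hypothetical; the right threshold depends on how expensive replay is in practice.

```javascript
const SNAPSHOT_INTERVAL = 100; // hypothetical threshold

// True once enough events have accumulated since the last snapshot.
function shouldSnapshot(currentVersion, lastSnapshotVersion, interval = SNAPSHOT_INTERVAL) {
  return currentVersion - lastSnapshotVersion >= interval;
}

// Capture only what is needed to restore the aggregate later.
function toSnapshot(account) {
  return {
    aggregateId: account.accountId,
    state: { balance: account.balance },
    version: account.version
  };
}

// Plain object standing in for a loaded BankAccount aggregate.
const account = { accountId: 'acc-1', balance: 500, version: 230 };
const lastSnapshotVersion = 100;

if (shouldSnapshot(account.version, lastSnapshotVersion)) {
  const snapshot = toSnapshot(account);
  console.log(snapshot.version); // 230
}
```

A snapshot is only a cache: it can be deleted and rebuilt from the events at any time, so the event log remains the source of truth.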

.NET Implementation

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Event
public abstract class Event
{
    public Guid AggregateId { get; set; }
    public DateTime Timestamp { get; set; }
    public int Version { get; set; }
}

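public class MoneyDepositedEvent : Event
{
    public decimal Amount { get; set; }
}

// Referenced by BankAccount.Apply below
public class MoneyWithdrawnEvent : Event
{
    public decimal Amount { get; set; }
}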

// Aggregate
public class BankAccount
{
    public Guid Id { get; private set; }
    public decimal Balance { get; private set; }
    private readonly List<Event> _changes = new();
    
    public void Deposit(decimal amount)
    {
        if (amount <= 0)
            throw new ArgumentException("Amount must be positive");
        
        ApplyChange(new MoneyDepositedEvent
        {
            AggregateId = Id,
            Amount = amount,
            Timestamp = DateTime.UtcNow
        });
    }
    
    private void ApplyChange(Event @event)
    {
        Apply(@event);
        _changes.Add(@event);
    }
    
    private void Apply(Event @event)
    {
        // Take the identity from the event so a rehydrated account knows its Id
        Id = @event.AggregateId;
        
        switch (@event)
        {
            case MoneyDepositedEvent e:
                Balance += e.Amount;
                break;
            case MoneyWithdrawnEvent e:
                Balance -= e.Amount;
                break;
        }
    }
    
    public void LoadFromHistory(IEnumerable<Event> events)
    {
        foreach (var @event in events)
        {
            Apply(@event);
        }
    }
    
    public IEnumerable<Event> GetUncommittedChanges()
    {
        return _changes;
    }
}

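// Event store abstraction (signatures inferred from the calls below)
public interface IEventStore
{
    Task<IEnumerable<Event>> GetEventsAsync(Guid aggregateId);
    Task AppendEventAsync(Event @event);
}

// Repository
public class BankAccountRepository
{
    private readonly IEventStore _eventStore;
    
    public BankAccountRepository(IEventStore eventStore)
    {
        _eventStore = eventStore;
    }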
    
    public async Task<BankAccount> GetByIdAsync(Guid accountId)
    {
        var events = await _eventStore.GetEventsAsync(accountId);
        
        var account = new BankAccount();
        account.LoadFromHistory(events);
        
        return account;
    }
    
    public async Task SaveAsync(BankAccount account)
    {
        var changes = account.GetUncommittedChanges();
        
        foreach (var @event in changes)
        {
            await _eventStore.AppendEventAsync(@event);
        }
    }
}

Best Practices

  • Use snapshots - Avoid replaying too many events
  • Version events - Handle schema evolution
  • Idempotent projections - Handle duplicate events
  • Monitor lag - Ensure read models stay current
  • Test event replay - Verify projections work correctly
  • Document events - Clear event schemas
  • Handle failures - Retry failed projections
  • Use correlation IDs - Track event causality
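The "idempotent projections" practice can be sketched as follows: the projection remembers the last event version it applied per account and ignores anything it has already seen, so a redelivered event cannot change the balance twice. This is a minimal in-memory illustration; the class name is hypothetical, and it assumes events carry `aggregateId` and a per-aggregate `version` and arrive in order.

```javascript
class IdempotentBalanceProjection {
  constructor() {
    this.rows = new Map(); // accountId -> { balance, lastVersion }
  }

  // Returns true if the event was applied, false if it was a duplicate.
  handleEvent(event) {
    const row = this.rows.get(event.aggregateId) ?? { balance: 0, lastVersion: 0 };
    if (event.version <= row.lastVersion) {
      return false; // already applied: skip without touching the balance
    }
    let balance = row.balance;
    if (event.type === 'MoneyDeposited') balance += event.data.amount;
    if (event.type === 'MoneyWithdrawn') balance -= event.data.amount;
    // Balance and version are stored together so they cannot drift apart.
    this.rows.set(event.aggregateId, { balance, lastVersion: event.version });
    return true;
  }

  getBalance(accountId) {
    return this.rows.get(accountId)?.balance ?? 0;
  }
}

const projection = new IdempotentBalanceProjection();
const deposit = { aggregateId: 'acc-1', version: 1, type: 'MoneyDeposited', data: { amount: 100 } };
projection.handleEvent(deposit);
projection.handleEvent(deposit); // redelivery is ignored
console.log(projection.getBalance('acc-1')); // 100
```

By contrast, a projection that blindly increments the balance on every delivery would double-count whenever the event bus retries.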

Interview Tips

  • Explain event sourcing: Store changes as events
  • Show CQRS: Separate read and write models
  • Demonstrate benefits: Audit trail, time travel
  • Discuss challenges: Storage, complexity
  • Mention snapshots: Performance optimization
  • Show implementation: Event store, projections

Summary

Event sourcing stores every change as an event instead of storing only the current state, which provides a complete audit trail, time travel, and event replay. CQRS separates the read and write models so each can be optimized and scaled independently: the write model handles commands and business logic, while the read model serves queries from denormalized data. Events flow from the write model to the read model projections. Use snapshots to avoid replaying long event streams, and make projections idempotent. The combination is best suited to complex domains with audit requirements. It adds complexity, but it pays off in systems that need full history and flexible querying.
