Distributed Transactions

Challenges

Distributed transactions span multiple databases or services, making ACID guarantees difficult to achieve.

Service A ──┐
            ├──> Transaction
Service B ──┤

Service C ──┘

Two-Phase Commit (2PC)

Phase 1: Prepare

class TwoPhaseCommit {
  async executeTransaction(operations) {
    const txId = generateTransactionId();
    const participants = [];
    
    // Phase 1: Prepare
    for (const op of operations) {
      const prepared = await this.prepare(op.service, op.data, txId);
      if (!prepared) {
        await this.abort(txId, participants);
        throw new Error('Transaction aborted');
      }
      participants.push(op.service);
    }
    
    // Phase 2: Commit
    for (const service of participants) {
      await this.commit(service, txId);
    }
    
    return txId;
  }
  
  async prepare(service, data, txId) {
    // Lock resources and prepare
    const result = await service.prepare({
      transactionId: txId,
      data,
      state: 'prepared'
    });
    
    return result.canCommit;
  }
  
  async commit(service, txId) {
    await service.commit(txId);
  }
  
  async abort(txId, participants) {
    for (const service of participants) {
      await service.rollback(txId);
    }
  }
}

MongoDB 2PC Implementation

// Manual 2PC with MongoDB
class MongoTwoPhaseCommit {
  async transfer(fromAccount, toAccount, amount) {
    const txId = new ObjectId();
    
    try {
      // Phase 1: Prepare
      const prepared = await Promise.all([
        this.prepareDebit(fromAccount, amount, txId),
        this.prepareCredit(toAccount, amount, txId)
      ]);
      
      if (!prepared.every(p => p)) {
        throw new Error('Prepare failed');
      }
      
      // Phase 2: Commit
      await Promise.all([
        this.commitDebit(fromAccount, txId),
        this.commitCredit(toAccount, txId)
      ]);
      
      return txId;
    } catch (error) {
      await this.rollback(txId);
      throw error;
    }
  }
  
  async prepareDebit(accountId, amount, txId) {
    const result = await db.collection('accounts').updateOne(
      {
        _id: accountId,
        balance: { $gte: amount },
        'pendingTransactions.txId': { $ne: txId }
      },
      {
        $push: {
          pendingTransactions: {
            txId,
            amount: -amount,
            state: 'prepared'
          }
        }
      }
    );
    
    return result.modifiedCount === 1;
  }
  
  async commitDebit(accountId, txId) {
    const account = await db.collection('accounts').findOne({ _id: accountId });
    const tx = account.pendingTransactions.find(t => t.txId.equals(txId));
    
    await db.collection('accounts').updateOne(
      { _id: accountId },
      {
        $inc: { balance: tx.amount },
        $pull: { pendingTransactions: { txId } }
      }
    );
  }
  
  async rollback(txId) {
    await db.collection('accounts').updateMany(
      { 'pendingTransactions.txId': txId },
      { $pull: { pendingTransactions: { txId } } }
    );
  }
}

Saga Pattern

Choreography-Based Saga

// Each service publishes events
class OrderService {
  async createOrder(orderData) {
    const order = await db.orders.insertOne({
      ...orderData,
      status: 'pending'
    });
    
    // Publish event
    await eventBus.publish('OrderCreated', {
      orderId: order.insertedId,
      customerId: orderData.customerId,
      items: orderData.items
    });
    
    return order;
  }
  
  async handlePaymentFailed(event) {
    // Compensating transaction
    await db.orders.updateOne(
      { _id: event.orderId },
      { $set: { status: 'cancelled' } }
    );
  }
}

class PaymentService {
  async handleOrderCreated(event) {
    try {
      await this.processPayment(event.customerId, event.total);
      
      await eventBus.publish('PaymentSucceeded', {
        orderId: event.orderId
      });
    } catch (error) {
      await eventBus.publish('PaymentFailed', {
        orderId: event.orderId,
        reason: error.message
      });
    }
  }
}

class InventoryService {
  async handlePaymentSucceeded(event) {
    try {
      await this.reserveInventory(event.orderId);
      
      await eventBus.publish('InventoryReserved', {
        orderId: event.orderId
      });
    } catch (error) {
      await eventBus.publish('InventoryFailed', {
        orderId: event.orderId
      });
      
      // Trigger compensation
      await eventBus.publish('RefundRequired', {
        orderId: event.orderId
      });
    }
  }
}

Orchestration-Based Saga

// Central orchestrator
class OrderSagaOrchestrator {
  async executeOrderSaga(orderData) {
    const sagaId = generateId();
    const state = {
      sagaId,
      step: 0,
      compensations: []
    };
    
    try {
      // Step 1: Create order
      const order = await this.createOrder(orderData);
      state.compensations.push(() => this.cancelOrder(order.id));
      state.step = 1;
      
      // Step 2: Reserve inventory
      await this.reserveInventory(order.items);
      state.compensations.push(() => this.releaseInventory(order.items));
      state.step = 2;
      
      // Step 3: Process payment
      await this.processPayment(order.customerId, order.total);
      state.compensations.push(() => this.refundPayment(order.id));
      state.step = 3;
      
      // Step 4: Confirm order
      await this.confirmOrder(order.id);
      
      return order;
    } catch (error) {
      await this.compensate(state);
      throw error;
    }
  }
  
  async compensate(state) {
    console.log(`Compensating from step ${state.step}`);
    
    // Execute compensations in reverse order
    for (const compensation of state.compensations.reverse()) {
      try {
        await compensation();
      } catch (error) {
        console.error('Compensation failed:', error);
      }
    }
  }
}

Outbox Pattern

// Ensure message delivery with database transaction
class OutboxService {
  async createOrderWithOutbox(orderData) {
    const session = client.startSession();
    
    try {
      await session.withTransaction(async () => {
        // Insert order
        const order = await db.collection('orders').insertOne(
          orderData,
          { session }
        );
        
        // Insert outbox message
        await db.collection('outbox').insertOne({
          aggregateId: order.insertedId,
          eventType: 'OrderCreated',
          payload: orderData,
          createdAt: new Date(),
          processed: false
        }, { session });
      });
    } finally {
      await session.endSession();
    }
  }
  
  async processOutbox() {
    const messages = await db.collection('outbox').find({
      processed: false
    }).limit(100).toArray();
    
    for (const message of messages) {
      try {
        // Publish to event bus
        await eventBus.publish(message.eventType, message.payload);
        
        // Mark as processed
        await db.collection('outbox').updateOne(
          { _id: message._id },
          { $set: { processed: true, processedAt: new Date() } }
        );
      } catch (error) {
        console.error('Failed to process outbox message:', error);
      }
    }
  }
}

// Run periodically
setInterval(() => outboxService.processOutbox(), 5000);

Event Sourcing

// Store all changes as events
class EventStore {
  async appendEvent(aggregateId, event) {
    await db.collection('events').insertOne({
      aggregateId,
      eventType: event.type,
      data: event.data,
      timestamp: new Date(),
      version: event.version
    });
  }
  
  async getEvents(aggregateId) {
    return await db.collection('events')
      .find({ aggregateId })
      .sort({ version: 1 })
      .toArray();
  }
  
  async rebuildAggregate(aggregateId) {
    const events = await this.getEvents(aggregateId);
    
    let state = {};
    for (const event of events) {
      state = this.applyEvent(state, event);
    }
    
    return state;
  }
  
  applyEvent(state, event) {
    switch (event.eventType) {
      case 'OrderCreated':
        return { ...state, ...event.data, status: 'pending' };
      case 'PaymentProcessed':
        return { ...state, status: 'paid' };
      case 'OrderShipped':
        return { ...state, status: 'shipped' };
      default:
        return state;
    }
  }
}

Idempotency

// Ensure operations can be retried safely
class IdempotentService {
  async processPayment(paymentId, amount) {
    // Check if already processed
    const existing = await db.collection('payments').findOne({
      paymentId
    });
    
    if (existing) {
      return existing;  // Already processed
    }
    
    // Process payment
    const result = await paymentGateway.charge(amount);
    
    // Store result with idempotency key
    await db.collection('payments').insertOne({
      paymentId,
      amount,
      result,
      processedAt: new Date()
    });
    
    return result;
  }
}

Distributed Locks

// Redis-based distributed lock
class DistributedLock {
  async acquireLock(resource, ttl = 10000) {
    const lockKey = `lock:${resource}`;
    const lockValue = generateId();
    
    const acquired = await redis.set(
      lockKey,
      lockValue,
      'PX',
      ttl,
      'NX'
    );
    
    if (acquired) {
      return { lockValue, release: () => this.releaseLock(lockKey, lockValue) };
    }
    
    return null;
  }
  
  async releaseLock(lockKey, lockValue) {
    const script = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `;
    
    await redis.eval(script, 1, lockKey, lockValue);
  }
}

// Usage
const lock = await distributedLock.acquireLock('account:123');
if (lock) {
  try {
    // Critical section
    await updateAccount('account:123');
  } finally {
    await lock.release();
  }
}

.NET Distributed Transactions

using MongoDB.Driver;

public class SagaOrchestrator
{
    public async Task<Order> ExecuteOrderSaga(OrderData orderData)
    {
        var compensations = new Stack<Func<Task>>();
        
        try
        {
            // Step 1: Create order
            var order = await CreateOrder(orderData);
            compensations.Push(() => CancelOrder(order.Id));
            
            // Step 2: Reserve inventory
            await ReserveInventory(order.Items);
            compensations.Push(() => ReleaseInventory(order.Items));
            
            // Step 3: Process payment
            await ProcessPayment(order.CustomerId, order.Total);
            compensations.Push(() => RefundPayment(order.Id));
            
            // Step 4: Confirm
            await ConfirmOrder(order.Id);
            
            return order;
        }
        catch
        {
            // Execute compensations
            while (compensations.Count > 0)
            {
                var compensation = compensations.Pop();
                await compensation();
            }
            throw;
        }
    }
}

Best Practices

const distributedTxBestPractices = [
  'Use saga pattern for long-running transactions',
  'Implement idempotency for all operations',
  'Use outbox pattern for reliable messaging',
  'Design compensating transactions',
  'Monitor saga execution',
  'Handle partial failures gracefully',
  'Use distributed locks sparingly',
  'Implement retry logic with backoff'
];

Interview Tips

  • Explain challenges: Distributed ACID is hard
  • Show 2PC: Two-phase commit protocol
  • Demonstrate saga: Choreography vs orchestration
  • Discuss patterns: Outbox, event sourcing
  • Mention idempotency: Safe retries
  • Show examples: MongoDB, distributed locks

Summary

Distributed transactions span multiple services/databases. Two-phase commit (2PC) provides ACID but has availability issues. Saga pattern uses compensating transactions for eventual consistency. Choreography-based sagas use events; orchestration uses central coordinator. Outbox pattern ensures reliable message delivery. Event sourcing stores all changes as events. Implement idempotency for safe retries. Use distributed locks carefully. Essential for microservices and distributed systems.

Test Your Knowledge

Take a quick quiz to test your understanding of this topic.

Test Your Nosql Knowledge

Ready to put your skills to the test? Take our interactive Nosql quiz and get instant feedback on your answers.