Change Streams

What are Change Streams?

Change Streams allow applications to access real-time data changes in MongoDB. They provide a way to listen to database changes and react accordingly.

MongoDB Change Streams

Basic Change Stream

const { MongoClient } = require('mongodb');

// Change streams require a replica set or sharded cluster (not a standalone mongod).
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();

const db = client.db('myapp');
const collection = db.collection('users');

// Watch for changes (all operation types by default)
const changeStream = collection.watch();

changeStream.on('change', (change) => {
  console.log('Change detected:', change);
  
  switch (change.operationType) {
    case 'insert':
      // Inserts always carry the full new document.
      console.log('New document:', change.fullDocument);
      break;
    case 'update':
      // updateDescription contains updatedFields and removedFields.
      console.log('Updated fields:', change.updateDescription);
      break;
    case 'delete':
      // Only the _id (documentKey) is available for deletes.
      console.log('Deleted document ID:', change.documentKey);
      break;
    case 'replace':
      console.log('Replaced document:', change.fullDocument);
      break;
  }
});

// Without an 'error' listener, a stream error is an unhandled 'error'
// event and crashes the process — always attach one.
changeStream.on('error', (err) => {
  console.error('Change stream error:', err);
  // In production: resume with a saved resume token.
});

// Insert triggers change event
await collection.insertOne({ name: 'John Doe', email: 'john@example.com' });

Filtered Change Streams

// Watch only insert operations by filtering with an aggregation pipeline.
const insertStream = collection.watch([
  { $match: { operationType: 'insert' } }
]);

insertStream.on('change', (change) => {
  console.log('New document inserted:', change.fullDocument);
});

// React only when the "status" field was modified.
const statusStream = collection.watch([
  {
    $match: {
      'updateDescription.updatedFields.status': { $exists: true }
    }
  }
]);

// Follow changes to one user's documents; 'updateLookup' makes the
// server attach the current full document to update events.
const userStream = collection.watch(
  [
    {
      $match: {
        'fullDocument.userId': 'user-123'
      }
    }
  ],
  { fullDocument: 'updateLookup' }
);

Resume Token

const { Timestamp } = require('mongodb');

// Save resume token for crash recovery. In production, persist the token
// durably (file, collection, etc.) so a restarted process can catch up.
let resumeToken;

const changeStream = collection.watch();

changeStream.on('change', (change) => {
  // change._id IS the resume token; save it only after the change has
  // been fully processed so recovery never skips an event.
  resumeToken = change._id;
  console.log('Processing change:', change);
});

// Resume from last position after restart. Only pass resumeAfter when a
// token was actually saved — an undefined token makes the driver throw.
const resumedStream = collection.watch(
  [],
  resumeToken ? { resumeAfter: resumeToken } : {}
);

// Start after a specific cluster time. startAtOperationTime takes a BSON
// Timestamp (seconds since epoch + increment), NOT a JavaScript Date.
const startTime = new Date('2024-01-01');
const timeStream = collection.watch([], {
  startAtOperationTime: new Timestamp({
    t: Math.floor(startTime.getTime() / 1000),
    i: 0
  })
});

Real-Time Notifications

// Notify users of new messages
class NotificationService {
  /**
   * @param {object} db - connected MongoDB database handle (Db)
   */
  constructor(db) {
    this.messagesCollection = db.collection('messages');
    this.setupChangeStream();
  }
  
  // Watch for newly inserted, unread messages and fan out a notification.
  setupChangeStream() {
    const pipeline = [
      {
        $match: {
          operationType: 'insert',
          'fullDocument.read': false
        }
      }
    ];
    
    const changeStream = this.messagesCollection.watch(pipeline);
    
    changeStream.on('change', async (change) => {
      const message = change.fullDocument;
      try {
        await this.sendNotification(message.userId, {
          title: 'New Message',
          body: message.content,
          messageId: message._id
        });
      } catch (err) {
        // A rejected async listener becomes an unhandled rejection and can
        // kill the process — one failed notification must not stop the stream.
        console.error('Failed to notify', message.userId, err);
      }
    });
    
    // Without this, a stream error is an unhandled 'error' event.
    changeStream.on('error', (err) => {
      console.error('Change stream error:', err);
    });
  }
  
  /**
   * Deliver a notification to a user (push, email, etc.).
   * @param {*} userId - recipient identifier from the message document
   * @param {object} notification - { title, body, messageId }
   */
  async sendNotification(userId, notification) {
    // Send push notification, email, etc.
    console.log(`Notification to ${userId}:`, notification);
  }
}

Cache Invalidation

// Invalidate cache on data changes
const redis = require('redis');
const redisClient = redis.createClient();
await redisClient.connect();

const changeStream = collection.watch();

changeStream.on('change', async (change) => {
  // Only updates and deletes can make a cached copy stale.
  if (change.operationType !== 'update' && change.operationType !== 'delete') {
    return;
  }
  const userId = change.documentKey._id;
  try {
    await redisClient.del(`user:${userId}`);
    console.log(`Cache invalidated for user: ${userId}`);
  } catch (err) {
    // A rejected async listener is an unhandled rejection; a transient
    // Redis error must not crash the stream consumer.
    console.error(`Cache invalidation failed for user ${userId}:`, err);
  }
});

Data Synchronization

// Sync data to another database
class DataSyncService {
  /**
   * @param {object} sourceDb - source MongoDB Db handle
   * @param {object} targetDb - target MongoDB Db handle
   */
  constructor(sourceDb, targetDb) {
    this.sourceCollection = sourceDb.collection('users');
    this.targetCollection = targetDb.collection('users_replica');
    this.setupSync();
  }
  
  setupSync() {
    const changeStream = this.sourceCollection.watch();
    
    changeStream.on('change', async (change) => {
      try {
        await this.applyChange(change);
      } catch (err) {
        // Keep consuming; a rejected async listener would otherwise be
        // an unhandled rejection.
        console.error('Sync failed for change:', change._id, err);
      }
    });
    
    changeStream.on('error', (err) => {
      console.error('Change stream error:', err);
    });
  }
  
  // Apply one change event to the replica. Writes are idempotent so a
  // resumed stream can safely re-deliver events.
  async applyChange(change) {
    const filter = { _id: change.documentKey._id };
    
    switch (change.operationType) {
      case 'insert':
      case 'replace':
        // Upsert instead of insertOne so a re-delivered insert does not
        // fail with a duplicate-key error.
        await this.targetCollection.replaceOne(filter, change.fullDocument, {
          upsert: true
        });
        break;
      case 'update': {
        // updateDescription carries BOTH updatedFields and removedFields;
        // ignoring removedFields would leave deleted fields on the replica.
        const { updatedFields = {}, removedFields = [] } = change.updateDescription;
        const updateDoc = {};
        if (Object.keys(updatedFields).length > 0) {
          updateDoc.$set = updatedFields;
        }
        if (removedFields.length > 0) {
          updateDoc.$unset = Object.fromEntries(removedFields.map((f) => [f, '']));
        }
        if (Object.keys(updateDoc).length > 0) {
          await this.targetCollection.updateOne(filter, updateDoc);
        }
        break;
      }
      case 'delete':
        await this.targetCollection.deleteOne(filter);
        break;
    }
  }
}

DynamoDB Streams

Enable Streams

const { DynamoDBClient, UpdateTableCommand } = require('@aws-sdk/client-dynamodb');

const client = new DynamoDBClient({ region: 'us-east-1' });

// Turn on the table's stream. StreamViewType controls what each record
// carries: NEW_AND_OLD_IMAGES, NEW_IMAGE, OLD_IMAGE, or KEYS_ONLY.
await client.send(
  new UpdateTableCommand({
    TableName: 'Users',
    StreamSpecification: {
      StreamEnabled: true,
      StreamViewType: 'NEW_AND_OLD_IMAGES'
    }
  })
);

Process Stream Records

const { DynamoDBStreamsClient, GetRecordsCommand } = require('@aws-sdk/client-dynamodb-streams');

const streamsClient = new DynamoDBStreamsClient({ region: 'us-east-1' });

/**
 * Drain a shard, following NextShardIterator until the shard is closed.
 * Iterative rather than recursive: a long-lived shard would otherwise
 * grow the call stack without bound.
 * @param {string} shardIterator - initial iterator from GetShardIterator
 */
async function processStreamRecords(shardIterator) {
  let iterator = shardIterator;
  
  while (iterator) {
    const response = await streamsClient.send(
      new GetRecordsCommand({ ShardIterator: iterator })
    );
    
    for (const record of response.Records) {
      console.log('Event:', record.eventName);
      
      switch (record.eventName) {
        case 'INSERT':
          console.log('New item:', record.dynamodb.NewImage);
          break;
        case 'MODIFY':
          console.log('Old:', record.dynamodb.OldImage);
          console.log('New:', record.dynamodb.NewImage);
          break;
        case 'REMOVE':
          console.log('Deleted:', record.dynamodb.OldImage);
          break;
      }
    }
    
    // A missing NextShardIterator means the shard is closed.
    // NOTE: in production, back off (sleep) when Records is empty to
    // avoid a hot polling loop against an open shard.
    iterator = response.NextShardIterator;
  }
}

Lambda Trigger

// AWS Lambda function triggered by DynamoDB Stream
exports.handler = async (event) => {
  for (const record of event.Records) {
    if (record.eventName === 'INSERT') {
      const newUser = record.dynamodb.NewImage;
      
      // Send welcome email
      await sendWelcomeEmail({
        email: newUser.email.S,
        name: newUser.name.S
      });
    }
    
    if (record.eventName === 'MODIFY') {
      const oldImage = record.dynamodb.OldImage;
      const newImage = record.dynamodb.NewImage;
      
      // Check if email changed
      if (oldImage.email.S !== newImage.email.S) {
        await sendEmailChangeNotification(newImage.email.S);
      }
    }
  }
};

Cassandra Change Data Capture

CDC Configuration

-- Enable CDC on table
-- (Requires cdc_enabled: true in cassandra.yaml; captured changes are
-- written to commit-log segments under the cdc_raw directory.)
ALTER TABLE users WITH cdc = true;

-- Query CDC log
-- NOTE(review): stock Cassandra does not ship a queryable
-- system_distributed.cdc_log table — CDC data is consumed from cdc_raw
-- commit-log files (e.g. via CommitLogReader). Verify this table exists
-- in your deployment before relying on this query.
SELECT * FROM system_distributed.cdc_log 
WHERE table_name = 'users';

Process CDC Events

const cassandra = require('cassandra-driver');

const client = new cassandra.Client({
  contactPoints: ['localhost'],
  localDataCenter: 'datacenter1'
});

// High-water mark of the last processed event. The original referenced
// an undeclared `lastProcessedTime` and never advanced it, so every poll
// would reprocess everything (or throw a ReferenceError). Persist this
// durably in production so restarts neither skip nor repeat events.
let lastProcessedTime = new Date(0);

// Poll the CDC log for rows newer than the last processed timestamp.
// NOTE(review): stock Cassandra exposes CDC via cdc_raw commit-log files,
// not a CQL table — confirm system_distributed.cdc_log exists in your setup.
async function processCDC() {
  const query = `
    SELECT * FROM system_distributed.cdc_log 
    WHERE table_name = 'users' 
    AND cdc_time > ?
  `;
  
  try {
    const result = await client.execute(query, [lastProcessedTime], { prepare: true });
    
    for (const row of result.rows) {
      console.log('CDC Event:', row);
      // Process change, then advance the high-water mark.
      if (row.cdc_time > lastProcessedTime) {
        lastProcessedTime = row.cdc_time;
      }
    }
  } catch (err) {
    // Swallowing here is deliberate: the next scheduled tick retries.
    console.error('CDC poll failed:', err);
  }
}

setInterval(processCDC, 5000);

Redis Keyspace Notifications

Enable Notifications

# redis.conf
# Flags are additive; "Ex" enables keyevent notifications for expirations,
# published on the channel __keyevent@<db>__:expired with the key name
# as the message payload.
notify-keyspace-events Ex
# E: Keyevent events
# x: Expired events

Subscribe to Events

const redis = require('redis');

const subscriber = redis.createClient();
await subscriber.connect();

// Expired-key events arrive on __keyevent@<db>__:expired; the message
// payload is the name of the key that expired.
const onExpired = (expiredKey) => {
  console.log('Key expired:', expiredKey);
  
  // Handle expiration
  if (expiredKey.startsWith('session:')) {
    const [, sessionId] = expiredKey.split(':');
    console.log(`Session ${sessionId} expired`);
  }
};
await subscriber.subscribe('__keyevent@0__:expired', onExpired);

// SET operations publish the key name on the matching channel.
await subscriber.subscribe('__keyevent@0__:set', (key) => {
  console.log('Key set:', key);
});

.NET Change Streams

using MongoDB.Driver;
using MongoDB.Bson;

// Watches the users collection via MongoDB change streams (.NET driver).
// NOTE(review): _users is never assigned in this snippet — presumably
// injected via a constructor elsewhere; confirm before using verbatim.
public class ChangeStreamService
{
    private readonly IMongoCollection<User> _users;
    
    /// <summary>
    /// Blocks the calling thread, printing each newly inserted user.
    /// Runs until the cursor is disposed or the stream errors.
    /// </summary>
    public void WatchChanges()
    {
        // Only insert events pass the $match stage of this pipeline.
        var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<User>>()
            .Match(change => change.OperationType == ChangeStreamOperationType.Insert);
        
        var options = new ChangeStreamOptions
        {
            // UpdateLookup makes the server attach the current full document
            // to update events (inserts always carry the full document).
            FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
        };
        
        using (var cursor = _users.Watch(pipeline, options))
        {
            // ToEnumerable() blocks waiting for the next change.
            foreach (var change in cursor.ToEnumerable())
            {
                Console.WriteLine($"New user: {change.FullDocument.Name}");
                
                // Process change
                ProcessNewUser(change.FullDocument);
            }
        }
    }
    
    /// <summary>
    /// Async variant: watches all operation types and logs each one.
    /// </summary>
    public async Task WatchChangesAsync()
    {
        var changeStream = await _users.WatchAsync();
        
        await changeStream.ForEachAsync(change =>
        {
            switch (change.OperationType)
            {
                case ChangeStreamOperationType.Insert:
                    Console.WriteLine($"Inserted: {change.FullDocument.Name}");
                    break;
                case ChangeStreamOperationType.Update:
                    // Only DocumentKey is logged here: no FullDocument option
                    // was passed to WatchAsync(), so updates may lack it.
                    Console.WriteLine($"Updated: {change.DocumentKey}");
                    break;
                case ChangeStreamOperationType.Delete:
                    Console.WriteLine($"Deleted: {change.DocumentKey}");
                    break;
            }
        });
    }
}

Use Cases

// Common applications of change streams, keyed by scenario.
const changeStreamUseCases = {
  realTimeNotifications: 'Push notifications to users',
  cacheInvalidation: 'Invalidate cache on data changes',
  dataSync: 'Sync data across databases',
  audit: 'Track all data changes',
  analytics: 'Real-time analytics pipeline',
  search: 'Update search indexes',
  eventSourcing: 'Capture all events',
  replication: 'Custom replication logic'
};

Best Practices

// Operational guidelines for production change-stream consumers.
const bestPractices = [
  'Use resume tokens for fault tolerance',
  'Filter changes with aggregation pipeline',
  'Handle errors and reconnect',
  'Monitor change stream lag',
  'Use appropriate fullDocument option',
  'Limit change stream scope',
  'Process changes idempotently',
  'Scale consumers horizontally'
];

Interview Tips

  • Explain change streams: Real-time data change notifications
  • Show MongoDB: Watch collections, resume tokens
  • Demonstrate DynamoDB: Streams, Lambda triggers
  • Discuss use cases: Notifications, cache invalidation, sync
  • Mention fault tolerance: Resume tokens, error handling
  • Show examples: Node.js, .NET implementations

Summary

Change streams provide real-time notifications of database changes. MongoDB offers change streams with filtering, resume tokens, and full document lookup. DynamoDB Streams capture item-level changes with Lambda integration. Cassandra provides CDC for change tracking. Redis supports keyspace notifications. Use cases include real-time notifications, cache invalidation, data synchronization, and audit logging. Implement fault tolerance with resume tokens. Essential for reactive, event-driven applications.

Test Your Knowledge

Take a quick quiz to test your understanding of this topic.

Test Your NoSQL Knowledge

Ready to put your skills to the test? Take our interactive NoSQL quiz and get instant feedback on your answers.