How Do Change Streams Work in MongoDB?

Change Streams in MongoDB allow applications to access real-time data changes without the complexity and overhead of polling the database. They provide a powerful and efficient way to listen to changes in collections, databases, or entire clusters.

This article covers the following topics:

Table of Contents

  • How Change Streams Work
  • Key Features of Change Streams
  • Enabling Change Streams
  • Resume Change Streams
  • Practical Use Cases

How Change Streams Work

Change Streams are built on MongoDB’s oplog (operations log), a special capped collection in the local database that records every write applied to a replica set member. Applications do not read the oplog directly. Instead, they open a Change Stream, and the server returns a continuous, cursor-based stream of change events derived from the oplog.
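To make the cursor-based part concrete, here is a minimal sketch of reading a Change Stream one event at a time. It assumes an object with promise-returning hasNext() and next() methods, as the Node.js driver's ChangeStream provides; consumeChanges, handler, and maxEvents are hypothetical names used only for illustration.

```javascript
// Pull change events one at a time, the way a cursor is read.
// `changeStream` must expose async hasNext()/next(); `handler` is your
// own callback. `maxEvents` is an illustrative cap, not a driver option.
async function consumeChanges(changeStream, handler, maxEvents = Infinity) {
  let seen = 0;
  while (seen < maxEvents && (await changeStream.hasNext())) {
    const change = await changeStream.next();
    await handler(change);
    seen += 1;
  }
  return seen; // number of events processed
}
```

On a live stream, hasNext() waits for the next event, so without a cap the loop keeps waiting for new changes rather than finishing.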

Key Features of Change Streams

  • Real-Time Notifications: Receive an event for each data change (insert, update, delete, and replace operations) as it happens.
  • Resume Tokens: Every event carries a resume token. After an interruption, the stream can restart from that token without missing events, provided the corresponding entry has not yet rolled off the oplog.
  • Filtering: Pass an aggregation pipeline to receive notifications only for specific changes.

Enabling Change Streams

Change Streams are available in MongoDB 3.6 and later, and require a replica set or a sharded cluster. Here’s how to enable and use them:

Connecting to a Replica Set

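Change Streams only work when the client is connected to a replica set or a sharded cluster. To use them with a standalone server, restart mongod with the --replSet option to give it a replica set name, then run the following in the mongo shell to initialize a single-node replica set: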

rs.initiate()
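Before opening a stream, an application can check what kind of deployment it is connected to. The sketch below is one way to do that, assuming the response of the hello command (older servers return the same fields from the legacy isMaster command). supportsChangeStreams is a hypothetical helper name.

```javascript
// Decide from a `hello` response whether change streams can be opened.
// Replica set members report `setName`; a mongos router reports
// `msg: 'isdbgrid'`. A standalone mongod reports neither.
function supportsChangeStreams(hello) {
  const isReplicaSetMember = typeof hello.setName === 'string';
  const isMongos = hello.msg === 'isdbgrid';
  return isReplicaSetMember || isMongos;
}

// Usage against a connected MongoClient (not executed here):
//   const hello = await client.db('admin').command({ hello: 1 });
//   if (!supportsChangeStreams(hello)) throw new Error('replica set required');
```

Checking up front gives a clearer error message than waiting for watch() to fail on a standalone server.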

Using Change Streams

You can watch for changes at different levels (database-level and cluster-level streams require MongoDB 4.0 or later):

  • Collection Level
  • Database Level
  • Cluster Level
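The three levels map onto three watch() methods in the Node.js driver: on a collection, on a database, and on the client itself. The following sketch wraps them in one function; openChangeStream, scope, dbName, and collName are hypothetical names chosen for this example.

```javascript
// Open a change stream at the requested scope.
// `client` is expected to be a connected MongoClient; `dbName` and
// `collName` are placeholders for your own names.
function openChangeStream(client, scope, { dbName, collName, pipeline = [], options = {} } = {}) {
  switch (scope) {
    case 'collection':
      // Changes to a single collection.
      return client.db(dbName).collection(collName).watch(pipeline, options);
    case 'database':
      // Changes to every non-system collection in one database.
      return client.db(dbName).watch(pipeline, options);
    case 'cluster':
      // Changes across all databases except admin, local, and config.
      return client.watch(pipeline, options);
    default:
      throw new Error(`Unknown scope: ${scope}`);
  }
}
```

For example, openChangeStream(client, 'database', { dbName: 'testdb' }) would report changes from every collection in testdb, with the ns field of each event telling you which collection it came from.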

Collection-Level Change Stream

Here’s an example of using Change Streams at the collection level:

JavaScript
const { MongoClient } = require('mongodb');

async function run() {
  // Driver 4.0+ no longer needs useNewUrlParser or useUnifiedTopology
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const db = client.db('testdb');
  const collection = db.collection('testcollection');

  // Open a Change Stream on the collection
  const changeStream = collection.watch();

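  // Process each change document
  changeStream.on('change', (change) => {
    console.log(change);
  });

  // Report stream errors instead of letting them crash the process
  changeStream.on('error', (err) => {
    console.error('Change stream error:', err);
  });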
}

run().catch(console.error);

Change Document Structure

A change document has the following general structure (updateDescription appears only on update events):

{
  "_id": { "_data": "..." },
  "operationType": "insert",
  "fullDocument": { "field1": "value1", ... },
  "ns": { "db": "testdb", "coll": "testcollection" },
  "documentKey": { "_id": "..." },
  "updateDescription": { "updatedFields": { ... }, "removedFields": [ ... ] }
}

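Fields:

  • _id: The resume token, an opaque value used to resume the stream from this event.
  • operationType: Type of operation (insert, update, delete, replace). Events such as drop, rename, and invalidate are also reported.
  • fullDocument: The entire document. Present for insert and replace events, and for update events only when the stream is opened with the fullDocument: 'updateLookup' option.
  • ns: The namespace (database and collection).
  • documentKey: The key of the document that was changed.
  • updateDescription: The fields that were updated or removed (update operations only).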
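Because different operation types populate different fields, a handler usually branches on operationType before reading anything else. The sketch below shows one way to do that; describeChange is a hypothetical helper, and the output format is arbitrary.

```javascript
// Turn a change event into a one-line summary, reading only the fields
// that exist for each operation type.
function describeChange(change) {
  const ns = change.ns
    ? [change.ns.db, change.ns.coll].filter(Boolean).join('.')
    : '(unknown)';
  const id = change.documentKey ? String(change.documentKey._id) : '';
  switch (change.operationType) {
    case 'insert':
    case 'replace':
      // fullDocument is present for inserts and replaces.
      return `${change.operationType} ${ns} _id=${id} doc=${JSON.stringify(change.fullDocument)}`;
    case 'update': {
      // updateDescription is present only for updates.
      const { updatedFields = {}, removedFields = [] } = change.updateDescription || {};
      return `update ${ns} _id=${id} set=${Object.keys(updatedFields).join(',')} unset=${removedFields.join(',')}`;
    }
    case 'delete':
      // Deletes carry only the document key.
      return `delete ${ns} _id=${id}`;
    default:
      // drop, rename, invalidate, and other events.
      return `${change.operationType} ${ns}`;
  }
}
```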

Filtering Change Streams

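You can pass an aggregation pipeline to watch() so that only matching changes are delivered. The example below matches on fullDocument, so it reports inserts and replaces whose document has the status 'active'. Update events carry no fullDocument by default, and delete events never do, so both are filtered out:

const changeStream = collection.watch([
  { $match: { 'fullDocument.status': 'active' } }
]);

changeStream.on('change', (change) => {
  console.log(change);
});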
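A filter on fullDocument only sees events that actually carry the document, and update events do so only when the stream is opened with the fullDocument: 'updateLookup' option. The following sketch builds a pipeline and options pair that covers updates as well; buildStatusWatchArgs is a hypothetical helper name, and the status field is taken from the example above.

```javascript
// Build the arguments for watch() so that inserts, replaces, and updates
// are all matched on the document's current `status`.
function buildStatusWatchArgs(status) {
  const pipeline = [
    {
      $match: {
        operationType: { $in: ['insert', 'update', 'replace'] },
        'fullDocument.status': status,
      },
    },
    // Trim each event to the fields the consumer needs. The event _id
    // (the resume token) is kept by default, so resuming still works.
    { $project: { operationType: 1, documentKey: 1, fullDocument: 1 } },
  ];
  // Ask the server to attach the current document to update events.
  const options = { fullDocument: 'updateLookup' };
  return { pipeline, options };
}

// Usage (not executed here):
//   const { pipeline, options } = buildStatusWatchArgs('active');
//   const changeStream = collection.watch(pipeline, options);
```

With updateLookup, the attached document is looked up when the event is read, so it reflects the document's current state and may already include later changes.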

Resume Change Streams

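To resume a Change Stream from a specific point, keep the _id of the last event you processed and pass it as the resumeAfter option when you reopen the stream:

JavaScript
let resumeToken;

function watchWithResume() {
  // Pass resumeAfter only once a token has been seen
  const options = resumeToken ? { resumeAfter: resumeToken } : {};
  const changeStream = collection.watch([], options);

  changeStream.on('change', (change) => {
    console.log(change);
    // Remember the token of the last processed event
    resumeToken = change._id;
  });

  // If the stream fails, reopen it from the last seen point
  changeStream.on('error', (err) => {
    console.error(err);
    changeStream.close().catch(() => {});
    setTimeout(watchWithResume, 1000);
  });
}

watchWithResume();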
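A token held only in a variable is lost when the process exits, so a resumable consumer usually persists it. The sketch below stores the token in a JSON file. The file-based storage, the helper names, and the assumption that the token's _data value is a hex string (as in recent server versions) are all illustrative; binary tokens from older servers would need an Extended JSON serializer instead of plain JSON.

```javascript
const fs = require('fs');

// Save the latest resume token so a restarted process can pick up from it.
// The file path is a placeholder; a database or key-value store works too.
function saveResumeToken(path, token) {
  fs.writeFileSync(path, JSON.stringify(token));
}

// Load a previously saved token, or return undefined on first run.
function loadResumeToken(path) {
  if (!fs.existsSync(path)) return undefined;
  return JSON.parse(fs.readFileSync(path, 'utf8'));
}

// Build watch() options: include resumeAfter only when a token exists.
function resumeOptions(path) {
  const token = loadResumeToken(path);
  return token ? { resumeAfter: token } : {};
}
```

A consumer would call saveResumeToken(path, change._id) after it has finished processing each event, and pass resumeOptions(path) to watch() at startup. Saving after processing means an event may be handled twice following a crash, but it is not skipped.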

Practical Use Cases

  • Real-Time Analytics: Monitor real-time data changes for analytics dashboards.
  • Event-Driven Architectures: Trigger actions in response to database changes, such as updating caches, sending notifications, or executing business logic.
  • Audit Logging: Track and log changes for auditing purposes.
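As an illustration of the cache-update case, the sketch below keeps an in-memory Map in step with a collection by applying each change event to it. applyChangeToCache is a hypothetical helper, and the code assumes the stream was opened with the fullDocument: 'updateLookup' option so that update events include the document.

```javascript
// Keep an in-memory Map in step with a collection by applying change events.
// Keys are the stringified document _id.
function applyChangeToCache(cache, change) {
  const key = change.documentKey ? String(change.documentKey._id) : undefined;
  switch (change.operationType) {
    case 'insert':
    case 'replace':
    case 'update':
      if (change.fullDocument) cache.set(key, change.fullDocument);
      else cache.delete(key); // no document attached: drop the stale entry
      break;
    case 'delete':
      cache.delete(key);
      break;
    case 'drop':
    case 'rename':
    case 'dropDatabase':
      cache.clear(); // the whole namespace changed
      break;
    default:
      break; // ignore other event types
  }
  return cache;
}
```

Wired to a stream, this becomes changeStream.on('change', (change) => applyChangeToCache(cache, change)), which replaces periodic re-reads of the collection.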

Conclusion

Change Streams let applications react to data changes in MongoDB as they happen, without polling. Because they are built on the oplog, they offer low-latency, resumable streams of change events, which makes them a good fit for a wide range of applications, from real-time analytics to event-driven systems.