How Do Change Streams Work in MongoDB?
Change Streams in MongoDB allow applications to access real-time data changes without the complexity and overhead of polling the database. They provide a powerful and efficient way to listen to changes in collections, databases, or entire clusters.
This article covers the following topics:
Table of Contents
- How Change Streams Work
- Key Features of Change Streams
- Enabling Change Streams
- Resume Change Streams
- Practical Use Cases
How Change Streams Work
Change Streams are built on MongoDB’s oplog (operations log), a special capped collection on each replica set member that records every operation that modifies data. A Change Stream reads from the oplog and exposes the result as a continuous, cursor-based stream of change events, so applications do not have to tail the oplog themselves.
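To make the mechanism concrete, here is a toy in-memory model of a capped log and a reader that follows it. It is an illustration only: CappedLog, append, and readAfter are hypothetical names, and MongoDB's real oplog and cursors are considerably more involved.

```javascript
// Toy model only: a fixed-size log that discards its oldest entries,
// loosely mimicking how a capped collection such as the oplog behaves.
class CappedLog {
  constructor(maxEntries) {
    this.maxEntries = maxEntries;
    this.entries = []; // each entry: { seq, op }
    this.nextSeq = 1;  // monotonically increasing position in the log
  }

  // Record a write operation; drop the oldest entry once the cap is exceeded.
  append(op) {
    this.entries.push({ seq: this.nextSeq++, op });
    if (this.entries.length > this.maxEntries) {
      this.entries.shift();
    }
  }

  // Return every entry after `lastSeenSeq`, or throw if the log has already
  // discarded entries the reader has not seen (it can no longer catch up).
  readAfter(lastSeenSeq) {
    const oldest = this.entries.length > 0 ? this.entries[0].seq : this.nextSeq;
    if (lastSeenSeq + 1 < oldest) {
      throw new Error('position is no longer in the log');
    }
    return this.entries.filter((entry) => entry.seq > lastSeenSeq);
  }
}

const log = new CappedLog(3);
log.append({ type: 'insert', id: 1 });
log.append({ type: 'update', id: 1 });

// A reader that has seen nothing yet (position 0) receives both entries.
const firstBatch = log.readAfter(0);
const position = firstBatch[firstBatch.length - 1].seq;

log.append({ type: 'delete', id: 1 });

// Reading again from the saved position yields only the new entry.
const secondBatch = log.readAfter(position);
console.log(secondBatch.map((entry) => entry.op.type));
```

The saved position plays the role of a resume token in this sketch: it works only as long as the log has not yet discarded the entries that follow it.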
Key Features of Change Streams
- Real-Time Notifications: Get real-time notifications on data changes (insert, update, delete, and replace operations).
- Resume Tokens: Resume from a specific point in the stream using resume tokens, so that no changes are missed after an interruption, provided the oplog still contains the entry the token refers to.
- Filtering: Apply filters to only receive notifications for specific changes.
Enabling Change Streams
Change Streams are available in MongoDB 3.6 and later, and require a replica set or a sharded cluster. Here’s how to enable and use them:
Connecting to a Replica Set
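Ensure you connect to a replica set. To convert a standalone server to a single-node replica set, restart mongod with the --replSet option (or set replication.replSetName in its configuration file), then run the following once in the MongoDB shell:
rs.initiate()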
Using Change Streams
You can watch for changes at different levels (database-level and cluster-level streams require MongoDB 4.0 or later):
- Collection Level
- Database Level
- Cluster Level
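As a rough sketch of how the three levels map onto the Node.js driver, the helper below opens one stream per scope. The function name openChangeStreams is hypothetical, and client is assumed to be an already connected MongoClient.

```javascript
// Sketch: open one change stream per scope. `client` is assumed to be a
// connected MongoClient from the official Node.js driver.
function openChangeStreams(client, dbName, collectionName) {
  const db = client.db(dbName);
  return {
    // Changes to a single collection
    collectionStream: db.collection(collectionName).watch(),
    // Changes to every collection in one database
    databaseStream: db.watch(),
    // Changes across all databases in the deployment
    clusterStream: client.watch(),
  };
}
```

Each returned stream emits 'change' events in the same way, and each should be closed with close() once it is no longer needed.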
Collection-Level Change Stream
Here’s an example of using Change Streams at the collection level:
const { MongoClient } = require('mongodb');

async function run() {
  // Connect to a replica set member (Change Streams do not work on a standalone server).
  // The useNewUrlParser and useUnifiedTopology options are deprecated and not needed.
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();

  const db = client.db('testdb');
  const collection = db.collection('testcollection');

  // Open a Change Stream on the collection
  const changeStream = collection.watch();

  // Process each change document
  changeStream.on('change', (change) => {
    console.log(change);
  });

  // Report stream errors instead of leaving them unhandled
  changeStream.on('error', (err) => {
    console.error(err);
  });
}

run().catch(console.error);
Change Document Structure
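A change document has a structure like the following. This is a composite for illustration: a real event contains only the fields that apply to its operation type (for example, updateDescription appears only on update events):
{
  "_id": { "_data": "..." },
  "operationType": "insert",
  "fullDocument": { "field1": "value1", ... },
  "ns": { "db": "testdb", "coll": "testcollection" },
  "documentKey": { "_id": "..." },
  "updateDescription": { "updatedFields": { ... }, "removedFields": [ ... ] }
}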
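Fields:
- _id: The resume token for this event, used to resume the stream. Treat it as an opaque value.
- operationType: Type of operation (insert, update, delete, replace; other types such as drop and invalidate also exist).
- fullDocument: The entire document that was changed (for insert and replace; for update only when the stream is opened with the fullDocument: 'updateLookup' option).
- ns: The namespace (database and collection).
- documentKey: The _id of the document that was changed (plus the shard key on a sharded collection).
- updateDescription: The updated and removed fields (for update operations only).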
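One way to see how these fields fit together is a handler that keeps an in-memory Map in sync with a collection. This is a minimal sketch under stated assumptions: applyChange is a hypothetical helper (not a driver API), and updated fields are assumed to be top-level, since nested fields are reported with dotted paths that this code does not expand.

```javascript
// Sketch: keep an in-memory Map in sync with a collection by applying
// change events. `applyChange` is a hypothetical helper, not a driver API.
function applyChange(cache, change) {
  // documentKey._id identifies the affected document; a string key is
  // used here so that ObjectId values compare by value.
  const key = String(change.documentKey._id);

  switch (change.operationType) {
    case 'insert':
    case 'replace':
      // fullDocument carries the complete new document
      cache.set(key, change.fullDocument);
      break;
    case 'update': {
      // By default an update event carries only the delta.
      // Assumption: only top-level fields are updated or removed.
      const current = { ...(cache.get(key) || {}) };
      const { updatedFields = {}, removedFields = [] } = change.updateDescription;
      Object.assign(current, updatedFields);
      for (const field of removedFields) delete current[field];
      cache.set(key, current);
      break;
    }
    case 'delete':
      cache.delete(key);
      break;
    default:
      // Other event types (for example drop or invalidate) are ignored here
      break;
  }
  return cache;
}
```

It could be wired to a stream with changeStream.on('change', (change) => applyChange(cache, change)), where cache is a Map created by the caller.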
Filtering Change Streams
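You can filter Change Streams by passing an aggregation pipeline to watch(), so that only matching events are delivered. Note that the filter below matches on fullDocument, which by default is present only on insert and replace events, so update and delete events will not pass it:
const changeStream = collection.watch([
  { $match: { 'fullDocument.status': 'active' } }
]);
changeStream.on('change', (change) => {
  console.log(change);
});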
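If update events should also be matched on document content, one option is to combine the filter with the fullDocument: 'updateLookup' option, which asks the server to attach the current version of the document to update events. The sketch below only builds the arguments for watch(). buildActiveFilter is a hypothetical helper name, and the status field is taken from the example above.

```javascript
// Sketch: build the arguments for collection.watch(). `buildActiveFilter`
// is a hypothetical helper; 'status' comes from the article's example.
function buildActiveFilter() {
  const pipeline = [
    {
      $match: {
        // Only these event types can carry a fullDocument
        operationType: { $in: ['insert', 'update', 'replace'] },
        'fullDocument.status': 'active',
      },
    },
  ];
  // Ask the server to look up and attach the current document on updates,
  // so that the fullDocument condition can also match update events
  const options = { fullDocument: 'updateLookup' };
  return { pipeline, options };
}

const { pipeline, options } = buildActiveFilter();
console.log(JSON.stringify(pipeline), options);
// Usage with a real collection: collection.watch(pipeline, options)
```

Keep in mind that the looked-up document is the current version at lookup time, which may already include later changes than the one the event describes.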
Resume Change Streams
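To resume a Change Stream from a specific point, pass a previously seen resume token in the resumeAfter option. The stream then continues with the first change after that token. Resuming works only while the corresponding entry is still in the oplog, and the token must be stored outside the process (for example, in a file or a collection) to survive a restart:
let resumeToken;
changeStream.on('change', (change) => {
  console.log(change);
  // Remember the token of the most recent change
  resumeToken = change._id;
});
// To resume from the last seen point
// (assumes at least one change was observed, so resumeToken is set)
const resumedChangeStream =
  collection.watch([], { resumeAfter: resumeToken });
resumedChangeStream.on('change', (change) => {
  console.log(change);
});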
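A slightly fuller sketch of the same idea persists the token after every handled change and passes resumeAfter only when a token exists. Here tokenStore is a hypothetical object with synchronous load() and save(token) methods, and watchWithResume is an illustrative name, not part of the driver.

```javascript
// Sketch: open a change stream that resumes from a stored token when one
// exists. `tokenStore` is a hypothetical object with load() and save(token);
// in practice it might be backed by a file or another collection.
function watchWithResume(collection, tokenStore, onChange) {
  const savedToken = tokenStore.load();
  // Pass resumeAfter only when a token was saved previously
  const options = savedToken ? { resumeAfter: savedToken } : {};
  const changeStream = collection.watch([], options);

  changeStream.on('change', (change) => {
    onChange(change);
    // Save the token only after the change was handled, so a crash
    // mid-handler leads to a replay rather than a skipped event
    tokenStore.save(change._id);
  });

  changeStream.on('error', (err) => {
    // For example, the token may have aged out of the oplog
    console.error('change stream error:', err);
  });

  return changeStream;
}
```

Saving the token after the handler runs means a change can be delivered more than once after a crash, so the handler should tolerate repeated events.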
Practical Use Cases
- Real-Time Analytics: Monitor real-time data changes for analytics dashboards.
- Event-Driven Architectures: Trigger actions in response to database changes, such as updating caches, sending notifications, or executing business logic.
- Audit Logging: Track and log changes for auditing purposes.
Conclusion
Change Streams let applications react to data changes in MongoDB without polling. Because they are built on the oplog, they deliver changes with low latency and can be resumed after an interruption, which makes them a good fit for real-time analytics, event-driven systems, and audit logging.