Change Streams
React to real-time data changes with change streams for event-driven architectures.
What Change Streams Provide
Change streams expose a tailable cursor over the oplog, emitting events for inserts, updates, replaces, deletes, and invalidate. Applications subscribe to data changes without polling.
Use change streams for cache invalidation, search index updates, cross-service notifications, and audit pipelines. They require a replica set or sharded cluster.
const changeStream = collection.watch(
[{ $match: { operationType: { $in: ["insert", "update"] } } }],
{ fullDocument: "updateLookup" }
);
changeStream.on("change", (event) => {
console.log(event.operationType, event.fullDocument);
});Resume Tokens and Reliability
Each change event includes a resume token. Persist the token after processing so consumers can resume after restarts without missing events or reprocessing from the beginning.
If the resume point falls off the oplog, the stream errors with an invalidate event—handle by snapshotting current state and opening a new stream.
- Store resume tokens in durable storage (Redis, database, Kafka offset)
- Process events idempotently because redelivery can occur after failures
- Use fullDocument: "updateLookup" to get post-update document content
Filtering with Aggregation Pipeline
Pass a pipeline to watch() to filter events server-side—match on operation type, namespace, or updated fields. This reduces network traffic and consumer load compared to filtering client-side.
Combine $match on clusterTime or document fields for tenant-specific consumers in multi-tenant applications.
db.collection("orders").watch([
{ $match: { "fullDocument.status": "shipped", operationType: "update" } }
])Deployment Patterns
Run change stream consumers as dedicated worker processes with health checks and horizontal scaling cautiously—multiple consumers on the same stream require partitioning logic or duplicate handling.
For high-volume changefeeds, bridge change streams to Kafka or message queues for downstream fan-out. Atlas Triggers offer managed change event handlers without self-hosted workers.
- Avoid long blocking work in the change handler; enqueue for async processing
- Monitor consumer lag by comparing clusterTime to processing timestamp
- Use startAtOperationTime for bounded catch-up after planned downtime
Limitations and Alternatives
Change streams do not capture reads—only writes. They depend on oplog retention; heavy write volume with slow consumers risks losing resume points.
Alternatives include application-level domain events emitted at write time (more control, requires discipline) and Debezium CDC for heterogeneous pipelines. Choose based on coupling tolerance and operational complexity.
// Domain event pattern at write time
await collection.insertOne(order);
await eventBus.publish("order.created", { id: order._id });