On this page
Change Streams
On this page
New in version 3.6.
Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a collection and immediately react to them.
Important
Change stream is only available if "majority"
read concern support is enabled (default).
Open A Change Stream
You can only open a change stream against replica sets or sharded clusters. For a sharded cluster, you must issue the open change stream operation against the mongos
.
The replica set or the sharded cluster must use replica set protocol version 1 (pv1
) and WiredTiger storage engine (can be encrypted).
The following example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents. While the connection to the MongoDB deployment remains open, the cursor remains open until one of the following occurs:
- The cursor is explicitly closed.
- An invalidate event occurs.
- If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
- Python
- Java (Sync)
- Node.js
- PHP
- Motor
- Other
- C
- C#
- Ruby
The Python examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
cursor = db.inventory.watch()
document = next(cursor)
The Java examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();
The Node.js examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
The following example uses stream to process the change events.
const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
// process next document
});
Alternatively, you can also use iterator to process the change events:
const changeStreamIterator = collection.watch();
const next = await changeStreamIterator.next();
The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();
The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
cursor = db.inventory.watch()
document = await cursor.next()
The C examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
mongoc_collection_t *collection;
bson_t pipeline = BSON_INITIALIZER;
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
bson_iter_t iter;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, &pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);
The C# examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
var enumerator = inventory.Watch().ToEnumerable().GetEnumerator();
enumerator.MoveNext();
var next = enumerator.Current;
enumerator.Dispose();
The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
cursor = inventory.watch.to_enum
next_change = cursor.next
To retrieve the data change event notifications, iterate the change stream cursor
.
Note
The lifecycle of an unclosed cursor is language-dependent.
See Change Events for more information on the change stream response document format.
Modify Change Stream Output
- Python
- Java (Sync)
- Node.js
- PHP
- Motor
- Other
- C
- C#
- Ruby
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
MongoClient mongoClient = new MongoClient( new MongoClientURI("mongodb://host1:port1,host2:port2..."));
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
The pipeline
list includes a single $match
stage that filters any operations where the username
is alice
, or operations where the operationType
is delete
.
Passing the pipeline
to the watch()
method directs the change stream to return notifications after passing them through the specified pipeline
.
You can control change stream output by providi