On this page
db.collection.watch()
On this page
Definition
db.collection.
watch
( pipeline, options )-
Opens a change stream cursor on the collection.
Parameter Type Description pipeline
array A sequence of one or more of the following aggregation stages:
See Aggregation for complete documentation on the aggregation framework.
options
document Optional. Additional options that modify the behavior of
watch()
.You must pass an empty array
[]
to thepipeline
parameter if you are not specifying a pipeline but are passing theoptions
document.The
options
document can contain the following fields and values:Field Type Description resumeAfter
document Optional. Directs
watch
to attempt resuming notifications starting after the operation specified in the resume token.Each change stream event document includes a resume token as the
_id
field. Pass the entire_id
field of the change event document that represents the operation you want to resume after.fullDocument
string Optional. By default,
watch()
returns the delta of those fields modified by an update operation, instead of the entire updated document.Set
fullDocument
to"updateLookup"
to directwatch()
to look up the most current majority-committed version of the updated document.watch()
returns afullDocument
field with the document lookup in addition to theupdateDescription
delta.batchSize
int Optional. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.
Has the same functionality as
cursor.batchSize()
.maxAwaitTimeMS
int Optional. The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch.
Defaults to
1000
milliseconds.collation
document Optional. Pass a collation document to specify a collation for the change stream cursor. Returns: A cursor that remains open as long as a connection to the MongoDB deployment remains open and the collection exists. See Change Events for examples of change event documents.
Behavior
db.collection.watch()
only notifies on data changes that have persisted to a majority of data-bearing members.- The change stream cursor remains open until one of the following occurs:
- The cursor is explicitly closed.
- An invalidate event occurs; for example, a collection drop or rename.
- The connection to the MongoDB deployment is closed.
- If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
- You can only use
db.collection.watch()
with the Wired Tiger storage engine. db.collection.watch()
is available for replica sets and sharded clusters:- For a replica set, you can issue
db.watch()
on any data-bearing member. - For a sharded cluster, you must issue
db.watch()
on amongos
instance.
- For a replica set, you can issue
Availability
Change stream is only available if "majority"
read concern support is enabled (default).
Resumability
Unlike the MongoDB drivers , the mongo
shell does not automatically attempt to resume a change stream cursor after an error. The MongoDB drivers make one attempt to automatically resume a change stream cursor after certain errors.
db.collection.watch()
uses information stored in the oplog to produce the change event description and generate a resume token associated to that operation. If the operation identified by the resume token passed to the resumeAfter
option has already dropped off the oplog, db.collection.watch()
cannot resume the change stream.
See Resume a Change Stream for more information on resuming a change stream.
Note
- You cannot resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream.
- If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
Full Document Lookup of Update Operations
By default, the change stream cursor returns specific field changes/deltas for update operations. You can also configure the change stream to look up and return the current majority-committed version of the changed document. Depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update.
Depending on the number of changes applied during the update operation and the size of the full document, there is a risk that the size of the change event document for an update operation is greater than the 16MB BSON document limit. If this occurs, the server closes the change stream cursor and returns an error.
Access Control
For deployments enforcing authentication and authorization, authenticate as a user with the changeStream
and find
privilege actions on the collection against which you want to open a change stream.
The read
built-in role includes the required privileges to support opening a change stream against a collection. Any built-in role or user-defined role that inherits the read
role can also support opening change streams against a collection.
Alternatively, use db.createRole
to create a user-defined role that grants the changeStream
and find
privilege actions on the target collection. See User-Defined Roles for more complete documentation.
To associate a built-in role or user-defined role to an existing user, use the db.grantRolesToUser()
or db.updateUser()
methods. You can also specify the role when creating a new user using db.createUser()
.
Examples
Open a Change Stream
The following operation opens a change stream cursor against the data.sensors
collection:
watchCursor = db.getSiblingDB("data").sensors.watch()
Iterate the cursor to check for new events. Use the cursor.isExhausted()
method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
while (!watchCursor.isExhausted()){
if (watchCursor.hasNext()){
watchCursor.next();
}
}
For complete documentation on change stream output, see Change Events.
Change Stream with Full Document Update Lookup
Set the fullDocument
option to "updateLookup"
to direct the change stream cursor to lookup the most current majority-committed version of the document associated to an update change stream event.
The following operation opens a change stream cursor against the data.sensors
collection using the fullDocument : "updateLookup"
option.
watchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ fullDocument : "updateLookup" }
)
Iterate the cursor to check for new events. Use the cursor.isExhausted()
method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
while (!watchCursor.isExhausted()){
if (watchCursor.hasNext()){
watchCursor.next();
}
}
For any update operation, the change event returns the result of the document lookup in the fullDocument
field.
For an example of the full document update output, see change stream update event.
For complete documentation on change stream output, see Change Events.
Change Stream with Aggregation Pipeline Filter
The following operation opens a change stream cursor against the data.sensors
collection using an aggregation pipeline to filter only insert
events:
watchCursor = db.getSiblingDB("data").sensors.watch(
[
{ $match : {"operationType" : "insert" } }
]
)
Iterate the cursor to check for new events. Use the cursor.isExhausted()
method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
while (!watchCursor.isExhausted()){
if (watchCursor.hasNext()){
watchCursor.next();
}
}
The change stream cursor only returns change events where the operationType
is insert
. For complete documentation on change stream output, see Change Events.
Resuming a Change Stream
Every document returned by a change stream cursor includes a resume token as the _id
field. To resume a change stream, pass the entire _id
document of the change event you want to resume from to the resumeAfter
option of watch()
.
The following operation resumes a change stream cursor against the data.sensors
collection using a resume token. This assumes that the operation that generated the resume token has not rolled off the cluster’s oplog.
let watchCursor = db.getSiblingDB("data").sensors.watch();
let firstChange;
while (!watchCursor.isExhausted