db.collection.watch()

在本页面

Definition

  • db.collection. watch(* pipeline options *)
ParameterTypeDescription
pipelinearray以下一个或多个聚合阶段的序列:


$match
$project
$addFields
$replaceRoot
$redact
有关聚合框架的完整文档,请参见Aggregation
| options |文档|可选。修改watch()行为的其他选项。
如果您未指定管道但正在传递options文档,则必须将空数组[]传递给pipeline参数。

options文档可以包含以下字段和值:

FieldTypeDescription
resumeAfterdocument可选的。指示watch尝试在恢复令牌中指定的操作之后开始恢复通知。

每个变更流事件文档都包含一个恢复令牌作为_id字段。
传递更改事件文档的* entire * _id字段,该字段代表您要在其后恢复的操作。
fullDocumentstring可选。默认情况下,watch()返回由更新操作修改的那些字段的增量,而不是整个更新的文档。
fullDocument设置为"updateLookup"以指示watch()查找更新文档的最新多数批准版本。 watch()除了updateDescription增量外,还返回fullDocument字段以及文档查找。
batchSizeint可选。指定从 MongoDB 集群的每批响应中返回的更改事件的最大数量。
具有与cursor.batchSize()相同的功能。
maxAwaitTimeMSint可选。服务器 await 新数据更改以报告更改流游标之前 await 的最大时间(以毫秒为单位),然后返回空批处理。
默认为1000毫秒。
collation文档可选。传递collation document为更改流游标指定collation
Returns:只要与 MongoDB 部署的连接保持打开状态并且集合存在,cursor就会保持打开状态。有关变更事件文档的示例,请参见Change Events

Behavior

  • db.collection.watch()仅通知已保留给大多数数据承载成员的数据更改。

  • 更改流光标保持打开状态,直到发生以下情况之一:

  • 游标已显式关闭。

    • 出现invalidate event;例如,删除或重命名集合。

    • 与 MongoDB 部署的连接已关闭。

    • 如果部署是分片群集,则分片删除可能会导致打开的更改流游标关闭,并且关闭的更改流游标可能无法完全恢复。

  • 您只能将db.collection.watch()有线老虎存储引擎一起使用。

  • db.collection.watch()可用于副本集和分片群集:

  • 对于副本集,您可以在任何数据承载成员上发出db.watch()

    • 对于分片群集,必须在mongos实例上发出db.watch()

Availability

仅当启用了"majority"阅读关注支持(默认)时,更改流才可用。

Resumability

与 MongoDB drivers不同,mongo shell 在发生错误后不会自动尝试恢复更改流游标。 MongoDB 驱动程序会在某些错误之后尝试一次自动恢复更改流游标。

db.collection.watch()使用操作日志中存储的信息来生成更改事件描述并生成与该操作相关的恢复令牌。如果由传递给resumeAfter选项的恢复令牌标识的操作已经从oplog删除,则db.collection.watch()无法恢复更改流。

有关恢复更改流的更多信息,请参见恢复变更流

Note

  • invalidate event(例如,收藏夹放置或重命名)关闭流后,您将无法恢复更改流。

  • 如果部署是分片群集,则分片删除可能会导致打开的更改流游标关闭,并且关闭的更改流游标可能无法完全恢复。

更新操作的完整文档查找

默认情况下,更改流光标返回用于更新操作的特定字段更改/增量。您还可以配置更改流以查找并返回更改文档的当前多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档有很大不同。

根据更新操作期间应用的更改数量和整个文档的大小,存在更新操作的更改事件文档的大小大于 16MB BSON 文档限制的风险。如果发生这种情况,服务器将关闭更改流游标并返回错误。

Access Control

对于执行authenticationauthorization的部署,请对要为其打开更改流的集合使用changeStreamfind特权操作对用户进行身份验证。

read built-in role包含必需的特权,以支持针对集合打开更改流。任何内置角色或继承read角色的user-defined role也可以支持针对集合打开更改流。

或者,使用db.createRole创建一个用户定义的角色,该角色将授予target collectionchangeStreamfind特权操作。有关更完整的文档,请参见User-Defined Roles

要将内置角色或用户定义的角色与现有用户相关联,请使用db.grantRolesToUser()db.updateUser()方法。您还可以在使用db.createUser()创建新用户时指定角色。

Examples

打开变更流

以下操作针对data.sensors集合打开更改流游标:

watchCursor = db.getSiblingDB("data").sensors.watch()

迭代光标以检查是否有新事件。使用cursor.isExhausted()方法来确保循环仅在更改流游标关闭时才退出,并且*最近一批中没有剩余对象:

while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      watchCursor.next();
   }
}

有关变更流输出的完整文档,请参见Change Events

使用完整文档更新查找更改流

fullDocument选项设置为"updateLookup",以指示更改流游标查找与更新更改流事件相关联的文档的最新多数提交版本。

以下操作使用fullDocument : "updateLookup"选项针对data.sensors集合打开更改流游标。

watchCursor = db.getSiblingDB("data").sensors.watch(
   [],
   { fullDocument : "updateLookup" }
)

迭代光标以检查是否有新事件。使用cursor.isExhausted()方法来确保循环仅在更改流游标关闭时才退出,并且*最近一批中没有剩余对象:

while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      watchCursor.next();
   }
}

对于任何更新操作,change 事件都会在fullDocument字段中返回文档查找的结果。

有关完整文档更新输出的示例,请参见更改流更新事件

有关变更流输出的完整文档,请参见Change Events

使用汇总管道过滤器更改流

以下操作使用聚合管道仅过滤insert事件来针对data.sensors集合打开更改流游标:

watchCursor = db.getSiblingDB("data").sensors.watch(
   [
      { $match : {"operationType" : "insert" } }
   ]
)

迭代光标以检查是否有新事件。使用cursor.isExhausted()方法来确保循环仅在更改流游标关闭时才退出,并且*最近一批中没有剩余对象:

while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      watchCursor.next();
   }
}

更改流游标仅返回operationTypeinsert的更改事件。有关变更流输出的完整文档,请参见Change Events

恢复更改流

变更流游标返回的每个文档都包含一个恢复令牌作为_id字段。要恢复更改流,请将您要恢复的更改事件的整个_id文档传递到watch()resumeAfter选项。

以下操作使用恢复令牌针对data.sensors集合恢复更改流游标。假设生成恢复令牌的操作尚未脱离集群的操作日志。

let watchCursor = db.getSiblingDB("data").sensors.watch();
let firstChange;

while (!watchCursor.isExhausted()) {
   if (watchCursor.hasNext()) {
     firstChange = watchCursor.next();
     break;
   }
}

watchCursor.close();

let resumeToken = firstChange._id;

resumedWatchCursor = db.getSiblingDB("data").sensors.watch(
[],
   { resumeAfter : resumeToken }
)

迭代光标以检查是否有新事件。使用cursor.isExhausted()方法来确保循环仅在更改流游标关闭时才退出,并且*最近一批中没有剩余对象:

while (!resumedWatchCursor.isExhausted()){
   if (resumedWatchCursor.hasNext()){
      resumedWatchCursor.next();
   }
}

有关恢复更改流的完整文档,请参见恢复变更流