Change Streams

在本页面

3.6 版的新功能。

更改流允许应用程序访问实时数据更改,而不会带来复杂性和拖尾oplog的风险。应用程序可以使用变更流来订阅集合中的所有数据变更,并立即对其做出反应。

Important

仅当启用了"majority"阅读关注支持(默认)时,更改流才可用。

打开变更流

您只能针对replica setssharded clusters打开更改流。对于分片群集,必须针对mongos发出开放更改流操作。

副本集或分片群集必须使用副本集协议版本 1(pv1)和WiredTiger存储引擎(可以是encrypted)。

以下示例打开一个集合的变更流,并在光标上进行迭代以检索变更流文档。与 MongoDB 部署的连接保持打开状态时,游标保持打开状态,直到发生以下情况之一:

  • 游标已显式关闭。

  • 出现invalidate event

  • 如果部署是分片群集,则分片删除可能会导致打开的更改流游标关闭,并且关闭的更改流游标可能无法完全恢复。

Python
Java (Sync)
Node.js
PHP
Motor
C
C#
Ruby

The Python examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

cursor = db.inventory.watch()
document = next(cursor)

The Java examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

The Node.js examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

The following example uses stream to process the change events.

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
  // process next document
});

Alternatively, you can also use iterator to process the change events:

const changeStreamIterator = collection.watch();
const next = await changeStreamIterator.next();

The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

$changeStream = $db->inventory->watch();
$changeStream->rewind();

$firstChange = $changeStream->current();

$changeStream->next();

$secondChange = $changeStream->current();

The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

cursor = db.inventory.watch()
document = await cursor.next()

The C examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

mongoc_collection_t *collection;
bson_t pipeline = BSON_INITIALIZER;
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
bson_iter_t iter;
bson_error_t error;

collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, &pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
   MONGOC_ERROR ("%s\n", error.message);
}

mongoc_change_stream_destroy (stream);

The C# examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

var enumerator = inventory.Watch().ToEnumerable().GetEnumerator();
enumerator.MoveNext();
var next = enumerator.Current;
enumerator.Dispose();

The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

cursor = inventory.watch.to_enum
next_change = cursor.next

要检索数据更改事件通知,请迭代更改流cursor

Note

未封闭游标的生命周期取决于语言。

有关变更流响应文档格式的更多信息,请参见Change Events

修改更改流输出

Python
Java (Sync)
Node.js
PHP
Motor
C
C#
Ruby

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

MongoClient mongoClient = new MongoClient( new MongoClientURI("mongodb://host1:port1,host2:port2..."));

// Select the MongoDB database and collection to open the change stream against

MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");

MongoCollection<Document> collection = db.getCollection("myTargetCollection");

// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
    Document.parse("{'fullDocument.username': 'alice'}"),
    Filters.in("operationType", asList("delete")))));

// Create the change stream cursor, passing the pipeline to the
// collection.watch() method

MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

The pipeline list includes a single $match stage that filters any operations where the username is alice , or operations where the operationType is delete .

Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline .

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

The following example uses stream to process the change events.

const pipeline = [
  { $match: { 'fullDocument.username': 'alice' } },
  { $addFields: { newField: 'this is an added field!' } }
];

const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
  // process next document
});

Alternatively, you can also use iterator to process the change events:

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

有关变更流响应文档格式的更多信息,请参见Change Events

查找完整文档以进行更新操作

默认情况下,更改流仅在更新操作期间返回字段增量。但是,您可以配置更改流以返回更新文档的最新的多数提交版本。

Python
Java (Sync)
Node.js
PHP
Motor
C
C#
Ruby

To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.

In the example below, all update operations notifications include a full_document field that represents the current version of the document affected by the update operation.

cursor = db.inventory.watch(full_document='updateLookup')
document = next(cursor)

To return the most current majority-committed version of the updated document, pass FullDocument.UPDATE_LOOKUP to the db.collection.watch.fullDocument() method.

In the example below, all update operations notifications include a FullDocument field that represents the current version of the document affected by the update operation.

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

To return the most current majority-committed version of the updated document, pass { fullDocument: 'updateLookup' } to the collection.watch() method.

In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.

The following example uses stream to process the change events.

const collection = db.collection('inventory');
const changeStream = collection.watch({ fullDocument: 'updateLookup' });
changeStream.on('change', next => {
  // process next document
});

Alternatively, you can also use iterator to process the change events:

const changeStreamIterator = collection.watch({ fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();

To return the most current majority-committed version of the updated document, pass "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP" to the watch() method.

In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();

$firstChange = $changeStream->current();

$changeStream->next();

$nextChange = $changeStream->current();

To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.

In the example below, all update operations notifications include a ```full_document`` field that represents the current version of the document affected by the update operation.

cursor = db.inventory.watch(full_document='updateLookup')
document = await cursor.next()

To return the most current majority-committed version of the updated document, pass the "fullDocument" option with the "updateLookup" value to the mongoc_collection_watch method.

In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, &pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
   MONGOC_ERROR ("%s\n", error.message);
}

mongoc_change_stream_destroy (stream);

To return the most current majority-committed version of the updated document, pass "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" to the collection.Watch() method.

In the example below, all update operations notifications include a FullDocument field that represents the current version of the document affected by the update operation.

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var enumerator = inventory.Watch(options).ToEnumerable().GetEnumerator();
enumerator.MoveNext();
var next = enumerator.Current;
enumerator.Dispose();

To return the most current majority-committed version of the updated document, pass full_document: 'updateLookup' to the watch() method.

In the example below, all update operations notifications include a full_document field that represents the current version of the document affected by the update operation.

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

Note

如果在更新操作之后但在查找之前有一个或多个大多数操作修改了更新后的文档,则返回的完整文档可能与更新操作时的文档有很大差异。

但是,更改流文档中包含的增量始终正确描述了应用于该更改流事件的监视集合更改。

有关变更流响应文档格式的更多信息,请参见Change Events

恢复更改流

通过在打开游标时指定resumeAfter令牌,可以恢复更改流。对于resumeAfter令牌,请使用更改流事件文档_id值。将_id值传递到更改流将尝试在指定操作之后开始恢复通知。

Important

  • 如果时间戳记是过去的,则 oplog 必须具有足够的历史记录来定位与令牌或时间戳记关联的操作。

  • invalidate event(例如,收藏夹放置或重命名)关闭流后,您将无法恢复更改流。

Python
Java (Sync)
Node.js
PHP
Motor
C
C#
Ruby

In the example below, resume_token contains the change stream notification id. The resume_after modifier takes a parameter that must resolve to a resume token. Passing the resume_token to the resume_after modifier directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.

resume_token = document.get("_id")
cursor = db.inventory.watch(resume_after=resume_token)
document = next(cursor)

In the example below, the resumeToken contains the change stream notification id. The resumeAfter() method takes a parameter that must resolve to a resume token. Passing the resumeToken to the resumeAfter() method directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

In the example below, resumeToken contains the change stream notification id. The resumeAfter takes a parameter that must resolve to a resume token. Passing the resumeToken to the resumeAfter modifier directs the change stream to attempt to resume notifications starting after the operation specified.

const collection = db.collection('inventory');
const changeStream = collection.watch();

let resumeToken, newChangeStream;
changeStream.on('change', next => {
  resumeToken = next._id;
  changeStream.close();

  newChangeStream = collection.watch({ resumeAfter });
  newChangeStream.on('change', next => {
    // process next document
  });
});

In the example below, $resumeToken contains the change stream notification id. The resumeAfter option takes a value that must resolve to a resume token. Passing the $resumeToken to the resumeAfter option directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.

$resumeToken = ($lastChange !== null) ? $lastChange->_id : null;

if ($resumeToken === null) {
    throw new \Exception('resumeToken was not found');
}

$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();

$nextChange = $changeStream->current();

In the example below, resume_token contains the change stream notification id. The resume_after modifier takes a parameter that must resolve to a resume token. Passing the resume_token to the resume_after modifier directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

In the example below, the resumeAfter option is appended to the stream options to recreate the stream after it has been destroyed. Passing the _id to the change stream attempts to resume notifications starting after the operation specified.

stream = mongoc_collection_watch (collection, &pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
   bson_iter_init_find (&iter, change, "_id");
   BSON_APPEND_VALUE (&opts, "resumeAfter", bson_iter_value (&iter));

   mongoc_change_stream_destroy (stream);
   stream = mongoc_collection_watch (collection, &pipeline, &opts);
   mongoc_change_stream_next (stream, &change);
   mongoc_change_stream_destroy (stream);
} else {
   if (mongoc_change_stream_error_document (stream, &error, NULL)) {
      MONGOC_ERROR ("%s\n", error.message);
   }

   mongoc_change_stream_destroy (stream);
}

In the example below, the resumeToken is retrieved from the last change stream document and passed to the Watch() method as an option. Passing the resumeToken to the Watch() method directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.

var resumeToken = lastChangeStreamDocument.ResumeToken;
  var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
  var enumerator = inventory.Watch(options).ToEnumerable().GetEnumerator();
  enumerator.MoveNext();
  var next = enumerator.Current;
  enumerator.Dispose();

In the example below, resume_token contains the change stream notification id. The resume_after modifier takes a parameter that must resolve to a resume token. Passing the resume_token to the resume_after modifier directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.

resume_token = next_change['_id']
cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = cursor.next

Use Cases

变更流可以使具有相关业务系统的体系结构受益,一旦数据变更能够持久,就可以通知下游系统。例如,更改流可以在实现提取,转换和加载(ETL)服务,跨平台同步,协作功能和通知服务时为开发人员节省时间。

Access Control

对于执行authenticationauthorization的部署,请对要为其打开更改流的集合使用changeStreamfind特权操作对用户进行身份验证。

read built-in role包含必需的特权,以支持针对集合打开更改流。任何内置角色或继承read角色的user-defined role也可以支持针对集合打开更改流。

或者,使用db.createRole创建一个用户定义的角色,该角色将授予target collectionchangeStreamfind特权操作。有关更完整的文档,请参见User-Defined Roles

要将内置角色或用户定义的角色与现有用户相关联,请使用db.grantRolesToUser()db.updateUser()方法。您还可以在使用db.createUser()创建新用户时指定角色。

Event Notification

更改流仅通知已保留到副本集中大多数含数据成员的数据更改。这样可以确保仅由多数情况下提交的更改触发通知,这些更改在故障情况下是持久的。

例如,考虑一个 3 成员replica set,其中针对primary打开了一个变更流游标。如果 Client 端发出插入操作,则该更改流仅在该插入一直存在于大多数数据承载成员之后才通知应用程序数据更改。