On this page
执行两阶段提交
在本页面
Synopsis
本文档提供了一种使用两阶段提交方法将数据写入多个文档的多文档更新或“多文档事务”的模式。此外,您可以扩展此过程以提供rollback-like功能。
Background
MongoDB 数据库对单个document的操作始终是原子的;但是,涉及多个文档的操作(通常称为“多文档 Transaction”)不是原子操作。由于文档可能非常复杂,并且包含多个“嵌套”文档,因此单文档原子性为许多实际用例提供了必要的支持。
尽管单文档原子操作功能强大,但在某些情况下仍需要多文档事务。当执行由 Sequences 操作组成的事务时,会出现某些问题,例如:
原子性:如果一个操作失败,则事务中的上一个操作必须“回滚”到前一个状态(即“什么都没有,为“全有或全无”)。
一致性:如果重大故障(例如网络,硬件)中断了 Transaction,则数据库必须能够恢复一致状态。
对于需要多文档事务的情况,您可以在应用程序中实现两阶段提交,以为此类多文档更新提供支持。使用两阶段提交可确保数据一致,并且在发生错误的情况下,事务之前的状态为recoverable。但是,在此过程中,文档可以表示未决数据和状态。
Note
因为 MongoDB 只有单文档操作是原子的,所以两阶段提交只能提供类似于事务的语义。在两阶段提交或回滚期间,应用程序有可能在中间点返回中间数据。
Pattern
Overview
考虑一种情况,您要将资金从帐户A
转移到帐户B
。在关系数据库系统中,您可以在单个多语句事务中从A
中减去资金,并将资金添加到B
中。在 MongoDB 中,您可以模拟两阶段提交以达到可比的结果。
本教程中的示例使用以下两个集合:
名为
accounts
的集合,用于存储帐户信息。名为
transactions
的集合,用于存储有关资金转移 Transaction 的信息。
初始化源帐户和目标帐户
将帐户A
的文档和帐户B
的文档插入accounts
集合。
db.accounts.insert(
[
{ _id: "A", balance: 1000, pendingTransactions: [] },
{ _id: "B", balance: 1000, pendingTransactions: [] }
]
)
该操作返回一个具有操作状态的BulkWriteResult()对象。成功插入后,BulkWriteResult()将nInserted设置为2
。
初始化转移记录
对于每次执行的资金转帐,将包含转帐信息的文档插入transactions
集合。该文档包含以下字段:
source
和destination
字段,它们引用accounts
集合中的_id
字段,value
字段,用于指定影响source
和destination
帐户的balance
的转帐金额,state
字段,该字段反映了传输的当前状态。state
字段的值可以为initial
,pending
,applied
,done
,canceling
和canceled
。lastModified
字段,该字段反映上次修改日期。
要初始化从帐户A
到帐户B
的100
转移,请在transactions
集合中插入一个文档,其中包含转移信息,state
的 Transactionstate
和设置为当前日期的lastModified
字段:
db.transactions.insert(
{ _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() }
)
该操作返回一个具有操作状态的WriteResult()对象。成功插入后,WriteResult()对象的nInserted设置为1
。
使用两阶段提交在帐户之间转移资金
检索要开始的事务。
在transactions
集合中,找到处于initial
状态的 Transaction。当前transactions
集合只有一个文档,即在初始化转移记录步骤中添加的文档。如果集合中包含其他文档,则查询将返回状态为initial
的任何事务,除非您指定其他查询条件。
var t = db.transactions.findOne( { state: "initial" } )
在mongoShell 程序中键入变量t
以打印变量的内容。该操作应打印类似于以下内容的文档,但lastModified
字段应反映插入操作的日期:
{ "_id" : 1, "source" : "A", "destination" : "B", "value" : 100, "state" : "initial", "lastModified" : ISODate("2014-07-11T20:39:26.345Z") }
将 Transaction 状态更新为待处理。
将事务state
从initial
设置为pending
,并使用$currentDate运算符将lastModified
字段设置为当前日期。
db.transactions.update(
{ _id: t._id, state: "initial" },
{
$set: { state: "pending" },
$currentDate: { lastModified: true }
}
)
该操作返回一个具有操作状态的WriteResult()对象。成功更新后,nMatched和nModified显示1
。
在 update 语句中,state: "initial"
条件确保没有其他进程已更新该记录。如果nMatched和nModified为0
,请返回第一步以获取其他事务,然后重新开始该过程。
将 Transaction 应用于两个帐户。
如果没有将 Transactiont
应用到两个帐户,请使用update()方法将 Transactiont
应用到两个帐户。在更新条件中,包括条件pendingTransactions: { $ne: t._id }
,以避免在步骤多次运行的情况下重新应用事务。
要将 Transaction 应用到该帐户,请同时更新balance
字段和pendingTransactions
字段。
更新源帐户,从其balance
减去 Transactionvalue
并将 Transaction_id
添加到其pendingTransactions
数组。
db.accounts.update(
{ _id: t.source, pendingTransactions: { $ne: t._id } },
{ $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } }
)
成功更新后,该方法返回WriteResult()对象,其中nMatched和nModified设置为1
。
更新目标帐户,将事务value
添加到其balance
中,并将事务_id
添加到pendingTransactions
数组中。
db.accounts.update(
{ _id: t.destination, pendingTransactions: { $ne: t._id } },
{ $inc: { balance: t.value }, $push: { pendingTransactions: t._id } }
)
成功更新后,该方法返回WriteResult()对象,其中nMatched和nModified设置为1
。
将 Transaction 状态更新为已应用。
使用以下update()操作将事务的state
设置为applied
并更新lastModified
字段:
db.transactions.update(
{ _id: t._id, state: "pending" },
{
$set: { state: "applied" },
$currentDate: { lastModified: true }
}
)
成功更新后,该方法返回WriteResult()对象,其中nMatched和nModified设置为1
。
更新两个帐户的未决 Transaction 列表。
从两个帐户的pendingTransactions
数组中删除已应用的 Transaction_id
。
更新源帐户。
db.accounts.update(
{ _id: t.source, pendingTransactions: t._id },
{ $pull: { pendingTransactions: t._id } }
)
成功更新后,该方法返回WriteResult()对象,其中nMatched和nModified设置为1
。
更新目标帐户。
db.accounts.update(
{ _id: t.destination, pendingTransactions: t._id },
{ $pull: { pendingTransactions: t._id } }
)
成功更新后,该方法返回WriteResult()对象,其中nMatched和nModified设置为1
。
将 Transaction 状态更新为已完成。
通过将事务的state
设置为done
并更新lastModified
字段来完成事务:
db.transactions.update(
{ _id: t._id, state: "applied" },
{
$set: { state: "done" },
$currentDate: { lastModified: true }
}
)
成功更新后,该方法返回WriteResult()对象,其中nMatched和nModified设置为1
。
从失败方案中恢复
事务处理过程中最重要的部分不是上面的原型示例,而是当事务未成功完成时,可以从各种故障场景中恢复的可能性。本节概述了可能的故障,并提供了从此类事件中恢复的步骤。
Recovery Operations
两阶段提交模式允许运行序列的应用程序恢复事务并达到一致状态。在应用程序启动时(可能有规律的时间间隔)运行恢复操作,以捕获所有未完成的事务。
达到一致状态所需的时间取决于应用程序恢复每个事务需要多长时间。
以下恢复过程使用lastModified
日期作为未决事务是否需要恢复的指示;具体来说,如果挂起或已应用的事务在最近 30 分钟内未更新,则过程将确定这些事务需要恢复。您可以使用不同的条件进行此确定。
待处理状态中的 Transaction
要从步骤“ 将事务状态更新为未决。”之后但在“ 将事务状态更新为已应用。”步骤之前发生的故障中恢复,请从transactions
集合中检索待恢复的事务以进行恢复:
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
var t = db.transactions.findOne( { state: "pending", lastModified: { $lt: dateThreshold } } );
然后从步骤“ 将 Transaction 应用到两个帐户。”continue
应用状态下的 Transaction
要从步骤“ 将事务状态更新为已应用。”之后但在“ 将事务状态更新为完成。”步骤之前发生的故障中恢复,请从transactions
集合中检索已应用的事务以进行恢复:
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
var t = db.transactions.findOne( { state: "applied", lastModified: { $lt: dateThreshold } } );
然后从“ 更新两个帐户的未决 Transaction 清单。”continue
Rollback Operations
在某些情况下,您可能需要“回滚”或撤消事务。例如,如果应用程序需要“取消”Transaction,或者在 Transaction 过程中某个帐户不存在或停止存在。
应用状态下的 Transaction
在“ 将事务状态更新为已应用。”步骤之后,您不应该回滚事务。而是通过切换源字段和目标字段中的值来完成该事务,并创建新 Transaction以撤消该事务。
待处理状态中的 Transaction
在“ 将事务状态更新为未决。”步骤之后但在“ 将事务状态更新为已应用。”步骤之前,您可以使用以下过程回滚事务:
将 Transaction 状态更新为取消。
将事务state
从pending
更新到canceling
。
db.transactions.update(
{ _id: t._id, state: "pending" },
{
$set: { state: "canceling" },
$currentDate: { lastModified: true }
}
)
成功更新后,该方法返回WriteResult()对象,其中nMatched和nModified设置为1
。
撤销两个帐户上的 Transaction。
要撤消两个帐户上的 Transaction,请先撤消 Transactiont
(如果已应用 Transaction)。在更新条件中,包括条件pendingTransactions: t._id
以便仅在已应用挂起的 Transaction 时更新帐户。
更新目标帐户,从其balance
中减去 Transactionvalue
,然后从pendingTransactions
数组中删除 Transaction_id
。
db.accounts.update(
{ _id: t.destination, pendingTransactions: t._id },
{
$inc: { balance: -t.value },
$pull: { pendingTransactions: t._id }
}
)
成功更新后,该方法返回WriteResult()对象,其中nMatched和nModified设置为1
。如果待处理的 Transaction 之前尚未应用到该帐户,则没有文档将匹配更新条件,并且nMatched和nModified将是0
。
更新源帐户,将事务value
添加到其balance
中,并将事务_id
从pendingTransactions
数组中删除。
db.accounts.update(
{ _id: t.source, pendingTransactions: t._id },
{
$inc: { balance: t.value},
$pull: { pendingTransactions: t._id }
}
)
成功更新后,该方法返回WriteResult()对象,其中nMatched和nModified设置为1
。如果待处理的 Transaction 之前尚未应用到该帐户,则没有文档将匹配更新条件,并且nMatched和nModified将是0
。
将 Transaction 状态更新为已取消。
要完成回滚,请将事务state
从canceling
更新为cancelled
。
db.transactions.update(
{ _id: t._id, state: "canceling" },
{
$set: { state: "cancelled" },
$currentDate: { lastModified: true }
}
)
成功更新后,该方法返回WriteResult()对象,其中nMatched和nModified设置为1
。
Multiple Applications
事务部分存在,因此多个应用程序可以同时创建和运行操作,而不会引起数据不一致或冲突。在我们的过程中,要更新或检索 Transaction 文档,更新条件包括state
字段上的条件,以防止多个应用程序重新应用 Transaction。
例如,应用程序App1
和App2
都捕获处于initial
状态的同一事务。 App1
在App2
开始之前应用整个事务。 App2
尝试执行“ 将事务状态更新为未决。”步骤时,包含state: "initial"
准则的更新条件将不匹配任何文档,并且nMatched和nModified将为0
。这应该向App2
发出 signal 以返回到第一步,以另一个事务重新启动该过程。
当运行多个应用程序时,至关重要的是在任何时间点只有一个应用程序可以处理给定的事务。这样,除了在更新条件中包括事务的预期状态之外,您还可以在事务文档本身中创建标记,以标识正在处理事务的应用程序。使用findAndModify()方法修改 Transaction 并通过一步将其取回:
t = db.transactions.findAndModify(
{
query: { state: "initial", application: { $exists: false } },
update:
{
$set: { state: "pending", application: "App1" },
$currentDate: { lastModified: true }
},
new: true
}
)
修改事务操作,以确保只有与application
字段中的标识符匹配的应用程序才能应用事务。
如果应用程序App1
在事务执行期间失败,则可以使用recovery procedures,但是应用程序应确保在应用事务之前它们“拥有”该事务。例如,要查找并恢复挂起的作业,请使用类似于以下内容的查询:
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
db.transactions.find(
{
application: "App1",
state: "pending",
lastModified: { $lt: dateThreshold }
}
)
在生产应用程序中使用两阶段提交
上面的示例 Transaction 特意简单。例如,假设始终可以将操作回滚到一个帐户,并且帐户余额可以包含负值。
生产实施可能会更复杂。通常,帐户需要有关当前余额,待处理贷方和待处理借方的信息。
对于所有事务,请确保在部署中使用适当的write concern级别。