49.6. 逻辑解码输出插件

可以在 PostgreSQL 源代码树的contrib/test_decoding子目录中找到示例输出插件。

49 .6.1. 初始化功能

通过动态加载以输出插件的名称作为库基本名称的共享库来加载输出插件。普通库搜索路径用于查找库。为了提供所需的输出插件回调,并指出该库实际上是一个输出插件,它需要提供一个名为_PG_output_plugin_init的函数。该函数传递了一个结构,该结构需要用单个操作的回调函数指针填充。

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

begin_cbchange_cbcommit_cb回调是必需的,而startup_cbfilter_by_origin_cbtruncate_cbshutdown_cb是可选的。如果未设置truncate_cb,但要解码TRUNCATE,则该动作将被忽略。

49.6.2. Capabilities

要解码,格式化和输出更改,输出插件可以使用后端的大多数常规基础结构,包括调用输出函数。允许对关系进行只读访问,只要访问的关系要么是由initdbpg_catalog模式中创建的,要么已使用标记为用户提供的目录表

ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);

禁止执行任何导致分配 TransactionID 的操作。其中包括写入表,执行 DDL 更改和调用txid_current()

49 .6.3. 输出方式

输出插件回调可以以几乎任意格式将数据传递给使用者。对于某些用例,例如通过 SQL 查看更改,以可能包含任意数据(例如bytea)的数据类型返回数据非常麻烦。如果输出插件仅以服务器的编码输出文本数据,则可以通过将OutputPluginOptions.output_type设置为OUTPUT_PLUGIN_TEXTUAL_OUTPUT而不是startup callback中的OUTPUT_PLUGIN_BINARY_OUTPUT来声明。在这种情况下,所有数据都必须使用服务器的编码,以便text数据可以包含该数据。在启用 assert 的版本中对此进行检查。

49 .6.4. 输出插件回调

输出插件会通过其需要提供的各种回调来通知发生的更改。

并发事务按提交 Sequences 进行解码,并且仅属于特定事务的更改在begincommit回调之间进行解码。显式或隐式回滚的事务永远不会解码。成功保存点按照在事务中执行的 Sequences 折叠到包含它们的事务中。

Note

只有已经安全地刷新到磁盘的事务才会被解码。当synchronous_commit设置为off时,这可能导致COMMIT没有立即在紧随其后的pg_logical_slot_get_changes()中解码。

49 .6.4.1. 启动回调

每当创建复制插槽或要求复制更改流式传输时,都会调用可选的startup_cb回调,而与准备好发布更改的数量无关。

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);

创建复制插槽时,is_init参数将为 true,否则为 false。 * options *指向输出插件可以设置的选项结构:

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;

output_type必须设置为OUTPUT_PLUGIN_TEXTUAL_OUTPUTOUTPUT_PLUGIN_BINARY_OUTPUT。另请参见Section 49.6.3。如果receive_rewrites为 true,则将在某些 DDL 操作期间通过堆重写进行更改时调用输出插件。这些是处理 DDL 复制的插件所感兴趣的,但是它们需要特殊的处理。

启动回调应验证ctx->output_plugin_options中的选项。如果输出插件需要具有状态,则可以使用ctx->output_plugin_private进行存储。

49 .6.4.2. 关机回调

每当不再使用以前处于活动状态的复制插槽时,就会调用可选的shutdown_cb回调,该回调可用于将私有资源分配给输出插件。插槽不一定被丢弃,流只是被停止。

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

49 .6.4.3. Transaction 开始回调

只要已解码已提交事务的开始,就会调用必需的begin_cb回调。中止的事务及其内容永远不会被解码。

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);
  • txn *参数包含有关事务的元信息,例如已提交事务的时间戳及其 XID。

49 .6.4.4. Transaction 结束回叫

每当对事务提交进行解码时,都会调用所需的commit_cb回调。如果存在任何已修改的行,则将在此之前调用所有已修改的行的change_cb回调。

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);

49 .6.4.5. 更改回叫

事务中每个单独的行修改都会调用所需的change_cb回调,它可以是INSERTUPDATEDELETE。即使原始命令一次修改了几行,也会为每一行单独调用回调。

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);
  • ctx txn 参数的内容与begin_cbcommit_cb回调的内容相同,但是关系 Descriptors relation *指向行所属的关系,并传递了描述行修改的 struct * change * 。

Note

只能使用逻辑解码来提取用户定义表中未注销(请参见UNLOGGED)和非临时性(请参见临时或临时)的更改。

49 .6.4.6. 截断回调

truncate_cb回调被TRUNCATE命令调用。

typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);

这些参数类似于change_cb回调。但是,由于需要一起执行通过外键连接的表上的TRUNCATE操作,因此此回调将接收到一系列关系,而不仅仅是一个关系。有关详细信息,请参见TRUNCATE语句的描述。

49 .6.4.7. 来源过滤器回调

调用可选的filter_by_origin_cb回调以确定输出插件是否感兴趣从* origin_id *重放的数据。

typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);
  • ctx *参数具有与其他回调相同的内容。没有信息,但来源可用。为了表明与传入节点无关的更改不相关,请返回 true,从而将其过滤掉;否则为假。对于已过滤掉的事务和更改,不会调用其他回调。

在实现级联或多向复制解决方案时,这很有用。按原点进行过滤可以防止在此类设置中来回复制相同的更改。尽管 Transaction 和变更也携带有关原产地的信息,但通过此回调进行过滤的效率明显更高。

49 .6.4.8. 通用消息回调

每当对逻辑解码消息进行解码时,就会调用可选的message_cb回调。

typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);
  • txn *参数包含有关事务的元信息,例如已提交事务的时间戳及其 XID。但是请注意,当消息是非事务性的并且在记录消息的事务中尚未分配 XID 时,它可以为 NULL。 * lsn *具有消息的 WAL 位置。 * transactional *表示是否以事务方式发送消息。 * prefix 是任意以 null 终止的前缀,可用于标识当前插件的有趣消息。最后, message 参数保存 message_size *大小的实际消息。

应该格外小心,以确保输出插件认为有趣的前缀是唯一的。使用 extensions 或输出插件本身通常是一个不错的选择。

49 .6.5. 产生输出的功能

要实际产生输出,输出插件可以在begin_cbcommit_cbchange_cb回调中将数据写入ctx->out中的StringInfo输出缓冲区。在写入输出缓冲区之前,必须调用OutputPluginPrepareWrite(ctx, last_write),在完成写入缓冲区之后,必须调用OutputPluginWrite(ctx, last_write)以执行写入。 * last_write *指示特定写入是否是回调的最后写入。

以下示例显示了如何将数据输出到输出插件的使用者:

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);