了解 Azure Cosmos DB 中的更改源

已完成

Azure Cosmos DB 中的更改源是对容器更改的持久记录,按更改发生的顺序保存。 Azure Cosmos DB 中更改源支持的工作原理是侦听 Azure Cosmos DB 容器中发生的任何更改。 然后,它会按照所更改文档的修改顺序输出这些文档的排序列表。 可以异步和增量方式处理持久更改,并将输出分配给一个或多个使用者进行并行处理。

更改源和不同操作

现在,可以在更改源中看到所有插入和更新。 无法针对特定的操作类型来筛选更改源。 目前更改源不会记录删除操作。 作为一种替代方法,你可以对要删除的项添加软标记。 例如,可以在项中添加一个名为“deleted”的属性,将其值设置为“true”,然后为该项设置一个生存时间 (TTL) 值。 设置 TTL 可确保会自动删除该项。

读取 Azure Cosmos DB 更改源

可以使用推送模型或拉取模型来处理 Azure Cosmos DB 更改源。 使用推送模型时,更改源处理器会将工作推送到具有用于处理此工作的业务逻辑的客户端。 但是,检查工作和存储上次处理工作状态的复杂工作是由更改源处理器来处理的。

使用拉取模型时,客户端必须从服务器拉取工作。 在这种情况下,客户端具有处理工作的业务逻辑,并存储上次处理工作的状态。 客户端在多个客户端之间并行处理负载均衡,并处理错误。

注意

建议使用推送模型,因为可无需担心轮询更改源以获取后续更改、存储上次处理的更改状态等问题,通话时还有一些其他好处。

使用 Azure Cosmos DB 更改源的大多数方案都会使用推送模型选项之一。 但在某些情况下,你可能想要对拉取模型进行更低级别的控制。 额外的低级别控制包括:

  • 从特定的分区键读取更改
  • 控制客户端接收要处理的更改的速度
  • 对更改源中的现有数据执行一次性读取(例如,执行数据迁移)

使用推送模型读取更改源

可以通过两种方式从推送模型读取更改源:Azure Functions Azure Cosmos DB 触发器和更改源处理器库。 Azure Functions 在后台使用更改源处理器,因此这两种读取更改源的方式类似。 可将 Azure Functions 简单地视为更改源处理器的托管平台,两者并非完全不同的更改源读取方式。 Azure Functions 在后台使用更改源处理器。 它会自动将跨容器的分区的更改处理并行化。

Azure Functions

可以创建在 Azure Cosmos DB 容器更改源中的每个新事件上自动触发的小型反应式 Azure Functions。 有了适用于 Azure Cosmos DB 的 Azure Functions 触发器,你就可以利用更改源处理器的缩放与可靠事件检测功能,无需维护任何辅助角色基础结构。

显示更改源触发 Azure Functions 进行处理的图示。

更改源处理器

更改源处理器是 Azure Cosmos DB .NET V3Java V4 SDK 的一部分。 它简化了读取更改源的过程,可有效地在多个使用者之间分布事件处理。

实现更改源处理器需要四个主要组件:

  1. 监视的容器:监视的容器具有用于生成更改源的数据。 对受监视的容器的任何插入和更新都会反映在容器的更改源中。

  2. 租用容器:租用容器充当状态存储并协调处理跨多个辅助角色的更改源。 租用容器可以与受监视的容器存储在同一帐户中,也可以存储在单独的帐户中。

  3. 计算实例:计算实例托管更改源处理器,用于侦听更改。 根据平台,它可能由 VM、kubernetes Pod、Azure 应用服务实例、实际物理计算机表示。 它有一个唯一标识符,在本文中被称为实例名称。

  4. 委托:委托是用于定义开发人员要对更改源处理器读取的每一批更改执行何种操作的代码。

实现更改源处理器时,入口点始终是调用 ContainerGetChangeFeedProcessorBuilder 实例中的监视的容器:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

其中,第一个参数是描述此处理器的目标的唯一名称,第二个参数是要处理更改的委托实现。 下面是一个委托示例:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

此后,你使用 WithInstanceName 来定义计算实例名称或唯一标识符,这在各个你要部署的计算实例中应该是独一无二并且各不相同的,最后它通过 WithLeaseContainer 成为用于维护租用状态的容器。

调用 Build 可让你获得可通过调用 StartAsync 启动的处理器实例。

主机实例的正常生命周期为:

  1. 读取更改源。
  2. 如果没有发生更改,请在一段预定义的时间内保持睡眠状态(可在 WithPollInterval 中使用 Builder 进行自定义),然后转到 #1。
  3. 如果发生了更改,请将其发送给委托。
  4. 委托成功地处理更改后,以最新的处理时间点更新租用存储,然后转到 #1。