使用 Azure Cosmos DB 的事务发件箱模式
在分布式系统中实现可靠的消息传送可能很有挑战性。 本文介绍如何使用事务发件箱模式来可靠消息传递和保证事件传递,这是支持 幂等消息处理的重要组成部分。 为此,你将使用 Azure Cosmos DB 事务批处理和更改源与 Azure 服务总线结合使用。
概述
微服务体系结构越来越受欢迎,并承诺解决可伸缩性、可维护性和敏捷性等问题,尤其是在大型应用程序中。 但是,在数据处理方面,这种体系结构模式也带来了挑战。 在分布式应用程序中,每个服务独立维护在专用服务拥有的数据存储中作所需的数据。 为了支持此类方案,通常使用 RabbitMQ、Kafka 或 Azure 服务总线等消息传送解决方案,该解决方案通过消息传递总线将数据(事件)从一个服务分发到应用程序的其他服务。 然后,内部或外部使用者可以订阅这些消息,并在作数据后立即收到更改通知。
该区域中的一个已知示例是排序系统:当用户想要创建订单时, Ordering 服务将通过 REST 终结点从客户端应用程序接收数据。 它将有效负载映射到对象的内部表示形式 Order ,以验证数据。 成功提交数据库后,它会将 OrderCreated 事件发布到消息总线。 对新订单(例如InventoryInvoicing或服务)感兴趣的任何其他服务都将订阅OrderCreated消息、处理消息并将其存储在自己的数据库中。
以下伪代码显示此过程通常从 Ordering 服务的角度来看如何显示:
CreateNewOrder(CreateOrderDto order){
// Validate the incoming data.
...
// Apply business logic.
...
// Save the object to the database.
var result = _orderRespository.Create(order);
// Publish the respective event.
_messagingService.Publish(new OrderCreatedEvent(result));
return Ok();
}
此方法在保存订单对象和发布相应事件之间发生错误之前效果良好。 由于许多原因,此时发送事件可能会失败:
- 网络错误
- 消息服务中断
- 主机故障
无论错误是什么,结果是事件 OrderCreated 无法发布到消息总线。 不会通知其他服务已创建订单。 该服务 Ordering 现在必须处理与实际业务流程无关的各种事项。 它需要跟踪在消息总线重新联机后仍需要放在消息总线上的事件。 即使是最坏的情况也可能发生:由于事件丢失,应用程序中的数据不一致。
解决方案
有一种称为 事务发件箱 的已知模式,可以帮助你避免这些情况。 在事件最终推送到消息中转站之前,它可确保事件保存在数据存储中(通常保存在数据库中的发件箱表中)。 如果业务对象和相应的事件保存在同一数据库事务中,则保证不会丢失任何数据。 将提交所有内容,或者如果出现错误,所有内容都将回滚。 为了最终发布事件,不同的服务或工作进程会查询发件箱表以获取未经处理的条目,发布事件,并将其标记为已处理。 此模式可确保在创建或修改业务对象后不会丢失事件。
下载此体系结构的 Visio 文件。
在关系数据库中,模式的实现非常简单。 例如,如果服务使用 Entity Framework Core,它将使用 Entity Framework 上下文创建数据库事务、保存业务对象和事件以及提交事务或执行回滚。 此外,处理事件的辅助角色服务很容易实现:它定期查询发件箱表以获取新条目,将新插入的事件发布到消息总线,最后将这些条目标记为已处理。
在实践中,事情并不像他们一样容易看。 最重要的是,需要确保保留事件的顺序,以便 OrderUpdated 事件在事件之前 OrderCreated 不会发布。
Azure Cosmos DB 中的实现
本部分介绍如何在 Azure Cosmos DB 中实现事务发箱模式,以便在 Azure Cosmos DB 更改源和服务总线的帮助下实现不同服务之间的可靠有序消息传送。 它演示了一个管理Contact对象(FirstName、、LastNameEmail、Company信息等)的示例服务。 它使用命令和查询责任分离(CQRS)模式,并遵循基本的域驱动设计(DDD)概念。 可以在 GitHub 上找到实现的示例代码。
Contact示例服务中的对象具有以下结构:
{
"name": {
"firstName": "John",
"lastName": "Doe"
},
"description": "This is a contact",
"email": "johndoe@contoso.com",
"company": {
"companyName": "Contoso",
"street": "Street",
"houseNumber": "1a",
"postalCode": "092821",
"city": "Palo Alto",
"country": "US"
},
"createdAt": "2021-09-22T11:07:37.3022907+02:00",
"deleted": false
}
创建或更新后 Contact ,它会发出包含有关当前更改的信息的事件。 等等,域事件可以是:
-
ContactCreated。 添加联系人时引发。 -
ContactNameUpdated。 在更改FirstName或更改时LastName引发。 -
ContactEmailUpdated。 更新电子邮件地址时引发。 -
ContactCompanyUpdated。 更改任何公司属性时引发。
事务批处理
若要实现此模式,需要确保 Contact 业务对象和相应的事件将保存在同一数据库事务中。 在 Azure Cosmos DB 中,事务的工作方式不同于关系数据库系统中的事务。 Azure Cosmos DB 事务(称为 事务批处理)在单个 逻辑分区上运行,因此它们保证原子性、一致性、隔离和持续性(ACID)属性。 不能在不同的容器或逻辑分区的事务批处理作中保存两个文档。 对于示例服务,这意味着业务对象和事件或事件都将放在同一容器和逻辑分区中。
上下文、存储库和 UnitOfWork
示例实现的核心是一个 容器上下文 ,用于跟踪保存在同一事务批处理中的对象。 它维护已创建和修改的对象列表,并在单个 Azure Cosmos DB 容器上运行。 它的接口如下所示:
public interface IContainerContext
{
public Container Container { get; }
public List<IDataObject<Entity>> DataObjects { get; }
public void Add(IDataObject<Entity> entity);
public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
public void Reset();
}
容器上下文组件中的列表跟踪 Contact 和 DomainEvent 对象。 两者都将放在同一容器中。 这意味着,多种类型的对象存储在同一 Type Azure Cosmos DB 容器中,并使用属性来区分业务对象和事件。
对于每种类型,都有一个专用存储库来定义和实现数据访问。 存储库 Contact 接口提供以下方法:
public interface IContactsRepository
{
public void Create(Contact contact);
public Task<(Contact, string)> ReadAsync(Guid id, string etag);
public Task DeleteAsync(Guid id, string etag);
public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
public void Update(Contact contact, string etag);
}
存储库 Event 看起来类似,但只有一种方法,该方法在存储中创建新事件:
public interface IEventRepository
{
public void Create(ContactDomainEvent e);
}
这两个存储库接口的实现通过依赖项注入获取对单个 IContainerContext 实例的引用,以确保两者在同一 Azure Cosmos DB 上下文上运行。
最后一个组件是 UnitOfWork将实例中保留的 IContainerContext 更改提交到 Azure Cosmos DB:
public class UnitOfWork : IUnitOfWork
{
private readonly IContainerContext _context;
public IContactRepository ContactsRepo { get; }
public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
{
_context = ctx;
ContactsRepo = cRepo;
}
public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
{
return _context.SaveChangesAsync(cancellationToken);
}
}
事件处理:创建和发布
每次创建、修改或(软)删除对象时 Contact ,服务都会引发相应的事件。 提供的解决方案的核心是域驱动设计(DDD)和 吉米·博加德提出的调解人模式的组合。 他建议在将实际对象保存到数据库之前,维护由于域对象的修改而发生的事件列表,并发布这些事件。
更改列表保留在域对象本身中,以便其他组件无法修改事件链。 在域对象中维护事件(IEvent 实例)的行为通过接口 IEventEmitter<IEvent> 定义,并在抽象 DomainEntity 类中实现:
public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
private readonly List<IEvent> _events = new();
[JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();
public virtual void AddEvent(IEvent domainEvent)
{
var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
if (i < 0)
{
_events.Add(domainEvent);
}
else
{
_events.RemoveAt(i);
_events.Insert(i, domainEvent);
}
}
[...]
[...]
}
该 Contact 对象引发域事件。 实体 Contact 遵循基本的 DDD 概念,将域属性的 setter 配置为私有。 类中不存在公共资源库。 而是提供作内部状态的方法。 在这些方法中,可以引发特定修改的适当事件(例如 ContactNameUpdated 或 ContactEmailUpdated)。
下面是联系人姓名更新的示例。 (在方法末尾引发该事件。
public void SetName(string firstName, string lastName)
{
if (string.IsNullOrWhiteSpace(firstName) ||
string.IsNullOrWhiteSpace(lastName))
{
throw new ArgumentException("FirstName or LastName cannot be empty");
}
Name = new Name(firstName, lastName);
if (IsNew) return; // if an object is newly created, all modifications will be handled by ContactCreatedEvent
AddEvent(new ContactNameUpdatedEvent(Id, Name));
ModifiedAt = DateTimeOffset.UtcNow;
}
跟踪更改的对应 ContactNameUpdatedEvent项如下所示:
public class ContactNameUpdatedEvent : ContactDomainEvent
{
public Name Name { get; }
public ContactNameUpdatedEvent(Guid contactId, Name contactName) :
base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
{
Name = contactName;
}
}
到目前为止,事件刚刚记录在域对象中,任何事件都不会保存到数据库,甚至发布到消息中转站。 按照建议,在将业务对象保存到数据存储之前,将立即处理事件列表。 在这种情况下,它发生在SaveChangesAsync在专用IContainerContext方法中实现的RaiseDomainEvents实例方法中。 (dObjs 是容器上下文的跟踪实体的列表。
private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
var eventEmitters = new List<IEventEmitter<IEvent>>();
// Get all EventEmitters.
foreach (var o in dObjs)
if (o.Data is IEventEmitter<IEvent> ee)
eventEmitters.Add(ee);
// Raise events.
if (eventEmitters.Count <= 0) return;
foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
_mediator.Publish(evt);
}
在最后一行中, MediatR 包(C# 中中介模式的实现)用于在应用程序中发布事件。 这样做是可能的,因为所有事件(如 ContactNameUpdatedEvent 实现 INotification MediatR 包的接口)。
这些事件需要由相应的处理程序处理。 在这里, IEventsRepository 实现将发挥作用。 下面是事件处理程序的示例 NameUpdated :
public class ContactNameUpdatedHandler :
INotificationHandler<ContactNameUpdatedEvent>
{
private IEventRepository EventRepository { get; }
public ContactNameUpdatedHandler(IEventRepository eventRepo)
{
EventRepository = eventRepo;
}
public Task Handle(ContactNameUpdatedEvent notification,
CancellationToken cancellationToken)
{
EventRepository.Create(notification);
return Task.CompletedTask;
}
}
实例 IEventRepository 通过构造函数注入处理程序类。 在服务中发布方法 ContactNameUpdatedEvent 后, Handle 将调用该方法,并使用事件存储库实例创建通知对象。 该通知对象又插入到对象中的 IContainerContext 跟踪对象列表中,并将保存在同一事务批处理中的对象联接到 Azure Cosmos DB。
到目前为止,容器上下文知道要处理的对象。 为了最终将跟踪的对象保存到 Azure Cosmos DB, IContainerContext 实现将创建事务批处理,添加所有相关对象,并针对数据库运行作。 所描述的过程在方法中 SaveInTransactionalBatchAsync 进行处理,该方法调用 SaveChangesAsync 该方法。
下面是需要创建和运行事务批处理的实现的重要部分:
private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
CancellationToken cancellationToken)
{
if (DataObjects.Count > 0)
{
var pk = new PartitionKey(DataObjects[0].PartitionKey);
var tb = Container.CreateTransactionalBatch(pk);
DataObjects.ForEach(o =>
{
TransactionalBatchItemRequestOptions tro = null;
if (!string.IsNullOrWhiteSpace(o.Etag))
tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };
switch (o.State)
{
case EntityState.Created:
tb.CreateItem(o);
break;
case EntityState.Updated or EntityState.Deleted:
tb.ReplaceItem(o.Id, o, tro);
break;
}
});
var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
}
// Return copy of current list as result.
var result = new List<IDataObject<Entity>>(DataObjects);
// Work has been successfully done. Reset DataObjects list.
DataObjects.Clear();
return result;
}
下面概述了该过程到目前为止的工作原理(用于更新联系人对象的名称):
- 客户端想要更新联系人的名称。 该方法
SetName在联系人对象上调用,并更新属性。 - 该
ContactNameUpdated事件将添加到域对象中的事件列表中。 - 调用联系人存储库
Update的方法,该方法将域对象添加到容器上下文中。 对象现在被跟踪。 -
CommitAsync在实例上UnitOfWork调用,而实例又调用SaveChangesAsync容器上下文。 - 在其中
SaveChangesAsync,域对象列表中的所有事件都由实例发布,并通过事件存储库添加到MediatR容器上下文。 - 在其中
SaveChangesAsync创建了 aTransactionalBatch。 它将同时保存联系人对象和事件。 - 运行
TransactionalBatch和数据已提交到 Azure Cosmos DB。 -
SaveChangesAsync并CommitAsync成功返回。
持久性
如前面的代码片段所示,保存到 Azure Cosmos DB 的所有对象都包装在实例 DataObject 中。 此对象提供常见属性:
-
ID。 -
PartitionKey。 -
Type。 -
State。 同样Created,Updated不会保留在 Azure Cosmos DB 中。 -
Etag。 对于 乐观锁定。 -
TTL。 用于自动清理旧文档的生存时间属性。 -
Data。 泛型数据对象。
这些属性在一 IDataObject 个泛型接口中定义,该接口由存储库和容器上下文使用:
public interface IDataObject<out T> where T : Entity
{
string Id { get; }
string PartitionKey { get; }
string Type { get; }
T Data { get; }
string Etag { get; set; }
int Ttl { get; }
EntityState State { get; set; }
}
然后,包装在实例中 DataObject 并保存到数据库的对象将如下所示(Contact 和 ContactNameUpdatedEvent):
// Contact document/object. After creation.
{
"id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"type": "contact",
"data": {
"name": {
"firstName": "John",
"lastName": "Doe"
},
"description": "This is a contact",
"email": "johndoe@contoso.com",
"company": {
"companyName": "Contoso",
"street": "Street",
"houseNumber": "1a",
"postalCode": "092821",
"city": "Palo Alto",
"country": "US"
},
"createdAt": "2021-09-22T11:07:37.3022907+02:00",
"deleted": false,
"id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
},
"ttl": -1,
"_etag": "\"180014cc-0000-1500-0000-614455330000\"",
"_ts": 1632301657
}
// After setting a new name, this is how an event document looks.
{
"id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
"partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"type": "domainEvent",
"data": {
"name": {
"firstName": "Jane",
"lastName": "Doe"
},
"contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"action": "ContactNameUpdatedEvent",
"id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
"createdAt": "2021-09-22T11:37:37.3022907+02:00"
},
"ttl": 120,
"_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
"_ts": 1632303457
}
可以看到 Contact ( ContactNameUpdatedEvent 类型 domainEvent)文档具有相同的分区键,并且这两个文档将保存在同一逻辑分区中。
更改源处理
若要读取事件流并将其发送到消息代理,该服务将使用 Azure Cosmos DB 更改源。
更改源是容器中更改的持久日志。 它在后台运行并跟踪修改。 在一个逻辑分区中,保证更改的顺序。 读取更改源的最方便方法是将 Azure 函数与 Azure Cosmos DB 触发器配合使用。 另一种选择是使用 更改源处理器库。 它允许将更改源处理作为后台服务(通过接口)集成到 Web API 中 IHostedService 。 此处的示例使用一个简单的控制台应用程序来实现抽象类 BackgroundService ,以在 .NET Core 应用程序中托管长时间运行的后台任务。
若要从 Azure Cosmos DB 更改源接收更改,需要实例化对象 ChangeFeedProcessor 、注册消息处理的处理程序方法,并开始侦听更改:
private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
var changeFeedProcessor = _container
.GetChangeFeedProcessorBuilder<ExpandoObject>(
_configuration.GetSection("Cosmos")["ProcessorName"],
HandleChangesAsync)
.WithInstanceName(Environment.MachineName)
.WithLeaseContainer(_leaseContainer)
.WithMaxItems(25)
.WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
.WithPollInterval(TimeSpan.FromSeconds(3))
.Build();
_logger.LogInformation("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
_logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
return changeFeedProcessor;
}
然后,处理程序方法(HandleChangesAsync 此处)处理消息。 在此示例中,事件发布到服务总线主题,该主题已分区以实现可伸缩性,并 启用了重复功能。 任何对对象更改 Contact 感兴趣的服务都可以订阅该服务总线主题,并接收和处理其自己的上下文的更改。
生成的服务总线消息具有一个 SessionId 属性。 在服务总线中使用会话时,可以保证消息的顺序会保留(首先先出(FIFO)。 保留顺序对于此用例是必需的。
下面是处理来自更改源的消息的代码片段:
private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
_logger.LogInformation($"Received {changes.Count} document(s).");
var eventsCount = 0;
Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();
foreach (var document in changes as dynamic)
{
if (!((IDictionary<string, object>)document).ContainsKey("type") ||
!((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.
if (document.type == EVENT_TYPE) // domainEvent.
{
string json = JsonConvert.SerializeObject(document.data);
var sbMessage = new ServiceBusMessage(json)
{
ContentType = "application/json",
Subject = document.data.action,
MessageId = document.id,
PartitionKey = document.partitionKey,
SessionId = document.partitionKey
};
// Create message batch per partitionKey.
if (partitionedMessages.ContainsKey(document.partitionKey))
{
partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
}
else
{
partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
}
eventsCount++;
}
}
if (partitionedMessages.Count > 0)
{
_logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");
// Loop over each partition.
foreach (var partition in partitionedMessages)
{
// Create batch for partition.
using var messageBatch =
await _topicSender.CreateMessageBatchAsync(cancellationToken);
foreach (var msg in partition.Value)
if (!messageBatch.TryAddMessage(msg))
throw new Exception();
_logger.LogInformation(
$"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");
try
{
await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e.Message);
throw;
}
}
}
else
{
_logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
}
}
错误处理
如果在处理更改时出错,更改源库将在成功处理最后一批的位置重启读取消息。 例如,如果应用程序成功处理了 10,000 条消息,现在正在处理批处理 10,001 到 10,025,并且发生错误,则它可以重启并选取其工作位置为 10,001。 该库通过 Azure Cosmos DB 容器中 Leases 保存的信息自动跟踪已处理的内容。
服务可能已将一些已重新处理的消息发送到服务总线。 通常,这种情况会导致消息处理重复。 如前所述,服务总线具有重复消息检测的功能,需要为此方案启用该功能。 服务会根据消息的应用程序控制 MessageId 属性检查是否已将消息添加到服务总线主题(或队列)。 该属性设置为 ID 事件文档。 如果同一消息再次发送到服务总线,该服务将忽略并删除它。
保养工作
在典型的事务发件箱实现中,服务将更新已处理的事件,并将属性Processedtrue设置为,指示消息已成功发布。 可以在处理程序方法中手动实现此行为。 在当前方案中,无需执行此类过程。 Azure Cosmos DB 跟踪使用更改源(与 Leases 容器结合使用)处理的事件。
最后一步,有时需要从容器中删除事件,以便仅保留最新的记录/文档。 为了定期进行清理,实现将应用 Azure Cosmos DB 的另一项功能:生存时间(TTL)文档。 Azure Cosmos DB 可以根据可添加到文档的属性自动删除文档 TTL :时间跨度(以秒为单位)。 该服务会不断检查容器中是否有属性 TTL 的文档。 文档过期后,Azure Cosmos DB 会将其从数据库中删除。
当所有组件按预期工作时,将快速处理和发布事件:几秒钟内。 如果 Azure Cosmos DB 中出现错误,则不会将事件发送到消息总线,因为业务对象和相应的事件都无法保存到数据库。 唯一需要考虑的一点是,当后台工作器(更改源处理器)或服务总线不可用时,对文档设置适当的 TTL 值 DomainEvent 。 在生产环境中,最好选择多天的时间跨度。 例如,10 天。 然后,所涉及的所有组件将有足够的时间来处理/发布应用程序中的更改。
概要
事务发件箱模式解决了在分布式系统中可靠地发布域事件的问题。 通过将业务对象的状态及其事件提交在同一事务批处理中,并使用后台处理器作为消息中继,可确保其他服务(内部或外部)最终会收到它们依赖的信息。 此示例不是事务发件箱模式的传统实现。 它使用 Azure Cosmos DB 更改源和生存时间等功能,使事情简单干净。
下面是本方案中使用的 Azure 组件的摘要:
下载此体系结构的 Visio 文件。
此解决方案的优点包括:
- 可靠的消息传送和事件的保证传送。
- 通过服务总线保留事件和消息重复顺序。
- 无需维护指示成功处理事件文档的额外
Processed属性。 - 通过生存时间从 Azure Cosmos DB 中删除事件(TTL)。 此过程不使用处理用户/应用程序请求所需的请求单位。 而是在后台任务中使用“剩余”请求单位。
- 通过
ChangeFeedProcessor(或 Azure 函数)对消息进行错误验证处理。 - 可选:多个更改源处理器,每个处理器都在更改源中维护自己的指针。
注意事项
本文中讨论的示例应用程序演示如何使用 Azure Cosmos DB 和服务总线在 Azure 上实现事务发件箱模式。 还有其他使用 NoSQL 数据库的方法。 若要保证业务对象和事件将可靠地保存在数据库中,可以在业务对象文档中嵌入事件列表。 此方法的缺点是清理过程需要更新包含事件的每个文档。 与使用 TTL 相比,这并不理想,尤其是在请求单位成本方面。
请记住,不应考虑此处生产就绪代码提供的示例代码。 它对多线程处理有一些限制,尤其是类中 DomainEntity 处理事件的方式以及对象在实现中的 CosmosContainerContext 跟踪方式。 将其用作你自己的实现的起点。 或者,请考虑使用已内置于其中的现有库,例如 NServiceBus 或 MassTransit。
部署此方案
可以在 GitHub 上找到源代码、部署文件和测试此方案的说明: https://github.com/Azure-Samples/transactional-outbox-pattern
供稿人
本文由Microsoft维护。 它最初是由以下贡献者撰写的。
主要作者:
- 克里斯蒂安·丹尼格 |首席软件工程师
要查看非公开的 LinkedIn 个人资料,请登录到 LinkedIn。
后续步骤
请查看以下文章了解详细信息: