你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
本教程介绍如何使用 Go 编程语言向 Azure 服务总线队列发送消息并从中接收消息。
Azure 服务总线是一个完全托管的企业消息代理,其中包含消息队列和发布/订阅功能。 服务总线用于将应用程序和服务彼此分离,从而提供分布式、可靠且高性能的消息传输。
使用 Azure SDK for Go 的 azservicebus 包,可以从 Azure 服务总线发送和接收消息,并使用 Go 编程语言。
在本教程结束时,你将能够:将单个消息或一批消息发送到队列、接收消息和未处理的死信消息。
先决条件
- 一份 Azure 订阅。 可以激活 Visual Studio 或 MSDN 订阅者权益 或注册 免费帐户。
- 如果没有要使用的队列,请按照 使用 Azure 门户创建服务总线队列 文章中的步骤创建队列。
- Go 版本 1.18 或更高版本
创建示例应用
首先,创建新的 Go 模块。
为名为
service-bus-go-how-to-use-queues的模块创建新目录。在
azservicebus目录中,初始化模块并安装所需的包。go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus创建名为
main.go的新文件。
进行身份验证并创建客户端
在 main.go 文件中,创建一个名为 GetClient 并添加以下代码的新函数:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
该 GetClient 函数返回使用 Azure 服务总线命名空间和凭据创建的新 azservicebus.Client 对象。 命名空间由 AZURE_SERVICEBUS_HOSTNAME 环境变量提供。 凭据是使用 azidentity.NewDefaultAzureCredential 函数创建的。
对于本地开发, DefaultAzureCredential 使用 Azure CLI 的访问令牌,可以通过运行 az login 命令向 Azure 进行身份验证来创建该令牌。
小窍门
若要使用连接字符串进行身份验证,请使用 NewClientFromConnectionString 函数。
将消息发送到队列
在 main.go 文件中,创建一个名为 SendMessage 并添加以下代码的新函数:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage 采用两个参数:消息字符串和对象 azservicebus.Client 。 然后,它会创建一个新 azservicebus.Sender 对象,并将消息发送到队列。 若要发送批量消息,请将 SendMessageBatch 函数添加到 main.go 文件。
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch 采用两个参数:消息切片和 azservicebus.Client 对象。 然后,它会创建一个新 azservicebus.Sender 对象,并将消息发送到队列。
从队列接收消息
将消息发送到队列后,可以使用该 azservicebus.Receiver 类型接收消息。 若要从队列接收消息,请将 GetMessage 函数添加到 main.go 文件中。
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage 获取对象 azservicebus.Client 并创建新 azservicebus.Receiver 对象。 然后,它会从队列接收消息。 该 Receiver.ReceiveMessages 函数采用两个参数:上下文和要接收的消息数。
Receiver.ReceiveMessages函数返回azservicebus.ReceivedMessage对象的切片。
接下来,for 循环会循环访问消息并打印消息正文。 然后调用该 CompleteMessage 函数以完成消息,并将其从队列中删除。
超出长度限制、发送到无效队列或未成功处理的消息可以发送到死信队列。 若要将消息发送到死信队列,请将 SendDeadLetterMessage 函数添加到 main.go 文件中。
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage 获取一个 azservicebus.Client 对象和一个 azservicebus.ReceivedMessage 对象。 然后,系统将消息发送到死信队列。 该函数采用两个 azservicebus.DeadLetterOptions 参数:上下文和对象。 如果消息未能发送到死信队列,该 Receiver.DeadLetterMessage 函数将返回错误。
要从死信队列接收消息,请将 ReceiveDeadLetterMessage 函数添加到 main.go 文件中。
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage 采用 azservicebus.Client 对象并创建包含死信队列选项的新的 azservicebus.Receiver 对象。 然后,它从死信队列接收消息。 然后,该函数从死信队列接收一条消息。 接着它打印该消息的死信原因和描述。
代码示例
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
运行代码
在运行代码之前,请创建名为 的 AZURE_SERVICEBUS_HOSTNAME环境变量。 将环境变量的值设置为服务总线命名空间。
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
接下来,运行以下命令 go run 以运行应用:
go run main.go
后续步骤
有关详细信息,请查看以下链接: