发布和订阅消息教程介绍了使用 Azure Web PubSub 发布和订阅消息的基础知识。 在本教程中,你将了解 Azure Web PubSub 的事件系统,并使用它来生成具有实时通信功能的完整 Web 应用程序。
本教程介绍如何执行下列操作:
- 创建 Web PubSub 服务实例
- 配置 Azure Web PubSub 的事件处理程序设置
- 处理应用服务器中的事件并构建实时聊天应用
 
如果没有 Azure 帐户,请在开始前创建一个免费帐户。
先决条件
- 本设置需要 Azure CLI 版本 2.22.0 或更高版本。 如果使用 Azure Cloud Shell,则最新版本已安装。
创建 Azure Web PubSub 实例
创建资源组
资源组是在其中部署和管理 Azure 资源的逻辑容器。 使用 az group create 命令在 myResourceGroup 位置创建名为 eastus 的资源组。
az group create --name myResourceGroup --location EastUS
创建 Web PubSub 实例
运行 az extension add,安装 webpubsub 扩展或将其升级到当前版本。
az extension add --upgrade --name webpubsub
使用 Azure CLI az webpubsub create 命令在已创建的资源组中创建 Web PubSub。 以下命令在 EastUS 的资源组 myResourceGroup 下创建一个免费的 Web PubSub 资源:
重要
每个 Web PubSub 资源必须具有唯一名称。 在以下示例中,将 <your-unique-resource-name> 替换为 Web PubSub 的名称。
 
az webpubsub create --name "<your-unique-resource-name>" --resource-group "myResourceGroup" --location "EastUS" --sku Free_F1
此命令的输出会显示新建的资源的属性。 请记下下面列出的两个属性:
- 
              资源名称:为上面的 --name参数提供的名称。
- 
              主机名:在本例中,主机名为 <your-unique-resource-name>.webpubsub.azure.com/。
目前,只有你的 Azure 帐户才有权对这个新资源执行任何操作。
获取 ConnectionString 以供将来使用
重要
本文中出现的原始连接字符串仅用于演示目的。
连接字符串包括应用程序访问 Azure Web PubSub 服务所需的授权信息。 连接字符串中的访问密钥类似于服务的根密码。 在生产环境中,请始终保护访问密钥。 使用 Azure Key Vault 安全地管理和轮换密钥,并使用 WebPubSubServiceClient 对连接进行保护。
避免将访问密钥分发给其他用户、对其进行硬编码或将其以纯文本形式保存在其他人可以访问的任何位置。 如果你认为访问密钥可能已泄露,请轮换密钥。
 
使用 Azure CLI az webpubsub key 命令获取服务的 ConnectionString。  将 <your-unique-resource-name> 占位符替换为 Azure Web PubSub 实例的名称。
az webpubsub key show --resource-group myResourceGroup --name <your-unique-resource-name> --query primaryConnectionString --output tsv
复制主连接字符串以供稍后使用。
复制提取的 ConnectionString 并将其设置到环境变量 WebPubSubConnectionString 中,教程稍后将读取该变量。 将下面的 <connection-string> 替换为提取的 ConnectionString。
export WebPubSubConnectionString="<connection-string>"
SET WebPubSubConnectionString=<connection-string>
设置项目
先决条件
创建应用程序
Azure Web PubSub 中有两个角色:服务器和客户端。 此概念类似于 Web 应用程序中的服务器和客户端角色。 服务器负责管理客户端、侦听,以及响应客户端消息。 客户端负责通过服务器发送和接收用户的消息,并将它们可视化给最终用户。
在本教程中,我们将生成一个实时聊天 Web 应用程序。 在真实的 Web 应用程序中,服务器的职责还包括对客户端进行身份验证,以及为应用程序 UI 提供静态网页。
我们将使用 ASP.NET Core 8 来托管网页并处理传入的请求。
首先,让我们在 chatapp 文件夹中创建一个 ASP.NET Core Web 应用。
- 创建新的 Web 应用。 - mkdir chatapp
cd chatapp
dotnet new web
 
- 添加 - app.UseStaticFiles()Program.cs 以支持托管静态网页操作。
 - var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
app.UseStaticFiles();
app.Run();
 
- 创建一个 HTML 文件并将其保存为 - wwwroot/index.html,我们稍后会在聊天应用的 UI 中使用它。
 - <html>
  <body>
    <h1>Azure Web PubSub Chat</h1>
  </body>
</html>
 
可以通过运行 dotnet run --urls http://localhost:8080 来测试服务器,并在浏览器中访问 http://localhost:8080/index.html。
我们使用 express.js(一种常用的 Node.js Web 框架)来托管网页并处理传入请求。
首先,让我们在 chatapp 文件夹中创建一个 Express Web 应用。
- 安装 express.js - mkdir chatapp
cd chatapp
npm init -y
npm install --save express
 
- 然后创建一个 express 服务器并将其保存为 - server.js
 - const express = require('express');
const app = express();
app.use(express.static('public'));
app.listen(8080, () => console.log('server started'));
 
- 另外,创建一个 HTML 文件并将其保存为 - public/index.html,我们稍后会在聊天应用的 UI 中使用它。
 - <html>
<body>
  <h1>Azure Web PubSub Chat</h1>
</body>
</html>
 
可以通过运行 node server 来测试服务器,并在浏览器中访问 http://localhost:8080。
我们将使用 Javalin Web 框架来托管网页并处理传入的请求。
- 使用 Maven 创建新应用 - webpubsub-tutorial-chat,然后切换到 webpubsub-tutorial-chat 文件夹:
 - mvn archetype:generate --define interactiveMode=n --define groupId=com.webpubsub.tutorial --define artifactId=webpubsub-tutorial-chat --define archetypeArtifactId=maven-archetype-quickstart --define archetypeVersion=1.4
cd webpubsub-tutorial-chat
 
- 将 - javalinWeb 框架依赖项添加到- dependencies的- pom.xml节点中:
 - 
- 
              javalin:适用于 Java 的简单 Web 框架
- 
              slf4j-simple:适用于 Java 的记录器
 - <!-- https://mvnrepository.com/artifact/io.javalin/javalin -->
<dependency>
    <groupId>io.javalin</groupId>
    <artifactId>javalin</artifactId>
    <version>6.1.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.12</version>
</dependency>
 
- 导航到 /src/main/java/com/webpubsub/tutorial 目录。 在编辑器中打开 App.java 文件。 使用 - Javalin.create提供静态文件:
 - package com.webpubsub.tutorial;
import io.javalin.Javalin;
public class App {
    public static void main(String[] args) {
        // start a server
        Javalin app = Javalin.create(config -> {
            config.staticFiles.add("public");
        }).start(8080);
    }
}
 - 根据你的设置,你可能需要将语言级别显式设置为 Java 8。 此操作可以在 pom.xml 中完成。 添加以下代码片段: - <build>
    <plugins>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
          <configuration>
            <source>1.8</source>
            <target>1.8</target>
          </configuration>
        </plugin>
    </plugins>
</build>
 
- 创建一个 HTML 文件并将其保存到 /src/main/resources/public/index.html 中。 稍后我们将在聊天应用的 UI 中使用它。 - <html>
<body>
  <h1>Azure Web PubSub Chat</h1>
</body>
</html>
 
可以通过在包含 pom.xml 文件的目录下运行以下命令来测试服务器,并在浏览器中访问  。
mvn compile & mvn package & mvn exec:java -Dexec.mainClass="com.webpubsub.tutorial.App" -Dexec.cleanupDaemonThreads=false
我们将使用 Flask(一种流行的 Python Web 框架)来完成这项工作。
首先创建准备好 Flask 的项目文件夹 chatapp。
- 创建并激活环境 - mkdir chatapp
cd chatapp
python3 -m venv .venv
. .venv/bin/activate
 
- 在激活的环境中,安装 Flask - pip install Flask
 
- 然后创建 Flask 服务器并将其另存为 - server.py
 - from flask import (
    Flask, 
    send_from_directory,
)
app = Flask(__name__)
@app.route('/<path:filename>')
def index(filename):
    return send_from_directory('public', filename)
if __name__ == '__main__':
    app.run(port=8080)
 
- 另外,创建一个 HTML 文件并将其保存为 - public/index.html,我们稍后会在聊天应用的 UI 中使用它。
 - <html>
<body>
  <h1>Azure Web PubSub Chat</h1>
</body>
</html>
 
可以通过运行 python server.py 来测试服务器,并在浏览器中访问 http://127.0.0.1:8080/index.html 。
 
添加协商终结点
在发布和订阅消息教程中,订阅者直接使用连接字符串。 在现实世界的应用程序中,与任何客户端共享连接字符串是不安全的,因为连接字符串具有对服务执行任何操作的高权限。 现在,让服务器使用连接字符串,并公开一个 negotiate 终结点,以便客户端获取带有访问令牌的完整 URL。 这样,服务器就可以在 negotiate 终结点之前添加身份验证中间件,以防止未经授权的访问。
首先安装依赖项。
dotnet add package Microsoft.Azure.WebPubSub.AspNetCore
现在,我们添加一个 /negotiate 终结点,供客户端调用以生成令牌。
using Azure.Core;
using Microsoft.Azure.WebPubSub.AspNetCore;
using Microsoft.Azure.WebPubSub.Common;
using Microsoft.Extensions.Primitives;
// Read connection string from environment
var connectionString = Environment.GetEnvironmentVariable("WebPubSubConnectionString");
if (connectionString == null)
{
    throw new ArgumentNullException(nameof(connectionString));
}
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddWebPubSub(o => o.ServiceEndpoint = new WebPubSubServiceEndpoint(connectionString))
    .AddWebPubSubServiceClient<Sample_ChatApp>();
var app = builder.Build();
app.UseStaticFiles();
// return the Client Access URL with negotiate endpoint
app.MapGet("/negotiate", (WebPubSubServiceClient<Sample_ChatApp> service, HttpContext context) =>
{
    var id = context.Request.Query["id"];
    if (StringValues.IsNullOrEmpty(id))
    {
        context.Response.StatusCode = 400;
        return null;
    }
    return new
    {
        url = service.GetClientAccessUri(userId: id).AbsoluteUri
    };
});
app.Run();
sealed class Sample_ChatApp : WebPubSubHub
{
}
              AddWebPubSubServiceClient<THub>() 用于注入服务客户端 WebPubSubServiceClient<THub>,我们可以在协商步骤中使用它来生成客户端连接令牌,并在触发中心事件时,在中心方法中使用它来调用服务 REST API。 此令牌生成代码类似于我们在发布和订阅消息教程中使用的代码,其不同之处在于,我们在生成令牌时多传递了一个参数 (userId)。 用户 ID 可用于识别客户端的身份,因此,当你收到消息时,你知道消息来自何处。
该代码从我们在WebPubSubConnectionString设置的环境变量  中读取连接字符串。
使用 dotnet run --urls http://localhost:8080 重新运行服务器。
首先,安装 Azure Web PubSub SDK:
npm install --save @azure/web-pubsub
现在,让我们添加一个 /negotiate API 来生成令牌。
const express = require('express');
const { WebPubSubServiceClient } = require('@azure/web-pubsub');
const app = express();
const hubName = 'Sample_ChatApp';
let serviceClient = new WebPubSubServiceClient(process.env.WebPubSubConnectionString, hubName);
app.get('/negotiate', async (req, res) => {
    let id = req.query.id;
    if (!id) {
    res.status(400).send('missing user id');
    return;
    }
    let token = await serviceClient.getClientAccessToken({ userId: id });
    res.json({
    url: token.url
    });
});
app.use(express.static('public'));
app.listen(8080, () => console.log('server started'));
此令牌生成代码类似于我们在发布和订阅消息教程中使用的代码,其不同之处在于,我们在生成令牌时多传递了一个参数 (userId)。 用户 ID 可用于识别客户端的身份,因此,当你收到消息时,你知道消息来自何处。
该代码从我们在WebPubSubConnectionString设置的环境变量  中读取连接字符串。
通过运行 node server 重新运行服务器。
首先,将 Azure Web PubSub SDK 依赖项和 gson 添加到 dependencies 的 pom.xml 节点中:
<!-- https://mvnrepository.com/artifact/com.azure/azure-messaging-webpubsub -->
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-webpubsub</artifactId>
    <version>1.2.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.10.1</version>
</dependency>
现在,让我们向 /negotiate 文件添加 App.java API 以生成令牌:
package com.webpubsub.tutorial;
    
import com.azure.messaging.webpubsub.WebPubSubServiceClient;
import com.azure.messaging.webpubsub.WebPubSubServiceClientBuilder;
import com.azure.messaging.webpubsub.models.GetClientAccessTokenOptions;
import com.azure.messaging.webpubsub.models.WebPubSubClientAccessToken;
import com.azure.messaging.webpubsub.models.WebPubSubContentType;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.javalin.Javalin;
public class App {
    public static void main(String[] args) {
        String connectionString = System.getenv("WebPubSubConnectionString");
        if (connectionString == null) {
            System.out.println("Please set the environment variable WebPubSubConnectionString");
            return;
        }
        // create the service client
        WebPubSubServiceClient service = new WebPubSubServiceClientBuilder()
                .connectionString(connectionString)
                .hub("Sample_ChatApp")
                .buildClient();
        // start a server
        Javalin app = Javalin.create(config -> {
            config.staticFiles.add("public");
        }).start(8080);
        
        // Handle the negotiate request and return the token to the client
        app.get("/negotiate", ctx -> {
            String id = ctx.queryParam("id");
            if (id == null) {
                ctx.status(400);
                ctx.result("missing user id");
                return;
            }
            GetClientAccessTokenOptions option = new GetClientAccessTokenOptions();
            option.setUserId(id);
            WebPubSubClientAccessToken token = service.getClientAccessToken(option);
            ctx.contentType("application/json");
            Gson gson = new Gson();
            JsonObject jsonObject = new JsonObject();
            jsonObject.addProperty("url", token.getUrl());
            String response = gson.toJson(jsonObject);
            ctx.result(response);
            return;
        });
    }
}
此令牌生成代码类似于我们在发布和订阅消息教程中使用的代码,其不同之处在于,我们在生成令牌时调用 setUserId 方法来设置用户 ID。 用户 ID 可用于识别客户端的身份,因此,当你收到消息时,你知道消息来自何处。
该代码从我们在WebPubSubConnectionString设置的环境变量  中读取连接字符串。
使用以下命令重新运行服务器:
mvn compile & mvn package & mvn exec:java -Dexec.mainClass="com.webpubsub.tutorial.App" -Dexec.cleanupDaemonThreads=false
首先,安装 Azure Web PubSub SDK。
pip install azure-messaging-webpubsubservice
现在,让我们向服务器添加 /negotiate API 以生成令牌。
import os
from flask import (
    Flask, 
    request, 
    send_from_directory,
)
from azure.messaging.webpubsubservice import (
    WebPubSubServiceClient
)
hub_name = 'Sample_ChatApp'
connection_string = os.environ.get('WebPubSubConnectionString')
app = Flask(__name__)
service = WebPubSubServiceClient.from_connection_string(connection_string, hub=hub_name)
@app.route('/<path:filename>')
def index(filename):
    return send_from_directory('public', filename)
@app.route('/negotiate')
def negotiate():
    id = request.args.get('id')
    if not id:
        return 'missing user id', 400
    token = service.get_client_access_token(user_id=id)
    return {
        'url': token['url']
    }, 200
if __name__ == '__main__':
    app.run(port=8080)
此令牌生成代码类似于我们在发布和订阅消息教程中使用的代码,其不同之处在于,我们在生成令牌时多传递了一个参数 (user_id)。 用户 ID 可用于识别客户端的身份,因此,当你收到消息时,你知道消息来自何处。
该代码从我们在WebPubSubConnectionString设置的环境变量  中读取连接字符串。
使用 python server.py 重新运行服务器。
 
你可以通过访问 http://localhost:8080/negotiate?id=user1 来测试此 API,它将为你提供 Azure Web PubSub 的完整 URL 和访问令牌。
处理事件
在 Azure Web PubSub 中,当客户端发生某些活动时(例如客户端正在进行连接、已连接、已断开连接或客户端正在发送消息),服务会向服务器发送通知,以便服务器对这些事件做出响应。
事件以 Webhook 的形式传送到服务器。 Webhook 由应用程序服务器提供和公开,并在 Azure Web PubSub 服务端注册。 每当发生事件时,服务都会调用 Webhook。
Azure Web PubSub 遵循 CloudEvents 来描述事件数据。
下面,我们在客户端已连接的情况下处理 connected 系统事件,并在客户端发送消息以构建聊天应用时处理 message 用户事件。
我们在上一步安装的适用于 AspNetCore Microsoft.Azure.WebPubSub.AspNetCore 的 Web PubSub SDK 也可以帮助解析和处理 CloudEvents 请求。
首先,在 app.Run() 之前添加事件处理程序。 指定事件的终结点路径,比如 /eventhandler。
app.MapWebPubSubHub<Sample_ChatApp>("/eventhandler/{*path}");
app.Run();
现在,在上一步创建的 Sample_ChatApp 类中添加一个构造函数,让其与用来调用 Web PubSub 服务的 WebPubSubServiceClient<Sample_ChatApp> 配合使用。 
              OnConnectedAsync() 用于在触发 connected 事件时进行响应,OnMessageReceivedAsync() 用于处理来自客户端的消息。
sealed class Sample_ChatApp : WebPubSubHub
{
    private readonly WebPubSubServiceClient<Sample_ChatApp> _serviceClient;
    public Sample_ChatApp(WebPubSubServiceClient<Sample_ChatApp> serviceClient)
    {
        _serviceClient = serviceClient;
    }
    public override async Task OnConnectedAsync(ConnectedEventRequest request)
    {
        Console.WriteLine($"[SYSTEM] {request.ConnectionContext.UserId} joined.");
    }
    public override async ValueTask<UserEventResponse> OnMessageReceivedAsync(UserEventRequest request, CancellationToken cancellationToken)
    {
        await _serviceClient.SendToAllAsync(RequestContent.Create(
        new
        {
            from = request.ConnectionContext.UserId,
            message = request.Data.ToString()
        }),
        ContentType.ApplicationJson);
        return new UserEventResponse();
    }
}
在上面的代码中,我们使用服务客户端以 JSON 格式向所有通过 SendToAllAsync 加入的人广播一条通知消息。
适用于 Express @azure/web-pubsub-express 的 Web PubSub SDK 有助于解析和处理 CloudEvents 请求。
npm install --save @azure/web-pubsub-express
使用以下代码更新 server.js,在 /eventhandler 处公开 REST API(此操作由 Web PubSub SDK 提供的 express 中间件完成)以处理“客户端已连接”事件:
const express = require("express");
const { WebPubSubServiceClient } = require("@azure/web-pubsub");
const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const app = express();
const hubName = "Sample_ChatApp";
let serviceClient = new WebPubSubServiceClient(process.env.WebPubSubConnectionString, hubName);
let handler = new WebPubSubEventHandler(hubName, {
  path: "/eventhandler",
  onConnected: async (req) => {
    console.log(`${req.context.userId} connected`);
  },
  handleUserEvent: async (req, res) => {
    if (req.context.eventName === "message")
      await serviceClient.sendToAll({
        from: req.context.userId,
        message: req.data,
      });
    res.success();
  },
});
app.get("/negotiate", async (req, res) => {
  let id = req.query.id;
  if (!id) {
    res.status(400).send("missing user id");
    return;
  }
  let token = await serviceClient.getClientAccessToken({ userId: id });
  res.json({
    url: token.url,
  });
});
app.use(express.static("public"));
app.use(handler.getMiddleware());
app.listen(8080, () => console.log("server started"));
在上面的代码中,onConnected 在连接客户端后直接向控制台输出一条消息。 你可以看到,我们使用了 req.context.userId,因此我们可以看到所连接客户端的标识。 当客户端发送消息时,会调用 handleUserEvent。 它使用 WebPubSubServiceClient.sendToAll() 将 JSON 对象中的消息广播到所有客户端。 你可以看到 handleUserEvent 还有一个 res 对象,你可以在其中将消息发送回事件发送方。 在这里,我们只需调用 res.success(),使 WebHook 返回 200(请注意,即使你不想向客户端返回任何内容,也需要执行此调用,否则 WebHook 永远不会返回内容并且客户端连接将会关闭)。
目前,你需要用 Java 自行实现事件处理程序。 步骤直接遵循协议规范,如以下列表所示:
- 为事件处理程序路径添加 HTTP 处理程序,比如 - /eventhandler。
 
- 首先,我们想处理 abuse protection OPTIONS 请求,我们检查标头中是否包含 - WebHook-Request-Origin标头,然后返回标头- WebHook-Allowed-Origin。 为了简化演示,我们返回- *以允许所有来源。
 - 
// validation: https://free.blessedness.top/azure/azure-web-pubsub/reference-cloud-events#protection
app.options("/eventhandler", ctx -> {
    ctx.header("WebHook-Allowed-Origin", "*");
});
 
- 然后,我们想检查传入的请求是否就是我们预期的事件。 比方说,我们现在关心的是系统 - connected事件,它应该包含标头- ce-type作为- azure.webpubsub.sys.connected。 我们会在滥用保护之后添加逻辑,以将已连接事件广播给所有客户端,使客户端可以看到谁加入了聊天室。
 - // validation: https://free.blessedness.top/azure/azure-web-pubsub/reference-cloud-events#protection
app.options("/eventhandler", ctx -> {
    ctx.header("WebHook-Allowed-Origin", "*");
});
// handle events: https://free.blessedness.top/azure/azure-web-pubsub/reference-cloud-events#events
app.post("/eventhandler", ctx -> {
    String event = ctx.header("ce-type");
    if ("azure.webpubsub.sys.connected".equals(event)) {
        String id = ctx.header("ce-userId");
        System.out.println(id + " connected.");
    }
    ctx.status(200);
});
 - 在上面的代码中,我们在连接客户端时直接向控制台打印一条消息。 你可以看到,我们使用了 - ctx.header("ce-userId"),因此我们可以看到所连接客户端的标识。
 
- 
              - ce-type事件的- message始终为- azure.webpubsub.user.message。 详见事件消息。 我们会更新处理消息的逻辑,这样,当消息传入时,我们就会以 JSON 格式向所有连接的客户端广播该消息。
 - // handle events: https://free.blessedness.top/azure/azure-web-pubsub/reference-cloud-events#events
app.post("/eventhandler", ctx -> {
    String event = ctx.header("ce-type");
    if ("azure.webpubsub.sys.connected".equals(event)) {
        String id = ctx.header("ce-userId");
        System.out.println(id + " connected.");
    } else if ("azure.webpubsub.user.message".equals(event)) {
        String id = ctx.header("ce-userId");
        String message = ctx.body();
        Gson gson = new Gson();
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("from", id);
        jsonObject.addProperty("message", message);
        String messageToSend = gson.toJson(jsonObject);
        service.sendToAll(messageToSend, WebPubSubContentType.APPLICATION_JSON);
    }
    ctx.status(200);
});
 
目前,你需要用 Python 自行实现事件处理程序。 步骤直接遵循协议规范,如以下列表所示:
- 为事件处理程序路径添加 HTTP 处理程序,比如 - /eventhandler。
 
- 首先,我们想处理 abuse protection OPTIONS 请求,我们检查标头中是否包含 - WebHook-Request-Origin标头,然后返回标头- WebHook-Allowed-Origin。 为了简化演示,我们返回- *以允许所有来源。
 - # validation: https://free.blessedness.top/azure/azure-web-pubsub/reference-cloud-events#protection
@app.route('/eventhandler', methods=['OPTIONS'])
def handle_event():
    if request.method == 'OPTIONS':
        if request.headers.get('WebHook-Request-Origin'):
            res = Response()
            res.headers['WebHook-Allowed-Origin'] = '*'
            res.status_code = 200
            return res
 
- 然后,我们想检查传入的请求是否就是我们预期的事件。 比方说,我们现在关心的是系统 - connected事件,它应该包含标头- ce-type作为- azure.webpubsub.sys.connected。 我们在 abuse protection 后面添加逻辑:
 - # validation: https://free.blessedness.top/azure/azure-web-pubsub/reference-cloud-events#protection
# handle events: https://free.blessedness.top/azure/azure-web-pubsub/reference-cloud-events#events
@app.route('/eventhandler', methods=['POST', 'OPTIONS'])
def handle_event():
    if request.method == 'OPTIONS':
        if request.headers.get('WebHook-Request-Origin'):
            res = Response()
            res.headers['WebHook-Allowed-Origin'] = '*'
            res.status_code = 200
            return res
    elif request.method == 'POST':
        user_id = request.headers.get('ce-userid')
        if request.headers.get('ce-type') == 'azure.webpubsub.sys.connected':
            return user_id + ' connected', 200
        else:
            return 'Not found', 404
 - 在上面的代码中,我们在连接客户端时直接向控制台打印一条消息。 你可以看到,我们使用了 - request.headers.get('ce-userid'),因此我们可以看到所连接客户端的标识。
 
- 
              - ce-type事件的- message始终为- azure.webpubsub.user.message。 详见事件消息。 我们会更新处理消息的逻辑,这样,当消息传入时,我们就会将消息广播到所有连接的客户端。
 - @app.route('/eventhandler', methods=['POST', 'OPTIONS'])
def handle_event():
    if request.method == 'OPTIONS':
        if request.headers.get('WebHook-Request-Origin'):
            res = Response()
            res.headers['WebHook-Allowed-Origin'] = '*'
            res.status_code = 200
            return res
    elif request.method == 'POST':
        user_id = request.headers.get('ce-userid')
        type = request.headers.get('ce-type')
        if type == 'azure.webpubsub.sys.connected':
            print(f"{user_id} connected")
            return '', 204
        elif type == 'azure.webpubsub.user.message':
            # default uses JSON
            service.send_to_all(message={
                'from': user_id,
                'message': request.data.decode('UTF-8')
            })
            # returned message is also received by the client
            return {
                'from': "system",
                'message': "message handled by server"
            }, 200
        else:
            return 'Bad Request', 400
 
 
更新网页
现在,让我们更新 index.html 以添加用于进行连接、发送消息以及在页面中显示收到的消息的逻辑。
<html>
  <body>
    <h1>Azure Web PubSub Chat</h1>
    <input id="message" placeholder="Type to chat...">
    <div id="messages"></div>
    <script>
      (async function () {
        let id = prompt('Please input your user name');
        let res = await fetch(`/negotiate?id=${id}`);
        let data = await res.json();
        let ws = new WebSocket(data.url);
        ws.onopen = () => console.log('connected');
        let messages = document.querySelector('#messages');
        
        ws.onmessage = event => {
          let m = document.createElement('p');
          let data = JSON.parse(event.data);
          m.innerText = `[${data.type || ''}${data.from || ''}] ${data.message}`;
          messages.appendChild(m);
        };
        let message = document.querySelector('#message');
        message.addEventListener('keypress', e => {
          if (e.charCode !== 13) return;
          ws.send(message.value);
          message.value = '';
        });
      })();
    </script>
  </body>
</html>
你可以在上面的代码中看到,我们使用浏览器中的本机 WebSocket API 进行连接,使用 WebSocket.send() 发送消息,使用 WebSocket.onmessage 侦听接收到的消息。
还可以使用客户端 SDK 连接到该服务,这样就可以完成自动重新连接、错误处理等操作。
现在还差一步就可以进行聊天了。 让我们在 Web PubSub 服务中配置我们关心的事件以及将事件发送到的位置。
设置事件处理程序
我们在 Web PubSub 服务中设置事件处理程序来告诉服务将事件发送到哪里。
当 Web 服务器在本地运行时,如果没有允许从 Internet 进行访问的终结点,Web PubSub 服务如何调用 localhost? 通常有两种方法。 一种方法是使用一些通用隧道工具向公众公开 localhost,另一种方法是使用 awps-tunnel 通过该工具将流量从 Web PubSub 服务以隧道传输方式传输到本地服务器。
在本部分,我们使用 Azure CLI 设置事件处理程序并使用 awps-tunnel 将流量路由到 localhost。
我们将 URL 模板设置为使用 tunnel 方案,以便 Web PubSub 通过 awps-tunnel 的隧道连接来路由消息。 事件处理程序可以按本文所述通过门户或 CLI 设置,在这里我们通过 CLI 设置。 由于我们按照上一步的设置侦听路径 /eventhandler 中的事件,因此我们将 URL 模板设置为 tunnel:///eventhandler。
使用 Azure CLI az webpubsub hub create 命令为 Sample_ChatApp 中心创建事件处理程序设置。
重要
将 <your-unique-resource-name> 替换为在前面的步骤中创建的 Web PubSub 资源的名称。
 
az webpubsub hub create -n "<your-unique-resource-name>" -g "myResourceGroup" --hub-name "Sample_ChatApp" --event-handler url-template="tunnel:///eventhandler" user-event-pattern="*" system-event="connected"
在本地运行 awps-tunnel
下载并安装 awps-tunnel
该工具在 Node.js 16 或更高版本上运行。
npm install -g @azure/web-pubsub-tunnel-tool
使用服务连接字符串并运行
export WebPubSubConnectionString="<your connection string>"
awps-tunnel run --hub Sample_ChatApp --upstream http://localhost:8080
运行 Web 服务器
现在,一切都已设置好。 让我们运行 Web 服务器并体验聊天应用。
现在使用 dotnet run --urls http://localhost:8080 运行服务器。
可以在此处找到本教程的完整代码示例。
现在使用 node server 运行服务器。
可以在此处找到本教程的完整代码示例。
现在,使用以下命令运行服务器:
mvn compile & mvn package & mvn exec:java -Dexec.mainClass="com.webpubsub.tutorial.App" -Dexec.cleanupDaemonThreads=false
可以在此处找到本教程的完整代码示例。
现在使用 python server.py 运行服务器。
可以在此处找到本教程的完整代码示例。
 
打开 http://localhost:8080/index.html。 可以输入用户名并开始聊天了。
使用 connect 事件处理程序进行延迟身份验证
在前面的部分中,我们演示如何使用协商终结点返回 Web PubSub 服务 URL 和 JWT 访问令牌,以便客户端连接 Web PubSub 服务。 在某些情况下,例如资源有限的边缘设备,客户端可能更愿意直接连接 Web PubSub 资源。 在这些情况下,可以将 connect 事件处理程序配置为对客户端进行延迟身份验证、将用户 ID 分配给客户端、指定客户端连接后加入的组、配置客户端拥有的权限以及 WebSocket 子协议作为对客户端的 WebSocket 响应等。有关详细信息,请参阅连接事件处理程序规范。
现在,让我们使用 connect 事件处理程序来实现与协商部分类似的功能。
更新中心设置
首先,让我们更新中心设置以包括 connect 事件处理程序,我们还需要允许匿名连接,使没有 JWT 访问令牌的客户端可以连接到此服务。
使用 Azure CLI az webpubsub hub update 命令为 Sample_ChatApp 中心创建事件处理程序设置。
重要
将 <your-unique-resource-name> 替换为在前面的步骤中创建的 Web PubSub 资源的名称。
 
az webpubsub hub update -n "<your-unique-resource-name>" -g "myResourceGroup" --hub-name "Sample_ChatApp" --allow-anonymous true --event-handler url-template="tunnel:///eventhandler" user-event-pattern="*" system-event="connected" system-event="connect"
更新上游逻辑以处理连接事件
现在,让我们更新上游逻辑以处理连接事件。 我们现在还可以移除协商终结点。
与我们出于演示目的在协商终结点中执行的操作类似,我们还从查询参数中读取 ID。 在连接事件中,原始客户端查询会保留在连接事件请求正文中。
在类 Sample_ChatApp 中,替代 OnConnectAsync() 来处理 connect 事件:
sealed class Sample_ChatApp : WebPubSubHub
{
    private readonly WebPubSubServiceClient<Sample_ChatApp> _serviceClient;
    public Sample_ChatApp(WebPubSubServiceClient<Sample_ChatApp> serviceClient)
    {
        _serviceClient = serviceClient;
    }
    public override ValueTask<ConnectEventResponse> OnConnectAsync(ConnectEventRequest request, CancellationToken cancellationToken)
    {
        if (request.Query.TryGetValue("id", out var id))
        {
            return new ValueTask<ConnectEventResponse>(request.CreateResponse(userId: id.FirstOrDefault(), null, null, null));
        }
        // The SDK catches this exception and returns 401 to the caller
        throw new UnauthorizedAccessException("Request missing id");
    }
    public override async Task OnConnectedAsync(ConnectedEventRequest request)
    {
        Console.WriteLine($"[SYSTEM] {request.ConnectionContext.UserId} joined.");
    }
    public override async ValueTask<UserEventResponse> OnMessageReceivedAsync(UserEventRequest request, CancellationToken cancellationToken)
    {
        await _serviceClient.SendToAllAsync(RequestContent.Create(
        new
        {
            from = request.ConnectionContext.UserId,
            message = request.Data.ToString()
        }),
        ContentType.ApplicationJson);
        return new UserEventResponse();
    }
}
更新 server.js 以处理客户端连接事件:
const express = require("express");
const { WebPubSubServiceClient } = require("@azure/web-pubsub");
const { WebPubSubEventHandler } = require("@azure/web-pubsub-express");
const app = express();
const hubName = "Sample_ChatApp";
let serviceClient = new WebPubSubServiceClient(process.env.WebPubSubConnectionString, hubName);
let handler = new WebPubSubEventHandler(hubName, {
  path: "/eventhandler",
  handleConnect: async (req, res) => {
    if (req.context.query.id){
      res.success({ userId: req.context.query.id });
    } else {
      res.fail(401, "missing user id");
    }
  },
  onConnected: async (req) => {
    console.log(`${req.context.userId} connected`);
  },
  handleUserEvent: async (req, res) => {
    if (req.context.eventName === "message")
      await serviceClient.sendToAll({
        from: req.context.userId,
        message: req.data,
      });
    res.success();
  },
});
app.use(express.static("public"));
app.use(handler.getMiddleware());
app.listen(8080, () => console.log("server started"));
现在,让我们添加逻辑来处理连接事件 azure.webpubsub.sys.connect:
// validation: https://free.blessedness.top/azure/azure-web-pubsub/reference-cloud-events#protection
app.options("/eventhandler", ctx -> {
    ctx.header("WebHook-Allowed-Origin", "*");
});
// handle events: https://free.blessedness.top/azure/azure-web-pubsub/reference-cloud-events#connect
app.post("/eventhandler", ctx -> {
    String event = ctx.header("ce-type");
    if ("azure.webpubsub.sys.connect".equals(event)) {
        String body = ctx.body();
        System.out.println("Reading from request body...");
        Gson gson = new Gson();
        JsonObject requestBody = gson.fromJson(body, JsonObject.class); // Parse JSON request body
        JsonObject query = requestBody.getAsJsonObject("query");
        if (query != null) {
            System.out.println("Reading from request body query:" + query.toString());
            JsonElement idElement = query.get("id");
            if (idElement != null) {
                JsonArray idInQuery = query.get("id").getAsJsonArray();
                if (idInQuery != null && idInQuery.size() > 0) {
                    String id = idInQuery.get(0).getAsString();
                    ctx.contentType("application/json");
                    Gson response = new Gson();
                    JsonObject jsonObject = new JsonObject();
                    jsonObject.addProperty("userId", id);
                    ctx.result(response.toJson(jsonObject));
                    return;
                }
            }
        } else {
            System.out.println("No query found from request body.");
        }
        ctx.status(401).result("missing user id");
    } else if ("azure.webpubsub.sys.connected".equals(event)) {
        String id = ctx.header("ce-userId");
        System.out.println(id + " connected.");
        ctx.status(200);
    } else if ("azure.webpubsub.user.message".equals(event)) {
        String id = ctx.header("ce-userId");
        String message = ctx.body();
        service.sendToAll(String.format("{\"from\":\"%s\",\"message\":\"%s\"}", id, message), WebPubSubContentType.APPLICATION_JSON);
        ctx.status(200);
    }
});
现在,让我们处理系统 connect 事件,它应该包含标头 ce-type 作为 azure.webpubsub.sys.connect。 我们在 abuse protection 后面添加逻辑:
@app.route('/eventhandler', methods=['POST', 'OPTIONS'])
def handle_event():
    if request.method == 'OPTIONS' or request.method == 'GET':
        if request.headers.get('WebHook-Request-Origin'):
            res = Response()
            res.headers['WebHook-Allowed-Origin'] = '*'
            res.status_code = 200
            return res
    elif request.method == 'POST':
        user_id = request.headers.get('ce-userid')
        type = request.headers.get('ce-type')
        print("Received event of type:", type)
        # Sample connect logic if connect event handler is configured
        if type == 'azure.webpubsub.sys.connect':
            body = request.data.decode('utf-8')
            print("Reading from connect request body...")
            query = json.loads(body)['query']
            print("Reading from request body query:", query)
            id_element = query.get('id')
            user_id = id_element[0] if id_element else None
            if user_id:
                return {'userId': user_id}, 200
            return 'missing user id', 401
        elif type == 'azure.webpubsub.sys.connected':
            return user_id + ' connected', 200
        elif type == 'azure.webpubsub.user.message':
            service.send_to_all(content_type="application/json", message={
                'from': user_id,
                'message': request.data.decode('UTF-8')
            })
            return Response(status=204, content_type='text/plain')
        else:
            return 'Bad Request', 400
 
更新 index.html 以直接连接
现在,让我们更新网页以直接连接到 Web PubSub 服务。 需要说明的是,为了演示的目的,Web PubSub 服务终结点是硬编码到客户端代码中的,请将下面 html 中的服务主机名 <the host name of your service> 更新为你自己服务的值。 从服务器提取 Web PubSub 服务终结点值可能仍然有用,因为这样可以更灵活地控制客户端连接到哪里。
<html>
  <body>
    <h1>Azure Web PubSub Chat</h1>
    <input id="message" placeholder="Type to chat...">
    <div id="messages"></div>
    <script>
      (async function () {
        // sample host: mock.webpubsub.azure.com
        let hostname = "<the host name of your service>";
        let id = prompt('Please input your user name');
        let ws = new WebSocket(`wss://${hostname}/client/hubs/Sample_ChatApp?id=${id}`);
        ws.onopen = () => console.log('connected');
        let messages = document.querySelector('#messages');
        
        ws.onmessage = event => {
          let m = document.createElement('p');
          let data = JSON.parse(event.data);
          m.innerText = `[${data.type || ''}${data.from || ''}] ${data.message}`;
          messages.appendChild(m);
        };
        let message = document.querySelector('#message');
        message.addEventListener('keypress', e => {
          if (e.charCode !== 13) return;
          ws.send(message.value);
          message.value = '';
        });
      })();
    </script>
  </body>
</html>
重新运行服务器
现在,重新运行服务器,并按照之前的说明访问网页。 如果已停止 awps-tunnel,请重新运行隧道工具。
后续步骤
本教程大致介绍了事件系统在 Azure Web PubSub 服务中的工作原理。
查看其他教程,进一步深入了解如何使用该服务。