Eventstream REST API

借助 Microsoft Fabric REST API,你可以自动执行 Fabric 过程和流程,帮助组织更高效、更准确地完成任务。 通过自动执行这些工作流,可以减少错误、提高工作效率,并在整个运营过程中节省成本。

在 Fabric 中,项表示特定体验中的一组功能。 例如,Eventstream 是实时智能体验下的一个项。 Fabric 中的每个项都由一个“项定义”来定义,该对象概述了构成项的结构、格式和关键组件

本文全面介绍了如何使用 Microsoft Fabric REST API 以在 Fabric 工作区中创建和管理 Eventstream 项。 可以找到每个 Eventstream API 操作的详细规范,以及设置和配置 API 调用的说明。

有关 Microsoft Fabric REST API 的完整概述,请访问:使用 Microsoft Fabric REST API

受支持的 Eventstream API

目前,Eventstream 支持以下基于定义的 API:

API 说明
创建 Eventstream 项定义 用于在工作区中创建事件流项,其中包含有关其拓扑的详细信息,包括源、目标、运算符和流。
获取 Eventstream 项定义 用于获取事件流项定义,其中包含有关其拓扑的详细信息,包括源、目标、运算符和流。
更新 Eventstream 项定义 用于更新或编辑事件流项定义,包括源、目标、运算符和流。

若要使用 CRUD 操作管理 Eventstream 项,请访问 Fabric REST API - Eventstream。 这些 API 支持以下操作:

  • 创建 Eventstream
  • 删除 Eventstream
  • 获取 Eventstream
  • 列出 Eventstream
  • 更新 Eventstream

如何调用 Eventstream API?

步骤 1:向 Fabric 进行身份验证

若要使用 Fabric API,首先需要获取用于 Fabric 服务的 Microsoft Entra 令牌,然后在 API 调用的授权标头中使用该令牌。 获取 Microsoft Entra 令牌有两个选项。

选项 1:使用 MSAL.NET 获取令牌

如果应用程序需要使用服务主体访问 Fabric API,则可以使用 MSAL.NET 库获取访问令牌。 按照 Fabric API 快速入门创建 C# 控制台应用,该应用使用 MSAL.Net 库获取 Azure AD (AAD) 令牌,然后使用 C# HttpClient 调用列出工作区 API。

选项 2:使用 Fabric 门户获取令牌

你可以使用 Azure AD 令牌对 Fabric API 进行身份验证和测试。 登录到要测试的租户的 Fabric 门户,然后按 F12 进入浏览器的开发人员模式。 在控制台中运行以下内容:

powerBIAccessToken

复制令牌并将其粘贴到应用程序中。

第二步:准备一个 JSON 格式的事件流主体

创建将在 API 请求中转换为 base64 的 JSON 有效负载。 Eventstream 项定义采用类似于图形的结构,由以下组件组成:

字段 描述
可以引入到 Eventstream 中进行处理的数据源。 受支持的数据源包括 Azure 流式处理源、第三方流式处理源、数据库 CDC(变更数据捕获)、Azure Blob 存储事件和 Fabric 系统事件。
目的地 Fabric 中处理的数据可以路由到的终端点,包括 Lakehouse、Eventhouse、Activator 等。
运算符 处理实时数据流的事件处理程序,例如筛选、聚合、分组依据和联接。
可用于实时中心的订阅和分析的数据流。 有两种类型的流:默认流和派生流。

使用 GitHub 中的 API 模板来帮助定义事件流正文。

有关每个 API 属性的详细信息,可以参考 此 Swagger 文档 ,并指导你定义 Eventstream API 有效负载。

如果您使用Eventhouse 直接引入模式目标,请确保正确指定connectionNamemappingRuleName属性。 若要查找正确 connectionName,请导航到 Eventhouse KQL 数据库,选择 数据流并复制所需的 connectionName数据流。 有关创建引入映射的详细说明,请参阅 使用 ingestionMappingReference 的映射

有关定义 Eventstream 项的更多详细信息,请查看 Eventstream 项定义 部分。

事件流定义的 JSON 示例:

{
  "sources": [
    {
      "name": "SqlServerOnVmDbCdc",
      "type": "SQLServerOnVMDBCDC",
      "properties":
      {
        "dataConnectionId": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
        "tableName": ""
      }
    }
  ],
  "destinations": [
    {
      "name": "Lakehouse",
      "type": "Lakehouse",
      "properties":
      {
        "workspaceId": "bbbb1111-cc22-3333-44dd-555555eeeeee",
        "itemId": "cccc2222-dd33-4444-55ee-666666ffffff",
        "schema": "",
        "deltaTable": "newTable",
        "minimumRows": 100000,
        "maximumDurationInSeconds": 120,
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "derivedStream"}]
    }
  ],
  "streams": [
    {
      "name": "myEventstream-stream",
      "type": "DefaultStream",
      "properties":
      {},
      "inputNodes": [{"name": "SqlServerOnVmDbCdc"}]
    },
    {
      "name": "derivedStream",
      "type": "DerivedStream",
      "properties":
      {
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "GroupBy"}]
    }
  ],
  "operators": [
    {
      "name": "GroupBy",
      "type": "GroupBy",
      "inputNodes": [{"name": "myEventstream-stream"}],
      "properties":
      {
        "aggregations": [
          {
            "aggregateFunction": "Average",
            "column":
            {
              "expressionType": "ColumnReference",
              "node": null,
              "columnName": "payload",
              "columnPathSegments": [{"field": "ts_ms"}]
            },
            "alias": "AVG_ts_ms"
          }
        ],
        "groupBy": [],
        "window":
        {
          "type": "Tumbling",
          "properties":
          {
            "duration":
            {
              "value": 5,
              "unit": "Minute"
            },
            "offset":
            {
              "value": 1,
              "unit": "Minute"
            }
          }
        }
      }
    }
  ],
  "compatibilityLevel": "1.1"
}

步骤 3:创建 Eventstream JSON 的 base64 字符串

使用 Base64 编码和解码 等工具将事件流 JSON 转换为 base64 字符串。

将 Eventstream JSON 编码为 base64 字符串的屏幕截图。

步骤 4:创建 API 请求正文

使用上一步骤中 Base64 编码的事件流 JSON 作为 API 请求正文的内容。

下面是 Base64 编码的字符串的有效负载的示例:

{
 "definition": {
  "parts": [
   {
    "path": "eventstream.json",
    "payload": "ewogICJzb3VyY2VzIjogWwogICAgewogICAgICAibmFtZSI6ICJTcWxTZXJ2ZXJPblZtRGJDZGMiLAogICAgICAidHlwZSI6ICJTUUxTZXJ2ZXJPblZNREJDREMiLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAiZGF0YUNvbm5lY3Rpb25JZCI6ICJhYWFhYWFhYS0wMDAwLTExMTEtMjIyMi1iYmJiYmJiYmJiYmIiLAogICAgICAgICJ0YWJsZU5hbWUiOiAiIgogICAgICB9CiAgICB9CiAgXSwKICAiZGVzdGluYXRpb25zIjogWwogICAgewogICAgICAibmFtZSI6ICJMYWtlaG91c2UiLAogICAgICAidHlwZSI6ICJMYWtlaG91c2UiLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAid29ya3NwYWNlSWQiOiAiYmJiYjExMTEtY2MyMi0zMzMzLTQ0ZGQtNTU1NTU1ZWVlZWVlIiwKICAgICAgICAiaXRlbUlkIjogImNjY2MyMjIyLWRkMzMtNDQ0NC01NWVlLTY2NjY2NmZmZmZmZiIsCiAgICAgICAgInNjaGVtYSI6ICIiLAogICAgICAgICJkZWx0YVRhYmxlIjogIm5ld1RhYmxlIiwKICAgICAgICAibWluaW11bVJvd3MiOiAxMDAwMDAsCiAgICAgICAgIm1heGltdW1EdXJhdGlvbkluU2Vjb25kcyI6IDEyMCwKICAgICAgICAiaW5wdXRTZXJpYWxpemF0aW9uIjoKICAgICAgICB7CiAgICAgICAgICAidHlwZSI6ICJKc29uIiwKICAgICAgICAgICJwcm9wZXJ0aWVzIjoKICAgICAgICAgIHsKICAgICAgICAgICAgImVuY29kaW5nIjogIlVURjgiCiAgICAgICAgICB9CiAgICAgICAgfQogICAgICB9LAogICAgICAiaW5wdXROb2RlcyI6IFt7Im5hbWUiOiAiZGVyaXZlZFN0cmVhbSJ9XQogICAgfQogIF0sCiAgInN0cmVhbXMiOiBbCiAgICB7CiAgICAgICJuYW1lIjogIm15RXZlbnRzdHJlYW0tc3RyZWFtIiwKICAgICAgInR5cGUiOiAiRGVmYXVsdFN0cmVhbSIsCiAgICAgICJwcm9wZXJ0aWVzIjoKICAgICAge30sCiAgICAgICJpbnB1dE5vZGVzIjogW3sibmFtZSI6ICJTcWxTZXJ2ZXJPblZtRGJDZGMifV0KICAgIH0sCiAgICB7CiAgICAgICJuYW1lIjogImRlcml2ZWRTdHJlYW0iLAogICAgICAidHlwZSI6ICJEZXJpdmVkU3RyZWFtIiwKICAgICAgInByb3BlcnRpZXMiOgogICAgICB7CiAgICAgICAgImlucHV0U2VyaWFsaXphdGlvbiI6CiAgICAgICAgewogICAgICAgICAgInR5cGUiOiAiSnNvbiIsCiAgICAgICAgICAicHJvcGVydGllcyI6CiAgICAgICAgICB7CiAgICAgICAgICAgICJlbmNvZGluZyI6ICJVVEY4IgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfSwKICAgICAgImlucHV0Tm9kZXMiOiBbeyJuYW1lIjogIkdyb3VwQnkifV0KICAgIH0KICBdLAogICJvcGVyYXRvcnMiOiBbCiAgICB7CiAgICAgICJuYW1lIjogIkdyb3VwQnkiLAogICAgICAidHlwZSI6ICJHcm91cEJ5IiwKICAgICAgImlucHV0Tm9kZXMiOiBbeyJuYW1lIjogIm15RXZlbnRzdHJlYW0tc3RyZWFtIn1dLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAiYWdncmVnYXRpb25zIjogWwogICAgICAgICAgewogICAgICAgICAgICAiYWdncmVnYXRlRnVuY3Rpb24iOiAiQXZlcmFnZSIsCiAgICAgICAgICAgICJjb2x1bW4iOgogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImV4cHJlc3Npb25UeXBlIjogIkNvbHVtblJlZmVyZW5jZSIsCiAgICAgICAgICAgICAgIm5vZGUiOiBudWxsLAogICAgICAgICAgICAgICJjb2x1bW5OYW1lIjogInBheWxvYWQiLAogICAgICAgICAgICAgICJjb2x1bW5QYXRoU2VnbWVudHMiOiBbeyJmaWVsZCI6ICJ0c19tcyJ9XQogICAgICAgICAgICB9LAogICAgICAgICAgICAiYWxpYXMiOiAiQVZHX3RzX21zIgogICAgICAgICAgfQogICAgICAgIF0sCiAgICAgICAgImdyb3VwQnkiOiBbXSwKICAgICAgICAid2luZG93IjoKICAgICAgICB7CiAgICAgICAgICAidHlwZSI6ICJUdW1ibGluZyIsCiAgICAgICAgICAicHJvcGVydGllcyI6CiAgICAgICAgICB7CiAgICAgICAgICAgICJkdXJhdGlvbiI6CiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAidmFsdWUiOiA1LAogICAgICAgICAgICAgICJ1bml0IjogIk1pbnV0ZSIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgIm9mZnNldCI6CiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAidmFsdWUiOiAxLAogICAgICAgICAgICAgICJ1bml0IjogIk1pbnV0ZSIKICAgICAgICAgICAgfQogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIF0sCiAgImNvbXBhdGliaWxpdHlMZXZlbCI6ICIxLjEiCn0=",
    "payloadType": "InlineBase64"
   },
   {
    "path": ".platform",
    "payload": "ewogICIkc2NoZW1hIjogImh0dHBzOi8vZGV2ZWxvcGVyLm1pY3Jvc29mdC5jb20vanNvbi1zY2hlbWFzL2ZhYnJpYy9naXRJbnRlZ3JhdGlvbi9wbGF0Zm9ybVByb3BlcnRpZXMvMi4wLjAvc2NoZW1hLmpzb24iLAogICJtZXRhZGF0YSI6IHsKICAgICJ0eXBlIjogIkV2ZW50c3RyZWFtIiwKICAgICJkaXNwbGF5TmFtZSI6ICJhbGV4LWVzMSIKICB9LAogICJjb25maWciOiB7CiAgICAidmVyc2lvbiI6ICIyLjAiLAogICAgImxvZ2ljYWxJZCI6ICIwMDAwMDAwMC0wMDAwLTAwMDAtMDAwMC0wMDAwMDAwMDAwMDAiCiAgfQp9",
    "payloadType": "InlineBase64"
   }
  ]
 }
}

步骤 5:使用 API 创建事件流项

在应用程序中发送一个请求,请求在有效负载中使用 Base64 编码的字符串创建一个 Eventstream 项。

PowerShell 示例:

$evenstreamAPI = "https://api.fabric.microsoft.com/v1/workspaces/$workspaceId/items" 

## Invoke the API to create the Eventstream
Invoke-RestMethod -Headers $headerParams -Method POST -Uri $evenstreamAPI -Body ($body) -ContentType "application/json"

事件流项定义

Eventstream 项定义具有类似于图形的结构,由四个组件组成:源、目标、运算符和流。

来源

若要在 API 正文中定义 Eventstream 源,请确保根据表正确指定每个字段和属性。

字段 类型 描述 要求 允许的值/格式
id 字符串 (UUID) 由系统生成的源的唯一标识符。 在 CREATE 中为可选,在 UPDATE 中为必填 UUID 格式
name 字符串 源的唯一名称,用于在 Eventstream 中标识它。 必须 任意有效字符串
type 字符串(枚举) 指定源的类型。 必须与其中一个预定义值相匹配。 必须 AmazonKinesisAmazonMSKKafka、、ApacheKafkaAzureBlobStorageEventsAzureCosmosDBCDCAzureEventHubAzureIoTHubAzureSQLDBCDCAzureSQLMIDBCDCConfluentCloudCustomEndpointFabricCapacityUtilizationEventsGooglePubSubMySQLCDCPostgreSQLCDCSampleDataFabricWorkspaceItemEventsFabricJobEventsFabricOneLakeEvents
properties 物体 特定于所选源类型的其他设置。 必须 AzureEventHub 类型示例:dataConnectionIdconsumerGroupNameinputSerialization

API 正文中的 Eventstream 源示例:

{
  "sources": [
    {
      "id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
      "name": "AzureEventHubSource",
      "type": "AzureEventHub",
      "properties":
      {
        "dataConnectionId": "bbbbbbbb-1111-2222-3333-cccccccccccc",
        "consumerGroupName": "$Default",
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      }
    }
  ]
}

目标

若要在 API 正文中定义 Eventstream 目标,请确保根据表正确指定每个字段和属性。

字段 类型 描述 要求 允许的值/格式
id 字符串 (UUID) 由系统生成的目标的唯一标识符。 在 CREATE 中为可选,在 UPDATE 中为必填 UUID 格式
name 字符串 目标的唯一名称,用于在 Eventstream 中标识它。 必须 任意有效字符串
type 字符串(枚举) 指定目标的类型。 必须与其中一个预定义值相匹配。 必须 "Activator""CustomEndpoint""Eventhouse""Lakehouse"
properties 物体 特定于所选目标类型的其他设置。 必须 Eventhouse 类型示例:"dataIngestionMode""workspaceId""itemId""databaseName"
inputNodes 数组 对目标的输入节点的引用,例如 Eventstream 名称或运算符名称。 必须 示例: eventstream-1

同样,如果使用 Eventhouse 直接引入模式 目标,请确保正确指定 connectionNamemappingRuleName 属性。 若要查找正确 connectionName,请导航到 Eventhouse KQL 数据库,选择 数据流并复制所需的 connectionName数据流。 有关创建引入映射的详细说明,请参阅 使用 ingestionMappingReference 的映射


API 正文中的 Eventstream 源示例:

{
  "destinations": [
    {
      "id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
      "name": "EventhouseDestination",
      "type": "Eventhouse",
      "properties":
      {
        "dataIngestionMode": "ProcessedIngestion",
        "workspaceId": "bbbbbbbb-1111-2222-3333-cccccccccccc",
        "itemId": "cccc2222-dd33-4444-55ee-666666ffffff",
        "databaseName": "myeventhouse",
        "tableName": "mytable",
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "eventstream-1"}]
    }
  ]
}

运算符

若要在 API 正文中定义 Eventstream 运算符,请确保根据表正确指定每个字段和属性。

字段 类型 描述 要求 允许的值/格式
name 字符串 运算符的唯一名称。 必须 任意有效字符串
type 字符串(枚举) 指定运算符的类型。 必须与其中一个预定义值相匹配。 必须 "Filter""Join""ManageFields""Aggregate""GroupBy""Union""Expand"
properties 物体 特定于所选运算符类型的其他设置。 必须 Filter 类型示例:"conditions"
inputNodes 数组 对运算符的输入节点的引用列表。 必须 示例: eventstream-1
inputSchemas 数组 对运算符的输入节点的引用列表。 可选 Filter 类型示例:"schema"

API 正文中的 Eventstream 运算符示例:

{
  "operators": [
    {
      "name": "FilterName",
      "type": "Filter",
      "inputNodes": [{"name": "eventstream-1"}],
      "properties":
      {
        "conditions": [
          {
            "column":
            {
              "node": "nodeName",
              "columnName": "columnName",
              "columnPath": ["path","to","column"]
            },
            "operator": "Equals",
            "value":
            {
              "dataType": "nvarchar(max)",
              "value": "stringValue"
            }
          }
        ]
      }
    }
  ]
}

若要在 API 正文中定义流,请确保根据表正确指定每个字段和属性。

字段 类型 描述 要求 允许的值/格式
id 字符串 (UUID) 由系统生成的流的唯一标识符。 可选 UUID 格式
name 字符串 流的唯一名称。 必须 任意有效字符串
type 字符串(枚举) 指定流的类型。 必须与其中一个预定义值相匹配。 必须 "DefaultStream""DerivedStream"
properties 物体 特定于所选流类型的其他设置。 必须 Filter 类型示例:"conditions"
inputNodes 数组 对流的输入节点的引用列表。 可选 示例:[]"eventstream-1"

API 正文中的流示例:

{
  "streams": [
    {
      "name": "myEventstream-stream",
      "type": "DefaultStream",
      "properties":
      {},
      "inputNodes": [{"name": "sourceName"}]
    },
    {
      "name": "DerivedStreamName",
      "type": "DerivedStream",
      "properties":
      {
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "FilterName"}]
    }
  ]
}