借助 Microsoft Fabric REST API,你可以自动执行 Fabric 过程和流程,帮助组织更高效、更准确地完成任务。 通过自动执行这些工作流,可以减少错误、提高工作效率,并在整个运营过程中节省成本。
在 Fabric 中,项表示特定体验中的一组功能。 例如,Eventstream 是实时智能体验下的一个项。 Fabric 中的每个项都由一个“项定义”来定义,该对象概述了构成项的结构、格式和关键组件。
本文全面介绍了如何使用 Microsoft Fabric REST API 以在 Fabric 工作区中创建和管理 Eventstream 项。 可以找到每个 Eventstream API 操作的详细规范,以及设置和配置 API 调用的说明。
有关 Microsoft Fabric REST API 的完整概述,请访问:使用 Microsoft Fabric REST API
受支持的 Eventstream API
目前,Eventstream 支持以下基于定义的 API:
| API | 说明 |
|---|---|
| 创建 Eventstream 项定义 | 用于在工作区中创建事件流项,其中包含有关其拓扑的详细信息,包括源、目标、运算符和流。 |
| 获取 Eventstream 项定义 | 用于获取事件流项定义,其中包含有关其拓扑的详细信息,包括源、目标、运算符和流。 |
| 更新 Eventstream 项定义 | 用于更新或编辑事件流项定义,包括源、目标、运算符和流。 |
若要使用 CRUD 操作管理 Eventstream 项,请访问 Fabric REST API - Eventstream。 这些 API 支持以下操作:
- 创建 Eventstream
- 删除 Eventstream
- 获取 Eventstream
- 列出 Eventstream
- 更新 Eventstream
如何调用 Eventstream API?
步骤 1:向 Fabric 进行身份验证
若要使用 Fabric API,首先需要获取用于 Fabric 服务的 Microsoft Entra 令牌,然后在 API 调用的授权标头中使用该令牌。 获取 Microsoft Entra 令牌有两个选项。
选项 1:使用 MSAL.NET 获取令牌
如果应用程序需要使用服务主体访问 Fabric API,则可以使用 MSAL.NET 库获取访问令牌。 按照 Fabric API 快速入门创建 C# 控制台应用,该应用使用 MSAL.Net 库获取 Azure AD (AAD) 令牌,然后使用 C# HttpClient 调用列出工作区 API。
选项 2:使用 Fabric 门户获取令牌
你可以使用 Azure AD 令牌对 Fabric API 进行身份验证和测试。 登录到要测试的租户的 Fabric 门户,然后按 F12 进入浏览器的开发人员模式。 在控制台中运行以下内容:
powerBIAccessToken
复制令牌并将其粘贴到应用程序中。
第二步:准备一个 JSON 格式的事件流主体
创建将在 API 请求中转换为 base64 的 JSON 有效负载。 Eventstream 项定义采用类似于图形的结构,由以下组件组成:
| 字段 | 描述 |
|---|---|
| 源 | 可以引入到 Eventstream 中进行处理的数据源。 受支持的数据源包括 Azure 流式处理源、第三方流式处理源、数据库 CDC(变更数据捕获)、Azure Blob 存储事件和 Fabric 系统事件。 |
| 目的地 | Fabric 中处理的数据可以路由到的终端点,包括 Lakehouse、Eventhouse、Activator 等。 |
| 运算符 | 处理实时数据流的事件处理程序,例如筛选、聚合、分组依据和联接。 |
| 流 | 可用于实时中心的订阅和分析的数据流。 有两种类型的流:默认流和派生流。 |
使用 GitHub 中的
有关每个 API 属性的详细信息,可以参考 此 Swagger 文档 ,并指导你定义 Eventstream API 有效负载。
如果您使用Eventhouse 直接引入模式目标,请确保正确指定connectionName和mappingRuleName属性。 若要查找正确 connectionName,请导航到 Eventhouse KQL 数据库,选择 数据流并复制所需的 connectionName数据流。 有关创建引入映射的详细说明,请参阅 使用 ingestionMappingReference 的映射。
有关定义 Eventstream 项的更多详细信息,请查看 Eventstream 项定义 部分。
事件流定义的 JSON 示例:
{
"sources": [
{
"name": "SqlServerOnVmDbCdc",
"type": "SQLServerOnVMDBCDC",
"properties":
{
"dataConnectionId": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
"tableName": ""
}
}
],
"destinations": [
{
"name": "Lakehouse",
"type": "Lakehouse",
"properties":
{
"workspaceId": "bbbb1111-cc22-3333-44dd-555555eeeeee",
"itemId": "cccc2222-dd33-4444-55ee-666666ffffff",
"schema": "",
"deltaTable": "newTable",
"minimumRows": 100000,
"maximumDurationInSeconds": 120,
"inputSerialization":
{
"type": "Json",
"properties":
{
"encoding": "UTF8"
}
}
},
"inputNodes": [{"name": "derivedStream"}]
}
],
"streams": [
{
"name": "myEventstream-stream",
"type": "DefaultStream",
"properties":
{},
"inputNodes": [{"name": "SqlServerOnVmDbCdc"}]
},
{
"name": "derivedStream",
"type": "DerivedStream",
"properties":
{
"inputSerialization":
{
"type": "Json",
"properties":
{
"encoding": "UTF8"
}
}
},
"inputNodes": [{"name": "GroupBy"}]
}
],
"operators": [
{
"name": "GroupBy",
"type": "GroupBy",
"inputNodes": [{"name": "myEventstream-stream"}],
"properties":
{
"aggregations": [
{
"aggregateFunction": "Average",
"column":
{
"expressionType": "ColumnReference",
"node": null,
"columnName": "payload",
"columnPathSegments": [{"field": "ts_ms"}]
},
"alias": "AVG_ts_ms"
}
],
"groupBy": [],
"window":
{
"type": "Tumbling",
"properties":
{
"duration":
{
"value": 5,
"unit": "Minute"
},
"offset":
{
"value": 1,
"unit": "Minute"
}
}
}
}
}
],
"compatibilityLevel": "1.1"
}
步骤 3:创建 Eventstream JSON 的 base64 字符串
使用 Base64 编码和解码 等工具将事件流 JSON 转换为 base64 字符串。
步骤 4:创建 API 请求正文
使用上一步骤中 Base64 编码的事件流 JSON 作为 API 请求正文的内容。
下面是 Base64 编码的字符串的有效负载的示例:
{
"definition": {
"parts": [
{
"path": "eventstream.json",
"payload": "ewogICJzb3VyY2VzIjogWwogICAgewogICAgICAibmFtZSI6ICJTcWxTZXJ2ZXJPblZtRGJDZGMiLAogICAgICAidHlwZSI6ICJTUUxTZXJ2ZXJPblZNREJDREMiLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAiZGF0YUNvbm5lY3Rpb25JZCI6ICJhYWFhYWFhYS0wMDAwLTExMTEtMjIyMi1iYmJiYmJiYmJiYmIiLAogICAgICAgICJ0YWJsZU5hbWUiOiAiIgogICAgICB9CiAgICB9CiAgXSwKICAiZGVzdGluYXRpb25zIjogWwogICAgewogICAgICAibmFtZSI6ICJMYWtlaG91c2UiLAogICAgICAidHlwZSI6ICJMYWtlaG91c2UiLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAid29ya3NwYWNlSWQiOiAiYmJiYjExMTEtY2MyMi0zMzMzLTQ0ZGQtNTU1NTU1ZWVlZWVlIiwKICAgICAgICAiaXRlbUlkIjogImNjY2MyMjIyLWRkMzMtNDQ0NC01NWVlLTY2NjY2NmZmZmZmZiIsCiAgICAgICAgInNjaGVtYSI6ICIiLAogICAgICAgICJkZWx0YVRhYmxlIjogIm5ld1RhYmxlIiwKICAgICAgICAibWluaW11bVJvd3MiOiAxMDAwMDAsCiAgICAgICAgIm1heGltdW1EdXJhdGlvbkluU2Vjb25kcyI6IDEyMCwKICAgICAgICAiaW5wdXRTZXJpYWxpemF0aW9uIjoKICAgICAgICB7CiAgICAgICAgICAidHlwZSI6ICJKc29uIiwKICAgICAgICAgICJwcm9wZXJ0aWVzIjoKICAgICAgICAgIHsKICAgICAgICAgICAgImVuY29kaW5nIjogIlVURjgiCiAgICAgICAgICB9CiAgICAgICAgfQogICAgICB9LAogICAgICAiaW5wdXROb2RlcyI6IFt7Im5hbWUiOiAiZGVyaXZlZFN0cmVhbSJ9XQogICAgfQogIF0sCiAgInN0cmVhbXMiOiBbCiAgICB7CiAgICAgICJuYW1lIjogIm15RXZlbnRzdHJlYW0tc3RyZWFtIiwKICAgICAgInR5cGUiOiAiRGVmYXVsdFN0cmVhbSIsCiAgICAgICJwcm9wZXJ0aWVzIjoKICAgICAge30sCiAgICAgICJpbnB1dE5vZGVzIjogW3sibmFtZSI6ICJTcWxTZXJ2ZXJPblZtRGJDZGMifV0KICAgIH0sCiAgICB7CiAgICAgICJuYW1lIjogImRlcml2ZWRTdHJlYW0iLAogICAgICAidHlwZSI6ICJEZXJpdmVkU3RyZWFtIiwKICAgICAgInByb3BlcnRpZXMiOgogICAgICB7CiAgICAgICAgImlucHV0U2VyaWFsaXphdGlvbiI6CiAgICAgICAgewogICAgICAgICAgInR5cGUiOiAiSnNvbiIsCiAgICAgICAgICAicHJvcGVydGllcyI6CiAgICAgICAgICB7CiAgICAgICAgICAgICJlbmNvZGluZyI6ICJVVEY4IgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfSwKICAgICAgImlucHV0Tm9kZXMiOiBbeyJuYW1lIjogIkdyb3VwQnkifV0KICAgIH0KICBdLAogICJvcGVyYXRvcnMiOiBbCiAgICB7CiAgICAgICJuYW1lIjogIkdyb3VwQnkiLAogICAgICAidHlwZSI6ICJHcm91cEJ5IiwKICAgICAgImlucHV0Tm9kZXMiOiBbeyJuYW1lIjogIm15RXZlbnRzdHJlYW0tc3RyZWFtIn1dLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAiYWdncmVnYXRpb25zIjogWwogICAgICAgICAgewogICAgICAgICAgICAiYWdncmVnYXRlRnVuY3Rpb24iOiAiQXZlcmFnZSIsCiAgICAgICAgICAgICJjb2x1bW4iOgogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImV4cHJlc3Npb25UeXBlIjogIkNvbHVtblJlZmVyZW5jZSIsCiAgICAgICAgICAgICAgIm5vZGUiOiBudWxsLAogICAgICAgICAgICAgICJjb2x1bW5OYW1lIjogInBheWxvYWQiLAogICAgICAgICAgICAgICJjb2x1bW5QYXRoU2VnbWVudHMiOiBbeyJmaWVsZCI6ICJ0c19tcyJ9XQogICAgICAgICAgICB9LAogICAgICAgICAgICAiYWxpYXMiOiAiQVZHX3RzX21zIgogICAgICAgICAgfQogICAgICAgIF0sCiAgICAgICAgImdyb3VwQnkiOiBbXSwKICAgICAgICAid2luZG93IjoKICAgICAgICB7CiAgICAgICAgICAidHlwZSI6ICJUdW1ibGluZyIsCiAgICAgICAgICAicHJvcGVydGllcyI6CiAgICAgICAgICB7CiAgICAgICAgICAgICJkdXJhdGlvbiI6CiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAidmFsdWUiOiA1LAogICAgICAgICAgICAgICJ1bml0IjogIk1pbnV0ZSIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgIm9mZnNldCI6CiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAidmFsdWUiOiAxLAogICAgICAgICAgICAgICJ1bml0IjogIk1pbnV0ZSIKICAgICAgICAgICAgfQogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIF0sCiAgImNvbXBhdGliaWxpdHlMZXZlbCI6ICIxLjEiCn0=",
"payloadType": "InlineBase64"
},
{
"path": ".platform",
"payload": "ewogICIkc2NoZW1hIjogImh0dHBzOi8vZGV2ZWxvcGVyLm1pY3Jvc29mdC5jb20vanNvbi1zY2hlbWFzL2ZhYnJpYy9naXRJbnRlZ3JhdGlvbi9wbGF0Zm9ybVByb3BlcnRpZXMvMi4wLjAvc2NoZW1hLmpzb24iLAogICJtZXRhZGF0YSI6IHsKICAgICJ0eXBlIjogIkV2ZW50c3RyZWFtIiwKICAgICJkaXNwbGF5TmFtZSI6ICJhbGV4LWVzMSIKICB9LAogICJjb25maWciOiB7CiAgICAidmVyc2lvbiI6ICIyLjAiLAogICAgImxvZ2ljYWxJZCI6ICIwMDAwMDAwMC0wMDAwLTAwMDAtMDAwMC0wMDAwMDAwMDAwMDAiCiAgfQp9",
"payloadType": "InlineBase64"
}
]
}
}
步骤 5:使用 API 创建事件流项
在应用程序中发送一个请求,请求在有效负载中使用 Base64 编码的字符串创建一个 Eventstream 项。
PowerShell 示例:
$evenstreamAPI = "https://api.fabric.microsoft.com/v1/workspaces/$workspaceId/items"
## Invoke the API to create the Eventstream
Invoke-RestMethod -Headers $headerParams -Method POST -Uri $evenstreamAPI -Body ($body) -ContentType "application/json"
事件流项定义
Eventstream 项定义具有类似于图形的结构,由四个组件组成:源、目标、运算符和流。
来源
若要在 API 正文中定义 Eventstream 源,请确保根据表正确指定每个字段和属性。
| 字段 | 类型 | 描述 | 要求 | 允许的值/格式 |
|---|---|---|---|---|
id |
字符串 (UUID) | 由系统生成的源的唯一标识符。 | 在 CREATE 中为可选,在 UPDATE 中为必填 | UUID 格式 |
name |
字符串 | 源的唯一名称,用于在 Eventstream 中标识它。 | 必须 | 任意有效字符串 |
type |
字符串(枚举) | 指定源的类型。 必须与其中一个预定义值相匹配。 | 必须 |
AmazonKinesis、AmazonMSKKafka、、ApacheKafka、AzureBlobStorageEventsAzureCosmosDBCDC、AzureEventHub、 AzureIoTHubAzureSQLDBCDCAzureSQLMIDBCDCConfluentCloudCustomEndpointFabricCapacityUtilizationEventsGooglePubSubMySQLCDCPostgreSQLCDCSampleDataFabricWorkspaceItemEventsFabricJobEventsFabricOneLakeEvents |
properties |
物体 | 特定于所选源类型的其他设置。 | 必须 |
AzureEventHub 类型示例:dataConnectionId、consumerGroupName、inputSerialization |
API 正文中的 Eventstream 源示例:
{
"sources": [
{
"id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
"name": "AzureEventHubSource",
"type": "AzureEventHub",
"properties":
{
"dataConnectionId": "bbbbbbbb-1111-2222-3333-cccccccccccc",
"consumerGroupName": "$Default",
"inputSerialization":
{
"type": "Json",
"properties":
{
"encoding": "UTF8"
}
}
}
}
]
}
目标
若要在 API 正文中定义 Eventstream 目标,请确保根据表正确指定每个字段和属性。
| 字段 | 类型 | 描述 | 要求 | 允许的值/格式 |
|---|---|---|---|---|
id |
字符串 (UUID) | 由系统生成的目标的唯一标识符。 | 在 CREATE 中为可选,在 UPDATE 中为必填 | UUID 格式 |
name |
字符串 | 目标的唯一名称,用于在 Eventstream 中标识它。 | 必须 | 任意有效字符串 |
type |
字符串(枚举) | 指定目标的类型。 必须与其中一个预定义值相匹配。 | 必须 |
"Activator"、"CustomEndpoint"、"Eventhouse"、"Lakehouse" |
properties |
物体 | 特定于所选目标类型的其他设置。 | 必须 |
Eventhouse 类型示例:"dataIngestionMode"、"workspaceId"、"itemId"、"databaseName" |
inputNodes |
数组 | 对目标的输入节点的引用,例如 Eventstream 名称或运算符名称。 | 必须 | 示例: eventstream-1 |
同样,如果使用 Eventhouse 直接引入模式 目标,请确保正确指定 connectionName 和 mappingRuleName 属性。 若要查找正确 connectionName,请导航到 Eventhouse KQL 数据库,选择 数据流并复制所需的 connectionName数据流。 有关创建引入映射的详细说明,请参阅 使用 ingestionMappingReference 的映射。
API 正文中的 Eventstream 源示例:
{
"destinations": [
{
"id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
"name": "EventhouseDestination",
"type": "Eventhouse",
"properties":
{
"dataIngestionMode": "ProcessedIngestion",
"workspaceId": "bbbbbbbb-1111-2222-3333-cccccccccccc",
"itemId": "cccc2222-dd33-4444-55ee-666666ffffff",
"databaseName": "myeventhouse",
"tableName": "mytable",
"inputSerialization":
{
"type": "Json",
"properties":
{
"encoding": "UTF8"
}
}
},
"inputNodes": [{"name": "eventstream-1"}]
}
]
}
运算符
若要在 API 正文中定义 Eventstream 运算符,请确保根据表正确指定每个字段和属性。
| 字段 | 类型 | 描述 | 要求 | 允许的值/格式 |
|---|---|---|---|---|
name |
字符串 | 运算符的唯一名称。 | 必须 | 任意有效字符串 |
type |
字符串(枚举) | 指定运算符的类型。 必须与其中一个预定义值相匹配。 | 必须 |
"Filter"、"Join"、"ManageFields"、"Aggregate"、"GroupBy"、"Union"、"Expand" |
properties |
物体 | 特定于所选运算符类型的其他设置。 | 必须 |
Filter 类型示例:"conditions" |
inputNodes |
数组 | 对运算符的输入节点的引用列表。 | 必须 | 示例: eventstream-1 |
inputSchemas |
数组 | 对运算符的输入节点的引用列表。 | 可选 |
Filter 类型示例:"schema" |
API 正文中的 Eventstream 运算符示例:
{
"operators": [
{
"name": "FilterName",
"type": "Filter",
"inputNodes": [{"name": "eventstream-1"}],
"properties":
{
"conditions": [
{
"column":
{
"node": "nodeName",
"columnName": "columnName",
"columnPath": ["path","to","column"]
},
"operator": "Equals",
"value":
{
"dataType": "nvarchar(max)",
"value": "stringValue"
}
}
]
}
}
]
}
流
若要在 API 正文中定义流,请确保根据表正确指定每个字段和属性。
| 字段 | 类型 | 描述 | 要求 | 允许的值/格式 |
|---|---|---|---|---|
id |
字符串 (UUID) | 由系统生成的流的唯一标识符。 | 可选 | UUID 格式 |
name |
字符串 | 流的唯一名称。 | 必须 | 任意有效字符串 |
type |
字符串(枚举) | 指定流的类型。 必须与其中一个预定义值相匹配。 | 必须 |
"DefaultStream"、"DerivedStream" |
properties |
物体 | 特定于所选流类型的其他设置。 | 必须 |
Filter 类型示例:"conditions" |
inputNodes |
数组 | 对流的输入节点的引用列表。 | 可选 | 示例:[]、"eventstream-1" |
API 正文中的流示例:
{
"streams": [
{
"name": "myEventstream-stream",
"type": "DefaultStream",
"properties":
{},
"inputNodes": [{"name": "sourceName"}]
},
{
"name": "DerivedStreamName",
"type": "DerivedStream",
"properties":
{
"inputSerialization":
{
"type": "Json",
"properties":
{
"encoding": "UTF8"
}
}
},
"inputNodes": [{"name": "FilterName"}]
}
]
}