本文详细介绍了 Eventstream 项的定义结构。
定义部件
下表列出了 Eventstream 定义部件。
| 定义部件路径 | 类型 | 必选 | DESCRIPTION |
|---|---|---|---|
eventstream.json |
Eventstream (JSON) | ✅ | 描述 Eventstream 项的拓扑 |
eventstreamProperties.json |
EventstreamProperties (JSON) | ❌ | 描述事件流元数据 |
.platform |
PlatformDetails (JSON) | ❌ | 描述项的元数据 |
事件流项的每个定义部分构造如下:
-
路径:文件名,例如:
eventstream.json - 有效负载类型:InlineBase64
- 有效负载:请参阅 从 Base64 解码的有效负载内容示例
事件流
描述 Eventstream 项的拓扑。
| 名称 | 类型 | 必选 | DESCRIPTION |
|---|---|---|---|
sources |
数组 | 是 | 描述可以引入到 Eventstream 中进行处理的数据源。 |
destinations |
数组 | 是 | 介绍 Fabric 中的终结点,其中处理的数据可以路由到,包括 Lakehouse、Eventhouse、Reflex 等。 |
operators |
数组 | 是 | 定义处理实时数据流的事件处理程序,例如筛选器、聚合、分组依据和联接。 |
streams |
数组 | 是 | 介绍可用于实时中心的订阅和分析的数据流。 有两种类型的流:默认流和派生流。 |
eventstream.json 实例
若要了解如何创建描述事件流项的 JSON 文件,请参阅 Eventstream REST API。
{
"sources": [
{
"name": "myEventHub",
"type": "AzureEventHub",
"properties": {
"dataConnectionId": "cc8271ee-8f72-473d-969c-6828f5fd0d45",
"consumerGroupName": "$Default",
"inputSerialization": {
"type": "Json",
"properties": {
"encoding": "UTF8"
}
}
}
}
],
"destinations": [
{
"name": "myLakehouse",
"type": "Lakehouse",
"properties": {
"workspaceId": "fdf52f3a-b687-41b8-8ff8-aeeca4d1edd8",
"itemId": "737d6a97-e88c-45e1-9c39-adf1c9c4e817",
"schema": "",
"deltaTable": "newTable",
"minimumRows": 100000,
"maximumDurationInSeconds": 120,
"inputSerialization": {
"type": "Json",
"properties": {
"encoding": "UTF8"
}
}
},
"inputNodes": [
{
"name": "derivedStream"
}
]
}
],
"streams": [
{
"name": "myEventstream-stream",
"type": "DefaultStream",
"properties": {},
"inputNodes": [
{
"name": "myEventHub"
}
]
},
{
"name": "derivedStream",
"type": "DerivedStream",
"properties": {
"inputSerialization": {
"type": "Json",
"properties": {
"encoding": "UTF8"
}
}
},
"inputNodes": [
{
"name": "GroupBy"
}
]
}
],
"operators": [
{
"name": "GroupBy",
"type": "GroupBy",
"inputNodes": [
{
"name": "myEventstream-stream"
}
],
"properties": {
"aggregations": [
{
"aggregateFunction": "Average",
"column": {
"expressionType": "ColumnReference",
"node": null,
"columnName": "payload",
"columnPathSegments": [
{
"field": "ts_ms"
}
]
},
"alias": "AVG_ts_ms"
}
],
"groupBy": [],
"window": {
"type": "Tumbling",
"properties": {
"duration": {
"value": 5,
"unit": "Minute"
},
"offset": {
"value": 1,
"unit": "Minute"
}
}
}
}
}
],
"compatibilityLevel": "1.0"
}
若要为 API 有效负载构造 Eventstream 项,可以使用 GitHub 模板 定义事件流项。
EventstreamProperties
描述事件流元数据。
| 名称 | 类型 | 必选 | DESCRIPTION |
|---|---|---|---|
retentionTimeInDays |
整数 | ❌ | 描述 Eventstream 项的保留天数。 默认值为 1。 允许的值范围为 1 到 90。 |
eventThroughputLevel |
枚举 | ❌ | 描述 Eventstream 项的事件吞吐量级别。 默认值为 Low。 允许的值为 Low、Medium、High。 |
eventstreamProperties.json 实例
{
"retentionTimeInDays": 1,
"eventThroughputLevel": "Low"
}
平台部件
平台有效负载是可选的。 平台部件是包含 Eventstream 元数据信息的文件。
定义示例
下面是 Base64 编码的事件流定义示例,其中从 Base64 解码的有效负载内容示例的内容在 Base64 中编码 ,并放置在 payload 路径设置为 eventstream.json :
{
"displayName": "myEventstream",
"type": "Eventstream",
"description": "Create Eventstream item with definition",
"definition": {
"parts": [
{
"path": "eventstream.json",
"payload": "<base64 encoded string>",
"payloadType": "InlineBase64"
},
{
"path": "eventstreamProperties.json",
"payload": "<base64 encoded string>",
"payloadType": "InlineBase64"
},
{
"path": ".platform",
"payload": "ZG90UGxhdGZvcm1CYXNlNjRTdHJpbmc=",
"payloadType": "InlineBase64"
}
]
}
}