事件流定义

本文详细介绍了 Eventstream 项的定义结构。

定义部件

下表列出了 Eventstream 定义部件。

定义部件路径 类型 必选 DESCRIPTION
eventstream.json Eventstream (JSON) 描述 Eventstream 项的拓扑
eventstreamProperties.json EventstreamProperties (JSON) 描述事件流元数据
.platform PlatformDetails (JSON) 描述项的元数据

事件流项的每个定义部分构造如下:

事件流

描述 Eventstream 项的拓扑。

名称 类型 必选 DESCRIPTION
sources 数组 描述可以引入到 Eventstream 中进行处理的数据源。
destinations 数组 介绍 Fabric 中的终结点,其中处理的数据可以路由到,包括 Lakehouse、Eventhouse、Reflex 等。
operators 数组 定义处理实时数据流的事件处理程序,例如筛选器、聚合、分组依据和联接。
streams 数组 介绍可用于实时中心的订阅和分析的数据流。 有两种类型的流:默认流和派生流。

eventstream.json 实例

若要了解如何创建描述事件流项的 JSON 文件,请参阅 Eventstream REST API

{
    "sources": [
        {
            "name": "myEventHub",
            "type": "AzureEventHub",
            "properties": {
                "dataConnectionId": "cc8271ee-8f72-473d-969c-6828f5fd0d45",
                "consumerGroupName": "$Default",
                "inputSerialization": {
                    "type": "Json",
                    "properties": {
                        "encoding": "UTF8"
                    }
                }
            }
        }
    ],
    "destinations": [
        {
            "name": "myLakehouse",
            "type": "Lakehouse",
            "properties": {
                "workspaceId": "fdf52f3a-b687-41b8-8ff8-aeeca4d1edd8",
                "itemId": "737d6a97-e88c-45e1-9c39-adf1c9c4e817",
                "schema": "",
                "deltaTable": "newTable",
                "minimumRows": 100000,
                "maximumDurationInSeconds": 120,
                "inputSerialization": {
                    "type": "Json",
                    "properties": {
                        "encoding": "UTF8"
                    }
                }
            },
            "inputNodes": [
                {
                    "name": "derivedStream"
                }
            ]
        }
    ],
    "streams": [
        {
            "name": "myEventstream-stream",
            "type": "DefaultStream",
            "properties": {},
            "inputNodes": [
                {
                    "name": "myEventHub"
                }
            ]
        },
        {
            "name": "derivedStream",
            "type": "DerivedStream",
            "properties": {
                "inputSerialization": {
                    "type": "Json",
                    "properties": {
                        "encoding": "UTF8"
                    }
                }
            },
            "inputNodes": [
                {
                    "name": "GroupBy"
                }
            ]
        }
    ],
    "operators": [
        {
            "name": "GroupBy",
            "type": "GroupBy",
            "inputNodes": [
                {
                    "name": "myEventstream-stream"
                }
            ],
            "properties": {
                "aggregations": [
                    {
                        "aggregateFunction": "Average",
                        "column": {
                            "expressionType": "ColumnReference",
                            "node": null,
                            "columnName": "payload",
                            "columnPathSegments": [
                                {
                                    "field": "ts_ms"
                                }
                            ]
                        },
                        "alias": "AVG_ts_ms"
                    }
                ],
                "groupBy": [],
                "window": {
                    "type": "Tumbling",
                    "properties": {
                        "duration": {
                            "value": 5,
                            "unit": "Minute"
                        },
                        "offset": {
                            "value": 1,
                            "unit": "Minute"
                        }
                    }
                }
            }
        }
    ],
    "compatibilityLevel": "1.0"
}

若要为 API 有效负载构造 Eventstream 项,可以使用 GitHub 模板 定义事件流项。

EventstreamProperties

描述事件流元数据。

名称 类型 必选 DESCRIPTION
retentionTimeInDays 整数 描述 Eventstream 项的保留天数。 默认值为 1。 允许的值范围为 1 到 90。
eventThroughputLevel 枚举 描述 Eventstream 项的事件吞吐量级别。 默认值为 Low。 允许的值为 LowMediumHigh

eventstreamProperties.json 实例

{
  "retentionTimeInDays": 1,
  "eventThroughputLevel": "Low"
}

平台部件

平台有效负载是可选的。 平台部件是包含 Eventstream 元数据信息的文件。

  • 创建 具有定义的项会遵循平台文件(如果提供)。
  • 获取项 定义始终返回平台文件。
  • 如果提供了更新项 定义,则接受平台文件(如果设置了新的 URL 参数 updateMetadata=true)。

定义示例

下面是 Base64 编码的事件流定义示例,其中从 Base64 解码的有效负载内容示例的内容在 Base64 中编码 ,并放置在 payload 路径设置为 eventstream.json


{
  "displayName": "myEventstream",
  "type": "Eventstream",
  "description": "Create Eventstream item with definition",
  "definition": {
    "parts": [
      {
        "path": "eventstream.json",
        "payload": "<base64 encoded string>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "eventstreamProperties.json",
        "payload": "<base64 encoded string>",
        "payloadType": "InlineBase64"
      },
      {
        "path": ".platform",
        "payload": "ZG90UGxhdGZvcm1CYXNlNjRTdHJpbmc=",
        "payloadType": "InlineBase64"
      }
    ]
  }
}