Data Factory in Microsoft Fabric provides cloud-scale data movement and data transformation services that let you solve the most complex data factory and ETL scenarios, and gives you a modern data integration experience to ingest, prepare, and transform data from a rich set of data sources. Within Data Factory, you can create pipelines that use out-of-the-box, rich data orchestration capabilities to compose flexible data workflows that meet your enterprise needs.
Now, with the gpt-4o AI model in Azure, we're pushing the limits of what Data Factory can do, letting you create a data solution from nothing but an image.
What do you need to get started? Just a Microsoft Fabric account and an idea. Here, we show you how to turn a whiteboard idea into a Fabric Data Factory pipeline using only an image and gpt-4o.
Prerequisites
Before you create the solution, make sure the following prerequisites are set up in Azure and Fabric:
- A Microsoft Fabric enabled workspace.
- An Azure OpenAI account with an API key and a deployed gpt-4o model.
- An image of the pipeline you want to build.
Warning
API keys are sensitive information, and production keys should only ever be stored securely in Azure Key Vault or another secure store. This sample uses an OpenAI key for demonstration purposes only. For production code, consider using Microsoft Entra ID instead of key authentication, for a more secure environment that doesn't rely on key sharing or risk a security breach if a key is compromised.
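If you want to avoid pasting the key into the notebook at all, one lightweight option (a sketch, not part of this walkthrough; for production, prefer Key Vault or Entra ID as noted above) is to read it from an environment variable:

```python
import os

def load_openai_key() -> str:
    """Read the Azure OpenAI key from an environment variable instead of
    hard-coding it in the notebook. For production, prefer fetching the
    secret from Azure Key Vault or using Microsoft Entra ID authentication."""
    key = os.environ.get("AZURE_OPENAI_KEY")
    if not key:
        raise RuntimeError("Set the AZURE_OPENAI_KEY environment variable first.")
    return key
```

You can then assign `AZURE_OPENAI_KEY = load_openai_key()` in the configuration cell instead of embedding the literal key.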
Step 1: Upload your image to a lakehouse
Before you can analyze the image, you need to upload it to a lakehouse. Sign in to your Microsoft Fabric account and navigate to your workspace. Select + New item and create a new lakehouse.
Once the lakehouse is set up, create a new folder named images under Files, and upload the image there.
Step 2: Create a notebook in your workspace
Now we just need to create a notebook to execute some Python code that summarizes the image and creates the pipeline in the workspace.
Create a new notebook in your workspace:
Enter the following code in the code area. It sets up the required libraries and configuration, and encodes the image:
# Configuration
AZURE_OPENAI_KEY = "<Your Azure OpenAI key>"
AZURE_OPENAI_GPT4O_ENDPOINT = "<Your Azure OpenAI gpt-4o deployment endpoint>"
IMAGE_PATH = "<Path to your uploaded image file>" # For example, "/lakehouse/default/files/images/pipeline.png"
# Install the required libraries
%pip install semantic-link --q
%pip install openai --upgrade
# Imports
import os
import requests
import base64
import json
import time
import pprint
import openai
import sempy.fabric as fabric
from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException
import pandas as pd
# Load the image
image_bytes = open(IMAGE_PATH, 'rb').read()
encoded_image = base64.b64encode(image_bytes).decode('ascii')
## Request headers
headers = {
    "Content-Type": "application/json",
    "api-key": AZURE_OPENAI_KEY,
}
Run this code block to configure your environment.
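Before calling the model, it can help to check that `IMAGE_PATH` points at a real, non-empty file. This small helper is a sketch (`check_image_path` isn't part of the walkthrough above):

```python
import os

def check_image_path(path: str) -> bool:
    """Return True when the path exists and refers to a non-empty file."""
    return os.path.isfile(path) and os.path.getsize(path) > 0
```

If it returns False for your path, re-check the folder name under Files in the lakehouse and the mounted path in the notebook.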
Step 3: Use gpt-4o to describe the pipeline (optional)
This step is optional, but it shows how easy it is to extract details from the image, which might be relevant for your own purposes. If you skip it, you can still generate the pipeline JSON in the next step.
First, select Edit on the notebook's main menu, then select the + Add code cell below button on the toolbar to add a new code block after the previous one.
Then add the following code to the new section. This code demonstrates how gpt-4o interprets and summarizes the image to understand its contents.
# Summarize the image
## Request payload
payload = {
    "messages": [
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric pipeline. Show list of pipeline activities and how they are connected."
                }
            ]
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{encoded_image}"
                    }
                }
            ]
        }
    ],
    "temperature": 0.7,
    "top_p": 0.95,
    "max_tokens": 800
}
## Send request
try:
    response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload)
    response.raise_for_status()  # Raises an HTTPError if the HTTP request returned an unsuccessful status code
except requests.RequestException as e:
    raise SystemExit(f"Failed to make the request. Error: {e}")
response_json = response.json()
## Show AI response
print(response_json["choices"][0]['message']['content'])
Run this code block to see the AI's summary of the image and its components.
Step 4: Generate the pipeline JSON
Add another code block to the notebook, with the following code. This code analyzes the image and generates the pipeline JSON.
# Analyze the image and generate the pipeline JSON
## Setup new payload
payload = {
    "messages": [
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric pipeline. Succeeded is denoted by a green line, and Fail is denoted by a red line. Generate an ADF v2 pipeline JSON with what you see. Return ONLY the JSON text required, without any leading or trailing markdown denoting a code block."
                }
            ]
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{encoded_image}"
                    }
                }
            ]
        }
    ],
    "temperature": 0.7,
    "top_p": 0.95,
    "max_tokens": 800
}
## Send request
try:
    response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload)
    response.raise_for_status()  # Raises an HTTPError if the HTTP request returned an unsuccessful status code
except requests.RequestException as e:
    raise SystemExit(f"Failed to make the request. Error: {e}")
## Get JSON from request and show
response_json = response.json()
pipeline_json = response_json["choices"][0]['message']['content']
print(pipeline_json)
Run this code block to generate the pipeline JSON from the image.
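Even though the prompt asks for JSON only, models occasionally still wrap the reply in markdown fences. A small defensive parsing helper (a sketch; not part of the original code) gives you an early, clear error before the next step:

```python
import json
import re

def extract_pipeline_json(raw: str) -> dict:
    """Strip optional ```json fences the model may add despite the prompt,
    then parse the reply. Raises json.JSONDecodeError if it isn't valid JSON."""
    cleaned = raw.strip()
    cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)
    cleaned = re.sub(r"\s*```$", "", cleaned)
    return json.loads(cleaned)
```

Running `pipeline_json` through a helper like this before the next step surfaces a malformed model reply immediately, rather than as a confusing failure during pipeline creation.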
Step 5: Create the pipeline with the Fabric REST API
Now that you have the pipeline JSON, you can create the pipeline directly with the Fabric REST API. Add another code block to the notebook, with the following code. This code creates the pipeline in your workspace.
# Convert pipeline JSON to Fabric REST API request
json_data = json.loads(pipeline_json)

# Extract the activities from the JSON
activities = json_data["properties"]["activities"]

# Prepare the pipeline JSON definition
data = {}
activities_list = []
idx = 0

# Name mapping used to track the activity name found in the image to the dynamically generated name
name_mapping = {}

for activity in activities:
    idx = idx + 1
    activity_name = activity["type"].replace("Activity", "")
    objName = f"{activity_name}{idx}"

    # Store the name mapping so we can deal with dependencies
    name_mapping[activity["name"]] = objName

    if 'dependsOn' in activity:
        activity_dependent_list = activity["dependsOn"]
        dependent_activity = ""
        if len(activity_dependent_list) > 0:
            dependent_activity = activity_dependent_list[0]["activity"]

        match activity_name:
            case "Copy":
                activities_list.append({'name': objName, 'type': "Copy", 'dependsOn': [],
                                        'typeProperties': {"source": {"datasetSettings": {}},
                                                           "sink": {"datasetSettings": {}}}})
            case "Web":
                activities_list.append({'name': objName, 'type': "Office365Outlook",
                                        "dependsOn": [
                                            {
                                                "activity": name_mapping[dependent_activity],
                                                "dependencyConditions": [
                                                    "Succeeded"
                                                ]
                                            }
                                        ]
                                        })
            case "ExecutePipeline":
                activities_list.append({'name': "execute pipeline 1", 'type': "ExecutePipeline",
                                        "dependsOn": [
                                            {
                                                "activity": name_mapping[dependent_activity],
                                                "dependencyConditions": [
                                                    "Succeeded"
                                                ]
                                            }
                                        ]
                                        })
            case _:
                continue
    else:
        # Simple activities with no dependencies
        match activity_name:
            case "Copy":
                activities_list.append({'name': objName, 'type': "Copy", 'dependsOn': [],
                                        'typeProperties': {"source": {"datasetSettings": {}}, "sink": {"datasetSettings": {}}}})
            case "SendEmail" | "Web":
                activities_list.append({'name': "Send mail on success", 'type': "Office365Outlook"})
            case "ExecutePipeline":
                activities_list.append({'name': "execute pipeline 1", 'type': "ExecutePipeline"})
            case _:
                print("NoOp")

# Now that the activities_list is created, assign it to the activities tag in properties
data['properties'] = {"activities": activities_list}

# Serialize the definition dict to JSON, then Base64-encode it for the API payload
data_str = json.dumps(data)
createPipeline_json = data_str.encode(encoding="utf-8")
createPipeline_Json64 = base64.b64encode(createPipeline_json).decode("utf-8")

# Create a new pipeline in Fabric
timestr = time.strftime("%Y%m%d-%H%M%S")
pipelineName = f"Pipeline from image with AI-{timestr}"

payload = {
    "displayName": pipelineName,
    "type": "DataPipeline",
    "definition": {
        "parts": [
            {
                "path": "pipeline-content.json",
                "payload": createPipeline_Json64,
                "payloadType": "InlineBase64"
            }
        ]
    }
}

print(f"Creating pipeline: {pipelineName}")

# Call the Fabric REST API to create the pipeline
client = fabric.FabricRestClient()
workspaceId = fabric.get_workspace_id()
try:
    response = client.post(f"/v1/workspaces/{workspaceId}/items", json=payload)
    if response.status_code != 201:
        raise FabricHTTPException(response)
except WorkspaceNotFoundException as e:
    print("Workspace is not available or cannot be found.")
except FabricHTTPException as e:
    print(e)
    print("Fabric HTTP Exception. Check that you have the correct Fabric API endpoints.")

response = client.get(f"/v1/workspaces/{workspaceId}/Datapipelines")
df_items = pd.json_normalize(response.json()['value'])
print("List of pipelines in the workspace:")
df_items
The output confirms the name of the created pipeline and shows the list of pipelines in the workspace, so you can verify that it exists.
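If you prefer a programmatic check over eyeballing the list, a small helper over the list response (a sketch; `pipeline_exists` is not part of the walkthrough) can confirm the displayName is present:

```python
def pipeline_exists(items: list, name: str) -> bool:
    """Return True when an item with the given displayName appears in the
    'value' array of a Fabric list-items response."""
    return any(item.get("displayName") == name for item in items)
```

In the notebook you would call it as `pipeline_exists(response.json()['value'], pipelineName)`.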
Step 6: Use your pipeline
Once your pipeline is created, you can edit it in your Fabric workspace to see your image implemented as a pipeline. You can select each activity to configure it as required, then run and monitor the pipeline as needed.
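You can also start the new pipeline on demand from the same notebook via the Fabric job scheduler API. The path builder below is a sketch; it assumes you read the pipeline's item id from the create response (e.g. `response.json()['id']`):

```python
def run_pipeline_path(workspace_id: str, item_id: str) -> str:
    """Build the Fabric REST path that starts an on-demand pipeline run
    (the job scheduler's jobType=Pipeline endpoint)."""
    return f"/v1/workspaces/{workspace_id}/items/{item_id}/jobs/instances?jobType=Pipeline"
```

In the notebook: `client.post(run_pipeline_path(workspaceId, pipeline_id))`; a 202 response indicates the run was accepted.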