Use Azure OpenAI to turn whiteboard sketches into pipelines

Data Factory in Microsoft Fabric provides cloud-scale data movement and data transformation services that let you solve the most complex data factory and ETL scenarios, and it gives you a modern data integration experience to ingest, prepare, and transform data from a rich set of data sources. Within Data Factory, you can create pipelines that use rich, out-of-the-box data orchestration capabilities to compose flexible data workflows that meet your enterprise needs.

Now, with the gpt-4o AI model in Azure, we're pushing the limits of what Data Factory can do, making it possible for you to create data solutions from nothing but an image.

What do you need to get started? Just a Microsoft Fabric account and an idea. Here, we'll show you how to turn a whiteboard idea into a Fabric Data Factory pipeline using only an image and gpt-4o.

Prerequisites

Before you build the solution, make sure the following prerequisites are set up in Azure and Fabric:

  • A workspace with Microsoft Fabric enabled.
  • An Azure OpenAI account with an API key and a deployed gpt-4o model.
  • An image of what you want your pipeline to look like.

Warning

API keys are sensitive information, and production keys should only ever be stored securely in Azure Key Vault or another secure store. The OpenAI key in this example is used for demonstration purposes only. For production code, consider using Microsoft Entra ID instead of key authentication, for a more secure environment that doesn't rely on key sharing and isn't exposed to a security breach if a key is compromised.
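For illustration, here's a minimal sketch of key-less authentication with Microsoft Entra ID. It assumes the azure-identity package is installed and that your identity has an appropriate Azure OpenAI role on the resource; the endpoint and API version shown are placeholders, not values from this walkthrough.

# Hypothetical sketch: authenticate to Azure OpenAI with Microsoft Entra ID instead of an API key
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import AzureOpenAI

# DefaultAzureCredential picks up managed identities, CLI logins, and other credential sources
token_provider = get_bearer_token_provider(
    DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)

aoai_client = AzureOpenAI(
    azure_endpoint="<Your Azure OpenAI resource endpoint>",  # placeholder
    azure_ad_token_provider=token_provider,
    api_version="2024-02-15-preview",  # assumption: match the version your deployment supports
)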

Step 1: Upload your image to the lakehouse

Before you can analyze the image, you need to upload it to your lakehouse. Sign in to your Microsoft Fabric account and navigate to your workspace. Select + New item and create a new lakehouse.

Screenshot showing how to create a new lakehouse.

After the lakehouse is set up, create a new folder named images under Files, and then upload your image to that folder.

Screenshot showing the drawn image to be converted to a pipeline.
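Optionally, you can verify the upload later from a notebook cell. This is a minimal sketch, assuming the lakehouse is attached as the notebook's default lakehouse; adjust the folder path to match where you uploaded the image.

# Optional check: list the uploaded files from a Fabric notebook
# (assumes the lakehouse is mounted at the notebook's default path)
import os
print(os.listdir("/lakehouse/default/Files/images"))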

Step 2: Create a notebook in your workspace

Now all we need is a notebook that executes some Python code to summarize the image and create the pipeline in the workspace.

Create a new notebook in your workspace:

Screenshot showing how to create a new notebook in the Fabric Data Factory workspace.

Enter the following code in the code area. This code sets up the required libraries and configuration, and encodes your image:

# Configuration
AZURE_OPENAI_KEY = "<Your Azure OpenAI key>"
AZURE_OPENAI_GPT4O_ENDPOINT = "<Your Azure OpenAI gpt-4o deployment endpoint>"
IMAGE_PATH = "<Path to your uploaded image file>" # For example, "/lakehouse/default/files/images/pipeline.png"

# Install the required libraries (semantic-link provides the sempy Fabric client)
%pip install semantic-link --q
%pip install openai --upgrade --q

# Imports
import os
import requests
import base64
import json
import time
import pprint
import openai
import sempy.fabric as fabric
from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException
import pandas as pd

# Load the image and encode it as Base64 for the request payload
with open(IMAGE_PATH, 'rb') as image_file:
    image_bytes = image_file.read()
encoded_image = base64.b64encode(image_bytes).decode('ascii')

## Request headers
headers = {
    "Content-Type": "application/json",
    "api-key": AZURE_OPENAI_KEY,
}

Run this code block to configure the environment.
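As a quick optional sanity check (an addition, not part of the original walkthrough), you can confirm that the image was read and encoded before calling the model:

# Optional sanity check: a non-zero byte count confirms the image loaded
print(f"Read {len(image_bytes)} image bytes; base64 payload length: {len(encoded_image)}")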

Step 3: Use gpt-4o to describe the pipeline (optional)

This step is optional, but it shows how easy it is to extract details from the image, which might be relevant for your purposes. If you skip this step, you can still generate the pipeline JSON in the next step.

First select Edit on the notebook's main menu, then select the + Add code cell below button on the toolbar to add a new code block after the previous one.

Screenshot showing where to add a new code cell below the current one in the notebook editor.

Then add the following code to the new cell. This code demonstrates how gpt-4o can interpret and summarize the image to understand its contents.

# Summarize the image

## Request payload
payload = {
    "messages": [
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric pipeline. Show list of pipeline activities and how they are connected."
                }
            ]
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{encoded_image}"
                    }
                }
            ]
        }
    ],
    "temperature": 0.7,
    "top_p": 0.95,
    "max_tokens": 800
}

## Send request
try:
    response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload)
    response.raise_for_status()  # Will raise an HTTPError if the HTTP request returned an unsuccessful status code
except requests.RequestException as e:
    raise SystemExit(f"Failed to make the request. Error: {e}")

response_json = response.json()

## Show AI response
print(response_json["choices"][0]['message']['content'])

Run this code block to see the AI's summary of the image and its components.
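Note that the setup cell installs the openai library, but the request above calls the REST endpoint directly through requests. For reference, here's a rough equivalent using the AzureOpenAI client from that library. The endpoint, deployment name, and API version are placeholders or assumptions you'd need to match to your own deployment.

# Alternative sketch: the same image summary via the openai SDK instead of raw requests
from openai import AzureOpenAI

sdk_client = AzureOpenAI(
    azure_endpoint="<Your Azure OpenAI resource endpoint>",  # placeholder
    api_key=AZURE_OPENAI_KEY,
    api_version="2024-02-15-preview",  # assumption: match your deployment
)

completion = sdk_client.chat.completions.create(
    model="<Your gpt-4o deployment name>",  # the deployment name, not the model family
    messages=[
        {"role": "system", "content": "Describe the Data Factory pipeline shown in this image."},
        {"role": "user", "content": [
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}}
        ]},
    ],
    max_tokens=800,
)
print(completion.choices[0].message.content)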

Step 4: Generate the pipeline JSON

Add another code block to the notebook with the following code. This code analyzes the image and generates the pipeline JSON.

# Analyze the image and generate the pipeline JSON

## Setup new payload
payload = {
    "messages": [
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric pipeline. Succeeded is denoted by a green line, and Fail is denoted by a red line. Generate an ADF v2 pipeline JSON with what you see. Return ONLY the JSON text required, without any leading or trailing markdown denoting a code block."
                }
            ]
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{encoded_image}"
                    }
                }
            ]
        }
    ],
    "temperature": 0.7,
    "top_p": 0.95,
    "max_tokens": 800
}

## Send request
try:
    response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload)
    response.raise_for_status()  # Will raise an HTTPError if the HTTP request returned an unsuccessful status code
except requests.RequestException as e:
    raise SystemExit(f"Failed to make the request. Error: {e}")

## Get JSON from request and show
response_json = response.json()
pipeline_json = response_json["choices"][0]['message']['content']
print(pipeline_json)

Run this code block to generate the pipeline JSON from the image.
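The prompt asks the model to return only JSON, but model output isn't guaranteed to comply. If you find the response wrapped in a markdown code fence, a small defensive cleanup like this sketch (an addition to the original walkthrough) avoids a parsing error in the next step:

## Optional cleanup: strip a markdown code fence if the model added one anyway
pipeline_json = pipeline_json.strip()
if pipeline_json.startswith("```"):
    lines = pipeline_json.splitlines()
    # Drop the opening fence line (possibly "```json") and the closing fence line
    pipeline_json = "\n".join(lines[1:-1])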

Step 5: Create the pipeline with the Fabric REST API

Once you have the pipeline JSON, you can create the pipeline directly with the Fabric REST API. Add another code block to the notebook with the following code. This code creates the pipeline in your workspace.

# Convert pipeline JSON to Fabric REST API request

json_data = json.loads(pipeline_json)

# Extract the activities from the JSON
activities = json_data["properties"]["activities"]

# Prepare the pipeline JSON definition
data = {}
activities_list = []

idx = 0

# Map each activity name found in the image to its dynamically generated name,
# so that dependencies can be rewired to the new names
name_mapping = {}

for activity in activities:
    idx = idx + 1
    activity_name = activity["type"].replace("Activity", "")

    objName = f"{activity_name}{idx}"

    # Store the name mapping so we can deal with dependencies
    name_mapping[activity["name"]] = objName

    if 'dependsOn' in activity:
        activity_dependent_list = activity["dependsOn"]

        dependent_activity = ""
        if len(activity_dependent_list) > 0:
            dependent_activity = activity_dependent_list[0]["activity"]

        match activity_name:
            case "Copy":
                # Note: this sample leaves the Copy activity's dependsOn empty
                activities_list.append({
                    'name': objName, 'type': "Copy", 'dependsOn': [],
                    'typeProperties': {
                        "source": {"datasetSettings": {}},
                        "sink": {"datasetSettings": {}}
                    }
                })
            case "Web":
                activities_list.append({
                    'name': objName, 'type': "Office365Outlook",
                    "dependsOn": [
                        {
                            "activity": name_mapping[dependent_activity],
                            "dependencyConditions": ["Succeeded"]
                        }
                    ]
                })
            case "ExecutePipeline":
                activities_list.append({
                    'name': "execute pipeline 1", 'type': "ExecutePipeline",
                    "dependsOn": [
                        {
                            "activity": name_mapping[dependent_activity],
                            "dependencyConditions": ["Succeeded"]
                        }
                    ]
                })
            case _:
                continue
    else:
        # Simple activities with no dependencies
        match activity_name:
            case "Copy":
                activities_list.append({
                    'name': objName, 'type': "Copy", 'dependsOn': [],
                    'typeProperties': {
                        "source": {"datasetSettings": {}},
                        "sink": {"datasetSettings": {}}
                    }
                })
            case "SendEmail" | "Web":
                activities_list.append({'name': "Send mail on success", 'type': "Office365Outlook"})
            case "ExecutePipeline":
                activities_list.append({'name': "execute pipeline 1", 'type': "ExecutePipeline"})
            case _:
                print("NoOp")

# Now that the activities_list is created, assign it to the activities tag in properties
data['properties'] = { "activities": activities_list}

# Serialize the pipeline definition to a JSON string, then Base64-encode it for the API payload
data_str = json.dumps(data)
createPipeline_json = data_str.encode(encoding="utf-8")
createPipeline_Json64 = base64.b64encode(createPipeline_json).decode("utf-8")

# Create a new pipeline in Fabric
timestr = time.strftime("%Y%m%d-%H%M%S")
pipelineName = f"Pipeline from image with AI-{timestr}"

payload = {
        "displayName": pipelineName,
        "type": "DataPipeline",
        "definition": {
           "parts": [ 
             { 
              "path": "pipeline-content.json", 
              "payload": createPipeline_Json64, 
              "payloadType": "InlineBase64" 
              }
            ]
        }
}

print(f"Creating pipeline: {pipelineName}")

# Call the Fabric REST API to generate the pipeline
client = fabric.FabricRestClient()
workspaceId = fabric.get_workspace_id()
try:
    response = client.post(f"/v1/workspaces/{workspaceId}/items", json=payload)
    if response.status_code != 201:
        raise FabricHTTPException(response)
except WorkspaceNotFoundException as e:
    print("Workspace is not available or cannot be found.")
except FabricHTTPException as e:
    print(e)
    print("Fabric HTTP Exception. Check that you have the correct Fabric API endpoints.")

# List the data pipelines in the workspace to confirm the new pipeline exists
response = client.get(f"/v1/workspaces/{workspaceId}/dataPipelines")
df_items = pd.json_normalize(response.json()['value'])
print("List of pipelines in the workspace:")
df_items

The output confirms the name of the pipeline that was created and shows the list of pipelines in the workspace, so you can verify that it exists.

Screenshot showing the notebook's output after the pipeline is created.

Step 6: Use your pipeline

After the pipeline is created, you can edit it in your Fabric workspace to see your image implemented as a pipeline. You can select each activity to configure it as needed, then run and monitor it as required. If you'd rather start a run from the notebook itself, see the sketch after the screenshot below.

Screenshot showing the pipeline generated through AI by the notebook.
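As an optional extra beyond the original walkthrough, you can also start an on-demand run of the new pipeline from the same notebook session. This sketch assumes the Fabric Job Scheduler endpoint and jobType shown here; verify them against the current Fabric REST API reference.

# Look up the new pipeline's ID from the listing retrieved in the previous step
pipeline_id = df_items.loc[df_items["displayName"] == pipelineName, "id"].iloc[0]

# Request an on-demand run (endpoint and jobType are assumptions to verify)
run_response = client.post(
    f"/v1/workspaces/{workspaceId}/items/{pipeline_id}/jobs/instances?jobType=Pipeline"
)
print(f"Run request status: {run_response.status_code}")  # 202 means the run was accepted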