在 Microsoft Fabric 中将 Azure AI 服务与 SynapseML 配合使用

Azure AI 服务 可帮助开发人员和组织构建负责任的应用程序,并准备好使用和自定义 API 和模型。 本文使用 Azure AI 服务执行包括:文本分析、翻译、文档智能、视觉、图像搜索、语音转文本和文本到语音、异常检测以及从 Web API 提取数据等任务。

Azure AI 服务可帮助开发人员创建应用程序,以便查看、听到、说话、理解并开始推理。 Azure AI 服务目录包括五大支柱: 视觉语音语言Web 搜索决策

先决条件

准备你的系统

首先导入所需的库并初始化 Spark 会话。

from pyspark.sql.functions import udf, col, lit
from synapse.ml.io.http import HTTPTransformer, http_udf
from requests import Request
from pyspark.ml import PipelineModel
import os
from pyspark.sql import SparkSession

# Start a Spark session.
spark = SparkSession.builder.getOrCreate()

导入 Azure AI 服务库。 在以下代码中,将占位符文本 <YOUR-KEY-VALUE> 替换为自己的键,并为每个服务设置位置值。

from synapse.ml.cognitive import *

# A general Azure AI services key for Text Analytics, Vision, and Document Intelligence (or use separate keys for each service).
service_key = "<YOUR-KEY-VALUE>"  # Replace `<YOUR-KEY-VALUE>` with your Azure AI services key. See prerequisites for details.
service_loc = "eastus"

# A Bing Search v7 subscription key.
bing_search_key = "<YOUR-KEY-VALUE>"  # Replace `<YOUR-KEY-VALUE>` with your Bing Search v7 subscription key. See prerequisites for details.

# An Anomaly Detector subscription key.
anomaly_key = "<YOUR-KEY-VALUE>"  # Replace `<YOUR-KEY-VALUE>` with your Anomaly Detector key. See prerequisites for details.
anomaly_loc = "westus2"

# A Translator subscription key.
translator_key = "<YOUR-KEY-VALUE>"  # Replace `<YOUR-KEY-VALUE>` with your Translator key. See prerequisites for details.
translator_loc = "eastus"

# An Azure Search key.
search_key = "<YOUR-KEY-VALUE>"  # Replace `<YOUR-KEY-VALUE>` with your Azure Search key. See prerequisites for details.

分析文本中的情绪

文本分析 服务提供了多种算法,用于从文本中提取智能见解。 例如,使用服务分析输入文本中的情绪。 服务返回介于 0.0 和 1.0 之间的分数:低分表示负面情绪,高分表示积极情绪。

此代码示例返回三个句子的情绪。

# Create a DataFrame that's tied to its column names
df = spark.createDataFrame(
    [
        ("I am so happy today, it's sunny!", "en-US"),
        ("I am frustrated by this rush hour traffic", "en-US"),
        ("The cognitive services on Spark aren't bad", "en-US"),
    ],
    ["text", "language"],
)

# Run the Text Analytics service with options
sentiment = (
    TextSentiment()
    .setTextCol("text")
    .setLocation(service_loc)
    .setSubscriptionKey(service_key)
    .setOutputCol("sentiment")
    .setErrorCol("error")
    .setLanguageCol("language")
)

# Show the results in a table.
display(
    sentiment.transform(df).select(
        "text", col("sentiment.document.sentiment").alias("sentiment")
    )
)

对健康数据进行文本分析

用于健康的文本分析 从非结构化文本中提取和标记医疗信息,如医生笔记、出院摘要、临床文档和电子健康记录。

此代码示例分析医生笔记中的文本并返回结构化数据。

df = spark.createDataFrame(
    [
        ("20mg of ibuprofen twice a day",),
        ("1tsp of Tylenol every 4 hours",),
        ("6 drops of vitamin B-12 every evening",),
    ],
    ["text"],
)

healthcare = (
    AnalyzeHealthText()
    .setSubscriptionKey(service_key)
    .setLocation(service_loc)
    .setLanguage("en")
    .setOutputCol("response")
)

display(healthcare.transform(df))

将文本翻译为其他语言

翻译器 是基于云的机器翻译服务,是用于构建智能应用的 Azure AI 服务系列认知 API 的一部分。 翻译器可轻松集成到应用、网站、工具和解决方案中。 它允许你以 90 种语言和方言添加多语言体验,并且适用于任何作系统进行文本翻译。

下面的代码示例将输入句子转换为目标语言。

from pyspark.sql.functions import col, flatten

# Create a DataFrame with the sentences to translate
df = spark.createDataFrame(
    [(["Hello, what is your name?", "Bye"],)],
    [
        "text",
    ],
)

# Run the Translator service.
translate = (
    Translate()
    .setSubscriptionKey(translator_key)
    .setLocation(translator_loc)
    .setTextCol("text")
    .setToLanguage(["zh-Hans"])
    .setOutputCol("translation")
)

# Show the translation results.
display(
    translate.transform(df)
    .withColumn("translation", flatten(col("translation.translations")))
    .withColumn("translation", col("translation.text"))
    .select("translation")
)

将文档的信息提取到结构化数据中

Azure AI 文档智能 是 Azure AI 服务的一部分,可让你使用机器学习构建自动化数据处理软件。 使用 Azure AI 文档智能从文档中识别和提取文本、键值对、选择标记、表和结构。 该服务输出结构化数据,其中包括来自原始文件的关系、边界框、置信度分数等。

以下代码分析名片图像并将其信息提取为结构化数据。

from pyspark.sql.functions import col, explode

# Create a DataFrame with the source files
imageDf = spark.createDataFrame(
    [
        (
            "https://mmlspark.blob.core.windows.net/datasets/FormRecognizer/business_card.jpg",
        )
    ],
    [
        "source",
    ],
)

# Run Azure AI Document Intelligence
analyzeBusinessCards = (
    AnalyzeBusinessCards()
    .setSubscriptionKey(service_key)
    .setLocation(service_loc)
    .setImageUrlCol("source")
    .setOutputCol("businessCards")
)

# Show recognition results.
display(
    analyzeBusinessCards.transform(imageDf)
    .withColumn(
        "documents", explode(col("businessCards.analyzeResult.documentResults.fields"))
    )
    .select("source", "documents")
)

分析和标记图像

Azure AI 视觉 分析图像以识别人脸、对象和自然语言说明。

此代码示例使用标记分析图像并对其进行 标记。 标记是图像中对象的单字描述、人员、风景和作。

# Create a DataFrame with image URLs.
base_url = "https://raw.githubusercontent.com/Azure-Samples/cognitive-services-sample-data-files/master/ComputerVision/Images/"
df = spark.createDataFrame(
    [
        (base_url + "objects.jpg",),
        (base_url + "dog.jpg",),
        (base_url + "house.jpg",),
    ],
    [
        "image",
    ],
)

# Run Azure AI Vision to analyze images and extract information.
analysis = (
    AnalyzeImage()
    .setLocation(service_loc)
    .setSubscriptionKey(service_key)
    .setVisualFeatures(
        ["Categories", "Color", "Description", "Faces", "Objects", "Tags"]
    )
    .setOutputCol("analysis_results")
    .setImageUrlCol("image")
    .setErrorCol("error")
)

# Show the description tags.
display(analysis.transform(df).select("image", "analysis_results.description.tags"))

必应图像搜索 搜索 Web 以检索与用户自然语言查询相关的图像。

此示例使用文本查询查找引号的图像。 它输出与查询相关的图像 URL 列表。

# Number of images Bing returns per query
imgsPerBatch = 10
# List of offsets to page through the search results
offsets = [(i * imgsPerBatch,) for i in range(100)]
# Create a DataFrame of offsets to page through results
bingParameters = spark.createDataFrame(offsets, ["offset"])

# Run Bing Image Search with the text query
bingSearch = (
    BingImageSearch()
    .setSubscriptionKey(bing_search_key)
    .setOffsetCol("offset")
    .setQuery("Martin Luther King Jr. quotes")
    .setCount(imgsPerBatch)
    .setOutputCol("images")
)

# Create a transformer that extracts and flattens the Bing Image Search output into a single URL column
getUrls = BingImageSearch.getUrlTransformer("images", "url")

# Display the full results. Uncomment to use
# display(bingSearch.transform(bingParameters))

# Put both services into a pipeline
pipeline = PipelineModel(stages=[bingSearch, getUrls])

# Show the image URLs returned by the search
display(pipeline.transform(bingParameters))

将语音转换为文本

Azure AI 语音服务将语音音频流或文件转换为文本。 下面的代码示例转录一个音频文件。

# Create a DataFrame with the audio URL in the 'url' column
df = spark.createDataFrame(
    [("https://mmlspark.blob.core.windows.net/datasets/Speech/audio2.wav",)], ["url"]
)

# Run Azure AI Speech to transcribe the audio
speech_to_text = (
    SpeechToTextSDK()
    .setSubscriptionKey(service_key)
    .setLocation(service_loc)
    .setOutputCol("text")
    .setAudioDataCol("url")
    .setLanguage("en-US")
    .setProfanity("Masked")
)

# Show the transcription results
display(speech_to_text.transform(df).select("url", "text.DisplayText"))

将文本转换为语音

文本转语音 是一项服务,可让你构建自然说话的应用和服务。 从 119 种语言和变体中的 270 多个神经语音中进行选择。

下面的代码示例将文本转换为音频文件。

from synapse.ml.cognitive import TextToSpeech

fs = ""
if running_on_databricks():
    fs = "dbfs:"
elif running_on_synapse_internal():
    fs = "Files"

# Create a dataframe with text and an output file location
df = spark.createDataFrame(
    [
        (
            "Reading out loud is fun! Check out aka.ms/spark for more information",
            fs + "/output.mp3",
        )
    ],
    ["text", "output_file"],
)

tts = (
    TextToSpeech()
    .setSubscriptionKey(service_key)
    .setTextCol("text")
    .setLocation(service_loc)
    .setVoiceName("en-US-JennyNeural")
    .setOutputFileCol("output_file")
)

# Check that there are no errors during audio creation
display(tts.transform(df))

检测时序数据中的异常

异常检测器 检测时序数据中的异常情况。 此示例使用异常检测器服务查找整个时序中的异常。

# Create a DataFrame with the point data that Anomaly Detector requires
df = spark.createDataFrame(
    [
        ("1972-01-01T00:00:00Z", 826.0),
        ("1972-02-01T00:00:00Z", 799.0),
        ("1972-03-01T00:00:00Z", 890.0),
        ("1972-04-01T00:00:00Z", 900.0),
        ("1972-05-01T00:00:00Z", 766.0),
        ("1972-06-01T00:00:00Z", 805.0),
        ("1972-07-01T00:00:00Z", 821.0),
        ("1972-08-01T00:00:00Z", 20000.0),
        ("1972-09-01T00:00:00Z", 883.0),
        ("1972-10-01T00:00:00Z", 898.0),
        ("1972-11-01T00:00:00Z", 957.0),
        ("1972-12-01T00:00:00Z", 924.0),
        ("1973-01-01T00:00:00Z", 881.0),
        ("1973-02-01T00:00:00Z", 837.0),
        ("1973-03-01T00:00:00Z", 9000.0),
    ],
    ["timestamp", "value"],
).withColumn("group", lit("series1"))

# Run Anomaly Detector to detect anomalies
anomaly_detector = (
    SimpleDetectAnomalies()
    .setSubscriptionKey(anomaly_key)
    .setLocation(anomaly_loc)
    .setTimestampCol("timestamp")
    .setValueCol("value")
    .setOutputCol("anomalies")
    .setGroupbyCol("group")
    .setGranularity("monthly")
)

# Show results with anomalies marked as True
display(
    anomaly_detector.transform(df).select("timestamp", "value", "anomalies.isAnomaly")
)

从任意 Web API 获取信息

将大型管道中的任何 Web 服务与 Spark 上的 HTTP 配合使用。 以下代码示例使用 世界银行 API 获取有关全球不同国家和地区的信息。

# Use any request from the Python requests library.


def world_bank_request(country):
    return Request(
        "GET", "http://api.worldbank.org/v2/country/{}?format=json".format(country)
    )


# Create a DataFrame that specifies the countries to get data for.
df = spark.createDataFrame([("br",), ("usa",)], ["country"]).withColumn(
    "request", http_udf(world_bank_request)(col("country"))
)

# Improve big data performance by using concurrency.
client = (
    HTTPTransformer().setConcurrency(3).setInputCol("request").setOutputCol("response")
)

# Get the body of the response.


def get_response_body(resp):
    return resp.entity.content.decode()


# Show country details from the response.
display(
    client.transform(df).select(
        "country", udf(get_response_body)(col("response")).alias("response")
    )
)