使用 from_json 在 Lakeflow 声明性管道中推断和演化模式

重要

该功能处于公开预览阶段。

本文介绍如何使用 from_json Lakeflow 声明性管道中的 SQL 函数推断和改进 JSON Blob 的架构。

概述

from_json SQL 函数分析 JSON 字符串列并返回结构值。 在 Lakeflow 声明性管道外部使用时,必须显式提供返回值的架构,使用参数 schema。 与 Lakeflow 声明性管道一起使用时,可以启用架构推理和演变,从而自动管理返回值的架构。 此功能既简化了初始设置(尤其是在架构未知的情况下),也简化了架构频繁更改时的日常操作。 它能够无缝处理来自流数据源(如 Auto Loader、Kafka 或 Kinesis)的任意 JSON 二进制大对象(Blob)。

具体而言,在 Lakeflow 声明性管道中使用时,SQL 函数的 from_json 架构推理和演变可以:

  • 检测传入 JSON 记录中的新字段(包括嵌套 JSON 对象)
  • 推断字段类型并将其映射到相应的 Spark 数据类型
  • 自动改进架构以适应新字段
  • 自动处理不符合当前架构的数据

语法:自动推断和改进架构

如果与 Lakeflow 声明性管道一起使用 from_json ,它可以自动推断和改进架构。 若要启用此功能,请将架构设置为 NULL 并指定 schemaLocationKey 选项。 这使它可以推断和跟踪架构。

SQL

from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))

Python

from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})

查询可以有多个 from_json 表达式,但每个表达式必须具有唯 schemaLocationKey一的表达式。 在每个管道中,schemaLocationKey也必须是唯一的。

SQL

SELECT
  value,
  from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
  from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Python

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .load("/databricks-datasets/nyctaxi/sample/json/")
    .select(
      col("value"),
      from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
      from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)

语法:固定架构

如果要改为强制实施特定架构,可以使用以下 from_json 语法来分析使用该架构的 JSON 字符串:

from_json(jsonStr, schema, [, options])

此语法可在任何 Azure Databricks 环境中使用,包括 Lakeflow 声明性管道。 此处提供了详细信息。

模式推断

from_json 从第一批 JSON 数据列推断出架构,并根据其 schemaLocationKey(必需)进行内部索引。

如果 JSON 字符串是单个对象(例如),{"id": 123, "name": "John"}from_json则推断 STRUCT 类型的架构,并向字段列表添加一个rescuedDataColumn

STRUCT<id LONG, name STRING, _rescued_data STRING>

但是,如果 JSON 字符串具有顶级数组(例如 ["id": 123, "name": "John"]),则将 from_json ARRAY 包装在 STRUCT 中。 此方法支持恢复与推断架构不兼容的数据。 可以选择将数组值 分解 为下游的单独行。

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

使用架构提示替代架构推理

可以选择提供 schemaHints 以影响 from_json 如何推断列的类型。 如果知道某个列是特定的数据类型,或者想要选择更常规的数据类型(例如,双精度浮点数而不是整数),这会很有帮助。 可以使用 SQL 架构规范语法为列数据类型提供任意数量的提示。 架构提示的语义与自动加载程序 架构提示的语义相同。 例如:

SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

当 JSON 字符串包含顶级 ARRAY 时,它将包装在 STRUCT 中。 在这些情况下,架构提示将应用于 ARRAY 架构,而不是包装的 STRUCT。 例如,考虑一个包含顶级数组的 JSON 字符串,例如:

[{"id": 123, "name": "John"}]

推断出的 ARRAY 结构被包装在 STRUCT 中:

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

若要更改数据类型 id,请将架构提示指定为 element.id STRING。 若要添加新的 DOUBLE 列,请指定 element.new_col DOUBLE。 由于这些提示,顶级 JSON 数组的架构将变为:

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

使用 schemaEvolutionMode 进化架构

from_json 在处理数据时检测新增列。 当from_json检测到新字段时,它会通过将新列合并到架构末尾来用最新架构更新推断的架构。 现有列的数据类型保持不变。 架构更新后,管道会自动按照更新的架构重启。

from_json 支持通过可选的 schemaEvolutionMode 设置来设定架构演变的以下模式。 这些模式与 自动加载程序一致。

schemaEvolutionMode 读取新列时的行为
addNewColumns(默认值) 数据流失败。 新列将添加到架构中。 现有列不会演变数据类型。
rescue 架构永远不会演变,流不会因为架构更改而失败。 所有新列都记录在 已获救的数据列中
failOnNewColumns 流媒体失败。 除非更新了 schemaHints 或删除了有问题的数据,否则流不会重启。
none 不会改进架构,将忽略新列,除非 rescuedDataColumn 设置了该选项,否则不会拯救数据。 由于架构更改,Stream 不会失败。

例如:

SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

恢复数据列

被恢复的数据列会作为_rescued_data自动添加到您的架构中。 可以通过设置 rescuedDataColumn 选项来重命名列。 例如:

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

当您选择使用已获救的数据列时,任何与推断架构不匹配的列都会被获救,而不是被删除。 这可能是因为数据类型不匹配、架构中缺少的列或列名大小写差异而发生。

处理损坏的记录

若要存储格式不正确且无法分析的记录,请通过设置架构提示添加 _corrupt_record 列,如以下示例所示:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL,
      map('schemaLocationKey', 'nycTaxi',
          'schemaHints', '_corrupt_record STRING',
          'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

若要重命名损坏的记录列,请设置 columnNameOfCorruptRecord 该选项。

JSON 分析器支持三种处理损坏记录的模式:

模式 Description
PERMISSIVE 对于损坏的记录,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式不正确的字段设置为 null。 若要保留损坏的记录,可以设置以用户定义的架构命名 columnNameOfCorruptRecord 的字符串类型字段。 如果架构没有该字段,则会在分析期间删除损坏的记录。 在推断架构时,分析器会在输出架构中隐式添加字段 columnNameOfCorruptRecord
DROPMALFORMED 忽略损坏的记录。
使用 DROPMALFORMED 模式与 rescuedDataColumn 时,数据类型不匹配不会导致记录被删除。 仅删除损坏的记录,例如不完整或格式不正确的 JSON。
FAILFAST 解析器遇到损坏的记录时抛出异常。
FAILFAST 模式与 rescuedDataColumn 一起使用时,数据类型不匹配不会引发错误。 仅损坏的记录会引发错误,例如不完整或格式不正确的 JSON。

引用`from_json`输出中的字段

from_json 会在管道执行期间推断架构。 如果下游查询在from_json函数至少成功执行一次之前引用了from_json字段,该字段将无法解析,并且查询将被跳过。 在以下示例中,将跳过对白银表的查询分析,直到青铜查询中的 from_json 函数执行并推断架构。

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
  SELECT jsonCol.VendorID, jsonCol.total_amount
  FROM bronze

from_json如果同一查询中引用了函数及其推断的字段,则分析可能会失败,如以下示例所示:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

可以通过将对字段的 from_json 引用移动到下游查询(如上面的青铜/白银示例)来解决此问题。或者,可以指定 schemaHints 包含引用 from_json 字段。 例如:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

示例:自动推断和改进架构

本部分提供了使用 from_json 在 Lakeflow 声明性管道中启用自动架构推理和演变的示例代码。

从云对象存储创建流式处理表

以下示例使用 read_files 语法从云对象存储创建流式表。

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Python

@dp.table(comment="from_json autoloader example")
def bronze():
  return (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "text")
         .load("/databricks-datasets/nyctaxi/sample/json/")
         .select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)

从 Kafka 创建流表

以下示例使用 read_kafka 语法从 Kafka 创建流表。

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    value,
    from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
  FROM READ_KAFKA(
    bootstrapSevers => '<server:ip>',
    subscribe => 'events',
    "startingOffsets", "latest"
)

Python

@dp.table(comment="from_json kafka example")
def bronze():
  return (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<server:ip>")
         .option("subscribe", "<topic>")
         .option("startingOffsets", "latest")
         .load()
         .select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)

示例:固定架构

有关用于 from_json 固定架构的示例代码,请参阅 from_json 函数

FAQs

本部分解答了有关函数中 from_json 架构推理和演变支持的常见问题。

from_jsonparse_json之间有什么区别?

parse_json 函数从 JSON 字符串返回一个 VARIANT 值。

VARIANT 提供了一种灵活高效的方法来存储半结构化数据。 这通过完全去除严格类型来绕过架构推理和演变。 但是,如果要在写入时强制实施架构(例如,因为架构相对严格), from_json 可能是更好的选择。

下表描述了 from_jsonparse_json 之间的差异:

功能 用例 可用性
from_json 模式演化与 from_json 保持模式一致性。 这在以下情况下很有用:
  • 你想要强制实施数据架构(例如,在保存数据架构之前查看每个架构更改)。
  • 你想要优化存储,并且需要低查询延迟和成本。
  • 在类型不匹配的数据上你需要触发错误。
  • 你想要从损坏的 JSON 记录中提取部分结果,并将格式不正确的记录 _corrupt_record 存储在列中。 相比之下,VARIANT 引入返回无效 JSON 的错误。
仅在 Lakeflow 声明性管道中架构推理和演变可用
parse_json VARIANT 特别适合保存不需要架构化的数据。 例如:
  • 你想要保留半结构化的数据,因为它很灵活。
  • 架构变化太快,无法适配为一个固定的架构,导致频繁的流故障和重启。
  • 您不希望在处理数据类型不匹配时出现失败。 (即使存在类型不匹配,有效的 JSON 记录也会始终成功导入到 VARIANT。)
  • 用户不想处理包含不符合架构的字段的恢复数据列。
可用于和不使用 Lakeflow 声明性管道

是否可以在 Lakeflow 声明性管道之外使用 from_json 架构推理和演变语法?

否,不能在 Lakeflow 声明性管道之外使用 from_json 架构推理和演变语法。

如何访问由 from_json 推断的架构?

查看目标流表的模式。

是否可以传递 from_json 架构并执行演变?

不可以,不能传递 from_json 架构,也无法进行演变。 你可以提供架构提示以覆盖from_json推断出的某些或全部字段。

如果完全刷新表,架构会发生什么情况?

清除与表关联的架构位置,并从头开始重新推断架构。