使用
重要
该功能处于公开预览阶段。
本文介绍如何使用 from_json Lakeflow 声明性管道中的 SQL 函数推断和改进 JSON Blob 的架构。
概述
from_json SQL 函数分析 JSON 字符串列并返回结构值。 在 Lakeflow 声明性管道外部使用时,必须显式提供返回值的架构,使用参数 schema。 与 Lakeflow 声明性管道一起使用时,可以启用架构推理和演变,从而自动管理返回值的架构。 此功能既简化了初始设置(尤其是在架构未知的情况下),也简化了架构频繁更改时的日常操作。 它能够无缝处理来自流数据源(如 Auto Loader、Kafka 或 Kinesis)的任意 JSON 二进制大对象(Blob)。
具体而言,在 Lakeflow 声明性管道中使用时,SQL 函数的 from_json 架构推理和演变可以:
- 检测传入 JSON 记录中的新字段(包括嵌套 JSON 对象)
- 推断字段类型并将其映射到相应的 Spark 数据类型
- 自动改进架构以适应新字段
- 自动处理不符合当前架构的数据
语法:自动推断和改进架构
如果与 Lakeflow 声明性管道一起使用 from_json ,它可以自动推断和改进架构。 若要启用此功能,请将架构设置为 NULL 并指定 schemaLocationKey 选项。 这使它可以推断和跟踪架构。
SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
Python
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
查询可以有多个 from_json 表达式,但每个表达式必须具有唯 schemaLocationKey一的表达式。 在每个管道中,schemaLocationKey也必须是唯一的。
SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
语法:固定架构
如果要改为强制实施特定架构,可以使用以下 from_json 语法来分析使用该架构的 JSON 字符串:
from_json(jsonStr, schema, [, options])
此语法可在任何 Azure Databricks 环境中使用,包括 Lakeflow 声明性管道。 此处
模式推断
from_json 从第一批 JSON 数据列推断出架构,并根据其 schemaLocationKey(必需)进行内部索引。
如果 JSON 字符串是单个对象(例如),{"id": 123, "name": "John"}from_json则推断 STRUCT 类型的架构,并向字段列表添加一个rescuedDataColumn。
STRUCT<id LONG, name STRING, _rescued_data STRING>
但是,如果 JSON 字符串具有顶级数组(例如 ["id": 123, "name": "John"]),则将 from_json ARRAY 包装在 STRUCT 中。 此方法支持恢复与推断架构不兼容的数据。 可以选择将数组值 分解 为下游的单独行。
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
使用架构提示替代架构推理
可以选择提供 schemaHints 以影响 from_json 如何推断列的类型。 如果知道某个列是特定的数据类型,或者想要选择更常规的数据类型(例如,双精度浮点数而不是整数),这会很有帮助。 可以使用 SQL 架构规范语法为列数据类型提供任意数量的提示。 架构提示的语义与自动加载程序 架构提示的语义相同。 例如:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
当 JSON 字符串包含顶级 ARRAY 时,它将包装在 STRUCT 中。 在这些情况下,架构提示将应用于 ARRAY 架构,而不是包装的 STRUCT。 例如,考虑一个包含顶级数组的 JSON 字符串,例如:
[{"id": 123, "name": "John"}]
推断出的 ARRAY 结构被包装在 STRUCT 中:
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
若要更改数据类型 id,请将架构提示指定为 element.id STRING。 若要添加新的 DOUBLE 列,请指定 element.new_col DOUBLE。 由于这些提示,顶级 JSON 数组的架构将变为:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
使用 schemaEvolutionMode 进化架构
from_json 在处理数据时检测新增列。 当from_json检测到新字段时,它会通过将新列合并到架构末尾来用最新架构更新推断的架构。 现有列的数据类型保持不变。 架构更新后,管道会自动按照更新的架构重启。
from_json 支持通过可选的 schemaEvolutionMode 设置来设定架构演变的以下模式。 这些模式与 自动加载程序一致。
schemaEvolutionMode |
读取新列时的行为 |
|---|---|
addNewColumns(默认值) |
数据流失败。 新列将添加到架构中。 现有列不会演变数据类型。 |
rescue |
架构永远不会演变,流不会因为架构更改而失败。 所有新列都记录在 已获救的数据列中。 |
failOnNewColumns |
流媒体失败。 除非更新了 schemaHints 或删除了有问题的数据,否则流不会重启。 |
none |
不会改进架构,将忽略新列,除非 rescuedDataColumn 设置了该选项,否则不会拯救数据。 由于架构更改,Stream 不会失败。 |
例如:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
恢复数据列
被恢复的数据列会作为_rescued_data自动添加到您的架构中。 可以通过设置 rescuedDataColumn 选项来重命名列。 例如:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
当您选择使用已获救的数据列时,任何与推断架构不匹配的列都会被获救,而不是被删除。 这可能是因为数据类型不匹配、架构中缺少的列或列名大小写差异而发生。
处理损坏的记录
若要存储格式不正确且无法分析的记录,请通过设置架构提示添加 _corrupt_record 列,如以下示例所示:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
若要重命名损坏的记录列,请设置 columnNameOfCorruptRecord 该选项。
JSON 分析器支持三种处理损坏记录的模式:
| 模式 | Description |
|---|---|
PERMISSIVE |
对于损坏的记录,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式不正确的字段设置为 null。 若要保留损坏的记录,可以设置以用户定义的架构命名 columnNameOfCorruptRecord 的字符串类型字段。 如果架构没有该字段,则会在分析期间删除损坏的记录。 在推断架构时,分析器会在输出架构中隐式添加字段 columnNameOfCorruptRecord 。 |
DROPMALFORMED |
忽略损坏的记录。 使用 DROPMALFORMED 模式与 rescuedDataColumn 时,数据类型不匹配不会导致记录被删除。 仅删除损坏的记录,例如不完整或格式不正确的 JSON。 |
FAILFAST |
解析器遇到损坏的记录时抛出异常。 将 FAILFAST 模式与 rescuedDataColumn 一起使用时,数据类型不匹配不会引发错误。 仅损坏的记录会引发错误,例如不完整或格式不正确的 JSON。 |
引用`from_json`输出中的字段
from_json 会在管道执行期间推断架构。 如果下游查询在from_json函数至少成功执行一次之前引用了from_json字段,该字段将无法解析,并且查询将被跳过。 在以下示例中,将跳过对白银表的查询分析,直到青铜查询中的 from_json 函数执行并推断架构。
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
from_json如果同一查询中引用了函数及其推断的字段,则分析可能会失败,如以下示例所示:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
可以通过将对字段的 from_json 引用移动到下游查询(如上面的青铜/白银示例)来解决此问题。或者,可以指定 schemaHints 包含引用 from_json 字段。 例如:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
示例:自动推断和改进架构
本部分提供了使用 from_json 在 Lakeflow 声明性管道中启用自动架构推理和演变的示例代码。
从云对象存储创建流式处理表
以下示例使用 read_files 语法从云对象存储创建流式表。
SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
@dp.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
从 Kafka 创建流表
以下示例使用 read_kafka 语法从 Kafka 创建流表。
SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
Python
@dp.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
示例:固定架构
有关用于 from_json 固定架构的示例代码,请参阅 from_json 函数。
FAQs
本部分解答了有关函数中 from_json 架构推理和演变支持的常见问题。
from_json和parse_json之间有什么区别?
该 parse_json 函数从 JSON 字符串返回一个 VARIANT 值。
VARIANT 提供了一种灵活高效的方法来存储半结构化数据。 这通过完全去除严格类型来绕过架构推理和演变。 但是,如果要在写入时强制实施架构(例如,因为架构相对严格), from_json 可能是更好的选择。
下表描述了 from_json 和 parse_json 之间的差异:
| 功能 | 用例 | 可用性 |
|---|---|---|
from_json |
模式演化与 from_json 保持模式一致性。 这在以下情况下很有用:
|
仅在 Lakeflow 声明性管道中架构推理和演变可用 |
parse_json |
VARIANT 特别适合保存不需要架构化的数据。 例如:
|
可用于和不使用 Lakeflow 声明性管道 |
是否可以在 Lakeflow 声明性管道之外使用 from_json 架构推理和演变语法?
否,不能在 Lakeflow 声明性管道之外使用 from_json 架构推理和演变语法。
如何访问由 from_json 推断的架构?
查看目标流表的模式。
是否可以传递 from_json 架构并执行演变?
不可以,不能传递 from_json 架构,也无法进行演变。 你可以提供架构提示以覆盖from_json推断出的某些或全部字段。
如果完全刷新表,架构会发生什么情况?
清除与表关联的架构位置,并从头开始重新推断架构。