使用 Delta Lake 选择性地覆盖数据

Azure Databricks 利用 Delta Lake 功能支持两种不同的选择性覆盖选项:

  • replaceWhere 选项以原子方式替换与给定谓词匹配的所有记录。
  • 可以根据表的分区方式使用动态分区覆盖来替换数据目录。

对于大多数操作,Databricks 建议使用 replaceWhere 来指定要覆盖的数据。

Important

如果数据被意外覆盖,则可以使用 还原 来撤消更改。

使用 replaceWhere 进行的任意选择性覆盖

可以有选择性地只覆盖与任意表达式匹配的数据。

Note

SQL 需要 Databricks Runtime 12.2 LTS 或更高版本。

以下命令以原子方式替换目标表中一月的事件,该表按照 start_date 分区,数据位于 replace_data 中:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

此示例代码在 replace_data中写出数据,验证所有行是否与谓词匹配,并使用 overwrite 语义执行原子替换。 如果操作中的任何值都超出约束范围,则此操作默认失败,并显示错误。

可以将此行为更改为谓词范围内的 overwrite 值和指定范围外的 insert 记录。 为此,请使用以下设置之一将 spark.databricks.delta.replaceWhere.constraintCheck.enabled 设置为 false 来禁用约束检查:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

旧行为

在旧的默认行为中,replaceWhere 仅覆盖与分区列上的谓词匹配的数据。 在此旧模型中,以下命令以原子方式替换目标表中的一月,该表按照 date 分区,数据位于 df 中:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

如果要回退到旧行为,可以禁用 spark.databricks.delta.replaceWhere.dataColumns.enabled 标志:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

动态分区覆盖

动态分区覆盖仅更新写入提交新数据的分区。 它会覆盖这些分区中的所有现有数据,并使其他人保持不变。

Azure Databricks 支持两种方法:

以下各节演示如何使用每个方法。

使用 REPLACE USING 进行的动态分区覆盖

Databricks Runtime 16.3及更高版本支持使用REPLACE USING对分区表进行动态分区覆盖。 此方法允许你跨所有计算类型选择性覆盖数据,无需设置 Spark 会话配置。 REPLACE USING 启用独立于计算的原子覆盖行为,适用于 Databricks SQL 仓库、无服务器计算和传统计算。

REPLACE USING 仅覆盖传入数据目标的分区。 所有其他分区保持不变。

以下示例演示如何使用 REPLACE USING 进行动态分区覆盖。 目前,只能使用 SQL,不能使用 Python 或 Scala。 有关详细信息,请参阅 INSERT SQL 语言参考。

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

使用动态分区覆盖时请记住以下约束和行为:

  • 你必须在 USING 中指定表的所有分区列。
  • 始终验证写入的数据是否仅涉及预期的分区。 错误分区中的单行可能会无意中覆盖整个分区。

如果需要比支持更多的REPLACE USING可自定义匹配逻辑,例如将值视为NULL相等,请改用互补。REPLACE ON 有关详细信息,请参阅 INSERT

使用 partitionOverwriteMode 的动态分区覆盖(旧版)

Important

此功能目前以公共预览版提供。

Databricks Runtime 11.3 LTS 及更高版本支持使用覆盖模式对分区表进行动态分区覆盖:SQL 中的 INSERT OVERWRITE,或带有 df.write.mode("overwrite") 的 DataFrame 写入。 这种类型的覆盖仅适用于经典计算,而不适用于 Databricks SQL 仓库或无服务器计算。

通过将 Spark 会话配置 spark.sql.sources.partitionOverwriteMode 设置为 dynamic 来配置动态分区覆盖模式。 或者,可以将选项DataFrameWriter设置为 partitionOverwriteModedynamic。 如果存在,查询特定选项将覆盖会话配置中定义的模式。 spark.sql.sources.partitionOverwriteMode 的默认值是 static

以下示例演示如何使用 partitionOverwriteMode

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

请牢记以下 partitionOverwriteMode 的限制和特性:

  • 您不能将 overwriteSchema 设置为 true
  • 不能同时在同一个partitionOverwriteMode操作中指定replaceWhereDataFrameWriter
  • 如果使用 replaceWhere 指定 DataFrameWriter,Delta Lake 会应用该条件来控制要覆盖的数据。 此选项优先于 partitionOverwriteMode 会话级配置。
  • 始终验证写入的数据是否仅涉及预期的分区。 错误分区中的单行可能会无意中覆盖整个分区。