Important
This feature is in Public Preview.
In Databricks Runtime 15.3 and above, you can use the VARIANT type to ingest semi-structured data. This article describes behavior and provides example patterns for ingesting data from cloud object storage using Auto Loader and COPY INTO, streaming records from Kafka, and SQL commands for creating new tables with variant data or inserting new records using the variant type. The following table summarizes the supported file formats and Databricks Runtime version support:
| File format | Supported Databricks Runtime version |
|---|---|
| JSON | 15.3 and above |
| XML | 16.4 and above |
| CSV | 16.4 and above |
See Query variant data.
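The support table above can be encoded as a small guard that a pipeline runs before it attempts variant ingestion. This is a minimal sketch, not part of any Databricks API: the names MIN_RUNTIME_FOR_VARIANT and supports_variant_ingestion are hypothetical, and the version string is assumed to start with a major.minor pair such as "15.3".

```python
# Minimum Databricks Runtime versions, copied from the support table above.
MIN_RUNTIME_FOR_VARIANT = {
    "json": (15, 3),
    "xml": (16, 4),
    "csv": (16, 4),
}


def supports_variant_ingestion(file_format: str, runtime_version: str) -> bool:
    """Return True if the runtime meets the minimum for the given file format.

    Hypothetical helper. `runtime_version` is assumed to begin with
    "major.minor", for example "15.3" or "16.4".
    """
    minimum = MIN_RUNTIME_FOR_VARIANT.get(file_format.lower())
    if minimum is None:
        return False  # The format is not listed in the table.
    major, minor = (int(part) for part in runtime_version.split(".")[:2])
    return (major, minor) >= minimum
```

For example, supports_variant_ingestion("xml", "15.4") evaluates to False because XML requires 16.4 or above.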
Create a table with a variant column
VARIANT is a standard SQL type in Databricks Runtime 15.3 and above and is supported by tables backed by Delta Lake. Managed tables on Azure Databricks use Delta Lake by default, so you can create an empty table with a single VARIANT column using the following syntax:
CREATE TABLE table_name (variant_column VARIANT)
Alternatively, you can use a CTAS statement with the PARSE_JSON function on a JSON string or the FROM_XML function on an XML string to create a table with a variant column. The following example creates a table with two columns:
- The id column, extracted from the JSON string as a STRING type.
- The variant_column column, which contains the entire JSON string encoded as VARIANT type.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Note
Databricks recommends extracting the fields that you plan to use to accelerate queries and optimize storage layout, and storing them as non-variant columns.
VARIANT columns cannot be used for clustering keys, partitions, or Z-order keys. The VARIANT data type cannot be used for comparisons, grouping, ordering, or set operations. For a full list of limitations, see Limitations.
Insert data using parse_json
If the target table already contains a column encoded as VARIANT, you can use parse_json to insert JSON string records as VARIANT, as in the following example:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")).alias("variant_column"))
.write
.mode("append")
.saveAsTable("table_name")
)
Insert data using from_xml
If the target table already contains a column encoded as VARIANT, you can use from_xml to insert XML string records as VARIANT. For example:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Python
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant").alias("variant_column"))
.write
.mode("append")
.saveAsTable("table_name")
)
Insert data using from_csv
If the target table already contains a column encoded as VARIANT, you can use from_csv to insert CSV string records as VARIANT. For example:
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Python
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v.alias("variant_column"))
.write
.mode("append")
.saveAsTable("table_name")
)
Ingest data from cloud object storage as variant
Auto Loader can be used to load all data from the supported file sources as a single VARIANT column in a target table. Because VARIANT is flexible to schema and type changes and maintains case sensitivity and NULL values present in the data source, this pattern is robust to most ingestion scenarios with the following caveats:
- Malformed records cannot be encoded using VARIANT type.
- VARIANT type can only hold records up to 16 MB in size.
Note
Variant treats overly large records similarly to corrupt records. In the default PERMISSIVE processing mode, overly large records are captured in the corruptRecordColumn.
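The two caveats above can be approximated outside of Spark when you want to inspect sample records before ingestion. The following is a minimal sketch under stated assumptions: classify_record is a hypothetical helper, the 16 MB limit is treated as 16 * 1024 * 1024 bytes of UTF-8 input text (the platform may measure the encoded variant value instead), and Python's json module stands in for the parser used during ingestion, so edge cases may differ.

```python
import json

# Assumption: "16 MB" is treated as 16 * 1024 * 1024 bytes of UTF-8 text.
MAX_RECORD_BYTES = 16 * 1024 * 1024


def classify_record(raw: str, max_bytes: int = MAX_RECORD_BYTES) -> str:
    """Classify a raw JSON string as "ok", "too_large", or "malformed".

    Hypothetical pre-check; it only approximates ingestion behavior.
    """
    # Check the size first, so oversized input is never parsed.
    if len(raw.encode("utf-8")) > max_bytes:
        return "too_large"
    try:
        json.loads(raw)
    except json.JSONDecodeError:
        return "malformed"
    return "ok"
```

Records classified as "too_large" or "malformed" by this check are the ones you would expect to land in the corruptRecordColumn in PERMISSIVE mode, subject to the assumptions above.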
Because the entire record is recorded as a single VARIANT column, no schema evolution occurs during ingestion and rescuedDataColumn is not supported. The following example assumes that the target table already exists with a single VARIANT column.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
You can also specify VARIANT when defining a schema or passing schemaHints. The data in the referenced source field must contain a valid record. The following examples demonstrate this syntax:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Use COPY INTO with variant
Databricks recommends using Auto Loader over COPY INTO when available.
COPY INTO supports ingesting the entire contents of a supported data source as a single column. The following example creates a new table with a single VARIANT column and then uses COPY INTO to ingest records from a JSON file source.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')
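If you issue this statement from Python, for example through spark.sql, it can help to build it from parameters. The sketch below is illustrative only: build_copy_into_variant is a hypothetical helper, and it assumes that the singleVariantColumn option should name the VARIANT column of the target table, as in the Auto Loader example earlier in this article. It performs no escaping, so pass only trusted identifiers and paths.

```python
def build_copy_into_variant(
    table_name: str,
    source_path: str,
    variant_column: str,
    file_format: str = "JSON",
) -> str:
    """Build a COPY INTO statement that loads each record into one VARIANT column.

    Hypothetical helper. Assumes `variant_column` matches the column name
    in the target table. No quoting or escaping is applied.
    """
    return (
        f"COPY INTO {table_name}\n"
        f"FROM '{source_path}'\n"
        f"FILEFORMAT = {file_format}\n"
        f"FORMAT_OPTIONS ('singleVariantColumn' = '{variant_column}')"
    )
```

The returned string can then be passed to spark.sql in a notebook where a Spark session is available.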
Stream Kafka data as variant
Many Kafka streams encode their payloads using JSON. Ingesting Kafka streams using VARIANT makes these workloads robust to schema changes.
The following example demonstrates reading a Kafka streaming source, casting the key as a STRING and the value as VARIANT, and writing out to a target table.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string")).alias("value")
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
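The Kafka source exposes key and value as binary columns, which is why the example casts both to strings before parsing. The following plain-Python sketch shows the same two steps for a single record. It is an illustration, not the streaming implementation: decode_kafka_record is a hypothetical helper, UTF-8 encoding is assumed, and json.loads stands in for parse_json.

```python
import json
from typing import Any, Optional, Tuple


def decode_kafka_record(
    key: Optional[bytes], value: bytes
) -> Tuple[Optional[str], Any]:
    """Decode one Kafka record: key bytes to text, value bytes to parsed JSON.

    Hypothetical helper. Assumes UTF-8 payloads; a missing key stays None.
    """
    key_text = key.decode("utf-8") if key is not None else None
    # json.loads raises json.JSONDecodeError if the payload is malformed.
    payload = json.loads(value.decode("utf-8"))
    return key_text, payload
```

Because the parsed payload keeps whatever structure each message carries, a producer that adds or removes fields does not require a change to this decoding step, which mirrors the schema robustness described above.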