Dela via


table

Dekoratören @table kan användas för att definiera strömmande tabeller.

För att definiera en strömmande tabell, tillämpa @table på en fråga som utför en strömmande läsning mot en datakälla eller använd funktionen create_streaming_table().

Anmärkning

I den äldre dlt modulen användes operatorn @table för att skapa både strömmande tabeller och materialiserade vyer. Operatorn @table i modulen pyspark.pipelines fungerar fortfarande på det här sättet, men Databricks rekommenderar att du använder operatorn @materialized_view för att skapa materialiserade vyer.

Syntax

from pyspark import pipelines as dp

@dp.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = True,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = False)
@dp.expect(...)
def <function-name>():
    return (<query>)

Parameterar

@dp.expect() är en valfri Lakeflow Deklarativ Pipelines förväntningsvillkor. Du kan inkludera flera förväntningar. Se Förväntningar.

Parameter Typ Description
funktion function Obligatoriskt. En funktion som returnerar en Apache Spark-strömmande DataFrame från en användardefinierad fråga.
name str Tabellnamnet. Om det inte anges används funktionsnamnet som standard.
comment str En beskrivning av tabellen.
spark_conf dict En lista över Spark-konfigurationer för körning av den här frågan
table_properties dict En lista över dict för tabellen.
path str En lagringsplats för tabelldata. Om den inte har angetts använder du den hanterade lagringsplatsen för schemat som innehåller tabellen.
partition_cols list En lista över en eller flera kolumner som ska användas för partitionering av tabellen.
cluster_by_auto bool Aktivera automatisk klustring av vätska på tabellen. Detta kan kombineras med cluster_by och definiera de kolumner som ska användas som inledande klustringsnycklar, följt av övervakning och automatiska uppdateringar av val av nycklar baserat på arbetsbelastningen. Se Automatisk flytande klustring.
cluster_by list Aktivera flytande klustring i tabellen och definiera de kolumner som ska användas som klustringsnycklar. Se Använda flytande klustring för tabeller.
schema str eller StructType En schemadefinition för tabellen. Scheman kan definieras som en SQL DDL-sträng eller med ett Python-StructType.
private bool Skapa en tabell, men publicera inte tabellen till metaarkivet. Tabellen är tillgänglig för pipelinen men är inte tillgänglig utanför pipelinen. Privata tabeller bevaras under pipelinens livslängd.
Standardvärdet är False.
Privata tabeller skapades tidigare med parametern temporary .
row_filter str (Offentlig förhandsversion) En radfiltersats för tabellen. Se Publicera tabeller med radfilter och kolumnmasker.

Att ange ett schema är valfritt och kan göras med PySpark StructType eller SQL DDL. När du anger ett schema kan du inkludera genererade kolumner, kolumnmasker och primära och externa nycklar. See:

Examples

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")