Dela via


materialiserad vy

Dekoratören @materialized_view kan användas för att definiera materialiserade vyer.

Om du vill definiera en materialiserad vy gäller du @materialized_view för en fråga som utför en batchläsning mot en datakälla.

Syntax

from pyspark import pipelines as dp

@dp.materialized_view(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = True,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = False)
@dp.expect(...)
def <function-name>():
    return (<query>)

Parameterar

@dp.expect() är en valfri Lakeflow Deklarativ Pipelines förväntningsvillkor. Du kan inkludera flera förväntningar. Se Förväntningar.

Parameter Typ Description
funktion function Obligatoriskt. En funktion som returnerar en Apache Spark-batchdataram från en användardefinierad fråga.
name str Tabellnamnet. Om det inte anges används funktionsnamnet som standard.
comment str En beskrivning av tabellen.
spark_conf dict En lista över Spark-konfigurationer för körning av den här frågan
table_properties dict En lista över dict för tabellen.
path str En lagringsplats för tabelldata. Om den inte har angetts använder du den hanterade lagringsplatsen för schemat som innehåller tabellen.
partition_cols list En lista över en eller flera kolumner som ska användas för partitionering av tabellen.
cluster_by_auto bool Aktivera automatisk klustring av vätska på tabellen. Detta kan kombineras med cluster_by och definiera de kolumner som ska användas som inledande klustringsnycklar, följt av övervakning och automatiska uppdateringar av val av nycklar baserat på arbetsbelastningen. Se Automatisk flytande klustring.
cluster_by list Aktivera flytande klustring i tabellen och definiera de kolumner som ska användas som klustringsnycklar. Se Använda flytande klustring för tabeller.
schema str eller StructType En schemadefinition för tabellen. Scheman kan definieras som en SQL DDL-sträng eller med ett Python-StructType.
private bool Skapa en tabell, men publicera inte tabellen till metaarkivet. Tabellen är tillgänglig för pipelinen men är inte tillgänglig utanför pipelinen. Privata tabeller bevaras under pipelinens livslängd.
Standardvärdet är False.
row_filter str (Offentlig förhandsversion) En radfiltersats för tabellen. Se Publicera tabeller med radfilter och kolumnmasker.

Att ange ett schema är valfritt och kan göras med PySpark StructType eller SQL DDL. När du anger ett schema kan du inkludera genererade kolumner, kolumnmasker och primära och externa nycklar. See:

Examples

from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dp.materialized_view(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")

# Specify a row filter and column mask
@dp.materialized_view(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")