Dela via


Utveckla pipeline-kod med SQL

Lakeflow Deklarativa Pipelines introducerar flera nya SQL-nyckelord och funktioner för att definiera materialiserade vyer och strömmande tabeller i pipelines. SQL-stöd för utveckling av pipelines bygger på grunderna i Spark SQL och lägger till stöd för structured Streaming-funktioner.

Användare som är bekanta med PySpark DataFrames kanske föredrar att utveckla pipelinekod med Python. Python har stöd för mer omfattande testning och åtgärder som är svåra att implementera med SQL, till exempel metaprogramåtgärder. Se utveckla pipelinekod med Python.

En fullständig referens till SQL-syntaxen för Lakeflow Deklarativa pipelines finns i SQL-språkreferens för Lakeflow Deklarativa pipelines.

Grunderna i SQL för pipelineutveckling

SQL-kod som skapar datauppsättningar för deklarativa pipelines i Lakeflow använder CREATE OR REFRESH-syntaxen för att definiera materialiserade vyer och strömmande tabeller baserat på frågeresultat.

Nyckelordet STREAM anger om datakällan som refereras i en SELECT-sats ska läsas med strömmande semantik.

Läser och skriver som standard den katalog och det schema som angavs under pipelinekonfigurationen. Se Ange målkatalogen och schemat.

Lakeflow Deklarativa pipelines-källkoden skiljer sig kritiskt från SQL-skript: Lakeflow Deklarativa pipelines utvärderar alla datamängdsdefinitioner i alla källkodsfiler som konfigurerats i en pipeline och skapar ett dataflödesdiagram innan några frågor körs. Ordningen på frågor som förekommer i källfilerna definierar ordningen på kodutvärderingen, men inte ordningen på körning av frågor.

Skapa en materialiserad vy med SQL

I följande kodexempel visas den grundläggande syntaxen för att skapa en materialiserad vy med SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Skapa en strömningstabell med SQL

I följande kodexempel visas den grundläggande syntaxen för att skapa en strömningstabell med SQL. När du läser en källa för en strömmande tabell anger nyckelordet STREAM att använda strömmande semantik för källan. Använd inte nyckelordet STREAM när du skapar en materialiserad vy:

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Anmärkning

Använd stream-nyckelordet för att använda strömmande semantik för att läsa från källan. Om läsningen påträffar en ändring eller borttagning av en befintlig post utlöses ett fel. Det är säkrast att läsa från statiska eller endast tilläggskällor. Om du vill mata in data som har ändringsincheckningar kan du använda Python och SkipChangeCommits alternativet för att hantera fel.

Läsa in data från objektlagring

Lakeflow deklarativa pipeline stöder inläsning av data från alla format som stöds av Azure Databricks. Se alternativ för dataformat.

Anmärkning

I de här exemplen används data som är tillgängliga under /databricks-datasets och monteras automatiskt på din arbetsyta. Databricks rekommenderar att du använder volymsökvägar eller moln-URI:er för att referera till data som lagras i molnobjektlagring. Se Vad är Unity Catalog-volymer?.

Databricks rekommenderar att du använder tabeller för automatisk inläsning och strömning när du konfigurerar inkrementella inmatningsarbetsbelastningar mot data som lagras i molnobjektlagring. Se Vad är en automatisk inläsare?.

SQL använder funktionen read_files för att anropa funktioner för automatisk inläsning. Du måste också använda nyckelordet STREAM för att konfigurera en direktuppspelningsläsning med read_files.

Följande beskriver syntaxen för read_files i SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM STREAM read_files(
    "<file-path>",
    [<option-key> => <option_value>, ...]
  )

Alternativ för Auto Loader är nyckel-värde-par. Mer information om format och alternativ som stöds finns i Alternativ.

I följande exempel skapas en strömmande tabell från JSON-filer med autoinläsning:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

Funktionen read_files stöder även batchsemantik för att skapa materialiserade vyer. I följande exempel används batchsemantik för att läsa en JSON-katalog och skapa en materialiserad vy:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

Verifiera data med förväntningar

Du kan använda förväntningar för att ange och tillämpa datakvalitetsbegränsningar. Se avsnittet Hantera datakvalitet med pipeline-förväntningar.

Följande kod definierar en förväntan med namnet valid_data som tar bort poster som är null under datainmatning:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Fråga efter materialiserade vyer och strömstabeller som är definierade i din pipeline

I följande exempel definieras fyra datauppsättningar:

  • En strömmande tabell med namnet orders som läser in JSON-data.
  • En materialiserad vy med namnet customers som läser in CSV-data.
  • En materialiserad vy med namnet customer_orders som kopplar poster från datauppsättningarna orders och customers, omvandlar ordertidsstämpeln till ett datum och väljer fälten customer_id, order_number, stateoch order_date.
  • En materialiserad vy med namnet daily_orders_by_state som aggregerar det dagliga antalet order för varje tillstånd.

Anmärkning

När du kör frågor mot vyer eller tabeller i pipelinen kan du ange katalogen och schemat direkt, eller så kan du använda standardvärdena som konfigurerats i pipelinen. I det här exemplet skrivs tabellerna orders, customersoch customer_orders från standardkatalogen och schemat som konfigurerats för pipelinen.

Publiceringsläget för äldre system använder LIVE-schemat för att hämta data från andra materialiserade vyer och strömmande tabeller som definierats i din pipeline. I nya pipelines ignoreras schemasyntaxen LIVE utan att det märks. Se LIVE-schemat (äldre).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

Definiera en privat tabell

Du kan använda PRIVATE -satsen när du skapar en materialiserad vy eller en strömmande tabell. När du skapar en privat tabell skapar du tabellen, men skapar inte metadata för tabellen. PRIVATE Satsen instruerar Lakeflow Deklarativa pipelines att skapa en tabell som är tillgänglig för pipelinen men som inte ska nås utanför pipelinen. För att minska bearbetningstiden bevaras en privat tabell under pipelinens livslängd som skapar den, och inte bara en enda uppdatering.

Privata tabeller kan ha samma namn som tabeller i katalogen. Om du anger ett okvalificerat namn för en tabell i en pipeline, används den privata tabellen om det finns både en privat tabell och en katalogtabell med det namnet.

Privata tabeller refererades tidigare till som temporära tabeller.

Ta bort poster permanent från en materialiserad vy eller direktuppspelningstabell

Om du vill ta bort poster permanent från en strömmande tabell med borttagningsvektorer aktiverade, till exempel för GDPR-efterlevnad, måste ytterligare åtgärder utföras på objektets underliggande Delta-tabeller. Information om hur du tar bort poster från en strömmande tabell finns i Ta bort poster permanent från en strömmande tabell.

Materialiserade vyer återspeglar alltid data i de underliggande tabellerna när de uppdateras. Om du vill ta bort data i en materialiserad vy måste du ta bort data från källan och uppdatera den materialiserade vyn.

Parameterisera värden som används vid deklarering av tabeller eller vyer med SQL

Använd SET för att ange ett konfigurationsvärde i en fråga som deklarerar en tabell eller vy, inklusive Spark-konfigurationer. Alla tabeller eller vyer som du definierar i en källfil efter att -instruktionen SET har åtkomst till det definierade värdet. Alla Spark-konfigurationer som anges med instruktionen SET används vid körning av Spark-frågan för en tabell eller vy som följer SET-instruktionen. Om du vill läsa ett konfigurationsvärde i en fråga använder du syntaxen för stränginterpolation ${}. I följande exempel anges ett Spark-konfigurationsvärde med namnet startDate och det värdet används i en fråga:

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Om du vill ange flera konfigurationsvärden använder du en separat SET-instruktion för varje värde.

Begränsningar

Satsen PIVOT stöds inte. Åtgärden pivot i Spark kräver ivrig inläsning av indata för att beräkna utdataschemat. Den här funktionen stöds inte i deklarativa Lakeflow-pipelines.

Anmärkning

Syntaxen CREATE OR REFRESH LIVE TABLE för att skapa en materialiserad vy är inaktuell. Använd CREATE OR REFRESH MATERIALIZED VIEWi stället .