Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här artikeln beskriver hur du kan använda Deklarativa pipelines för Lakeflow för att deklarera transformeringar på datauppsättningar och ange hur poster bearbetas via frågelogik. Den innehåller också exempel på vanliga transformationsmönster för att skapa Lakeflow deklarativa pipelines.
Du kan definiera en datauppsättning mot alla frågor som returnerar en DataFrame. Du kan använda Apache Spark inbyggda operationer, UDF:er, anpassad logik och MLflow-modeller som transformationer i Lakeflow Deklarativa Pipelines. När data har matats in i din pipeline kan du definiera nya datauppsättningar mot överordnade källor för att skapa nya strömmande tabeller, materialiserade vyer och vyer.
För mer information om hur du effektivt utför tillståndskänslig bearbetning med deklarativa Lakeflow-pipelines, se Optimera tillståndskänslig bearbetning i deklarativa Lakeflow-pipelines med vattenstämplar.
När du ska använda vyer, materialiserade vyer och strömmande tabeller
När du implementerar dina pipelinefrågor väljer du den bästa datamängdstypen för att säkerställa att de är effektiva och underhållsbara.
Överväg att använda en vy för att göra följande:
- Dela upp en stor eller komplex fråga i enklare, hanterbara frågor.
- Verifiera mellanliggande resultat med hjälp av förväntningar.
- Minska lagrings- och beräkningskostnader för resultat som du inte behöver behålla. Eftersom tabeller materialiseras behöver de ytterligare beräknings- och lagringsresurser.
Överväg att använda en materialiserad vy när:
- Flera nedströmsförfrågningar använder tabellen. Eftersom vyer beräknas på begäran, omberäknas vyn varje gång den frågas.
- Andra pipelines, jobb eller frågor konsumerar databastabellen. Eftersom vyer inte materialiseras kan du bara använda dem inom samma pipeline.
- Du vill visa resultatet av en fråga under utvecklingen. Eftersom tabeller materialiseras och kan visas och efterfrågas utanför pipelinen kan du använda tabeller under utvecklingen för att verifiera att beräkningarna är korrekta. När du har verifierat konverterar du frågor som inte kräver materialisering till vyer.
Överväg att använda en streamingtabell när:
- En fråga definieras mot en datakälla som växer kontinuerligt eller inkrementellt.
- Frågeresultat bör beräknas stegvis.
- Pipelinen behöver högt dataflöde och låg svarstid.
Anmärkning
Strömmande tabeller definieras alltid utifrån strömmande källor. Du kan också använda strömmande källor med AUTO CDC ... INTO för att tillämpa uppdateringar från CDC-flöden. Se API:er för AUTOMATISK CDC: Förenkla datainsamling av ändringar med Lakeflow Deklarativa pipelines.
Undanta tabeller från målschemat
Om du måste beräkna mellanliggande tabeller som inte är avsedda för extern förbrukning kan du förhindra att de publiceras till ett schema med hjälp av nyckelordet TEMPORARY. Tillfälliga tabeller hanterar och bearbetar fortfarande data enligt Lakeflow deklarativa pipelines-semantik men bör inte nås utanför den aktuella pipelinen. En tillfällig tabell finns kvar under livslängden för den pipeline som skapar den. Använd följande syntax för att deklarera temporära tabeller:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
python
@dp.table(
temporary=True)
def temp_table():
return ("...")
Kombinera strömmande tabeller och materialiserade vyer i en enda pipeline
Strömmande tabeller ärver bearbetningsgarantierna för Apache Spark Structured Streaming och är konfigurerade för att bearbeta frågor från tilläggsdatakällor, där nya rader alltid infogas i källtabellen i stället för att ändras.
Anmärkning
Även om strömmande tabeller som standard kräver tilläggsdatakällor, kan du åsidosätta det här beteendet med flaggan skipChangeCommits när en strömmande källa är en annan strömmande tabell som kräver uppdateringar eller borttagningar.
Ett vanligt strömningsmönster omfattar inmatning av källdata för att skapa de första datauppsättningarna i en pipeline. Dessa inledande datauppsättningar kallas ofta bronstabeller och utför ofta enkla transformeringar.
Å andra sidan kräver de sista tabellerna i en pipeline, som ofta kallas guldtabeller, ofta komplicerade aggregeringar eller läsning från mål för en AUTO CDC ... INTO operation. Eftersom dessa åtgärder skapar uppdateringar i stället för tillägg stöds de inte som indata till strömmande tabeller. Dessa transformationer passar bättre för materialiserade vyer.
Genom att blanda strömmande tabeller och materialiserade vyer i en enda pipeline kan du förenkla din pipeline, undvika kostsam återinmatning eller ombearbetning av rådata och ha den fulla kraften i SQL för att beräkna komplexa aggregeringar över en effektivt kodad och filtrerad datauppsättning. I följande exempel visas den här typen av blandad bearbetning:
Anmärkning
I de här exemplen används automatisk inläsning för att läsa in filer från molnlagring. Om du vill läsa in filer med Auto Loader i en Unity Catalog-aktiverad pipeline måste du använda externa platser. Mer information om hur du använder Unity Catalog med deklarativa pipelines för Lakeflow finns under Använd Unity Catalog med dina deklarativa pipelines för Lakeflow.
python
@dp.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dp.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dp.materialized_view
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.read.table("streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
"abfss://path/to/raw/data",
format => "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
Läs mer om hur du använder Auto Loader för successiv inläsning av JSON-filer från Azure Storage.
Stream-statisk sammanslagning
Stream-statisk-sammanslagningar är ett bra val när du avnormaliserar en kontinuerlig ström av endast tilläggsdata med en primärt statisk dimensionstabell.
För varje pipelineuppdatering sammanfogas nya poster från strömmen med den senaste ögonblicksbilden av den statiska tabellen. Om poster läggs till eller uppdateras i den statiska tabellen efter att den motsvarande datan från strömningstabellen har bearbetats beräknas inte de resulterande uppgifterna om såvida inte en hel uppdatering utförs.
I pipelines som har konfigurerats för triggad körning returnerar den statiska tabellen resultat vid den tidpunkt då uppdateringen började. I pipelines som konfigurerats för kontinuerlig körning efterfrågas den senaste versionen av den statiska tabellen varje gång tabellen bearbetar en uppdatering.
Följande är ett exempel på en ström-statisk koppling:
python
@dp.table
def customer_sales():
return spark.readStream.table("sales").join(spark.read.table("customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
Beräkna aggregeringar effektivt
Du kan använda strömmande tabeller för att stegvis beräkna enkla fördelningsaggregeringar som antal, min, max eller summa och algebraiska aggregeringar som medelvärde eller standardavvikelse. Databricks rekommenderar inkrementell aggregering för frågor med ett begränsat antal grupper, till exempel en fråga med en GROUP BY country-sats. Endast nya indata läses med varje uppdatering.
Mer information om hur du skriver deklarativa pipelinefrågor för Lakeflow som utför inkrementella aggregeringar finns i Utföra fönsterbaserade aggregeringar med vattenstämplar.
Använd MLflow-modeller i Lakeflow-deklarativa pipelines
Anmärkning
Om du vill använda MLflow-modeller i en Unity Catalog-aktiverad pipeline måste din pipeline konfigureras för att använda den preview kanalen. Om du vill använda current-kanalen måste du konfigurera din pipeline för att publicera till Hive-metaarkivet.
Du kan använda MLflow-tränade modeller i deklarativa pipelines i Lakeflow. MLflow-modeller behandlas som transformeringar i Azure Databricks, vilket innebär att de agerar på en Spark DataFrame-indata och returnerar resultat som en Spark DataFrame. Eftersom Lakeflow Deklarativa pipelines definierar datauppsättningar mot DataFrames kan du konvertera Apache Spark-arbetsbelastningar som använder MLflow till Lakeflow Deklarativa pipelines med bara några rader kod. Mer information om MLflow finns i MLflow för ML-modellens livscykel.
Om du redan har ett Python-skript som anropar en MLflow-modell kan du anpassa den här koden för att fungera med Lakeflow Deklarativa Pipelines genom att använda @dp.table eller @dp.materialized_view-dekoratören och säkerställ att funktionerna är definierade för att returnera transformationsresultaten. Deklarativa pipelines för Lakeflow installerar inte MLflow som standard, så bekräfta att du har installerat MLflow-biblioteken med %pip install mlflow och har importerat mlflow och dp överst i din källkod. En introduktion till Lakeflows deklarativa syntax för pipeliner finns i Utveckla pipelinekod med Python.
Utför följande steg för att använda MLflow-modeller i Lakeflow deklarativa pipelines:
- Hämta körnings-ID:t och modellnamnet för MLflow-modellen. Körnings-ID och modellnamn används för att konstruera URI:n för MLflow-modellen.
- Använd URI:n för att definiera en Spark UDF för att läsa in MLflow-modellen.
- Anropa UDF i dina tabelldefinitioner för att använda MLflow-modellen.
I följande exempel visas den grundläggande syntaxen för det här mönstret:
%pip install mlflow
from pyspark import pipelines as dp
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dp.materialized_view
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
Som ett fullständigt exempel definierar följande kod en Spark UDF med namnet loaded_model_udf som läser in en MLflow-modell som tränats på låneriskdata. De datakolumner som används för att göra förutsägelsen skickas som ett argument till UDF. Tabellen loan_risk_predictions beräknar förutsägelser för varje rad i loan_risk_input_data.
%pip install mlflow
from pyspark import pipelines as dp
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dp.materialized_view(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Behålla manuella borttagningar eller uppdateringar
Lakeflow Deklarativa Pipelines tillåter dig att manuellt ta bort eller uppdatera poster från en tabell och göra en uppdatering för att beräkna om underordnade tabeller.
Som standardinställning beräknar Lakeflow's deklarativa pipelines tabellresultat baserat på indata varje gång en pipeline uppdateras, så du måste se till att den borttagna dataposten inte laddas om från källdata. Om du anger egenskapen pipelines.reset.allowed tabell till false förhindras uppdateringar av en tabell, men förhindrar inte att inkrementella skrivningar till tabellerna eller nya data flödar till tabellen.
Följande diagram illustrerar ett exempel med två strömmande tabeller:
-
raw_user_tablematar in rå användardata från en källa. -
bmi_tableberäknar inkrementellt BMI-poäng med hjälp av vikt och höjd frånraw_user_table.
Du vill manuellt ta bort eller uppdatera användarposter från raw_user_table och beräkna om bmi_tablepå nytt.
Följande kod visar hur du anger egenskapen pipelines.reset.allowed tabell till false för att inaktivera fullständig uppdatering för raw_user_table så att avsedda ändringar behålls över tid, men underordnade tabeller omberäknas när en pipelineuppdatering körs:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);