Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
              pyspark.pipelines-modulen (här kallad dp) implementerar mycket av sina kärnfunktioner med hjälp av dekorationer. De här dekoratörerna accepterar en funktion som definierar antingen en direktuppspelnings- eller batchfråga och returnerar en Apache Spark DataFrame. Följande syntax visar ett enkelt exempel för att ange en Lakeflow Declarative Pipelines dataset:
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset
Den här sidan innehåller en översikt över de funktioner och sökfrågor som definierar datauppsättningar i Lakeflow Deklarativa Pipelines. En fullständig lista över tillgängliga dekoratörer finns i utvecklarreferens för Lakeflow deklarativa pipelines.
De funktioner som du använder för att definiera datauppsättningar bör inte innehålla godtycklig Python-logik som inte är relaterad till datamängden, inklusive anrop till API:er från tredje part. Lakeflow Declarative Pipelines kör dessa funktioner flera gånger under planering, validering och uppdateringar. Att inkludera godtycklig logik kan leda till oväntade resultat.
Börja med att läsa in data för att definiera en datamängd
Funktioner som används för att definiera datauppsättningar för deklarativa pipelines i Lakeflow börjar vanligtvis med en spark.read eller spark.readStream åtgärd. Dessa läsåtgärder returnerar ett statiskt eller strömmande DataFrame-objekt som du använder för att definiera ytterligare transformeringar innan dataramen returneras. Andra exempel på Spark-åtgärder som returnerar en DataFrame är spark.table, eller spark.range.
Functions bör aldrig referera till DataFrames som definierats utanför funktionen. Om du försöker referera till DataFrames som definierats med ett annat omfång kan det leda till oväntat beteende. Ett exempel på ett metaprogrammeringsmönster för att skapa flera tabeller finns i Skapa tabeller i en for loop.
I följande exempel visas den grundläggande syntaxen för att läsa data med hjälp av batch- eller strömningslogik:
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
  return (spark.read
    .format("cloudFiles")
    .option("cloudFile.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )
Om du behöver läsa data från ett externt REST API implementerar du den här anslutningen med hjälp av en anpassad Python-datakälla. Se Anpassade datakällor i PySpark.
Anmärkning
Det är möjligt att skapa godtyckliga Apache Spark-dataramar från Python-samlingar med data, inklusive Pandas DataFrames, diktat och listor. Dessa mönster kan vara användbara under utveckling och testning, men de flesta datauppsättningsdefinitioner för Lakeflow deklarativa pipelines bör börja med att importera data från filer, ett externt system eller en befintlig tabell eller vy.
Kedjetransformeringar
Lakeflows deklarativa pipelines stöder nästan alla Apache Spark DataFrame-transformationer. Du kan inkludera valfritt antal transformeringar i din datauppsättningsdefinitionsfunktion, men du bör se till att de metoder du använder alltid returnerar ett DataFrame-objekt.
Om du har en mellanliggande transformering som hanterar flera underordnade arbetsbelastningar och du inte behöver materialisera den som en tabell, kan du använda @dp.temporary_view() för att lägga till en tillfällig vy i din pipeline. Du kan sedan referera till den här vyn i spark.read.table("temp_view_name") i flera underordnade datauppsättningsdefinitioner. Följande syntax visar det här mönstret:
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)
Detta säkerställer att Lakeflow Deklarativa Pipelines har fullt medveten om omvandlingarna i din översikt under pipelineplaneringen och förhindrar potentiella problem med godtycklig Python-kod som körs utanför datauppsättningsdefinitioner.
I din funktion kan du länka samman DataFrames för att skapa nya DataFrames utan att skriva inkrementella resultat som vyer, materialiserade vyer eller strömmande tabeller, som i följande exempel:
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)
Om alla dina DataFrames utför sina inledande läsningar med hjälp av batchlogik är ditt returresultat en statisk DataFrame. Om du har frågor som strömmar är ditt returresultat en strömmande DataFrame.
Returnera en DataFrame
Använd @dp.table för att skapa en strömmande tabell från resultatet av en direktuppspelningsläsning. Använd @dp.materialized_view för att skapa en materialiserad vy från resultatet av en batchläsning. De flesta andra dekoratörer arbetar med både strömmande och statiska DataFrames, medan några kräver en strömmande DataFrame.
Den funktion som används för att definiera en datauppsättning måste returnera en Spark DataFrame. Använd aldrig metoder som som en del av din Lakeflow Deklarativa Pipelines datauppsättningskod sparar eller skriver till filer eller tabeller.
Exempel på Apache Spark-åtgärder som aldrig ska användas i Lakeflow deklarativa pipelineskoder:
collect()count()toPandas()save()saveAsTable()start()toTable()
Anmärkning
Lakeflow Deklarativa Pipelines stöder också användning av Pandas på Spark för funktioner för definition av datauppsättningar. Se Pandas API på Spark.
Använda SQL i en Python-pipeline
PySpark har stöd för operatorn spark.sql för att skriva DataFrame-kod med hjälp av SQL. När du använder det här mönstret i källkoden för Lakeflow Deklarativa Pipelines kompileras det till materialiserade vyer eller strömmande tabeller.
Följande kodexempel motsvarar användning spark.read.table("catalog_name.schema_name.table_name") för datauppsättningens frågelogik:
@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
              dlt.read och dlt.read_stream (äldre)
Den äldre dlt modulen innehåller dlt.read() och dlt.read_stream() funktioner som introducerades för att stödja funktioner i det äldre pipelinepubliceringsläget. Dessa metoder stöds, men Databricks rekommenderar att du alltid använder spark.read.table() funktionerna och spark.readStream.table() på grund av följande:
- Funktionerna 
dlthar begränsat stöd för att läsa datauppsättningar som definierats utanför den aktuella pipelinen. - Funktionerna 
sparkhar stöd för att ange alternativ, till exempelskipChangeCommits, för att läsa åtgärder. Att ange alternativ stöds inte avdltfunktionerna. - Modulen 
dlthar ersatts av modulenpyspark.pipelines. Databricks rekommenderar att användafrom pyspark import pipelines as dpför att importerapyspark.pipelinessom ska användas när du skriver kod för Lakeflow deklarativa pipelines i Python.