Dela via


Funktioner för att definiera datauppsättningar

pyspark.pipelines-modulen (här kallad dp) implementerar mycket av sina kärnfunktioner med hjälp av dekorationer. De här dekoratörerna accepterar en funktion som definierar antingen en direktuppspelnings- eller batchfråga och returnerar en Apache Spark DataFrame. Följande syntax visar ett enkelt exempel för att ange en Lakeflow Declarative Pipelines dataset:

from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset

Den här sidan innehåller en översikt över de funktioner och sökfrågor som definierar datauppsättningar i Lakeflow Deklarativa Pipelines. En fullständig lista över tillgängliga dekoratörer finns i utvecklarreferens för Lakeflow deklarativa pipelines.

De funktioner som du använder för att definiera datauppsättningar bör inte innehålla godtycklig Python-logik som inte är relaterad till datamängden, inklusive anrop till API:er från tredje part. Lakeflow Declarative Pipelines kör dessa funktioner flera gånger under planering, validering och uppdateringar. Att inkludera godtycklig logik kan leda till oväntade resultat.

Börja med att läsa in data för att definiera en datamängd

Funktioner som används för att definiera datauppsättningar för deklarativa pipelines i Lakeflow börjar vanligtvis med en spark.read eller spark.readStream åtgärd. Dessa läsåtgärder returnerar ett statiskt eller strömmande DataFrame-objekt som du använder för att definiera ytterligare transformeringar innan dataramen returneras. Andra exempel på Spark-åtgärder som returnerar en DataFrame är spark.table, eller spark.range.

Functions bör aldrig referera till DataFrames som definierats utanför funktionen. Om du försöker referera till DataFrames som definierats med ett annat omfång kan det leda till oväntat beteende. Ett exempel på ett metaprogrammeringsmönster för att skapa flera tabeller finns i Skapa tabeller i en for loop.

I följande exempel visas den grundläggande syntaxen för att läsa data med hjälp av batch- eller strömningslogik:

from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
  return (spark.read
    .format("cloudFiles")
    .option("cloudFile.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

Om du behöver läsa data från ett externt REST API implementerar du den här anslutningen med hjälp av en anpassad Python-datakälla. Se Anpassade datakällor i PySpark.

Anmärkning

Det är möjligt att skapa godtyckliga Apache Spark-dataramar från Python-samlingar med data, inklusive Pandas DataFrames, diktat och listor. Dessa mönster kan vara användbara under utveckling och testning, men de flesta datauppsättningsdefinitioner för Lakeflow deklarativa pipelines bör börja med att importera data från filer, ett externt system eller en befintlig tabell eller vy.

Kedjetransformeringar

Lakeflows deklarativa pipelines stöder nästan alla Apache Spark DataFrame-transformationer. Du kan inkludera valfritt antal transformeringar i din datauppsättningsdefinitionsfunktion, men du bör se till att de metoder du använder alltid returnerar ett DataFrame-objekt.

Om du har en mellanliggande transformering som hanterar flera underordnade arbetsbelastningar och du inte behöver materialisera den som en tabell, kan du använda @dp.temporary_view() för att lägga till en tillfällig vy i din pipeline. Du kan sedan referera till den här vyn i spark.read.table("temp_view_name") i flera underordnade datauppsättningsdefinitioner. Följande syntax visar det här mönstret:

from pyspark import pipelines as dp

@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)

Detta säkerställer att Lakeflow Deklarativa Pipelines har fullt medveten om omvandlingarna i din översikt under pipelineplaneringen och förhindrar potentiella problem med godtycklig Python-kod som körs utanför datauppsättningsdefinitioner.

I din funktion kan du länka samman DataFrames för att skapa nya DataFrames utan att skriva inkrementella resultat som vyer, materialiserade vyer eller strömmande tabeller, som i följande exempel:

from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

Om alla dina DataFrames utför sina inledande läsningar med hjälp av batchlogik är ditt returresultat en statisk DataFrame. Om du har frågor som strömmar är ditt returresultat en strömmande DataFrame.

Returnera en DataFrame

Använd @dp.table för att skapa en strömmande tabell från resultatet av en direktuppspelningsläsning. Använd @dp.materialized_view för att skapa en materialiserad vy från resultatet av en batchläsning. De flesta andra dekoratörer arbetar med både strömmande och statiska DataFrames, medan några kräver en strömmande DataFrame.

Den funktion som används för att definiera en datauppsättning måste returnera en Spark DataFrame. Använd aldrig metoder som som en del av din Lakeflow Deklarativa Pipelines datauppsättningskod sparar eller skriver till filer eller tabeller.

Exempel på Apache Spark-åtgärder som aldrig ska användas i Lakeflow deklarativa pipelineskoder:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

Anmärkning

Lakeflow Deklarativa Pipelines stöder också användning av Pandas på Spark för funktioner för definition av datauppsättningar. Se Pandas API på Spark.

Använda SQL i en Python-pipeline

PySpark har stöd för operatorn spark.sql för att skriva DataFrame-kod med hjälp av SQL. När du använder det här mönstret i källkoden för Lakeflow Deklarativa Pipelines kompileras det till materialiserade vyer eller strömmande tabeller.

Följande kodexempel motsvarar användning spark.read.table("catalog_name.schema_name.table_name") för datauppsättningens frågelogik:

@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read och dlt.read_stream (äldre)

Den äldre dlt modulen innehåller dlt.read() och dlt.read_stream() funktioner som introducerades för att stödja funktioner i det äldre pipelinepubliceringsläget. Dessa metoder stöds, men Databricks rekommenderar att du alltid använder spark.read.table() funktionerna och spark.readStream.table() på grund av följande:

  • Funktionerna dlt har begränsat stöd för att läsa datauppsättningar som definierats utanför den aktuella pipelinen.
  • Funktionerna spark har stöd för att ange alternativ, till exempel skipChangeCommits, för att läsa åtgärder. Att ange alternativ stöds inte av dlt funktionerna.
  • Modulen dlt har ersatts av modulen pyspark.pipelines . Databricks rekommenderar att använda from pyspark import pipelines as dp för att importera pyspark.pipelines som ska användas när du skriver kod för Lakeflow deklarativa pipelines i Python.