Dela via


Optimera tillståndskänslig bearbetning i deklarativa Lakeflow-pipelines med vattenstämplar

För att effektivt hantera data som lagras i tillståndet, använd vattenstämplar när du utför tillståndsbaserad strömbearbetning i Lakeflow-deklarativa pipelines, inklusive aggregeringar, kopplingar och deduplicering. Den här artikeln beskriver hur du använder vattenmärken i dina Lakeflow deklarativa pipelineförfrågningar och innehåller exempel på de rekommenderade operationerna.

Anmärkning

För att säkerställa att frågor som utför aggregeringar bearbetas inkrementellt och inte helt omberäknas med varje uppdatering måste du använda vattenstämplar.

Vad är en vattenstämpel?

Vid dataströmbearbetning är en vattenstämpel en Apache Spark-funktion som kan definiera ett tidsbaserat tröskelvärde för bearbetning av data när tillståndskänsliga åtgärder som aggregeringar utförs. Data som anländer bearbetas tills tröskelvärdet har nåtts, då tidsfönstret som definieras av tröskelvärdet stängs. Vattenstämplar kan användas för att undvika problem vid frågebearbetning, främst vid bearbetning av större datamängder eller långvarig bearbetning. Dessa problem kan omfatta hög svarstid vid skapande av resultat och till och med OOM-fel (out-of-memory) på grund av mängden data som lagras i tillståndet under bearbetningen. Eftersom strömmande data i sig är osorterade stöder vattenmärken också att korrekta operationer som aggregeringar i tidsfönster kan beräknas.

Mer information om hur du använder vattenstämplar i strömbearbetning finns i Vattenstämpling i Apache Spark Structured Streaming och Tillämpa vattenstämplar för att kontrollera tröskelvärden för databearbetning.

Hur definierar du en vattenstämpel?

Du definierar en vattenstämpel genom att ange ett tidsstämpelfält och ett värde som representerar tidströskeln för att sena data ska tas emot. Data anses vara sena om de tas emot efter det definierade tidströskelvärdet. Om tröskelvärdet till exempel definieras som 10 minuter kan poster som anländer efter tröskelvärdet på 10 minuter tas bort.

Eftersom poster som tas emot efter det definierade tröskelvärdet kan tas bort är det viktigt att välja ett tröskelvärde som uppfyller kraven på svarstid kontra korrekthet. Om du väljer ett mindre tröskelvärde genereras poster tidigare, men det innebär också att sena poster är mer benägna att tas bort. Ett större tröskelvärde innebär en längre väntetid men eventuellt mer fullständighet av data. På grund av den större tillståndsstorleken kan ett större tröskelvärde också kräva ytterligare beräkningsresurser. Eftersom tröskelvärdet beror på dina data- och bearbetningskrav är det viktigt att testa och övervaka bearbetningen för att fastställa ett optimalt tröskelvärde.

Du använder withWatermark() funktionen i Python för att definiera en vattenstämpel. I SQL använder du WATERMARK -satsen för att definiera en vattenstämpel:

python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Använda vattenstämplar med ström-till-ström-anslutningar

För sammanfogningar mellan strömmar måste du definiera en vattenstämpel på båda sidor av kopplingen och ett tidsintervall-villkor. Eftersom varje kopplingskälla har en ofullständig vy över data krävs tidsintervallsatsen för att meddela strömningsmotorn när inga ytterligare matchningar kan göras. Tidsintervallsatsen måste använda samma fält som används för att definiera vattenstämplarna.

Eftersom det kan finnas tillfällen då varje ström kräver olika tröskelvärden för vattenstämplar behöver strömmarna inte ha samma tröskelvärden. För att undvika att data saknas behåller strömningsmotorn en global tidsstämpel baserad på den långsammaste strömmen.

I följande exempel kopplas en ström av annonsvisningar och en ström av klick från användare på annonser. I det här exemplet måste ett klick ske inom 3 minuter efter exponeringen. När tidsintervallet på 3 minuter har passerat tas rader från tillståndet som inte längre kan matchas bort.

python

from pyspark import pipelines as dp

dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Utför fönsteraggregeringar med vattenstämplar

En vanlig tillståndskänslig åtgärd för strömmande data är en fönsterbaserad aggregering. Fönsteraggregeringar liknar grupperade aggregeringar, förutom att aggregeringsvärden returneras för den uppsättning rader som ingår i det definierade fönstret.

Ett fönster kan definieras som en viss längd och en aggregeringsåtgärd kan utföras på alla rader som ingår i fönstret. Spark Streaming stöder tre typer av fönster:

  • Rullande (fasta) fönster: En serie tidsintervall med fast storlek, icke-överlappande och sammanhängande. En indatapost tillhör endast ett enda fönster.
  • Skjutbara fönster: På samma sätt som rullande fönster är skjutbara fönster i fast storlek, men fönstren kan överlappa varandra, vilket innebär att ett register kan falla i flera fönster.

När data kommer förbi slutet av fönstret plus vattenstämpelns längd godkänns inga nya data för fönstret, resultatet av aggregeringen genereras och tillståndet för fönstret tas bort.

I följande exempel beräknas en summa av visningar var 5:e minut med ett fast fönster. I det här exemplet använder select-satsen aliaset impressions_window, och sedan definieras själva fönstret som en del av GROUP BY -satsen. Fönstret måste baseras på samma tidsstämpelkolumn som vattenstämpeln, clickTimestamp i kolumnen i det här exemplet.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Ett liknande exempel i Python för att beräkna vinst över timsbegränsade fönster:

from pyspark import pipelines as dp

@dp.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Deduplicera strömmande poster

Strukturerad direktuppspelning har bearbetningsgarantier exakt en gång, men avduplicerar inte automatiskt poster från datakällor. Eftersom många meddelandeköer till exempel har minst en gång garantier bör dubbletter av poster förväntas när du läser från en av dessa meddelandeköer. Du kan använda dropDuplicatesWithinWatermark() funktionen för att av duplicera poster i ett angivet fält och ta bort dubbletter från en dataström även om vissa fält skiljer sig åt (till exempel händelsetid eller ankomsttid). Du måste ange en vattenstämpel för att använda dropDuplicatesWithinWatermark() funktionen. Alla duplicerade data som tas emot inom det tidsintervall som anges av vattenstämpeln tas bort.

Ordnade data är viktiga eftersom data som inte är i ordning gör att vattenmärket hoppar framåt felaktigt. När äldre data anländer anses de vara sena och ignoreras. Använd alternativet withEventTimeOrder för att bearbeta den första ögonblicksbilden i ordning baserat på tidsstämpeln som anges i vattenstämpeln. Alternativet withEventTimeOrder kan deklareras i koden som definierar datauppsättningen eller i pipelineinställningarna med hjälp av spark.databricks.delta.withEventTimeOrder.enabled. Till exempel:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Anmärkning

Alternativet withEventTimeOrder stöds endast med Python.

I följande exempel bearbetas data i ordning efter clickTimestamp, och poster som anländer inom 5 sekunder efter varandra och som innehåller dubbletter av userId och clickAdId-kolumner tas bort.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optimera pipelinekonfigurationen för tillståndskänslig bearbetning

För att förhindra produktionsproblem och långa svarstider rekommenderar Databricks att du aktiverar RocksDB-baserad tillståndshantering för tillståndskänslig dataströmbearbetning, särskilt om bearbetningen kräver att du sparar en stor mängd mellanliggande tillstånd.

Severless-pipelines hanterar automatiskt konfigurationer för tillståndslager.

Du kan aktivera RocksDB-baserad tillståndshantering genom att ange följande konfiguration innan du distribuerar en pipeline:

{
  "configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Mer information om RocksDB-tillståndslagret, inklusive konfigurationsrekommendationer för RocksDB, finns i Konfigurera RocksDB-tillståndslager på Azure Databricks.