Dela via


Välj ett utdataläge för strukturerad direktuppspelning

I den här artikeln beskrivs hur du väljer ett utdataläge för tillståndskänslig strömning. Endast tillståndskänsliga strömmar som innehåller aggregeringar kräver en konfiguration av utdataläget.

Kopplingar stöder endast tilläggsutdataläget och utdataläget påverkar inte dedupliceringen. De godtyckliga tillståndskänsliga operatorerna mapGroupsWithState och flatMapGroupsWithState genererar poster med hjälp av sin egen anpassade logik, så att strömmens utdataläge inte påverkar deras beteende.

För tillståndslös direktuppspelning fungerar alla utdatalägen på samma sätt.

Om du vill konfigurera utdataläget korrekt måste du förstå tillståndskänslig strömning, vattenstämplar och utlösare. Mer information finns i följande artiklar:

Vad är utdataläge?

Utdataläget för en strukturerad strömfråga avgör vilka poster frågans operatorer avger under varje utlösningshändelse. De tre typerna av poster som kan genereras är:

  • Registrerar att framtida bearbetning inte förändrar något.
  • De poster som har ändrats sedan den senaste triggningen.
  • Alla poster i tillståndstabellen.

Att veta vilka typer av poster som ska genereras är viktigt för tillståndskänsliga operatorer eftersom en viss rad som skapas av en tillståndskänslig operator kan ändras från utlösare till utlösare. När en strömningsaggregeringsoperator till exempel tar emot fler rader för ett visst fönster kan det fönstrets aggregeringsvärden ändras mellan utlösare.

För tillståndslösa operatorer påverkar skillnaden mellan posttyper inte operatorns beteende. Posterna som en tillståndslös operator emitterar under en trigger är alltid källposterna som behandlas under den triggern.

Tillgängliga utdatalägen

Det finns tre utdatalägen som talar om för en operator vilka poster som ska genereras under en viss utlösare:

Utdataläge beskrivning
Tilläggsläge (standard) Som standard körs strömmande frågor i tilläggsläge. I det här läget genererar operatorer endast rader som inte kommer att ändras i framtida triggare. Tillståndsbevarande operatorer använder vattenmärket för att avgöra när detta inträffar.
Uppdateringsläge I uppdateringsläge sänder operatorerna ut alla rader som har ändrats under triggaren, även om posten som genereras kan ändras i en efterföljande trigger.
Fullständigt läge Fullständigt läge fungerar endast med strömningsaggregeringar. I fullständigt läge skickas alla rader som operatorn har producerat vidare nedströms.

Produktionsöverväganden

För många tillståndskänsliga strömningsåtgärder måste du välja mellan tilläggs- och uppdateringslägen. I följande avsnitt beskrivs överväganden som kan ligga till hjälp för ditt beslut.

Anteckning

Complete mode har vissa användningsområden, men kan fungera dåligt när datamängden ökar. Databricks rekommenderar att du använder materialiserade vyer för att få semantiska garantier associerade med fullständigt läge för inkrementell bearbetning för många tillståndsberoende operationer. Se även Materialiserade vyer.

Programsemantik

Programsemantik beskriver hur underordnade program använder strömmande data.

Om nedströmstjänster behöver vidta en enda åtgärd för varje nedströmsskrivning, använd läget "tillägg" i de flesta fall. Om du till exempel har en underordnad meddelandetjänst som skickar meddelanden för varje ny post som skrivs till mottagaren, ser tilläggsläget till att varje post bara skrivs en gång. Uppdateringsläget skriver posten varje gång som tillståndsinformationen ändras, vilket skulle resultera i många uppdateringar.

Om nedströms tjänster behöver uppdaterade resultat säkerställer uppdateringsläget att mottagaren förblir så uppdaterad som möjligt. Exempel är en maskininlärningsmodell som läser funktioner i realtid eller en instrumentpanel för analys som spårar aggregeringar i realtid.

Operator- och mottagarkompatibilitet

Strukturerad direktuppspelning stöder inte alla åtgärder som är tillgängliga i Apache Spark och vissa strömningsåtgärder stöds inte i alla utdatalägen. Mer information om operatorbegränsningar finns i OSS-strömningsdokumenten.

Alla mottagare stöder inte alla utdatalägen. Både Delta Lake, som stöder alla hanterade Unity Catalog-tabeller, och Kafka stöder alla utdatalägen. Mer information om sink-kompatibilitet finns i OSS-strömningsdokumenten.

Svarstid och kostnad

Utgångsläget påverkar hur lång tid som måste passera innan en post skrivs, och frekvensen och mängden data som skrivs kan påverka kostnaderna för strömmande pipeliner.

Tilläggsläget tvingar tillståndskänsliga operatorer att generera resultat först när tillståndskänsliga resultat har slutförts, vilket är minst lika länge som vattenstämpelfördröjningen. En vattenstämpelfördröjning på 1 hour i utdataläget för tillägg innebär att posterna har minst en timmes fördröjning innan de skickas nedströms.

Uppdateringsläget resulterar i en skrivning per utlösare per aggregeringsvärde. Om din mottagare debiteras per skrivning per enskild post kan det vara dyrt om posterna uppdateras många gånger innan vattenstämpelfördröjningen har passerat.

Exempel på konfigurationer

Följande kodexempel visar hur du konfigurerar utdataläge för direktuppspelningsuppdateringar till Unity Catalog-tabeller:

python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Se OSS-dokument för PySpark DataStreamWriter.outputMode eller Scala DataStreamWriter.outputMode.

Exempel på tillståndskänsliga strömnings- och utdatalägen

Följande exempel är avsett att hjälpa dig att resonera genom hur utdataläget interagerar med vattenstämplar för tillståndskänslig strömning.

Överväg en strömmande aggregering som beräknar de totala intäkterna som genereras varje timme i en butik med en vattenstämpelfördröjning på 15 minuter. Den första mikrobatchen bearbetar följande poster:

  • $15 kl. 14:40
  • $10 kl. 14:30
  • $30 kl. 15:10

Nu är motorns tidsstämpel 14:55 eftersom den subtraherar 15 minuter (fördröjningen) från den senaste tiden som registrerats (15:10). Strömningsaggregeringsoperatorn har följande status:

  • [2pm, 3pm]: $25
  • [3pm, 4pm]: $30

I följande tabell beskrivs vad som skulle hända i varje utdataläge:

Utdataläge Resultat och orsak
Lägga till Strömningsaggregeringsoperatorn sänder inget vidare nedströms. Detta beror på att båda dessa fönster kan ändras när nya värden visas med en efterföljande utlösare: vattenmärket vid 14:55 anger att poster efter 14:55 fortfarande kan komma, och dessa poster kan falla i antingen [2pm, 3pm]-fönstret eller [3pm, 4pm]-fönstret.
Uppdatera Operatorn genererar båda posterna eftersom båda posterna tog emot uppdateringar.
Klart Operatören sänder ut alla poster.

Anta nu att strömmen tar emot ytterligare en post:

  • $20 kl. 15:20

Vattenstämpeln uppdateras till 15:05 eftersom motorn drar bort 15 minuter från 15:20. Nu har strömningsaggregeringsoperatorn följande tillstånd:

  • [2pm, 3pm]: $25
  • [3pm, 4pm]: $50

I följande tabell beskrivs vad som skulle hända i varje utdataläge:

Utdataläge Resultat och orsak
Lägga till Strömningsaggregeringsoperatorn observerar att vattenstämpeln vid 15:05 är högre än slutet av [2pm, 3pm]-fönstret. Enligt definitionen av vattenstämpeln kan det fönstret inte längre ändras, så det avger fönstret [2pm, 3pm].
Uppdatera Strömningsaggregeringsoperatorn genererar [3pm, 4pm]-fönstret eftersom tillståndsvärdet har ändrats från 30 USD till 50 USD.
Klart Operatören sänder ut alla poster.

Följande sammanfattar hur tillståndskänsliga operatorer beter sig i varje append-läge:

  • I append-läge skriver du dataposter en gång efter vattenmärkesfördröjningen.
  • I uppdateringsläge, skriv poster som har ändrats sedan föregående utlösare.
  • I fullständigt läge, skriv alla poster som någonsin skapats av den tillståndskänsliga operatorn.