Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Een streamingtabel is een Delta-tabel met extra ondersteuning voor streaming of incrementele gegevensverwerking. Een streamingtabel kan het doelwit zijn van een of meer datastromen in een ETL-pijplijn.
Streamingtabellen zijn een goede keuze voor gegevensopname om de volgende redenen:
- Elke invoerrij wordt slechts één keer verwerkt, waarmee het overgrote deel van de gegevensinvoertaken wordt gemodelleerd (dat wil zeggen door rijen aan een tabel toe te voegen of bij te werken).
- Ze kunnen grote hoeveelheden alleen toevoegbare gegevens verwerken.
Streamingtabellen zijn ook een goede keuze voor streamingtransformaties met lage latentie om de volgende redenen:
- Redeneer over rijen en tijdvensters
- Grote hoeveelheden gegevens verwerken
- Lage latentie
In het volgende diagram ziet u hoe streamingtabellen werken.
Bij elke update lezen de stromen die zijn gekoppeld aan een streamingtabel de gewijzigde informatie in een streamingbron en voegen nieuwe gegevens toe aan die tabel.
Streamingtabellen worden gedefinieerd en bijgewerkt door één pijplijn. U definieert expliciet streamingtabellen in de broncode van de pijplijn. Tabellen die zijn gedefinieerd door een pijplijn, kunnen niet worden gewijzigd of bijgewerkt door een andere pijplijn. U kunt meerdere stromen definiëren die moeten worden toegevoegd aan één streamingtabel.
Opmerking
Wanneer u een streamingtabel buiten een pijplijn maakt met behulp van Databricks SQL, maakt Azure Databricks een pijplijn die wordt gebruikt om de tabel bij te werken. U kunt de pijplijn zien door taken en pijplijnen te selecteren in de linkernavigatiebalk in uw werkruimte. U kunt de kolom Pijplijntype toevoegen aan uw weergave. Streamingtabellen die zijn gemaakt in declaratieve pijplijnen van Lakeflow hebben een type ETL. Streamingtabellen die zijn gemaakt in Databricks SQL hebben een type MV/ST.
Voor meer informatie over stromen, zie Incrementeel gegevens laden en verwerken met Lakeflow Declarative Pipelines-stromen.
Streamingtabellen voor gegevensinvoer
Streamingtabellen zijn ontworpen voor gegevensbronnen die alleen toevoegingen toelaten en verwerken invoer slechts één keer.
In het volgende voorbeeld ziet u hoe u een streamingtabel gebruikt om nieuwe bestanden op te nemen uit cloudopslag.
Python
from pyspark import pipelines as dp
# create a streaming table
@dp.table
def customers_bronze():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/path/to/files")
)
Wanneer u de spark.readStream functie in een definitie van een gegevensset gebruikt, zorgt dit ervoor dat Lakeflow-declaratieve pijplijnen de gegevensset behandelen als een stroom en de gemaakte tabel een streamingtabel is.
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
"/volumes/path/to/files",
format => "json"
);
Zie Gegevens laden met declaratieve pijplijnen van Lakeflow voor meer informatie over het laden van gegevens in een streamingtabel.
Het volgende diagram illustreert hoe append-only streaming-tabellen werken.
Een rij die al is toegevoegd aan een streamingtabel, zal niet opnieuw worden uitgevraagd bij latere updates van de pijplijn. Als u de query wijzigt (bijvoorbeeld van SELECT LOWER (name) naar SELECT UPPER (name)), worden bestaande rijen niet bijgewerkt naar hoofdletters, maar nieuwe rijen zijn hoofdletters. U kunt een volledige vernieuwingsoperatie starten om alle eerdere gegevens opnieuw op te halen uit de brontabel, en zo alle rijen in de streamingtabel bij te werken.
Streamingtabellen en streaming met lage latentie
Streamingtabellen zijn ontworpen voor streaming met lage latentie over gebonden toestand. Streamingtabellen maken gebruik van controlepuntbeheer, waardoor ze geschikt zijn voor streaming met lage latentie. Ze verwachten echter streams die van nature begrensd zijn of voorzien van een watermerk.
Een natuurlijk gebonden stroom wordt geproduceerd door een streaminggegevensbron met een goed gedefinieerd begin en einde. Een voorbeeld van een natuurlijk gebonden stroom is het lezen van gegevens uit een map met bestanden waarin geen nieuwe bestanden worden toegevoegd nadat een eerste batch bestanden is geplaatst. De stream wordt beschouwd als gebonden omdat het aantal bestanden eindig is en vervolgens eindigt de stroom nadat alle bestanden zijn verwerkt.
U kunt ook een watermerk gebruiken om een stroom te binden. Een watermerk in Spark Structured Streaming is een mechanisme waarmee late gegevens kunnen worden verwerkt door op te geven hoe lang het systeem moet wachten op vertraagde gebeurtenissen voordat het tijdvenster als voltooid wordt overwogen. Een niet-gebonden stroom die geen watermerk heeft, kan ertoe leiden dat een pijplijn mislukt vanwege geheugendruk.
Voor meer informatie over stateful stroomverwerking, zie Optimaliseer de stateful verwerking in Lakeflow-declaratieve pijplijnen met behulp van watermerken.
Stream-snapshot-koppelingen
Stream-snapshot joins zijn joins tussen een stream en een dimensie waarvan een opname wordt gemaakt wanneer streams starten. Deze joins worden niet opnieuw gecomputeerd als de dimensie verandert nadat de stream is gestart, omdat de dimensietabel wordt behandeld als een momentopname in de tijd en wijzigingen in de dimensietabel nadat de stream is gestart, niet worden weerspiegeld, tenzij u de dimensietabel opnieuw laadt of vernieuwt. Dit is redelijk gedrag als u kleine verschillen in een join kunt accepteren. Een benaderende samenvoeging is bijvoorbeeld acceptabel wanneer het aantal transacties vele ordes van grootte groter is dan het aantal klanten.
In het volgende codevoorbeeld voegen we een dimensietabel toe, klanten, met twee rijen met een steeds groter wordende gegevensset, transacties. We materialiseren een join tussen deze twee gegevenssets in een tabel met de naam sales_report. Houd er rekening mee dat als een extern proces de tabel klanten bijwerkt door een nieuwe rij (customer_id=3, name=Zoya) toe te voegen, deze nieuwe rij niet aanwezig is in de join omdat er een momentopname is gemaakt van de statische dimensietabel toen streams werden gestart.
from pyspark import pipelines as dp
@dp.temporary_view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
return spark.readStream.table("transactions")
# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dp.temporary_view
def v_customers():
return spark.read.table("customers")
@dp.table
def sales_report():
facts = spark.readStream.table("v_transactions")
dims = spark.read.table("v_customers")
return facts.join(dims, on="customer_id", how="inner")
Beperkingen voor streaming-tabellen
Streamingtabellen hebben de volgende beperkingen:
-
Beperkte evolutie: U kunt de query wijzigen zonder de volledige gegevensset opnieuw te compileren. Zonder een volledige vernieuwing ziet een streamingtabel slechts één keer elke rij, zodat verschillende query's verschillende rijen hebben verwerkt. Als u bijvoorbeeld
UPPER()toevoegt aan een veld in de query, worden alleen de rijen die na de wijziging zijn verwerkt, in hoofdletters weergegeven. Dit betekent dat u rekening moet houden met alle eerdere versies van de query die worden uitgevoerd op uw gegevensset. Voor het opnieuw verwerken van bestaande rijen die vóór de wijziging zijn verwerkt, is een volledige vernieuwing vereist. - State Management: Streaming-tabellen hebben een lage latentie, dus u moet ervoor zorgen dat de stromen waarover ze werken van nature begrensd zijn of begrensd met een watermerk. Voor meer informatie, zie statusgevoelige verwerking optimaliseren in declaratieve Lakeflow-pijplijnen met watermerken.
- Joins worden niet herberekend: Joins in streamingtabellen worden niet herberekend wanneer dimensies veranderen. Dit kenmerk kan goed zijn voor "snel-maar-fout"-scenario's. Als u wilt dat uw weergave altijd juist is, kunt u een gerealiseerde weergave gebruiken. Gerealiseerde weergaven zijn altijd correct omdat ze joins automatisch opnieuw compileren wanneer dimensies veranderen. Voor meer informatie, zie gerealiseerde weergaven.