Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
CDC (Change Data Capture) är ett dataintegreringsmönster som samlar in ändringar som gjorts i data i ett källsystem, till exempel infogningar, uppdateringar och borttagningar. Dessa ändringar, som representeras som en lista, kallas ofta för ett CDC-flöde. Du kan bearbeta dina data mycket snabbare om du arbetar med ett CDC-flöde i stället för att läsa hela källdatauppsättningen. Transaktionsdatabaser som SQL Server, MySQL och Oracle genererar CDC-feeds. Deltatabeller genererar sitt eget CDC-flöde, som kallas för ett ändringsdataflöde (CDF).
Följande diagram visar att när en rad i en källtabell som innehåller anställdas data uppdateras genererar den en ny uppsättning rader i ett CDC-flöde som endast innehåller ändringarna. Varje rad i CDC-feeden innehåller vanligtvis ytterligare metadata, inklusive åtgärden, till exempel UPDATE och en kolumn som kan användas för att deterministiskt sortera varje rad i CDC-feeden så att du kan hantera oordningsuppdateringar. Kolumnen i följande diagram avgör till exempel sequenceNum radordningen i CDC-feeden:
Bearbeta ett ändringsdataflöde: Behåll endast de senaste data jämfört med att behålla historiska versioner av data
Bearbetningen av ett ändrat dataflöde kallas långsamt föränderliga dimensioner (SCD). När du bearbetar ett CDC-flöde har du ett val att göra:
- Behåller du bara de senaste data (dvs. skriver över befintliga data)? Detta kallas SCD Typ 1.
- Eller behåller du en historik över ändringar i data? Detta kallas SCD typ 2.
SCD-typ 1-bearbetning innebär att gamla data skrivs över med nya data när en ändring sker. Det innebär att ingen historik över ändringarna sparas. Endast den senaste versionen av data är tillgänglig. Det är en enkel metod och används ofta när historiken för ändringar inte är viktig, till exempel korrigera fel eller uppdatera icke-kritiska fält som kundens e-postadresser.
SCD-typ 2-bearbetning upprätthåller ett historiskt register över ändringar av data genom att skapa ytterligare poster för att samla in olika versioner av data över tid. Varje version av data är tidsstämplad eller taggad med metadata som gör det möjligt för användare att spåra när en ändring inträffade. Detta är användbart när det är viktigt att spåra utvecklingen av data, till exempel spåra kundadressändringar över tid i analyssyfte.
Exempel på SCD-typ 1- och typ 2-bearbetning med Lakeflow Deklarativa pipelines
Exemplen i det här avsnittet visar hur du använder SCD Typ 1 och Typ 2.
Steg 1: Förbereda exempeldata
I det här exemplet genererar du ett CDC-exempelflöde. Skapa först en notebook-fil och klistra in följande kod i den. Uppdatera variablerna i början av kodblocket till en katalog och ett schema där du har behörighet att skapa tabeller och vyer.
Den här koden skapar en ny Delta-tabell som innehåller flera ändringsposter. Schemat är följande:
-
id– Heltal, unik identifierare för den här medarbetaren -
name- Sträng, namnet på medarbetaren -
role- Sträng, anställdens roll -
country- Sträng, landskod, där medarbetare arbetar -
operation– Ändra typ(till exempelINSERT,UPDATE, ellerDELETE) -
sequenceNum– Heltal, identifierar den logiska ordningen för CDC-händelser i källdata. Lakeflow Declarative Pipelines använder denna sekvensering för att hantera ändringshändelser som anländer i fel ordning.
# update these to the catalog and schema where you have permissions
# to create tables and views.
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
def write_employees_cdf_to_delta():
data = [
(1, "Alex", "chef", "FR", "INSERT", 1),
(2, "Jessica", "owner", "US", "INSERT", 2),
(3, "Mikhail", "security", "UK", "INSERT", 3),
(4, "Gary", "cleaner", "UK", "INSERT", 4),
(5, "Chris", "owner", "NL", "INSERT", 6),
# out of order update, this should be dropped from SCD Type 1
(5, "Chris", "manager", "NL", "UPDATE", 5),
(6, "Pat", "mechanic", "NL", "DELETE", 8),
(6, "Pat", "mechanic", "NL", "INSERT", 7)
]
columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")
write_employees_cdf_to_delta()
Du kan förhandsgranska dessa data med följande SQL-kommando:
SELECT *
FROM mycatalog.myschema.employees_cdf
Steg 2: Använd SCD-typ 1 för att endast behålla de senaste data
Vi rekommenderar att du använder AUTO CDC API:et i Lakeflow deklarativa pipelines för att bearbeta en ändringsdatamatning till en SCD Type 1-tabell.
- Skapa en ny notebook-fil.
- Klistra in följande kod i den.
- Skapa och ansluta till en pipeline.
Funktionen employees_cdf läser tabellen som vi nyss skapade ovan som en ström eftersom API:et create_auto_cdc_flow , som du använder för bearbetning av ändringsdatainsamling, förväntar sig en ström av ändringar som indata. Du omsluter den med en dekoratör @dp.temporary_view eftersom du inte vill materialisera den här strömmen till en tabell.
Sedan använder dp.create_target_table du för att skapa en strömmande tabell som innehåller resultatet av bearbetningen av ändringsdataflödet.
Slutligen använder dp.create_auto_cdc_flow du för att bearbeta ändringsdataflödet. Låt oss ta en titt på varje argument:
-
target– Måluppspelningstabellen som du definierade tidigare. -
source– Vyn över strömmen av ändringsposter, som du definierade tidigare. -
keys– Identifierar unika rader i ändringsflödet. Eftersom du använderidsom en unik identifierare angeriddu bara som den enda identifierande kolumnen. -
sequence_by– Kolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. Du behöver den här sekvenseringen för att hantera ändringshändelser som inte är i ordning. AngesequenceNumsom sekvenseringskolumn. -
apply_as_deletes- Eftersom exempeldata innehåller borttagningsåtgärder använderapply_as_deletesdu för att ange när en CDC-händelse ska behandlas som enDELETEsnarare än en upsert. -
except_column_list– Innehåller en lista med kolumner som du inte vill inkludera i måltabellen. I det här exemplet använder du det här argumentet för att exkluderasequenceNumochoperation. -
stored_as_scd_type– Anger vilken SCD-typ du vill använda.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"
@dp.temporary_view
def employees_cdf():
return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")
dp.create_target_table(f"{catalog}.{schema}.{employees_table_current}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_current}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
Kör den här pipelinen genom att klicka på Start.
Kör sedan följande fråga i SQL-redigeraren för att kontrollera att ändringsposterna har bearbetats korrekt:
SELECT *
FROM mycatalog.myschema.employees_current
Anmärkning
Den felaktiga uppdateringen för medarbetaren Chris har tagits bort korrekt eftersom deras roll fortfarande är inställd på Ägare istället för Chef.
Steg 3: Använd SCD-typ 2 för att behålla historiska data
I det här exemplet skapar du en andra måltabell med namnet employees_historical, som innehåller en fullständig historik över ändringar i medarbetarposter.
Lägg till den här koden i pipelinen. Den enda skillnaden här är att stored_as_scd_type är inställd på 2 i stället för 1.
dp.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")
dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_historical}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 2
)
Kör den här pipelinen genom att klicka på Start.
Kör sedan följande fråga i SQL-redigeraren för att kontrollera att ändringsposterna har bearbetats korrekt:
SELECT *
FROM mycatalog.myschema.employees_historical
Du ser alla ändringar av anställda, inklusive de anställda som har tagits bort, till exempel Pat.
Steg 4: Rensa resurser
När du är klar rensar du resurser genom att följa dessa steg:
Ta bort pipelinen:
Anmärkning
När du tar bort pipelinen tas tabellerna
employeesochemployees_historicalbort automatiskt.- Klicka på Jobb och pipeline, hitta sedan namnet på den pipeline som du vill ta bort.
- Klicka på
I samma rad som pipelinenamnet och klicka sedan på Ta bort.
Ta bort anteckningsboken.
Ta bort tabellen som innehåller ändringsdataflödet:
- Klicka på Ny > fråga.
- Klistra in och kör följande SQL-kod och justera katalogen och schemat efter behov:
DROP TABLE mycatalog.myschema.employees_cdf
Nackdelar med att använda MERGE INTO och foreachBatch för insamling av ändringsdata
Databricks tillhandahåller ett MERGE INTO SQL-kommando som du kan använda med API:et foreachBatch för att överföra rader till en Delta-tabell. I det här avsnittet beskrivs hur den här tekniken kan användas för enkla användningsfall, men den här metoden blir alltmer komplex och bräcklig när den tillämpas på verkliga scenarier.
I det här exemplet använder du samma exempeländringsdataflöde som användes i föregående exempel.
Naiv implementering med MERGE INTO och foreachBatch
Skapa en notebook-fil och kopiera följande kod till den. Ändra variablerna catalog, schemaoch employees_table efter behov. Variablerna catalog och schema bör anges till platser i Unity Catalog där du kan skapa tabeller.
När du kör notebook-filen gör den följande:
- Skapar måltabellen i
create_table. Till skillnad fråncreate_auto_cdc_flow, som hanterar det här steget automatiskt, måste du ange schemat. - Läser ändringsdataflödet som en ström. Varje mikrobatch bearbetas med hjälp av
upsertToDeltametoden, som kör ettMERGE INTOkommando.
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"
def upsertToDelta(microBatchDF, batchId):
microBatchDF.createOrReplaceTempView("updates")
microBatchDF.sparkSession.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING)
""")
create_table()
cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")
cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()
Om du vill se resultatet kör du följande SQL-fråga:
SELECT *
FROM mycatalog.myschema.employees_merge
Tyvärr är resultatet felaktigt, enligt följande:
Flera uppdateringar av samma nyckel i samma mikrobatch
Det första problemet är att koden inte hanterar flera uppdateringar av samma nyckel i samma mikrobatch. Du kan till exempel använda INSERT för att infoga medarbetaren Chris och sedan uppdatera deras roll från Ägare till Chef. Detta bör resultera i en rad, men i stället finns det två rader.
Vilken ändring vinner när det finns flera uppdateringar av samma nyckel i en mikrobatch?
Logiken blir mer komplex. Följande kodexempel hämtar den senaste raden efter sequenceNum och sammanfogar endast dessa data till måltabellen på följande sätt:
- Grupperar efter primärnyckel,
id. - Tar alla kolumner för raden som har maxvärdet
sequenceNumi batchen för den nyckeln. - Exploderar raden tillbaka ut.
upsertToDelta Uppdatera metoden enligt följande och kör sedan koden:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
När du frågar måltabellen ser du att den anställde med namnet Chris har rätt roll, men det finns fortfarande andra problem att lösa eftersom du fortfarande har borttagna poster som visas i måltabellen.
Oordnade uppdateringar mellan mikrobatcher
I det här avsnittet går vi igenom problemet med uppdateringar som inte är i ordning mellan mikrobatcher. Följande diagram illustrerar problemet: vad händer om raden för Chris har en UPDATE-åtgärd i den första mikrobatchen följt av en INSERT i en efterföljande mikrobatch? Koden hanterar inte detta korrekt.
Vilken ändring har prioritet när det finns oordnade uppdateringar av samma nyckel i flera mikrobatcher?
Om du vill åtgärda detta expanderar du koden för att lagra en version på varje rad enligt följande:
- Lagra
sequenceNumnär en rad senast uppdaterades. - För varje ny rad kontrollerar du om tidsstämpeln är större än den som lagras och tillämpar sedan följande logik:
- Om värdet är större, använd de nya uppgifterna från målet.
- Annars behålls datan i källan.
Uppdatera först metoden createTable för att lagra sequenceNum eftersom du kommer att använda den för att versionera varje rad.
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING, sequenceNum INT)
""")
Uppdatera sedan upsertToDelta för att hantera radversioner.
UPDATE SET Satsen MERGE INTO måste hantera varje kolumn separat.
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Hantera raderingar
Tyvärr har koden fortfarande ett problem. Den hanterar inte DELETE-operationer, vilket framgår av det faktum att medarbetaren Pat fortfarande finns i måltabellen.
Anta att borttagningar kommer i samma mikrobatch. Om du vill hantera dem uppdaterar upsertToDelta du metoden igen för att ta bort raden när ändringsdataposten anger borttagning enligt följande:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Hantera uppdateringar som kommer i fel ordning efter borttagningar
Tyvärr är koden ovan fortfarande inte helt korrekt eftersom den inte hanterar fall när en DELETE följs av en oordnad UPDATE över mikrobatcher.
Algoritmen för att hantera det här fallet måste komma ihåg borttagningar så att den kan hantera efterföljande out-of-order-uppdateringar. Så här gör du:
- I stället för att ta bort rader omedelbart tar du bort dem med en tidsstämpel eller
sequenceNum. Mjukt borttagna rader är markerade som borttagna. - Omdirigera alla användare till en vy som filtrerar bort gravstenar.
- Skapa ett rensningsjobb som tar bort gravstenarna över tid.
Använd följande kod:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
Användarna kan inte använda måltabellen direkt, så skapa en vy som de kan fråga:
CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL
Skapa slutligen ett rensningsjobb som regelbundet tar bort gravstenade rader:
DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY