Dela via


create_auto_cdc_from_snapshot_flow

Viktigt!

Den här funktionen finns i offentlig förhandsversion.

Funktionen create_auto_cdc_from_snapshot_flow skapar ett flöde som använder Lakeflow Declarative Pipelines CDC-funktionalitet för att bearbeta källdata från databasögonblicksbilder. Se Hur implementeras CDC med API:et AUTO CDC FROM SNAPSHOT ?.

Anmärkning

Den här funktionen ersätter den tidigare funktionen apply_changes_from_snapshot(). De två funktionerna har samma signatur. Databricks rekommenderar att du uppdaterar för att använda det nya namnet.

Viktigt!

Du måste ha en måltabell för strömmande data för den här åtgärden.

Om du vill skapa den obligatoriska måltabellen kan du använda funktionen create_streaming_table().

Syntax

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Anmärkning

För AUTO CDC FROM SNAPSHOT bearbetning är standardbeteendet att infoga en ny rad när en matchande post med samma nyckel(er) inte finns i målet. Om det finns en matchande post uppdateras den endast om något av värdena på raden har ändrats. Rader med nycklar som finns i målet men som inte längre finns i källan tas bort.

Mer information om CDC-bearbetning med ögonblicksbilder finns i AUTOMATISKA CDC-API:er: Förenkla ändringsdatainsamling med deklarativa pipelines för Lakeflow. Exempel på hur du använder create_auto_cdc_from_snapshot_flow() funktionen finns i exemplen med periodisk inmatning av ögonblicksbilder och historisk inmatning av ögonblicksbilder .

Parameterar

Parameter Typ Description
target str Obligatoriskt. Namnet på tabellen som ska uppdateras. Du kan använda funktionen create_streaming_table() för att skapa måltabellen innan du create_auto_cdc_from_snapshot_flow() kör funktionen.
source str eller lambda function Obligatoriskt. Antingen namnet på en tabell eller vy som ska ögonblicksbildas regelbundet, eller en Python lambda-funktion som returnerar DataFrame-ögonblicksbilden som ska bearbetas, samt ögonblicksbildversionen. Se Implementera source argumentet.
keys list Obligatoriskt. Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Detta används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen. Du kan ange något av följande:
  • En lista med strängar: ["userId", "orderId"]
  • En lista över Spark SQL-funktioner col() : [col("userId"), col("orderId"]. Argument för col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId).
stored_as_scd_type str eller int Om poster ska lagras som SCD-typ 1 eller SCD-typ 2. Ange till 1 för SCD typ 1 eller 2 för SCD typ 2. Standardvärdet är SCD typ 1.
track_history_column_list eller track_history_except_column_list list Ett urval av utdatakolumner som ska spåras i historiksyfte i måltabellen. Använd track_history_column_list för att ange den fullständiga listan över kolumner som ska spåras. Använd track_history_except_column_list för att ange vilka kolumner som ska undantas från spårning. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL-funktioner col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argument för col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId). Standardvärdet är att inkludera alla kolumner i måltabellen när inget track_history_column_list eller track_history_except_column_list argument skickas till funktionen.

source Implementera argumentet

Funktionen create_auto_cdc_from_snapshot_flow() innehåller source argumentet . För bearbetning av historiska ögonblicksbilder source förväntas argumentet vara en Python lambda-funktion som returnerar två värden till create_auto_cdc_from_snapshot_flow() funktionen: en Python DataFrame som innehåller de ögonblicksbildsdata som ska bearbetas och en ögonblicksbildversion.

Följande är signaturen för lambda-funktionen:

lambda Any => Optional[(DataFrame, Any)]
  • Argumentet till lambda-funktionen är den senast bearbetade ögonblicksbildversionen.
  • Returvärdet för lambda-funktionen är None eller en tuppeln med två värden: Det första värdet för tuppeln är en DataFrame som innehåller ögonblicksbilden som ska bearbetas. Det andra värdet för tuppeln är den ögonblicksbildsversion som representerar ögonblicksbildens logiska ordning.

Ett exempel som implementerar och anropar lambda-funktionen:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Lakeflow Deklarativa pipelines-körningen utför följande steg varje gång pipelinen som innehåller create_auto_cdc_from_snapshot_flow() funktionen utlöses:

  1. Kör next_snapshot_and_version funktionen för att läsa in nästa ögonblicksbilds-DataFrame och motsvarande ögonblicksbildsversion.
  2. Om ingen DataFrame returnerar avslutas körningen och pipelineuppdateringen markeras som slutförd.
  3. Identifierar ändringarna i den nya ögonblicksbilden och tillämpar dem stegvis på måltabellen.
  4. Återgår till steg 1 för att läsa in nästa ögonblicksbild och dess version.