Dela via


Självstudie: Skapa en ETL-pipeline med ändringsdatainsamling med Lakeflow Deklarativa pipelines

Lär dig hur du skapar och distribuerar en ETL-pipeline (extrahera, transformera och ladda) med Change Data Capture (CDC) med hjälp av Lakeflow Deklarativa Pipelines för dataorkestrering och Auto Loader. En ETL-pipeline implementerar stegen för att läsa data från källsystem, transformera dessa data baserat på krav, till exempel datakvalitetskontroller och registrera deduplicering, och skriva data till ett målsystem, till exempel ett informationslager eller en datasjö.

I den här handledningen använder du data från en customers-tabell i en MySQL-databas för att:

  • Extrahera ändringarna från en transaktionsdatabas med hjälp av Debezium eller ett annat verktyg och spara dem i molnobjektlagring (S3, ADLS eller GCS). I den här självstudien hoppar du över att konfigurera ett externt CDC-system och genererar i stället falska data för att förenkla självstudien.
  • Använd Auto Loader för att inkrementellt läsa in meddelanden från molnobjektlagring och lagra rådatameddelandena i customers_cdc tabellen. Auto Loader härleder schemat och hanterar schemautveckling.
  • customers_cdc_clean Skapa tabellen för att kontrollera datakvaliteten med hjälp av förväntningar. Det id bör till exempel aldrig vara null eftersom den används för att köra upsert-operationer.
  • Utför AUTO CDC ... INTO på de rensade CDC-data för att införa eller uppdatera ändringar i den slutliga customers-tabellen.
  • Visa hur Lakeflow Deklarativa Pipelines kan skapa en SCD2 (Slowly Changing Dimension typ 2) tabell för att spåra alla ändringar.

Målet är att mata in rådata nästan i realtid och skapa en tabell för ditt analytikerteam samtidigt som datakvaliteten säkerställs.

Självstudien använder arkitekturen medallion Lakehouse, där den matar in rådata via bronsskiktet, rensar och validerar data med silverskiktet och tillämpar dimensionsmodellering och aggregering med hjälp av guldskiktet. För mer information, se Vad innebär arkitekturen med medallion lakehouse?

Det implementerade flödet ser ut så här:

Deklarativa pipelines för Lakeflow med CDC

Mer information om Lakeflow Deklarativa Pipelines, Auto Loader och CDC finns i Lakeflow Deklarativa Pipelines, Vad är Auto Loader?, och Vad är Change Data Capture (CDC)?

Kravspecifikation

För att slutföra den här självstudien måste du uppfylla följande krav:

Ändra datainsamling i en ETL-pipeline

CDC (Change Data Capture) är den process som samlar in ändringar i poster som gjorts i en transaktionsdatabas (till exempel MySQL eller PostgreSQL) eller ett informationslager. CDC samlar in åtgärder som databorttagningar, tillägg och uppdateringar, vanligtvis som en ström för att materialisera tabeller på nytt i externa system. CDC möjliggör inkrementell inläsning samtidigt som behovet av massinläsningsuppdateringar elimineras.

Anmärkning

För att förenkla den här handledningen, hoppa över att ställa in ett externt CDC-system. Anta att den kör och sparar CDC-data som JSON-filer i molnobjektlagring (S3, ADLS eller GCS). Självstudien använder Faker-biblioteket för att generera den data som används i självstudien.

Fånga CDC

Det finns en mängd olika CDC-verktyg. En av de ledande lösningarna med öppen källkod är Debezium, men andra implementeringar som förenklar datakällor finns, till exempel Fivetran, Qlik Replicate, StreamSets, Talend, Oracle GoldenGate och AWS DMS.

I den här självstudien använder du CDC-data från ett externt system som Debezium eller DMS. Debezium fångar varje ändrad rad. Den skickar vanligtvis historiken för dataändringar till Kafka-ämnen eller sparar dem som filer.

Du måste mata in CDC-informationen från customers tabellen (JSON-format), kontrollera att den är korrekt och sedan materialisera kundtabellen i Lakehouse.

CDC-indata från Debezium

För varje ändring får du ett JSON-meddelande som innehåller alla fält på raden som uppdateras (id, , firstnamelastname, email, address). Meddelandet innehåller även ytterligare metadata:

  • operation: En åtgärdskod, vanligtvis (DELETE, APPEND, UPDATE).
  • operation_date: Datum och tidsstämpel för registreringen av varje driftåtgärd.

Verktyg som Debezium kan ge mer avancerade utdata, till exempel radvärdet före ändringen, men den här självstudien utelämnar dem för enkelhetens skull.

Steg 1: Skapa en pipeline

Skapa en ny ETL-pipeline i Lakeflow Deklarativa Pipelines för att göra förfrågningar mot din CDC-datakälla och generera tabeller på din arbetsyta.

  1. På din arbetsyta klickar du på plusikonen. Nytt i det övre vänstra hörnet.

  2. Klicka på ETL-pipeline.

  3. Ändra rubriken på pipelinen till Pipelines with CDC tutorial eller ett namn som du föredrar.

  4. Under rubriken väljer du en katalog och ett schema som du har skrivbehörighet för.

    Den här katalogen och schemat används som standard om du inte anger en katalog eller ett schema i koden. Din kod kan skriva till valfri katalog eller schema genom att ange den fullständiga sökvägen. Denna handledning använder de standardvärden du anger här.

  5. Från Avancerade alternativ väljer du Starta med en tom fil.

  6. Välj en mapp för koden. Du kan välja Bläddra för att bläddra i listan över mappar på arbetsytan. Du kan välja valfri mapp som du har skrivbehörighet för.

    Om du vill använda versionskontroll väljer du en Git-mapp. Om du behöver skapa en ny mapp väljer du plusikonen.

  7. Välj Python eller SQL som språk för filen, baserat på det språk som du vill använda för självstudien.

  8. Klicka på Välj för att skapa pipelinen med de här inställningarna och öppna Lakeflow Pipelines-redigeraren.

Nu har du en tom pipeline med en standardkatalog och ett schema. Nästa steg är att förbereda exempeldata som ska importeras i handledningen.

Steg 2: Skapa exempeldata som ska importeras i den här självstudien

Det här steget behövs inte om du importerar dina egna data från en befintlig källa. I den här självstudien genererar du falska data som exempel för självstudien. Skapa en notebook-fil för att köra Python-datagenereringsskriptet. Den här koden behöver bara köras en gång för att generera exempeldata, så skapa den i pipelinens explorations mapp, som inte körs som en del av en pipelineuppdatering.

Anmärkning

Den här koden använder Faker för att generera CDC-exempeldata. Faker kan installeras automatiskt, så handledningen använder %pip install faker. Du kan också ange ett beroende av faker för notebook-filen. Se Lägga till beroenden i notebook-filen.

  1. Från Lakeflow Pipelines-redigeraren går du till sidofältet i tillgångsläsaren till vänster om redigeraren och klickar på Plus-ikonen.Lägg till och välj sedan Utforskning.

  2. Ge den ett namn, till exempel Setup data, välj Python. Du kan lämna standardmålmappen, som är en ny explorations mapp.

  3. Klicka på Skapa. Då skapas en notebook-fil i den nya mappen.

  4. Ange följande kod i den första cellen. Du måste ändra definitionen av <my_catalog> och <my_schema> för att matcha standardkatalogen och schemat som du valde i föregående procedur:

    %pip install faker
    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = dbName = db = "<my_schema>"
    
    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`')
    volume_folder =  f"/Volumes/{catalog}/{db}/raw_data"
    
    try:
      dbutils.fs.ls(volume_folder+"/customers")
    except:
      print(f"folder doesn't exist, generating the data under {volume_folder}...")
      from pyspark.sql import functions as F
      from faker import Faker
      from collections import OrderedDict
      import uuid
      fake = Faker()
      import random
    
      fake_firstname = F.udf(fake.first_name)
      fake_lastname = F.udf(fake.last_name)
      fake_email = F.udf(fake.ascii_company_email)
      fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
      fake_address = F.udf(fake.address)
      operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
      fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
      fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)
    
      df = spark.range(0, 100000).repartition(100)
      df = df.withColumn("id", fake_id())
      df = df.withColumn("firstname", fake_firstname())
      df = df.withColumn("lastname", fake_lastname())
      df = df.withColumn("email", fake_email())
      df = df.withColumn("address", fake_address())
      df = df.withColumn("operation", fake_operation())
      df_customers = df.withColumn("operation_date", fake_date())
      df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
    
  5. Om du vill generera den datauppsättning som används i självstudien skriver du Skift + Retur för att köra koden:

  6. Valfritt. Om du vill förhandsgranska data som används i den här självstudien anger du följande kod i nästa cell och kör koden. Uppdatera katalogen och schemat så att de matchar sökvägen från föregående kod.

    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = "<my_schema>"
    
    display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
    

Detta genererar en stor datauppsättning (med falska CDC-data) som du kan använda i resten av självstudien. I nästa steg matar du in data med Auto Loader.

Steg 3: Läs in data stegvis med Auto Loader

Nästa steg är att mata in rådata från den (falska) molnlagringen i ett bronslager.

Detta kan vara svårt av flera orsaker, eftersom du måste:

  • Arbeta i stor skala och eventuellt mata in miljontals små filer.
  • Härled schema och JSON-typ.
  • Hantera felaktiga poster med felaktigt JSON-schema.
  • Ta hand om schemautvecklingen (till exempel en ny kolumn i kundtabellen).

Automatisk inläsning förenklar den här inmatningen, inklusive schemainferens och schemautveckling, samtidigt som den skalas till miljontals inkommande filer. Automatisk inläsning är tillgänglig i Python med hjälp av cloudFiles och i SQL med hjälp av SELECT * FROM STREAM read_files(...) och kan användas med olika format som JSON, CSV, Apache Avro osv.

Om du definierar tabellen som en strömmande tabell kan du bara använda nya inkommande data. Om du inte definierar den som en strömmande tabell genomsöker och matar den in alla tillgängliga data. Mer information finns i Strömmande tabeller .

  1. Om du vill mata in inkommande CDC-data med Auto Loader kopierar och klistrar du in följande kod i den kodfil som är kopplad till din pipeline (kallas my_transformation.py). Du kan använda Python eller SQL baserat på det språk du valde när du skapade pipelinen. Se till att ersätta <catalog> och <schema> med de som du har konfigurerat som standard för pipelinen.

    python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # Replace with the catalog and schema name that
    # you are using:
    path = "/Volumes/<catalog>/<schema>/raw_data/customers"
    
    
    # Create the target bronze table
    dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")
    
    # Create an Append Flow to ingest the raw data into the bronze table
    @dp.append_flow(
      target = "customers_cdc_bronze",
      name = "customers_bronze_ingest_flow"
    )
    def customers_bronze_ingest_flow():
      return (
          spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "json")
              .option("cloudFiles.inferColumnTypes", "true")
              .load(f"{path}")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_bronze_ingest_flow AS
    INSERT INTO customers_cdc_bronze BY NAME
      SELECT *
      FROM STREAM read_files(
        -- replace with the catalog/schema you are using:
        "/Volumes/<catalog>/<schema>/raw_data/customers",
        format => "json",
        inferColumnTypes => "true"
      )
    
  2. Klicka på Ikonen Spela upp.Kör filen eller kör pipelinen för att starta en uppdatering för den anslutna pipelinen. Med bara en källfil i pipelinen är dessa funktionellt likvärdiga.

När uppdateringen är klar uppdateras redigeraren med information om din pipeline.

  • Pipelinediagrammet (DAG) i sidofältet till höger om koden visar en enda tabell, customers_cdc_bronze.
  • En sammanfattning av uppdateringen visas längst upp i resurshanteraren för pipeline.
  • Information om tabellen som genererades visas i den nedre rutan och du kan bläddra bland data från tabellen genom att välja den.

Det här är rådata för bronsskikt som importerats från molnlagring. I nästa steg rensar du data för att skapa en silverlagertabell.

Steg 4: Rensning och förväntningar för att spåra datakvalitet

När bronsskiktet har definierats skapar du silverskiktet genom att lägga till förväntningar för att kontrollera datakvaliteten. Kontrollera följande villkor:

  • ID får aldrig vara null.
  • CDC-åtgärdstypen måste vara giltig.
  • JSON måste läsas korrekt av autoinläsaren.

Rader som inte uppfyller dessa villkor tas bort.

Mer information finns i Hantera datakvalitet med pipelineförväntningar .

  1. I sidofältet för pipelinetillgångar i webbläsaren klickar du på plusikonen.Lägg till och sedan Transformering.

  2. Ange ett namn och välj ett språk (Python eller SQL) för källkodsfilen. Du kan kombinera språk inom en pipeline, så du kan välja ett av dem för detta steg.

  3. Om du vill skapa ett silverlager med en rensad tabell och införa begränsningar kopierar du och klistrar in följande kod i den nya filen (välj Python eller SQL baserat på filens språk).

    python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(
      name = "customers_cdc_clean",
      expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
      )
    
    @dp.append_flow(
      target = "customers_cdc_clean",
      name = "customers_cdc_clean_flow"
    )
    def customers_cdc_clean_flow():
      return (
          spark.readStream.table("customers_cdc_bronze")
              .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
      CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
    )
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_cdc_clean_flow AS
    INSERT INTO customers_cdc_clean BY NAME
    SELECT * FROM STREAM customers_cdc_bronze;
    
  4. Klicka på Ikonen Spela upp.Kör filen eller kör pipelinen för att starta en uppdatering för den anslutna pipelinen.

    Eftersom det nu finns två källfiler gör dessa inte samma sak, men i det här fallet är utdata detsamma.

    • Kör pipelinen kör hela pipelinen, inklusive koden från steg 3. Om dina indata uppdaterades skulle detta hämta eventuella ändringar från källan till bronsskiktet. Det här kör inte koden från datainstallationssteget, eftersom det finns i mappen explorations och inte en del av källan för din pipeline.
    • Kör filen kör endast den aktuella källfilen. I det här fallet, utan att dina indata uppdateras, genererar detta silverdata från den cachelagrade bronstabellen. Det skulle vara användbart att bara köra den här filen för snabbare iteration när du skapar eller redigerar din pipelinekod.

När uppdateringen är klar kan du se att pipelinediagrammet nu visar två tabeller (med silverskiktet beroende på bronsskiktet) och den nedre panelen visar information om båda tabellerna. Överst i webbläsaren för pipelinetillgångar visas nu flera körningar, men bara information om den senaste körningen.

Skapa sedan den slutliga guldlagerversionen av customers tabellen.

Steg 5: Materialisera kundtabellen med ett AUTO CDC-flöde

Hittills har tabellerna bara överfört CDC-data vidare i varje steg. Nu skapar du customers tabellen för att både innehålla den senaste vyn och att även vara en kopia av originaltabellen, inte listan över CDC-operationer som skapade den.

Detta är inte enkelt att implementera manuellt. Du måste överväga saker som deduplicering av data för att säkerställa att den senaste raden bevaras.

Lakeflow Deklarativa Pipelines löser dock dessa utmaningar med åtgärden AUTO CDC.

  1. I webbläsarens sidofält för pipelinetillgångar klickar du på plusikonen.Lägg till och Transformation.

  2. Ange ett namn och välj ett språk (Python eller SQL) för den nya källkodsfilen. Du kan återigen välja något av språken för det här steget, men använd rätt kod nedan.

  3. Om du vill bearbeta CDC-data med hjälp av AUTO CDC deklarativa pipelines i Lakeflow kopierar du och klistrar in följande kod i den nya filen.

    python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(name="customers", comment="Clean, materialized customers")
    
    dp.create_auto_cdc_flow(
      target="customers",  # The customer table being materialized
      source="customers_cdc_clean",  # the incoming CDC
      keys=["id"],  # what we'll be using to match the rows to upsert
      sequence_by=col("operation_date"),  # de-duplicate by operation date, getting the most recent value
      ignore_null_updates=False,
      apply_as_deletes=expr("operation = 'DELETE'"),  # DELETE condition
      except_column_list=["operation", "operation_date", "_rescued_data"],
    )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers;
    
    CREATE FLOW customers_cdc_flow
    AS AUTO CDC INTO customers
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 1;
    
  4. Klicka på Ikonen Spela upp.Kör filen för att starta en uppdatering för den anslutna pipelinen.

När uppdateringen är klar kan du se att pipelinediagrammet visar tre tabeller som går från brons till silver till guld.

Steg 6: Spåra uppdateringshistorik med långsamt föränderlig dimensionstyp 2 (SCD2)

Det är ofta nödvändigt att skapa en tabell som spårar alla ändringar som följer av APPEND, UPDATE och DELETE.

  • Historik: Du vill behålla en historik över alla ändringar i tabellen.
  • Spårningsbarhet: Du vill se vilken åtgärd som inträffade.

SCD2 med de deklarativa pipelines för Lakeflow

Delta stöder ändringsdataflöde (CDF) och table_change kan köra frågor mot tabelländringar i SQL och Python. CDF:s huvudsakliga användningsfall är dock att samla in ändringar i en pipeline, inte att skapa en fullständig vy över tabelländringar från början.

Saker blir särskilt komplexa att implementera om du har out-of-order-händelser. Om du måste sekvensera ändringarna med en tidsstämpel och få en ändring som har gjorts tidigare måste du lägga till en ny post i SCD-tabellen och uppdatera de tidigare posterna.

Lakeflow Deklarativa Pipelines tar bort den här komplexiteten och gör att du kan skapa en separat tabell som innehåller alla ändringar från tidens början. Den här tabellen kan sedan användas i stor skala med specifika partitioner eller ZORDER-kolumner om det behövs. Fält som inte är i ordning hanteras omedelbart baserat på _sequence_by.

Om du vill skapa en SCD2-tabell använder du alternativet STORED AS SCD TYPE 2 i SQL eller stored_as_scd_type="2" Python.

Anmärkning

Du kan också begränsa vilka kolumner funktionen spårar med hjälp av alternativet: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. I webbläsarens sidofält för pipelinetillgångar klickar du på plusikonen.Lägg till och Transformation.

  2. Ange ett namn och välj ett språk (Python eller SQL) för den nya källkodsfilen.

  3. Kopiera och klistra in följande kod i den nya filen.

    python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # create the table
    dp.create_streaming_table(
        name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
    )
    
    # store all changes as SCD2
    dp.create_auto_cdc_flow(
        target="customers_history",
        source="customers_cdc_clean",
        keys=["id"],
        sequence_by=col("operation_date"),
        ignore_null_updates=False,
        apply_as_deletes=expr("operation = 'DELETE'"),
        except_column_list=["operation", "operation_date", "_rescued_data"],
        stored_as_scd_type="2",
    )  # Enable SCD2 and store individual updates
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_history;
    
    CREATE FLOW customers_history_cdc
    AS AUTO CDC INTO
      customers_history
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 2;
    
  4. Klicka på Ikonen Spela upp.Kör filen för att starta en uppdatering för den anslutna pipelinen.

När uppdateringen är klar innehåller pipelinediagrammet den nya customers_history tabellen, som också är beroende av tabellen i silverskiktet, och den nedre panelen visar information om alla 4 tabellerna.

Steg 7: Skapa en materialiserad vy som spårar vem som har ändrat sin information mest

Tabellen customers_history innehåller alla historiska ändringar som en användare har gjort i sin information. Skapa en enkel materialiserad vy i det guldfärgade lagret som håller reda på vem som har ändrat sin information mest. Detta kan användas för analys av bedrägeriidentifiering eller användarrekommendationer i ett verkligt scenario. Dessutom har dubbletter redan tagits bort när du tillämpar ändringar med SCD2, så du kan räkna raderna per användar-ID direkt.

  1. I webbläsarens sidofält för pipelinetillgångar klickar du på plusikonen.Lägg till och Transformation.

  2. Ange ett namn och välj ett språk (Python eller SQL) för den nya källkodsfilen.

  3. Kopiera och klistra in följande kod i den nya källfilen.

    python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    @dp.table(
      name = "customers_history_agg",
      comment = "Aggregated customer history"
    )
    def customers_history_agg():
      return (
        spark.read.table("customers_history")
          .groupBy("id")
          .agg(
              count("address").alias("address_count"),
              count("email").alias("email_count"),
              count("firstname").alias("firstname_count"),
              count("lastname").alias("lastname_count")
          )
      )
    

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
    SELECT
      id,
      count("address") as address_count,
      count("email") AS email_count,
      count("firstname") AS firstname_count,
      count("lastname") AS lastname_count
    FROM customers_history
    GROUP BY id
    
  4. Klicka på Ikonen Spela upp.Kör filen för att starta en uppdatering för den anslutna pipelinen.

När uppdateringen är klar finns det en ny tabell i pipelinediagrammet som är beroende av customers_history-tabellen, och du kan se den i den nedre panelen. Din pipeline är nu klar. Du kan testa det genom att utföra en fullständig kör pipeline. De enda steg som återstår är att schemalägga att pipelinen uppdateras regelbundet.

Steg 8: Skapa ett jobb för att köra ETL-pipelinen

Skapa sedan ett arbetsflöde för att automatisera datainmatnings-, bearbetnings- och analysstegen i din pipeline med hjälp av ett Databricks-jobb.

  1. Välj knappen Schema överst i redigeraren.
  2. Om dialogrutan Scheman visas väljer du Lägg till schema.
  3. Då öppnas dialogrutan Nytt schema , där du kan skapa ett jobb för att köra pipelinen enligt ett schema.
  4. Du kan också ge jobbet ett namn.
  5. Som standard är schemat inställt på att köras en gång per dag. Du kan acceptera den här standardinställningen eller ange ett eget schema. Genom att välja Avancerat kan du ange en viss tid som jobbet ska köras. Om du väljer Fler alternativ kan du skapa meddelanden när jobbet körs.
  6. Välj Skapa för att tillämpa ändringarna och skapa jobbet.

Nu körs jobbet dagligen för att hålla pipelinen uppdaterad. Du kan välja Schema igen för att visa listan över scheman. Du kan hantera scheman för din pipeline från den dialogrutan, inklusive att lägga till, redigera eller ta bort scheman.

Om du klickar på namnet på schemat (eller jobbet) kommer du till jobbets sida i listan Jobb och pipelines . Därifrån kan du visa information om jobbkörningar, inklusive historiken för körningar, eller köra jobbet direkt med knappen Kör nu .

För mer information om jobbkörningar, se Övervakning och observerbarhet för Lakeflow-jobb.

Ytterligare resurser