Dela via


Handledning: Skapa en ETL-pipeline med Lakeflow-deklarativa pipelines

I den här handledningen beskrivs hur du skapar och distribuerar en ETL-pipeline (extrahera, transformera och läsa in) med Lakeflow Deklarativa Pipelines och Auto Loader för dataorkestrering. En ETL-pipeline implementerar stegen för att läsa data från källsystem, transformera dessa data baserat på krav, till exempel datakvalitetskontroller och registrera deduplicering, och skriva data till ett målsystem, till exempel ett informationslager eller en datasjö.

I den här självstudien ska du använda Lakeflow deklarativa pipelines och Auto Loader till att:

  • Mata in rådata i en måltabell.
  • Transformera rådata och skriv transformerade data till två materialiserade målvyer.
  • Kör en sökfråga på de transformerade data.
  • Automatisera ETL-pipelinen med ett Databricks-jobb.

Mer information om Lakeflow Declarative Pipelines och Auto Loader finns i Lakeflow Declarative Pipelines och What is Auto Loader?

Krav

För att slutföra den här självstudien måste du uppfylla följande krav:

Om datauppsättningen

Datauppsättningen som används i det här exemplet är en delmängd av Datauppsättningen Million Song, en samling funktioner och metadata för samtida musikspår. Den här datamängden är tillgänglig i exempeldatauppsättningarna som ingår i din Azure Databricks-arbetsyta.

steg 1: Skapa en pipeline

Skapa en ETL-pipeline först i Lakeflow Deklarativa Pipelines. Lakeflow Declarative Pipelines skapar pipelines genom att matcha beroenden som definierats i filer (kallas källkod) med Lakeflow Declarative Pipelines syntax. Varje källkodsfil kan bara innehålla ett språk, men du kan lägga till flera språkspecifika filer i pipelinen. Mer information finns i Deklarativa pipelines för Lakeflow

I den här guiden används serverlös datorkraft och Unity Catalog. Använd standardinställningarna för alla konfigurationsalternativ som inte har angetts. Om serverlös beräkning inte är aktiverad eller stöds på din arbetsyta kan du slutföra självstudien enligt standardinställningarna för beräkning.

För att skapa en ny ETL-pipeline i Lakeflow Deklarativa Pipelines, följ dessa steg:

  1. På din arbetsyta klickar du på plusikonen.Ny i sidofältet och välj sedan ETL-pipeline.
  2. Ge din pipeline ett unikt namn.
  3. Precis under namnet väljer du standardkatalogen och schemat för de data som du genererar. Du kan ange andra mål i dina omvandlingar, men i den här handledningen används dessa standardvärden. Du måste ha behörighet till katalogen och schemat som du skapar. Se kraven.
  4. I den här självstudien väljer du Starta med en tom fil.
  5. I Mappsökväg anger du en plats för källfilerna eller godkänner standardinställningen (användarmappen).
  6. Välj Python eller SQL som språk för din första källfil (en pipeline kan blanda och matcha språk, men varje fil måste vara på ett enda språk).
  7. Klicka på Välj.

Pipelineredigeraren visas för den nya pipelinen. En tom källfil för ditt språk skapas, redo för din första transformering.

Steg 2: Utveckla pipeline-logik

I det här steget använder du Lakeflow Pipelines-redigeraren för att utveckla och validera källkoden för Deklarativa pipelines för Lakeflow interaktivt.

Koden använder Auto Loader för inkrementell datainmatning. "Auto Loader identifierar och bearbetar automatiskt nya filer när de anländer i molnlagringsobjekt." Mer information finns i Vad är automatisk inläsning?

En tom källkodsfil skapas och konfigureras automatiskt för pipelinen. Filen skapas i transformeringsmappen för din pipeline. Som standard är alla *.py- och *.sql-filer i transformeringsmappen en del av källan för din pipeline.

  1. Kopiera och klistra in följande kod i källfilen. Se till att använda det språk som du valde för filen i steg 1.

    python

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    Den här källan innehåller kod för tre frågor. Du kan också placera frågorna i separata filer för att ordna filerna och koda som du vill.

  2. Klicka på Ikonen Spela upp.Kör filen eller kör pipelinen för att starta en uppdatering för den anslutna pipelinen. Med bara en källfil i pipelinen är dessa funktionellt likvärdiga.

När uppdateringen är klar uppdateras redigeraren med information om din pipeline.

  • Pipelinediagrammet (DAG) i sidofältet till höger om koden visar tre tabeller, songs_raw, songs_preparedoch top_artists_by_year.
  • En sammanfattning av uppdateringen visas längst upp i resurshanteraren för pipeline.
  • Information om de tabeller som genererades visas i den nedre rutan och du kan bläddra bland data från tabellerna genom att välja en.

Detta inkluderar rådata och rensade data, samt en enkel analys för att hitta de främsta konstnärerna efter år. I nästa steg skapar du ad hoc-frågor för ytterligare analys i en separat fil i pipelinen.

Steg 3: Utforska de datauppsättningar som skapats av din pipeline

I det här steget utför du ad hoc-frågor på de data som bearbetas i ETL-pipelinen för att analysera låtdata i Databricks SQL-redigeraren. Dessa frågor använder de förberedda poster som skapades i föregående steg.

Kör först en fråga som hittar de artister som har släppt flest låtar varje år sedan 1990.

  1. I sidofältet för pipelinetillgångar i webbläsaren klickar du på plusikonen.Lägg till sedan Utforskning.

  2. Ange ett namn och välj SQL för utforskningsfilen. En SQL-notebook-fil skapas i en ny explorations mapp. Filer i explorations mappen körs inte som en del av en pipelineuppdatering som standard. SQL-notebook-filen har celler som du kan köra tillsammans eller separat.

  3. Om du vill skapa en tabell med artister som släpper flest låtar varje år efter 1990 anger du följande kod i den nya SQL-filen (om det finns exempelkod i filen ersätter du den). Eftersom den här notebook-filen inte ingår i pipelinen använder den inte standardkatalogen och schemat. <catalog>.<schema> Ersätt med den katalog och det schema som du använde som standard för pipelinen:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. Klicka på Ikonen Spela upp. Eller tryck på Shift + Enter för att köra den här frågan.

Kör nu en annan fråga som hittar låtar med ett 4/4 beat och dansbart tempo.

  1. Lägg till följande kod i nästa cell i samma fil. <catalog>.<schema> Ersätt återigen med den katalog och det schema som du använde som standard för pipelinen:

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. Klicka på Ikonen Spela upp. Eller tryck på Shift + Enter för att köra den här frågan.

Steg 4: Skapa ett jobb för att köra pipelinen

Skapa sedan ett arbetsflöde för att automatisera datainmatnings-, bearbetnings- och analyssteg med hjälp av ett Databricks-jobb som körs enligt ett schema.

  1. Välj knappen Schema överst i redigeraren.
  2. Om dialogrutan Scheman visas väljer du Lägg till schema.
  3. Då öppnas dialogrutan Nytt schema , där du kan skapa ett jobb för att köra pipelinen enligt ett schema.
  4. Du kan också ge jobbet ett namn.
  5. Som standard är schemat inställt på att köras en gång per dag. Du kan acceptera den här defauten eller ange ett eget schema. Genom att välja Avancerat kan du ange en viss tid som jobbet ska köras. Om du väljer Fler alternativ kan du skapa meddelanden när jobbet körs.
  6. Välj Skapa för att tillämpa ändringarna och skapa jobbet.

Nu körs jobbet dagligen för att hålla pipelinen uppdaterad. Du kan välja Schema igen för att visa listan över scheman. Du kan hantera scheman för din pipeline från den dialogrutan, inklusive att lägga till, redigera eller ta bort scheman.

Om du klickar på namnet på schemat (eller jobbet) kommer du till jobbets sida i listan Jobb och pipelines . Därifrån kan du visa information om jobbkörningar, inklusive historiken för körningar, eller köra jobbet direkt med knappen Kör nu .

För mer information om jobbkörningar, se Övervakning och observerbarhet för Lakeflow-jobb.

Läs mer