Delen via


Zelfstudie: Een ETL-pijplijn bouwen met Lakeflow-declaratieve pijplijnen

In deze zelfstudie wordt uitgelegd hoe u een ETL-pijplijn (extraheren, transformeren en laden) maakt en implementeert voor gegevensindeling met behulp van Lakeflow Declarative Pipelines en Auto Loader. Een ETL-pijplijn implementeert de stappen voor het lezen van gegevens uit bronsystemen, het transformeren van die gegevens op basis van vereisten, zoals gegevenskwaliteitscontroles en recordontdubbeling en het schrijven van de gegevens naar een doelsysteem, zoals een datawarehouse of een data lake.

In deze zelfstudie gebruikt u Lakeflow Declarative Pipelines and Auto Loader voor het volgende:

  • Onbewerkte brongegevens opnemen in een doeltabel.
  • Transformeer de onbewerkte brongegevens en schrijf de getransformeerde gegevens naar twee gerealiseerde doelweergaven.
  • Voer een query uit op de getransformeerde gegevens.
  • Automatiseer de ETL-pijplijn met een Databricks-taak.

Zie Lakeflow Declarative Pipelines en Wat is Auto Loader? voor meer informatie over Lakeflow Declarative Pipelines en Auto Loader.

Vereisten

Als u deze zelfstudie wilt voltooien, moet u aan de volgende vereisten voldoen:

Over de gegevensset

De gegevensset die in dit voorbeeld wordt gebruikt, is een subset van de Million Song Dataset, een verzameling functies en metagegevens voor hedendaagse muzieknummers. Deze gegevensset is beschikbaar in de voorbeeldgegevenssets die zijn opgenomen in uw Azure Databricks-werkruimte.

stap 1: een pijplijn maken

Maak eerst een ETL-pijplijn in Lakeflow-declaratieve pijplijnen. Declaratieve pijplijnen van Lakeflow maken pijplijnen door afhankelijkheden op te lossen die zijn gedefinieerd in bestanden ( broncode genoemd) met behulp van de syntaxis van Lakeflow Declarative Pipelines. Elk broncodebestand kan slechts één taal bevatten, maar u kunt meerdere taalspecifieke bestanden toevoegen in de pijplijn. Zie Lakeflow Declarative Pipelines voor meer informatie

In deze zelfstudie wordt gebruikgemaakt van serverloze compute en Unity Catalog. Gebruik de standaardinstellingen voor alle configuratieopties die niet zijn opgegeven. Als serverloze berekeningen niet zijn ingeschakeld of ondersteund in uw werkruimte, kunt u de zelfstudie voltooien zoals geschreven met behulp van de standaard-rekeninstellingen.

Voer de volgende stappen uit om een nieuwe ETL-pijplijn te maken in Lakeflow Declarative Pipelines:

  1. Klik in uw werkruimte op pluspictogram.Nieuw in de zijbalk en selecteer vervolgens ETL-pijplijn.
  2. Geef uw pijplijn een unieke naam.
  3. Selecteer net onder de naam de standaardcatalogus en het standaardschema voor de gegevens die u genereert. U kunt andere bestemmingen opgeven in uw transformaties, maar in deze zelfstudie worden deze standaardinstellingen gebruikt. U moet machtigingen hebben voor de catalogus en het schema dat u maakt. Raadpleeg Vereisten.
  4. Voor deze zelfstudie selecteert u Beginnen met een leeg bestand.
  5. Geef in mappad een locatie op voor uw bronbestanden of accepteer de standaardlocatie (uw gebruikersmap).
  6. Kies Python of SQL als taal voor uw eerste bronbestand (een pijplijn kan talen combineren en vergelijken, maar elk bestand moet zich in één taal bevinden).
  7. Klik op Selecteren.

De pijplijneditor wordt weergegeven voor de nieuwe pijplijn. Er wordt een leeg bronbestand voor uw taal gemaakt, klaar voor uw eerste transformatie.

Stap 2: Uw pijplijnlogica ontwikkelen

In deze stap gebruikt u de Lakeflow Pipelines Editor om interactief broncode voor Lakeflow-declaratieve pijplijnen te ontwikkelen en te valideren.

De code maakt gebruik van automatisch laadprogramma voor incrementele gegevensopname. Automatisch laden detecteert en verwerkt automatisch nieuwe bestanden wanneer ze binnenkomen in de opslag van cloudobjecten. Voor meer informatie, zie Wat is Auto Loader?

Er wordt automatisch een leeg broncodebestand gemaakt en geconfigureerd voor de pijplijn. Het bestand wordt gemaakt in de map transformaties van uw pijplijn. Standaard maken alle bestanden van *.py en *.sql in de map transformaties deel uit van de bron voor uw pijplijn.

  1. Kopieer en plak de volgende code in het bronbestand. Zorg ervoor dat u de taal gebruikt die u hebt geselecteerd voor het bestand in stap 1.

    Python

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    Deze broncode bevat code voor drie query's. U kunt deze query's ook in afzonderlijke bestanden plaatsen om de bestanden te ordenen en de gewenste code te coden.

  2. Klik op pictogram Afspelen.Voer een bestand of pijplijn uit om een update voor de verbonden pijplijn te starten. Met slechts één bronbestand in uw pijplijn zijn deze functioneel gelijkwaardig.

Wanneer de update is voltooid, wordt de editor bijgewerkt met informatie over uw pijplijn.

  • De pijplijngrafiek (DAG), in de zijbalk rechts van uw code, toont drie tabellen, songs_rawen songs_prepared.top_artists_by_year
  • Een samenvatting van de update wordt boven aan de browser voor pijplijnassets weergegeven.
  • Details van de gegenereerde tabellen worden weergegeven in het onderste deelvenster en u kunt door gegevens uit de tabellen bladeren door er een te selecteren.

Dit omvat de onbewerkte en opgeschoonde gegevens, evenals een eenvoudige analyse om de topkunstenaars per jaar te vinden. In de volgende stap maakt u ad-hocquery's voor verdere analyse in een afzonderlijk bestand in uw pijplijn.

Stap 3: De gegevenssets verkennen die zijn gemaakt door uw pijplijn

In deze stap voert u ad-hocqueries uit op de gegevens die zijn verwerkt in de ETL-pijplijn om de songdata in de Databricks SQL Editor te analyseren. Deze query's maken gebruik van de voorbereide records die in de vorige stap zijn gemaakt.

Voer eerst een query uit die de artiesten vindt die elk jaar de meeste nummers hebben uitgebracht sinds 1990.

  1. Klik in de zijbalk van de pijplijnassets-browser op pluspictogram.Voeg vervolgens Verkenning toe.

  2. Voer een naam in en selecteer SQL voor het verkenningsbestand. Er wordt een SQL-notebook gemaakt in een nieuwe explorations map. Bestanden in de explorations map worden niet standaard uitgevoerd als onderdeel van een pijplijnupdate. Het SQL-notebook bevat cellen die u samen of afzonderlijk kunt uitvoeren.

  3. Als u een tabel wilt maken met artiesten die de meeste nummers elk jaar na 1990 vrijgeven, voert u de volgende code in het nieuwe SQL-bestand in (als er voorbeeldcode in het bestand staat, vervangt u deze). Omdat dit notebook geen deel uitmaakt van de pijplijn, wordt de standaardcatalogus en het standaardschema niet gebruikt. Vervang de <catalog>.<schema> door de catalogus en het schema dat u hebt gebruikt als standaardinstellingen voor de pijplijn:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. Klik op het pictogram Afspelen of druk om Shift + Enter deze query uit te voeren.

Voer nu een andere query uit die nummers vindt met een 4/4 maat en een dansbaar tempo.

  1. Voeg de volgende code toe aan de volgende cel in hetzelfde bestand. Vervang opnieuw de <catalog>.<schema> catalogus en het schema die u hebt gebruikt als standaardinstellingen voor de pijplijn:

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. Klik op het pictogram Afspelen of druk om Shift + Enter deze query uit te voeren.

Stap 4: Een taak maken om de pijplijn uit te voeren

Maak vervolgens een werkstroom om stappen voor gegevensopname, verwerking en analyse te automatiseren met behulp van een Databricks-taak die volgens een schema wordt uitgevoerd.

  1. Kies boven aan de editor de knop Planning .
  2. Als het dialoogvenster Planningen wordt weergegeven, kiest u Planning toevoegen.
  3. Hiermee opent u het dialoogvenster Nieuwe planning , waarin u een taak kunt maken om uw pijplijn volgens een planning uit te voeren.
  4. Geef desgewenst de taak een naam.
  5. Standaard is de planning ingesteld op één keer per dag. U kunt deze fout accepteren of uw eigen planning instellen. Als u Geavanceerd kiest, kunt u een specifieke tijd instellen waarop de taak wordt uitgevoerd. Als u meer opties selecteert, kunt u meldingen maken wanneer de taak wordt uitgevoerd.
  6. Selecteer Maken om de wijzigingen toe te passen en de taak te maken.

De taak wordt nu dagelijks uitgevoerd om uw pijplijn up-to-date te houden. U kunt Planning opnieuw kiezen om de lijst met planningen weer te geven. U kunt planningen voor uw pijplijn beheren vanuit dat dialoogvenster, inclusief het toevoegen, bewerken of verwijderen van planningen.

Als u op de naam van de planning (of taak) klikt, gaat u naar de pagina van de taak in de lijst taken en pijplijnen . Hier kunt u details bekijken over taakuitvoeringen, inclusief de geschiedenis van uitvoeringen, of de taak direct uitvoeren met de knop Nu uitvoeren .

Zie Bewaking en waarneembaarheid voor Lakeflow-taken voor meer informatie over taakuitvoeringen.

Meer informatie