Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
I den här handledningen beskrivs hur du skapar och distribuerar en ETL-pipeline (extrahera, transformera och läsa in) med Lakeflow Deklarativa Pipelines och Auto Loader för dataorkestrering. En ETL-pipeline implementerar stegen för att läsa data från källsystem, transformera dessa data baserat på krav, till exempel datakvalitetskontroller och registrera deduplicering, och skriva data till ett målsystem, till exempel ett informationslager eller en datasjö.
I den här självstudien ska du använda Lakeflow deklarativa pipelines och Auto Loader till att:
- Mata in rådata i en måltabell.
- Transformera rådata och skriv transformerade data till två materialiserade målvyer.
- Kör en sökfråga på de transformerade data.
- Automatisera ETL-pipelinen med ett Databricks-jobb.
Mer information om Lakeflow Declarative Pipelines och Auto Loader finns i Lakeflow Declarative Pipelines och What is Auto Loader?
Krav
För att slutföra den här självstudien måste du uppfylla följande krav:
- Loggas in på en Azure Databricks-arbetsyta.
- Låt Unity Catalog vara aktiverat för din arbetsyta.
- Ha serverlös beräkning aktiverad för ditt konto. Serverlösa deklarativa pipelines för Lakeflow är inte tillgängliga i alla arbetsyteregioner. Se Funktioner med begränsad regional tillgänglighet för tillgängliga regioner.
- Ha behörighet att skapa en beräkningsresurs eller åtkomst till en beräkningsresurs.
- Ha behörighet att skapa ett nytt schema i en katalog. De behörigheter som krävs är
ALL PRIVILEGESellerUSE CATALOGochCREATE SCHEMA. - Ha behörighet att skapa en ny volym i ett befintligt schema. De behörigheter som krävs är
ALL PRIVILEGESellerUSE SCHEMAochCREATE VOLUME.
Om datauppsättningen
Datauppsättningen som används i det här exemplet är en delmängd av Datauppsättningen Million Song, en samling funktioner och metadata för samtida musikspår. Den här datamängden är tillgänglig i exempeldatauppsättningarna som ingår i din Azure Databricks-arbetsyta.
steg 1: Skapa en pipeline
Skapa en ETL-pipeline först i Lakeflow Deklarativa Pipelines. Lakeflow Declarative Pipelines skapar pipelines genom att matcha beroenden som definierats i filer (kallas källkod) med Lakeflow Declarative Pipelines syntax. Varje källkodsfil kan bara innehålla ett språk, men du kan lägga till flera språkspecifika filer i pipelinen. Mer information finns i Deklarativa pipelines för Lakeflow
I den här guiden används serverlös datorkraft och Unity Catalog. Använd standardinställningarna för alla konfigurationsalternativ som inte har angetts. Om serverlös beräkning inte är aktiverad eller stöds på din arbetsyta kan du slutföra självstudien enligt standardinställningarna för beräkning.
För att skapa en ny ETL-pipeline i Lakeflow Deklarativa Pipelines, följ dessa steg:
- På din arbetsyta klickar du på
Ny i sidofältet och välj sedan ETL-pipeline.
- Ge din pipeline ett unikt namn.
- Precis under namnet väljer du standardkatalogen och schemat för de data som du genererar. Du kan ange andra mål i dina omvandlingar, men i den här handledningen används dessa standardvärden. Du måste ha behörighet till katalogen och schemat som du skapar. Se kraven.
- I den här självstudien väljer du Starta med en tom fil.
- I Mappsökväg anger du en plats för källfilerna eller godkänner standardinställningen (användarmappen).
- Välj Python eller SQL som språk för din första källfil (en pipeline kan blanda och matcha språk, men varje fil måste vara på ett enda språk).
- Klicka på Välj.
Pipelineredigeraren visas för den nya pipelinen. En tom källfil för ditt språk skapas, redo för din första transformering.
Steg 2: Utveckla pipeline-logik
I det här steget använder du Lakeflow Pipelines-redigeraren för att utveckla och validera källkoden för Deklarativa pipelines för Lakeflow interaktivt.
Koden använder Auto Loader för inkrementell datainmatning. "Auto Loader identifierar och bearbetar automatiskt nya filer när de anländer i molnlagringsobjekt." Mer information finns i Vad är automatisk inläsning?
En tom källkodsfil skapas och konfigureras automatiskt för pipelinen. Filen skapas i transformeringsmappen för din pipeline. Som standard är alla *.py- och *.sql-filer i transformeringsmappen en del av källan för din pipeline.
Kopiera och klistra in följande kod i källfilen. Se till att använda det språk som du valde för filen i steg 1.
python
# Import modules from pyspark import pipelines as dp from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dp.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path)) # Define a materialized view that validates data and renames a column @dp.materialized_view( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dp.expect("valid_artist_name", "artist_name IS NOT NULL") @dp.expect("valid_title", "song_title IS NOT NULL") @dp.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dp.materialized_view( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/part*', format => "csv", header => "false", delimiter => "\t", schema => """ artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING """, schemaEvolutionMode => "none"); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC;Den här källan innehåller kod för tre frågor. Du kan också placera frågorna i separata filer för att ordna filerna och koda som du vill.
Klicka på
Kör filen eller kör pipelinen för att starta en uppdatering för den anslutna pipelinen. Med bara en källfil i pipelinen är dessa funktionellt likvärdiga.
När uppdateringen är klar uppdateras redigeraren med information om din pipeline.
- Pipelinediagrammet (DAG) i sidofältet till höger om koden visar tre tabeller,
songs_raw,songs_preparedochtop_artists_by_year. - En sammanfattning av uppdateringen visas längst upp i resurshanteraren för pipeline.
- Information om de tabeller som genererades visas i den nedre rutan och du kan bläddra bland data från tabellerna genom att välja en.
Detta inkluderar rådata och rensade data, samt en enkel analys för att hitta de främsta konstnärerna efter år. I nästa steg skapar du ad hoc-frågor för ytterligare analys i en separat fil i pipelinen.
Steg 3: Utforska de datauppsättningar som skapats av din pipeline
I det här steget utför du ad hoc-frågor på de data som bearbetas i ETL-pipelinen för att analysera låtdata i Databricks SQL-redigeraren. Dessa frågor använder de förberedda poster som skapades i föregående steg.
Kör först en fråga som hittar de artister som har släppt flest låtar varje år sedan 1990.
I sidofältet för pipelinetillgångar i webbläsaren klickar du på
Lägg till sedan Utforskning.
Ange ett namn och välj SQL för utforskningsfilen. En SQL-notebook-fil skapas i en ny
explorationsmapp. Filer iexplorationsmappen körs inte som en del av en pipelineuppdatering som standard. SQL-notebook-filen har celler som du kan köra tillsammans eller separat.Om du vill skapa en tabell med artister som släpper flest låtar varje år efter 1990 anger du följande kod i den nya SQL-filen (om det finns exempelkod i filen ersätter du den). Eftersom den här notebook-filen inte ingår i pipelinen använder den inte standardkatalogen och schemat.
<catalog>.<schema>Ersätt med den katalog och det schema som du använde som standard för pipelinen:-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC;Klicka på
Eller tryck på
Shift + Enterför att köra den här frågan.
Kör nu en annan fråga som hittar låtar med ett 4/4 beat och dansbart tempo.
Lägg till följande kod i nästa cell i samma fil.
<catalog>.<schema>Ersätt återigen med den katalog och det schema som du använde som standard för pipelinen:-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;Klicka på
Eller tryck på
Shift + Enterför att köra den här frågan.
Steg 4: Skapa ett jobb för att köra pipelinen
Skapa sedan ett arbetsflöde för att automatisera datainmatnings-, bearbetnings- och analyssteg med hjälp av ett Databricks-jobb som körs enligt ett schema.
- Välj knappen Schema överst i redigeraren.
- Om dialogrutan Scheman visas väljer du Lägg till schema.
- Då öppnas dialogrutan Nytt schema , där du kan skapa ett jobb för att köra pipelinen enligt ett schema.
- Du kan också ge jobbet ett namn.
- Som standard är schemat inställt på att köras en gång per dag. Du kan acceptera den här defauten eller ange ett eget schema. Genom att välja Avancerat kan du ange en viss tid som jobbet ska köras. Om du väljer Fler alternativ kan du skapa meddelanden när jobbet körs.
- Välj Skapa för att tillämpa ändringarna och skapa jobbet.
Nu körs jobbet dagligen för att hålla pipelinen uppdaterad. Du kan välja Schema igen för att visa listan över scheman. Du kan hantera scheman för din pipeline från den dialogrutan, inklusive att lägga till, redigera eller ta bort scheman.
Om du klickar på namnet på schemat (eller jobbet) kommer du till jobbets sida i listan Jobb och pipelines . Därifrån kan du visa information om jobbkörningar, inklusive historiken för körningar, eller köra jobbet direkt med knappen Kör nu .
För mer information om jobbkörningar, se Övervakning och observerbarhet för Lakeflow-jobb.
Läs mer
- För att lära dig mer om pipelines för databearbetning med Lakeflow Deklarativa Pipelines, se Lakeflow Deklarativa Pipelines.
- Mer information om Databricks Notebooks finns i Databricks Notebooks.
- Mer information om Lakeflow-jobb finns i Vad är jobb?
- Mer information om Delta Lake finns i Vad är Delta Lake i Azure Databricks?