教程:使用 Lakeflow 声明性管道生成 ETL 管道

本教程介绍如何使用 Lakeflow 声明性管道和自动加载程序为数据业务流程创建和部署 ETL(提取、转换和加载)管道。 ETL 管道实现从源系统读取数据、根据要求转换数据(如数据质量检查和记录重复数据)以及将数据写入目标系统(如数据仓库或数据湖)的步骤。

在本教程中,你将使用 Lakeflow 声明性管道和自动加载程序来:

  • 将原始源数据引入目标表。
  • 转换原始源数据,并将转换后的数据写入两个目标具体化视图。
  • 查询转换后的数据。
  • 使用 Databricks 作业自动执行 ETL 管道。

有关 Lakeflow 声明性管道和自动加载程序的详细信息,请参阅 Lakeflow 声明性管道 以及 什么是自动加载程序?

要求

若要完成此教程,必须满足以下要求:

关于数据集

此示例中使用的数据集是 Million Song Dataset 的子集,该数据集是当代音乐曲目的特征和元数据集合。 此数据集在 Azure Databricks 工作区中包含的示例数据集中可用。

步骤 1:创建管道

首先,在 Lakeflow 声明性管道中创建 ETL 管道。 Lakeflow 声明性管道通过使用 Lakeflow 声明性管道语法解析文件中定义的依赖项(称为 源代码)来创建管道。 每个源代码文件只能包含一种语言,但你可以在管道中添加多种语言特定的文件。 若要了解详细信息,请参阅 Lakeflow 声明式管道

本教程使用无服务器计算和 Unity 目录。 对于未指定的所有配置选项,请使用默认设置。 如果工作区中未启用或支持无服务器计算,则可以使用默认计算设置完成本教程。

若要在 Lakeflow 声明性管道中创建新的 ETL 管道,请执行以下步骤:

  1. 在工作区中,单击“加号”图标。在边栏中新建,然后选择“ETL 管道”。
  2. 为管道提供唯一的名称。
  3. 在名称下方,为生成的数据选择默认目录和架构。 可以在转换中指定其他目标,但本教程使用这些默认值。 必须有权访问所创建的目录和架构。 请参阅 要求
  4. 对于本教程,请选择 “从空文件开始”。
  5. “文件夹路径”中,指定源文件的位置,或接受默认值(用户文件夹)。
  6. 选择 PythonSQL 作为第一个源文件的语言(管道可以混合和匹配语言,但每个文件都必须使用一种语言)。
  7. 单击“选择”。

此时会显示新管道的管道编辑器。 创建语言的空源文件,可供第一次转换使用。

步骤 2:开发管道逻辑

在此步骤中,将使用 Lakeflow 管道编辑器 以交互方式开发和验证 Lakeflow 声明性管道的源代码。

该代码使用自动加载程序进行增量数据引入。 自动加载程序会在新文件到达云对象存储时自动对其进行检测和处理。 若要了解详细信息,请参阅 什么是自动加载程序?

将为管道自动创建和配置空白源代码文件。 该文件是在管道的转换文件夹中创建的。 默认情况下,转换文件夹中的所有 *.py 和 *.sql 文件都是管道的源的一部分。

  1. 将以下代码复制并粘贴到源文件中。 请务必使用在步骤 1 中为文件选择的语言。

    Python

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    此源代码包含三个查询的代码。 还可以将这些查询放在单独的文件中,以按偏好的方式组织文件和编码。

  2. 单击“ 播放”图标。运行文件运行管道 以启动连接的管道的更新。 在管道中只有一个源文件时,这些在功能上是等效的。

更新完成后,编辑器会更新为包含有关你管道的信息。

  • 代码右侧边栏中的管道图(DAG)显示三个表、songs_rawsongs_preparedtop_artists_by_year
  • 更新摘要显示在管道资产浏览器顶部。
  • 生成的表的详细信息显示在底部窗格中,可以通过选择其中一个来浏览表中的数据。

这包括原始和清理的数据,以及一些简单的分析,以逐年查找顶级艺术家。 在下一步中,你将创建临时查询,以便在管道中的一个单独文件中进行进一步分析。

步骤 3:浏览管道创建的数据集

在此步骤中,对 ETL 管道中处理的数据执行即席查询,以分析 Databricks SQL 编辑器中的歌曲数据。 这些查询使用在上一步中创建的准备好的记录。

首先,运行一个查询,查找自1990年以来每年发布最多的歌曲的艺术家。

  1. 在管道资产浏览器边栏中,单击“加号”图标,然后添加探索

  2. 输入 名称 并选择用于浏览文件的 SQL 。 一个 SQL 笔记本被创建在新的文件夹 explorations 中。 explorations默认情况下,文件夹中的文件不会作为管道更新的一部分运行。 SQL 笔记本包含可以一起或单独运行的单元格。

  3. 若要创建在 1990 年后每年发布最多歌曲的艺术家表,请在新的 SQL 文件中输入以下代码(如果文件中有示例代码,请替换它)。 由于此笔记本不是管道的一部分,因此它不使用默认目录和架构。 将 <catalog>.<schema> 替换为您用作管道默认值的目录和架构:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. 单击 “播放”图标。 或按 Shift + Enter 以运行此查询。

现在,运行另一个查询,找到具备4/4节拍和适合跳舞的节奏的歌曲。

  1. 将以下代码添加到同一文件中的下一个单元格。 同样,请将它 <catalog>.<schema> 替换为用作管道默认值的目录和架构:

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. 单击 “播放”图标。 或按 Shift + Enter 以运行此查询。

步骤 4:创建作业以运行管道

接下来,创建一个工作流,以使用按计划运行的 Databricks 作业自动执行数据引入、处理和分析步骤。

  1. 在编辑器顶部,选择“ 计划 ”按钮。
  2. 如果出现“ 计划 ”对话框,请选择“ 添加计划”。
  3. 这将打开 “新建计划 ”对话框,可在其中创建作业以按计划运行管道。
  4. (可选)为作业命名。
  5. 默认情况下,计划设置为每天运行一次。 可以接受此默认设置,也可以设置自己的时间表。 选择 “高级 ”可让你选择设置作业将运行的特定时间。 选择 “更多”选项 可在作业运行时创建通知。
  6. 选择 “创建 ”以应用更改并创建作业。

现在,该作业将每日运行,以确保管道始终保持最新状态。 可以再次选择 “计划” 以查看计划列表。 可以从该对话框管理管道的计划,包括添加、编辑或删除计划。

单击计划(或作业)的名称会将你转到 “作业和管道 ”列表中的作业页面。 在此处可以查看有关作业运行的详细信息,包括运行历史记录,或使用“ 立即运行” 按钮立即运行作业。

有关作业运行的详细信息,请参阅 Lakeflow 作业的监视和可观测性

了解更多