使用 Spark 处理数据文件

已完成

设置笔记本并将其附加到群集后,可以使用 Spark 读取和处理数据文件。 Spark 支持各种格式(如 CSV、JSON、Parquet、ORC、Avro 和 Delta),Databricks 提供内置连接器来访问存储在工作区、Azure Data Lake 或 Blob 存储或其他外部系统中的文件。

工作流通常遵循三个步骤:

  1. 使用具有正确格式和路径的 spark.read 将文件读入 Spark 数据帧。 读取原始文本格式(如 CSV 或 JSON)时,Spark 可以推断架构(列名和数据类型),但有时速度缓慢或不可靠。 在生产环境中,更好的做法是显式定义架构,以便数据一致高效地加载。

  2. 使用 SQL 或数据帧操作来探索和转换数据帧(例如,筛选行、选择列、聚合值)。

  3. 以所选格式将结果回到存储。

在 Spark 中处理文件旨在跨小型和大型数据集保持一致性。 用于测试小型 CSV 文件的相同代码也适用于更大的数据集,因为 Spark 会在群集中分发工作。 这样,可以轻松地从快速浏览扩展到更复杂的数据处理。

将数据加载到数据帧中

我们来看看一个假设示例,了解如何使用数据帧来处理数据。 假设你在 Databricks 文件系统 (DBFS) 存储的“数据”文件夹中名为 products.csv 的以逗号分隔的文本文件中有以下数据

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

在 Spark 笔记本中,可以使用以下 PySpark 代码将数据加载到数据帧中并显示前 10 行:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

开头的 %pyspark 行称为 magic,它告诉 Spark 此单元格中使用的语言是 PySpark。 下面是产品数据示例的等效 Scala 代码:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

magic %spark 用于指定 Scala。

提示

你还可以为笔记本界面中的每个单元格选择要使用的语言。

上述两个示例都会生成如下输出:

ProductID ProductName 类别 ListPrice
771 Mountain-100 Silver, 38 山地自行车 3399.9900
772 Mountain-100 Silver, 42 山地自行车 3399.9900
773 Mountain-100 Silver, 44 山地自行车 3399.9900
... ... ... ...

指定数据帧架构

在前面的示例中,CSV 文件的第一行包含列名,Spark 能够根据每一列所包含的数据推断出其数据类型。 还可以指定数据的显式架构,这在数据文件中不包含列名时很有用,例如此 CSV 示例:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

以下 PySpark 示例演示了如何指定要从名为 product-data.csv 的文件加载的数据帧的架构,格式如下

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

结果将再次类似于以下内容:

ProductID ProductName 类别 ListPrice
771 Mountain-100 Silver, 38 山地自行车 3399.9900
772 Mountain-100 Silver, 42 山地自行车 3399.9900
773 Mountain-100 Silver, 44 山地自行车 3399.9900
... ... ... ...

对数据帧进行筛选和分组

可以使用 Dataframe 类的方法来对所包含的数据进行筛选、排序、分组和执行其他操作。 例如,下面的代码示例使用select该方法从包含上一示例中产品数据的 df 数据帧检索 ProductNameListPrice 列:

pricelist_df = df.select("ProductID", "ListPrice")

此示例代码的结果可能如下所示:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

与大多数数据作方法一起使用时, select 返回一个新的数据帧对象。

提示

从数据帧中选择部分列是一种常见的操作,也可以通过使用以下较短的语法来实现:

pricelist_df = df["ProductID", "ListPrice"]

可以将多个方法“链接”在一起来执行一系列操作,从而生成转换后的数据帧。 例如,此示例代码通过链接selectwhere方法,创建一个包含ProductNameListPrice列的新数据帧,其中仅包括类别为山地自行车公路自行车的产品。

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

此示例代码的结果可能如下所示:

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 黑色,52 539.9900
... ...

若要对数据进行分组和聚合,可以使用 groupby 方法和聚合函数。 例如,以下 PySpark 代码计算每个类别的产品数量:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

此示例代码的结果可能如下所示:

类别 计数
耳机 3
Wheels 14
山地自行车 32
... ...

注意

Spark 数据帧是 声明性的,是不可变的。 每个转换(例如 selectfiltergroupBy)都会创建一个新的数据帧,表示所需的内容,而不是运行方式。 这使得代码可重用、可优化且无副作用。 但是,这些转换不会真正执行,直到您触发一个操作(例如,displaycollectwrite),此时 Spark 将运行全面优化的计划。

在 Spark 中使用 SQL 表达式

Dataframe API 是名为 Spark SQL 的 Spark 库的一部分,它使数据分析师能够使用 SQL 表达式来查询和操作数据。

在 Spark 目录中创建数据库对象

Spark 目录是关系数据对象(例如视图和表)的元存储。 Spark 运行时可以使用目录将用任何 Spark 支持的语言编写的代码与 SQL 表达式无缝集成,对于一些数据分析师或开发人员来说,SQL 表达式可能更合理。

使数据帧中的数据可用于在 Spark 目录中查询的最简单方法之一是创建一个临时视图,如以下代码示例所示:

df.createOrReplaceTempView("products")

视图是临时的,这意味着它会在当前会话结束时被自动删除。 还可以创建持久保存在目录中的表,以定义可以使用 Spark SQL 查询的数据库

注意

在本模块中,我们不会深入探讨 Spark 目录表,但值得花时间强调几个关键点:

  • 可以使用 spark.catalog.createTable 方法创建空表。 表是元数据结构,该结构会将其基础数据存储在与目录关联的存储位置。 删除表也会删除其基础数据。
  • 可以使用数据帧的 saveAsTable 方法将其保存为表。
  • 可以使用 方法创建外部表spark.catalog.createExternalTable。 外部表定义目录中的元数据,但从外部存储位置获取其基础数据;通常是数据湖中的文件夹。 删除外部表不会删除基础数据。

使用 Spark SQL API 查询数据

可以使用采用任何语言编写的代码中的 Spark SQL API 来查询目录中的数据。 例如,以下 PySpark 代码使用 SQL 查询将 products 视图中的数据作为数据帧返回

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

代码示例的结果类似于下表:

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 黑色,52 539.9900
... ...

使用 SQL 代码

前面的示例演示了如何使用 Spark SQL API 在 Spark 代码中嵌入 SQL 表达式。 在笔记本中,还可以使用 %sql magic 来运行查询目录中的对象的 SQL 代码,如下所示:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

SQL 代码示例返回了一个结果集,它在笔记本中自动显示为表,如下所示:

类别 ProductCount
背带短裤 3
自行车车架 1
自行车存放架 1
... ...