使用 Spark 处理数据文件
设置笔记本并将其附加到群集后,可以使用 Spark 读取和处理数据文件。 Spark 支持各种格式(如 CSV、JSON、Parquet、ORC、Avro 和 Delta),Databricks 提供内置连接器来访问存储在工作区、Azure Data Lake 或 Blob 存储或其他外部系统中的文件。
工作流通常遵循三个步骤:
使用具有正确格式和路径的 spark.read 将文件读入 Spark 数据帧。 读取原始文本格式(如 CSV 或 JSON)时,Spark 可以推断架构(列名和数据类型),但有时速度缓慢或不可靠。 在生产环境中,更好的做法是显式定义架构,以便数据一致高效地加载。
使用 SQL 或数据帧操作来探索和转换数据帧(例如,筛选行、选择列、聚合值)。
以所选格式将结果写回到存储。
在 Spark 中处理文件旨在跨小型和大型数据集保持一致性。 用于测试小型 CSV 文件的相同代码也适用于更大的数据集,因为 Spark 会在群集中分发工作。 这样,可以轻松地从快速浏览扩展到更复杂的数据处理。
将数据加载到数据帧中
我们来看看一个假设示例,了解如何使用数据帧来处理数据。 假设你在 Databricks 文件系统 (DBFS) 存储的“数据”文件夹中名为 products.csv 的以逗号分隔的文本文件中有以下数据:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
在 Spark 笔记本中,可以使用以下 PySpark 代码将数据加载到数据帧中并显示前 10 行:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
开头的 %pyspark 行称为 magic,它告诉 Spark 此单元格中使用的语言是 PySpark。 下面是产品数据示例的等效 Scala 代码:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
magic %spark 用于指定 Scala。
提示
你还可以为笔记本界面中的每个单元格选择要使用的语言。
上述两个示例都会生成如下输出:
| ProductID | ProductName | 类别 | ListPrice |
|---|---|---|---|
| 771 | Mountain-100 Silver, 38 | 山地自行车 | 3399.9900 |
| 772 | Mountain-100 Silver, 42 | 山地自行车 | 3399.9900 |
| 773 | Mountain-100 Silver, 44 | 山地自行车 | 3399.9900 |
| ... | ... | ... | ... |
指定数据帧架构
在前面的示例中,CSV 文件的第一行包含列名,Spark 能够根据每一列所包含的数据推断出其数据类型。 还可以指定数据的显式架构,这在数据文件中不包含列名时很有用,例如此 CSV 示例:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
以下 PySpark 示例演示了如何指定要从名为 product-data.csv 的文件加载的数据帧的架构,格式如下:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
结果将再次类似于以下内容:
| ProductID | ProductName | 类别 | ListPrice |
|---|---|---|---|
| 771 | Mountain-100 Silver, 38 | 山地自行车 | 3399.9900 |
| 772 | Mountain-100 Silver, 42 | 山地自行车 | 3399.9900 |
| 773 | Mountain-100 Silver, 44 | 山地自行车 | 3399.9900 |
| ... | ... | ... | ... |
对数据帧进行筛选和分组
可以使用 Dataframe 类的方法来对所包含的数据进行筛选、排序、分组和执行其他操作。 例如,下面的代码示例使用select该方法从包含上一示例中产品数据的 df 数据帧检索 ProductName 和 ListPrice 列:
pricelist_df = df.select("ProductID", "ListPrice")
此示例代码的结果可能如下所示:
| ProductID | ListPrice |
|---|---|
| 771 | 3399.9900 |
| 772 | 3399.9900 |
| 773 | 3399.9900 |
| ... | ... |
与大多数数据作方法一起使用时, select 返回一个新的数据帧对象。
提示
从数据帧中选择部分列是一种常见的操作,也可以通过使用以下较短的语法来实现:
pricelist_df = df["ProductID", "ListPrice"]
可以将多个方法“链接”在一起来执行一系列操作,从而生成转换后的数据帧。 例如,此示例代码通过链接select和where方法,创建一个包含ProductName和ListPrice列的新数据帧,其中仅包括类别为山地自行车或公路自行车的产品。
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
此示例代码的结果可能如下所示:
| ProductName | ListPrice |
|---|---|
| Mountain-100 Silver, 38 | 3399.9900 |
| Road-750 黑色,52 | 539.9900 |
| ... | ... |
若要对数据进行分组和聚合,可以使用 groupby 方法和聚合函数。 例如,以下 PySpark 代码计算每个类别的产品数量:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
此示例代码的结果可能如下所示:
| 类别 | 计数 |
|---|---|
| 耳机 | 3 |
| Wheels | 14 |
| 山地自行车 | 32 |
| ... | ... |
注意
Spark 数据帧是 声明性的,是不可变的。 每个转换(例如 select, filter或 groupBy)都会创建一个新的数据帧,表示所需的内容,而不是运行方式。 这使得代码可重用、可优化且无副作用。 但是,这些转换不会真正执行,直到您触发一个操作(例如,display、collect、write),此时 Spark 将运行全面优化的计划。
在 Spark 中使用 SQL 表达式
Dataframe API 是名为 Spark SQL 的 Spark 库的一部分,它使数据分析师能够使用 SQL 表达式来查询和操作数据。
在 Spark 目录中创建数据库对象
Spark 目录是关系数据对象(例如视图和表)的元存储。 Spark 运行时可以使用目录将用任何 Spark 支持的语言编写的代码与 SQL 表达式无缝集成,对于一些数据分析师或开发人员来说,SQL 表达式可能更合理。
使数据帧中的数据可用于在 Spark 目录中查询的最简单方法之一是创建一个临时视图,如以下代码示例所示:
df.createOrReplaceTempView("products")
视图是临时的,这意味着它会在当前会话结束时被自动删除。 还可以创建持久保存在目录中的表,以定义可以使用 Spark SQL 查询的数据库。
注意
在本模块中,我们不会深入探讨 Spark 目录表,但值得花时间强调几个关键点:
- 可以使用
spark.catalog.createTable方法创建空表。 表是元数据结构,该结构会将其基础数据存储在与目录关联的存储位置。 删除表也会删除其基础数据。 - 可以使用数据帧的
saveAsTable方法将其保存为表。 - 可以使用 方法创建外部表
spark.catalog.createExternalTable。 外部表定义目录中的元数据,但从外部存储位置获取其基础数据;通常是数据湖中的文件夹。 删除外部表不会删除基础数据。
使用 Spark SQL API 查询数据
可以使用采用任何语言编写的代码中的 Spark SQL API 来查询目录中的数据。 例如,以下 PySpark 代码使用 SQL 查询将 products 视图中的数据作为数据帧返回。
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
代码示例的结果类似于下表:
| ProductName | ListPrice |
|---|---|
| Mountain-100 Silver, 38 | 3399.9900 |
| Road-750 黑色,52 | 539.9900 |
| ... | ... |
使用 SQL 代码
前面的示例演示了如何使用 Spark SQL API 在 Spark 代码中嵌入 SQL 表达式。 在笔记本中,还可以使用 %sql magic 来运行查询目录中的对象的 SQL 代码,如下所示:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
SQL 代码示例返回了一个结果集,它在笔记本中自动显示为表,如下所示:
| 类别 | ProductCount |
|---|---|
| 背带短裤 | 3 |
| 自行车车架 | 1 |
| 自行车存放架 | 1 |
| ... | ... |