本教程介绍如何在 Azure Databricks 中使用 Apache Spark Python (PySpark) 数据帧 API、Apache Spark Scala 数据帧 API 和 SparkR SparkDataFrame API 加载和转换数据。
本教程结束时,你可了解数据帧是什么并熟悉以下任务:
Python语言
- 定义变量并将公共数据复制到 Unity Catalog 卷
- 使用 Python 创建数据帧
- 将数据从 CSV 文件加载到数据帧
- 查看数据帧并与之交互
- 保存数据帧
- 在 PySpark 中运行 SQL 查询
另请参阅 Apache Spark PySpark API 参考。
Scala(编程语言)
- 定义变量并将公共数据复制到 Unity Catalog 卷
- 使用 Scala 创建数据帧
- 将数据从 CSV 文件加载到数据帧
- 查看数据帧并与之交互
- 保存数据帧
- 在 Apache Spark 中运行 SQL 查询
另请参阅 Apache Spark Scala API 参考。
R
- 定义变量并将公共数据复制到 Unity Catalog 卷
- 创建 SparkR SparkDataFrame
- 将数据从 CSV 文件加载到数据帧
- 查看数据帧并与之交互
- 保存数据帧
- 在 SparkR 中运行 SQL 查询
另请参阅 Apache SparkR API 参考。
什么是数据帧?
数据帧是一种有标签的二维数据结构,其中的列可能会有不同的类型。 可将数据帧视为电子表格、SQL 表或序列对象的字典。 Apache Spark 数据帧提供了一组丰富的函数(选择列、筛选、联接、聚合),让你可以有效地解决常见的数据分析问题。
Apache Spark 数据帧是基于弹性分布式数据集 (RDD) 的抽象。 Spark 数据帧和 Spark SQL 使用统一的规划和优化引擎,使你能够在 Azure Databricks 上的所有受支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。
要求
若要完成以下教程,必须满足以下要求:
若要使用本教程中的示例,必须已为工作区启用 Unity 目录。
本教程中的示例使用 Unity Catalog 卷来存储示例数据。 若要使用这些示例,请创建一个卷,并使用该卷的目录、架构和卷名称来设置示例使用的卷路径。
必须在 Unity Catalog 中具有以下权限:
-
READ VOLUME和WRITE VOLUME或ALL PRIVILEGES表示本教程使用的卷。 -
USE SCHEMA或ALL PRIVILEGES表示本教程使用的架构。 -
USE CATALOG或ALL PRIVILEGES表示本教程使用的目录。
若要设置这些权限,请联系 Databricks 管理员或参阅 Unity Catalog 特权和安全对象。
-
提示
有关本文的完整笔记本,请参阅数据帧教程笔记本。
步骤 1:定义变量并加载 CSV 文件
此步骤定义要在本教程中使用的变量,然后将包含婴儿姓名数据的 CSV 文件从 health.data.ny.gov 加载到 Unity Catalog 卷。
单击
图标打开新笔记本。 若要了解如何浏览 Azure Databricks 笔记本,请参阅 自定义笔记本外观。将以下代码复制并粘贴到新的空笔记本单元格中: 将
<catalog-name>、<schema-name>和<volume-name>替换为 Unity Catalog 卷中的目录、架构和卷名称。 请将<table_name>替换为你选择的表名称。 本教程稍后会将婴儿姓名数据加载到此表中。Python语言
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete pathScala(编程语言)
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete pathR
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path按
Shift+Enter以运行单元格并创建新的空白单元格。将以下代码复制并粘贴到新的空笔记本单元格中: 此代码使用
rows.csv命令将 文件从 health.data.ny.gov 复制到您的 Unity Catalog 存储卷。Python语言
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")Scala(编程语言)
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))按
Shift+Enter以运行单元格,然后移动到下一个单元格。
步骤 2:创建数据帧
此步骤使用测试数据创建名为 df1 的数据帧,然后显示其内容。
将以下代码复制并粘贴到新的空笔记本单元格中: 此代码使用测试数据创建数据帧,然后显示数据帧的内容和架构。
Python语言
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.Scala(编程语言)
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.按
Shift+Enter以运行单元格,然后移动到下一个单元格。
步骤 3:将数据从 CSV 文件加载到数据帧
此步骤从之前加载到 Unity Catalog 卷的 CSV 文件中创建名为 df_csv 的数据帧。 请参阅 spark.read.csv。
将以下代码复制并粘贴到新的空笔记本单元格中: 此代码将婴儿姓名数据从 CSV 文件加载到数据帧
df_csv,然后显示该数据帧的内容。Python语言
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)Scala(编程语言)
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)按
Shift+Enter以运行单元格,然后移动到下一个单元格。
可以从许多受支持的文件格式加载数据。
步骤 4:查看数据帧并与之交互
使用以下方法查看婴儿姓名数据帧并与之交互。
打印数据帧架构
了解如何显示 Apache Spark 数据帧的架构。 Apache Spark 使用术语“架构”来指代数据帧中列的名称和数据类型。
注意
Azure Databricks 也使用术语“架构”来描述注册到目录的表集合。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用
.printSchema()方法显示数据帧的架构,以便查看两个数据帧的架构 - 准备合并这两个数据帧。Python语言
df_csv.printSchema() df1.printSchema()Scala(编程语言)
dfCsv.printSchema() df1.printSchema()R
printSchema(df_csv) printSchema(df1)按
Shift+Enter以运行单元格,然后移动到下一个单元格。
重命名数据帧中的列
了解如何重命名数据帧中的列。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于重命名
df1_csv数据帧中的列,以匹配df1数据帧中的相应列。 此代码使用 Apache SparkwithColumnRenamed()方法。Python语言
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchemaScala(编程语言)
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)按
Shift+Enter以运行单元格,然后移动到下一个单元格。
合并数据帧
了解如何创建一个新的数据帧,用于将某个数据帧的行添加到另一个数据帧。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
union()方法将第一个数据帧df的内容与数据帧df_csv合并,后者包含从 CSV 文件加载的婴儿姓名数据。Python语言
df = df1.union(df_csv) display(df)Scala(编程语言)
val df = df1.union(dfCsvRenamed) display(df)R
display(df <- union(df1, df_csv))按
Shift+Enter以运行单元格,然后移动到下一个单元格。
筛选数据帧中的行
使用 Apache Spark .filter() 或 .where() 方法筛选行,发现数据集中最受欢迎的婴儿姓名。 使用筛选来选择要在数据帧中返回或修改的行子集。 性能或语法没有差别,如以下示例中所示。
使用 .filter() 方法
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
.filter()方法显示数据帧中计数超过 50 的行。Python语言
display(df.filter(df["Count"] > 50))Scala(编程语言)
display(df.filter(df("Count") > 50))R
display(filteredDF <- filter(df, df$Count > 50))按
Shift+Enter以运行单元格,然后移动到下一个单元格。
使用 .where() 方法
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
.where()方法显示数据帧中计数超过 50 的行。Python语言
display(df.where(df["Count"] > 50))Scala(编程语言)
display(df.where(df("Count") > 50))R
display(filtered_df <- where(df, df$Count > 50))按
Shift+Enter以运行单元格,然后移动到下一个单元格。
从数据帧中选择列并按频率排序
使用 select() 方法指定要从数据帧返回的列,了解婴儿名字的使用频率。 使用 Apache Spark orderby 和 desc 函数对结果进行排序。
Apache Spark 的 pyspark.sql 模块为 SQL 函数提供支持。 在这些函数中,本教程中使用的函数包括 Apache Spark orderBy()、desc() 和 expr() 函数。 可以根据需要将它们导入会话来使用这些函数。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于导入
desc()函数,然后使用 Apache Sparkselect()方法以及 Apache SparkorderBy()和desc()函数按降序显示最常用的姓名及其计数。Python语言
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))Scala(编程语言)
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))按
Shift+Enter以运行单元格,然后移动到下一个单元格。
创建子集数据帧
了解如何从现有数据帧创建子集数据帧。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
filter方法创建新的数据帧,以按年份、计数和性别限制数据。 它使用 Apache Sparkselect()方法来限制列。 它还使用 Apache SparkorderBy()和desc()函数按计数对新的数据帧进行排序。Python语言
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)Scala(编程语言)
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)按
Shift+Enter以运行单元格,然后移动到下一个单元格。
步骤 5:保存数据帧
了解如何保存数据帧。 可以将数据帧保存到表,或者将数据帧写入一个或多个文件。
将数据帧保存到表
默认情况下,Azure Databricks 对所有表使用 Delta Lake 格式。 若要保存数据帧,必须拥有目录和架构上的 CREATE 表权限。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用在本教程开始时定义的变量将数据帧的内容保存到表中。
Python语言
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")Scala(编程语言)
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")按
Shift+Enter以运行单元格,然后移动到下一个单元格。
大多数 Apache Spark 应用程序都以分布式方式处理大型数据集。 Apache Spark 会写出文件目录,而不是单个文件。 Delta Lake 会拆分 Parquet 文件夹和文件。 许多数据系统都可以读取这些目录的文件。 Azure Databricks 建议为大多数应用程序使用表而不是文件路径。
将数据帧保存到 JSON 文件
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于将数据帧保存到 JSON 文件的目录中。
Python语言
df.write.format("json").mode("overwrite").save("/tmp/json_data")Scala(编程语言)
df.write.format("json").mode("overwrite").save("/tmp/json_data")R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")按
Shift+Enter以运行单元格,然后移动到下一个单元格。
从 JSON 文件读取数据帧
了解如何使用 Apache Spark spark.read.format() 方法将 JSON 数据从目录读取到数据帧中。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码显示在上一示例中保存的 JSON 文件。
Python语言
display(spark.read.format("json").json("/tmp/json_data"))Scala(编程语言)
display(spark.read.format("json").json("/tmp/json_data"))R
display(read.json("/tmp/json_data"))按
Shift+Enter以运行单元格,然后移动到下一个单元格。
其他任务:在 PySpark、Scala 和 R 中运行 SQL 查询
Apache Spark 数据帧提供以下选项,用于将 SQL 与 PySpark、Scala 和 R 合并在一起。可以在为本教程创建的同一笔记本中运行以下代码。
将列指定为 SQL 查询
了解如何使用 Apache Spark selectExpr() 方法。 这是 select() 方法的变体,它用于接受 SQL 表达式并返回更新的数据帧。 此方法允许使用 SQL 表达式,例如 upper。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
selectExpr()方法和 SQLupper表达式将字符串列转换为大写(并重命名列)。Python语言
display(df.selectExpr("Count", "upper(County) as big_name"))Scala(编程语言)
display(df.selectExpr("Count", "upper(County) as big_name"))R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))按
Shift+Enter以运行单元格,然后移动到下一个单元格。
使用 expr() 对列使用 SQL 语法
了解如何导入并使用 Apache Spark expr() 函数,以在指定列的任何位置使用 SQL 语法。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于导入
expr()函数,然后使用 Apache Sparkexpr()函数和 SQLlower表达式将字符串列转换为小写(并重命名列)。Python语言
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))Scala(编程语言)
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality按
Shift+Enter以运行单元格,然后移动到下一个单元格。
使用 spark.sql() 函数运行任意 SQL 查询
了解如何使用 Apache Spark spark.sql() 函数运行任意 SQL 查询。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
spark.sql()函数通过 SQL 语法来查询 SQL 表。Python语言
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))Scala(编程语言)
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))按
Shift+Enter以运行单元格,然后移动到下一个单元格。
数据帧教程笔记本
以下笔记本包含本教程中的示例查询。