你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用范围: NoSQL
本教程使用 Azure Cosmos DB Spark 连接器从 Azure Cosmos DB for NoSQL 帐户读取或写入数据。 本教程使用 Azure Databricks 和 Jupyter 笔记本来说明如何从 Spark 与 API for NoSQL 集成。 本教程重点介绍 Python 和 Scala,不过你可以使用 Spark 支持的任何语言或界面。
本教程介绍如何执行下列操作:
- 使用 Spark 和 Jupyter Notebook 连接到 API for NoSQL 帐户。
- 创建数据库和容器资源。
- 将数据引入容器。
- 查询容器中的数据。
- 对容器中的项执行常见操作。
Prerequisites
- 一个现有的适用于 NoSQL 的 Azure Cosmos DB 帐户。
- 如果你有现有的 Azure 订阅,请创建一个新帐户。
 
- 现有的 Azure Databricks 工作区。
使用 Spark 和 Jupyter 进行连接
使用现有的 Azure Databricks 工作区创建一个可以使用 Apache Spark 3.4.x 连接到 Azure Cosmos DB for NoSQL 帐户的计算群集。
- 打开 Azure Databricks 工作区。 
- 在工作区界面中,创建一个新群集。 至少使用以下设置配置群集: - Version - Value - 运行时版本 - 13.3 LTS(Scala 2.12、Spark 3.4.1) 
- 使用工作区界面从 Maven Central 搜索组 ID 为 的 Maven 包 - com.azure.cosmos.spark。 将特定于 Spark 3.4 且项目 ID 前缀为 的包安装到群集- azure-cosmos-spark_3-4。
- 最后,创建一个新的笔记本。 - Tip - 默认情况下,笔记本会附加到最近创建的群集。 
- 在笔记本中,设置 NoSQL 帐户终结点、数据库名称和容器名称的联机事务处理 (OLTP) 配置设置。 - # Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }- # Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
创建数据库和容器
使用目录 API 管理帐户资源,例如数据库和容器。 然后,可以使用 OLTP 管理容器资源中的数据。
- 使用 Spark 配置 Catalog API 来管理 NoSQL 资源的 API。 - # Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])- // Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
- 使用 - cosmicworks创建名为- CREATE DATABASE IF NOT EXISTS的新数据库。- # Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")- // Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
- 使用 - products创建名为- CREATE TABLE IF NOT EXISTS的新容器。 确保将分区键路径设置为- /category并启用自动缩放吞吐量,最大吞吐量为每秒- 1000个请求单位 (RU)。- # Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))- // Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
- 使用分层分区键配置创建另一个名为 - employees的容器。 使用- /organization、- /department和- /team作为分区键路径集。 按照该特定顺序操作。 此外,将吞吐量设置为手动控制的数量- 400RU/s。- # Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))- // Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
- 运行笔记本单元格,以验证数据库和容器是否是在 API for NoSQL 帐户中创建的。 
引入数据
创建示例数据集。 然后使用 OLTP 将数据引入 API for NoSQL 容器。
- 创建示例数据集。 - # Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )- // Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
- 使用 - spark.createDataFrame和以前保存的 OLTP 配置将示例数据添加到目标容器。- # Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()- // Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
查询数据
将 OLTP 数据加载到数据帧中,以对数据执行常见查询。 可以使用各种语法筛选或查询数据。
- 使用 - spark.read将 OLTP 数据加载到数据帧对象中。 使用本教程前面使用的相同配置。 此外,将- spark.cosmos.read.inferSchema.enabled设置为- true,以允许 Spark 连接器通过采样现有项来推断架构。- # Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()- // Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
- 使用 - printSchema呈现在数据帧中加载的数据的架构。- # Render schema df.printSchema()- // Render schema df.printSchema()
- 呈现 - quantity列小于- 20的数据行。 使用- where和- show函数执行此查询。- # Render filtered data df.where("quantity < 20") \ .show()- // Render filtered data df.where("quantity < 20") .show()
- 显示第一个数据行,其中 - clearance列是- true。 使用- filter函数执行此查询。- # Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)- // Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
- 显示五行数据,不经过筛选或截断。 使用 - show函数自定义所呈现行的外观和行数。- # Render five rows of unfiltered and untruncated data df.show(5, False)- // Render five rows of unfiltered and untruncated data df.show(5, false)
- 使用以下原始 NoSQL 查询字符串查询数据: - SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800- # Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()- // Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
执行常见操作
在 Spark 中使用 API for NoSQL 数据时,可以执行部分更新或将数据作为原始 JSON 处理。
- 若要执行项的部分更新,请执行以下步骤: - 复制现有 - config配置变量并修改新副本中的属性。 具体而言,将写入策略配置为- ItemPatch。 然后禁用批量支持。 设置列和映射操作。 最后,将默认操作类型设置为- Set。- # Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"- // Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
- 为要作为此修补操作目标的项分区键和唯一标识符创建变量。 - # Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"- // Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
- 创建一组修补程序对象以指定目标项并指定应修改的字段。 - # Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]- // Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
- 使用修补程序对象集创建数据帧。 使用 - write执行修补操作。- # Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()- // Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
- 运行查询以查看修补操作的结果。 现在,该项应命名为 - Yamba New Surfboard,无需进行其他更改。- # Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)- // Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
 
- 若要使用原始 JSON 数据,请执行以下操作: - 复制现有 - config配置变量并修改新副本中的属性。 具体而言,将目标容器更改为- employees。 然后将- contacts列/字段配置为使用原始 JSON 数据。- # Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"- // Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
- 创建一组员工以引入容器。 - # Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )- // Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
- 创建数据帧并使用 - write引入员工数据。- # Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()- // Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
- 使用 - show呈现数据帧中的数据。 请注意,- contacts列是输出中的原始 JSON。- # Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()- // Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
 
相关内容
- Apache Spark
- Azure Cosmos DB 目录 API
- 配置参数引用
- Azure Cosmos DB Spark 连接器示例
- 从 Spark 2.4 迁移到 Spark 3.*
- 弃用的版本: - 适用于 Spark 3.1 和 3.2 的 Azure Cosmos DB Spark 连接器已弃用,因为 Azure Databricks、Azure Synapse 或 Azure HDInsight 中不再提供受支持的 Spark 3.1 或 3.2 运行时。
- 从 Spark 3.1 更新的迁移指南
- Spark 3.2 更新迁移指南
 
- 版本兼容性:
- 发行说明:
- 下载链接: