Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
GÄLLER FÖR:  NoSQL
I den här självstudien använder du Azure Cosmos DB Spark-anslutningsappen för att läsa eller skriva data från ett Azure Cosmos DB för NoSQL-konto. I den här självstudien används Azure Databricks och en Jupyter Notebook för att illustrera hur du integrerar med API:et för NoSQL från Spark. Den här självstudien fokuserar på Python och Scala, även om du kan använda valfritt språk eller gränssnitt som stöds av Spark.
I den här guiden lär du dig:
- Anslut till ett API för NoSQL-konto med hjälp av Spark och en Jupyter Notebook.
- Skapa databas- och containerresurser.
- Mata in data till containern.
- Fråga efter data i containern.
- Utför vanliga åtgärder på objekt i containern.
Prerequisites
- Ett befintligt Azure Cosmos DB för NoSQL-konto.
- Om du har en befintlig Azure-prenumeration skapar du ett nytt konto.
 
- En befintlig Azure Databricks-arbetsyta.
Anslut med Hjälp av Spark och Jupyter
Använd din befintliga Azure Databricks-arbetsyta för att skapa ett beräkningskluster som är redo att använda Apache Spark 3.4.x för att ansluta till ditt Azure Cosmos DB för NoSQL-konto.
- Öppna din Azure Databricks-arbetsyta. 
- Skapa ett nytt kluster i arbetsytans gränssnitt. Konfigurera klustret med de här inställningarna, minst: - Version - Value - Körningsversion - 13.3 LTS (Scala 2.12, Spark 3.4.1) 
- Använd arbetsytans gränssnitt för att söka efter Maven-paket från Maven Central med ett grupp-ID för - com.azure.cosmos.spark. Installera paketet specifikt för Spark 3.4 med ett artefakt-ID som är prefix för- azure-cosmos-spark_3-4klustret.
- Skapa slutligen en ny notebook-fil. - Tip - Som standard är notebook-filen kopplad till det nyligen skapade klustret. 
- I notebook-filen anger du konfigurationsinställningar för onlinetransaktionsbearbetning (OLTP) för NoSQL-kontoslutpunkten, databasnamnet och containernamnet. - # Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }- # Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Skapa en databas och en container
Använd katalog-API:et för att hantera kontoresurser som databaser och containrar. Sedan kan du använda OLTP för att hantera data i containerresurserna.
- Konfigurera katalog-API:et för att hantera API för NoSQL-resurser med hjälp av Spark. - # Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])- // Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
- Skapa en ny databas med namnet - cosmicworksmed hjälp- CREATE DATABASE IF NOT EXISTSav .- # Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")- // Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
- Skapa en ny container med namnet - productsmed hjälp- CREATE TABLE IF NOT EXISTSav . Säkerställ att du anger partitionsnyckelsökvägen till- /categoryoch aktiverar automatisk skalning av genomströmningen med ett maximalt genomflöde på- 1000begärandenheter (RU:er) per sekund.- # Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))- // Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
- Skapa en annan container med namnet - employeesmed hjälp av en hierarkisk partitionsnyckelkonfiguration. Använd- /organization,- /departmentoch- /teamsom uppsättning partitionsnyckelsökvägar. Följ den specifika ordningen. Ange också genomströmningen till ett manuellt antal- 400ru:er.- # Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))- // Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
- Kör notebook-cellerna för att verifiera att databasen och containrarna har skapats i ditt API för NoSQL-kontot. 
Importera data
Skapa en exempeldatauppsättning. Använd sedan OLTP för att mata in dessa data till API:et för NoSQL-containern.
- Skapa en exempeldatauppsättning. - # Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )- // Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
- Använd - spark.createDataFrameoch den tidigare sparade OLTP-konfigurationen för att lägga till exempeldata i målcontainern.- # Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()- // Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Fråga efter data
Läs in OLTP-data i en dataram för att utföra vanliga frågor om data. Du kan använda olika syntaxer för att filtrera eller fråga efter data.
- Använd - spark.readför att läsa in OLTP-data i ett dataramobjekt. Använd samma konfiguration som du använde tidigare i den här självstudien.- spark.cosmos.read.inferSchema.enabledAnge också så att- trueSpark-anslutningsappen kan härleda schemat genom att sampling av befintliga objekt.- # Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()- // Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
- Rendera schemat för data som läses in i en dataframe med hjälp av - printSchema.- # Render schema df.printSchema()- // Render schema df.printSchema()
- Rendera datarader där - quantitykolumnen är mindre än- 20. Använd funktionerna- whereoch- showför att utföra den här frågan.- # Render filtered data df.where("quantity < 20") \ .show()- // Render filtered data df.where("quantity < 20") .show()
- Rendera den första dataraden - clearancedär kolumnen är- true.- filterAnvänd funktionen för att utföra den här frågan.- # Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)- // Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
- Rendera fem rader med data utan filter eller trunkering. - showAnvänd funktionen för att anpassa utseendet och antalet rader som återges.- # Render five rows of unfiltered and untruncated data df.show(5, False)- // Render five rows of unfiltered and untruncated data df.show(5, false)
- Fråga dina data med hjälp av den här råa NoSQL-frågesträngen: - SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800- # Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()- // Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Utföra vanliga åtgärder
När du arbetar med API för NoSQL-data i Spark kan du utföra partiella uppdateringar eller arbeta med data som rå JSON.
- Så här utför du en partiell uppdatering av ett objekt: - Kopiera den befintliga - configkonfigurationsvariabeln och ändra egenskaperna i den nya kopian. Mer specifikt konfigurerar du skrivstrategin till- ItemPatch. Inaktivera sedan massstöd. Ställ in kolumner och kopplade operationer. Slutligen anger du standardåtgärdstypen till- Set.- # Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"- // Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
- Skapa variabler för objektpartitionsnyckeln och den unika identifierare som du tänker rikta in dig på som en del av den här korrigeringsåtgärden. - # Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"- // Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
- Skapa en uppsättning korrigeringsobjekt för att ange målobjektet och ange fält som ska ändras. - # Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]- // Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
- Skapa en dataram med hjälp av uppsättningen med korrigeringsobjekt. Använd - writeför att utföra korrigeringsåtgärden.- # Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()- // Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
- Kör en fråga för att granska resultatet av korrigeringsåtgärden. Objektet bör nu namnges - Yamba New Surfboardutan några andra ändringar.- # Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)- // Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
 
- Så här arbetar du med råa JSON-data: - Kopiera den befintliga - configkonfigurationsvariabeln och ändra egenskaperna i den nya kopian. Mer specifikt ändrar du målcontainern till- employees. Konfigurera- contactssedan kolumnen/fältet för att använda råa JSON-data.- # Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"- // Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
- Skapa en uppsättning anställda som ska matas in i containern. - # Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )- // Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
- Skapa en dataram och använd - writeför att mata in de anställdas data.- # Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()- // Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
- Rendera data från dataramen med hjälp av - show. Observera att- contactskolumnen är rå JSON i utdata.- # Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()- // Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
 
Relaterat innehåll
- Apache Spark
- Katalog-API för Azure Cosmos DB
- Referens för konfigurationsparameter
- Azure Cosmos DB Spark-anslutningsexempel
- Migrera från Spark 2.4 till Spark 3.*
- Inaktuella versioner: - Azure Cosmos DB Spark Connector för Spark 3.1 och 3.2 är inaktuell, eftersom det inte längre finns några Spark 3.1- eller 3.2-körningar som stöds i Azure Databricks, Azure Synapse eller Azure HDInsight.
- Migreringsguide för uppdatering från Spark 3.1
- Migreringsguide för uppdatering från Spark 3.2
 
- Versionskompatibilitet:
- Utgåvan:
- Ladda ned länkar: