Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
VAN TOEPASSING OP:  NoSQL
In deze zelfstudie gebruikt u de Azure Cosmos DB Spark-connector voor het lezen of schrijven van gegevens uit een Azure Cosmos DB for NoSQL-account. In deze zelfstudie wordt gebruikgemaakt van Azure Databricks en een Jupyter-notebook om te laten zien hoe u integreert met de API voor NoSQL vanuit Spark. Deze zelfstudie is gericht op Python en Scala, hoewel u elke taal of interface kunt gebruiken die wordt ondersteund door Spark.
In deze zelfstudie leert u het volgende:
- Maak verbinding met een API voor NoSQL-account met behulp van Spark en een Jupyter-notebook.
 - Database- en containerbronnen maken.
 - Gegevens opnemen in de container.
 - Query's uitvoeren op gegevens in de container.
 - Voer algemene bewerkingen uit op items in de container.
 
Prerequisites
- Een bestaand Azure Cosmos DB for NoSQL-account.
- Als u een bestaand Azure-abonnement hebt, maakt u een nieuw account.
 
 - Een bestaande Azure Databricks-werkruimte.
 
Verbinding maken met behulp van Spark en Jupyter
Gebruik uw bestaande Azure Databricks-werkruimte om een rekencluster te maken dat gereed is voor het gebruik van Apache Spark 3.4.x om verbinding te maken met uw Azure Cosmos DB for NoSQL-account.
Open uw Azure Databricks-werkruimte.
Maak een nieuw cluster in de werkruimte-interface. Configureer het cluster met deze instellingen minimaal:
Version Value Runtime-versie 13.3 LTS (Scala 2.12, Spark 3.4.1) Gebruik de werkruimte-interface om te zoeken naar Maven-pakketten van Maven Central met een groeps-id van
com.azure.cosmos.spark. Installeer het pakket specifiek voor Spark 3.4 met een artefact-id die is voorafgegaan doorazure-cosmos-spark_3-4het cluster.Maak ten slotte een nieuw notitieblok.
Tip
Standaard wordt het notebook gekoppeld aan het onlangs gemaakte cluster.
Stel in het notebook configuratie-instellingen voor online transaction processing (OLTP) in voor het NoSQL-accounteindpunt, de databasenaam en de containernaam.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Een database en een container maken
Gebruik de Catalogus-API om accountbronnen, zoals databases en containers, te beheren. Vervolgens kunt u OLTP gebruiken om gegevens binnen de containerbronnen te beheren.
Configureer de Catalogus-API voor het beheren van API voor NoSQL-resources met behulp van Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))Maak een nieuwe database met de naam
cosmicworksmet behulp vanCREATE DATABASE IF NOT EXISTS.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")Maak een nieuwe container met de naam
productsmet behulp vanCREATE TABLE IF NOT EXISTS. Zorg ervoor dat u het pad/categorynaar de partitiesleutel instelt en doorvoer voor automatische schaalaanpassing inschakelt met een maximale doorvoer van1000aanvraageenheden (RU's) per seconde.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))Maak een andere container met de naam
employeesmet behulp van een hiƫrarchische partitiesleutelconfiguratie. Gebruik/organization,/departmenten/teamals de verzameling partitiesleutelpaden. Volg die specifieke volgorde. Stel de doorvoer ook in op een handmatig aantal400RU's.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))Voer de notebookcellen uit om te controleren of uw database en containers zijn gemaakt in uw API voor NoSQL-account.
Gegevens opnemen
Maak een voorbeeldgegevensset. Gebruik vervolgens OLTP om die gegevens op te nemen in de API voor NoSQL-container.
Maak een voorbeeldgegevensset.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )Gebruik
spark.createDataFrameen de eerder opgeslagen OLTP-configuratie om voorbeeldgegevens toe te voegen aan de doelcontainer.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Gegevens opvragen
LAAD OLTP-gegevens in een gegevensframe om algemene query's uit te voeren op de gegevens. U kunt verschillende syntaxis gebruiken om gegevens te filteren of op te vragen.
Gebruik
spark.readdit om de OLTP-gegevens in een gegevensframeobject te laden. Gebruik dezelfde configuratie die u eerder in deze zelfstudie hebt gebruikt. Stel ookspark.cosmos.read.inferSchema.enabledin optrueom de Spark-connector het schema te laten afleiden door bestaande items te bemonsteren.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()Geef het schema weer van de gegevens die in het gegevensframe zijn geladen met behulp van
printSchema.# Render schema df.printSchema()// Render schema df.printSchema()Gegevensrijen weergeven waarin de
quantitykolom kleiner is dan20. Gebruik dewhereenshowfuncties om deze query uit te voeren.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()Geef de eerste gegevensrij weer waarin de
clearancekolom zich bevindttrue. Gebruik defilterfunctie om deze query uit te voeren.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)Geef vijf rijen met gegevens weer zonder filter of afkapping. Gebruik de
showfunctie om het uiterlijk en het aantal rijen aan te passen dat wordt weergegeven.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)Voer een query uit op uw gegevens met behulp van deze onbewerkte NoSQL-queryreeks:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Algemene bewerkingen uitvoeren
Wanneer u werkt met API voor NoSQL-gegevens in Spark, kunt u gedeeltelijke updates uitvoeren of met gegevens werken als onbewerkte JSON.
Een gedeeltelijke update van een item uitvoeren:
Kopieer de bestaande configuratievariabele
configen wijzig de eigenschappen in de nieuwe kopie. Configureer met name de schrijfstrategie naarItemPatch. Schakel vervolgens bulkondersteuning uit. Stel de kolommen en gekoppelde bewerkingen in. Stel ten slotte het standaardbewerkingstypeSetin op .# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )Maak variabelen voor de partitiesleutel van het item en de unieke id die u wilt targeten als onderdeel van deze patchbewerking.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"Maak een set patchobjecten om het doelitem op te geven en geef velden op die moeten worden gewijzigd.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )Maak een gegevensframe met behulp van de set patchobjecten. Gebruik
writeom de patchbewerking uit te voeren.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()Voer een query uit om de resultaten van de patchbewerking te bekijken. Het item moet nu worden benoemd
Yamba New Surfboardzonder andere wijzigingen.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Werken met onbewerkte JSON-gegevens:
Kopieer de bestaande configuratievariabele
configen wijzig de eigenschappen in de nieuwe kopie. Wijzig met name de doelcontainer inemployees. Configureer vervolgens de kolom/hetcontactsveld voor het gebruik van onbewerkte JSON-gegevens.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )Maak een set werknemers die u wilt opnemen in de container.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )Maak een gegevensframe en gebruik
writedeze om de werknemersgegevens op te nemen.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()Geef de gegevens uit het gegevensframe weer met behulp van
show. U ziet dat decontactskolom onbewerkte JSON in de uitvoer is.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Verwante inhoud
- Apache Spark
 - Catalogus-API voor Azure Cosmos DB
 - Naslaginformatie over configuratieparameters
 - Voorbeelden van Azure Cosmos DB Spark-connector
 - Migreren van Spark 2.4 naar Spark 3.*
 - Afgeschafte versies: 
- De Azure Cosmos DB Spark-connector voor Spark 3.1 en 3.2 is afgeschaft, omdat er geen ondersteunde Spark 3.1- of 3.2-runtimes beschikbaar zijn in Azure Databricks, Azure Synapse of Azure HDInsight.
 - Migratiehandleiding voor het bijwerken van Spark 3.1
 - Migratiehandleiding voor het bijwerken van Spark 3.2
 
 - Versiecompatibiliteit:
 - Opmerkingen bij de release:
 - Downloadkoppelingen: