Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
In dit artikel leert u hoe u kunt communiceren met Azure Cosmos DB met behulp van Synapse Apache Spark 3. Klanten kunnen Scala, Python, SparkSQL en C# gebruiken voor analyses, data engineering, data science en gegevensverkenningsscenario's in Azure Synapse Link voor Azure Cosmos DB.
De volgende mogelijkheden worden ondersteund bij interactie met Azure Cosmos DB:
- Met Synapse Apache Spark 3 kunt u gegevens analyseren in uw Azure Cosmos DB-containers die zijn ingeschakeld met Azure Synapse Link in bijna realtime, zonder dat dit van invloed is op de prestaties van uw transactionele workloads. De volgende twee opties zijn beschikbaar om een query uit te voeren op de analytische opslag van Azure Cosmos DB vanuit Spark:
- Laden in Spark DataFrame
- Spark-tabel maken
- Met Synapse Apache Spark kunt u ook gegevens opnemen in Azure Cosmos DB. Het is belangrijk te weten dat gegevens altijd worden opgenomen in Azure Cosmos DB-containers via de transactionele opslag. Wanneer Azure Synapse Link is ingeschakeld, worden alle nieuwe invoegingen, updates en verwijderingen automatisch gesynchroniseerd met de analytische opslag.
- Synapse Apache Spark biedt ook ondersteuning voor gestructureerd streamen van Spark met Azure Cosmos DB als bron en een sink.
In de volgende secties doorloopt u de syntaxis. U kunt ook de Learn-module bekijken over het uitvoeren van query's op Azure Cosmos DB met Apache Spark voor Azure Synapse Analytics. Bewegingen in Azure Synapse Analytics-werkruimte zijn ontworpen om een eenvoudige out-of-the-box-ervaring te bieden om aan de slag te gaan. Gebaren worden weergegeven wanneer u met de rechtermuisknop op een Azure Cosmos DB-container klikt op het tabblad Gegevens van de Synapse-werkruimte. Met bewegingen kunt u snel code genereren en deze aanpassen aan uw behoeften. Bewegingen zijn ook ideaal om met één klik gegevens te ontdekken.
Important
U moet rekening houden met enkele beperkingen in het analytische schema die kunnen leiden tot onverwacht gedrag bij het laden van gegevens. Als voorbeeld zijn slechts de eerste 1000 eigenschappen van het transactionele schema beschikbaar in het analytische schema, zijn eigenschappen met spaties niet beschikbaar, enzovoort. Als u onverwachte resultaten ondervindt, controleert u de beperkingen van het schema voor analytische opslag voor meer informatie.
** Opvragen van de analytische opslag van Azure Cosmos DB
Klanten kunnen analytische opslaggegevens laden in Spark DataFrames of Spark-tabellen maken.
Het verschil in ervaring is om te bepalen of onderliggende gegevenswijzigingen in de Azure Cosmos DB-container automatisch moeten worden doorgevoerd in de analyse die in Spark wordt uitgevoerd. Wanneer Spark DataFrames zijn geregistreerd of er een Spark-tabel wordt gemaakt, haalt Spark metagegevens van analytische opslag op voor efficiënte pushdown. Het is belangrijk te weten dat Spark een luie evaluatiebeleid volgt. U moet actie ondernemen om de laatste momentopname van de gegevens op te halen in Spark DataFrames- of SparkSQL-query's.
In het geval van het laden naar Spark DataFrame, worden de opgehaalde metagegevens in de cache opgeslagen tijdens de levensduur van de Spark-sessie. Daarom worden de volgende acties die worden uitgevoerd op de DataFrame geëvalueerd op basis van de momentopname van de analytische opslag op het moment dat DataFrame wordt gemaakt.
In het geval van het maken van een Spark-tabel worden de metagegevens van de status van de analytische opslag echter niet in de cache opgeslagen in Spark en worden ze opnieuw geladen bij elke Uitvoering van SparkSQL-query's in de Spark-tabel.
Tot slot kunt u kiezen tussen het laden van een momentopname naar Spark DataFrame of het uitvoeren van query's op een Spark-tabel voor de meest recente momentopname.
Note
Als u query's wilt uitvoeren op Azure Cosmos DB voor MongoDB-accounts, vindt u meer informatie over de schemaweergave van volledige betrouwbaarheid in de analytische opslag en de uitgebreide eigenschapsnamen die moeten worden gebruikt.
Note
Alles options is hoofdlettergevoelig.
Authentication
Klanten van Spark 3.x kunnen nu verifiëren bij de analytische opslag van Azure Cosmos DB met behulp van toegangstokens voor vertrouwde identiteiten of databaseaccountsleutels. Tokens zijn veiliger omdat ze kort leven en zijn toegewezen aan de vereiste machtiging met cosmos DB RBAC.
De connector ondersteunt nu twee verificatietypen MasterKey en AccessToken voor de spark.cosmos.auth.type eigenschap.
Hoofdsleutelverificatie
Gebruik de sleutel om een dataframe te lezen met spark:
val config = Map(
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.accountKey" -> "<key>",
"spark.cosmos.database" -> "<db>",
"spark.cosmos.container" -> "<container>"
)
val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)
Verificatie van toegangstokens
De nieuwe sleutelloze verificatie introduceert ondersteuning voor toegangstokens:
val config = Map(
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.auth.type" -> "AccessToken",
"spark.cosmos.auth.accessToken" -> "<accessToken>",
"spark.cosmos.database" -> "<db>",
"spark.cosmos.container" -> "<container>"
)
val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)
Note
De Azure Synapse Link Spark-connector van Azure Cosmos DB biedt geen ondersteuning voor beheerde identiteit.
Verificatie van toegangstokens vereist roltoewijzing
Als u de toegangstokenbenadering wilt gebruiken, moet u toegangstokens genereren. Omdat toegangstokens zijn gekoppeld aan Azure-identiteiten, moet op rollen gebaseerd toegangsbeheer (RBAC) worden toegewezen aan de Azure-identiteit. De roltoewijzing bevindt zich op gegevensvlakniveau en u moet minimale machtigingen voor het besturingsvlak hebben om de roltoewijzing uit te voeren.
De IAM-roltoewijzingen (Identity Access Management) van Azure Portal bevinden zich op het niveau van het besturingsvlak en hebben geen invloed op de roltoewijzingen op het gegevensvlak. Roltoewijzingen van gegevensvlak zijn alleen beschikbaar via Azure CLI. De readAnalytics actie is vereist voor het lezen van gegevens uit de analytische opslag in Cosmos DB en maakt geen deel uit van vooraf gedefinieerde rollen. Daarom moeten we een aangepaste roldefinitie maken. Voeg naast de readAnalytics actie ook de acties toe die nodig zijn voor Gegevenslezer. Maak een JSON-bestand met de volgende inhoud en geef het role_definition.json
{
"RoleName": "CosmosAnalyticsRole",
"Type": "CustomRole",
"AssignableScopes": ["/"],
"Permissions": [{
"DataActions": [
"Microsoft.DocumentDB/databaseAccounts/readAnalytics",
"Microsoft.DocumentDB/databaseAccounts/readMetadata",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/executeQuery",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed"
]
}]
}
Verificatie van toegangstoken vereist Azure CLI
- Meld u aan bij Azure CLI:
az login - Stel het standaardabonnement in dat uw Cosmos DB-account heeft:
az account set --subscription <name or id> - Maak de roldefinitie in het gewenste Cosmos DB-account:
az cosmosdb sql role definition create --account-name <cosmos-account-name> --resource-group <resource-group-name> --body @role_definition.json - Kopieer deze over de geretourneerde rol
definition id:/subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.DocumentDB/databaseAccounts/< cosmos-account-name >/sqlRoleDefinitions/<a-random-generated-guid> - Haal de hoofd-ID op van de identiteit waaraan u de rol wilt toewijzen. De identiteit kan een Azure-app-registratie, een virtuele machine of een andere ondersteunde Azure-resource zijn. Wijs de rol toe aan de principal met behulp van:
az cosmosdb sql role assignment create --account-name "<cosmos-account-name>" --resource-group "<resource-group>" --scope "/" --principal-id "<principal-id-of-identity>" --role-definition-id "<role-definition-id-from-previous-step>"
Note
Wanneer u een Azure-app-registratie gebruikt, maakt u gebruik van de Object Id als de serviceprincipal-id. Bovendien moeten de principal-id en het Cosmos DB-account zich in dezelfde tenant bevinden.
Het toegangstoken genereren - Synapse Notebooks
De aanbevolen methode voor Synapse Notebooks is het gebruik van een service-principal met een certificaat voor het genereren van toegangstokens. Klik hier voor meer informatie.
The following code snippet has been validated to work in a Synapse notebook
val tenantId = "<azure-tenant-id>"
val clientId = "<client-id-of-service-principal>"
val kvLinkedService = "<azure-key-vault-linked-service>"
val certName = "<certificate-name>"
val token = mssparkutils.credentials.getSPTokenWithCertLS(
"https://<cosmos-account-name>.documents.azure.com/.default",
"https://login.microsoftonline.com/" + tenantId, clientId, kvLinkedService, certName)
U kunt nu het toegangstoken gebruiken dat in deze stap is gegenereerd om gegevens uit de analytische opslag te lezen wanneer het verificatietype is ingesteld op het toegangstoken.
Note
Wanneer u een Azure-app-registratie gebruikt, gebruikt u de toepassing (client-id).
Note
Op dit moment biedt Synapse geen ondersteuning voor het genereren van toegangstokens met behulp van het azure-identity-pakket in notebooks. Bovendien bevatten Synapse VHDs geen azure-identity package en de bijbehorende afhankelijkheden. Klik hier voor meer informatie.
Laden in Spark DataFrame
In dit voorbeeld maakt u een Spark DataFrame dat verwijst naar de analytische opslag van Azure Cosmos DB. Vervolgens kunt u meer analyses uitvoeren door Spark-acties aan te roepen voor het DataFrame. Deze bewerking heeft geen invloed op de transactionele opslag.
De syntaxis van Python is als volgt:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
df = spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.load()
De equivalente syntaxis in Scala is de volgende:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
load()
Spark-tabel maken
In dit voorbeeld maakt u een Spark-tabel die verwijst naar de analytische opslag van Azure Cosmos DB. Vervolgens kunt u meer analyses uitvoeren door SparkSQL-query's aan te roepen voor de tabel. Deze bewerking heeft geen invloed op transactionele opslag of gegevensverplaatsing. Als u besluit deze Spark-tabel te verwijderen, wordt de onderliggende Azure Cosmos DB-container en de bijbehorende analytische opslag niet beïnvloed.
Dit scenario is handig voor hergebruik van Spark-tabellen via hulpprogramma's van derden en om de gegevens voor de uitvoering toegankelijk te maken.
De syntaxis voor het maken van een Spark-tabel is als volgt:
%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options
create table call_center using cosmos.olap options (
spark.synapse.linkedService '<enter linked service name>',
spark.cosmos.container '<enter container name>'
)
Note
Als u scenario's hebt waarin het schema van de onderliggende Azure Cosmos DB-container na verloop van tijd verandert; en als u wilt dat het bijgewerkte schema automatisch wordt weergegeven in de query's voor de Spark-tabel, kunt u dit doen door de optie spark.cosmos.autoSchemaMerge in te true stellen in de Spark-tabelopties.
Spark DataFrame naar Azure Cosmos DB-container schrijven
In dit voorbeeld schrijft u een Spark DataFrame naar een Azure Cosmos DB-container. Deze bewerking is van invloed op de prestaties van transactionele workloads en verbruikt aanvraageenheden die zijn ingericht in de Azure Cosmos DB-container of de gedeelde database.
De syntaxis van Python is als volgt:
# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
YOURDATAFRAME.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.mode('append')\
.save()
De equivalente syntaxis in Scala is de volgende:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
import org.apache.spark.sql.SaveMode
df.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
mode(SaveMode.Append).
save()
Streaming-dataframe laden vanuit een container
In deze beweging gebruikt u spark-streamingfunctie om gegevens uit een container in een dataframe te laden. De gegevens worden opgeslagen in het primaire Data Lake-account (en bestandssysteem) dat u hebt verbonden met de werkruimte.
Note
Als u wilt verwijzen naar externe bibliotheken in Synapse Apache Spark, vindt u hier meer informatie. Als u bijvoorbeeld een Spark DataFrame wilt opnemen in een container van Azure Cosmos DB voor MongoDB, kunt u hier de MongoDB-connector voor Spark gebruiken.
Streaming-DataFrame laden vanuit een Azure Cosmos DB-container
In dit voorbeeld gebruikt u de gestructureerde streaming van Spark om gegevens uit een Azure Cosmos DB-container te laden in een Spark-streaming DataFrame, met behulp van de functie voor wijzigingenfeeds in Azure Cosmos DB. De controlepuntgegevens die worden gebruikt door Spark, worden opgeslagen in het primaire datalake-account (en bestandssysteem) dat u hebt verbonden met de werkruimte.
De syntaxis van Python is als volgt:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
dfStream = spark.readStream\
.format("cosmos.oltp.changeFeed")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.changeFeed.startFrom", "Beginning")\
.option("spark.cosmos.changeFeed.mode", "Incremental")\
.load()
De equivalente syntaxis in Scala is de volgende:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val dfStream = spark.readStream.
format("cosmos.oltp.changeFeed").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.changeFeed.startFrom", "Beginning").
option("spark.cosmos.changeFeed.mode", "Incremental").
load()
Streaming-DataFrame schrijven naar een Azure Cosmos DB-container
In dit voorbeeld schrijft u een streaming DataFrame naar een Azure Cosmos DB-container. Deze bewerking is van invloed op de prestaties van transactionele workloads en verbruikt aanvraageenheden die zijn ingericht in de Azure Cosmos DB-container of gedeelde database. Als de map /localWriteCheckpointFolder niet is gemaakt (in het onderstaande voorbeeld), wordt deze automatisch gemaakt.
De syntaxis van Python is als volgt:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
streamQuery = dfStream\
.writeStream\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("checkpointLocation", "/tmp/myRunId/")\
.outputMode("append")\
.start()
streamQuery.awaitTermination()
De equivalente syntaxis in Scala is de volgende:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val query = dfStream.
writeStream.
format("cosmos.oltp").
outputMode("append").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("checkpointLocation", "/tmp/myRunId/").
start()
query.awaitTermination()
Volgende stappen
- Voorbeelden om aan de slag te gaan met Azure Synapse Link op GitHub
- Wat wordt ondersteund in Azure Synapse Link voor Azure Cosmos DB
- Verbinding maken met Azure Synapse Link voor Azure Cosmos DB
- Bekijk de Learn-module over het uitvoeren van query's op Azure Cosmos DB met Apache Spark voor Azure Synapse Analytics.