Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
I den här självstudien konfigurerar du en omvänd ETL-pipeline för att flytta berikade data från Delta-tabeller i Azure Databricks till Azure Cosmos DB för NoSQL. Sedan använder du Spark-anslutningsappen för onlinetransaktionsbearbetning (OLTP) för Azure Cosmos DB for NoSQL för att synkronisera data.
Krav för konfiguration av omvänd ETL-pipeline
- Ett befintligt Azure Cosmos DB-konto.
- Om du har en Azure-prenumeration skapar du ett nytt konto.
- En befintlig Azure Databricks-arbetsyta.
- Om du har en Azure-prenumeration skapar du en ny arbetsyta.
- Senaste versionen av Azure CLI.
- Om du vill kan du också använda Azure Cloud Shell.
Konfigurera rollbaserad åtkomstkontroll med Microsoft Entra
Azure-hanterade identiteter säkerställer säker, lösenordsfri autentisering till Azure Cosmos DB för NoSQL utan att manuellt hantera autentiseringsuppgifter. I det här nödvändiga steget konfigurerar du den användartilldelade hanterade identiteten som Azure Databricks automatiskt skapar med läsåtkomst till metadata och skrivåtkomst till data för ditt Azure Cosmos DB för NoSQL-konto. Det här steget konfigurerar rollbaserade åtkomstkontrollroller för både kontroll och dataplan för den hanterade identiteten.
Logga in på Azure Portal (https://portal.azure.com).
Gå till den befintliga Azure Databricks-resursen.
I fönstret Essentials letar du upp och navigerar till den hanterade resursgruppen som är associerad med arbetsytan.
I den hanterade resursgruppen väljer du den användartilldelade hanterade identiteten som skapades automatiskt med arbetsytan.
Registrera värdet för fälten Klient-ID och Objekt -ID (huvudnamn) i fönstret Essentials . Du använder det här värdet senare för att tilldela kontroll- och dataplansroller.
Tips/Råd
Du kan också hämta huvud-ID:t för den hanterade identiteten med hjälp av Azure CLI. Om du antar att den hanterade identitetens namn är
dbmanagedidentityanvänder duaz resource showkommandot för att hämta huvud-ID:t.az resource show \ --resource-group "<name-of-managed-resource-group>" \ --name "dbmanagedidentity" \ --resource-type "Microsoft.ManagedIdentity/userAssignedIdentities" \ --query "{clientId: properties.clientId, principalId: properties.principalId}"Gå till det aktuella Azure Cosmos DB för NoSQL-kontot.
På kontots sida väljer du Åtkomstkontroll (IAM).
I fönstret Åtkomstkontroll väljer du lägg till och sedan alternativen Lägg till rolltilldelning för att påbörja processen med att tilldela en kontrollplansroll till den användartilldelade hanterade identiteten.
Välj rollen Cosmos DB-kontoläsare i listan över roller för tilldelning.
I avsnittet för att tilldela åtkomst till en användare, grupp eller tjänstens huvudnamn interagerar du med alternativet Välj medlemmar .
I dialogrutan Medlemmar anger du huvud-ID:t som ska filtreras efter den användartilldelade hanterade identiteten som är associerad med Azure Databricks. Välj identiteten.
Välj slutligen Granska + Tilldela för att skapa rolltilldelningen för styrplanet.
Använd
az cosmosdb sql role assignment create-kommandot för att tilldela dataplansrollenCosmos DB Built-in Data Contributoroch omfånget/till den användartilldelade hanterade identiteten som är associerad med Azure Databricks.az cosmosdb sql role assignment create \ --resource-group "<name-of-resource-group>" \ --account-name "<name-of-cosmos-nosql-account>" \ --principal-id "<managed-identity-principal-id>" \ --role-definition-name "Cosmos DB Built-in Data Contributor" \ --scope "/"Använd
az account showför att hämta prenumerations- och klientidentifierare. Dessa värden krävs i ett senare steg med Spark-anslutningsappen med hjälp av Microsoft Entra-autentisering.az account show --query '{subscriptionId: id, tenantId: tenantId}'
Skapa en Databricks-notebook-fil
Gå till den befintliga Azure Databricks-resursen och öppna sedan arbetsytans användargränssnitt.
Om du inte redan har ett kluster skapar du ett nytt kluster.
Viktigt!
Se till att klustret har Runtime version 15.4 av högre som har långsiktigt stöd för Spark 3.5.0 och Scala 2.12. De återstående stegen i den här guiden förutsätter dessa versioner av verktygen.
Gå till Bibliotek>Installera ny> och Maven för att installera ett Maven-paket.
Sök efter Spark-anslutningsappen för Azure Cosmos DB för NoSQL med hjälp av grupp-ID-filtret
com.azure.cosmos.sparkoch välj paketet med artefakt-ID:tazure-cosmos-spark_3-5_2-12.Skapa en ny notebook-fil genom att gå till Arbetsyta>[Mapp]>Ny>anteckningsbok.
Koppla anteckningsboken till klustret.
Konfigurera Spark-kontakt i Azure Databricks
Konfigurera Spark-anslutningsappen så att den ansluter till ditt kontos container med Hjälp av Microsoft Entra-autentisering. Konfigurera dessutom anslutningsappen så att den endast använder ett begränsat tröskelvärde för dataflöde för Spark-åtgärder. Om du vill konfigurera Spark-anslutningsappen definierar du en konfigurationsordlista med autentiseringsuppgifter för att ansluta till ditt konto. Dessa autentiseringsuppgifter omfattar:
| Värde | |
|---|---|
spark.cosmos.accountEndpoint |
NoSQL-kontots slutpunkt |
spark.cosmos.database |
Namnet på måldatabasen |
spark.cosmos.container |
Namnet på målcontainern |
spark.cosmos.auth.type |
ManagedIdentity |
spark.cosmos.auth.aad.clientId |
Klient-ID för den användartilldelade hanterade identiteten |
spark.cosmos.account.subscriptionId |
ID för prenumerationen |
spark.cosmos.account.tenantId |
ID för den kopplade Microsoft Entra-klienten |
spark.cosmos.account.resourceGroupName |
Namnet på resursgruppen |
spark.cosmos.throughputControl.enabled |
true |
spark.cosmos.throughputControl.name |
TargetContainerThroughputControl |
spark.cosmos.throughputControl.targetThroughputThreshold |
0.30 |
spark.cosmos.throughputControl.globalControl.useDedicatedContainer |
'false |
cosmos_config = {
# General settings
"spark.cosmos.accountEndpoint": "<endpoint>",
"spark.cosmos.database": "products",
"spark.cosmos.container": "recommendations",
# Entra authentication settings
"spark.cosmos.auth.type": "ManagedIdentity",
"spark.cosmos.account.subscriptionId": "<subscriptionId>",
"spark.cosmos.account.tenantId": "<tenantId>",
"spark.cosmos.account.resourceGroupName": "<resourceGroupName>",
# Throughput control settings
"spark.cosmos.throughputControl.enabled": "true",
"spark.cosmos.throughputControl.name": "TargetContainerThroughputControl",
"spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
"spark.cosmos.throughputControl.globalControl.useDedicatedContainer": "false",
}
val cosmosconfig = Map(
// General settings
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.database" -> "products",
"spark.cosmos.container" -> "recommendations",
// Entra authentication settings
"spark.cosmos.auth.type" -> "ManagedIdentity",
"spark.cosmos.account.subscriptionId" -> "<subscriptionId>",
"spark.cosmos.account.tenantId" -> "<tenantId>",
"spark.cosmos.account.resourceGroupName" -> "<resourceGroupName>",
// Throughput control settings
"spark.cosmos.throughputControl.enabled" -> "true",
"spark.cosmos.throughputControl.name" -> "TargetContainerThroughputControl",
"spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.30",
"spark.cosmos.throughputControl.globalControl.useDedicatedContainer" -> "false"
)
Anmärkning
I det här exemplet namnges products måldatabasen och målcontainern heter recommendations.
Dataflödeskonfigurationen, som anges i det här steget, säkerställer att endast 30% av de enheter för programbegäran (RU: er) som allokerats till målcontainern är tillgängliga för Spark-åtgärder.
Mata in exempeldata för produktrekommendationer till en Delta-tabell
Skapa en dataramexempel med produktrekommendationer för användare och skriv den till en Delta-tabell med namnet recommendations_delta. Det här steget simulerar kuraterade, transformerade data i din datasjö som du tänker synkronisera med Azure Cosmos DB för NoSQL. Om du skriver till Delta-formatet kan du senare aktivera ändringsdatainsamling (CDC) för inkrementell synkronisering.
from pyspark.sql import SparkSession
# Create sample data and convert it to a DataFrame
df = spark.createDataFrame([
("yara-lima", "Full-Finger Gloves", "clothing-gloves", 80),
("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 90)
], ["id", "productname", "category", "recommendationscore"])
# Write the DataFrame to a Delta table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
// Create sample data as a sequence and convert it to a DataFrame
val df = Seq(
("yara-lima", "Full-Finger Gloves", "clothing-gloves", 12.95),
("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 19.99)
).toDF("id", "productname", "category", "recommendationscore")
// Write the DataFrame to a table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
Batch-inläsning av initiala data till Azure Cosmos DB för NoSQL
recommendations_delta Läs sedan Delta-tabellen i en Spark DataFrame och utför en första batchskrivning till Azure Cosmos DB för NoSQL med formatet cosmos.oltp . Använd tilläggsläget för att lägga till data utan att skriva över befintligt innehåll i måldatabasen och containern. Det här steget säkerställer att alla historiska data är tillgängliga i kontot innan CDC börjar.
# Read the Delta table into a DataFrame
df_delta = spark.read.format("delta").table("recommendations_delta")
# Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Read the Delta table into a DataFrame
val df_delta = spark.read.format("delta").table("recommendations_delta")
// Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(cosmosconfig).save()
Aktivera direktuppspelningssynkronisering med ändringsdataflöde
Aktivera Funktionen För ändringsdataflöde i Delta Lake (CDF) i recommendations_delta tabellen genom att ändra tabellens egenskaper. MED CDF kan Delta Lake spåra alla framtida infogningar, uppdateringar och borttagningar på radnivå. Det är viktigt att aktivera den här egenskapen för att utföra inkrementella synkroniseringar till Azure Cosmos DB för NoSQL, eftersom den exponerar ändringar utan att behöva jämföra ögonblicksbilder.
Efter den historiska datainläsningen kan ändringar i Delta-tabellen samlas in med hjälp av Delta Change Data Feed (CDF). Du kan implementera antingen batchbaserad eller direktuppspelningsbaserad CDC.
# Enable Change Data Feed (CDF)
spark.sql("""
ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# Read the Change Data Capture (CDC) data from the Delta table
cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")
# Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Enable Change Data Feed (CDF)
spark.sql("""
ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
// Read the Change Data Capture (CDC) data from the Delta table
val cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")
// Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(cosmos_config).save()
Verifiera data med hjälp av NoSQL-frågor
När du har skrivit till Azure Cosmos DB för NoSQL kontrollerar du data genom att fråga tillbaka dem till Spark med samma kontokonfiguration. Då; inspektera inmatade data, köra valideringar eller ansluta till andra datauppsättningar i Delta Lake för analys eller rapportering. Azure Cosmos DB for NoSQL stöder snabba, indexerade läsningar för frågeprestanda i realtid.
# Load DataFrame
df_cosmos = spark.read.format("cosmos.oltp").options(**cosmos_config).load()
# Run query
df_cosmos.select("id", "productname", "category", "recommendationscore").show()
// Load DataFrame
val dfCosmos = spark.read.format("cosmos.oltp").options(cosmosConfig).load()
// Run query
dfCosmos.select("id", "productname", "category", "recommendationscore").show()