Dela via


Interagera med Azure Cosmos DB med Apache Spark 3 i Azure Synapse Link

I den här artikeln får du lära dig hur du interagerar med Azure Cosmos DB med Hjälp av Synapse Apache Spark 3. Kunder kan använda Scala, Python, SparkSQL och C#, för analys, datateknik, datavetenskap och datautforskningsscenarier i Azure Synapse Link för Azure Cosmos DB.

Följande funktioner stöds när du interagerar med Azure Cosmos DB:

  • Med Synapse Apache Spark 3 kan du analysera data i dina Azure Cosmos DB-containrar som är aktiverade med Azure Synapse Link nästan i realtid utan att påverka prestanda för dina transaktionsarbetsbelastningar. Följande två alternativ är tillgängliga för att köra frågor mot Azure Cosmos DB-analysarkivet från Spark:
    • Läs in till Spark DataFrame
    • Skapa Spark-tabell
  • Med Synapse Apache Spark kan du också mata in data i Azure Cosmos DB. Observera att data alltid matas in i Azure Cosmos DB-containrar via transaktionslagret. När Azure Synapse Link är aktiverat synkroniseras alla nya infogningar, uppdateringar och borttagningar automatiskt till analysarkivet.
  • Synapse Apache Spark har också stöd för Spark-strukturerad strömning med Azure Cosmos DB som källa och mottagare.

Följande avsnitt beskriver syntaxen. Du kan också utforska Learn-modulen om hur du utför frågeoperationer på Azure Cosmos DB med Apache Spark för Azure Synapse Analytics. Gester i Azure Synapse Analytics-arbetsytan är utformade för att ge en enkel upplevelse för att komma igång. Gester visas när du högerklickar på en Azure Cosmos DB-container på fliken Data på Synapse-arbetsytan. Med gester kan du snabbt generera kod och skräddarsy den efter dina behov. Gester passar också perfekt för att upptäcka data med ett enda klick.

Important

Du bör vara medveten om vissa begränsningar i analysschemat som kan leda till oväntat beteende vid datainläsningsåtgärder. Till exempel är endast de första 1 000 egenskaperna från transaktionsschemat tillgängliga i det analytiska schemat, egenskaper med blanksteg är inte tillgängliga osv. Om du förväntar dig oväntade resultat kan du kolla schemabegränsningarna för det analytiska lagret för mer information.

Fråga det analytiska lagret i Azure Cosmos DB

Kunder kan läsa in analyslagringsdata till Spark DataFrames eller skapa Spark-tabeller.

Skillnaden i upplevelse handlar om huruvida underliggande dataändringar i Azure Cosmos DB-containern ska återspeglas automatiskt i analysen som utförs i Spark. När Spark DataFrames registreras eller en Spark-tabell skapas hämtar Spark metadata för analysarkivet för effektiv pushdown. Det är viktigt att notera att eftersom Spark följer en lat utvärderingsprincip. Du måste vidta åtgärder för att hämta den sista ögonblicksbilden av data i Spark DataFrames- eller SparkSQL-frågor.

Vid inläsning till Spark DataFrame cachelagras hämtade metadata under Spark-sessionens livstid och därmed utvärderas efterföljande åtgärder som utförs för DataFrame mot ögonblicksbilden av analysarkivet vid tidpunkten då DataFrame skapades.

Å andra sidan cachelagras inte metadata för analysarkivets tillstånd i Spark om du skapar en Spark-tabell och laddas om för varje SparkSQL-frågekörning mot Spark-tabellen.

Avslutningsvis kan du välja mellan att läsa in en ögonblicksbild till Spark DataFrame eller att fråga en Spark-tabell om den senaste ögonblicksbilden.

Note

Om du vill fråga Azure Cosmos DB om MongoDB-konton kan du läsa mer om den fullständiga schemaåtergivningen i den analytiska lagringen och de utökade egenskapsnamnen som används.

Note

Alla options är skiftlägeskänsliga.

Authentication

Nu kan Spark 3.x-kunder autentisera till Azure Cosmos DB-analysarkivet med hjälp av betrodda identiteter med åtkomsttoken eller databaskontonycklar. Token är säkrare eftersom de är kortlivade och tilldelas den behörighet som krävs med Cosmos DB RBAC.

Anslutningsappen stöder nu två autentiseringstyper MasterKey och AccessToken för egenskapen spark.cosmos.auth.type .

Huvudnyckelautentisering

Använd nyckeln för att läsa en dataram med spark:

val config = Map(
    "spark.cosmos.accountEndpoint" -> "<endpoint>",
    "spark.cosmos.accountKey" -> "<key>",
    "spark.cosmos.database" -> "<db>",
    "spark.cosmos.container" -> "<container>"
)

val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)

Åtkomsttokenautentisering

Den nya nyckellösa autentiseringen ger stöd för åtkomsttoken:

val config = Map(
    "spark.cosmos.accountEndpoint" -> "<endpoint>",
    "spark.cosmos.auth.type" -> "AccessToken",
    "spark.cosmos.auth.accessToken" -> "<accessToken>",
    "spark.cosmos.database" -> "<db>",
    "spark.cosmos.container" -> "<container>"
)

val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)

Note

Azure Cosmos DB:s Azure Synapse Link Spark-anslutning stöder inte hanterad identitet.

Åtkomsttokenautentisering kräver rolltilldelning

Om du vill använda metoden för åtkomsttoken måste du generera åtkomsttoken. Eftersom åtkomsttoken är associerade med Azure-identiteter måste rätt rollbaserad åtkomstkontroll (RBAC) tilldelas till identiteten. Rolltilldelningen är på dataplansnivå och du måste ha minsta behörighet för kontrollplanet för att utföra rolltilldelningen.

Rolltilldelningarna IAM (Identity Access Management) från Azure-portalen är på kontrollplansnivå och påverkar inte rolltilldelningarna på dataplanet. Rolltilldelningar för dataplan är endast tillgängliga via Azure CLI. Åtgärden readAnalytics krävs för att läsa data från analysarkivet i Cosmos DB och ingår inte i några fördefinierade roller. Därför måste vi skapa en anpassad rolldefinition. Utöver åtgärden readAnalytics lägger du även till de åtgärder som krävs för dataläsaren. Skapa en JSON-fil med följande innehåll och ge den namnet role_definition.json

{
  "RoleName": "CosmosAnalyticsRole",
  "Type": "CustomRole",
  "AssignableScopes": ["/"],
  "Permissions": [{
    "DataActions": [
      "Microsoft.DocumentDB/databaseAccounts/readAnalytics",
      "Microsoft.DocumentDB/databaseAccounts/readMetadata",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/executeQuery",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed"
    ]
  }]
}

Åtkomsttokenautentisering kräver Azure CLI

  • Logga in på Azure CLI: az login
  • Ange standardprenumerationen, som har ditt Cosmos DB-konto: az account set --subscription <name or id>
  • Skapa rolldefinitionen i önskat Cosmos DB-konto: az cosmosdb sql role definition create --account-name <cosmos-account-name> --resource-group <resource-group-name> --body @role_definition.json
  • Kopiera över den returnerade rollen definition id : /subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.DocumentDB/databaseAccounts/< cosmos-account-name >/sqlRoleDefinitions/<a-random-generated-guid>
  • Hämta huvud-ID:t för den identitet som du vill tilldela rollen till. Identiteten kan vara en Azure-appregistrering, en virtuell dator eller någon annan Azure-resurs som stöds. Tilldela rollen till huvudpersonen med hjälp av: az cosmosdb sql role assignment create --account-name "<cosmos-account-name>" --resource-group "<resource-group>" --scope "/" --principal-id "<principal-id-of-identity>" --role-definition-id "<role-definition-id-from-previous-step>"

Note

När du använder en Azure-appregistrering använder du Object Id som tjänstens huvudnamns-ID. Huvud-ID:t och Cosmos DB-kontot måste också finnas i samma klientorganisation.

Generera åtkomsttoken – Synapse Notebooks

Den rekommenderade metoden för Synapse Notebooks är att använda tjänstens huvudnamn med ett certifikat för att generera åtkomsttoken. Klicka här om du vill ha mer information.

The following code snippet has been validated to work in a Synapse notebook
val tenantId = "<azure-tenant-id>"
val clientId = "<client-id-of-service-principal>"
val kvLinkedService = "<azure-key-vault-linked-service>"
val certName = "<certificate-name>"
val token = mssparkutils.credentials.getSPTokenWithCertLS(
  "https://<cosmos-account-name>.documents.azure.com/.default",
  "https://login.microsoftonline.com/" + tenantId, clientId, kvLinkedService, certName)

Nu kan du använda åtkomsttoken som genererades i det här steget för att läsa data från analysarkivet när autentiseringstypen är inställd på åtkomsttoken.

Note

När du använder en Azure-appregistrering använder du programmet (klient-ID).

Note

För närvarande stöder Synapse inte generering av åtkomsttoken med azure-identity-paketet i anteckningsböcker. Dessutom inkluderar synapse VHD:er inte paketet "Azure-Identity" och dess beroenden. Klicka här om du vill ha mer information.

Läs in till Spark DataFrame

I det här exemplet skapar du en Spark DataFrame som pekar på Azure Cosmos DB-analysarkivet. Du kan sedan utföra mer analys genom att anropa Spark-åtgärder mot DataFrame. Den här åtgärden påverkar inte transaktionslagret.

Syntaxen i Python skulle vara följande:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Motsvarande syntax i Scala skulle vara följande:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Skapa Spark-tabell

I det här exemplet skapar du en Spark-tabell som pekar på Azure Cosmos DB-analysarkivet. Du kan sedan utföra mer analys genom att anropa SparkSQL-frågor mot tabellen. Den här åtgärden påverkar inte transaktionslager eller medför dataförflyttning. Om du bestämmer dig för att ta bort den här Spark-tabellen påverkas inte den underliggande Azure Cosmos DB-containern och motsvarande analysarkiv.

Det här scenariot är praktiskt för att återanvända Spark-tabeller via verktyg från tredje part och ge åtkomst till underliggande data under körningstid.

Syntaxen för att skapa en Spark-tabell är följande:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Note

Om du har scenarier där schemat för den underliggande Azure Cosmos DB-containern ändras över tid; och om du vill att det uppdaterade schemat automatiskt ska återspeglas i frågorna mot Spark-tabellen kan du uppnå detta genom att ange spark.cosmos.autoSchemaMerge alternativet i true Spark-tabellalternativen.

Skriva Spark DataFrame till Azure Cosmos DB-container

I det här exemplet skriver du en Spark DataFrame till en Azure Cosmos DB-container. Den här åtgärden påverkar prestandan för transaktionsarbetsbelastningar och förbrukar enheter för begäranden som etablerats i Azure Cosmos DB-containern eller den delade databasen.

Syntaxen i Python skulle vara följande:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

Motsvarande syntax i Scala skulle vara följande:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

Läsa in strömmande DataFrame från container

I den här gesten använder du Spark Streaming-funktionen för att läsa in data från en container till en dataram. Data lagras i det primära datasjökontot (och filsystemet) som du har anslutit till arbetsytan.

Note

Om du vill referera till externa bibliotek i Synapse Apache Spark kan du läsa mer här. Om du till exempel vill mata in en Spark DataFrame till en container i Azure Cosmos DB för MongoDB kan du använda MongoDB-anslutningsappen för Spark här.

Läsa in strömmande DataFrame från Azure Cosmos DB-container

I det här exemplet använder du Sparks strukturerade strömning för att läsa in data från en Azure Cosmos DB-container till en Spark-strömmande dataram med hjälp av ändringsflödesfunktionen i Azure Cosmos DB. Kontrollpunktsdata som används av Spark lagras i det primära datasjökontot (och filsystemet) som du har anslutit till arbetsytan.

Syntaxen i Python skulle vara följande:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

Motsvarande syntax i Scala skulle vara följande:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

Skriv strömmande DataFrame till en Azure Cosmos DB-behållare

I det här exemplet skriver du en strömmande DataFrame till en Azure Cosmos DB-container. Den här åtgärden påverkar prestandan för transaktionsarbetsbelastningar och använder enheter för begäranden som etablerats i Azure Cosmos DB-containern eller den delade databasen. Om mappen /localWriteCheckpointFolder inte skapas (i exemplet nedan) skapas den automatiskt.

Syntaxen i Python skulle vara följande:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

Motsvarande syntax i Scala skulle vara följande:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

Nästa steg