Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
GÄLLER FÖR:  Kassandra
Den här artikeln beskriver nyckelrymds- och tabell-DDL-åtgärder mot Azure Cosmos DB för Apache Cassandra från Spark.
Spark-kontext
Anslutningsappen för API för Cassandra kräver att Cassandra-anslutningsinformationen initieras som en del av Spark-kontexten. När du startar en notebook-fil initieras spark-kontexten redan och det är inte lämpligt att stoppa och initiera den igen. En lösning är att lägga till API:et för Cassandra-instanskonfigurationen på klusternivå i kluster spark-konfigurationen. Det är engångsaktivitet per kluster. Lägg till följande kod i Spark-konfigurationen som ett blankstegsavgränsat nyckelvärdepar:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
API för Cassandra-relaterad konfiguration
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Kommentar
Om du använder Spark 3.x behöver du inte installera Azure Cosmos DB-hjälpen och anslutningsfabriken. Du bör också använda remoteConnectionsPerExecutor i stället connections_per_executor_max för för Spark 3-anslutningsappen (se ovan).
Varning
Spark 3-exemplen som visas i den här artikeln har testats med Spark version 3.2.1 och motsvarande Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Senare versioner av Spark och/eller Cassandra-anslutningsappen kanske inte fungerar som förväntat.
DDL-åtgärder för nyckelområde
Skapa ett nyckelområde
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Verifiera i cqlsh
Kör följande kommando i cqlsh så bör du se nyckelområdet som du skapade tidigare.
DESCRIBE keyspaces;
Ta bort ett nyckelområde
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
Verifiera i cqlsh
DESCRIBE keyspaces;
Tabell-DDL-åtgärder
Överväganden:
- Dataflödet kan tilldelas på tabellnivå med hjälp av instruktionen skapa tabell.
 - En partitionsnyckel kan lagra 20 GB data.
 - En post kan lagra högst 2 MB data.
 - Ett partitionsnyckelintervall kan lagra flera partitionsnycklar.
 
Skapa en tabell
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Verifiera i cqlsh
Kör följande kommando i cqlsh och du bör se tabellen med namnet "books:
USE books_ks;
DESCRIBE books;
Etablerat dataflöde och standardvärden för TTL visas inte i utdata från föregående kommando. Du kan hämta dessa värden från portalen.
Ändra tabell
Du kan ändra följande värden med hjälp av kommandot alter table:
- etablerat dataflöde
 - time-to-live-värde 
Kolumnändringar stöds för närvarande inte. 
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Ta bort tabell
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Verifiera i cqlsh
Kör följande kommando i cqlsh och du bör se att tabellen "böcker" inte längre är tillgänglig:
USE books_ks;
DESCRIBE tables;
Nästa steg
När du har skapat nyckelområdet och tabellen fortsätter du till följande artiklar för CRUD-åtgärder med mera: