Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Det här innehåller notebook-filer och kodexempel för vanliga mönster för att arbeta med strukturerad direktuppspelning i Azure Databricks.
Komma igång med strukturerad direktuppspelning
Om du är helt ny på Structured Streaming, se Kör ditt första strukturerade strömningsjobb.
Skriva till Cassandra som mottagare för strukturerad direktuppspelning i Python
Apache Cassandra är en distribuerad OLTP-databas med låg svarstid, skalbar och hög tillgänglighet.
Strukturerad direktuppspelning fungerar med Cassandra via Spark Cassandra Connector. Den här anslutningsappen stöder både RDD- och DataFrame-API:er och har inbyggt stöd för att skriva strömmande data. Viktigt Du måste använda motsvarande version av spark-cassandra-connector-assembly.
Följande exempel ansluter till en eller flera värdar i ett Cassandra-databaskluster. Den anger även anslutningskonfigurationer, till exempel kontrollpunktsplatsen och de specifika nyckelrymds- och tabellnamnen:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
Skriva till Azure Synapse Analytics med hjälp av foreachBatch() i Python
streamingDF.writeStream.foreachBatch() gör att du kan återanvända befintliga batchdataskrivare för att skriva utdata från en strömmande fråga till Azure Synapse Analytics. Mer information finns i foreachBatch-dokumentationen .
Om du vill köra det här exemplet behöver du Azure Synapse Analytics-anslutningsappen. Mer information om Azure Synapse Analytics-anslutningsappen finns i Fråga efter data i Azure Synapse Analytics.
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Sammanfogningar av dataströmmar
Dessa två notebook-filer visar hur du använder stream-stream-kopplingar i Python och Scala.