Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
van toepassing op:Azure SQL Database
Azure SQL Managed Instance-
Notitie
Vanaf september 2020 wordt deze connector niet actief onderhouden. Dit artikel wordt alleen bewaard voor archiveringsdoeleinden.
Met de Spark-connector kunnen databases in Azure SQL Database, Azure SQL Managed Instance en SQL Server fungeren als de invoergegevensbron of uitvoergegevenssink voor Spark-taken. Hiermee kunt u realtime transactionele gegevens gebruiken in big data-analyses en resultaten opslaan voor ad-hocvragen of rapportages. Vergeleken met de ingebouwde JDBC-connector biedt deze connector de mogelijkheid om gegevens bulksgewijs in uw database in te voegen. Het kan beter presteren dan rij-op-rij-invoeging met 10x tot 20x snellere prestaties. De Spark-connector ondersteunt verificatie met Microsoft Entra ID (voorheen Azure Active Directory) om verbinding te maken met Azure SQL Database en Azure SQL Managed Instance, zodat u uw database vanuit Azure Databricks kunt verbinden met uw Microsoft Entra-account. Het biedt vergelijkbare interfaces met de ingebouwde JDBC-connector. Het is eenvoudig om uw bestaande Spark-taken te migreren om deze nieuwe connector te gebruiken.
Notitie
Microsoft Entra ID voorheen Azure Active Directory (Azure AD) werd genoemd.
Een Spark-connector downloaden en bouwen
De GitHub-opslagplaats voor de oude connector waarnaar eerder vanaf deze pagina is gekoppeld, wordt niet actief onderhouden. In plaats daarvan raden we u sterk aan om de nieuwe connector te evalueren en te gebruiken.
Officiƫle ondersteunde versies
| Bestanddeel | Versie |
|---|---|
| Apache Spark | 2.0.2 of hoger |
| Scala | 2.10 of hoger |
| Microsoft JDBC-stuurprogramma voor SQL Server | 6.2 of hoger |
| Microsoft SQL Server | SQL Server 2008 of hoger |
| Azure SQL Database | Ondersteund |
| Azure SQL Managed Instance (een beheerde database-instantie van Azure) | Ondersteund |
De Spark-connector maakt gebruik van het Microsoft JDBC-stuurprogramma voor SQL Server om gegevens te verplaatsen tussen Spark-werkknooppunten en -databases:
De gegevensstroom is als volgt:
- Het Spark-hoofdknooppunt maakt verbinding met databases in SQL Database of SQL Server en laadt gegevens uit een specifieke tabel of met behulp van een specifieke SQL-query.
- Het Spark-hoofdknooppunt distribueert gegevens naar werkknooppunten voor transformatie.
- Het werkknooppunt maakt verbinding met databases die verbinding maken met SQL Database en SQL Server en schrijft gegevens naar de database. De gebruiker kan ervoor kiezen om rij-op-rij in te voegen of bulksgewijs in te voegen.
In het volgende diagram ziet u de gegevensstroom.
De Spark-connector bouwen
Op dit moment maakt het connectorproject gebruik van Maven. Als u de connector wilt bouwen zonder afhankelijkheden, kunt u het volgende uitvoeren:
- mvn clean package
- Download de nieuwste versies van het JAR-bestand uit de releasefolder.
- De SQL Database Spark JAR opnemen
Verbinding maken en gegevens lezen met behulp van de Spark-connector
U kunt vanuit een Spark-taak verbinding maken met databases in SQL Database en SQL Server om gegevens te lezen of te schrijven. U kunt ook een DML- of DDL-query uitvoeren in databases in SQL Database en SQL Server.
Gegevens lezen uit Azure SQL en SQL Server
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"dbTable" -> "dbo.Clients",
"user" -> "username",
"password" -> "*********",
"connectTimeout" -> "5", //seconds
"queryTimeout" -> "5" //seconds
))
val collection = sqlContext.read.sqlDB(config)
collection.show()
Gegevens lezen uit Azure SQL en SQL Server met een opgegeven SQL-query
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"queryCustom" -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074" //Sql query
"user" -> "username",
"password" -> "*********",
))
//Read all data in table dbo.Clients
val collection = sqlContext.read.sqlDB(config)
collection.show()
Gegevens schrijven naar Azure SQL en SQL Server
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
// Acquire a DataFrame collection (val collection)
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"dbTable" -> "dbo.Clients",
"user" -> "username",
"password" -> "*********"
))
import org.apache.spark.sql.SaveMode
collection.write.mode(SaveMode.Append).sqlDB(config)
DML- of DDL-soorten query's uitvoeren in Azure SQL en SQL Server
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
|UPDATE Customers
|SET ContactName = 'Alfred Schmidt', City = 'Frankfurt'
|WHERE CustomerID = 1;
""".stripMargin
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"queryCustom" -> query
))
sqlContext.sqlDBQuery(config)
Verbinding maken vanuit Spark met behulp van Microsoft Entra-verificatie
U kunt verbinding maken met SQL Database en SQL Managed Instance met behulp van Microsoft Entra-verificatie. Gebruik Microsoft Entra-verificatie om identiteiten van databasegebruikers centraal te beheren en als alternatief voor SQL-verificatie.
Verbinding maken met behulp van de verificatiemodus ActiveDirectoryPassword
Installatievereiste
Als u de ActiveDirectoryPassword-verificatiemodus gebruikt, downloadt u microsoft-authentication-library-for-java en de bijbehorende afhankelijkheden en neemt u deze op in het Java-buildpad.
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"authentication" -> "ActiveDirectoryPassword",
"encrypt" -> "true"
))
val collection = sqlContext.read.sqlDB(config)
collection.show()
Verbinding maken met behulp van een toegangstoken
Installatievereiste
Als u de verificatiemodus op basis van een toegangstoken gebruikt, moet u microsoft-authentication-library-for-java- en de bijbehorende afhankelijkheden downloaden en opnemen in het Java-buildpad.
Zie Microsoft Entra-verificatie voor Azure SQL voor meer informatie over het ophalen van een toegangstoken voor uw database in Azure SQL Database of Azure SQL Managed Instance.
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val config = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"accessToken" -> "access_token",
"hostNameInCertificate" -> "*.database.windows.net",
"encrypt" -> "true"
))
val collection = sqlContext.read.sqlDB(config)
collection.show()
Gegevens schrijven met behulp van bulk-insertie
De traditionele jdbc-connector voert gegevens in uw database in door per rij in te voegen. U kunt de Spark-connector gebruiken om gegevens naar Azure SQL en SQL Server te schrijven met behulp van bulksgewijs invoegen. Het verbetert de schrijfprestaties aanzienlijk bij het laden van grote gegevenssets of het laden van gegevens in tabellen waarin een columnstore-index wordt gebruikt.
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
/**
Add column Metadata.
If not specified, metadata is automatically added
from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)
val bulkCopyConfig = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"dbTable" -> "dbo.Clients",
"bulkCopyBatchSize" -> "2500",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.