Gegevensstromen verwerken met Azure Databricks
Deze referentiearchitectuur toont een end-to-end stroomverwerkingspijplijn. De vier fasen van deze pijplijn zijn opnemen, verwerken, opslaan en analyseren en rapporteren. Voor deze referentiearchitectuur worden gegevens uit twee bronnen opgenomen in de pijplijn, wordt een join uitgevoerd op gerelateerde records uit elke stream, wordt het resultaat verrijkt en wordt een gemiddelde in realtime berekend. De resultaten worden vervolgens opgeslagen voor verdere analyse.
Architectuur
Download een Visio-bestand van deze architectuur.
Werkproces
De volgende gegevensstroom komt overeen met het vorige diagram:
In deze architectuur zijn er twee gegevensbronnen die in realtime gegevensstromen genereren. De eerste stream bevat ritgegevens en de tweede stream bevat tariefinformatie. De referentiearchitectuur bevat een gesimuleerde gegevensgenerator die wordt gelezen uit een set statische bestanden en de gegevens naar Azure Event Hubs pusht. De gegevensbronnen in een echte toepassing zijn apparaten die zijn geïnstalleerd in de taxi's.
Event Hubs is een service voor gebeurtenisopname. Deze architectuur maakt gebruik van twee Event Hub-exemplaren, één voor elke gegevensbron. Elke gegevensbron verzendt een gegevensstroom naar de bijbehorende Event Hub.
Azure Databricks- is een op Apache Spark gebaseerd analyseplatform dat is geoptimaliseerd voor het Microsoft Azure-cloudservicesplatform. Azure Databricks wordt gebruikt om de taxirit- en ritgegevens te correleren en om de gecorreleerde gegevens te verrijken met buurtgegevens die zijn opgeslagen in het Azure Databricks-bestandssysteem.
Azure Cosmos DB- is een volledig beheerde databaseservice met meerdere modellen. De uitvoer van een Azure Databricks-taak is een reeks records, die worden geschreven naar Azure Cosmos DB voor Apache Cassandra. Azure Cosmos DB voor Apache Cassandra wordt gebruikt omdat het modellering van tijdreeksgegevens ondersteunt.
Azure Synapse Link voor Azure Cosmos DB kunt u in bijna realtime analyses uitvoeren op operationele gegevens in Azure Cosmos DB, zonder enige prestatie- of kosteneffecten op uw transactionele workload. U kunt deze resultaten bereiken met behulp van serverloze SQL-pool en Spark-pools. Deze analyse-engines zijn beschikbaar in uw Azure Synapse Analytics-werkruimte.
Azure Cosmos DB for NoSQL spiegelen in Microsoft Fabric kunt u Azure Cosmos DB-gegevens integreren met de rest van uw gegevens in Microsoft Fabric.
Log Analytics- is een hulpprogramma in Azure Monitor waarmee u logboekgegevens uit verschillende bronnen kunt doorzoeken en analyseren. Toepassingslogboekgegevens die Azure Monitor verzamelt, worden opgeslagen in een Log Analytics-werkruimte. U kunt Log Analytics-query's gebruiken om metrische gegevens te analyseren en te visualiseren en logboekberichten te inspecteren om problemen in de toepassing te identificeren.
Scenariodetails
Een taxibedrijf verzamelt gegevens over elke taxirit. Voor dit scenario gaan we ervan uit dat twee afzonderlijke apparaten gegevens verzenden. De taxi heeft een meter die informatie over elke rit verzendt, inclusief de duur, afstand en ophaal- en afleverlocaties. Een afzonderlijk apparaat accepteert betalingen van klanten en verzendt gegevens over tarieven. Om trends voor ruiters te ontdekken, wil het taxibedrijf in realtime de gemiddelde tip per mijl berekenen die voor elke buurt wordt gereden.
Gegevensopname
Voor het simuleren van een gegevensbron maakt deze referentiearchitectuur gebruik van de gegevensset voor taxigegevens uit New York1. Deze gegevensset bevat gegevens over taxiritten in New York City van 2010 tot 2013. Het bevat zowel rit- als ritgegevensrecords. Ritgegevens omvatten reisduur, reisafstand en de ophaal- en afleverlocaties. Tariefgegevens omvatten tarief-, belasting- en fooibedragen. Velden in beide recordtypen zijn het aantal medailles, de hacklicentie en de leveranciers-id. De combinatie van deze drie velden identificeert een taxi en een chauffeur uniek. De gegevens worden opgeslagen in CSV-indeling.
[1] Donovan, Brian; Werk, Dan (2016): Gegevens taxiritten in New York City (2010-2013). Universiteit van Illinois bij Urbana-Champaign. https://doi.org/10.13012/J8PN93H8
De gegevensgenerator is een .NET Core-toepassing waarmee de records worden gelezen en naar Event Hubs worden verzonden. De generator verzendt ritgegevens in JSON-indeling en ritgegevens in CSV-indeling.
Event Hubs maakt gebruik van partities om de gegevens te segmenteren. Met partities kan een consument elke partitie parallel lezen. Wanneer u gegevens naar Event Hubs verzendt, kunt u de partitiesleutel rechtstreeks opgeven. Anders worden records op round robin-wijze toegewezen aan partities.
In dit scenario moeten ritgegevens en ritgegevens dezelfde partitie-id voor een specifieke taxicabine worden toegewezen. Met deze toewijzing kan Databricks een mate van parallelle uitvoering toepassen wanneer deze de twee streams correleert. Een record in partitie n van de ritgegevens komt bijvoorbeeld overeen met een record in partitie n van de ritgegevens.
Download een Visio-bestand van deze architectuur.
In de gegevensgenerator heeft het algemene gegevensmodel voor beide recordtypen een PartitionKey eigenschap die de samenvoeging is van Medallion, HackLicenseen VendorId.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Deze eigenschap wordt gebruikt om een expliciete partitiesleutel op te geven wanneer deze gegevens naar Event Hubs verzendt.
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Evenement Hubs
De doorvoercapaciteit van Event Hubs wordt gemeten in doorvoereenheden. U kunt een Event Hub automatisch schalen door automatisch vergrotenin te schakelen. Met deze functie worden de doorvoereenheden automatisch geschaald op basis van verkeer, tot een geconfigureerd maximum.
Stroomverwerking
In Azure Databricks voert een taak gegevensverwerking uit. De taak wordt toegewezen aan een cluster en wordt vervolgens uitgevoerd. De taak kan aangepaste code zijn die is geschreven in Java of een Spark notebook.
In deze referentiearchitectuur is de taak een Java-archief met klassen die zijn geschreven in Java en Scala. Wanneer u het Java-archief voor een Databricks-taak opgeeft, geeft het Databricks-cluster de klasse voor de bewerking op. Hier bevat de main methode van de com.microsoft.pnp.TaxiCabReader klasse de logica voor gegevensverwerking.
De stream van de twee Event Hub-exemplaren lezen
De logica voor gegevensverwerking maakt gebruik van gestructureerd streamen van Spark om te lezen uit de twee Azure Event Hub-exemplaren:
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Verrijk de gegevens met de buurtinformatie
De ritgegevens omvatten de breedte- en lengtegraadcoördinaten van de ophaal- en afleverlocaties. Deze coördinaten zijn nuttig, maar niet gemakkelijk te gebruiken voor analyse. Daarom worden deze gegevens verrijkt met buurtgegevens die worden gelezen uit een shapebestand.
De shapefile-indeling is binair en kan niet eenvoudig worden geparseerd. Maar de GeoTools bibliotheek biedt hulpprogramma's voor georuimtelijke gegevens die de shapefile-indeling gebruiken. Deze bibliotheek wordt gebruikt in de com.microsoft.pnp.GeoFinder klasse om de naam van de buurt te bepalen op basis van de coördinaten voor ophaal- en afleverlocaties.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Deelnemen aan de rit- en ritgegevens
Eerst worden de rit- en ritgegevens getransformeerd:
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
Vervolgens worden de ritgegevens samengevoegd met de ritgegevens:
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
De gegevens verwerken en invoegen in Azure Cosmos DB
Het gemiddelde tarief voor elke buurt wordt berekend voor een specifiek tijdsinterval:
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
Het gemiddelde tarief wordt vervolgens ingevoegd in Azure Cosmos DB:
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Overwegingen
Met deze overwegingen worden de pijlers van het Azure Well-Architected Framework geïmplementeerd. Dit is een set richtlijnen die u kunt gebruiken om de kwaliteit van een workload te verbeteren. Zie Microsoft Azure Well-Architected Framework voor meer informatie.
Beveiliging
Beveiliging biedt garanties tegen opzettelijke aanvallen en misbruik van uw waardevolle gegevens en systemen. Zie voor meer informatie controlelijst ontwerpbeoordeling voor Security.
Toegang tot de Azure Databricks-werkruimte wordt beheerd met behulp van de -beheerconsole. De beheerconsole bevat functionaliteit voor het toevoegen van gebruikers, het beheren van gebruikersmachtigingen en het instellen van eenmalige aanmelding. Toegangsbeheer voor werkruimten, clusters, taken en tabellen kan ook worden ingesteld via de beheerconsole.
Geheimen beheren
Azure Databricks bevat een geheime opslag die wordt gebruikt om referenties op te slaan en ernaar te verwijzen in notebooks en taken. Hiermee worden partitiegeheimen binnen het azure Databricks-geheimarchief bereikt:
databricks secrets create-scope --scope "azure-databricks-job"
Geheimen worden toegevoegd op bereikniveau:
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Notitie
Gebruik een azure Key Vault-bereik in plaats van het systeemeigen Azure Databricks-bereik.
In code worden geheimen geopend via de azure Databricks-hulpprogramma's voor geheimen.
Kostenoptimalisatie
Kostenoptimalisatie richt zich op manieren om onnodige uitgaven te verminderen en operationele efficiëntie te verbeteren. Zie controlelijst ontwerpbeoordeling voor kostenoptimalisatievoor meer informatie.
Gebruik de Azure-prijscalculator om een schatting van de kosten te maken. Houd rekening met de volgende services die in deze referentiearchitectuur worden gebruikt.
Kostenoverwegingen voor Event Hubs
Met deze referentiearchitectuur worden Event Hubs geïmplementeerd in de Standard-laag. Het prijsmodel is gebaseerd op doorvoereenheden, inkomend verkeer en vastleggen van gebeurtenissen. Een inkomende gebeurtenis is een gegevenseenheid van 64 kB of minder. Grotere berichten worden gefactureerd in meervouden van 64 kB. U geeft doorvoereenheden op via Azure Portal of Event Hubs-beheer-API's.
Als u meer retentiedagen nodig hebt, kunt u de dedicated-laag overwegen. Deze laag biedt implementaties met één tenant die strenge vereisten hebben. Met deze aanbieding wordt een cluster gebouwd dat is gebaseerd op capaciteitseenheden en niet afhankelijk is van doorvoereenheden. De Standard-laag wordt ook gefactureerd op basis van ingangsgebeurtenissen en doorvoereenheden.
Zie Prijzen voor Event Hubsvoor meer informatie.
Kostenoverwegingen voor Azure Databricks
Azure Databricks biedt de Standard-laag en de Premium-laag, die beide ondersteuning bieden voor drie workloads. Met deze referentiearchitectuur wordt een Azure Databricks-werkruimte geïmplementeerd in de Premium-laag.
Data engineering-workloads moeten worden uitgevoerd op een taakcluster. Data engineers gebruiken clusters om taken te bouwen en uit te voeren. Workloads voor gegevensanalyse moeten worden uitgevoerd op een cluster voor alle doeleinden en zijn bedoeld voor gegevenswetenschappers om gegevens en inzichten interactief te verkennen, visualiseren, bewerken en delen.
Azure Databricks biedt meerdere prijsmodellen.
betalen per gebruik-abonnement
U wordt gefactureerd voor virtuele machines (VM's) die zijn ingericht in clusters en Azure Databricks-eenheden (DBU's) op basis van het gekozen VM-exemplaar. Een DBU is een verwerkingseenheid die per seconde wordt gefactureerd per gebruik. Het DBU-verbruik is afhankelijk van de grootte en het type exemplaar dat wordt uitgevoerd in Azure Databricks. Prijzen zijn afhankelijk van de gekozen workload en laag.
abonnement vooraf aanschaffen
U verbindt DBA's als Azure Databricks-doorvoereenheden voor een of drie jaar om de totale eigendomskosten gedurende die periode te verlagen in vergelijking met het model voor betalen per gebruik.
Zie Prijzen van Azure Databricksvoor meer informatie.
Kostenoverwegingen voor Azure Cosmos DB
In deze architectuur schrijft de Azure Databricks-taak een reeks records naar Azure Cosmos DB. Er worden kosten in rekening gebracht voor de capaciteit die u reserveert, wat wordt gemeten in aanvraageenheden per seconde (RU/s). Deze capaciteit wordt gebruikt om invoegbewerkingen uit te voeren. De eenheid voor facturering is 100 RU/s per uur. De kosten voor het schrijven van items van 100 kB zijn bijvoorbeeld 50 RU/s.
Voor schrijfbewerkingen moet u voldoende capaciteit inrichten ter ondersteuning van het aantal schrijfbewerkingen dat per seconde nodig is. U kunt de ingerichte doorvoer verhogen met behulp van de portal of Azure CLI voordat u schrijfbewerkingen uitvoert en vervolgens de doorvoer verminderen nadat deze bewerkingen zijn voltooid. Uw doorvoer voor de schrijfperiode is de som van de minimale doorvoer die nodig is voor de specifieke gegevens en de doorvoer die is vereist voor de invoegbewerking. Bij deze berekening wordt ervan uitgegaan dat er geen andere werkbelasting wordt uitgevoerd.
Voorbeeld van kostenanalyse
Stel dat u een doorvoerwaarde van 1000 RU/s voor een container configureert. Het wordt gedurende 24 uur gedurende 30 dagen geïmplementeerd voor een totaal van 720 uur.
De container wordt gefactureerd op 10 eenheden van 100 RU/s per uur voor elk uur. Tien eenheden voor $ 0,008 (per 100 RU/s per uur) worden in rekening gebracht op $ 0,08 per uur.
Voor 720 uur of 7.200 eenheden (van 100 RU's) wordt u $ 57,60 gefactureerd voor de maand.
Opslag wordt ook gefactureerd voor elke GB die wordt gebruikt voor uw opgeslagen gegevens en index. Zie het prijsmodel van Azure Cosmos DB voor meer informatie.
Gebruik de Azure Cosmos DB-capaciteitscalculator voor een snelle schatting van de workloadkosten.
Operationele uitmuntendheid
Operational Excellence behandelt de operationele processen die een toepassing implementeren en deze in productie houden. Zie controlelijst ontwerpbeoordeling voor Operational Excellencevoor meer informatie.
Controleren
Azure Databricks is gebaseerd op Apache Spark. Zowel Azure Databricks als Apache Spark gebruiken Apache Log4j- als standaardbibliotheek voor logboekregistratie. Naast de standaardlogboekregistratie die Apache Spark biedt, kunt u logboekregistratie implementeren in Log Analytics. Zie Bewaking van Azure Databricks voor meer informatie.
Aangezien de com.microsoft.pnp.TaxiCabReader klasse rit- en ritberichten verwerkt, is een bericht mogelijk ongeldig en dus niet geldig. In een productieomgeving is het belangrijk om deze onjuiste berichten te analyseren om een probleem met de gegevensbronnen te identificeren, zodat deze snel kunnen worden opgelost om gegevensverlies te voorkomen. De com.microsoft.pnp.TaxiCabReader-klasse registreert een Apache Spark Accumulator waarmee het aantal onjuiste tariefrecords en ritrecords wordt bijgehouden:
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Apache Spark maakt gebruik van de dropwizard-bibliotheek voor het verzenden van metrische gegevens. Sommige van de systeemeigen velden voor metrische gegevens van Dropwizard zijn niet compatibel met Log Analytics. Daarom bevat deze referentiearchitectuur een aangepaste Dropwizard-sink en reporter. Hiermee worden de metrische gegevens opgemaakt in de indeling die Log Analytics verwacht. Wanneer Apache Spark metrische gegevens rapporteert, worden de aangepaste metrische gegevens voor de onjuiste rit en ritgegevens ook verzonden.
U kunt de volgende voorbeeldquery's in uw Log Analytics-werkruimte gebruiken om de werking van de streamingtaak te bewaken. Het argument ago(1d) in elke query retourneert alle records die zijn gegenereerd op de laatste dag. U kunt deze parameter aanpassen om een andere periode weer te geven.
Uitzonderingen die zijn vastgelegd tijdens de streamquerybewerking
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Accumulatie van onjuiste ritten en ritgegevens
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Taakbewerking in de loop van de tijd
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Resourceorganisatie en implementaties
Maak afzonderlijke resourcegroepen voor productie-, ontwikkelings- en testomgevingen. Met afzonderlijke resourcegroepen kunt u eenvoudiger implementaties beheren, testimplementaties verwijderen en toegangsrechten verlenen.
Gebruik de Azure Resource Manager-sjabloon om de Azure-resources te implementeren volgens het proces voor infrastructuur als code. Met behulp van sjablonen kunt u implementaties automatiseren met Azure DevOps-services of andere OPLOSSINGEN voor continue integratie en continue levering (CI/CD).
Plaats elke workload in een afzonderlijke implementatiesjabloon en sla de resources op in broncodebeheersystemen. U kunt de sjablonen samen of afzonderlijk implementeren als onderdeel van een CI/CD-proces. Deze aanpak vereenvoudigt het automatiseringsproces.
In deze architectuur worden Event Hubs, Log Analytics en Azure Cosmos DB geïdentificeerd als één workload. Deze resources zijn opgenomen in één Azure Resource Manager-sjabloon.
Overweeg om uw workloads te faseren. Implementeer in verschillende fasen en voer validatiecontroles uit in elke fase voordat u naar de volgende fase gaat. Op die manier kunt u bepalen hoe u updates naar uw productieomgevingen pusht en onverwachte implementatieproblemen minimaliseert.
In deze architectuur zijn er meerdere implementatiefasen. Overweeg om een Azure DevOps-pijplijn te maken en deze fasen toe te voegen. U kunt de volgende fasen automatiseren:
- Een Databricks-cluster starten.
- Configureer Databricks CLI.
- Installeer Scala-hulpprogramma's.
- Voeg de Databricks-geheimen toe.
Overweeg geautomatiseerde integratietests te schrijven om de kwaliteit en betrouwbaarheid van de Databricks-code en de levenscyclus ervan te verbeteren.