Dela via


Strömbearbetning med Azure Databricks

Azure Cosmos DB
Azure Databricks
Händelsehubbar för Azure
Azure Log Analytics
Azure Monitor

Den här referensarkitekturen visar en dataströmbearbetningspipeline från slutpunkt till slutpunkt. De fyra stegen i den här pipelinen matas in, bearbetas, lagras och analyseras och rapporteras. För den här referensarkitekturen matar pipelinen in data från två källor, utför en koppling på relaterade poster från varje dataström, berikar resultatet och beräknar ett genomsnitt i realtid. Resultaten lagras sedan för ytterligare analys.

Arkitektur

diagram som visar en referensarkitektur för dataströmbearbetning med Azure Databricks.

Ladda ned en Visio-fil av den här arkitekturen.

Arbetsflöde

Följande dataflöde motsvarar föregående diagram:

  1. I den här arkitekturen finns det två datakällor som genererar dataströmmar i realtid. Den första strömmen innehåller färdinformation och den andra strömmen innehåller prisinformation. Referensarkitekturen innehåller en simulerad datagenerator som läser från en uppsättning statiska filer och skickar data till Azure Event Hubs. Datakällorna i ett verkligt program är enheter som är installerade i taxibilarna.

  2. Event Hubs är en händelseinmatningstjänst. Den här arkitekturen använder två händelsehubbinstanser, en för varje datakälla. Varje datakälla skickar en dataström till den associerade händelsehubben.

  3. Azure Databricks är en Apache Spark-baserad analysplattform som är optimerad för Microsoft Azure-molntjänstplattformen. Azure Databricks används för att korrelera taxiresa och biljettdata och för att utöka korrelerade data med grannskapsdata som lagras i Azure Databricks-filsystemet.

  4. Azure Cosmos DB är en fullständigt hanterad databastjänst med flera modeller. Utdata från ett Azure Databricks-jobb är en serie poster som skrivs till Azure Cosmos DB för Apache Cassandra. Azure Cosmos DB för Apache Cassandra används eftersom det stöder datamodellering för tidsserier.

  5. Log Analytics är ett verktyg i Azure Monitor som gör att du kan köra frågor mot och analysera loggdata från olika källor. Programloggdata som Azure Monitor- samlar in lagras på en Log Analytics-arbetsyta. Du kan använda Log Analytics-frågor för att analysera och visualisera mått och inspektera loggmeddelanden för att identifiera problem i programmet.

Information om scenario

Ett taxiföretag samlar in data om varje taxiresa. I det här scenariot förutsätter vi att två separata enheter skickar data. Taxin har en mätare som skickar information om varje resa, inklusive varaktighet, avstånd och upphämtnings- och avlämningsplatser. En separat enhet accepterar betalningar från kunder och skickar data om priser. För att upptäcka ridership-trender vill taxiföretaget beräkna den genomsnittliga dricksen per mil som körs för varje grannskap, i realtid.

Datainsamling

För att simulera en datakälla använder den här referensarkitekturen datauppsättningen Taxi i New York City1. Den här datamängden innehåller data om taxiresor i New York City från 2010 till 2013. Den innehåller både rese- och biljettdataposter. Ride-data inkluderar resans varaktighet, reseavstånd och upphämtnings- och avlämningsplatserna. Prisdata inkluderar belopp för biljettpriser, skatter och tips. Fält i båda posttyperna inkluderar medaljongnummer, hacklicens och leverantörs-ID. Kombinationen av dessa tre fält identifierar unikt en taxi och en drivrutin. Data lagras i CSV-format.

[1] Donovan, Brian; Arbete, Dan (2016): Data om taxiresor i New York City (2010-2013). University of Illinois vid Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Datageneratorn är ett .NET Core-program som läser posterna och skickar dem till Event Hubs. Generatorn skickar kördata i JSON-format och prisdata i CSV-format.

Event Hubs använder partitioner för att segmentera data. Med partitioner kan en konsument läsa varje partition parallellt. När du skickar data till Event Hubs kan du ange partitionsnyckeln direkt. I annat fall tilldelas poster till partitioner i resursallokering.

I det här scenariot ska kördata och prisdata tilldelas samma partitions-ID för en specifik taxi. Den här tilldelningen gör det möjligt för Databricks att tillämpa en grad av parallellitet när de korrelerar de två strömmarna. En post i partitionen n av kördata matchar till exempel en post i partitionen n av prisdata.

Diagram över dataströmbearbetning med Azure Databricks och Event Hubs.

Ladda ned en Visio-fil med den här arkitekturen.

I datageneratorn har den gemensamma datamodellen för båda posttyperna en PartitionKey egenskap som är sammanlänkningen av Medallion, HackLicenseoch VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Den här egenskapen används för att tillhandahålla en explicit partitionsnyckel när den skickar data till Event Hubs.

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Dataflödeskapaciteten för Event Hubs mäts i dataflödesenheter. Du kan autoskala en händelsehubb genom att aktivera automatiskt blåsa upp. Den här funktionen skalar automatiskt dataflödesenheterna baserat på trafik, upp till ett konfigurerat maxvärde.

Dataströmbearbetning

I Azure Databricks utför ett jobb databehandling. Jobbet tilldelas till ett kluster och körs sedan på det. Jobbet kan vara anpassad kod som skrivits i Java eller en Spark notebook-.

I den här referensarkitekturen är jobbet ett Java-arkiv som har klasser skrivna i Java och Scala. När du anger Java-arkivet för ett Databricks-jobb anger Databricks-klustret klassen för åtgärd. main Här innehåller -metoden för com.microsoft.pnp.TaxiCabReader klassen databearbetningslogik.

Läsa strömmen från de två händelsehubbens instanser

Databearbetningslogik använder Spark-strukturerad strömning för att läsa från de två Azure-händelsehubbens instanser:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Utöka data med grannskapsinformationen

Färddata innehåller koordinaterna för latitud och longitud för upphämtnings- och avlämningsplatserna. Dessa koordinater är användbara men är inte lätta att använda för analys. Därför berikas dessa data med grannskapsdata som läss från en formfil.

Formfilformatet är binärt och inte enkelt parsat. Men GeoTools-biblioteket innehåller verktyg för geospatiala data som använder formfilformatet. Det här biblioteket används i klassen com.microsoft.pnp.GeoFinder för att fastställa grannskapets namn baserat på koordinaterna för upphämtnings- och avlämningsplatser.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Gå med i ride- och fare-data

Först transformeras ride- och fare-data:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Sedan kopplas ride-data till prisdata:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Bearbeta data och infoga dem i Azure Cosmos DB

Det genomsnittliga prisbeloppet för varje grannskap beräknas för ett visst tidsintervall:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Det genomsnittliga prisbeloppet infogas sedan i Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Att tänka på

Dessa överväganden implementerar grundpelarna i Azure Well-Architected Framework, som är en uppsättning vägledande grundsatser som du kan använda för att förbättra kvaliteten på en arbetsbelastning. Mer information finns i Microsoft Azure Well-Architected Framework.

Säkerhet

Säkerhet ger garantier mot avsiktliga attacker och missbruk av dina värdefulla data och system. Mer information finns i checklistan för Designgranskning för Security.

Åtkomst till Azure Databricks-arbetsytan styrs med hjälp av -administratörskonsolen. Administratörskonsolen innehåller funktioner för att lägga till användare, hantera användarbehörigheter och konfigurera enkel inloggning. Åtkomstkontroll för arbetsytor, kluster, jobb och tabeller kan också anges via administratörskonsolen.

Hantera hemligheter

Azure Databricks innehåller ett hemligt arkiv som används för att lagra autentiseringsuppgifter och referera till dem i notebook-filer och jobb. Omfattningar partitionshemligheter i Azure Databricks-hemlighetslagret:

databricks secrets create-scope --scope "azure-databricks-job"

Hemligheter läggs till på omfångsnivå:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Kommentar

Använd ett Azure Key Vault-backat omfång i stället för det interna Azure Databricks-omfånget.

I kod används hemligheter via Azure Databricks-funktionerna för hemligheter.

Kostnadsoptimering

Kostnadsoptimering fokuserar på sätt att minska onödiga utgifter och förbättra drifteffektiviteten. Mer information finns i checklistan Designgranskning för kostnadsoptimering.

Normalt beräknar du kostnader med hjälp av priskalkylatorn för Azure. Överväg följande tjänster som används i den här referensarkitekturen.

Kostnadsöverväganden för Event Hubs

Den här referensarkitekturen distribuerar Event Hubs på standardnivån. Prismodellen baseras på dataflödesenheter, ingresshändelser och avbildningshändelser. En ingresshändelse är en dataenhet som är 64 KB eller mindre. Större meddelanden faktureras i multipler av 64 kB. Du anger dataflödesenheter antingen via Azure Portal- eller Event Hubs-hanterings-API:er.

Om du behöver fler kvarhållningsdagar bör du överväga den dedikerade nivån. Den här nivån tillhandahåller distributioner med en enda klientorganisation som har stränga krav. Det här erbjudandet bygger ett kluster som baseras på kapacitetsenheter och inte är beroende av dataflödesenheter. Standardnivån faktureras också baserat på ingresshändelser och dataflödesenheter.

Mer information finns i Event Hubs-priser.

Kostnadsöverväganden för Azure Databricks

Azure Databricks tillhandahåller standardnivån och Premium-nivån, som båda har stöd för tre arbetsbelastningar. Den här referensarkitekturen distribuerar en Azure Databricks-arbetsyta på Premium-nivån.

Datateknikarbetsbelastningar ska köras på ett jobbkluster. Datatekniker använder kluster för att skapa och utföra jobb. Dataanalysarbetsbelastningar ska köras i ett kluster för alla syften och är avsedda för dataexperter att utforska, visualisera, manipulera och dela data och insikter interaktivt.

Azure Databricks tillhandahåller flera prismodeller.

  • betala per användning-plan

    Du debiteras för virtuella datorer (VM) som etablerats i kluster och Azure Databricks-enheter (DBUs) baserat på den valda VM-instansen. En DBU är en bearbetningsenhet som debiteras av användning per sekund. DBU-förbrukningen beror på storleken och typen av instans som körs i Azure Databricks. Prissättningen beror på den valda arbetsbelastningen och nivån.

  • förköpsplan

    Du checkar in på DBU:er som Azure Databricks-incheckningsenheter i antingen ett eller tre år för att minska den totala ägandekostnaden under den tidsperioden jämfört med modellen betala per användning.

Mer information finns i Prissättning för Azure Databricks.

Kostnadsöverväganden för Azure Cosmos DB

I den här arkitekturen skriver Azure Databricks-jobbet en serie poster till Azure Cosmos DB. Du debiteras för den kapacitet som du reserverar, vilket mäts i enheter för programbegäran per sekund (RU/s). Den här kapaciteten används för att utföra infogningsåtgärder. Faktureringsenheten är 100 RU/s per timme. Till exempel är kostnaden för att skriva 100 KB-objekt 50 RU/s.

För skrivåtgärder etablerar du tillräckligt med kapacitet för att stödja antalet skrivningar som behövs per sekund. Du kan öka det etablerade dataflödet med hjälp av portalen eller Azure CLI innan du utför skrivåtgärder och sedan minska dataflödet när dessa åtgärder har slutförts. Ditt dataflöde för skrivperioden är summan av det minsta dataflöde som krävs för specifika data och det dataflöde som krävs för infogningsåtgärden. Den här beräkningen förutsätter att det inte finns någon annan arbetsbelastning som körs.

Exempel på kostnadsanalys

Anta att du konfigurerar ett dataflödesvärde på 1 000 RU/s i en container. Den distribueras i 24 timmar i 30 dagar, totalt 720 timmar.

Containern debiteras med 10 enheter på 100 RU/s per timme för varje timme. Tio enheter till 0,008 USD (per 100 RU/s per timme) debiteras till 0,08 USD per timme.

För 720 timmar eller 7 200 enheter (av 100 RU:er) debiteras du 57,60 USD för månaden.

Lagring faktureras också för varje GB som används för dina lagrade data och index. Mer information finns i Prismodellen för Azure Cosmos DB.

Använd Azure Cosmos DB-kapacitetskalkylatorn för en snabb uppskattning av arbetsbelastningskostnaden.

Operativ skicklighet

Operational Excellence omfattar de driftsprocesser som distribuerar ett program och håller det igång i produktion. Mer information finns i checklistan för Designgranskning för Operational Excellence.

Övervakning

Azure Databricks baseras på Apache Spark. Både Azure Databricks och Apache Spark använder Apache Log4j som standardbibliotek för loggning. Förutom standardloggningen som Apache Spark tillhandahåller kan du implementera loggning i Log Analytics. Mer information finns i Övervaka Azure Databricks.

Eftersom com.microsoft.pnp.TaxiCabReader-klassen bearbetar ride- och fare-meddelanden kan ett meddelande vara felaktigt och därför inte giltigt. I en produktionsmiljö är det viktigt att analysera dessa felaktiga meddelanden för att identifiera ett problem med datakällorna så att de snabbt kan åtgärdas för att förhindra dataförlust. Klassen com.microsoft.pnp.TaxiCabReader registrerar en Apache Spark Accumulator som spårar antalet felformade biljettposter och ride-poster:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark använder Dropwizard-biblioteket för att skicka mått. Vissa av de interna Dropwizard-måttfälten är inte kompatibla med Log Analytics, vilket är anledningen till att den här referensarkitekturen innehåller en anpassad Dropwizard-mottagare och reporter. Den formaterar måtten i det format som Log Analytics förväntar sig. När Apache Spark rapporterar mått skickas också anpassade mått för den felaktiga resan och prisdata.

Du kan använda följande exempelfrågor på Log Analytics-arbetsytan för att övervaka driften av strömningsjobbet. Argumentet ago(1d) i varje fråga returnerar alla poster som genererades under den senaste dagen. Du kan justera den här parametern för att visa en annan tidsperiod.

Undantag som loggas under dataströmfrågeåtgärden

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Ackumulering av felaktiga biljett- och kördata

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Jobbåtgärd över tid

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Resursorganisation och distributioner

  • Skapa separata resursgrupper för produktions-, utvecklings- och testmiljöer. Med separata resursgrupper blir det enklare att hantera distributioner, ta bort testdistributioner och tilldela åtkomsträttigheter.

  • Använd Azure Resource Manager-mallen för att distribuera Azure-resurserna enligt processen infrastruktur som kod. Med hjälp av mallar kan du automatisera distributioner med Azure DevOps-tjänster eller andra ci/CD-lösningar (kontinuerlig integrering och kontinuerlig leverans).

  • Placera varje arbetsbelastning i en separat distributionsmall och lagra resurserna i källkontrollsystemen. Du kan distribuera mallarna tillsammans eller individuellt som en del av en CI/CD-process. Den här metoden förenklar automatiseringsprocessen.

    I den här arkitekturen identifieras Event Hubs, Log Analytics och Azure Cosmos DB som en enda arbetsbelastning. Dessa resurser ingår i en enda Azure Resource Manager-mall.

  • Överväg att mellanlagring av dina arbetsbelastningar. Distribuera till olika faser och kör valideringskontroller i varje steg innan du går vidare till nästa steg. På så sätt kan du styra hur du push-överför uppdateringar till dina produktionsmiljöer och minimera oväntade distributionsproblem.

    I den här arkitekturen finns det flera distributionssteg. Överväg att skapa en Azure DevOps-pipeline och lägga till dessa steg. Du kan automatisera följande steg:

    • Starta ett Databricks-kluster.
    • Konfigurera Databricks CLI.
    • Installera Scala-verktyg.
    • Lägg till Databricks-hemligheterna.

    Överväg att skriva automatiserade integreringstester för att förbättra kvaliteten och tillförlitligheten i Databricks-koden och dess livscykel.

Gå vidare