Dela via


Självstudie: Azure Data Lake Storage, Azure Databricks och Spark

Den här handledningen visar hur du ansluter ditt Azure Databricks-kluster till data som lagras i ett lagringskonto för Azure som har Azure Data Lake Storage aktiverat. Med den här anslutningen kan du köra frågor och analyser internt från klustret på dina data.

I den här handledningen kommer du att:

  • Mata in ostrukturerade data till ett lagringskonto
  • Köra analyser på data i Blob Storage

Om du inte har en Azure-prenumeration, skapa ett gratis konto innan du börjar.

Prerequisites

Skapa en Azure Databricks-arbetsyta och notebook-fil

  1. Skapa en Azure Databricks-arbetsyta. Se Skapa en Azure Databricks-arbetsyta.

  2. Skapa en anteckningsbok. Se Skapa en notebook. Välj Python som standardspråk för notebook-filen.

Håll anteckningsboken öppen. Du använder den i följande avsnitt.

Ladda ned flygdata

I den här handledningen används punktlighetsdata för flygningar i januari 2016 från Transportdepartementets statistikbyrå för att visa hur du utför en ETL-åtgärd. Du måste hämta dessa data för att kunna gå självstudien.

  1. Ladda ned filen On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip . Den här filen innehåller flygdata.

  2. Packa upp innehållet i den komprimerade filen och anteckna filnamnet och sökvägen. Du behöver den här informationen i ett senare steg.

Om du vill lära dig mer om den information som samlas in i prestandadata för rapportering i tid kan du se fältbeskrivningarna på webbplatsen för Bureau of Transportation Statistics.

Importera data

I det här avsnittet laddar du upp .csv flygdata till ditt Azure Data Lake Storage-konto och monterar sedan lagringskontot till ditt Databricks-kluster. Slutligen använder du Databricks för att läsa .csv flygdata och skriva tillbaka dem till lagringen i Apache parquet-format.

Ladda upp flygdata till ditt lagringskonto

Använd AzCopy för att kopiera din.csv-fil till ditt Azure Data Lake Storage-konto. Du använder azcopy make kommandot för att skapa en container i ditt lagringskonto. Sedan använder azcopy copy du kommandot för att kopiera csv-data som du nyss laddade ned till en katalog i containern.

I följande steg måste du ange namn för den container som du vill skapa och den katalog och blob som du vill ladda upp flygdata till i containern. Du kan använda de föreslagna namnen i varje steg eller ange egna namngivningskonventioner för containrar, kataloger och blobar.

  1. Öppna ett kommandotolksfönster och ange följande kommando för att logga in på Azure Active Directory för att få åtkomst till ditt lagringskonto.

    azcopy login
    

    Följ anvisningarna som visas i kommandotolken för att autentisera ditt användarkonto.

  2. Om du vill skapa en container i ditt lagringskonto för att lagra flygdata anger du följande kommando:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Ersätt platshållarvärdet <storage-account-name> med namnet på ditt lagringskonto.

    • <container-name> Ersätt platshållaren med ett namn på containern som du vill skapa för att lagra csv-data, till exempel flight-data-container.

  3. Om du vill ladda upp (kopiera) csv-data till ditt lagringskonto anger du följande kommando.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • <csv-folder-path> Ersätt platshållarvärdet med sökvägen till filen.csv.

    • Ersätt platshållarvärdet <storage-account-name> med namnet på ditt lagringskonto.

    • <container-name> Ersätt platshållaren med namnet på containern i ditt lagringskonto.

    • <directory-name> Ersätt platshållaren med namnet på en katalog för att lagra dina data i containern, till exempel jan2016.

Montera ditt lagringskonto i databricks-klustret

I det här avsnittet monterar du azure Data Lake Storage-molnobjektlagringen till Databricks File System (DBFS). Du använder azure AD-tjänstprincipen som du skapade tidigare för autentisering med lagringskontot. Mer information finns i Montera molnobjektlagring på Azure Databricks.

  1. Koppla anteckningsboken till klustret.

    1. I anteckningsboken som du skapade tidigare väljer du knappen Anslut i det övre högra hörnet i notebook-verktygsfältet. Den här knappen öppnar beräkningsväljaren. (Om du redan har anslutit anteckningsboken till ett kluster visas namnet på klustret i knapptexten i stället för Anslut).

    2. I rullgardinsmenyn för klusterval, välj ett kluster som du tidigare har skapat.

    3. Observera att texten i klusterväljaren ändras till startar. Vänta tills klustret har startat och namnet på klustret visas i knappen innan du fortsätter.

  2. Kopiera och klistra in följande kodblock i den första cellen, men kör inte den här koden ännu.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. I det här kodblocket:

    • I configs ersätter du <appId>, <clientSecret> och <tenantId>-platshållarvärdena med program-ID, klienthemlighet och tenant-ID som du kopierade när du skapade serviceprincipalen i förutsättningarna.

    • source I URI:n ersätter <storage-account-name>du platshållarvärdena , <container-name>och <directory-name> med namnet på ditt Azure Data Lake Storage-konto och namnet på containern och katalogen som du angav när du laddade upp flygdata till lagringskontot.

      Note

      Schemaidentifieraren i URI:n, abfss, säger till Databricks att använda drivrutinen för Azure Blob File System med Transport Layer Security (TLS). Mer information om URI finns i Använda Azure Data Lake Storage-URI:n.

  4. Kontrollera att klustret har startat fullt ut innan ni fortsätter.

  5. Tryck på SKIFT + RETUR-tangenterna för att köra koden i det här blocket.

Containern och katalogen där du laddade upp flygdata i ditt lagringskonto är nu tillgänglig i din notebook via monteringspunkt /mnt/flightdata.

Använd Databricks Notebook för att konvertera CSV till Parquet

Nu när csv-flygdata är tillgängliga via en DBFS-monteringspunkt kan du använda en Apache Spark DataFrame för att läsa in den på din arbetsyta och skriva tillbaka den i Apache parquet-format till din Azure Data Lake Storage-objektlagring.

  • En Spark DataFrame är en tvådimensionell etiketterad datastruktur med kolumner av potentiellt olika typer. Du kan använda en DataFrame för att enkelt läsa och skriva data i olika format som stöds. Med en DataFrame kan du läsa in data från molnobjektlagring och utföra analyser och transformeringar på dem i beräkningsklustret utan att påverka underliggande data i molnobjektlagringen. Mer information finns i Arbeta med PySpark DataFrames på Azure Databricks.

  • Apache Parquet är ett kolumnbaserat filformat med optimeringar som påskyndar sökfrågor. Det är ett effektivare filformat än CSV eller JSON. Mer information finns i Parquet Files.

Lägg till en ny cell i anteckningsboken och klistra in följande kod i den.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Tryck på SKIFT + RETUR-tangenterna för att köra koden i det här blocket.

Innan du fortsätter till nästa avsnitt kontrollerar du att alla parquet-data har skrivits och att "Klar" visas i utdata.

Utforska data

I det här avsnittet använder du databricks-filsystemverktyget för att utforska din Azure Data Lake Storage-objektlagring med hjälp av DBFS-monteringspunkten som du skapade i föregående avsnitt.

I en ny cell klistrar du in följande kod för att hämta en lista över filerna vid monteringspunkten. Det första kommandot matar ut en lista över filer och kataloger. Det andra kommandot visar utdata i tabellformat för enklare läsning.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Tryck på SKIFT + RETUR-tangenterna för att köra koden i det här blocket.

Observera att parquet-katalogen visas i listan. Du sparade .csv flygdata i parquet-format till katalogen parquet/flights i föregående avsnitt. Om du vill visa filer i katalogen parquet/flights klistrar du in följande kod i en ny cell och kör den:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Om du vill skapa en ny fil och lista den klistrar du in följande kod i en ny cell och kör den:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Eftersom du inte behöver 1.txt-filen i den här självstudien kan du klistra in följande kod i en cell och köra den för att rekursivt ta bort mydirectory. Parametern True anger en rekursiv borttagning.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Som en bekvämlighet kan du använda hjälpkommandot för att lära dig mer om andra kommandon.

dbutils.fs.help("rm")

Med dessa kodexempel har du utforskat hdfs hierarkiska karaktär med hjälp av data som lagras i ett lagringskonto med Azure Data Lake Storage aktiverat.

Fråga efter data

Därefter kan du börja fråga efter data som du laddade upp till ditt lagringskonto. Ange vart och ett av följande kodblock i en ny cell och tryck på SKIFT + RETUR för att köra Python-skriptet.

DataFrames tillhandahåller en omfattande uppsättning funktioner (välj kolumner, filtrera, koppla, aggregera) som gör att du kan lösa vanliga dataanalysproblem effektivt.

Om du vill läsa in en DataFrame från dina tidigare sparade parquet-flygdata och utforska några av de funktioner som stöds anger du det här skriptet i en ny cell och kör det.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected columns for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Ange det här skriptet i en ny cell för att köra några grundläggande analysfrågor mot data. Du kan välja att köra hela skriptet (SKIFT + RETUR), markera varje fråga och köra den separat med CTRL + SKIFT + RETUR, eller ange varje fråga i en separat cell och köra den där.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Summary

I den här handledningen kommer du att:

  • Skapade Azure-resurser, inklusive ett Azure Data Lake Storage-lagringskonto och Azure AD-tjänstens huvudnamn, och tilldelade behörigheter för åtkomst till lagringskontot.

  • Skapade en Azure Databricks-arbetsyta och anteckningsbok.

  • Använde AzCopy för att ladda upp ostrukturerade .csv flygdata till Azure Data Lake Storage-lagringskontot.

  • Databricks-filsystemets verktygsfunktioner användes för att montera ditt Azure Data Lake Storage-lagringskonto och utforska dess hierarkiska filsystem.

  • Använde Apache Spark DataFrames för att transformera dina .csv flygdata till Apache parquet-format och lagra dem tillbaka till ditt Azure Data Lake Storage-lagringskonto.

  • Använde DataFrames för att utforska flygdata och utföra en enkel fråga.

  • Använde Apache Spark SQL för att fråga flygdata för det totala antalet flygningar för varje flygbolag i januari 2016, flygplatserna i Texas, flygbolagen som flyger från Texas, den genomsnittliga ankomstfördröjningen i minuter för varje flygbolag nationellt och procentandelen av varje flygbolags flygningar som har försenat avgångar eller ankomster.

Rensa resurser

Om du vill bevara anteckningsboken och återkomma till den senare är det en bra idé att stänga av (avsluta) klustret för att undvika avgifter. Om du vill avsluta klustret väljer du det i beräkningsväljaren längst upp till höger i notebook-verktygsfältet, väljer Avsluta från menyn och bekräftar ditt val. (Som standard avslutas klustret automatiskt efter 120 minuters inaktivitet.)

Om du vill ta bort enskilda arbetsyteresurser som notebook-filer och kluster kan du göra det från arbetsytans vänstra sidopanel. Detaljerade anvisningar finns i Ta bort ett kluster eller Ta bort en notebook-fil.

Ta bort resursgruppen och alla relaterade resurser när de inte längre behövs. Om du vill göra det i Azure-portalen väljer du resursgruppen för lagringskontot och arbetsytan och väljer Ta bort.

Nästa steg