Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
In deze zelfstudie leert u hoe u gegevens laadt en transformeert met behulp van de DataFrame-API van Apache Spark (PySpark), de Apache Spark Scala DataFrame-API en de SparkR SparkDataFrame-API in Azure Databricks.
Aan het einde van deze zelfstudie begrijpt u wat een DataFrame is en vertrouwd bent met de volgende taken:
Python
- Variabelen definiëren en openbare gegevens kopiëren naar een Unity Catalog-volume
- Een DataFrame maken met Python
- Gegevens laden in een DataFrame vanuit een CSV-bestand
- Een DataFrame weergeven en ermee werken
- Het DataFrame opslaan
- SQL-query's uitvoeren in PySpark
Zie ook naslaginformatie over de Apache Spark PySpark-API.
Scala
- Variabelen definiëren en openbare gegevens kopiëren naar een Unity Catalog-volume
- Een DataFrame maken met Scala
- Gegevens laden in een DataFrame vanuit een CSV-bestand
- Een DataFrame weergeven en ermee werken
- Het DataFrame opslaan
- SQL-query's uitvoeren in Apache Spark
Zie ook de Naslaginformatie over de Scala-API van Apache Spark.
R
- Variabelen definiëren en openbare gegevens kopiëren naar een Unity Catalog-volume
- Een SparkR SparkDataFrames maken
- Gegevens laden in een DataFrame vanuit een CSV-bestand
- Een DataFrame weergeven en ermee werken
- Het DataFrame opslaan
- SQL-query's uitvoeren in SparkR
Zie ook naslaginformatie over de Apache SparkR-API.
Wat is een DataFrame?
Een DataFrame is een tweedimensionale gelabelde gegevensstructuur met kolommen met mogelijk verschillende typen. U kunt een DataFrame beschouwen als een spreadsheet, een SQL-tabel of een woordenlijst met reeksobjecten. Apache Spark DataFrames bieden een uitgebreide set functies (select columns, filter, join, aggregate) waarmee u veelvoorkomende problemen met gegevensanalyse efficiënt kunt oplossen.
Apache Spark DataFrames zijn een abstractie die is gebouwd op RDD's (Resilient Distributed Datasets). Spark DataFrames en Spark SQL maken gebruik van een geïntegreerde plannings- en optimalisatie-engine, zodat u bijna identieke prestaties krijgt in alle ondersteunde talen in Azure Databricks (Python, SQL, Scala en R).
Vereisten
Als u de volgende zelfstudie wilt voltooien, moet u voldoen aan de volgende vereisten:
- Om de voorbeelden in deze zelfstudie te gebruiken, moet uw werkruimte Unity Catalog zijn ingeschakeld. 
- In de voorbeelden in deze zelfstudie wordt een Unity Catalog volume gebruikt om voorbeeldgegevens op te slaan. Als u deze voorbeelden wilt gebruiken, maakt u een volume en gebruikt u de catalogus, het schema en de volumenamen van dat volume om het volumepad in te stellen dat door de voorbeelden wordt gebruikt. 
- U moet over de volgende machtigingen beschikken in Unity Catalog: - 
              READ VOLUMEenWRITE VOLUME, ofALL PRIVILEGESvoor het volume dat voor deze zelfstudie wordt gebruikt.
- 
              USE SCHEMAofALL PRIVILEGESvoor het schema dat voor deze zelfstudie wordt gebruikt.
- 
              USE CATALOGofALL PRIVILEGESvoor de catalogus die voor deze zelfstudie wordt gebruikt.
 - Als u deze machtigingen wilt instellen, kunt u uw Databricks-beheerder raadplegen of de Unity Catalog-bevoegdheden en beveiligbare objectenbekijken. 
- 
              
Aanbeveling
Zie DataFrame-zelfstudienotebooks voor een voltooid notebook voor dit artikel.
Stap 1: Variabelen definiëren en CSV-bestand laden
Deze stap definieert variabelen voor gebruik in deze zelfstudie en laadt vervolgens een CSV-bestand met babynaamgegevens van health.data.ny.gov in uw Unity Catalog-volume.
- Open een nieuw notitieblok door op het  pictogram te klikken. Zie Het uiterlijk van het notitieblok aanpassenvoor meer informatie over het navigeren in Azure Databricks-notebooks. pictogram te klikken. Zie Het uiterlijk van het notitieblok aanpassenvoor meer informatie over het navigeren in Azure Databricks-notebooks.
- Kopieer en plak de volgende code in de nieuwe lege notebookcel. Vervang - <catalog-name>,- <schema-name>en- <volume-name>door de catalogus-, schema- en volumenamen voor een Unity Catalog-volume. Vervang- <table_name>door een tabelnaam van uw keuze. Later in deze zelfstudie laadt u babynaamgegevens in deze tabel.- Python- catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path- Scala- val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path- R- catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
- Druk - Shift+Enterom de cel uit te voeren en een nieuwe lege cel te maken.
- Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code kopieert u het - rows.csv-bestand van health.data.ny.gov naar uw Unity Catalog-volume met behulp van de opdracht Databricks dbutuils.- Python- dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")- Scala- dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")- R- dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
Stap 2: Een DataFrame maken
Met deze stap maakt u een DataFrame met de naam df1 testgegevens en wordt vervolgens de inhoud ervan weergegeven.
- Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code maakt u het DataFrame met testgegevens en geeft u vervolgens de inhoud en het schema van het DataFrame weer. - Python- data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.- Scala- val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.- R- # Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
Stap 3: Gegevens in een DataFrame laden vanuit een CSV-bestand
Met deze stap maakt u een DataFrame met de naam df_csv van het CSV-bestand dat u eerder in uw Unity Catalog-volume hebt geladen. Zie spark.read.csv.
- Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code worden babynaamgegevens vanuit het CSV-bestand in DataFrame - df_csvgeladen en wordt vervolgens de inhoud van het DataFrame weergegeven.- Python- df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)- Scala- val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)- R- df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
U kunt gegevens laden uit een groot aantal ondersteunde bestandsindelingen.
Stap 4: Uw DataFrame weergeven en ermee werken
Bekijk en communiceer met uw babynamen DataFrames met behulp van de volgende methoden.
Het DataFrame-schema afdrukken
Meer informatie over het weergeven van het schema van een Apache Spark DataFrame. Apache Spark gebruikt de term schema om te verwijzen naar de namen en gegevenstypen van de kolommen in het DataFrame.
Notitie
Azure Databricks gebruikt ook het termenschema om een verzameling tabellen te beschrijven die zijn geregistreerd bij een catalogus.
- Kopieer en plak de volgende code in een lege notebookcel. Deze code toont het schema van uw DataFrames met de - .printSchema()-methode om de schema's van de twee DataFrames weer te geven, zodat u zich kunt voorbereiden op het samenvoegen van de twee DataFrames.- Python- df_csv.printSchema() df1.printSchema()- Scala- dfCsv.printSchema() df1.printSchema()- R- printSchema(df_csv) printSchema(df1)
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
De naam van de kolom wijzigen in het DataFrame
Meer informatie over het wijzigen van de naam van een kolom in een DataFrame.
- Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de naam van een kolom in het - df1_csvDataFrame aangepast aan de desbetreffende kolom in het- df1DataFrame. Deze code maakt gebruik van de Apache Spark-methode- withColumnRenamed().- Python- df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema- Scala- val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()- R- df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
DataFrames combineren
Leer hoe u een nieuw DataFrame maakt waarmee de rijen van het ene DataFrame aan het andere worden toegevoegd.
- Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode - union()om de inhoud van uw eerste DataFrame- dfte combineren met DataFrame- df_csvmet de babynamen die zijn geladen vanuit het CSV-bestand.- Python- df = df1.union(df_csv) display(df)- Scala- val df = df1.union(dfCsvRenamed) display(df)- R- display(df <- union(df1, df_csv))
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
Rijen filteren in een DataFrame
Ontdek de populairste babynamen in uw gegevensset door rijen te filteren met behulp van de Apache Spark-.filter() of .where() methoden. Gebruik filters om een subset van rijen te selecteren die moeten worden geretourneerd of gewijzigd in een DataFrame. Er is geen verschil in prestaties of syntaxis, zoals te zien is in de volgende voorbeelden.
Methode .filter() gebruiken
- Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode - .filter()om deze rijen in het DataFrame weer te geven met een telling van meer dan 50.- Python- display(df.filter(df["Count"] > 50))- Scala- display(df.filter(df("Count") > 50))- R- display(filteredDF <- filter(df, df$Count > 50))
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
De methode .where() gebruiken
- Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode - .where()om deze rijen in het DataFrame weer te geven met een telling van meer dan 50.- Python- display(df.where(df["Count"] > 50))- Scala- display(df.where(df("Count") > 50))- R- display(filtered_df <- where(df, df$Count > 50))
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
Kolommen selecteren in een DataFrame en sorteren op frequentie
Leer welke babynaamfrequenties je kunt gebruiken met de select() methode om de kolommen van het DataFrame te specificeren die moeten worden geretourneerd. Gebruik Apache Spark orderby en desc functies om de resultaten te ordenen.
De pyspark.sql-module voor Apache Spark biedt ondersteuning voor SQL-functies. Een van deze functies die we in deze zelfstudie gebruiken, zijn de Apache Spark orderBy()en desc()expr() functies. U schakelt het gebruik van deze functies in door ze indien nodig in uw sessie te importeren.
- Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de - desc()functie geïmporteerd en vervolgens de Apache Spark-methode- select()en Apache Spark- orderBy()en- desc()functies gebruikt om de meest voorkomende namen en hun aantallen in aflopende volgorde weer te geven.- Python- from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))- Scala- import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))- R- display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
Een subset DataFrame maken
Meer informatie over het maken van een subset DataFrame op basis van een bestaand DataFrame.
- Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode - filterom een nieuw DataFrame te maken dat de gegevens per jaar, aantal en geslacht beperkt. Er wordt gebruikgemaakt van de Apache Spark-- select()methode om de kolommen te beperken. Het maakt ook gebruik van Apache Spark- orderBy()en- desc()functies om het nieuwe DataFrame te sorteren op aantal.- Python- subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)- Scala- val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)- R- subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
Stap 5: Het DataFrame opslaan
Meer informatie over het opslaan van een DataFrame. U kunt uw DataFrame opslaan in een tabel of het DataFrame naar een bestand of meerdere bestanden schrijven.
Het DataFrame opslaan in een tabel
Azure Databricks maakt standaard gebruik van de Delta Lake-indeling voor alle tabellen. Als u uw DataFrame wilt opslaan, moet u CREATE tabelbevoegdheden voor de catalogus en het schema hebben.
- Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de inhoud van het DataFrame opgeslagen in een tabel met behulp van de variabele die u aan het begin van deze zelfstudie hebt gedefinieerd. - Python- df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")- Scala- df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")- R- saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
De meeste Apache Spark-toepassingen werken op grote gegevenssets en op gedistribueerde wijze. Apache Spark schrijft een map met bestanden uit in plaats van één bestand. Delta Lake splitst de Parquet-mappen en -bestanden. Veel gegevenssystemen kunnen deze mappen met bestanden lezen. Azure Databricks raadt het gebruik van tabellen aan via bestandspaden voor de meeste toepassingen.
Het DataFrame opslaan in JSON-bestanden
- Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt het DataFrame opgeslagen in een map met JSON-bestanden. - Python- df.write.format("json").mode("overwrite").save("/tmp/json_data")- Scala- df.write.format("json").mode("overwrite").save("/tmp/json_data")- R- write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
Het DataFrame lezen uit een JSON-bestand
Meer informatie over het gebruik van de Apache Spark-methode spark.read.format() om JSON-gegevens uit een map te lezen in een DataFrame.
- Kopieer en plak de volgende code in een lege notebookcel. Met deze code worden de JSON-bestanden weergegeven die u in het vorige voorbeeld hebt opgeslagen. - Python- display(spark.read.format("json").json("/tmp/json_data"))- Scala- display(spark.read.format("json").json("/tmp/json_data"))- R- display(read.json("/tmp/json_data"))
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
Aanvullende taken: SQL-query's uitvoeren in PySpark, Scala en R
Apache Spark DataFrames bieden de volgende opties om SQL te combineren met PySpark, Scala en R. U kunt de volgende code uitvoeren in hetzelfde notebook dat u voor deze zelfstudie hebt gemaakt.
Een kolom opgeven als een SQL-query
Meer informatie over het gebruik van de Apache Spark-methode selectExpr() . Dit is een variant van de select() methode die SQL-expressies accepteert en een bijgewerkt DataFrame retourneert. Met deze methode kunt u een SQL-expressie gebruiken, zoals upper.
- Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode - selectExpr()en de SQL- upper-expressie om een tekenreekskolom te converteren naar hoofdletters (en de naam van de kolom te wijzigen).- Python- display(df.selectExpr("Count", "upper(County) as big_name"))- Scala- display(df.selectExpr("Count", "upper(County) as big_name"))- R- display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
Gebruik expr() om de SQL-syntaxis voor een kolom te gebruiken
Meer informatie over het importeren en gebruiken van de Apache Spark expr()-functie om SQL-syntaxis te gebruiken op elke plek waar een kolom wordt gespecificeerd.
- Kopieer en plak de volgende code in een lege notebookcel. Met deze code importeert u de - expr()-functie en gebruikt u vervolgens de Apache Spark-- expr()-functie en de SQL- lower-expressie om een tekenreekskolom te converteren naar kleine letters (en de naam van de kolom te wijzigen).- Python- from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))- Scala- import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))- R- display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
Een willekeurige SQL-query uitvoeren met behulp van de functie spark.sql()
Meer informatie over het gebruik van de Apache Spark-functie spark.sql() om willekeurige SQL-query's uit te voeren.
- Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark- - spark.sql()-functie om een query uit te voeren op een SQL-tabel met behulp van sql-syntaxis.- Python- display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))- Scala- display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))- R- display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
- Druk - Shift+Enterom de cel uit te voeren en naar de volgende cel te gaan.
Zelfstudienotebooks voor DataFrame
De volgende notebooks bevatten de voorbeeldenquery's uit deze zelfstudie.