Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Med användardefinierade funktioner (UDF:er) kan du återanvända och dela kod som utökar inbyggda funktioner i Azure Databricks. Använd UDF:er för att utföra specifika uppgifter som komplexa beräkningar, transformeringar eller anpassade datamanipuleringar.
När ska du använda en UDF- eller Apache Spark-funktion?
Använd UDF:er för logik som är svår att uttrycka med inbyggda Apache Spark-funktioner. Inbyggda Apache Spark-funktioner är optimerade för distribuerad bearbetning och ger bättre prestanda i stor skala. Mer information finns i Funktioner.
Databricks rekommenderar UDF:er för ad hoc-frågor, manuell datarensning, undersökande dataanalys och åtgärder på små till medelstora datamängder. Vanliga användningsfall för UDF:er är datakryptering, dekryptering, hashning, JSON-parsning och validering.
Använd Apache Spark-metoder för åtgärder på mycket stora datauppsättningar och alla arbetsbelastningar körs regelbundet eller kontinuerligt, inklusive ETL-jobb och strömningsåtgärder.
Förstå UDF-typer
Välj en UDF-typ från följande flikar för att se en beskrivning, ett exempel och en länk för mer information.
Skalär UDF
Skalära UDF:er körs på en enskild rad och returnerar ett enda resultatvärde för varje rad. De kan antingen styras av Unity Catalog eller vara kopplade till en session.
I följande exempel används en skalär UDF för att beräkna längden på varje namn i en name kolumn och lägga till värdet i en ny kolumn name_length.
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
Så här implementerar du detta i en Databricks-notebook-fil med PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Se Användardefinierade funktioner (UDF: er) i Unity Catalog och Användardefinierade skalärfunktioner – Python.
Batch-skalära UDF:er
Bearbeta data i batchar samtidigt som 1:1-indata/utdataradsparitet bibehålls. Detta minskar resursförbrukningen vid radvis bearbetning för storskalig databearbetning. Batch-UDF:er upprätthåller också tillstånd mellan batchar för att köras mer effektivt, återanvända resurser och hantera komplexa beräkningar som behöver kontext över datasegment.
De kan antingen styras av Unity Catalog eller vara kopplade till en session.
Följande Batch Unity Catalog Python UDF beräknar BMI vid bearbetning av batchar med rader:
+-------------+-------------+
| weight_kg | height_m |
+-------------+-------------+
| 90 | 1.8 |
| 77 | 1.6 |
| 50 | 1.5 |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
| BMI |
+--------+
| 27.8 |
| 30.1 |
| 22.2 |
+--------+
Se Användardefinierade funktioner (UDF: er) i Unity Catalog och Batch Python Användardefinierade funktioner (UDF: er) i Unity Catalog.
Icke-skalära UDF:er
Icke-skalära UDF:er fungerar på hela datauppsättningar/kolumner med flexibla in- och utdataförhållanden (1:N eller många:många).
Sessionsavgränsade batch- UDF:er i pandas kan vara av följande typer:
- Serie till serie
- Iteratör av serier till iteratör av serier
- Iterator för flera serier till iterator av serier
- Serie till skalär
Följande är ett exempel på en Series till Series pandas UDF.
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])
@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
return weight / (height ** 2)
df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()
Se användardefinierade funktioner i Pandas.
UDAF
UDAFs fungerar på flera rader och returnerar ett enda aggregerat resultat. UDAF:er är endast avsedda för sessionomfattning.
I följande UDAF-exempel aggregeras poäng efter namnlängd.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Se användardefinierade funktioner för Pandas i Python och användardefinierade aggregeringsfunktioner i Scala.
UDTF:er
En UDTF tar ett eller flera indataargument och returnerar flera rader (och eventuellt flera kolumner) för varje indatarad. De kan antingen styras av Unity Catalog eller vara kopplade till en session.
Följande UDTF skapar en tabell med en fast lista med två heltalsargument:
CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
$$;
SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13 | 7 |
+-----+------+
Så här implementerar du detta i en Databricks-notebook-fil med PySpark:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Se UDF:er för Unity-katalogen och sessionsomfattande UDF:er.
Unity Catalog-styrda kontra sessionsbaserade UDF:er
Unity Catalog Python UDF:er, Batch Unity Catalog Python UDF:er och Unity Catalog Python UDF:er sparas i Unity Catalog för bättre styrning, återanvändning och identifiering. Alla andra UDF:er är sessionsbaserade, vilket innebär att de definieras i en notebook-fil eller ett jobb och är begränsade till den aktuella SparkSession. Du kan definiera och komma åt sessionsomfattande UDF:er med Scala eller Python.
Unity Catalog-styrt UDF:er fuskark
Med Unity Catalog-styrda UDF:er kan anpassade funktioner definieras, användas, delas på ett säkert sätt och styras i olika datormiljöer. Se Användardefinierade funktioner (UDF: er) i Unity Catalog.
| UDF-typ | Beräkning som stöds | Beskrivning |
|---|---|---|
| Unity Catalog Python UDF |
|
Definiera en UDF i Python och registrera den i Unity Catalog för styrning. Skalära UDF:er körs på en enskild rad och returnerar ett enda resultatvärde för varje rad. |
| Batch Unity Katalog Python UDF |
|
Definiera en UDF i Python och registrera den i Unity Catalog för styrning. Batchoperationer på flera värden och returnerar flera värden. Minskar överbelastning av radvisa operationer för storskalig databearbetning. |
| Unity Catalog Python UDTF |
|
Definiera en UDTF i Python och registrera den i Unity Catalog för styrning. En UDTF tar ett eller flera indataargument och returnerar flera rader (och eventuellt flera kolumner) för varje indatarad. |
Sessionsomfattande UDF:er-fuskblad för användarisolerad beräkning
Sessionsomfattnings-UDF:er definieras i en notebook-fil eller ett jobb och är begränsade till den aktuella SparkSession. Du kan definiera och komma åt sessionsomfattande UDF:er med Scala eller Python.
| UDF-typ | Beräkning som stöds | Beskrivning |
|---|---|---|
| Python-skalär |
|
Skalära UDF:er körs på en enskild rad och returnerar ett enda resultatvärde för varje rad. |
| Python icke-skalär |
|
Icke-skalära UDF:er inkluderar pandas_udf, mapInPandas, mapInArrow, applyInPandas. Pandas UDFs använder Apache Arrow för att överföra data och pandas för att hantera data. Pandas UDF:er stöder vektoriserade åtgärder som avsevärt kan öka prestanda jämfört med skalär-UDF:er rad för rad. |
| Python-UDTF:er |
|
En UDTF tar ett eller flera indataargument och returnerar flera rader (och eventuellt flera kolumner) för varje indatarad. |
| Scala-skalära UDF:er |
|
Skalära UDF:er körs på en enskild rad och returnerar ett enda resultatvärde för varje rad. |
| Scala-användardefinierade aggregeringsfunktioner (UDAFs) |
|
UDAFs fungerar på flera rader och returnerar ett enda aggregerat resultat. |
Prestandaöverväganden
Inbyggda funktioner och SQL UDF:er är de mest effektiva alternativen.
Scala-UDF:er är vanligtvis snabbare än Python-UDF:er.
- Unisolated Scala UDF:er körs i den virtuella Java-datorn (JVM), så de undviker att flytta data in och ut ur JVM.
- Isolerade Scala-UDF:er måste flytta data in och ut ur JVM, men de kan fortfarande vara snabbare än Python-UDF:er eftersom de hanterar minnet mer effektivt.
Python-UDF:er och pandas-UDF:er tenderar att vara långsammare än Scala-UDF:er eftersom de behöver serialisera data och flytta dem från JVM till Python-tolken.
- Pandas UDF:er är upp till 100 gånger snabbare än Python-UDF:er eftersom de använder Apache Arrow för att minska serialiseringskostnaderna.