Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Viktig
Den här funktionen finns i offentlig förhandsversion i Databricks Runtime 14.3 LTS och senare.
Med en användardefinierad tabellfunktion (UDTF) kan du registrera funktioner som returnerar tabeller i stället för skalära värden. Till skillnad från skalärfunktioner som returnerar ett enda resultatvärde från varje anrop anropas varje UDTF i en SQL-instruktionssats FROM och returnerar en hel tabell som utdata.
Varje UDTF-anrop kan acceptera noll eller fler argument. Dessa argument kan vara skalära uttryck eller tabellargument som representerar hela indatatabeller.
UDF:er kan registreras på två sätt:
- Unity Catalog: Registrera UDTF som ett styrt objekt i Unity Catalog. Se Python-användardefinierade tabellfunktioner (UDF: er) i Unity Catalog.
- Sessionsomfång: Registrera dig för den lokala
SparkSession, isolerade till den aktuella notebook-filen eller jobbet.
Tips/Råd
Databricks rekommenderar att du registrerar UDF:er i Unity Catalog för att dra nytta av centraliserad styrning som gör det enklare att dela och återanvända funktioner på ett säkert sätt mellan användare och team.
Grundläggande UDTF-syntax
Apache Spark implementerar Python UDTFs som Python-klasser med en obligatorisk eval-metod som använder yield för att generera utdatarader.
Om du vill använda klassen som UDTF måste du importera funktionen PySpark udtf. Databricks rekommenderar att du använder den här funktionen som dekoratör och uttryckligen anger fältnamn och typer med hjälp av alternativet returnType (såvida inte klassen definierar en analyze-metod enligt beskrivningen i ett senare avsnitt).
Följande UDTF skapar en tabell med en fast lista med två heltalsargument:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Registrera en UDTF
Om du vill registrera en UDTF med sessionsomfattning för användning i SQL-frågor använder du spark.udtf.register(). Ange ett namn för SQL-funktionen och Python UDTF-klassen.
spark.udtf.register("get_sum_diff", GetSumDiff)
Anropa en registrerad UDTF
När du har registrerat dig kan du använda UDTF i SQL med antingen det %sql magiska kommandot eller spark.sql() funktionen:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);").show()
%sql
SELECT * FROM get_sum_diff(1,2);
Uppgradera en UDTF med sessionsomfattning till Unity Catalog
Viktig
Registrering av Python-UDF:er i Unity Catalog finns i offentlig förhandsversion. UDTF:er för Unity-katalogen kräver Databricks Runtime version 17.1 och senare. Se kraven.
Du kan uppgradera en UDTF med sessionsomfattning till Unity Catalog för att dra nytta av centraliserad styrning och göra det enklare att dela och återanvända funktioner på ett säkert sätt mellan användare och team.
Om du vill uppgradera en UDTF med sessionsomfattning till Unity Catalog använder du SQL DDL med -instruktionen CREATE OR REPLACE FUNCTION . I följande exempel visas hur du konverterar GetSumDiff UDTF från en funktion med sessionsomfattning till en Unity Catalog-funktion:
CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
$$;
SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13 | 7 |
+-----+------+
Mer information om UDF:er för Unity-katalogen finns i Python-användardefinierade tabellfunktioner (UDF:er) i Unity Catalog.
Använda Apache Arrow
Om UDTF tar emot en liten mängd data som indata men matar ut en stor tabell rekommenderar Databricks att du använder Apache Arrow. Du kan aktivera den genom att ange parametern useArrow när du deklarerar UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Variabelargumentlistor – *args och **kwargs
Du kan använda Python-*args eller **kwargs syntax och implementera logik för att hantera ett ospecificerat antal indatavärden.
I följande exempel returneras samma resultat när du uttryckligen kontrollerar indatalängden och typerna för argumenten:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Här är samma exempel, men med nyckelordsargument:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Definiera ett statiskt schema vid registreringen
UDTF returnerar rader med ett utdataschema som består av en ordnad sekvens med kolumnnamn och typer. Om UDTF-schemat alltid ska vara detsamma för alla frågor kan du ange ett statiskt, fast schema efter @udtf dekoratör. Det måste antingen vara en StructType:
StructType().add("c1", StringType())
Eller en DDL-sträng som representerar en structtyp:
c1: string
Beräkna ett dynamiskt schema vid funktionsanropstid
UDF:er kan också beräkna utdataschemat programmatiskt för varje anrop beroende på värdena för indataargumenten. För att göra detta definierar du en statisk metod som kallas analyze som accepterar noll eller fler parametrar som motsvarar argumenten som tillhandahålls till det specifika UDTF-anropet.
Varje argument i metoden analyze är en instans av klassen AnalyzeArgument som innehåller följande fält:
AnalyzeArgument klassfält |
Beskrivning |
|---|---|
dataType |
Typen av indataargument som en DataType. För argument för indatatabeller är detta en StructType som representerar tabellens kolumner. |
value |
Värdet för indataargumentet som en Optional[Any]. Det här är None för tabellargument eller literalskalära argument som inte är konstanta. |
isTable |
Om indataargumentet är en tabell som BooleanType. |
isConstantExpression |
Om indataargumentet är ett konstant vikbart uttryck som en BooleanType. |
Metoden analyze returnerar en instans av AnalyzeResult klassen, som innehåller resultattabellens schema som ett StructType plus några valfria fält. Om UDTF accepterar ett argument i indatatabellen kan AnalyzeResult också inkludera ett begärt sätt att partitionera och beställa raderna i indatatabellen över flera UDTF-anrop, enligt beskrivningen senare.
AnalyzeResult klassfält |
Beskrivning |
|---|---|
schema |
Schemat för resultattabellen som en StructType. |
withSinglePartition |
Om alla indatarader ska skickas till samma UDTF-klassinstans som en BooleanType. |
partitionBy |
Om värdet inte är tomt används alla rader med varje unik kombination av värden för partitioneringsuttrycken av en separat instans av UDTF-klassen. |
orderBy |
Om värdet inte är tomt anger detta en ordning på rader inom varje partition. |
select |
Om värdet inte är tomt är detta en sekvens med uttryck som UDTF anger för Catalyst att utvärdera mot kolumnerna i indata TABLE argumentet. UDTF tar emot ett indataattribut för varje namn i listan i den ordning de visas. |
Det här analyze exemplet returnerar en utdatakolumn för varje ord i argumentet för indatasträngen.
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
MyUDTF(lit("hello world")).columns
['word_0', 'word_1']
Vidarebefordra status till framtida eval-anrop
Metoden analyze kan fungera som en praktisk plats för att utföra initiering och sedan vidarebefordra resultatet till framtida eval metodanrop för samma UDTF-anrop.
För att göra det skapar du en underklass av AnalyzeResult och returnerar en instans av underklassen från metoden analyze.
Lägg sedan till ytterligare ett argument i metoden __init__ för att acceptera den instansen.
Det här analyze exemplet returnerar ett konstant utdataschema, men lägger till anpassad information i resultatmetadata som ska användas av framtida __init__-metodanrop:
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Ge utdatarader
Metoden eval körs en gång för varje rad i argumentet för indatatabellen (eller bara en gång om inget tabellargument anges), följt av ett anrop av metoden terminate i slutet. Antingen matar metoden ut noll eller fler rader som överensstämmer med resultatschemat genom att ge tupplar, listor eller pyspark.sql.Row objekt.
Det här exemplet returnerar en rad genom att ange en tupl med tre element.
def eval(self, x, y, z):
yield (x, y, z)
Du kan också utelämna parenteserna:
def eval(self, x, y, z):
yield x, y, z
Lägg till ett avslutande kommatecken för att returnera en rad med endast en kolumn:
def eval(self, x, y, z):
yield x,
Du kan också generera ett pyspark.sql.Row-objekt.
def eval(self, x, y, z):
from pyspark.sql.types import Row
yield Row(x, y, z)
Det här exemplet ger utdatarader från metoden terminate med hjälp av en Python-lista. Du kan lagra tillståndet i klassen från tidigare steg i UDTF-utvärderingen för det här ändamålet.
def terminate(self):
yield [self.x, self.y, self.z]
Skicka skalära argument till en UDTF
Du kan skicka skalära argument till en UDTF som konstanta uttryck som består av literalvärden eller funktioner baserat på dem. Till exempel:
SELECT * FROM get_sum_diff(1, y => 2)
Skicka tabellargument till en UDTF
Python-UDF:er kan acceptera en indatatabell som ett argument utöver skalära indataargument. En enskild UDTF kan också acceptera ett tabellargument och flera skalära argument.
Sedan kan alla SQL-frågor tillhandahålla en indatatabell med hjälp av nyckelordet TABLE följt av parenteser som omger en lämplig tabellidentifierare, till exempel TABLE(t). Du kan också skicka en tabellunderfråga, till exempel TABLE(SELECT a, b, c FROM t) eller TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).
Argumentet för indatatabellen representeras sedan som ett pyspark.sql.Row argument för metoden eval, med ett anrop till metoden eval för varje rad i indatatabellen. Du kan använda standardkommentarer för PySpark-kolumnfält för att interagera med kolumner i varje rad. I följande exempel visas explicit import av PySpark-Row typ och sedan filtrering av den skickade tabellen i fältet id:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Om du vill köra frågor mot funktionen använder du nyckelordet TABLE SQL:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Ange en partitionering av indatarader för funktionsanrop
När du anropar en UDTF med ett tabellargument kan alla SQL-frågor partitionera indatatabellen över flera UDTF-anrop baserat på värdena för en eller flera indatatabellkolumner.
Om du vill ange en partition använder du PARTITION BY-satsen i funktionsanropet efter argumentet TABLE.
Detta garanterar att alla indatarader med varje unik kombination av värden i partitioneringskolumnerna hanteras av exakt en instans av UDTF-klassen.
Observera att förutom enkla kolumnreferenser accepterar PARTITION BY-satsen även godtyckliga uttryck baserat på indatatabellkolumner. Du kan till exempel ange LENGTH för en sträng, extrahera en månad från ett datum eller sammanfoga två värden.
Det är också möjligt att ange WITH SINGLE PARTITION istället för PARTITION BY för att begära endast en partition där alla indatarader måste konsumeras av exakt en instans av UDTF-klassen.
Inom varje partition kan du ange en obligatorisk ordning på indataraderna eftersom UDTF-metoden eval använder dem. Det gör du genom att ange en ORDER BY-sats efter satsen PARTITION BY eller WITH SINGLE PARTITION som beskrivs ovan.
Tänk till exempel på följande UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Du kan ange partitioneringsalternativ när du anropar UDTF över indatatabellen på flera sätt:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8);
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Ange en partitionering av indataraderna från metoden analyze
Observera att för vart och ett av ovanstående sätt att partitionera indatatabellen när du anropar UDF:er i SQL-frågor finns det ett motsvarande sätt för UDTF-metoden analyze att ange samma partitioneringsmetod automatiskt i stället.
- I stället för att anropa en UDTF som
SELECT * FROM udtf(TABLE(t) PARTITION BY a)kan du uppdatera metodenanalyzeför att ange fältetpartitionBy=[PartitioningColumn("a")]och helt enkelt anropa funktionen med hjälp avSELECT * FROM udtf(TABLE(t)). - Med samma token kan du i stället för att ange
TABLE(t) WITH SINGLE PARTITION ORDER BY bi SQL-frågan göraanalyzeange fältenwithSinglePartition=trueochorderBy=[OrderingColumn("b")]och sedan bara skickaTABLE(t). - I stället för att skicka
TABLE(SELECT a FROM t)i SQL-frågan kan du göraanalyzeangeselect=[SelectedColumn("a")]och sedan bara skickaTABLE(t).
I följande exempel returnerar analyze ett konstant utdataschema, väljer en delmängd av kolumner från indatatabellen och anger att indatatabellen är partitionerad över flera UDTF-anrop baserat på värdena i kolumnen date:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add("longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word)",
alias="length_word")])