Dela via


Användardefinierade skalärfunktioner – Scala

Den här artikeln innehåller exempel på användardefinierade funktioner i Scala (UDF). Den visar hur du registrerar UDF:er, hur du anropar UDF:er och varningar om utvärderingsordning för underuttryck i Spark SQL. Mer information finns i Externa användardefinierade skalärfunktioner (UDF:er).

Kommentar

Scala UDF:er på Unity Catalog-aktiverade beräkningsresurser med standardåtkomstläge (tidigare delat åtkomstläge) kräver Databricks Runtime 14.2 och senare.

Registrera en funktion som en UDF

val squared = (s: Long) => {
  s * s
}
spark.udf.register("square", squared)

Anropa UDF i Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test

Använda UDF med DataFrames

import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))

Utvärderingsordning och nullkontroll

Spark SQL (inklusive SQL och API:er för DataFrame och datauppsättning) garanterar inte utvärderingsordningen för underuttryck. I synnerhet utvärderas inte indata från en operator eller funktion nödvändigtvis från vänster till höger eller i någon annan fast ordning. Till exempel har logiska AND uttryck OR inte vänster-till-höger-semantik för "kortslutning".

Därför är det farligt att förlita sig på bieffekter eller utvärderingsordning av booleska uttryck och ordningen på WHERE och HAVING-satser, eftersom sådana uttryck och satser kan ordnas om under frågeoptimering och planering. Mer specifikt, om en UDF förlitar sig på kortslutningssemantik i SQL för null-kontroll, finns det ingen garanti för att null-kontrollen sker innan UDF anropas. Ett exempel:

spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee

Den här WHERE satsen garanterar strlen inte att UDF anropas efter filtrering av null-värden.

För att utföra rätt null-kontroll rekommenderar vi att du gör något av följande:

  • Gör UDF medveten om null och gör null-kontroll inne i själva UDF:n
  • Använd IF eller CASE WHEN uttryck för att göra null-kontrollen och anropa UDF i en villkorsstyrd gren
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Api:er för typade datauppsättningar

Kommentar

Den här funktionen stöds i Unity Catalog-aktiverade kluster med standardåtkomstläge i Databricks Runtime 15.4 och senare.

Med inskrivna API:er för datauppsättningar kan du köra transformeringar som mappning, filter och aggregeringar på resulterande datauppsättningar med en användardefinierad funktion.

Följande Scala-program använder till exempel API:et map() för att ändra ett tal i en resultatkolumn till en prefixsträng.

spark.range(3).map(f => s"row-$f").show()

Även om det här exemplet använder API:et map() gäller detta även för andra typerade API:er för datauppsättningar, till exempel filter(), mapPartitions(), foreach(), foreachPartition(), reduce()och flatMap().

Scala UDF-funktioner och Databricks Runtime-kompatibilitet

Följande Scala-funktioner kräver minst Databricks Runtime-versioner när de används i Unity Catalog-aktiverade kluster i standardåtkomstläge (delad).

Egenskap Minimimum Databricks Runtime-version
Skalära UDF:er Databricks Runtime 14.2
Dataset.map, Dataset.mapPartitions, Dataset.filter, , , Dataset.reduceDataset.flatMap Databricks Runtime 15.4
KeyValueGroupedDataset.flatMapGroups, KeyValueGroupedDataset.mapGroups Databricks Runtime 15.4
(Direktuppspelning) foreachWriter Sink Databricks Runtime 15.4
(Direktuppspelning) foreachBatch Databricks Runtime 16.1
(Direktuppspelning) KeyValueGroupedDataset.flatMapGroupsWithState Databricks Runtime 16.2