Dela via


create_sink

Viktigt!

API:et för deklarativa pipelines create_sink i Lakeflow finns i offentlig förhandsversion.

Funktionen create_sink() skriver till en händelseströmningstjänst som Apache Kafka eller Azure Event Hubs eller till en Delta-tabell från en deklarativ pipeline. När du har skapat en mottagare med funktionen create_sink() använder du mottagaren i ett tilläggsflöde för att skriva data till mottagaren. tilläggsflöde är den enda flödestyp som stöds med funktionen create_sink(). Andra flödestyper, till exempel create_auto_cdc_flow, stöds inte.

Delta-sink stöder externa och hanterade tabeller i Unity Catalog samt hanterade tabeller i Hive-metadatalager. Namnen på tabeller måste vara fullständigt kvalificerade. Unity Catalog-tabeller måste till exempel använda en identifierare på tre nivåer: <catalog>.<schema>.<table>. Hive-metaarkivtabeller måste använda <schema>.<table>.

Anmärkning

  • Om du kör en fullständig uppdatering rensas inte data från mottagare. Alla ombearbetade data läggs till i datamottagaren och befintliga data ändras inte.
  • Förväntningar på Lakeflows deklarativa pipelines stöds inte med API:et sink.

Syntax

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

Parameterar

Parameter Typ Description
name str Obligatoriskt. En sträng som identifierar sänkan och används för att referera till och hantera sänkan. Sinksnamn måste vara unika för pipelinen, inklusive alla källkodsfiler som är en del av pipelinen.
format str Obligatoriskt. En sträng som definierar utdataformatet, antingen kafka eller delta.
options dict En lista över alternativ för mottagare, formaterad som {"key": "value"}, där nyckeln och värdet är båda strängarna. Alla Databricks Runtime-alternativ som stöds av Kafka- och Delta-mottagare stöds.

Examples

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)