Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Viktigt!
API:et för deklarativa pipelines create_sink i Lakeflow finns i offentlig förhandsversion.
Funktionen create_sink() skriver till en händelseströmningstjänst som Apache Kafka eller Azure Event Hubs eller till en Delta-tabell från en deklarativ pipeline. När du har skapat en mottagare med funktionen create_sink() använder du mottagaren i ett tilläggsflöde för att skriva data till mottagaren. tilläggsflöde är den enda flödestyp som stöds med funktionen create_sink(). Andra flödestyper, till exempel create_auto_cdc_flow, stöds inte.
Delta-sink stöder externa och hanterade tabeller i Unity Catalog samt hanterade tabeller i Hive-metadatalager. Namnen på tabeller måste vara fullständigt kvalificerade. Unity Catalog-tabeller måste till exempel använda en identifierare på tre nivåer: <catalog>.<schema>.<table>. Hive-metaarkivtabeller måste använda <schema>.<table>.
Anmärkning
- Om du kör en fullständig uppdatering rensas inte data från mottagare. Alla ombearbetade data läggs till i datamottagaren och befintliga data ändras inte.
 - Förväntningar på Lakeflows deklarativa pipelines stöds inte med API:et 
sink. 
Syntax
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Parameterar
| Parameter | Typ | Description | 
|---|---|---|
name | 
str | 
Obligatoriskt. En sträng som identifierar sänkan och används för att referera till och hantera sänkan. Sinksnamn måste vara unika för pipelinen, inklusive alla källkodsfiler som är en del av pipelinen. | 
format | 
str | 
Obligatoriskt. En sträng som definierar utdataformatet, antingen kafka eller delta. | 
options | 
dict | 
En lista över alternativ för mottagare, formaterad som {"key": "value"}, där nyckeln och värdet är båda strängarna. Alla Databricks Runtime-alternativ som stöds av Kafka- och Delta-mottagare stöds.
  | 
Examples
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)
# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)