Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Gäller för:
Databricks SQL
Databricks Runtime 14.1 och senare
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Returnerar en tabell med poster som avlästa från Pulsar.
Den här tabellvärdesfunktionen stöder endast direktuppspelning och inte batchfråga.
Syntax
read_pulsar ( { option_key => option_value } [, ...] )
Argument
Den här funktionen kräver namngivna parameteranrop för alternativnycklarna.
Alternativen serviceUrl och topic är obligatoriska.
Beskrivningarna av argumenten är korta här. Mer information finns i dokumentationen om strukturerade pulsarströmmar .
| Alternativ | Typ | Standardvärde | beskrivning |
|---|---|---|---|
| tjänsteURL | STRÄNG | Obligatorisk | URI:n för Pulsar-tjänsten. |
| Ämne | STRÄNG | Obligatorisk | Ämne att läsa från. |
| fördefinierad prenumeration | STRÄNG | Ingen | Det fördefinierade prenumerationsnamnet som används av kopplingen för att spåra spark-applikationens framsteg. |
| subscriptionPrefix | STRÄNG | Ingen | Ett prefix som används av kopplingen för att generera en slumpmässig prenumeration för att spåra spark-applikationens framsteg. |
| pollTimeoutMs | LÅNG | 120000 | Tidsgränsen för att läsa meddelanden från Pulsar i millisekunder. |
| failOnDataLoss | BOOLESK | true | Styr om en fråga ska misslyckas när data går förlorade (till exempel ämnen tas bort eller meddelanden tas bort på grund av kvarhållningsprincip). |
| startingOffsets | sträng | senaste | Startpunkten när en fråga startas, antingen tidigaste, senaste eller en JSON-sträng som anger en specifik förskjutning. Om den är nyast, läser läsaren de senaste posterna efter att den börjar köra. Om det är den tidigaste, läser läsaren från den tidigaste offset. Användaren kan också ange en JSON-sträng som anger en specifik förskjutning. |
| starttid | STRÄNG | Ingen | När det anges läser Pulsar-källan meddelanden från positionen för den angivna starttiden. |
Följande argument används för autentisering av pulsar-klienten:
| Alternativ | Typ | Standardvärde | beskrivning |
|---|---|---|---|
| pulsarClientAuthPluginClassName | STRÄNG | Ingen | Namnet på plugin-programmet för autentisering. |
| pulsarClientAuthParams | STRÄNG | Ingen | Parametrar för plugin-programmet för autentisering. |
| pulsarClientUseKeyStoreTls | STRÄNG | Ingen | Om du vill använda KeyStore för tls-autentisering. |
| pulsarClientTlsTrustStoreType | sträng | Ingen | TrustStore-filtyp för tls-autentisering. |
| pulsarClientTlsTrustStorePath | STRÄNG | Ingen | TrustStore-filsökväg för tls-autentisering. |
| pulsarClientTlsTrustStore-lösenord | STRÄNG | Ingen | TrustStore-lösenord för tls-autentisering. |
Dessa argument används för konfiguration och autentisering av pulsar-antagningskontroll, pulsaradministratörskonfiguration krävs endast när antagningskontroll är aktiverad (när maxBytesPerTrigger har angetts)
| Alternativ | Typ | Standardvärde | beskrivning |
|---|---|---|---|
| maxBytesPerTrigger | BIGINT | Ingen | En mjuk gräns för det maximala antalet byte som vi vill bearbeta per mikrobatch. Om detta anges måste även admin.url anges. |
| administratörs-URL | STRÄNG | Ingen | Pulsar-tjänsten HttpUrl-konfigurationen. Behövs bara när maxBytesPerTrigger har angetts. |
| pulsarAdminAuthPlugin | STRÄNG | Ingen | Namnet på plugin-programmet för autentisering. |
| pulsarAdminAuthParams | STRÄNG | Ingen | Parametrar för plugin-programmet för autentisering. |
| pulsarClientUseKeyStoreTls | STRÄNG | Ingen | Om du vill använda KeyStore för tls-autentisering. |
| pulsarAdminTlsTrustStoreType | STRÄNG | Ingen | TrustStore-filtyp för tls-autentisering. |
| pulsarAdminTlsTrustStorePath | STRÄNG | Ingen | TrustStore-filsökväg för tls-autentisering. |
| pulsarAdminTlsTrustStorePassword | STRÄNG | Ingen | TrustStore-lösenord för tls-autentisering. |
Returer
En tabell med pulsarrekord med följande schema.
__key STRING NOT NULL: Pulsar-meddelandenyckel.value BINARY NOT NULL: Pulsar-meddelandevärde.Obs! För ämnen med Avro- eller JSON-schema expanderas innehållet i stället för att läsa in innehåll i ett binärt värdefält för att bevara fältnamnen och fälttyperna för Pulsar-ämnet.
__topic STRING NOT NULL: Ämnesnamn för Pulsar.__messageId BINARY NOT NULL: Pulsar-meddelande-ID.__publishTime TIMESTAMP NOT NULL: Publiceringstid för Pulsar-meddelande.__eventTime TIMESTAMP NOT NULL: Pulsar meddelandehändelsetid.__messageProperties MAP<STRING, STRING>: Pulsar-meddelandeegenskaper.
Exempel
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.