Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Gäller för:
Databricks SQL
Databricks Runtime 13.3 LTS och senare
Returnerar en tabell med poster som lästs från en eller flera strömmar i Kinesis.
Syntax
read_kinesis ( { parameter => value } [, ...] )
Argument
read_kinesis kräver namngiven parameteranrop.
Det enda argumentet som krävs är streamName. Alla andra argument är valfria.
Beskrivningarna av argumenten är korta här. Mer information finns i Amazon Kinesis-dokumentationen.
Det finns flera sätt att ansluta och autentisera med AWS.
Den rekommenderade metoden är att skapa en Databricks-tjänstautentiseringsuppgift och ange den med hjälp av serviceCredential alternativet .
Du kan också autentisera med hjälp av awsAccessKey och awsSecretKey.
Dessa alternativ kan antingen anges i funktionsargumenten secret med hjälp av funktionen, anges manuellt i argumenten eller konfigureras som miljövariabler enligt nedan.
roleArn, roleExternalID, roleSessionName kan också användas för att autentisera med AWS med hjälp av instansprofiler.
Om inget av dessa anges använder den standardkedjan för AWS-providern.
| Parameter | Typ | Beskrivning |
|---|---|---|
streamName |
STRING |
Obligatorisk, kommaavgränsad lista över en eller flera kinesis-strömmar. |
serviceCredential |
STRING |
Namnet på databricks-tjänstens autentiseringsuppgifter. |
awsAccessKey |
STRING |
AWS-åtkomstnyckeln, om den finns. Kan också anges via de olika alternativ som stöds via AWS standardleverantörskedja för autentiseringsuppgifter, inklusive miljövariabler (AWS_ACCESS_KEY_ID) och en profilfil för autentiseringsuppgifter. |
awsSecretKey |
STRING |
Den hemliga nyckel som motsvarar åtkomstnyckeln. Kan anges antingen i argumenten eller via de olika alternativ som stöds via AWS standardleverantörskedja för autentiseringsuppgifter, inklusive miljövariabler (AWS_SECRET_KEY eller AWS_SECRET_ACCESS_KEY) och en profilfil för autentiseringsuppgifter. |
roleArn |
STRING |
Amazon-resursnamnet för rollen som ska antas vid åtkomst till Kinesis. |
roleExternalId |
STRING |
Används vid delegering av åtkomst till AWS-kontot. |
roleSessionName |
STRING |
AWS-rollsessionens namn |
stsEndpoint |
STRING |
En slutpunkt för att begära autentiseringsuppgifter för tillfällig åtkomst. |
region |
STRING |
Region för de strömmar som ska anges. Standardvärdet är den lokalt lösta regionen. |
endpoint |
STRING |
regional slutpunkt för Kinesis-dataströmmar. Standardvärdet är den lokalt lösta regionen. |
initialPosition |
STRING |
Startposition för läsning från strömmen. En av: "senaste" (standard), "trim_horizon", "tidigast", "at_timestamp". |
consumerMode |
STRING |
En av: "polling" (standard) eller "EFO" (enhanced-fan-out). |
consumerName |
STRING |
Namnet på konsumenten. Alla konsumenter har prefixet "databricks_". Standardvärdet är en tom sträng. |
registerConsumerTimeoutInterval |
STRING |
den maximala tidsgränsen för att vänta tills Kinesis EFO-konsumenten har registrerats med Kinesis-strömmen innan ett fel uppstår. Standardvärdet är "300s". |
requireConsumerDeregistration |
BOOLEAN |
true för att avregistrera EFO-konsumenten vid frågeavslut. Standard är false. |
deregisterConsumerTimeoutInterval |
STRING |
Maximal tidsgräns för att vänta på att Kinesis EFO-klienten avregistreras från Kinesis-strömmen innan ett fel påträffas. Standardvärdet är "300s". |
consumerRefreshInterval |
STRING |
Det intervall med vilket konsumenten kontrolleras och uppdateras. Standardvärdet är "300s". |
Följande argument används för att kontrollera läsdataflödet och svarstiden för Kinesis:
| Parameter | Typ | Beskrivning |
|---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Valfritt, med standardvärdet 10 000 poster som ska läsas för varje API-begäran till Kinesis. |
maxFetchRate |
STRING |
Hur snabbt man förhämtar data per shard. Ett värde mellan 1,0 och 2,0 som mäts i MB/s. Standardvärdet är "1.0". |
minFetchPeriod |
STRING |
Den maximala väntetiden mellan efterföljande prefetch-försök. Standardvärdet är "400ms". |
maxFetchDuration |
STRING |
Den maximala varaktigheten för buffring av förhämtade nya data. Standardvärdet är "10s". |
fetchBufferSize |
STRING |
Mängden data för nästa utlösare. Standardvärdet är "20gb". |
shardsPerTask |
INTEGER (>0) |
Antalet Kinesis-shards som ska förhämtas parallellt per spark-uppgift. Standardinställningen är 5. |
shardFetchinterval |
STRING |
Hur ofta man kontrollerar för ompartitionering. Standardvärdet är "1s". |
coalesceThresholdBlockSize |
INTEGER (>0) |
Det tröskelvärde vid vilket automatisk sammansning sker. Standardvärdet är 10 000 000. |
coalesce |
BOOLEAN |
true för att sammansmälta förhämtade förfrågningar. Standardvärdet är true. |
coalesceBinSize |
INTEGER (>0) |
Den ungefärliga blockstorleken efter sammankoppling. Standardvärdet är 128 000 000. |
reuseKinesisClient |
BOOLEAN |
true för att återanvända Kinesis-klienten som lagras i cacheminnet. Standardvärdet är true förutom i ett PE-kluster. |
clientRetries |
INTEGER (>0) |
Antalet återförsök i återförsöksscenariot. Standardinställningen är 5. |
Returer
En tabell med Kinesis-uppgifter enligt följande schema:
| Namn | Datatyp | Nullbar | Norm | Beskrivning |
|---|---|---|---|---|
partitionKey |
STRING |
Nej | En nyckel som används för att distribuera data mellan fragmenten i en dataström. Alla dataposter med samma partitionsnyckel kommer att läsas från samma fragment. | |
data |
BINARY |
Nej | Kinesis-datanyttolasten, kodad med base-64. | |
stream |
STRING |
Nej | Namnet på strömmen där data lästes från. | |
shardId |
STRING |
Nej | En unik identifierare för fragmentet där data lästes från. | |
sequenceNumber |
BIGINT |
Nej | Den unika identifieraren för posten i dess skärva. | |
approximateArrivalTimestamp |
TIMESTAMP |
Nej | Den ungefärliga tid då posten infogades i dataströmmen. |
Kolumnerna (stream, shardId, sequenceNumber) utgör en primärnyckel.
Exempel
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret('test-databricks', 'awsAccessKey'),
awsSecretKey => secret('test-databricks', 'awsSecretKey'),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');