Delen via


CREATE STREAMING TABLE

Van toepassing op:aangevinkt als ja Databricks SQL

Hiermee maakt u een streamingtabel, een Delta-tabel met extra ondersteuning voor streaming of incrementele gegevensverwerking.

Streamingtabellen worden alleen ondersteund in declaratieve pijplijnen van Lakeflow en in Databricks SQL met Unity Catalog. Als u deze opdracht uitvoert op ondersteunde Databricks Runtime-berekening, wordt de syntaxis alleen geparseerd. Zie Pijplijncode ontwikkelen met SQL.

Syntaxis

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    COMMENT table_comment |
    DEFAULT COLLATION UTF8_BINARY |
    TBLPROPERTIES clause |
    schedule |
    WITH { ROW FILTER clause } } [...]

schedule
  { SCHEDULE [ REFRESH ] schedule_clause |
    TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ]}

Parameterwaarden

  • REFRESH

    Indien opgegeven, vernieuwt u de tabel met de meest recente gegevens die beschikbaar zijn vanuit de bronnen die in de query zijn gedefinieerd. Alleen nieuwe gegevens die binnenkomen voordat de query wordt gestart, worden verwerkt. Nieuwe gegevens die worden toegevoegd aan de bronnen tijdens de uitvoering van de opdracht, worden genegeerd tot de volgende vernieuwing. De vernieuwingsbewerking van CREATE OR REFRESH is volledig declaratief. Als met een vernieuwingsopdracht niet alle metagegevens uit de oorspronkelijke instructie voor het maken van tabellen worden opgegeven, worden de niet-opgegeven metagegevens verwijderd.

  • ALS NIET BESTAAT

    Maakt de streamingtabel aan als deze niet bestaat. Als er al een tabel met deze naam bestaat, wordt de instructie CREATE STREAMING TABLE genegeerd.

    U kunt hoogstens één van IF NOT EXISTS of OR REFRESH specificeren.

  • table_name

    De naam van de tabel die moet worden gemaakt. De naam mag geen tijdelijke specificatie of optiesspecificatie bevatten. Als de naam niet is gekwalificeerd, wordt de tabel gemaakt in het huidige schema.

  • tafel_specificatie

    Deze optionele component definieert de lijst met kolommen, hun typen, eigenschappen, beschrijvingen en kolombeperkingen.

    Als u geen kolommen in het tabelschema definieert, moet u AS queryopgeven.

    • column_identifier

      Een unieke naam voor de kolom.

      • column_type

        Hiermee geeft u het gegevenstype van de kolom.

      • NIET NULL

        Als het is gespecificeerd, accepteert de kolom geen NULL-waarden.

      • OPMERKING (kolom opmerking)

        Een letterlijke tekenreeks om de kolom te beschrijven.

      • column_constraint

        Belangrijk

        Deze functie is beschikbaar als openbare preview.

        Voegt een primaire sleutel of vreemde sleutel toe als beperking aan de kolom in een streamingtabel. Beperkingen worden niet ondersteund voor tabellen in de hive_metastore catalogus.

      • MASK-clausule

        Belangrijk

        Deze functie is beschikbaar als openbare preview.

        Voegt een kolommaskerfunctie toe om gevoelige gegevens anoniem te maken. Alle volgende query's van die kolom ontvangen het resultaat van het evalueren van die functie ten opzichte van de kolom in plaats van de oorspronkelijke waarde van de kolom. Dit kan handig zijn voor verfijnde toegangsbeheerdoeleinden, waarbij de functie de identiteit of groepslidmaatschappen van de aanroepende gebruiker kan inspecteren om te bepalen of de waarde moet worden bewerkt.

      • CONSTRAINT expectation_name VERWACHTEN (expectation_expr) [ BIJ SCHENDING { FAIL UPDATE | DROP ROW } ]

        Voegt verwachtingen voor gegevenskwaliteit toe aan de tabel. Deze verwachtingen voor gegevenskwaliteit kunnen in de loop van de tijd worden bijgehouden en worden geopend via het gebeurtenislogboek van de streamingtabel. Een FAIL UPDATE verwachting zorgt ervoor dat de verwerking mislukt bij het maken van de tabel en het vernieuwen van de tabel. Een DROP ROW verwachting zorgt ervoor dat de hele rij wordt verwijderd als niet aan de verwachting wordt voldaan.

        expectation_expr kan bestaan uit letterlijke waarden, kolom-id's in de tabel en deterministische, ingebouwde SQL-functies of -operators, met uitzondering van:

        Mag ook expr geen subquery bevatten.

      • tabelbeperking

        Belangrijk

        Deze functie is beschikbaar als openbare preview.

        Voegt informatieve primaire of externe sleutelbeperkingen toe aan een streamingtabel. Sleutelbeperkingen worden niet ondersteund voor tabellen in de hive_metastore-catalogus.

  • tabel_clausules

    Geef desgewenst partitionering, opmerkingen, door de gebruiker gedefinieerde eigenschappen en een vernieuwingsschema voor de nieuwe tabel op. Elke subclausule mag slechts eenmaal worden opgegeven.

    • GEPARTITIONEERD OP

      Een optionele lijst met kolommen van de tabel om de tabel op te delen.

      Notitie

      Liquid clustering biedt een flexibele, geoptimaliseerde oplossing voor clustering. Overweeg het gebruik van CLUSTER BY in plaats van PARTITIONED BY voor streamingtabellen.

    • CLUSTER BY

      Een optionele clausule om te clusteren op basis van een subset van kolommen. Gebruik automatische liquide clustering met CLUSTER BY AUTOen Databricks kiest op intelligente wijze clusteringsleutels om de queryprestaties te optimaliseren. Zie Liquid Clustering gebruiken voor tabellen.

      Vloeibare clustering kan niet worden gecombineerd met PARTITIONED BY.

    • OPMERKING: table_comment

      Een letterlijke STRING om de tabel te beschrijven.

    • STANDAARD COLLATIE UTF8_BINARY

      Van toepassing op:controleren gemarkeerd ja Databricks SQL-controle gemarkeerd als ja Databricks Runtime 17.1 en hoger

      Dwingt de standaardsortering van de streamingtabel naar UTF8_BINARY. Deze component is verplicht als het schema waarin de tabel wordt gemaakt, een andere standaardsortering heeft dan UTF8_BINARY. De standaardsortering van de streamingtabel wordt gebruikt als de standaardsortering binnen query en voor kolomtypen.

    • TBLPROPERTIES

      U kunt desgewenst een of meer door de gebruiker gedefinieerde eigenschappen instellen.

      Gebruik deze instelling om het Lakeflow Declarative Pipelines Runtime-kanaal op te geven dat wordt gebruikt om deze instructie uit te voeren. Stel de waarde van de eigenschap pipelines.channel in op "PREVIEW" of "CURRENT". De standaardwaarde is "CURRENT". Zie Lakeflow Declarative Pipelines-runtimekanalen voor meer informatie over Lakeflow Declarative Pipelines-kanalen.

    • planning

      De planning kan een SCHEDULE instructie of een TRIGGER instructie zijn.

      • SCHEMA [ REFRESH ] schema_clausule

        • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

          Als u een vernieuwing wilt plannen die periodiek plaatsvindt, gebruikt u EVERY syntaxis. Als EVERY syntaxis is opgegeven, wordt de streamingtabel of gerealiseerde weergave periodiek vernieuwd met het opgegeven interval op basis van de opgegeven waarde, zoals HOUR, HOURS, DAY, DAYS, WEEKof WEEKS. De volgende tabel bevat geaccepteerde gehele getallen voor number.

          Tijdeenheid Integerwaarde
          HOUR or HOURS 1 <= H <= 72
          DAY or DAYS 1 <= D <= 31
          WEEK or WEEKS 1 <= W <= 8

          Notitie

          De enkelvoudige en meervoudvormen van de opgenomen tijdseenheid zijn semantisch gelijkwaardig.

        • CRON cron_string [ AT TIME ZONE timezone_id ]

          Een vernieuwing plannen met behulp van een Quartz cron waarde. Geldige time_zone_values worden geaccepteerd. AT TIME ZONE LOCAL wordt niet ondersteund.

          Als AT TIME ZONE afwezig is, wordt de tijdzone van de sessie gebruikt. Als AT TIME ZONE afwezig is en de sessietijdzone niet is ingesteld, wordt er een fout gegenereerd. SCHEDULE is semantisch gelijk aan SCHEDULE REFRESH.

        De planning kan worden opgegeven als onderdeel van de CREATE opdracht. Gebruik ALTER STREAMING TABLE of voer CREATE OR REFRESH opdracht uit met SCHEDULE component om het schema van een streamingtabel te wijzigen nadat deze is gemaakt.

      • TRIGGER ON UPDATE [ MAXIMAAL ELKE trigger_interval ]

        Belangrijk

        De TRIGGER ON UPDATE functie bevindt zich in de bètaversie.

        U kunt de tabel desgewenst zo instellen dat deze wordt vernieuwd wanneer een upstream-gegevensbron wordt bijgewerkt, maximaal één keer per minuut. Stel een waarde in om AT MOST EVERY minimaal een minimale tijd tussen vernieuwingen te vereisen.

        De upstream-gegevensbronnen moeten externe of beheerde Delta-tabellen zijn (inclusief gerealiseerde weergaven of streamingtabellen) of beheerde weergaven waarvan afhankelijkheden zijn beperkt tot ondersteunde tabeltypen.

        Door bestandsevenementen in te schakelen, kunnen triggers beter presteren en worden enkele van de limieten voor triggerupdates verhoogd.

        Dit trigger_interval is een INTERVAL-instructie die ten minste 1 minuut is.

        TRIGGER ON UPDATE heeft de volgende beperkingen

        • Niet meer dan 10 upstream-gegevensbronnen per streamingtabel met behulp van tabeltriggers.
        • Maximaal 50 streamingtabellen of gerealiseerde weergaven met behulp van tabeltriggers (door bestandsgebeurtenissen in upstream-gegevensbronnen in te schakelen, wordt deze limiet verwijderd).
        • Voor brongegevens in een externe Delta-tabel geldt een limiet van 10.000 rijen per wijzigingsset (het inschakelen van bestandsevenementen in upstream-gegevensbronnen verwijdert deze limiet).
        • De AT MOST EVERY component is standaard ingesteld op 1 minuut en mag niet kleiner zijn dan 1 minuut.
  • WITH ROW FILTER clausule

    Belangrijk

    Deze functie is beschikbaar als openbare preview.

    Hiermee voegt u een rijfilterfunctie toe aan de tabel. Alle volgende query's uit die tabel ontvangen een subset van de rijen waarin de functie de booleaanse waarde TRUE oplevert. Dit kan handig zijn voor verfijnde toegangsbeheerdoeleinden, waarbij de functie de identiteit of groepslidmaatschappen van de aanroepende gebruiker kan inspecteren om te bepalen of bepaalde rijen moeten worden gefilterd.

  • AS-query

    Met deze clausule wordt de tabel gevuld met behulp van de gegevens uit query. Deze query moet een streamingquery zijn. Dit kan worden bereikt door het STREAM trefwoord toe te voegen aan elke relatie die u incrementeel wilt verwerken. Wanneer u een query en een table_specification samen opgeeft, moet het tabelschema dat is opgegeven in table_specification alle kolommen bevatten die door de queryworden geretourneerd, anders krijgt u een foutmelding. Kolommen die zijn opgegeven in table_specification maar niet worden geretourneerd door query geven null-waarden terug bij een query.

Verschillen tussen streamingtabellen en andere tabellen

Streamingtabellen zijn statusgevoelige tabellen, ontworpen om elke rij slechts één keer te verwerken bij het verwerken van een groeiende gegevensset. Omdat de meeste gegevenssets in de loop van de tijd continu groeien, zijn streamingtabellen geschikt voor de meeste opnameworkloads. Streamingtabellen zijn optimaal voor pijplijnen waarvoor nieuwe gegevens en lage latentie nodig zijn. Streamingtabellen kunnen ook handig zijn voor grootschalige transformaties, omdat resultaten incrementeel kunnen worden berekend wanneer nieuwe gegevens binnenkomen, zodat de resultaten up-to-date blijven zonder dat alle brongegevens volledig hoeven te worden gecomputeerd met elke update. Streamingtabellen zijn ontworpen voor gegevensbronnen die alleen worden toegevoegd.

Streamingtabellen accepteren aanvullende opdrachten, zoals REFRESH, waarmee de meest recente gegevens worden verwerkt die beschikbaar zijn in de bronnen die in de query zijn opgegeven. Wijzigingen in de opgegeven query worden alleen doorgevoerd in nieuwe gegevens door een REFRESHaan te roepen, en gelden niet voor eerder verwerkte gegevens. Als u ook de wijzigingen op bestaande gegevens wilt toepassen, moet u REFRESH TABLE <table_name> FULL uitvoeren om een FULL REFRESH uit te voeren. Alle beschikbare gegevens in de bron worden opnieuw verwerkt met de meest recente definitie. Het is niet raadzaam om volledige vernieuwingen aan te roepen voor bronnen die de volledige geschiedenis van de gegevens niet behouden of korte bewaarperioden hebben, zoals Kafka, omdat de volledige vernieuwing de bestaande gegevens afkapt. Mogelijk kunt u oude gegevens niet herstellen als de gegevens niet meer beschikbaar zijn in de bron.

Rijfilters en kolommaskers

Belangrijk

Deze functie is beschikbaar als openbare preview.

Met rijfilters kunt u een functie opgeven die als filter wordt toegepast wanneer een tabelscan rijen ophaalt. Deze filters zorgen ervoor dat volgende query's alleen rijen retourneren waarbij het filterpredicaat op 'waar' evalueert.

Met kolommaskers kunt u de waarden van een kolom maskeren wanneer een tabelscan rijen ophaalt. Alle toekomstige query's met betrekking tot die kolom ontvangen het resultaat van de evaluatie van de functie over de kolom, waarbij de oorspronkelijke waarde van de kolom wordt vervangen.

Zie Rijfilters en kolommaskers voor meer informatie over het gebruik van rijfilters en kolommaskers.

Rijfilters en kolommaskers beheren

Rijfilters en kolommaskers in streaming-tabellen moeten worden toegevoegd, bijgewerkt of verwijderd via de CREATE OR REFRESH-instructie.

Gedrag

  • Vernieuwen als Definer: Wanneer de CREATE OR REFRESH of REFRESH instructies een streamingtabel vernieuwen, worden rijfilterfuncties uitgevoerd met de rechten van de Definer (als eigenaar van de tabel). Dit betekent dat de tabelvernieuwing gebruikmaakt van de beveiligingscontext van de gebruiker die de streamingtabel heeft gemaakt.
  • Query: Terwijl de meeste filters worden uitgevoerd met de rechten van de definieerer, zijn functies die gebruikerscontext controleren (zoals CURRENT_USER en IS_MEMBER) uitzonderingen. Deze functies worden uitgevoerd als de aanroeper. Deze aanpak dwingt gebruikersspecifieke gegevensbeveiliging en toegangsbeheer af op basis van de context van de huidige gebruiker.

Waarneembaarheid

Gebruik DESCRIBE EXTENDED, INFORMATION_SCHEMAof Catalog Explorer om de bestaande rijfilters en kolommaskers te onderzoeken die van toepassing zijn op een bepaalde streamingtabel. Met deze functionaliteit kunnen gebruikers de toegang tot gegevens en de beveiligingsmaatregelen voor streamingtabellen auditen en evalueren.

Beperkingen

  • Alleen eigenaren van tabellen kunnen streamingtabellen vernieuwen om de meest recente gegevens op te halen.
  • ALTER TABLE opdrachten zijn niet toegestaan voor streamingtabellen. De definitie en eigenschappen van de tabel moeten worden gewijzigd via de instructie CREATE OR REFRESH of ALTER STREAMING TABLE.
  • Het tabelschema ontwikkelen via DML-opdrachten zoals INSERT INTOen MERGE wordt niet ondersteund.
  • De volgende opdrachten worden niet ondersteund voor streamingtabellen:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Delta Sharing wordt niet ondersteund.
  • Het wijzigen van de naam van de tabel of het wijzigen van de eigenaar wordt niet ondersteund.
  • Tabelbeperkingen zoals PRIMARY KEY en FOREIGN KEY worden niet ondersteund voor streamingtabellen in de hive_metastore catalogus.
  • Gegenereerde kolommen, identiteitskolommen en standaardkolommen worden niet ondersteund.

Voorbeelden

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
  TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
  AS SELECT *
  FROM STREAM source_stream_data;

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM STREAM sales;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')