Dela via


Läsa in data med deklarativa Lakeflow-pipelines

Du kan läsa in data från alla datakällor som stöds av Apache Spark på Azure Databricks med hjälp av Lakeflow Deklarativa Pipeliner. Du kan definiera datauppsättningar (tabeller och vyer) i Lakeflow Declarative Pipelines för alla frågor som returnerar en Spark DataFrame, inklusive strömmande Spark DataFrames och Pandas för Spark DataFrames. För datainmatningsuppgifter rekommenderar Databricks att du använder strömningstabeller för de flesta användningsfall. Strömmande tabeller är bra för att mata in data från molnobjektlagring med hjälp av Auto Loader eller från meddelandebussar som Kafka.

Anmärkning

  • Alla datakällor har inte SQL-stöd för inmatning. Du kan blanda SQL och Python-källor i Lakeflow deklarativa pipelines för att använda Python där det behövs och SQL för andra operationer i samma pipeline.
  • Mer information om hur du arbetar med bibliotek som inte är paketerade som standard i Lakeflow Deklarativa Pipelines finns i Hantera Python-beroenden för deklarativa Lakeflow-pipelines.
  • Allmän information om datainmatning i Azure Databricks finns i Standardanslutningar i Lakeflow Connect.

Exemplen nedan visar några vanliga mönster.

Läsa in från en befintlig tabell

Läs in data från en befintlig tabell i Azure Databricks. Du kan transformera data med hjälp av en fråga eller läsa in tabellen för vidare bearbetning i din pipeline.

I följande exempel läss data från en befintlig tabell:

python

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

Läsa in filer från molnobjektlagring

Databricks rekommenderar att du använder Auto Loader med Lakeflow Deklarativa Pipelines för de flesta datainmatningsuppgifterna från molnobjektlagringslösningar eller från filer i en Unity Catalog-volym. Auto Loader och Lakeflow Deklarativa Pipelines är utformade för att stegvis och idempotent läsa in ständigt växande data när det tas emot i molnlagringen.

Se Vad är automatisk inläsning? och Läsa in data från objektlagring.

I följande exempel hämtas data från molnlagring med Auto Loader.

python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

I följande exempel används Auto Loader för att skapa datamängder från CSV-filer i en Unity Catalog-volym:

python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

Anmärkning

  • Om du använder Auto Loader med filaviseringar och kör en fullständig uppdatering för din pipeline eller strömningstabell måste du rensa dina resurser manuellt. Du kan använda CloudFilesResourceManager i en anteckningsbok för att utföra rensning.
  • Om du vill läsa in filer med Auto Loader i en Unity Catalog-aktiverad pipeline måste du använda externa platser. Mer information om hur du använder Unity Catalog med deklarativa pipelines för Lakeflow finns under Använd Unity Catalog med dina deklarativa pipelines för Lakeflow.

Läsa in data från en meddelandebuss

Du kan konfigurera deklarativa lakeflow-pipelines för att mata in data från meddelandebussar. Databricks rekommenderar att du använder strömningstabeller med kontinuerlig exekvering och förbättrad autoskalning för att ge den optimala inmatningen för låg latens inläsning från meddelandebussar. Se Optimera klusteranvändningen för deklarativa lakeflow-pipelines med autoskalning.

Följande kod konfigurerar till exempel en strömmande tabell för att mata in data från Kafka med hjälp av funktionen read_kafka :

python

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

Information om hur du matar in från andra meddelandebusskällor finns i:

Läsa in data från Azure Event Hubs

Azure Event Hubs är en dataströmningstjänst som tillhandahåller ett Apache Kafka-kompatibelt gränssnitt. Du kan använda Structured Streaming Kafka-kopplingen, som ingår i Lakeflow Declarative Pipelines-körningen, för att ladda in meddelanden från Azure Event Hubs. Mer information om hur du läser in och bearbetar meddelanden från Azure Event Hubs finns i Använda Azure Event Hubs som en datakälla för deklarativa pipelines i Lakeflow.

Läsa in data från externa system

Lakeflow Deklarativa Pipelines stöder laddning av data från alla datakällor som stöds av Azure Databricks. Se Ansluta till datakällor och externa tjänster. Du kan också ladda in externa data med Lakehouse Federation för stödda datakällor . Eftersom Lakehouse Federation kräver Databricks Runtime 13.3 LTS eller senare, måste din pipeline konfigureras för att använda förhandsgranskningskanalen för att kunna använda Lakehouse Federation.

Vissa datakällor har inte motsvarande stöd i SQL. Om du inte kan använda Lakehouse Federation med någon av dessa datakällor kan du använda Python för att mata in data från källan. Du kan lägga till Python- och SQL-källfiler i samma pipeline. I följande exempel deklareras en materialiserad vy för att få åtkomst till det aktuella tillståndet för data i en fjärransluten PostgreSQL-tabell:

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Läs in små eller statiska datamängder från molnobjektlagring

Du kan läsa in små eller statiska datauppsättningar med apache Spark-inläsningssyntax. Lakeflows deklarativa pipeliner stödjer alla filformat som stöds av Apache Spark på Azure Databricks. En fullständig lista finns i Alternativ för dataformat.

Följande exempel visar hur JSON läses in för att skapa Lakeflow deklarativa Pipelines-tabeller.

python

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

Anmärkning

Funktionen read_files SQL är gemensam för alla SQL-miljöer i Azure Databricks. Det är det rekommenderade sättet för direkt filåtkomst med SQL med Lakeflow Deklarativa Pipelines. Mer information finns i Alternativ.

Läsa in data från en anpassad Python-datakälla

Med anpassade Python-datakällor kan du läsa in data i anpassade format. Du kan skriva kod för att läsa från och skriva till en specifik extern datakälla, eller använda befintlig Python-kod i dina befintliga system för att läsa data från dina egna interna system. Mer information om hur du utvecklar Python-datakällor finns i PySpark-anpassade datakällor.

Om du vill använda en anpassad Python-datakälla för att läsa in data i Lakeflow Deklarativa pipelines registrerar du den med ett formatnamn, till exempel my_custom_datasource, och läser sedan från den:

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

Konfigurera en strömmande tabell för att ignorera ändringar i en källströmningstabell

Anmärkning

  • Flaggan skipChangeCommits fungerar bara med spark.readStream med hjälp av funktionen option(). Du kan inte använda den här flaggan i en dp.read_stream() funktion.
  • Du kan inte använda skipChangeCommits flaggan när källuppspelningstabellen definieras som mål för en create_auto_cdc_flow() -funktion.

Som standard kräver strömmande tabeller källor som endast tillåter tillägg. När en strömmande tabell använder en annan strömmande tabell som källa, och källströmningstabellen kräver uppdateringar eller borttagningar, till exempel GDPR-bearbetning av "rätt att bli bortglömd", kan skipChangeCommits-flaggan anges när du läser källströmningstabellen för att ignorera dessa ändringar. Mer information om den här flaggan finns i Ignorera uppdateringar och borttagningar.

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Kom åt lagringsuppgifter på ett säkert sätt med säkerhetsnycklar i en pipeline

Du kan använda Azure Databricks-hemligheter för att lagra autentiseringsuppgifter som åtkomstnycklar eller lösenord. Om du vill konfigurera hemligheten i din pipeline använder du en Spark-egenskap i klusterkonfigurationen för pipelineinställningar. Se Konfigurera beräkning i klassiskt läge för deklarativa Lakeflow-pipelines.

I följande exempel används en hemlighet för att lagra en åtkomstnyckel som krävs för att läsa indata från ett Azure Data Lake Storage-lagringskonto (ADLS) med hjälp av Auto Loader. Du kan använda samma metod för att konfigurera alla hemligheter som krävs av din pipeline, till exempel AWS-nycklar för att komma åt S3 eller lösenordet till ett Apache Hive-metaarkiv.

Mer information om hur du arbetar med Azure Data Lake Storage finns i Ansluta till Azure Data Lake Storage och Blob Storage.

Anmärkning

Du måste lägga till prefixet spark.hadoop. till spark_conf konfigurationsnyckeln som anger det hemliga värdet.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

Ersätt

  • <storage-account-name> med namnet på ADLS-lagringskontot.
  • <scope-name> med namnet på Azure Databricks-hemlighetsområdet.
  • <secret-name> med namnet på nyckeln som innehåller åtkomstnyckeln för Azure Storage-kontot.
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Ersätt

  • <container-name> med namnet på containern för Azure-lagringskontot som lagrar indata.
  • <storage-account-name> med namnet på ADLS-lagringskontot.
  • <path-to-input-dataset> med sökvägen till inmatningsdatamängden.