Lakeflow 声明性管道事件日志包含与管道相关的所有信息,包括审核日志、数据质量检查、管道进度和数据世系。 可以使用事件日志来跟踪、了解和监视数据管道的状态。
可以在 Lakeflow 声明性管道 用户界面、Lakeflow 声明性管道 API 中或通过直接查询事件日志来查看事件日志条目。 此部分重点介绍如何直接查询事件日志。
还可以定义在记录事件时要运行的自定义操作,例如,使用事件挂钩发送警报。
重要
请勿删除事件日志或发布事件日志的父目录或架构。 删除事件日志可能会导致管道在将来的运行过程中无法更新。
有关事件日志架构的完整详细信息,请参阅 Lakeflow 声明性管道事件日志架构。
查询事件日志
注释
本部分介绍使用 Unity 目录和默认发布模式配置的管道使用事件日志的默认行为和语法。
- 有关使用旧版发布模式的 Unity 目录管道的行为,请参阅 适用于 Unity 目录旧版发布模式管道的事件日志。
- 有关 Hive 元存储管道的行为和语法,请参阅使用 Hive 元存储管道的事件日志。
默认情况下,Lakeflow 声明性管道会将事件日志写入为管道配置的默认目录和架构中的隐藏 Delta 表。 隐藏时,该表仍可由所有足够特权的用户查询。 默认情况下,只有管道的所有者才能查询事件日志表。
若要以所有者身份查询事件日志,请使用管道 ID:
SELECT * FROM event_log(<pipelineId>);
默认情况下,隐藏事件日志的名称的格式为 event_log_{pipeline_id},其中管道 ID 是系统分配的 UUID,用短划线替换下划线。
可以通过编辑管道 的高级设置 来发布事件日志。 有关详细信息,请参阅 事件日志的管道设置。 发布事件日志时,请指定事件日志的名称,并根据需要指定目录和架构,如以下示例所示:
{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}
事件日志位置还充当管道中任何自动加载程序查询的架构位置。 Databricks 建议在修改特权之前对事件日志表创建视图,因为一些计算设置可能允许用户直接共享事件日志表时访问架构元数据。 以下示例语法在事件日志表上创建视图,并在本文包含的示例事件日志查询中使用。 将 `<catalog_name>.<schema_name>.<event_log_table_name>` 替换为管道事件日志的完全限定表名称。 如果已发布事件日志,请使用发布时指定的名称。 否则,请在管道 ID 所在的位置使用 event_log(<pipelineId>),其中 pipelineId 是您要查询的管道。
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
在 Unity Catalog 中,视图支持流式查询。 以下示例使用结构化流式处理查询在事件日志表顶部定义的视图:
df = spark.readStream.table("event_log_raw")
基本查询示例
以下示例演示如何查询事件日志以获取有关管道的常规信息,以及帮助调试常见方案。
通过查询以前的更新来监视管道更新
以下示例查询管道的更新(或 运行),其中显示了更新 ID、状态、开始时间、完成时间和持续时间。 这为您提供了管道运行的概览。
假设你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。
with last_status_per_update AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
        timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY origin.update_id
            ORDER BY timestamp DESC
        ) AS rn
    FROM event_log_raw
    WHERE event_type = 'update_progress'
    QUALIFY rn = 1
),
update_durations AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        -- Capture the start of the update
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
        -- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
        COALESCE(
            MAX(CASE
                WHEN event_type = 'update_progress'
                 AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
                THEN timestamp
            END),
            current_timestamp()
        ) AS end_time
    FROM event_log_raw
    WHERE event_type IN ('create_update', 'update_progress')
      AND origin.update_id IS NOT NULL
    GROUP BY pipeline_id, pipeline_name, pipeline_update_id
    HAVING start_time IS NOT NULL
)
SELECT
    s.pipeline_id,
    s.pipeline_name,
    s.pipeline_update_id,
    d.start_time,
    d.end_time,
    CASE
        WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
            ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
        ELSE NULL
    END AS duration_seconds,
    s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
  ON s.pipeline_id = d.pipeline_id
 AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
调试具体化视图增量刷新问题
此示例从管道的最新更新中查询所有工作流。 它显示了它们是否进行了增量更新,以及能帮助调试为何未进行增量刷新的其他相关规划信息。
假设你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。
WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  -- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
  SELECT
    origin.pipeline_name,
    origin.pipeline_id,
    origin.flow_name,
    lu.latest_update_id,
    from_json(
      details:planning_information,
      'struct<
        technique_information: array<struct<
          maintenance_type: string,
          is_chosen: boolean,
          is_applicable: boolean,
          cost: double,
          incrementalization_issues: array<struct<
            issue_type: string,
            prevent_incrementalization: boolean,
            operator_name: string,
            plan_not_incrementalizable_sub_type: string,
            expression_name: string,
            plan_not_deterministic_sub_type: string
          >>
        >>
      >'
    ) AS parsed
  FROM event_log_raw AS origin
  JOIN latest_update lu
    ON origin.update_id = lu.latest_update_id
  WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
  SELECT
    pipeline_name,
    pipeline_id,
    flow_name,
    latest_update_id,
    FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
    parsed.technique_information AS planning_information
  FROM parsed_planning
)
SELECT
  pipeline_name,
  pipeline_id,
  flow_name,
  latest_update_id,
  chosen_technique.maintenance_type,
  chosen_technique,
  planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
查询管道更新的成本
此示例演示如何查询管道的 DBU 使用情况,以及给定管道运行的用户。
SELECT
  sku_name,
  billing_origin_product,
  usage_date,
  collect_set(identity_metadata.run_as) as users,
  SUM(usage_quantity) AS `DBUs`
FROM
  system.billing.usage
WHERE
  usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
  ALL;
高级查询
以下示例演示如何查询事件日志以处理不太常见的或更高级的方案。
查询管道中所有流的指标
此示例演示如何查询管道中每个流的详细信息。 它显示流名称、更新时长、数据质量指标以及有关已处理的行的信息(输出行、已删除、更新插入记录和已丢弃的记录)。
假设你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。
WITH flow_progress_raw AS (
  SELECT
    origin.pipeline_name         AS pipeline_name,
    origin.pipeline_id           AS pipeline_id,
    origin.flow_name             AS table_name,
    origin.update_id             AS update_id,
    timestamp,
    details:flow_progress.status AS status,
    TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT)      AS num_output_rows,
    TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT)    AS num_upserted_rows,
    TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT)     AS num_deleted_rows,
    TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
    FROM_JSON(
      details:flow_progress.data_quality.expectations,
      SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
    ) AS expectations_array
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND origin.flow_name IS NOT NULL
    AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
  SELECT
    pipeline_name,
    pipeline_id,
    update_id,
    table_name,
    MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
    MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
    MAX_BY(status, timestamp) FILTER (
      WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
    ) AS final_status,
    SUM(COALESCE(num_output_rows, 0))              AS total_output_records,
    SUM(COALESCE(num_upserted_rows, 0))            AS total_upserted_records,
    SUM(COALESCE(num_deleted_rows, 0))             AS total_deleted_records,
    MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
    MAX(expectations_array)                        AS total_expectations
  FROM flow_progress_raw
  GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
  af.pipeline_name,
  af.pipeline_id,
  af.update_id,
  af.table_name,
  af.start_timestamp,
  af.end_timestamp,
  af.final_status,
  CASE
    WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
      ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
    ELSE NULL
  END AS duration_seconds,
  af.total_output_records,
  af.total_upserted_records,
  af.total_deleted_records,
  af.total_expectation_dropped_records,
  af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
  SELECT update_id
  FROM aggregated_flows
  ORDER BY end_timestamp DESC
  LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
查询数据质量或预期指标
如果在管道中定义数据集的期望,则符合和不符合期望的记录数指标将存储在 details:flow_progress.data_quality.expectations 对象中。 已删除记录数的指标存储在对象中 details:flow_progress.data_quality 。 包含有关数据质量的信息的事件具有事件类型 flow_progress。
某些数据集可能无法使用数据质量指标。 查看 预期限制。
提供以下数据质量指标:
| 指标 | Description | 
|---|---|
| dropped_records | 由于记录未能达到一个或多个预期而被删除的记录数。 | 
| passed_records | 通过预期条件的记录数。 | 
| failed_records | 不符合预期条件的记录数。 | 
以下示例查询上次管道更新的数据质量指标。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。
WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;
查询世系信息
包含有关世系的信息的事件具有事件类型 flow_definition。 
              details:flow_definition 对象包含用于定义图中每种关系的 output_dataset 和 input_datasets。
使用以下查询提取输入和输出数据集以查看世系信息。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。
with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  details:flow_definition.output_dataset as flow_name,
  details:flow_definition.input_datasets as input_flow_names,
  details:flow_definition.flow_type as flow_type,
  details:flow_definition.schema, -- the schema of the flow
  details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
使用自动加载程序监视云文件引入
当自动加载程序处理文件时,Lakeflow 声明性管道将生成事件。 对于自动加载程序事件,event_type 是 operation_progress 且 details:operation_progress:type 既可以是 AUTO_LOADER_LISTING 也可以是 AUTO_LOADER_BACKFILL。 对象 details:operation_progress 还包括 status、 duration_ms字段 auto_loader_details:source_path和 auto_loader_details:num_files_listed 字段。
以下示例查询自动加载程序事件以获取最新更新。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。
with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest_update.id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
监视数据积压以优化流媒体持续时间
Lakeflow 声明性管道在 details:flow_progress.metrics.backlog_bytes 对象中跟踪积压中的数据量。 包含积压指标的事件具有事件类型 flow_progress。 以下示例查询最近一次流水线更新的积压量指标。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。
with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id;
注释
积压指标可能因管道的数据源类型和 Databricks Runtime 版本而无法获取。
监视自动缩放事件以优化经典计算
对于使用经典计算(换句话说,不使用无服务器计算)的 Lakeflow 声明性管道,事件日志会在管道中启用增强的自动缩放时捕获群集大小。 包含有关增强型自动缩放的信息的事件具有事件类型 autoscale。 群集重设大小请求信息存储在 details:autoscale 对象中。
以下示例查询了上次管道更新中增强的自动伸缩群集调整大小请求。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。
with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id
监视经典计算的计算资源利用率
              cluster_resources 事件提供有关集群中任务槽的数量、这些任务槽的使用率以及有多少任务在等待调度的指标。
启用增强型自动缩放后,cluster_resources 事件还包含自动缩放算法的指标,包括 latest_requested_num_executors 和 optimal_num_executors。 这些事件还会将算法的状态显示为不同的状态,例如 CLUSTER_AT_DESIRED_SIZE、SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS 和 BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION。
可以结合自动缩放事件一起查看此信息,以全面了解增强型自动缩放的情况。
以下示例查询任务队列大小历史记录、利用率历史记录、执行程序计数历史记录,以及上次管道更新中自动缩放的其他指标和状态。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。
with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
  Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
  Double(details:cluster_resources.num_executors) as current_executors,
  Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id;
监视管道数据流指标
重要
此功能目前以公共预览版提供。
可以查看与 Lakeflow 声明性管道流进度相关的指标。 查询 stream_progress 事件以获取与结构化流式处理创建的 StreamingQueryListener 指标 非常相似的事件,但有以下例外:
- 以下指标存在于StreamingQueryListener中,但不存在于stream_progress中:numInputRows、inputRowsPerSecond和processedRowsPerSecond。
- 对于 Kafka 和 Kinesis 流,startOffset、endOffset和latestOffset这些字段可能太大,因此会被截断。 对于每个字段,将添加一个附加...Truncated字段、一个startOffsetTruncated字段、一个endOffsetTruncated字段和一个latestOffsetTruncated字段,并以布尔值指示数据是否被截断。
若要查询 stream_progress 事件,可以使用如下所示的查询:
SELECT
  parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';
下面是 JSON 中的事件示例:
{
  "id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
  "sequence": {
    "control_plane_seq_no": 1234567890123456
  },
  "origin": {
    "cloud": "<cloud>",
    "region": "<region>",
    "org_id": 0123456789012345,
    "pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
    "pipeline_type": "WORKSPACE",
    "pipeline_name": "<pipeline name>",
    "update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
    "request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
  },
  "timestamp": "2025-06-17T03:18:14.018Z",
  "message": "Completed a streaming update of 'flow_name'."
  "level": "INFO",
  "details": {
    "stream_progress": {
      "progress": {
        "id": "abcdef12-abcd-3456-7890-abcd1234ef56",
        "runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
        "name": "silverTransformFromBronze",
        "timestamp": "2022-11-01T18:21:29.500Z",
        "batchId": 4,
        "durationMs": {
          "latestOffset": 62,
          "triggerExecution": 62
        },
        "stateOperators": [],
        "sources": [
          {
            "description": "DeltaSource[dbfs:/path/to/table]",
            "startOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "endOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "latestOffset": null,
            "metrics": {
              "numBytesOutstanding": "0",
              "numFilesOutstanding": "0"
            }
          }
        ],
        "sink": {
          "description": "DeltaSink[dbfs:/path/to/sink]",
          "numOutputRows": -1
        }
      }
    }
  },
  "event_type": "stream_progress",
  "maturity_level": "EVOLVING"
}
此示例显示 Kafka 源中的未截断的记录,字段 ...Truncated 设置为 false:
{
  "description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
  "startOffsetTruncated": false,
  "startOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706380
    }
  },
  "endOffsetTruncated": false,
  "endOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "latestOffsetTruncated": false,
  "latestOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "numInputRows": 292,
  "inputRowsPerSecond": 13.65826278123392,
  "processedRowsPerSecond": 14.479817514628582,
  "metrics": {
    "avgOffsetsBehindLatest": "0.0",
    "estimatedTotalBytesBehindLatest": "0.0",
    "maxOffsetsBehindLatest": "0",
    "minOffsetsBehindLatest": "0"
  }
}
审核 Lakeflow 声明性管道
可以使用 Lakeflow 声明性管道事件日志和其他 Azure Databricks 审核日志来全面了解如何在 Lakeflow 声明性管道中更新数据。
Lakeflow 声明式流水线使用管道所有者的认证信息来执行更新。 你可以更改通过更新管道所有者使用的凭据。 Lakeflow 声明性管道记录用户对管道操作的行为,包括管道创建、配置编辑和更新触发。
有关 Unity Catalog 审核事件的参考,请参阅 Unity Catalog 事件。
在事件日志中查询用户操作
可以使用事件日志来审核事件,例如用户操作。 包含有关用户操作的信息的事件具有事件类型 user_action。
有关操作的信息存储在 user_action 字段的 details 对象中。 请使用以下查询来构造用户事件的审核日志。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
| timestamp | action | user_name | 
|---|---|---|
| 2021-05-20T19:36:03.517+0000 | START | user@company.com | 
| 2021-05-20T19:35:59.913+0000 | CREATE | user@company.com | 
| 2021-05-27T00:35:51.971+0000 | START | user@company.com | 
运行时信息
可以查看管道更新的运行时信息,例如更新的 Databricks Runtime 版本,假设你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
| dbr_version | 
|---|
| 11.0 |