本文介绍如何将 Confluent Cloud for Apache Kafka 源添加到事件流。
Confluent Cloud for Apache Kafka 是一个流式处理平台,它使用 Apache Kafka 提供强大的数据流式处理和处理功能。 通过将 Confluent Cloud for Apache Kafka 集成到您的事件流中作为源,可以无缝处理实时数据流,然后将其路由到 Fabric 的多个目标。
注意
工作区容量的以下区域不支持此源:美国西部 3、瑞士西部。
先决条件
- 使用参与者或更高权限在 Fabric 容量许可证模式(或)试用许可证模式下访问工作区。
- 一个用于 Apache Kafka 群集的 Confluent Cloud 和一个 API 密钥。
- Confluent Cloud for Apache Kafka 群集必须可公开访问,并且不能位于防火墙后面或在虚拟网络中受到保护。
- 如果没有事件流,请创建一个事件流。
启动“选择数据源”向导
如果尚未向事件流添加任何源,请选择“使用外部源”磁贴。
如果要向已发布的事件流添加源,请切换到“编辑”模式,在功能区中选择“添加源”,然后选择“外部源”。
配置并连接到 Confluent Cloud 以使用 Apache Kafka
在 “选择数据源 ”页上,选择 适用于 Apache Kafka 的 Confluent Cloud。
若要创建与 Confluent Cloud for Apache Kafka 源的连接,请选择“ 新建连接”。
在“连接设置”部分中,输入“Confluent 启动服务器”。 导航到 Confluent Cloud 主页,选择“群集设置”,并将地址复制到启动服务器。
在 “连接凭据 ”部分中,如果有与 Confluent 群集的现有连接,请从 “连接”下拉列表中选择它。 否则,请执行以下步骤:
- 对于“连接名称”,为连接输入名称。
- 对于“身份验证类型”,确认已选择“Confluent Cloud 密钥”。
- 对于“API 密钥”和“API 密钥机密”:
导航到 Confluent Cloud。
在侧菜单中选择“API 密钥”。
选择“添加密钥”按钮以创建新的 API 密钥。
复制“API 密钥”和“机密”。
将这些值粘贴到“API 密钥”和“API 密钥机密”字段中。
选择连接
滚动以查看页面上的 “配置 Confluent Cloud for Apache Kafka 数据源 ”部分。 输入信息以完成 Confluent 数据源的配置。
对于 主题名称,请从 Confluent Cloud 输入主题名称。 可以在 Confluent Cloud 控制台中创建或管理主题。
对于 使用者组,请输入 Confluent Cloud 的使用者组。 它为你提供了专用的使用者组,用于从 Confluent Cloud 群集获取事件。
对于 重置自动偏移 设置,请选择以下值之一:
- 最早 – Confluent 群集中可用的最早数据。
- 最新 - 最新可用数据。
- 无 – 不自动设置偏移量。
注意
“ 无” 选项在此创建步骤中不可用。 如果存在已提交的偏移量并且想要使用 None,可以先完成配置,然后在 Eventstream 编辑模式下更新设置。
取决于数据是否使用 Confluent 架构注册表进行编码:
- 如果未编码,请选择“ 下一步”。 在 “审阅并创建 ”屏幕上,查看摘要,然后选择“ 添加” 以完成设置。
- 如果编码,请转到下一步: 连接到 Confluent 架构注册表以解码数据(预览版)
连接到 Confluent 架构注册表以解码数据(预览版)
Eventstream 的 Confluent Cloud for Apache Kafka 流式处理连接器能够解码使用 Confluent 序列化程序和 Confluent Cloud 的架构注册表生成的数据。 使用 Confluent 架构注册表的序列化程序编码的数据需要从 Confluent 架构注册表中检索架构,以便进行解码。 无法访问架构,Eventstream 无法预览、处理或路由传入数据。
可以展开 高级设置 以配置 Confluent 架构注册表连接:
定义和序列化数据:选择 “是 ”可让你将数据序列化为标准化格式。 选择 “否” 以原始格式保存数据,并在不修改的情况下传递数据。
如果使用架构注册表对数据进行编码,请在选择数据是否使用架构注册表进行编码时选择 “是 ”。 然后选择 “新建连接 ”以配置对 Confluent 架构注册表的访问:
- 架构注册表 URL:架构注册表的公共终结点。
- API 密钥 和 API 密钥机密:导航到 Confluent Cloud 环境的架构注册表以复制 API 密钥 和 API 机密。 确保用于创建此 API 密钥的帐户对架构具有 DeveloperRead 或更高版本的权限。
- 隐私级别:从 “无”、“ 专用”、“ 组织”或 “公共”中进行选择。
JSON 输出十进制格式:指定源数据中十进制逻辑类型值的 JSON 序列化格式。
- NUMERIC:序列化为数字。
- BASE64:序列化为 base64 编码的数据。
选择下一步。 在 “审阅并创建 ”屏幕上,查看摘要,然后选择“ 添加” 以完成设置。
你会看到用于 Apache Kafka 的 Confluent Cloud 源已添加到在 编辑模式 下的画布上的事件流中。 若要实现新添加的 Confluent Cloud for Apache Kafka 源,请选择功能区上的 《发布》。
完成这些步骤后,Apache Kafka 的 Confluent Cloud 数据源可用于 实时视图中的可视化。
注意
若要预览此 Confluent Cloud for Apache Kafka 源的事件,请确保用于创建云连接的 API 密钥对前缀为“preview-”的使用者组具有读取权限。 如果 API 密钥是使用 用户帐户创建的,则无需执行其他步骤,因为这种类型的密钥已完全访问 Confluent Cloud for Apache Kafka 资源,包括前缀 为“preview-”的使用者组的读取权限。 但是,如果密钥是使用服务帐户创建的,则需要手动向前缀为“preview-”的使用者组授予读取权限,以便预览事件。
对于 Confluent Cloud for Apache Kafka 数据源,当使用 Confluent 架构注册表对数据进行编码时,支持对 Confluent AVRO 格式的消息进行预览。 如果未使用 Confluent 架构注册表对数据进行编码,则只能预览 JSON 格式的消息。
相关内容
其他连接器: