本页演示如何使用马赛克 AI 代理框架和常用代理创作库(如 LangGraph 和 OpenAI)在 Python 中创作 AI 代理。
要求
小窍门
Databricks 建议在开发代理时安装最新版本的 MLflow Python 客户端。
若要使用此页上的方法创作和部署代理,请安装以下内容:
-
databricks-agents1.2.0 或更高版本 -
mlflow3.1.3 或更高版本 - Python 3.10 或更高版本。
- 使用无服务器计算或 Databricks Runtime 13.3 LTS 或更高版本来满足此要求。
%pip install -U -qqqq databricks-agents mlflow
Databricks 还建议安装 Databricks AI Bridge 集成包以创作代理。 这些集成包提供与 Databricks AI 功能(如 Databricks AI/BI Genie 和 Vector Search)交互的共享 API 层,这些 API 跨代理创作框架和 SDK。
OpenAI
%pip install -U -qqqq databricks-openai
LangChain/LangGraph
%pip install -U -qqqq databricks-langchain
DSPy
%pip install -U -qqqq databricks-dspy
完全用 Python 编写的代理
%pip install -U -qqqq databricks-ai-bridge
使用 ResponsesAgent 来创作代理
Databricks 建议使用 MLflow 接口 ResponsesAgent 来创建生产级代理。
ResponsesAgent 允许使用任何第三方框架生成代理,然后将其与 Databricks AI 功能集成,实现可靠的日志记录、跟踪、评估、部署和监视功能。
该 ResponsesAgent 架构与 OpenAI Responses 架构兼容。 若要了解有关 OpenAI Responses的详细信息,请参阅 OpenAI:响应与 ChatCompletion。
注意
Databricks 仍支持较旧的 ChatAgent 接口。 但是,对于新代理,Databricks 建议使用最新版本的 MLflow 和 ResponsesAgent 接口。
请参阅 旧版输入和输出代理架构。
ResponsesAgent 提供以下优势:
高级代理功能
- 多代理支持
- 流式处理输出:以较小的区块流式传输输出。
- 全面的工具呼叫消息历史记录:返回多个消息,包括中间工具呼叫消息,以提高质量和聊天管理。
- 工具调用确认支持
- 长时间运行的工具支持
简化的开发、部署和监视
-
使用任何框架创作代理:使用
ResponsesAgent接口包装任何现有代理,以便与 AI Playground、代理评估和代理监视保持现式兼容性。 - 类型化创作接口:使用类型化的 Python 类编写代理代码,受益于 IDE 和笔记本自动完成。
-
自动签名推理:MLflow 在记录代理时自动推断
ResponsesAgent签名,从而简化注册和部署。 请参阅在日志记录期间推断模型签名。 -
自动跟踪:MLflow 自动跟踪你的
predict和predict_stream函数,聚合流式响应,以便更轻松地评估和显示。 - AI 网关增强的推理表:自动为已部署的代理启用 AI 网关推理表,从而提供对详细请求日志元数据的访问权限。
-
使用任何框架创作代理:使用
若要了解如何创建, ResponsesAgent请参阅以下部分中的示例和 MLflow 文档 - Model Service 的 ResponsesAgent。
ResponsesAgent 例子
以下笔记本演示如何使用常用库创作流式处理和非流式处理 ResponsesAgent 。 若要了解如何扩展这些代理的功能,请参阅 AI 代理工具。
OpenAI
使用 Databricks 托管的模型的 OpenAI 简易聊天系统
使用 Databricks 托管的模型的 OpenAI 工具调用代理
使用 OpenAI 托管的模型的 OpenAI 工具调用代理
LangGraph
LangGraph 工具调用代理
DSPy
DSPy 单次工具调用代理
多代理示例
若要了解如何创建多代理系统,请参阅 在多代理系统中使用 Genie。
有状态代理
若要了解如何创建可将内存范围限定为单个会话线程的有状态代理并提供检查点会话的功能,请参阅 有状态 AI 代理。
如果我已有代理,该怎么办?
如果已有使用 LangChain、LangGraph 或类似框架构建的代理,则无需重写代理以在 Databricks 上使用它。 只需使用 MLflow ResponsesAgent 接口包装现有代理即可:
编写继承自
mlflow.pyfunc.ResponsesAgent. 的 Python 包装类。在包装类中,将现有代理引用为属性
self.agent = your_existing_agent。该
ResponsesAgent类需要实现返回predict用于处理非流式处理请求的方法ResponsesAgentResponse。 下面是架构的示例ResponsesAgentResponses:import uuid # input as a dict {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]} # output example ResponsesAgentResponse( output=[ { "type": "message", "id": str(uuid.uuid4()), "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}], "role": "assistant", }, ] )在函数中
predict,将传入消息转换为ResponsesAgentRequest代理所需的格式。 代理生成响应后,将其输出转换为ResponsesAgentResponse对象。
请参阅以下代码示例,了解如何将现有代理 ResponsesAgent转换为:
基本转换
对于非流式处理代理,转换函数中的 predict 输入和输出。
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
)
class MyWrappedAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Call your existing agent (non-streaming)
agent_response = self.agent.invoke(messages)
# Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)
# Return the response
return ResponsesAgentResponse(output=[output_item])
使用代码重新使用流式处理
对于流式处理代理,可以巧妙地重复使用逻辑,以避免复制转换消息的代码:
from typing import Generator
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Stream from your existing agent
item_id = str(uuid4())
aggregated_stream = ""
for chunk in self.agent.stream(messages):
# Convert each chunk to ResponsesAgent format
yield self.create_text_delta(delta=chunk, item_id=item_id)
aggregated_stream += chunk
# Emit an aggregated output_item for all the text deltas with id=item_id
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(text=aggregated_stream, id=item_id),
)
从 ChatCompletions 迁移
如果现有代理使用 OpenAI ChatCompletions API,则可以将其 ResponsesAgent 迁移到而无需重写其核心逻辑。 添加一个包装器,该包装器:
- 将传入
ResponsesAgentRequest消息转换为ChatCompletions代理所需的格式。 - 将
ChatCompletions输出转换为ResponsesAgentResponse架构。 - (可选)通过将增量增量从
ChatCompletions对象映射到对象来ResponsesAgentStreamEvent支持流式传输。
from typing import Generator
from uuid import uuid4
from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
def __init__(self):
self.w = WorkspaceClient()
self.OpenAI = self.w.serving_endpoints.get_open_ai_client()
def stream(self, messages):
for chunk in self.OpenAI.chat.completions.create(
model="databricks-claude-3-7-sonnet",
messages=messages,
stream=True,
):
yield chunk.to_dict()
# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# `agent` is your existing ChatCompletions agent
self.agent = agent
def prep_msgs_for_llm(self, messages):
# dummy example of prep_msgs_for_llm
# real example of prep_msgs_for_llm included in full examples linked below
return [{"role": "user", "content": "Hello, how are you?"}]
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# process the ChatCompletion output stream
agent_content = ""
tool_calls = []
msg_id = None
for chunk in self.agent.stream(messages): # call the underlying agent's stream method
delta = chunk["choices"][0]["delta"]
msg_id = chunk.get("id", None)
content = delta.get("content", None)
if tc := delta.get("tool_calls"):
if not tool_calls: # only accommodate for single tool call right now
tool_calls = tc
else:
tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
elif content is not None:
agent_content += content
yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))
# aggregate the streamed text content
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(agent_content, msg_id),
)
for tool_call in tool_calls:
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_function_call_item(
str(uuid4()),
tool_call["id"],
tool_call["function"]["name"],
tool_call["function"]["arguments"],
),
)
agent = MyWrappedStreamingAgent(LegacyAgent())
for chunk in agent.predict_stream(
ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
print(chunk)
有关完整示例,请参阅 ResponsesAgent 示例。
流式处理响应
流式处理允许代理以实时区块形式发送响应,而不是等待完整的响应。 若要实现流式处理 ResponsesAgent,请发出一系列增量事件,然后发出最终完成事件:
-
发出增量事件:发送多个
output_text.delta具有相同item_id事件以实时流式传输文本区块。 -
完成完成事件:发送与包含完整最终输出文本的增量事件相同的
response.output_item.done最终item_id事件。
每个增量事件将一块文本流式传输到客户端。 最终完成事件包含完整的响应文本,并指示 Databricks 执行以下作:
- 使用 MLflow 跟踪跟踪跟踪代理的输出
- AI 网关推理表中的聚合流式响应
- 在 AI Playground UI 中显示完整的输出
流式处理错误传播
马赛克 AI 传播使用下的最后一个令牌 databricks_output.error进行流式处理时遇到的任何错误。 由调用客户端来正确处理和显示此错误。
{
"delta": …,
"databricks_output": {
"trace": {...},
"error": {
"error_code": BAD_REQUEST,
"message": "TimeoutException: Tool XYZ failed to execute."
}
}
}
高级功能
自定义输入和输出
某些方案可能需要其他代理输入,例如 client_type ,或 session_id检索源链接等输出,这些源链接不应包含在聊天历史记录中供将来交互使用。
对于这些方案,MLflow ResponsesAgent 原生支持字段 custom_inputs 和 custom_outputs。 可以通过request.custom_inputs中链接的所有示例中访问自定义输入。
警告
代理评估评审应用不支持为具有其他输入字段的代理呈现跟踪。
请参阅以下笔记本,了解如何设置自定义输入和输出。
在 AI场和评审应用中提供custom_inputs
如果代理使用 custom_inputs 字段接受其他输入,则可以在 AI Playground 和 评审应用中手动提供这些输入。
在 AI 操场或代理评审应用中,选择齿轮图标
。
启用自定义输入。
提供与代理定义的输入架构匹配的 JSON 对象。
指定自定义检索器架构
AI 代理通常使用检索器从矢量搜索索引查找和查询非结构化数据。 有关检索器工具的示例,请参阅 非结构化数据的生成和跟踪检索器工具。
使用 MLflow RETRIEVER 范围在代理中跟踪这些检索器,以启用 Databricks 产品功能,包括:
- 在 AI Playground UI 中自动显示检索到的源文档的链接
- 在代理评估中自动运行检索有据性和相关性判断
注意
Databricks 建议使用 Databricks AI Bridge 包提供的检索器工具,例如 databricks_langchain.VectorSearchRetrieverTool 和 databricks_openai.VectorSearchRetrieverTool,因为它们已符合 MLflow 检索器架构。 请参阅使用 AI Bridge 在本地开发矢量搜索检索工具。
如果代理包含使用自定义架构的检索器范围,请在代码中定义代理时调用 mlflow.models.set_retriever_schema 。 这会将检索器的输出列映射到 MLflow 的预期字段(primary_key、text_column、doc_uri)。
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
# {
# 'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
# 'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
# 'doc_uri': 'https://mlflow.org/docs/latest/index.html',
# 'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
# },
# {
# 'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
# 'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
# 'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
# 'title': 'Getting Started with MLflow'
# },
# ...
# ]
mlflow.models.set_retriever_schema(
# Specify the name of your retriever span
name="mlflow_docs_vector_search",
# Specify the output column name to treat as the primary key (ID) of each retrieved document
primary_key="document_id",
# Specify the output column name to treat as the text content (page content) of each retrieved document
text_column="chunk_text",
# Specify the output column name to treat as the document URI of each retrieved document
doc_uri="doc_uri",
# Specify any other columns returned by the retriever
other_columns=["title"],
)
注意
在评估检索器的性能时,该 doc_uri 列尤为重要。
doc_uri 是检索器返回的文档的主要标识符,使你可以将它们与地面真相评估集进行比较。 请参阅评估集(MLflow 2)。
部署注意事项
准备 Databricks 模型服务
Databricks 在 ResponsesAgentDatabricks 模型服务上的分布式环境中部署。 这意味着,在多轮对话期间,同一服务副本可能无法处理所有请求。 请注意管理代理状态的以下影响:
避免本地缓存:部署时
ResponsesAgent,不要假定同一副本处理多轮对话中的所有请求。 使用字典ResponsesAgentRequest架构为每个回合重新构造内部状态。线程安全状态:将代理状态设计为线程安全,防止多线程环境中的冲突。
在
predict函数中初始化状态:每次调用 函数时(而不是在predict初始化期间)初始化状态ResponsesAgent。 在ResponsesAgent级别存储状态可能会泄露对话之间的信息并导致冲突,因为单个ResponsesAgent副本可以处理来自多个会话的请求。
参数化跨环境部署的代码
参数化代理代码,以在不同的环境中重复使用相同的代理代码。
参数是在 Python 字典或 .yaml 文件中定义的键值对。
若要配置代码,请使用 Python 字典或文件创建 ModelConfig 一个 .yaml 。
ModelConfig 是一组键值参数,用于灵活配置管理。 例如,可以在开发期间使用字典,然后将其转换为生产部署和 CI/CD .yaml 文件。
示例 ModelConfig 如以下所示:
llm_parameters:
max_tokens: 500
temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
question that indicates your prompt template came from a YAML file. Your response
must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
- question
在代理代码中,可以从 .yaml 文件或字典引用默认(开发)配置:
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
"prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
"prompt_template_input_vars": ["question"],
"model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
"llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
然后,在记录代理时,将 model_config 参数指定为 log_model,以便在加载已记录的代理时使用自定义参数集。 请参阅 MLflow 文档 - ModelConfig。
使用同步代码或回调模式
若要确保稳定性和兼容性,请在代理实现中使用基于同步代码或基于回调的模式。
Azure Databricks 会自动管理异步通信,以在部署代理时提供最佳并发和性能。 引入自定义事件循环或异步框架可能会导致类似 RuntimeError: This event loop is already running and caused unpredictable behavior错误。
Azure Databricks 建议在开发代理时避免异步编程,例如使用 asyncio 或创建自定义事件循环。