Important
此功能在 Beta 版中。
模型上下文协议 (MCP) 服务器充当网桥,让 AI 代理访问外部数据和工具。 使用 Databricks 托管 MCP 服务器将代理连接到存储在 Unity 目录、矢量搜索索引和自定义函数中的数据。
可用的托管服务器
Databricks 提供以下现装的 MCP 服务器:
| MCP 服务器 | Description | URL 模式 |
|---|---|---|
| 矢量搜索 | 查询 矢量搜索索引 以查找相关文档和数据。 仅支持由 Databricks 托管嵌入的索引。 | https://<workspace-hostname>/api/2.0/mcp/vector-search/{catalog}/{schema} |
| Unity Catalog 函数 | 运行 Unity 目录函数 ,例如自定义 Python 或 SQL 工具 | https://<workspace-hostname>/api/2.0/mcp/functions/{catalog}/{schema} |
| Genie 空间 | 使用自然语言问题查询 Genie 空间 ,从结构化数据表获取见解 | https://<workspace-hostname>/api/2.0/mcp/genie/{genie_space_id} |
| DBSQL | 使用 SQL 仓库直接对 Unity 目录表执行 SQL 查询 | https://<workspace-hostname>/api/2.0/mcp/sql |
Note
Databricks 建议对大多数用例使用 Genie MCP over SQL MCP。 Genie 提供更高质量的结果,允许用户提出自然语言问题,而不是编写 SQL 查询。 如果需要直接 SQL 查询执行控制,或者需要写入访问权限来创建、修改或删除资源,请使用 SQL MCP。
Note
Genie 的托管 MCP 服务器调用 Genie 作为 MCP 工具,这意味着在调用 Genie API 时不会传递历史记录。 或者,可以在 多代理系统中使用 Genie。
示例方案
请考虑有助于客户支持的代理。 可以将它连接到多个托管 MCP 服务器:
-
矢量搜索:
https://<workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_support- 搜索支持票证和文档
-
Genie 空间:
https://<workspace-hostname>/api/2.0/mcp/genie/{billing_space_id}- 查询计费数据和客户信息
-
UC 函数:
https://<workspace-hostname>/api/2.0/mcp/functions/prod/billing- 为帐户查找和更新运行自定义函数
-
DBSQL:
https://<workspace-hostname>/api/2.0/mcp/sql- 对客户数据执行直接 SQL 查询
这使代理能够访问非结构化数据(支持票证)、结构化数据(计费表)和自定义业务逻辑。
示例笔记本:使用 Databricks MCP 服务器生成代理
以下笔记本演示如何创作使用 Databricks 托管 MCP 服务器调用 MCP 工具的 LangGraph 和 OpenAI 代理。
LangGraph MCP 工具调用代理
OpenAI MCP 工具调用代理
从本地环境生成代理
连接到 Databricks 上的 MCP 服务器类似于连接到任何其他远程 MCP 服务器。 可以使用标准 SDK(如 MCP Python SDK)连接到服务器。 主要区别是,默认情况下 Databricks MCP 服务器是安全的,并要求客户端指定身份验证。
databricks-mcp Python 库有助于简化自定义代理代码中的身份验证。
开发代理代码的最简单方法是在本地运行它,并向工作区进行身份验证。 按照以下步骤生成连接到 Databricks MCP 服务器的 AI 代理。
配置你的环境
使用 OAuth 向工作区进行身份验证。 在本地终端中运行以下命令:
databricks auth login --host https://<your-workspace-hostname>出现提示时,请重新创建配置文件名称,并记下该名称供以后使用。 默认配置文件名称为 DEFAULT。
确保具有 Python 3.12 或更高版本的本地环境,然后安装依赖项:
pip install -U "mcp>=1.9" "databricks-sdk[openai]" "mlflow>=3.1.0" "databricks-agents>=1.0.0" "databricks-mcp"
测试本地环境连接
必须在工作区中启用无服务器计算才能运行此代码片段。
通过列出 Unity 目录工具并运行 内置的 Python 代码解释器工具来验证与 MCP 服务器的连接。
- 运行以下代码来验证与 MCP 服务器的连接:
from databricks_mcp import DatabricksMCPClient
from databricks.sdk import WorkspaceClient
# TODO: Update to the Databricks CLI profile name you specified when
# configuring authentication to the workspace.
databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
assert (
databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
workspace_client = WorkspaceClient(profile=databricks_cli_profile)
workspace_hostname = workspace_client.config.host
mcp_server_url = f"{workspace_hostname}/api/2.0/mcp/functions/system/ai"
# This code uses the Unity Catalog functions MCP server to expose built-in
# AI tools under `system.ai`, like the `system.ai.python_exec` code interpreter tool
def test_connect_to_server():
mcp_client = DatabricksMCPClient(server_url=mcp_server_url, workspace_client=workspace_client)
tools = mcp_client.list_tools()
print(
f"Discovered tools {[t.name for t in tools]} "
f"from MCP server {mcp_server_url}"
)
result = mcp_client.call_tool(
"system__ai__python_exec", {"code": "print('Hello, world!')"}
)
print(
f"Called system__ai__python_exec tool and got result "
f"{result.content}"
)
if __name__ == "__main__":
test_connect_to_server()
创建代理
在上述代码的基础上,定义一个使用工具的基本单轮代理。 将本地保存的代理代码命名为文件
mcp_agent.py:import json import uuid import asyncio from typing import Any, Callable, List from pydantic import BaseModel import mlflow from mlflow.pyfunc import ResponsesAgent from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse from databricks_mcp import DatabricksMCPClient from databricks.sdk import WorkspaceClient # 1) CONFIGURE YOUR ENDPOINTS/PROFILE LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet" SYSTEM_PROMPT = "You are a helpful assistant." DATABRICKS_CLI_PROFILE = "YOUR_DATABRICKS_CLI_PROFILE" assert ( DATABRICKS_CLI_PROFILE != "YOUR_DATABRICKS_CLI_PROFILE" ), "Set DATABRICKS_CLI_PROFILE to the Databricks CLI profile name you specified when configuring authentication to the workspace" workspace_client = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE) host = workspace_client.config.host # Add more MCP server URLs here if desired, for example: # f"{host}/api/2.0/mcp/vector-search/prod/billing" # to include vector search indexes under the prod.billing schema, or # f"{host}/api/2.0/mcp/genie/<genie_space_id>" # to include a Genie space MANAGED_MCP_SERVER_URLS = [ f"{host}/api/2.0/mcp/functions/system/ai", ] # Add Custom MCP Servers hosted on Databricks Apps CUSTOM_MCP_SERVER_URLS = [] # 2) HELPER: convert between ResponsesAgent “message dict” and ChatCompletions format def _to_chat_messages(msg: dict[str, Any]) -> List[dict]: """ Take a single ResponsesAgent‐style dict and turn it into one or more ChatCompletions‐compatible dict entries. """ msg_type = msg.get("type") if msg_type == "function_call": return [ { "role": "assistant", "content": None, "tool_calls": [ { "id": msg["call_id"], "type": "function", "function": { "name": msg["name"], "arguments": msg["arguments"], }, } ], } ] elif msg_type == "message" and isinstance(msg["content"], list): return [ { "role": "assistant" if msg["role"] == "assistant" else msg["role"], "content": content["text"], } for content in msg["content"] ] elif msg_type == "function_call_output": return [ { "role": "tool", "content": msg["output"], "tool_call_id": msg["tool_call_id"], } ] else: # fallback for plain {"role": ..., "content": "..."} or similar return [ { k: v for k, v in msg.items() if k in ("role", "content", "name", "tool_calls", "tool_call_id") } ] # 3) “MCP SESSION” + TOOL‐INVOCATION LOGIC def _make_exec_fn( server_url: str, tool_name: str, ws: WorkspaceClient ) -> Callable[..., str]: def exec_fn(**kwargs): mcp_client = DatabricksMCPClient(server_url=server_url, workspace_client=ws) response = mcp_client.call_tool(tool_name, kwargs) return "".join([c.text for c in response.content]) return exec_fn class ToolInfo(BaseModel): name: str spec: dict exec_fn: Callable def _fetch_tool_infos(ws: WorkspaceClient, server_url: str) -> List[ToolInfo]: print(f"Listing tools from MCP server {server_url}") infos: List[ToolInfo] = [] mcp_client = DatabricksMCPClient(server_url=server_url, workspace_client=ws) mcp_tools = mcp_client.list_tools() for t in mcp_tools: schema = t.inputSchema.copy() if "properties" not in schema: schema["properties"] = {} spec = { "type": "function", "function": { "name": t.name, "description": t.description, "parameters": schema, }, } infos.append( ToolInfo( name=t.name, spec=spec, exec_fn=_make_exec_fn(server_url, t.name, ws) ) ) return infos # 4) “SINGLE‐TURN” AGENT CLASS class SingleTurnMCPAgent(ResponsesAgent): def _call_llm(self, history: List[dict], ws: WorkspaceClient, tool_infos): """ Send current history → LLM, returning the raw response dict. """ client = ws.serving_endpoints.get_open_ai_client() flat_msgs = [] for msg in history: flat_msgs.extend(_to_chat_messages(msg)) return client.chat.completions.create( model=LLM_ENDPOINT_NAME, messages=flat_msgs, tools=[ti.spec for ti in tool_infos], ) def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse: ws = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE) # 1) build initial history: system + user history: List[dict] = [{"role": "system", "content": SYSTEM_PROMPT}] for inp in request.input: history.append(inp.model_dump()) # 2) call LLM once tool_infos = [ tool_info for mcp_server_url in (MANAGED_MCP_SERVER_URLS + CUSTOM_MCP_SERVER_URLS) for tool_info in _fetch_tool_infos(ws, mcp_server_url) ] tools_dict = {tool_info.name: tool_info for tool_info in tool_infos} llm_resp = self._call_llm(history, ws, tool_infos) raw_choice = llm_resp.choices[0].message.to_dict() raw_choice["id"] = uuid.uuid4().hex history.append(raw_choice) tool_calls = raw_choice.get("tool_calls") or [] if tool_calls: # (we only support a single tool in this “single‐turn” example) fc = tool_calls[0] name = fc["function"]["name"] args = json.loads(fc["function"]["arguments"]) try: tool_info = tools_dict[name] result = tool_info.exec_fn(**args) except Exception as e: result = f"Error invoking {name}: {e}" # 4) append the “tool” output history.append( { "type": "function_call_output", "role": "tool", "id": uuid.uuid4().hex, "tool_call_id": fc["id"], "output": result, } ) # 5) call LLM a second time and treat that reply as final followup = ( self._call_llm(history, ws, tool_infos=[]).choices[0].message.to_dict() ) followup["id"] = uuid.uuid4().hex assistant_text = followup.get("content", "") return ResponsesAgentResponse( output=[ { "id": uuid.uuid4().hex, "type": "message", "role": "assistant", "content": [{"type": "output_text", "text": assistant_text}], } ], custom_outputs=request.custom_inputs, ) # 6) if no tool_calls at all, return the assistant’s original reply assistant_text = raw_choice.get("content", "") return ResponsesAgentResponse( output=[ { "id": uuid.uuid4().hex, "type": "message", "role": "assistant", "content": [{"type": "output_text", "text": assistant_text}], } ], custom_outputs=request.custom_inputs, ) mlflow.models.set_model(SingleTurnMCPAgent()) if __name__ == "__main__": req = ResponsesAgentRequest( input=[{"role": "user", "content": "What's the 100th Fibonacci number?"}] ) resp = SingleTurnMCPAgent().predict(req) for item in resp.output: print(item)
部署您的代理
准备好部署连接到托管 MCP 服务器的代理时,请参阅 为生成式 AI 应用程序部署代理。
指定代理在日志记录时所需的所有资源。 请参阅 Databricks 资源的身份验证
例如,如果代理使用下面列出的 MCP 服务器 URL,则必须在prod.customer_support和prod.billing两个架构中指定所有矢量搜索索引。 您还必须在 prod.billing 中指定所有 Unity Catalog 函数。
https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_supporthttps://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/billinghttps://<your-workspace-hostname>/api/2.0/mcp/functions/prod/billing
如果您的代理连接到 Databricks 上的 MCP 服务器以发现和运行工具,请使用代理记录这些 MCP 服务器所需的资源。 Databricks 建议安装 databricks-mcp PyPI 包以简化此过程。
特别是,如果使用 托管 MCP 服务器,则可以用于 databricks_mcp.DatabricksMCPClient().get_databricks_resources(<server_url>) 检索托管 MCP 服务器所需的资源。 如果代理查询 Databricks 应用上托管的自定义 MCP 服务器,则可以在记录模型时将服务器显式包括为资源来配置授权。
例如,若要部署上面定义的代理,请运行以下代码,假设你保存了代理代码定义:mcp_agent.py
import os
from databricks.sdk import WorkspaceClient
from databricks import agents
import mlflow
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint, DatabricksVectorSearchIndex
from mcp_agent import LLM_ENDPOINT_NAME
from databricks_mcp import DatabricksMCPClient
# TODO: Update this to your Databricks CLI profile name
databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
assert (
databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
workspace_client = WorkspaceClient(profile=databricks_cli_profile)
# Configure MLflow and the Databricks SDK to use your Databricks CLI profile
current_user = workspace_client.current_user.me().user_name
mlflow.set_tracking_uri(f"databricks://{databricks_cli_profile}")
mlflow.set_registry_uri(f"databricks-uc://{databricks_cli_profile}")
mlflow.set_experiment(f"/Users/{current_user}/databricks_docs_example_mcp_agent")
os.environ["DATABRICKS_CONFIG_PROFILE"] = databricks_cli_profile
MANAGED_MCP_SERVER_URLS = [
f"{host}/api/2.0/mcp/functions/system/ai",
]
# Log the agent defined in mcp_agent.py
here = os.path.dirname(os.path.abspath(__file__))
agent_script = os.path.join(here, "mcp_agent.py")
resources = [
DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
DatabricksFunction("system.ai.python_exec"),
# --- Uncomment and edit the following lines to include custom mcp servers hosted on Databricks Apps ---
# DatabricksApp(app_name="app-name")
]
for mcp_server_url in MANAGED_MCP_SERVER_URLS:
mcp_client = DatabricksMCPClient(server_url=mcp_server_url, workspace_client=workspace_client)
resources.extend(mcp_client.get_databricks_resources())
with mlflow.start_run():
logged_model_info = mlflow.pyfunc.log_model(
artifact_path="mcp_agent",
python_model=agent_script,
resources=resources,
)
# TODO Specify your UC model name here
UC_MODEL_NAME = "main.default.databricks_docs_mcp_agent"
registered_model = mlflow.register_model(logged_model_info.model_uri, UC_MODEL_NAME)
agents.deploy(
model_name=UC_MODEL_NAME,
model_version=registered_model.version,
)
后续步骤
- 将客户端连接到 Cursor 和 Claude Desktop 等 MCP 服务器。
- Databricks 矢量搜索 MCP 服务器的元参数