Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här sidan visar hur du skapar en AI-agent i Python med hjälp av Mosaic AI Agent Framework och populära agentredigeringsbibliotek som LangGraph och OpenAI.
Krav
Tips/Råd
Databricks rekommenderar att du installerar den senaste versionen av MLflow Python-klienten när du utvecklar agenter.
Om du vill skapa och distribuera agenter med hjälp av metoden på den här sidan installerar du följande:
-
databricks-agents1.2.0 eller senare -
mlflow3.1.3 eller senare - Python 3.10 eller senare.
- Använd serverlös beräkning eller Databricks Runtime 13.3 LTS eller senare för att uppfylla detta krav.
%pip install -U -qqqq databricks-agents mlflow
Databricks rekommenderar också att du installerar Databricks AI Bridge-integreringspaket för redigeringsagenter. Dessa integreringspaket tillhandahåller ett delat lager med API:er som interagerar med Databricks AI-funktioner, till exempel Databricks AI/BI Genie och Vector Search, mellan agentredigeringsramverk och SDK:er.
OpenAI (på engelska)
%pip install -U -qqqq databricks-openai
LangChain/LangGraph
%pip install -U -qqqq databricks-langchain
DSPy
%pip install -U -qqqq databricks-dspy
Python-agenter utan andra språk
%pip install -U -qqqq databricks-ai-bridge
Använd ResponsesAgent för att skapa agenter
Databricks rekommenderar MLflow-gränssnittet ResponsesAgent för att skapa agenter i produktionsklass.
ResponsesAgent låter dig skapa agenter med alla ramverk från tredje part och sedan integrera dem med Databricks AI-funktioner för robust loggning, spårning, utvärdering, distribution och övervakning.
Schemat ResponsesAgent är kompatibelt med OpenAI-schemat Responses . Mer information om OpenAI Responsesfinns i OpenAI: Responses vs. ChatCompletion.
Anteckning
Det äldre ChatAgent gränssnittet stöds fortfarande på Databricks. För nya agenter rekommenderar Databricks dock att du använder den senaste versionen av MLflow och ResponsesAgent gränssnittet.
ResponsesAgent ger följande fördelar:
Avancerade agentfunktioner
- Stöd för flera agenter
- Strömmande utdata: Strömma utdata i mindre segment.
- Omfattande meddelandehistorik för verktygssamtal: Returnera flera meddelanden, inklusive mellanliggande verktygssamtalsmeddelanden, för bättre kvalitet och konversationshantering.
- Stöd för bekräftelse av verktygsanrop
- Stöd för långvariga verktyg
Effektiviserad utveckling, distribution och övervakning
-
Skapa agenter med valfritt ramverk: Omslut alla befintliga agenter med hjälp
ResponsesAgentav gränssnittet för att få färdiga kompatibiliteter med AI Playground, agentutvärdering och agentövervakning. - Typed authoring interfaces: Skriv agentkod med typade Python-klasser och dra nytta av IDE och notebook-autocomplete.
-
Automatisk signaturinferens: MLflow härleder automatiskt signaturer
ResponsesAgentnär du loggar en agent, vilket förenklar registrering och distribution. Se Kontrollera modellsignatur under loggning. -
Automatisk spårning: MLflow spårar automatiskt dina
predictoch funktioner ochpredict_streamaggregerar strömmade svar för enklare utvärdering och visning. - AI Gateway-förbättrade slutsatsdragningstabeller: AI Gateway-slutsatsdragningstabeller aktiveras automatiskt för distribuerade agenter, vilket ger åtkomst till detaljerade metadata för begärandeloggar.
-
Skapa agenter med valfritt ramverk: Omslut alla befintliga agenter med hjälp
Information om hur du skapar en ResponsesAgentfinns i exemplen i följande avsnitt och MLflow-dokumentationen – ResponsesAgent för modellservering.
ResponsesAgent Exempel
Följande notebook-filer visar hur du skapar direktuppspelning och icke-direktuppspelning ResponsesAgent med hjälp av populära bibliotek. Information om hur du utökar funktionerna för dessa agenter finns i AI-agentverktyg.
OpenAI (på engelska)
OpenAI enkel chattagent med modeller som hostas av Databricks
OpenAI-verktygssamtalsagent med hjälp av Databricks-värdbaserade modeller
OpenAI-verktygssamtalsagent med OpenAI-värdbaserade modeller
LangGraph
LangGraph verktygsanropsagent
DSPy
DSPy-verktygsanropsagent för engångsanvändning
Exempel på flera agenter
Information om hur du skapar ett system med flera agenter finns i Använda Genie i system med flera agenter.
Tillståndskänsliga agenter
Information om hur du skapar tillståndskänsliga agenter som kan omfångsbegränsa minne till enskilda konversationstrådar och ge möjlighet till kontrollpunktskonversationer finns i Tillståndskänsliga AI-agenter.
Vad händer om jag redan har en agent?
Om du redan har skapat en agent med LangChain, LangGraph eller ett liknande ramverk behöver du inte skriva om agenten för att använda den på Databricks. I stället kan du bara omsluta din befintliga agent med MLflow-gränssnittet ResponsesAgent :
Skriv en Python-omslutningsklass som ärver från
mlflow.pyfunc.ResponsesAgent.I omslutningsklassen refererar du till den befintliga agenten som ett attribut
self.agent = your_existing_agent.Klassen
ResponsesAgentkräver implementering av enpredictmetod som returnerar enResponsesAgentResponseför att hantera icke-strömmande begäranden. Följande är ett exempel påResponsesAgentResponsesschemat:import uuid # input as a dict {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]} # output example ResponsesAgentResponse( output=[ { "type": "message", "id": str(uuid.uuid4()), "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}], "role": "assistant", }, ] )predictI funktionen konverterar du inkommande meddelanden frånResponsesAgentRequesttill det format som agenten förväntar sig. När agenten har genererat ett svar konverterar du dess utdata till ettResponsesAgentResponseobjekt.
Se följande kodexempel för att se hur du konverterar befintliga agenter till ResponsesAgent:
Grundläggande konvertering
Konvertera indata och utdata i predict funktionen för agenter som inte strömmas.
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
)
class MyWrappedAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Call your existing agent (non-streaming)
agent_response = self.agent.invoke(messages)
# Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)
# Return the response
return ResponsesAgentResponse(output=[output_item])
Direktuppspelning med återanvändning av kod
För strömningsagenter kan du vara smart och återanvända logik för att undvika att duplicera koden som konverterar meddelanden:
from typing import Generator
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Stream from your existing agent
item_id = str(uuid4())
aggregated_stream = ""
for chunk in self.agent.stream(messages):
# Convert each chunk to ResponsesAgent format
yield self.create_text_delta(delta=chunk, item_id=item_id)
aggregated_stream += chunk
# Emit an aggregated output_item for all the text deltas with id=item_id
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(text=aggregated_stream, id=item_id),
)
Migrera från ChatCompletions
Om din befintliga agent använder API:et OpenAI ChatCompletions kan du migrera den till ResponsesAgent utan att skriva om dess kärnlogik. Lägg till en omslutning som:
- Konverterar inkommande
ResponsesAgentRequestmeddelanden till det format som agentenChatCompletionsförväntar sig. -
ChatCompletionsÖversätter utdata tillResponsesAgentResponseschemat. - Du kan också ha stöd för strömning genom att mappa inkrementella delta från
ChatCompletionstillResponsesAgentStreamEventobjekt.
from typing import Generator
from uuid import uuid4
from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
def __init__(self):
self.w = WorkspaceClient()
self.OpenAI = self.w.serving_endpoints.get_open_ai_client()
def stream(self, messages):
for chunk in self.OpenAI.chat.completions.create(
model="databricks-claude-3-7-sonnet",
messages=messages,
stream=True,
):
yield chunk.to_dict()
# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# `agent` is your existing ChatCompletions agent
self.agent = agent
def prep_msgs_for_llm(self, messages):
# dummy example of prep_msgs_for_llm
# real example of prep_msgs_for_llm included in full examples linked below
return [{"role": "user", "content": "Hello, how are you?"}]
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# process the ChatCompletion output stream
agent_content = ""
tool_calls = []
msg_id = None
for chunk in self.agent.stream(messages): # call the underlying agent's stream method
delta = chunk["choices"][0]["delta"]
msg_id = chunk.get("id", None)
content = delta.get("content", None)
if tc := delta.get("tool_calls"):
if not tool_calls: # only accommodate for single tool call right now
tool_calls = tc
else:
tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
elif content is not None:
agent_content += content
yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))
# aggregate the streamed text content
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(agent_content, msg_id),
)
for tool_call in tool_calls:
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_function_call_item(
str(uuid4()),
tool_call["id"],
tool_call["function"]["name"],
tool_call["function"]["arguments"],
),
)
agent = MyWrappedStreamingAgent(LegacyAgent())
for chunk in agent.predict_stream(
ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
print(chunk)
Fullständiga exempel finns i ResponsesAgent exempel.
Direktuppspelningssvar
Strömning gör att agenter kan skicka svar i realtidssegment i stället för att vänta på det fullständiga svaret. Om du vill implementera strömning med ResponsesAgentgenererar du en serie deltahändelser följt av en slutlig slutförandehändelse:
-
Generera deltahändelser: Skicka flera
output_text.deltahändelser med sammaitem_idför att strömma textsegment i realtid. -
Avsluta med klar händelse: Skicka en slutlig
response.output_item.donehändelse med sammaitem_idsom deltahändelserna som innehåller den fullständiga slutliga utdatatexten.
Varje deltahändelse strömmar ett textsegment till klienten. Den slutgiltiga händelsen innehåller den fullständiga svarstexten och signalerar Databricks att göra följande:
- Spåra agentens utdata med MLflow-spårning
- Aggregera strömmade svar i AI Gateway-slutsatsdragningstabeller
- Visa fullständiga utdata i AI Playground-användargränssnittet
Spridning av streamingfel
Mosaic AI sprider eventuella fel som påträffas vid strömning med den sista token under databricks_output.error. Det är upp till den anropande klienten att hantera och rapportera det här felet korrekt.
{
"delta": …,
"databricks_output": {
"trace": {...},
"error": {
"error_code": BAD_REQUEST,
"message": "TimeoutException: Tool XYZ failed to execute."
}
}
}
Avancerade funktioner
Anpassade indata och utdata
Vissa scenarier kan kräva ytterligare agentindata, till exempel client_type och session_id, eller utdata som hämtning av källlänkar som inte ska ingå i chatthistoriken för framtida interaktioner.
I dessa scenarier stöder MLflow ResponsesAgent inbyggt fälten custom_inputs och custom_outputs. Du kan komma åt anpassade indata via request.custom_inputs i alla exempel som länkas ovan i ResponsesAgent-exempel.
Varning
Granskningsappen för agentutvärdering stöder inte återgivning av spårningar för agenter med ytterligare indatafält.
Se följande notebook-filer för att lära dig hur du anger anpassade indata och utdata.
Ange custom_inputs i AI Playground och granska appen
Om din agent accepterar ytterligare indata med hjälp av custom_inputs fältet kan du manuellt ange dessa indata i både AI Playground och granskningsappen.
I antingen AI Playground eller Agent Review-appen, välj kugghjulsikonen
.
Aktivera custom_inputs.
Ange ett JSON-objekt som matchar agentens definierade indataschema.
Ange anpassade retriever-scheman
AI-agenter använder ofta hämtare för att hitta och ställa frågor mot ostrukturerade data från vektorindex. Exempel på verktyg för hämtning finns i Build and trace retriever tools for unstructured data (Skapa och spåra hämtningsverktyg för ostrukturerade data).
Spåra dessa sökmotorer i din agent med MLflow RETRIEVER-span för att aktivera Databricks produktfunktioner, inklusive:
- Visa länkar automatiskt till hämtade källdokument i AI Playground-användargränssnittet
- Automatiskt köra hämtningens grundlighet och relevansbedömare i agentutvärdering
Anteckning
Databricks rekommenderar att du använder hämtningsverktyg som tillhandahålls av Databricks AI Bridge-paket som databricks_langchain.VectorSearchRetrieverTool och databricks_openai.VectorSearchRetrieverTool eftersom de redan överensstämmer med MLflow retriever-schemat. Se utveckla verktyg för vektorsökning lokalt med AI Bridge.
När din agent innehåller hämtningsintervall med ett anpassat schema, anropar du mlflow.models.set_retriever_schema när du definierar din agent i kod. Detta mappar utdatakolumnerna för din hämtare till MLflows förväntade fält (primary_key, text_column, doc_uri).
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
# {
# 'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
# 'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
# 'doc_uri': 'https://mlflow.org/docs/latest/index.html',
# 'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
# },
# {
# 'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
# 'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
# 'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
# 'title': 'Getting Started with MLflow'
# },
# ...
# ]
mlflow.models.set_retriever_schema(
# Specify the name of your retriever span
name="mlflow_docs_vector_search",
# Specify the output column name to treat as the primary key (ID) of each retrieved document
primary_key="document_id",
# Specify the output column name to treat as the text content (page content) of each retrieved document
text_column="chunk_text",
# Specify the output column name to treat as the document URI of each retrieved document
doc_uri="doc_uri",
# Specify any other columns returned by the retriever
other_columns=["title"],
)
Anteckning
Kolumnen doc_uri är särskilt viktig när du utvärderar sökverktygets prestanda.
doc_uri är huvudidentifieraren för dokument som returneras av hämtaren, så att du kan jämföra dem med utvärderingsuppsättningar för grundsanning. Se Utvärderingsuppsättningar (MLflow 2).
Distributionsöverväganden
Förbereda för Databricks-modellservering
Databricks distribuerar ResponsesAgents i en distribuerad miljö på Databricks Model Serving. Det innebär att samma serveringsreplik kanske inte hanterar alla begäranden under en konversation med flera turer. Var uppmärksam på följande konsekvenser för att hantera agenttillstånd:
Undvik lokal cachelagring: Anta inte att samma replik hanterar alla begäranden i en konversation med flera turer när du distribuerar en
ResponsesAgent. Rekonstruera det interna tillståndet med hjälp av ett ordboksschemaResponsesAgentRequestför varje omgång.trådsäkert tillstånd: Utforma agenttillståndet så att det är trådsäkert, vilket förhindrar konflikter i miljöer med flera trådar.
Initiera tillstånd i funktionen
predict: Initiera tillstånd varje gång funktionenpredictanropas, inte underResponsesAgentinitiering. Lagra tillstånd påResponsesAgent-nivå kan läcka information mellan samtal och orsaka konflikter eftersom en endaResponsesAgent-kopia kan hantera begäranden från flera samtal.
Parametrisera kod för distribution mellan miljöer
Parametrize agentkod för att återanvända samma agentkod i olika miljöer.
Parametrar är nyckel/värde-par som du definierar i en Python-ordlista eller en .yaml fil.
Om du vill konfigurera koden skapar du en ModelConfig med hjälp av antingen en Python-ordlista eller en .yaml fil.
ModelConfig är en uppsättning nyckel/värde-parametrar som möjliggör flexibel konfigurationshantering. Du kan till exempel använda en ordlista under utvecklingen och sedan konvertera den till en .yaml fil för produktionsdistribution och CI/CD.
Ett exempel ModelConfig visas nedan:
llm_parameters:
max_tokens: 500
temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
question that indicates your prompt template came from a YAML file. Your response
must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
- question
I din agentkod kan du referera till en standardkonfiguration (utveckling) från .yaml fil eller ordlista:
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
"prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
"prompt_template_input_vars": ["question"],
"model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
"llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
När du loggar agenten, ange sedan parametern model_config till log_model för att definiera en anpassad uppsättning parametrar som ska användas vid inläsning av den loggade agenten. Se MLflow-dokumentation – ModelConfig.
Använda synkron kod eller återanropsmönster
För att säkerställa stabilitet och kompatibilitet använder du synkron kod eller motringningsbaserade mönster i agentimplementeringen.
Azure Databricks hanterar automatiskt asynkron kommunikation för att ge optimal samtidighet och prestanda när du distribuerar en agent. Introduktion till anpassade händelseslingor eller asynkrona ramverk kan leda till fel som RuntimeError: This event loop is already running and caused unpredictable behavior.
Azure Databricks rekommenderar att du undviker asynkron programmering, till exempel att använda asyncio eller skapa anpassade händelseloopar när du utvecklar agenter.