Dela via


Skapa AI-agenter i kod

Den här sidan visar hur du skapar en AI-agent i Python med hjälp av Mosaic AI Agent Framework och populära agentredigeringsbibliotek som LangGraph och OpenAI.

Krav

Tips/Råd

Databricks rekommenderar att du installerar den senaste versionen av MLflow Python-klienten när du utvecklar agenter.

Om du vill skapa och distribuera agenter med hjälp av metoden på den här sidan installerar du följande:

  • databricks-agents 1.2.0 eller senare
  • mlflow 3.1.3 eller senare
  • Python 3.10 eller senare.
    • Använd serverlös beräkning eller Databricks Runtime 13.3 LTS eller senare för att uppfylla detta krav.
%pip install -U -qqqq databricks-agents mlflow

Databricks rekommenderar också att du installerar Databricks AI Bridge-integreringspaket för redigeringsagenter. Dessa integreringspaket tillhandahåller ett delat lager med API:er som interagerar med Databricks AI-funktioner, till exempel Databricks AI/BI Genie och Vector Search, mellan agentredigeringsramverk och SDK:er.

OpenAI (på engelska)

%pip install -U -qqqq databricks-openai

LangChain/LangGraph

%pip install -U -qqqq databricks-langchain

DSPy

%pip install -U -qqqq databricks-dspy

Python-agenter utan andra språk

%pip install -U -qqqq databricks-ai-bridge

Använd ResponsesAgent för att skapa agenter

Databricks rekommenderar MLflow-gränssnittet ResponsesAgent för att skapa agenter i produktionsklass. ResponsesAgent låter dig skapa agenter med alla ramverk från tredje part och sedan integrera dem med Databricks AI-funktioner för robust loggning, spårning, utvärdering, distribution och övervakning.

Schemat ResponsesAgent är kompatibelt med OpenAI-schemat Responses . Mer information om OpenAI Responsesfinns i OpenAI: Responses vs. ChatCompletion.

Anteckning

Det äldre ChatAgent gränssnittet stöds fortfarande på Databricks. För nya agenter rekommenderar Databricks dock att du använder den senaste versionen av MLflow och ResponsesAgent gränssnittet.

Se Schema för äldre indata och utdataagenter.

ResponsesAgent omsluter enkelt befintliga agenter för Databricks-kompatibilitet.

ResponsesAgent ger följande fördelar:

  • Avancerade agentfunktioner

    • Stöd för flera agenter
    • Strömmande utdata: Strömma utdata i mindre segment.
    • Omfattande meddelandehistorik för verktygssamtal: Returnera flera meddelanden, inklusive mellanliggande verktygssamtalsmeddelanden, för bättre kvalitet och konversationshantering.
    • Stöd för bekräftelse av verktygsanrop
    • Stöd för långvariga verktyg
  • Effektiviserad utveckling, distribution och övervakning

    • Skapa agenter med valfritt ramverk: Omslut alla befintliga agenter med hjälp ResponsesAgent av gränssnittet för att få färdiga kompatibiliteter med AI Playground, agentutvärdering och agentövervakning.
    • Typed authoring interfaces: Skriv agentkod med typade Python-klasser och dra nytta av IDE och notebook-autocomplete.
    • Automatisk signaturinferens: MLflow härleder automatiskt signaturer ResponsesAgent när du loggar en agent, vilket förenklar registrering och distribution. Se Kontrollera modellsignatur under loggning.
    • Automatisk spårning: MLflow spårar automatiskt dina predict och funktioner och predict_stream aggregerar strömmade svar för enklare utvärdering och visning.
    • AI Gateway-förbättrade slutsatsdragningstabeller: AI Gateway-slutsatsdragningstabeller aktiveras automatiskt för distribuerade agenter, vilket ger åtkomst till detaljerade metadata för begärandeloggar.

Information om hur du skapar en ResponsesAgentfinns i exemplen i följande avsnitt och MLflow-dokumentationen – ResponsesAgent för modellservering.

ResponsesAgent Exempel

Följande notebook-filer visar hur du skapar direktuppspelning och icke-direktuppspelning ResponsesAgent med hjälp av populära bibliotek. Information om hur du utökar funktionerna för dessa agenter finns i AI-agentverktyg.

OpenAI (på engelska)

OpenAI enkel chattagent med modeller som hostas av Databricks

Hämta anteckningsbok

OpenAI-verktygssamtalsagent med hjälp av Databricks-värdbaserade modeller

Hämta anteckningsbok

OpenAI-verktygssamtalsagent med OpenAI-värdbaserade modeller

Hämta anteckningsbok

LangGraph

LangGraph verktygsanropsagent

Hämta anteckningsbok

DSPy

DSPy-verktygsanropsagent för engångsanvändning

Hämta anteckningsbok

Exempel på flera agenter

Information om hur du skapar ett system med flera agenter finns i Använda Genie i system med flera agenter.

Tillståndskänsliga agenter

Information om hur du skapar tillståndskänsliga agenter som kan omfångsbegränsa minne till enskilda konversationstrådar och ge möjlighet till kontrollpunktskonversationer finns i Tillståndskänsliga AI-agenter.

Vad händer om jag redan har en agent?

Om du redan har skapat en agent med LangChain, LangGraph eller ett liknande ramverk behöver du inte skriva om agenten för att använda den på Databricks. I stället kan du bara omsluta din befintliga agent med MLflow-gränssnittet ResponsesAgent :

  1. Skriv en Python-omslutningsklass som ärver från mlflow.pyfunc.ResponsesAgent.

    I omslutningsklassen refererar du till den befintliga agenten som ett attribut self.agent = your_existing_agent.

  2. Klassen ResponsesAgent kräver implementering av en predict metod som returnerar en ResponsesAgentResponse för att hantera icke-strömmande begäranden. Följande är ett exempel på ResponsesAgentResponses schemat:

    import uuid
    # input as a dict
    {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]}
    
    # output example
    ResponsesAgentResponse(
        output=[
            {
                "type": "message",
                "id": str(uuid.uuid4()),
                "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}],
                "role": "assistant",
            },
        ]
    )
    
  3. predict I funktionen konverterar du inkommande meddelanden från ResponsesAgentRequest till det format som agenten förväntar sig. När agenten har genererat ett svar konverterar du dess utdata till ett ResponsesAgentResponse objekt.

Se följande kodexempel för att se hur du konverterar befintliga agenter till ResponsesAgent:

Grundläggande konvertering

Konvertera indata och utdata i predict funktionen för agenter som inte strömmas.

from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
)


class MyWrappedAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Call your existing agent (non-streaming)
        agent_response = self.agent.invoke(messages)

        # Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
        output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)

        # Return the response
        return ResponsesAgentResponse(output=[output_item])

Direktuppspelning med återanvändning av kod

För strömningsagenter kan du vara smart och återanvända logik för att undvika att duplicera koden som konverterar meddelanden:

from typing import Generator
from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)

        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Stream from your existing agent
        item_id = str(uuid4())
        aggregated_stream = ""
        for chunk in self.agent.stream(messages):
            # Convert each chunk to ResponsesAgent format
            yield self.create_text_delta(delta=chunk, item_id=item_id)
            aggregated_stream += chunk

        # Emit an aggregated output_item for all the text deltas with id=item_id
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(text=aggregated_stream, id=item_id),
        )

Migrera från ChatCompletions

Om din befintliga agent använder API:et OpenAI ChatCompletions kan du migrera den till ResponsesAgent utan att skriva om dess kärnlogik. Lägg till en omslutning som:

  1. Konverterar inkommande ResponsesAgentRequest meddelanden till det format som agenten ChatCompletions förväntar sig.
  2. ChatCompletions Översätter utdata till ResponsesAgentResponse schemat.
  3. Du kan också ha stöd för strömning genom att mappa inkrementella delta från ChatCompletions till ResponsesAgentStreamEvent objekt.
from typing import Generator
from uuid import uuid4

from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
    def __init__(self):
        self.w = WorkspaceClient()
        self.OpenAI = self.w.serving_endpoints.get_open_ai_client()

    def stream(self, messages):
        for chunk in self.OpenAI.chat.completions.create(
            model="databricks-claude-3-7-sonnet",
            messages=messages,
            stream=True,
        ):
            yield chunk.to_dict()


# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # `agent` is your existing ChatCompletions agent
        self.agent = agent

    def prep_msgs_for_llm(self, messages):
        # dummy example of prep_msgs_for_llm
        # real example of prep_msgs_for_llm included in full examples linked below
        return [{"role": "user", "content": "Hello, how are you?"}]

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)

        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # process the ChatCompletion output stream
        agent_content = ""
        tool_calls = []
        msg_id = None
        for chunk in self.agent.stream(messages):  # call the underlying agent's stream method
            delta = chunk["choices"][0]["delta"]
            msg_id = chunk.get("id", None)
            content = delta.get("content", None)
            if tc := delta.get("tool_calls"):
                if not tool_calls:  # only accommodate for single tool call right now
                    tool_calls = tc
                else:
                    tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
            elif content is not None:
                agent_content += content
                yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))

        # aggregate the streamed text content
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(agent_content, msg_id),
        )

        for tool_call in tool_calls:
            yield ResponsesAgentStreamEvent(
                type="response.output_item.done",
                item=self.create_function_call_item(
                    str(uuid4()),
                    tool_call["id"],
                    tool_call["function"]["name"],
                    tool_call["function"]["arguments"],
                ),
            )


agent = MyWrappedStreamingAgent(LegacyAgent())

for chunk in agent.predict_stream(
    ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
    print(chunk)

Fullständiga exempel finns i ResponsesAgent exempel.

Direktuppspelningssvar

Strömning gör att agenter kan skicka svar i realtidssegment i stället för att vänta på det fullständiga svaret. Om du vill implementera strömning med ResponsesAgentgenererar du en serie deltahändelser följt av en slutlig slutförandehändelse:

  1. Generera deltahändelser: Skicka flera output_text.delta händelser med samma item_id för att strömma textsegment i realtid.
  2. Avsluta med klar händelse: Skicka en slutlig response.output_item.done händelse med samma item_id som deltahändelserna som innehåller den fullständiga slutliga utdatatexten.

Varje deltahändelse strömmar ett textsegment till klienten. Den slutgiltiga händelsen innehåller den fullständiga svarstexten och signalerar Databricks att göra följande:

  • Spåra agentens utdata med MLflow-spårning
  • Aggregera strömmade svar i AI Gateway-slutsatsdragningstabeller
  • Visa fullständiga utdata i AI Playground-användargränssnittet

Spridning av streamingfel

Mosaic AI sprider eventuella fel som påträffas vid strömning med den sista token under databricks_output.error. Det är upp till den anropande klienten att hantera och rapportera det här felet korrekt.

{
  "delta": …,
  "databricks_output": {
    "trace": {...},
    "error": {
      "error_code": BAD_REQUEST,
      "message": "TimeoutException: Tool XYZ failed to execute."
    }
  }
}

Avancerade funktioner

Anpassade indata och utdata

Vissa scenarier kan kräva ytterligare agentindata, till exempel client_type och session_id, eller utdata som hämtning av källlänkar som inte ska ingå i chatthistoriken för framtida interaktioner.

I dessa scenarier stöder MLflow ResponsesAgent inbyggt fälten custom_inputs och custom_outputs. Du kan komma åt anpassade indata via request.custom_inputs i alla exempel som länkas ovan i ResponsesAgent-exempel.

Varning

Granskningsappen för agentutvärdering stöder inte återgivning av spårningar för agenter med ytterligare indatafält.

Se följande notebook-filer för att lära dig hur du anger anpassade indata och utdata.

Ange custom_inputs i AI Playground och granska appen

Om din agent accepterar ytterligare indata med hjälp av custom_inputs fältet kan du manuellt ange dessa indata i både AI Playground och granskningsappen.

  1. I antingen AI Playground eller Agent Review-appen, välj kugghjulsikonen kugghjulsikonen.

  2. Aktivera custom_inputs.

  3. Ange ett JSON-objekt som matchar agentens definierade indataschema.

    Tillhandahåll custom_inputs i AI-lekplatsen.

Ange anpassade retriever-scheman

AI-agenter använder ofta hämtare för att hitta och ställa frågor mot ostrukturerade data från vektorindex. Exempel på verktyg för hämtning finns i Build and trace retriever tools for unstructured data (Skapa och spåra hämtningsverktyg för ostrukturerade data).

Spåra dessa sökmotorer i din agent med MLflow RETRIEVER-span för att aktivera Databricks produktfunktioner, inklusive:

  • Visa länkar automatiskt till hämtade källdokument i AI Playground-användargränssnittet
  • Automatiskt köra hämtningens grundlighet och relevansbedömare i agentutvärdering

Anteckning

Databricks rekommenderar att du använder hämtningsverktyg som tillhandahålls av Databricks AI Bridge-paket som databricks_langchain.VectorSearchRetrieverTool och databricks_openai.VectorSearchRetrieverTool eftersom de redan överensstämmer med MLflow retriever-schemat. Se utveckla verktyg för vektorsökning lokalt med AI Bridge.

När din agent innehåller hämtningsintervall med ett anpassat schema, anropar du mlflow.models.set_retriever_schema när du definierar din agent i kod. Detta mappar utdatakolumnerna för din hämtare till MLflows förväntade fält (primary_key, text_column, doc_uri).

import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
#     {
#         'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
#         'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
#         'doc_uri': 'https://mlflow.org/docs/latest/index.html',
#         'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
#     },
#     {
#         'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
#         'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
#         'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
#         'title': 'Getting Started with MLflow'
#     },
# ...
# ]
mlflow.models.set_retriever_schema(
    # Specify the name of your retriever span
    name="mlflow_docs_vector_search",
    # Specify the output column name to treat as the primary key (ID) of each retrieved document
    primary_key="document_id",
    # Specify the output column name to treat as the text content (page content) of each retrieved document
    text_column="chunk_text",
    # Specify the output column name to treat as the document URI of each retrieved document
    doc_uri="doc_uri",
    # Specify any other columns returned by the retriever
    other_columns=["title"],
)

Anteckning

Kolumnen doc_uri är särskilt viktig när du utvärderar sökverktygets prestanda. doc_uri är huvudidentifieraren för dokument som returneras av hämtaren, så att du kan jämföra dem med utvärderingsuppsättningar för grundsanning. Se Utvärderingsuppsättningar (MLflow 2).

Distributionsöverväganden

Förbereda för Databricks-modellservering

Databricks distribuerar ResponsesAgents i en distribuerad miljö på Databricks Model Serving. Det innebär att samma serveringsreplik kanske inte hanterar alla begäranden under en konversation med flera turer. Var uppmärksam på följande konsekvenser för att hantera agenttillstånd:

  • Undvik lokal cachelagring: Anta inte att samma replik hanterar alla begäranden i en konversation med flera turer när du distribuerar en ResponsesAgent. Rekonstruera det interna tillståndet med hjälp av ett ordboksschema ResponsesAgentRequest för varje omgång.

  • trådsäkert tillstånd: Utforma agenttillståndet så att det är trådsäkert, vilket förhindrar konflikter i miljöer med flera trådar.

  • Initiera tillstånd i funktionen predict: Initiera tillstånd varje gång funktionen predict anropas, inte under ResponsesAgent initiering. Lagra tillstånd på ResponsesAgent-nivå kan läcka information mellan samtal och orsaka konflikter eftersom en enda ResponsesAgent-kopia kan hantera begäranden från flera samtal.

Parametrisera kod för distribution mellan miljöer

Parametrize agentkod för att återanvända samma agentkod i olika miljöer.

Parametrar är nyckel/värde-par som du definierar i en Python-ordlista eller en .yaml fil.

Om du vill konfigurera koden skapar du en ModelConfig med hjälp av antingen en Python-ordlista eller en .yaml fil. ModelConfig är en uppsättning nyckel/värde-parametrar som möjliggör flexibel konfigurationshantering. Du kan till exempel använda en ordlista under utvecklingen och sedan konvertera den till en .yaml fil för produktionsdistribution och CI/CD.

Ett exempel ModelConfig visas nedan:

llm_parameters:
  max_tokens: 500
  temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
  question that indicates your prompt template came from a YAML file. Your response
  must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
  - question

I din agentkod kan du referera till en standardkonfiguration (utveckling) från .yaml fil eller ordlista:

import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)

# Example of using a dictionary
config_dict = {
    "prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
    "prompt_template_input_vars": ["question"],
    "model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
    "llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}

model_config = mlflow.models.ModelConfig(development_config=config_dict)

# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')

När du loggar agenten, ange sedan parametern model_config till log_model för att definiera en anpassad uppsättning parametrar som ska användas vid inläsning av den loggade agenten. Se MLflow-dokumentation – ModelConfig.

Använda synkron kod eller återanropsmönster

För att säkerställa stabilitet och kompatibilitet använder du synkron kod eller motringningsbaserade mönster i agentimplementeringen.

Azure Databricks hanterar automatiskt asynkron kommunikation för att ge optimal samtidighet och prestanda när du distribuerar en agent. Introduktion till anpassade händelseslingor eller asynkrona ramverk kan leda till fel som RuntimeError: This event loop is already running and caused unpredictable behavior.

Azure Databricks rekommenderar att du undviker asynkron programmering, till exempel att använda asyncio eller skapa anpassade händelseloopar när du utvecklar agenter.

Nästa steg