Multi-Agent Communication with the A2A Python SDK


Unless you’ve been living under a rock, if you work with AI you’ve probably heard about the Agent2Agent (A2A) Protocol, “an open standard designed to enable communication and collaboration between AI Agents”. It’s still pretty new, but it’s already getting a lot of buzz. Since it plays so nicely with MCP (which looks like it’s becoming an industry standard), A2A is shaping up to be the go-to standard for multi-agent communication.

When Google first dropped the Protocol Specification, my first reaction was basically: “Okay, cool… but what am I supposed to do with this?” Thankfully, this week they released the official Python SDK for the protocol, so now it finally speaks a language I understand.

In this article we’re going to dive into how the protocol actually sets up communication between agents and clients. Spoiler: it’s all task-oriented. To make things less abstract, let’s build a little toy example together.

Communication between the Event Detector Agent and an A2A Client

In our system we have an Event Detector AI Agent (responsible for detecting events) and an Alert AI Agent (responsible for alerting the user of the events). Since I’m focusing on the A2A protocol here, both agents are mocked as simple Python methods that return strings. But in real life you can build your agents with any framework you like (LangGraph, Google ADK, CrewAI and so on).

We have three characters in our system: the user, the Event Agent and the Alert Agent. They all communicate using Messages. A Message represents a single turn of communication in the A2A protocol. We wrap the agents into A2A Servers, which expose an HTTP endpoint implementing the protocol. Each A2A Server has event queues that act as a buffer between the agent’s asynchronous execution and the server’s response handling.
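To make the "buffer" idea a bit more concrete, here is a minimal sketch using only the standard library's asyncio.Queue. It is not the SDK's EventQueue; the names agent, response_handler and run_once are made up for illustration, and the event strings are placeholders. It only shows the general pattern: one coroutine publishes events while another drains them.

```python
import asyncio


async def agent(queue: asyncio.Queue) -> None:
    """Hypothetical stand-in for an agent: publishes events as work progresses."""
    for event in ("submitted", "working", "completed"):
        await queue.put(event)
        await asyncio.sleep(0)  # yield control, as real async work would
    await queue.put(None)  # sentinel meaning "no more events"


async def response_handler(queue: asyncio.Queue) -> list[str]:
    """Hypothetical stand-in for the server side: drains events until the sentinel."""
    seen = []
    while (event := await queue.get()) is not None:
        seen.append(event)
    return seen


async def run_once() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    # Producer and consumer run concurrently; the queue decouples them.
    _, seen = await asyncio.gather(agent(queue), response_handler(queue))
    return seen


events = asyncio.run(run_once())
print(events)
```

The producer never needs to know who reads the events or when, which is the same decoupling the server relies on.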

The A2A Client initiates communication, and if two agents need to communicate, an A2A Server can also play the role of an A2A Client. The diagram below shows how a Client and a Server communicate within the protocol.

Image by author

The EventQueue stores Messages, Tasks, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, A2AError and JSONRPCError objects. The Task might be the most important object to understand how to build multi-agent systems with A2A. According to the A2A documentation:

  • When a client sends a message to an agent, the agent might determine that fulfilling the request requires a stateful task to be completed (e.g., “generate a report,” “book a flight,” “answer a question”).
  • Each task has a unique ID defined by the agent and progresses through a defined lifecycle (e.g., submitted, working, input-required, completed, failed).
  • Tasks are stateful and can involve multiple exchanges (messages) between the client and the server.
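One way to picture the lifecycle is as a small state machine. The sketch below is plain Python, not the SDK's Task or TaskState types. The state names come from the quote above, but the allowed transitions are my own assumption for illustration, so check the specification before relying on them. State, ALLOWED, advance and is_terminal are hypothetical names.

```python
from enum import Enum


class State(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed transitions, for illustration only; the spec is the authority.
ALLOWED = {
    State.SUBMITTED: {State.WORKING, State.FAILED},
    State.WORKING: {State.INPUT_REQUIRED, State.COMPLETED, State.FAILED},
    State.INPUT_REQUIRED: {State.WORKING, State.FAILED},
    State.COMPLETED: set(),
    State.FAILED: set(),
}


def advance(current: State, new: State) -> State:
    """Return the new state, or raise if the move is not in ALLOWED."""
    if new not in ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.value} -> {new.value}")
    return new


def is_terminal(state: State) -> bool:
    """A state with no outgoing transitions ends the task."""
    return not ALLOWED[state]


state = advance(State.SUBMITTED, State.WORKING)
state = advance(state, State.COMPLETED)
print(state.value, is_terminal(state))
```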

Think of a Task as something in your multi-agent system that has a clear and unique goal. We have two tasks in our system:

  1. Detect an event
  2. Alert the user

Each Agent does its own thing (task). Let’s build the A2A Server for the Event Agent so things become more tangible.

Building the A2A Server for the Event Agent

First up: the Agent Card. The Agent Card is a JSON document that other agents and clients use to discover an agent. It describes:

  • Server’s identity
  • Capabilities
  • Skills
  • Service endpoint URL
  • How clients should authenticate and interact with the agent

Let’s first define the Agent Card for the Event Detector AI Agent (I’ve defined the skills based on this example from Google):

agent_card = AgentCard(  
    name='Event Detection Agent',  
    description='Detects relevant events and alerts the user',  
    url='http://localhost:10008/',  
    version='1.0.0',  
    defaultInputModes=['text'],  
    defaultOutputModes=['text'],  
    capabilities=AgentCapabilities(streaming=False),  
    authentication={ "schemes": ["basic"] },  
    skills=[  
        AgentSkill(  
            id='detect_events',  
            name='Detect Events',  
            description='Detects events and alert the user',  
            tags=['event'],  
        ),  
    ],  
)

You can learn more about the Agent Card object structure here: https://google.github.io/A2A/specification/#55-agentcard-object-structure
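Since the card is ultimately just JSON, it can help to see it as a plain dictionary. The sketch below mirrors the fields of the AgentCard defined above, but it does not use the SDK and is not how the SDK serializes the card. The REQUIRED_FIELDS tuple is my own assumption about what a client would want to check, and missing_fields and skill_ids are hypothetical helpers.

```python
import json

# Assumption: a hand-picked set of fields a client might insist on.
REQUIRED_FIELDS = ("name", "description", "url", "version", "capabilities", "skills")

card = {
    "name": "Event Detection Agent",
    "description": "Detects relevant events and alerts the user",
    "url": "http://localhost:10008/",
    "version": "1.0.0",
    "defaultInputModes": ["text"],
    "defaultOutputModes": ["text"],
    "capabilities": {"streaming": False},
    "skills": [
        {
            "id": "detect_events",
            "name": "Detect Events",
            "description": "Detects events and alerts the user",
            "tags": ["event"],
        }
    ],
}


def missing_fields(candidate: dict) -> list[str]:
    """List the assumed-required fields that the card does not define."""
    return [field for field in REQUIRED_FIELDS if field not in candidate]


def skill_ids(candidate: dict) -> list[str]:
    """Collect the ids of all skills the card advertises."""
    return [skill["id"] for skill in candidate.get("skills", [])]


card_json = json.dumps(card, indent=2)  # what would travel over HTTP
print(missing_fields(card), skill_ids(json.loads(card_json)))
```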

The agent itself will actually be a Uvicorn server, so let’s build the main() method to get it up and running. All requests will be handled by the DefaultRequestHandler of the a2a-python SDK. The handler needs a TaskStore to store the Tasks and an AgentExecutor, which implements the core logic of the agent (we’ll build the EventAgentExecutor in a minute).

The last component of the main() method is the A2AStarletteApplication, which is the Starlette application that implements the A2A protocol server endpoints. We need to provide the Agent Card and the DefaultRequestHandler to initialize it. Now the last step is to run the app using uvicorn. Here is the full code of the main() method:

import click  
import uvicorn  
from a2a.types import (  
    AgentCard, AgentCapabilities, AgentSkill
) 
from a2a.server.request_handlers import DefaultRequestHandler  
from a2a.server.tasks import InMemoryTaskStore  
from a2a.server.apps import A2AStarletteApplication 
 
@click.command()  
@click.option('--host', default='localhost')  
@click.option('--port', default=10008)  
def main(host: str, port: int):  
    agent_executor = EventAgentExecutor()

    agent_card = AgentCard(  
        name='Event Detection Agent',  
        description='Detects relevant events and alerts the user',  
        url='http://localhost:10008/',  
        version='1.0.0',  
        defaultInputModes=['text'],  
        defaultOutputModes=['text'],  
        capabilities=AgentCapabilities(streaming=False),  
        authentication={ "schemes": ["basic"] },  
        skills=[
            AgentSkill(
                id='detect_events',
                name='Detect Events',
                description='Detects events and alert the user',
                tags=['event'],
            ),
        ],
    )
      
    request_handler = DefaultRequestHandler(  
        agent_executor=agent_executor,  
        task_store=InMemoryTaskStore()  
    ) 
 
    a2a_app = A2AStarletteApplication(  
        agent_card=agent_card,  
        http_handler=request_handler  
    )  

    uvicorn.run(a2a_app.build(), host=host, port=port)


if __name__ == '__main__':
    main()

Creating the EventAgentExecutor

Now it’s time to build the core of our agent and finally see how to use Tasks to make the agents interact with each other. The EventAgentExecutor class inherits from the AgentExecutor interface, so we need to implement the execute() and cancel() methods. Both take a RequestContext and an EventQueue object as parameters. The RequestContext holds information about the current request being processed by the server, and the EventQueue acts as a buffer between the agent’s asynchronous execution and the server’s response handling.

Our agent will just check if the string “event” is in the message the user has sent (KISS ✨). If “event” is there, we should call the Alert Agent. We’ll do that by sending a Message to the Alert Agent. This is the Direct Configuration strategy, meaning we configure the agent with the URL of the Alert Agent. To do that, our Event Agent will act like an A2A Client.

Let’s build the Executor step by step. First let’s create the main Task (the task to detect the events). We need to instantiate a TaskUpdater object (a helper class for agents to publish updates to a task’s event queue), then submit the task and announce we are working on it with the start_work() method:

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

The message the user will send to the agent will look like this:

send_message_payload = {
    'message': {
        'role': 'user',
        'parts': [{'type': 'text', 'text': 'it has an event!'}],
        'messageId': uuid4().hex,
    }
}

A Part is a distinct piece of content within a Message and comes in one of three kinds: TextPart, FilePart, or DataPart. We’ll use a TextPart, so we need to unwrap it in the executor:

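import asyncio

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater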

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1)  # let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text  # unwrapping the TextPart
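Grabbing parts[0] works for our toy message, but it would break on an empty message or on one whose first part is not text. Here is a slightly more defensive sketch. To keep it runnable without the SDK it works on a plain dictionary shaped like send_message_payload['message'] rather than on the SDK's Part objects, and extract_text is a hypothetical helper, not something the SDK provides.

```python
def extract_text(message: dict) -> str:
    """Join the text of every part that carries a string under 'text'.

    Parts without text (for example file or data parts) are skipped.
    """
    texts = [
        part["text"]
        for part in message.get("parts", [])
        if isinstance(part.get("text"), str)
    ]
    return "\n".join(texts)


message = {
    "role": "user",
    "parts": [
        {"type": "data", "data": {"source": "sensor-1"}},  # made-up data part
        {"type": "text", "text": "it has an event!"},
    ],
    "messageId": "example-id",
}

user_message = extract_text(message)
print(user_message)
```

The same idea carries over to the SDK objects: loop over the parts and keep only the ones that are text, instead of assuming the first part is the one you want.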

Time to create the super advanced logic of our agent. If the message doesn’t have the string “event” we don’t need to call the Alert Agent and the task is done:

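import asyncio

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart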

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1)  # let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text  # unwrapping the TextPart

        if "event" not in user_message:  
            task_updater.update_status(  
                TaskState.completed,  
                message=task_updater.new_agent_message(parts=[TextPart(text=f"No event detected")]),
            )

Creating an A2A Client for the User

Let’s create an A2A Client so we can test the agent as it is. The client uses the get_client_from_agent_card_url() method from the A2AClient class to (guess what) get the agent card. Then we wrap the message in a SendMessageRequest object and send it to the agent using the send_message() method of the client. Here is the full code:

import httpx  
import asyncio  
from a2a.client import A2AClient  
from a2a.types import SendMessageRequest, MessageSendParams  
from uuid import uuid4  
from pprint import pprint
  
async def main():    
    send_message_payload = {  
        'message': {  
            'role': 'user',  
            'parts': [{'type': 'text', 'text': f'nothing happening here'}],  
            'messageId': uuid4().hex,  
        }  
    }  

    async with httpx.AsyncClient() as httpx_client:  
        client = await A2AClient.get_client_from_agent_card_url(  
            httpx_client, 'http://localhost:10008'  
        )  
        request = SendMessageRequest(  
            params=MessageSendParams(**send_message_payload)  
        )  
        response = await client.send_message(request)  
        pprint(response.model_dump(mode='json', exclude_none=True))  
  
if __name__ == "__main__":  
    asyncio.run(main())
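If you are curious about what actually goes over the wire, my understanding is that the SDK wraps the params in a JSON-RPC 2.0 envelope. The sketch below builds such an envelope by hand with the standard library. Treat the method name 'message/send' and the exact field layout as assumptions to verify against the version of the specification you target; build_send_message_request is a hypothetical helper, not an SDK function.

```python
import json
from uuid import uuid4


def build_send_message_request(text: str) -> dict:
    """Assemble a JSON-RPC 2.0 style envelope around a single text message."""
    return {
        "jsonrpc": "2.0",
        "id": uuid4().hex,  # request id, used to match the response
        "method": "message/send",  # assumed method name, check the spec
        "params": {
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],
                "messageId": uuid4().hex,
            }
        },
    }


request = build_send_message_request("nothing happening here")
wire = json.dumps(request)  # the body that would be POSTed to the agent URL
print(wire)
```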

This is what happens in the terminal that is running the EventAgent server:

Image by author

And this is the message the client sees:

Image by author

The task to detect the event was created and no event was detected, nice! But the whole point of A2A is to make Agents communicate with each other, so let’s make the Event Agent talk to the Alert Agent. 

Making the Event Agent talk to the Alert Agent

To make the Event Agent talk to the Alert Agent the Event Agent will act as a client as well:

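import asyncio

import httpx
from a2a.client import A2AClient
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import (
    GetTaskRequest, GetTaskSuccessResponse, MessageSendParams,
    SendMessageRequest, TaskQueryParams, TaskState, TextPart,
)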

ALERT_AGENT_URL = "http://localhost:10009/" 

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1)  # let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text  # unwrapping the TextPart

        if "event" not in user_message:  
            task_updater.update_status(  
                TaskState.completed,  
                message=task_updater.new_agent_message(parts=[TextPart(text=f"No event detected")]),
            )
        else:
            alert_message = task_updater.new_agent_message(parts=[TextPart(text="Event detected!")])

            send_alert_payload = SendMessageRequest(  
                params=MessageSendParams(  
                    message=alert_message  
                )  
            )  

            async with httpx.AsyncClient() as client:  
                alert_agent = A2AClient(httpx_client=client, url=ALERT_AGENT_URL)  
                response = await alert_agent.send_message(send_alert_payload)  

                if hasattr(response.root, "result"):  
                    alert_task = response.root.result  
                    # Polling until the task is done
                    while alert_task.status.state not in (  
                        TaskState.completed, TaskState.failed, TaskState.canceled, TaskState.rejected  
                    ):  
                        await asyncio.sleep(0.5)  
                        get_resp = await alert_agent.get_task(  
                            GetTaskRequest(params=TaskQueryParams(id=alert_task.id))  
                        )  
                        if isinstance(get_resp.root, GetTaskSuccessResponse):  
                            alert_task = get_resp.root.result  
                        else:  
                            break  
  
                    # Complete the original task  
                    if alert_task.status.state == TaskState.completed:  
                        task_updater.update_status(  
                            TaskState.completed,  
                            message=task_updater.new_agent_message(parts=[TextPart(text="Event detected and alert sent!")]),  
                        )  
                    else:  
                        task_updater.update_status(  
                            TaskState.failed,  
                            message=task_updater.new_agent_message(parts=[TextPart(text=f"Failed to send alert: {alert_task.status.state}")]),  
                        )  
                else:  
                    task_updater.update_status(  
                        TaskState.failed,  
                        message=task_updater.new_agent_message(parts=[TextPart(text=f"Failed to create alert task")]),  
                    )

    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        # cancel() is required by the AgentExecutor interface;
        # this toy agent does not support cancellation
        raise Exception('cancel not supported')

We call the Alert Agent just as we called the Event Agent as the user, and when the Alert Agent task is done, we complete the original Event Agent task. Let’s call the Event Agent again but this time with an event:

Image by author

The beauty here is that we simply called the Alert Agent and we don’t need to know anything about how it alerts the user. We just send a message to it and wait for it to finish.
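One thing the polling loop above does not have is an upper bound: if the Alert Agent never reaches a final state, the Event Agent waits forever. Below is a sketch of a bounded version of the same idea, written against a generic async callable so it runs without the SDK. poll_until_done, fake_task and the string state names are all hypothetical; in the real executor, fetch_state would wrap the get_task() call.

```python
import asyncio
from typing import Awaitable, Callable

TERMINAL_STATES = {"completed", "failed", "canceled", "rejected"}


async def poll_until_done(
    fetch_state: Callable[[], Awaitable[str]],
    interval: float = 0.5,
    max_attempts: int = 20,
) -> str:
    """Poll fetch_state until it reports a terminal state or attempts run out."""
    for _ in range(max_attempts):
        state = await fetch_state()
        if state in TERMINAL_STATES:
            return state
        await asyncio.sleep(interval)
    raise TimeoutError(f"task still not finished after {max_attempts} polls")


def fake_task(states: list[str]) -> Callable[[], Awaitable[str]]:
    """Build a fake remote task that reports the given states one per poll."""
    remaining = iter(states)

    async def fetch_state() -> str:
        return next(remaining)

    return fetch_state


result = asyncio.run(
    poll_until_done(fake_task(["submitted", "working", "completed"]), interval=0)
)
print(result)
```

With a cap in place, the Event Agent can mark its own task as failed when the timeout fires instead of hanging.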

The Alert Agent is super similar to the Event Agent. You can check the whole code here: https://github.com/dmesquita/multi-agent-communication-a2a-python

Final Thoughts

Understanding how to build multi-agent systems with A2A might be daunting at first, but in the end you just send messages to let the agents do their thing. All you need to do to integrate your agents with A2A is to create a class with the agent’s logic that inherits from the AgentExecutor and run the agent as a server.

I hope this article has helped you in your A2A journey. Thanks for reading!

References

[1] Padgham, Lin, and Michael Winikoff. Developing intelligent agent systems: A practical guide. John Wiley & Sons, 2005.

[2] https://github.com/google/a2a-python

[3] https://github.com/google/a2a-python/tree/main/examples/google_adk

[4] https://developers.googleblog.com/en/agents-adk-agent-engine-a2a-enhancements-google-io/
