Unless you’ve been living under a rock, if you work with AI you’ve probably heard about the Agent2Agent (A2A) Protocol, “an open standard designed to enable communication and collaboration between AI Agents”. It’s still pretty new, but it’s already getting a lot of buzz. Since it plays so nicely with MCP (which looks like it’s becoming the industry standard), A2A is shaping up to be the go-to standard for multi-agent communication.
When Google first dropped the Protocol Specification, my first reaction was basically: “Okay, cool… but what am I supposed to do with this?” Thankfully, this week they released the official Python SDK for the protocol, so now it finally speaks a language I understand.
In this article we’re going to dive into how the protocol actually sets up communication between agents and clients. Spoiler: it’s all in a task-oriented manner. To make things less abstract, let’s build a little toy example together.
Communication between the Event Detector Agent and an A2A Client
In our system we have an Event Detector AI Agent (responsible for detecting events) and an Alert AI Agent (responsible for alerting the user about the events). Since I’m focusing on the A2A protocol here, both agents are mocked as simple Python methods that return strings. But in real life you can build your agents with any framework you like (LangGraph, Google ADK, CrewAI and so on).
We have three characters in our system: the user, the Event Agent and the Alert Agent. They all communicate using Messages. A Message represents a single turn of communication in the A2A protocol. We wrap the agents into A2A Servers. The servers expose an HTTP endpoint implementing the protocol. Each A2A Server has event queues that act as a buffer between the agent’s asynchronous execution and the server’s response handling.
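To make the buffering idea concrete, here is a minimal sketch that uses a plain asyncio.Queue as a stand-in for the SDK’s event queue. The names agent, server and run_demo are hypothetical and not part of a2a-python; the point is only that the agent publishes events at its own pace while the response handler drains them in order.

```python
import asyncio


async def agent(queue: asyncio.Queue) -> None:
    """Stand-in agent: publishes status events while it works."""
    for state in ("submitted", "working", "completed"):
        await asyncio.sleep(0)  # yield control, as real async work would
        await queue.put(state)
    await queue.put(None)  # sentinel: no more events


async def server(queue: asyncio.Queue) -> list:
    """Stand-in response handler: drains events in arrival order."""
    seen = []
    while (event := await queue.get()) is not None:
        seen.append(event)
    return seen


async def run_demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(agent(queue))  # agent runs concurrently
    events = await server(queue)
    await producer
    return events


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Neither side calls the other directly: the queue is the only thing they share, which is what lets the server stream or batch responses independently of how the agent does its work.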
The A2A Client initiates communication, and if two agents need to communicate, an A2A Server can also play the role of an A2A Client. The diagram below shows how a Client and a Server communicate within the protocol.
The EventQueue stores Message, Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, A2AError and JSONRPCError objects. The Task might be the most important object to understand when building multi-agent systems with A2A. According to the A2A documentation:
- When a client sends a message to an agent, the agent might determine that fulfilling the request requires a stateful task to be completed (e.g., “generate a report,” “book a flight,” “answer a question”).
- Each task has a unique ID defined by the agent and progresses through a defined lifecycle (e.g., submitted, working, input-required, completed, failed).
- Tasks are stateful and can involve multiple exchanges (messages) between the client and the server.
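The lifecycle can be pictured as a small state machine. The sketch below uses only the five states quoted above; the transition table (ALLOWED) and the helpers advance and is_terminal are my own assumptions for illustration, since the documentation quoted here names the states but not this exact graph.

```python
from enum import Enum


class State(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed transitions, for illustration only (not taken from the A2A spec).
ALLOWED = {
    State.SUBMITTED: {State.WORKING, State.FAILED},
    State.WORKING: {State.INPUT_REQUIRED, State.COMPLETED, State.FAILED},
    State.INPUT_REQUIRED: {State.WORKING, State.FAILED},
    State.COMPLETED: set(),
    State.FAILED: set(),
}


def is_terminal(state: State) -> bool:
    """A state is terminal when no further transition is allowed."""
    return not ALLOWED[state]


def advance(current: State, new: State) -> State:
    """Move to `new`, rejecting transitions the table does not allow."""
    if new not in ALLOWED[current]:
        raise ValueError(f"cannot go from {current.value} to {new.value}")
    return new
```

A task that reaches a terminal state stays there, which is why a client can stop polling as soon as it sees one.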
Think of a Task as something in your multi-agent system that has a clear and unique goal. We have two tasks in our system:
- Detect an event
- Alert the user
Each Agent does its own thing (task). Let’s build the A2A Server for the Event Agent so things become more tangible.
Building the A2A Server for the Event Agent
First up: the Agent Card. The Agent Card is a JSON document that clients and other agents use to get to know an available agent. It describes:
- Server’s identity
- Capabilities
- Skills
- Service endpoint
- URL
- How clients should authenticate and interact with the agent
Let’s first define the Agent Card for the Event Detector AI Agent (I’ve defined the skills based on this example from Google):
from a2a.types import AgentCapabilities, AgentCard, AgentSkill

agent_card = AgentCard(
    name='Event Detection Agent',
    description='Detects relevant events and alerts the user',
    url='http://localhost:10008/',
    version='1.0.0',
    defaultInputModes=['text'],
    defaultOutputModes=['text'],
    capabilities=AgentCapabilities(streaming=False),
    authentication={"schemes": ["basic"]},
    skills=[
        AgentSkill(
            id='detect_events',
            name='Detect Events',
            description='Detects events and alerts the user',
            tags=['event'],
        ),
    ],
)
You can learn more about the Agent Card object structure here: https://google.github.io/A2A/specification/#55-agentcard-object-structure
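Since the card is just JSON, a client can inspect it before deciding whether to talk to an agent. The sketch below mirrors the fields of the card defined above as a plain dictionary (the authentication block is left out) and round-trips it through JSON; has_skill and supports_streaming are hypothetical helpers, not SDK functions, and a real card may carry more fields than shown here.

```python
import json

# Plain-dict mirror of the Agent Card above (an illustration, not SDK output).
card = {
    "name": "Event Detection Agent",
    "description": "Detects relevant events and alerts the user",
    "url": "http://localhost:10008/",
    "version": "1.0.0",
    "defaultInputModes": ["text"],
    "defaultOutputModes": ["text"],
    "capabilities": {"streaming": False},
    "skills": [
        {
            "id": "detect_events",
            "name": "Detect Events",
            "description": "Detects events and alerts the user",
            "tags": ["event"],
        }
    ],
}


def has_skill(card: dict, skill_id: str) -> bool:
    """True if the card advertises a skill with the given id."""
    return any(skill.get("id") == skill_id for skill in card.get("skills", []))


def supports_streaming(card: dict) -> bool:
    """True only if the card explicitly declares streaming support."""
    return bool(card.get("capabilities", {}).get("streaming", False))


# Simulate what a client would see after fetching and decoding the card.
decoded = json.loads(json.dumps(card))
```

With this card, a client looking for the detect_events skill finds it, and it also learns that it should not expect streamed responses.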
The agent itself will actually be a Uvicorn server, so let’s build the main() function to get it up and running. All requests are handled by the DefaultRequestHandler of the a2a-python SDK. The handler needs a TaskStore to store the Tasks and an AgentExecutor, which implements the core logic of the agent (we’ll build the EventAgentExecutor in a minute).
The last component of the main() function is the A2AStarletteApplication, which is the Starlette application that implements the A2A protocol server endpoints. We need to provide the Agent Card and the DefaultRequestHandler to initialize it. The last step is to run the app using uvicorn. Here is the full code of the main() function:
import click
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill

# EventAgentExecutor is defined in the next section; import it from wherever you keep it.


@click.command()
@click.option('--host', default='localhost')
@click.option('--port', default=10008)
def main(host: str, port: int):
    agent_executor = EventAgentExecutor()

    agent_card = AgentCard(
        name='Event Detection Agent',
        description='Detects relevant events and alerts the user',
        url='http://localhost:10008/',
        version='1.0.0',
        defaultInputModes=['text'],
        defaultOutputModes=['text'],
        capabilities=AgentCapabilities(streaming=False),
        authentication={"schemes": ["basic"]},
        skills=[
            AgentSkill(
                id='detect_events',
                name='Detect Events',
                description='Detects events and alerts the user',
                tags=['event'],
            ),
        ],
    )

    # The handler routes incoming requests to the executor and persists tasks
    request_handler = DefaultRequestHandler(
        agent_executor=agent_executor,
        task_store=InMemoryTaskStore(),
    )

    a2a_app = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=request_handler,
    )

    uvicorn.run(a2a_app.build(), host=host, port=port)


if __name__ == '__main__':
    main()
Creating the EventAgentExecutor
Now it’s time to build the core of our agent and finally see how to use Tasks to make the agents interact with each other. The EventAgentExecutor class inherits from the AgentExecutor interface, so we need to implement the execute() and cancel() methods. Both take a RequestContext and an EventQueue object as parameters. The RequestContext holds information about the current request being processed by the server, and the EventQueue acts as a buffer between the agent’s asynchronous execution and the server’s response handling.
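Because the interface requires both methods, a class that only implements execute() cannot be instantiated. The sketch below shows that behaviour with a hypothetical stand-in interface (ExecutorInterface) built on Python’s abc module; it is not the SDK’s AgentExecutor, and the dict and asyncio.Queue arguments are simplified placeholders for RequestContext and EventQueue.

```python
import asyncio
from abc import ABC, abstractmethod


class ExecutorInterface(ABC):
    """Hypothetical stand-in for an executor interface with two required methods."""

    @abstractmethod
    async def execute(self, context: dict, event_queue: asyncio.Queue) -> None: ...

    @abstractmethod
    async def cancel(self, context: dict, event_queue: asyncio.Queue) -> None: ...


class IncompleteExecutor(ExecutorInterface):
    # cancel() is missing, so Python refuses to instantiate this class.
    async def execute(self, context: dict, event_queue: asyncio.Queue) -> None:
        await event_queue.put("working")


class EchoExecutor(ExecutorInterface):
    async def execute(self, context: dict, event_queue: asyncio.Queue) -> None:
        await event_queue.put(f"received: {context['text']}")

    async def cancel(self, context: dict, event_queue: asyncio.Queue) -> None:
        await event_queue.put("canceled")


async def run_once(text: str) -> str:
    """Run a single request through EchoExecutor and return the queued event."""
    queue: asyncio.Queue = asyncio.Queue()
    await EchoExecutor().execute({"text": text}, queue)
    return queue.get_nowait()
```

Calling IncompleteExecutor() raises a TypeError, so even an agent that does not support cancellation needs a cancel() method, for example one that simply raises an error.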
Our agent will just check if the string “event” is in the message the user has sent (KISS ✨). If “event” is there, we should call the Alert Agent. We’ll do that by sending a Message to the Alert Agent. This is the Direct Configuration strategy, meaning we’ll configure the agent with a URL to fetch the Agent Card of the Alert Agent. To do that, our Event Agent will act like an A2A Client.
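As a rough sketch of what Direct Configuration means in practice, the agent only needs a known URL for its peer plus the rule that decides when to contact it. The environment variable name ALERT_AGENT_URL and both helper functions are assumptions made for this example; the default matches the Alert Agent address used later in this article.

```python
from typing import Mapping

DEFAULT_ALERT_AGENT_URL = "http://localhost:10009/"


def resolve_alert_agent_url(env: Mapping[str, str]) -> str:
    """Read the peer URL from configuration, falling back to the local default."""
    url = env.get("ALERT_AGENT_URL", DEFAULT_ALERT_AGENT_URL).strip()
    if not url.startswith(("http://", "https://")):
        raise ValueError(f"not an HTTP(S) URL: {url!r}")
    return url if url.endswith("/") else url + "/"


def should_alert(user_message: str) -> bool:
    """The agent's whole detection logic: look for the word 'event'."""
    return "event" in user_message
```

In a real deployment you would pass os.environ as env; taking the mapping as a parameter keeps the function easy to test.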
Let’s build the Executor step by step. First let’s create the main Task (the task to detect the events). We need to instantiate a TaskUpdater object (a helper class for agents to publish updates to a task’s event queue), then submit the task and announce we are working on it with the start_work() method:
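from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater


class EventAgentExecutor(AgentExecutor):
    async def execute(self, context: RequestContext, event_queue: EventQueue):
        # Helper that publishes task updates to the event queue
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        task_updater.submit()
        task_updater.start_work()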
The message the user will send to the agent will look like this:
from uuid import uuid4

send_message_payload = {
    'message': {
        'role': 'user',
        'parts': [{'type': 'text', 'text': 'it has an event!'}],
        'messageId': uuid4().hex,
    }
}
A Part represents a distinct piece of content within a Message, representing exportable content as either TextPart, FilePart, or DataPart. We’ll use a TextPart, so we need to unwrap it in the executor:
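Before looking at the executor, it may help to see the unwrapping on the raw payload. This sketch works on a plain dictionary shaped like the message above and collects the text of every text part; extract_text is a hypothetical helper, and the shape of the data part in the example is an assumption added to show that non-text parts are skipped.

```python
def extract_text(message: dict) -> str:
    """Join the text of every text part, ignoring file and data parts."""
    texts = [
        part["text"]
        for part in message.get("parts", [])
        if part.get("type") == "text" and "text" in part
    ]
    return "\n".join(texts)


message = {
    "role": "user",
    "parts": [
        {"type": "text", "text": "it has an event!"},
        {"type": "data", "data": {"severity": "high"}},  # assumed shape, skipped
    ],
    "messageId": "hypothetical-id",
}
```

The executor in this article takes a shortcut and reads only the first part, which is fine as long as the client always sends a single TextPart.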
import asyncio

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater


class EventAgentExecutor(AgentExecutor):
    async def execute(self, context: RequestContext, event_queue: EventQueue):
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        task_updater.submit()
        task_updater.start_work()

        await asyncio.sleep(1)  # let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text  # unwrapping the TextPart
Time to create the super advanced logic of our agent. If the message doesn’t have the string “event”, we don’t need to call the Alert Agent and the task is done:
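import asyncio

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart


class EventAgentExecutor(AgentExecutor):
    async def execute(self, context: RequestContext, event_queue: EventQueue):
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        task_updater.submit()
        task_updater.start_work()

        await asyncio.sleep(1)  # let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text  # unwrapping the TextPart

        if "event" not in user_message:
            task_updater.update_status(
                TaskState.completed,
                message=task_updater.new_agent_message(parts=[TextPart(text="No event detected")]),
            )

    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        # Required by the interface; this toy agent does not support cancellation
        raise Exception('cancel not supported')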
Creating an A2A Client for the User
Let’s create an A2A Client so we can test the agent as it is. The client uses the get_client_from_agent_card_url() method of the A2AClient class to (guess what) get the agent card. Then we wrap the message in a SendMessageRequest object and send it to the agent using the send_message() method of the client. Here is the full code:
import asyncio
from pprint import pprint
from uuid import uuid4

import httpx
from a2a.client import A2AClient
from a2a.types import MessageSendParams, SendMessageRequest


async def main():
    send_message_payload = {
        'message': {
            'role': 'user',
            'parts': [{'type': 'text', 'text': 'nothing happening here'}],
            'messageId': uuid4().hex,
        }
    }
    async with httpx.AsyncClient() as httpx_client:
        # Fetch the Agent Card from the server and build a client from it
        client = await A2AClient.get_client_from_agent_card_url(
            httpx_client, 'http://localhost:10008'
        )
        request = SendMessageRequest(
            params=MessageSendParams(**send_message_payload)
        )
        response = await client.send_message(request)
        pprint(response.model_dump(mode='json', exclude_none=True))


if __name__ == "__main__":
    asyncio.run(main())
This is what happens in the terminal that is running the EventAgent server:
And this is the message the client sees:
The task to detect the event was created and no event was detected, nice! But the whole point of A2A is to make Agents communicate with each other, so let’s make the Event Agent talk to the Alert Agent.
Making the Event Agent talk to the Alert Agent
To make the Event Agent talk to the Alert Agent, the Event Agent will act as a client as well:
import asyncio

import httpx
from a2a.client import A2AClient
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import (
    GetTaskRequest,
    GetTaskSuccessResponse,
    MessageSendParams,
    SendMessageRequest,
    TaskQueryParams,
    TaskState,
    TextPart,
)

ALERT_AGENT_URL = "http://localhost:10009/"


class EventAgentExecutor(AgentExecutor):
    async def execute(self, context: RequestContext, event_queue: EventQueue):
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        task_updater.submit()
        task_updater.start_work()

        await asyncio.sleep(1)  # let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text  # unwrapping the TextPart

        if "event" not in user_message:
            task_updater.update_status(
                TaskState.completed,
                message=task_updater.new_agent_message(parts=[TextPart(text="No event detected")]),
            )
        else:
            alert_message = task_updater.new_agent_message(parts=[TextPart(text="Event detected!")])
            send_alert_payload = SendMessageRequest(
                params=MessageSendParams(
                    message=alert_message
                )
            )

            async with httpx.AsyncClient() as client:
                # The Event Agent now acts as an A2A Client of the Alert Agent
                alert_agent = A2AClient(httpx_client=client, url=ALERT_AGENT_URL)
                response = await alert_agent.send_message(send_alert_payload)

                if hasattr(response.root, "result"):
                    alert_task = response.root.result
                    # Polling until the task is done
                    while alert_task.status.state not in (
                        TaskState.completed, TaskState.failed, TaskState.canceled, TaskState.rejected
                    ):
                        await asyncio.sleep(0.5)
                        get_resp = await alert_agent.get_task(
                            GetTaskRequest(params=TaskQueryParams(id=alert_task.id))
                        )
                        if isinstance(get_resp.root, GetTaskSuccessResponse):
                            alert_task = get_resp.root.result
                        else:
                            break

                    # Complete the original task
                    if alert_task.status.state == TaskState.completed:
                        task_updater.update_status(
                            TaskState.completed,
                            message=task_updater.new_agent_message(parts=[TextPart(text="Event detected and alert sent!")]),
                        )
                    else:
                        task_updater.update_status(
                            TaskState.failed,
                            message=task_updater.new_agent_message(parts=[TextPart(text=f"Failed to send alert: {alert_task.status.state}")]),
                        )
                else:
                    task_updater.update_status(
                        TaskState.failed,
                        message=task_updater.new_agent_message(parts=[TextPart(text="Failed to create alert task")]),
                    )

    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        # Required by the interface; this toy agent does not support cancellation
        raise Exception('cancel not supported')
We call the Alert Agent just as we called the Event Agent as the user, and when the Alert Agent task is done, we complete the original Event Agent task. Let’s call the Event Agent again but this time with an event:
The beauty here is that we simply called the Alert Agent and we don’t need to know anything about how it alerts the user. We just send a message to it and wait for it to finish.
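The polling loop in the executor waits for as long as the other agent takes. If you want an upper bound, one option is to pull the loop into a helper with a timeout. This is a sketch under my own assumptions, not something the SDK provides: poll_until_done and make_fake_fetcher are hypothetical names, states are plain strings, and the fake fetcher stands in for the real get_task() call.

```python
import asyncio
from typing import Awaitable, Callable, Iterable

TERMINAL_STATES = {"completed", "failed", "canceled", "rejected"}


async def poll_until_done(
    fetch_state: Callable[[], Awaitable[str]],
    interval: float = 0.5,
    timeout: float = 30.0,
) -> str:
    """Call fetch_state until it returns a terminal state or the timeout expires."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        state = await fetch_state()
        if state in TERMINAL_STATES:
            return state
        if loop.time() >= deadline:
            raise TimeoutError(f"task still in state {state!r} after {timeout}s")
        await asyncio.sleep(interval)


def make_fake_fetcher(states: Iterable[str]) -> Callable[[], Awaitable[str]]:
    """Hypothetical stand-in for a remote get_task() call; replays the given states."""
    remaining = iter(states)

    async def fetch() -> str:
        return next(remaining)

    return fetch


async def demo() -> str:
    fetch = make_fake_fetcher(["submitted", "working", "completed"])
    return await poll_until_done(fetch, interval=0.01, timeout=1.0)
```

In the executor, the fetch function would wrap the get_task() request and return the task’s state, and a TimeoutError could be turned into a failed status on the original task.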
The Alert Agent is super similar to the Event Agent. You can check the whole code here: https://github.com/dmesquita/multi-agent-communication-a2a-python
Final Thoughts
Understanding how to build multi-agent systems with A2A might be daunting at first, but in the end you just send messages to let the agents do their thing. All you need to do to integrate your agents with A2A is to create a class with the agent’s logic that inherits from AgentExecutor and run the agent as a server.
I hope this article has helped you in your A2A journey, thanks for reading!