Since our server is running locally, the WhatsApp Webhook cannot call the endpoint for verification. What we need is a public URL that can be used by the webhook. There are two options: deploy the application to a cloud server or create a proxy server tunnel. Since we are still in the development process, we will use the second option.
- Go to ngrok Signup and create a free account.
- Install ngrok locally. Depending on your system, you can use Brew, Chocolatey, or simply download and install it. See: Setup & Installation.
- After installation, add your authentication token using the following command in your terminal. Replace $YOUR-AUTHENTICATION_TOKEN with your ngrok authentication token, which can be found under “Your Authtoken” in the ngrok dashboard.
> ngrok config add-authtoken $YOUR-AUTHENTICATION_TOKEN
- Begin forwarding traffic from your localhost on port 8000 by running the following command in your terminal:
> ngrok http http://localhost:8000
Your local server is now accessible via public URLs provided by ngrok. You should see something like this:
Forwarding https://.ngrok.io -> http://localhost:8000
Use the HTTPS URL provided by ngrok for the webhook configuration.
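If you would rather read the forwarding URL programmatically than copy it from the terminal, the following is a minimal sketch. It assumes the ngrok agent exposes its local inspection API on the default address `http://127.0.0.1:4040/api/tunnels` and that each tunnel entry carries a `public_url` field; the helper names and the sample hostname are made up for illustration.

```python
import json
from typing import Optional
from urllib.request import urlopen

# Assumption: default address of the local ngrok agent API.
NGROK_API = "http://127.0.0.1:4040/api/tunnels"


def pick_https_url(tunnels_response: dict) -> Optional[str]:
    """Return the first HTTPS public URL from a tunnels response, if any."""
    for tunnel in tunnels_response.get("tunnels", []):
        public_url = tunnel.get("public_url", "")
        if public_url.startswith("https://"):
            return public_url
    return None


def fetch_ngrok_url() -> Optional[str]:
    """Ask the locally running ngrok agent for its tunnels (requires ngrok to be running)."""
    with urlopen(NGROK_API, timeout=5) as response:
        return pick_https_url(json.load(response))


# Offline example with a hand-written response (hypothetical hostname).
sample = {"tunnels": [{"public_url": "https://example-tunnel.ngrok.io", "proto": "https"}]}
print(pick_https_url(sample))
```

Only `pick_https_url` runs here; `fetch_ngrok_url` is meant to be called while the tunnel is up.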
Now let us return to Meta’s Cloud API to implement the desired webhook.
- Navigate to Meta for Developers and select the app created before.
- In the left-hand menu go to WhatsApp > Configuration.
- In the Webhook section paste your ngrok HTTPS forwarding URL into the Callback URL field and enter the VERIFICATION_TOKEN defined in main.py into the Verification Token field.
- Click the confirm and save button and wait for the webhook to verify your backend.
- In the section Webhook Fields enable the messages toggle under Subscribed Fields.
That’s it! You should now be able to receive WhatsApp messages in your Python backend server.
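To make the verification step less of a black box: when you save the webhook, Meta sends a GET request carrying `hub.mode`, `hub.verify_token`, and `hub.challenge`, and expects the challenge echoed back if the token matches. The function below is a framework-free sketch of that check, not the endpoint from main.py; the name `verify_webhook` and the returned tuple shape are assumptions for illustration.

```python
def verify_webhook(params: dict, expected_token: str) -> tuple:
    """Return (status_code, body) for a webhook verification GET request."""
    mode = params.get("hub.mode")
    token = params.get("hub.verify_token")
    challenge = params.get("hub.challenge", "")
    # Echo the challenge only when the mode and token are what we expect.
    if mode == "subscribe" and token == expected_token:
        return 200, challenge
    return 403, "Verification token mismatch"


# Example query parameters as they might arrive from Meta (values are made up).
query = {
    "hub.mode": "subscribe",
    "hub.verify_token": "my-secret",
    "hub.challenge": "1158201444",
}
print(verify_webhook(query, "my-secret"))
```

In a FastAPI app the same logic would live in a GET route that reads these three query parameters.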
Webhooks are HTTP callbacks that enable programs to receive real-time updates when certain events occur, such as a new message or a status change. Webhooks make system integrations and automation possible by delivering an HTTP request containing event data to a pre-configured URL (in our case the ngrok proxy server URL).
To understand the logic and pricing behind webhooks in the Meta ecosystem, it helps to understand some basic principles about conversations.
A ‘conversation’ on WhatsApp API starts when:
1. The User sends a message: This opens a 24-hour window, during which you can reply with messages including text, images, or other media without additional costs.
2. The Business Initiates Contact: If no user message has been received recently (no open 24-hour window), your AI assistant must use a pre-approved template message to start the conversation. You can add custom templates but they need to be approved by Meta.
As long as the user keeps replying, the 24-hour window resets with each new message. This makes it possible to have continuous interaction without additional costs. A conversation costs about 0.00–0.08 USD. The exact price depends on your conversation type (Marketing, Utility, or Service) and your location. FYI: Service conversations currently seem to be free. You can find the exact pricing here: WhatsApp Pricing
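The window rule can be expressed as a small check. This is a sketch under the assumption that you store the timestamp of the last message received from each user yourself; the function name `needs_template` is hypothetical, and Meta enforces the actual rule on its side.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# The customer service window described above.
WINDOW = timedelta(hours=24)


def needs_template(last_user_message_at: Optional[datetime], now: datetime) -> bool:
    """Return True when no 24-hour window is open and a template is required."""
    if last_user_message_at is None:
        # The user has never written to us, so the business would start the conversation.
        return True
    return now - last_user_message_at >= WINDOW


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
print(needs_template(now - timedelta(hours=3), now))   # False: window still open
print(needs_template(now - timedelta(hours=30), now))  # True: window expired
```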
Now we are able to receive messages in our backend. Since we have subscribed to message objects, each time a message is sent to your test number, the webhook will create a POST request to the callback URL that you defined in the previous step. What we need to do next is to build an endpoint for POST requests in our FastAPI application.
Let us first define the requirements:
- Return a 200 HTTP Status Code: This is essential to inform CloudAPI that the message has been received successfully. Failing to do so will cause CloudAPI to retry sending the message for up to 7 days.
- Extract Phone Number and Message: The payload of the incoming request contains the phone number and the message, both of which we need to process in the backend.
- Filter Incoming Objects: Since CloudAPI might send multiple events for the same message (such as sent, received, and read), the backend needs to ensure that only one instance of the message is processed.
- Handle Multiple Message Types: The backend can handle different types of messages, such as text, voice messages, and images. To keep the scope of the article manageable, we will only lay the foundation for images and not implement image handling completely.
- Process with LLM-Agent Workflow: The extracted information is processed using the LLM-Agent workflow that we developed in previous parts of this series. You can also use another agentic implementation, e.g. Langchain or Langgraph.
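One way to guard against processing the same message twice (for example after a retry) is to remember the message IDs already handled. This is an optional sketch and not part of the code that follows in this article; the class name `SeenMessages` and the in-memory storage are assumptions, and a real deployment would likely use a shared store instead.

```python
from collections import OrderedDict


class SeenMessages:
    """Remember recently handled message IDs, dropping the oldest when full."""

    def __init__(self, max_size: int = 1000):
        self._ids = OrderedDict()
        self._max_size = max_size

    def is_new(self, message_id: str) -> bool:
        """Return True the first time an ID is seen, False on repeats."""
        if message_id in self._ids:
            return False
        self._ids[message_id] = None
        if len(self._ids) > self._max_size:
            # Evict the oldest entry to keep memory bounded.
            self._ids.popitem(last=False)
        return True


tracker = SeenMessages()
print(tracker.is_new("wamid.EXAMPLE"))  # True
print(tracker.is_new("wamid.EXAMPLE"))  # False
```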
We will receive a payload from a webhook. You can find example payloads in Meta’s documentation: Example Payload
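To give a feeling for the nesting before we model it, here is a trimmed, hand-written text-message payload and a plain-dictionary extraction of sender and text. All values are placeholders, the payload omits fields that real requests contain, and `extract_sender_and_text` is a hypothetical helper used only for this illustration.

```python
sample_payload = {
    "object": "whatsapp_business_account",
    "entry": [{
        "id": "0",
        "changes": [{
            "field": "messages",
            "value": {
                "messaging_product": "whatsapp",
                "metadata": {"display_phone_number": "15550000000", "phone_number_id": "0"},
                "contacts": [{"profile": {"name": "Jane"}, "wa_id": "15551234567"}],
                "messages": [{
                    "from": "15551234567",
                    "id": "wamid.EXAMPLE",
                    "timestamp": "1700000000",
                    "type": "text",
                    "text": {"body": "Hello"},
                }],
            },
        }],
    }],
}


def extract_sender_and_text(payload: dict):
    """Return (sender, text) for the first message, or None if there is no message."""
    value = payload["entry"][0]["changes"][0]["value"]
    messages = value.get("messages")
    if not messages:
        # Status updates (sent, delivered, read) carry no "messages" list.
        return None
    message = messages[0]
    return message["from"], message.get("text", {}).get("body")


print(extract_sender_and_text(sample_payload))
```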
I prefer to write my code with Pydantic to add type safety to my Python code. Moreover, type annotations and Pydantic are an optimal match for FastAPI applications. So, let’s first define the models used in our endpoint:
# app/schema.py
from typing import List, Optional
from pydantic import BaseModel, Field


class Profile(BaseModel):
    name: str


class Contact(BaseModel):
    profile: Profile
    wa_id: str


class Text(BaseModel):
    body: str


class Image(BaseModel):
    mime_type: str
    sha256: str
    id: str


class Audio(BaseModel):
    mime_type: str
    sha256: str
    id: str
    voice: bool


class Message(BaseModel):
    from_: str = Field(..., alias="from")
    id: str
    timestamp: str
    text: Text | None = None
    image: Image | None = None
    audio: Audio | None = None
    type: str


class Metadata(BaseModel):
    display_phone_number: str
    phone_number_id: str


class Value(BaseModel):
    messaging_product: str
    metadata: Metadata
    contacts: List[Contact] | None = None
    messages: List[Message] | None = None


class Change(BaseModel):
    value: Value
    field: str
    statuses: List[dict] | None = None


class Entry(BaseModel):
    id: str
    changes: List[Change]


class Payload(BaseModel):
    object: str
    entry: List[Entry]


class User(BaseModel):
    id: int
    first_name: str
    last_name: str
    phone: str
    role: str


class UserMessage(BaseModel):
    user: User
    message: str | None = None
    image: Image | None = None
    audio: Audio | None = None
Next, we are going to create some helper functions for using dependency injection in FastAPI:
# app/main.py
from app.domain import message_service


def parse_message(payload: Payload) -> Message | None:
    if not payload.entry[0].changes[0].value.messages:
        return None
    return payload.entry[0].changes[0].value.messages[0]


def get_current_user(message: Annotated[Message, Depends(parse_message)]) -> User | None:
    if not message:
        return None
    return message_service.authenticate_user_by_phone_number(message.from_)


def parse_audio_file(message: Annotated[Message, Depends(parse_message)]) -> Audio | None:
    if message and message.type == "audio":
        return message.audio
    return None


def parse_image_file(message: Annotated[Message, Depends(parse_message)]) -> Image | None:
    if message and message.type == "image":
        return message.image
    return None


def message_extractor(
    message: Annotated[Message, Depends(parse_message)],
    audio: Annotated[Audio, Depends(parse_audio_file)],
):
    if audio:
        return message_service.transcribe_audio(audio)
    if message and message.text:
        return message.text.body
    return None
- Parsing the Payload: The parse_message function extracts the first message from the incoming payload if it exists. This function returns None if no messages are found, so that only valid messages are processed.
- User Authentication: The get_current_user function uses the parse_message dependency injection to extract the message and then authenticates the user based on the phone number associated with the message. Here we ensure that only authenticated users are allowed to send messages.
- Audio and Image Parsing: These functions extract audio or image files from the message if the message type is “audio” or “image,” respectively. This allows the application to handle different types of media.
- Message Extraction: The message_extractor function attempts to extract text from the message or transcribe audio into text. This ensures that regardless of the message type, the content can be processed.
Here we have one import from our domain layer. The whole script message_service is where we place all domain-specific code for this implementation, such as authenticate_user_by_phone_number and transcribe_audio.
# app/main.py
import threading
from typing_extensions import Annotated
from fastapi import APIRouter, Query, HTTPException, Depends
from app.domain import message_service
from app.schema import Payload, Message, Audio, Image, User

# ... rest of the code ...

@app.post("/", status_code=200)
def receive_whatsapp(
    user: Annotated[User, Depends(get_current_user)],
    user_message: Annotated[str, Depends(message_extractor)],
    image: Annotated[Image, Depends(parse_image_file)],
):
    # Nothing to process (e.g. a status update): acknowledge and stop
    if not user and not user_message and not image:
        return {"status": "ok"}
    if not user:
        raise HTTPException(status_code=401, detail="Unauthorized")
    if image:
        # Placeholder for future image handling
        print("Image received")
        return {"status": "ok"}
    if user_message:
        # Run the agent in a background thread so the webhook returns immediately
        thread = threading.Thread(
            target=message_service.respond_and_send_message,
            args=(user_message, user)
        )
        thread.daemon = True
        thread.start()
    return {"status": "ok"}
- POST Endpoint Implementation: This endpoint handles the incoming POST request. It checks if the user, message, or image is valid. If none are valid, it simply returns a status message to CloudAPI. If the user is not authenticated, it raises an HTTPException with a 401 status code.
- Processing Images and Messages: If an image is received, we make a simple stdout print as a placeholder for future image handling. If a text message is received, it is processed asynchronously using a separate thread to avoid blocking the main application thread. The message_service.respond_and_send_message function is invoked to handle the message according to the LLM-Agent workflow.
Explanation for Using a Separate Thread for the Webhook: WhatsApp will resend the webhook until it gets a 200 response, so each message is handled in a background thread to ensure that message handling doesn’t block the webhook response.
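The effect is easy to see in isolation. The sketch below replaces the agent with a hypothetical `slow_reply` stand-in that just sleeps, and shows that the handler returns its acknowledgement before the slow work has finished; none of these names come from the application code.

```python
import threading
import time

# Set by the worker once the (simulated) agent call has completed.
done = threading.Event()


def slow_reply(text: str) -> None:
    """Stand-in for the agent call: pretend it takes a while."""
    time.sleep(0.5)
    done.set()


def handle_webhook(text: str) -> dict:
    """Start the slow work in a daemon thread and acknowledge right away."""
    worker = threading.Thread(target=slow_reply, args=(text,), daemon=True)
    worker.start()
    return {"status": "ok"}


start = time.monotonic()
result = handle_webhook("hi")
elapsed = time.monotonic() - start
print(result, f"returned after {elapsed:.3f}s")

# Wait for the background work so the example exits cleanly.
done.wait(timeout=2)
```

A daemon thread is stopped when the process exits, so long-running replies can be lost on shutdown; a task queue would be the sturdier choice once reliability matters.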
In our presentation layer where we previously defined our endpoint, we use some message_service functions that need to be defined next. Specifically, we need an implementation for processing and transcribing audio payloads, authenticating users, and finally invoking our agent and sending a response back. We will place all this functionality inside domain/message_service.py. In production settings, as your application grows, I would recommend splitting them further down into, e.g., transcription_service.py, message_service.py, and authentication_service.py.
In multiple functions in this section, we will make requests to the Meta API "https://graph.facebook.com/...". In all of these requests, we need to include authorization headers with WHATSAPP_API_KEY, which we created in step 1.3, as the bearer token. I usually store API keys and tokens in an .env file and access them with the Python dotenv library. We also use the OpenAI client with your OPENAI_API_KEY, which could also be stored in the .env file.
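For reference, the environment-variable route could look roughly like this. It is a sketch using only the standard library; `load_settings` is a hypothetical helper, and the dummy values exist only to make the example runnable.

```python
import os

# Names of the secrets this service expects to find in the environment.
REQUIRED = ("WHATSAPP_API_KEY", "OPENAI_API_KEY")


def load_settings(env=None) -> dict:
    """Read the required secrets and fail early if any of them is missing."""
    # With python-dotenv installed, calling load_dotenv() beforehand would
    # copy the values from an .env file into os.environ.
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED}


# Example with an explicit mapping instead of the real environment (dummy values).
settings = load_settings({"WHATSAPP_API_KEY": "dummy-token", "OPENAI_API_KEY": "dummy-key"})
print(sorted(settings))
```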
But for simplicity, let’s just place and initialize them at the top of the message_service.py script as follows:
import os
import json
import requests
from typing import BinaryIO
from openai import OpenAI

WHATSAPP_API_KEY = "YOUR_ACCESS_TOKEN"
llm = OpenAI(api_key="YOUR_OPENAI_API_KEY")
Replace “YOUR_ACCESS_TOKEN” with your actual access token that you created in step 1.3.
Handling voice records from a WhatsApp webhook is not as straightforward as it may seem. First of all, it is important to know that the incoming webhook only tells us the data type and an object ID. So it does not contain the binary audio file. We first have to download the audio file using Meta’s Graph API. To download our received audio, we need to make two sequential requests. The first one is a GET request with the object_id to obtain the download URL. This download URL is the target of our second GET request.
def download_file_from_facebook(file_id: str, file_type: str, mime_type: str) -> str | None:
    # First GET request to retrieve the download URL
    url = f"https://graph.facebook.com/v19.0/{file_id}"
    headers = {"Authorization": f"Bearer {WHATSAPP_API_KEY}"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        download_url = response.json().get('url')
        # Second GET request to download the file
        response = requests.get(download_url, headers=headers)
        if response.status_code == 200:
            # Extract file extension from mime_type
            file_extension = mime_type.split('/')[-1].split(';')[0]
            # Create file_path with extension
            file_path = f"{file_id}.{file_extension}"
            with open(file_path, 'wb') as file:
                file.write(response.content)
            if file_type == "image" or file_type == "audio":
                return file_path
        raise ValueError(f"Failed to download file. Status code: {response.status_code}")
    raise ValueError(f"Failed to retrieve download URL. Status code: {response.status_code}")
Here, we basically get the download URL and download the file to the file system using the object ID and the file extension as its file_path. If something fails, we raise a ValueError that indicates where the error occurred.
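The extension logic in that function is compact, so here it is in isolation. The helper name `extension_from_mime` is made up for this illustration, and the MIME strings are examples of the kind of value the webhook may report.

```python
def extension_from_mime(mime_type: str) -> str:
    """Take the subtype after the slash and drop any ';'-separated parameters."""
    return mime_type.split("/")[-1].split(";")[0]


print(extension_from_mime("audio/ogg; codecs=opus"))  # ogg
print(extension_from_mime("image/jpeg"))              # jpeg
```

So a voice note with the ID `123` would be stored as `123.ogg` in the working directory.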
Next, we simply define a function that takes the audio binary and transcribes it using Whisper:
def transcribe_audio_file(audio_file: BinaryIO) -> str:
    if not audio_file:
        return "No audio file provided"
    try:
        transcription = llm.audio.transcriptions.create(
            file=audio_file,
            model="whisper-1",
            response_format="text"
        )
        return transcription
    except Exception as e:
        raise ValueError("Error transcribing audio") from e
And finally, let’s bring the download and transcription functions together:
def transcribe_audio(audio: Audio) -> str:
    file_path = download_file_from_facebook(audio.id, "audio", audio.mime_type)
    with open(file_path, 'rb') as audio_binary:
        transcription = transcribe_audio_file(audio_binary)
    # Clean up the temporary file once it has been transcribed
    try:
        os.remove(file_path)
    except Exception as e:
        print(f"Failed to delete file: {e}")
    return transcription
While using the test number provided by Meta, we have to predefine which numbers our chatbot can send messages to. I have not tested whether any number can send a message to our chatbot in that setup. Either way, as soon as we switch to a custom number, we don’t want just anyone to be able to invoke our agent chatbot, so we need a method to authenticate the user. There are several options. First, we have to decide where to store user information: for example, in a database like PostgreSQL or a non-relational database like Firestore. We could also predefine our users in the file system, in a JSON file or in an .env file. For this tutorial, I will go with the simplest way and hardcode the users within a list in our authentication function.
A list entry has the structure of the User model as defined in step 5.1. So a user consists of an ID, first name, last name, and phone number. We have not implemented a role system in our agent workflow yet. But in most use cases with different users, such as in the example case of a small business assistant, different users will have different rights and access scopes. For now, we just pass "default" as a placeholder role.
def authenticate_user_by_phone_number(phone_number: str) -> User | None:
    allowed_users = [
        {"id": 1, "phone": "+1234567890", "first_name": "John", "last_name": "Doe", "role": "default"},
        {"id": 2, "phone": "+0987654321", "first_name": "Jane", "last_name": "Smith", "role": "default"}
    ]
    for user in allowed_users:
        if user["phone"] == phone_number:
            return User(**user)
    return None
So we simply check whether the phone number is in our list of allowed_users and return the user if it is. Otherwise, we return None. If you look at our endpoint in step 5.3, you will see we raise an error if the user is None to prevent further processing of unauthorized user messages.
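One practical caveat with an exact string comparison: the number in the webhook's `from` field may not be formatted the same way as the entries in your list (as far as I can tell it typically arrives as digits only, without a leading `+`; treat that as an assumption and check your own payloads). A hedged sketch of comparing on digits only, with hypothetical helper names:

```python
def normalize_phone(number: str) -> str:
    """Keep only the digits so that '+1 (234) 567-890' and '1234567890' compare equal."""
    return "".join(ch for ch in number if ch.isdigit())


def phones_match(stored: str, incoming: str) -> bool:
    """Compare two phone numbers while ignoring '+', spaces, and punctuation."""
    return normalize_phone(stored) == normalize_phone(incoming)


print(phones_match("+1234567890", "1234567890"))  # True
print(phones_match("+1234567890", "0987654321"))  # False
```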
Now, our last helper function before we can actually invoke our agent is send_whatsapp_message. I have included two modes into this function because of some Meta-specific WhatsApp API logic.
Basically, you are not allowed to send a custom message to a user as a conversation starter. This means you can respond with an individual text message if the user starts the conversation and writes a message to the chatbot first. Otherwise, if you want the chatbot to initiate a conversation, you are limited to approved templates, like the “Hello World” template.
Also important to mention, when we talk about Meta logic, a conversation after being started opens a conversation window of 24 hours in which you can send messages to that user. This conversation window is also what gets charged, not the individual message. It gets a bit more complex based on the type of conversation, such as marketing, support, etc.
You can also define a template on your own and let it be approved by Meta. I have not done that at this point, so to test if we can send a message from our backend to a user, I use the “Hello World” template. If you add some custom approved templates, you can also use this function to send them to the user.
So back to the code. To send a message, we make a POST request and define a payload that either includes the text body or the template:
def send_whatsapp_message(to, message, template=True):
    url = "https://graph.facebook.com/v18.0/289534840903017/messages"
    headers = {
        "Authorization": f"Bearer {WHATSAPP_API_KEY}",
        "Content-Type": "application/json"
    }
    if not template:
        # Free-form text message (only allowed inside an open conversation window)
        data = {
            "messaging_product": "whatsapp",
            "preview_url": False,
            "recipient_type": "individual",
            "to": to,
            "type": "text",
            "text": {
                "body": message
            }
        }
    else:
        # Pre-approved template message, usable as a conversation starter
        data = {
            "messaging_product": "whatsapp",
            "to": to,
            "type": "template",
            "template": {
                "name": "hello_world",
                "language": {
                    "code": "en_US"
                }
            }
        }
    response = requests.post(url, headers=headers, data=json.dumps(data))
    return response.json()
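If you want to inspect or unit-test the two payload shapes without calling the API, the payload construction can be separated from the HTTP request. The following is a sketch of that idea; `build_message_payload` is a hypothetical helper, the phone number is a placeholder, and the field names simply mirror the function above.

```python
import json
from typing import Optional


def build_message_payload(
    to: str,
    message: Optional[str] = None,
    template_name: str = "hello_world",
    language_code: str = "en_US",
) -> dict:
    """Build a text payload when a message is given, otherwise a template payload."""
    if message is not None:
        return {
            "messaging_product": "whatsapp",
            "recipient_type": "individual",
            "to": to,
            "type": "text",
            "text": {"body": message},
        }
    return {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "template",
        "template": {"name": template_name, "language": {"code": language_code}},
    }


# Placeholder recipient; nothing is sent here, we only print the JSON bodies.
print(json.dumps(build_message_payload("15551234567", "Hi there"), indent=2))
print(json.dumps(build_message_payload("15551234567"), indent=2))
```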
Finally, we can integrate our agent from our previous examples. At this stage, you can also integrate your custom agent, a Langchain AgentExecutor, Langgraph AgentWorkflow, etc.
So our main function that will be called on each incoming message is respond_and_send_message, which takes the user_message string and passes it to our agent workflow as the input object.
# app/domain/message_service.py
import json
import requests
from app.domain.agents.routing_agent import RoutingAgent
from app.schema import Audio, User


def respond_and_send_message(user_message: str, user: User):
    agent = RoutingAgent()
    response = agent.run(user_message, user.id)
    send_whatsapp_message(user.phone, response, template=False)
After invoking our agent, we get a response message that we want to send back to the user using the send_whatsapp_message function.