Since our server is running locally, the WhatsApp webhook cannot call the endpoint for verification. What we need is a public URL that the webhook can use. There are two options: deploy the application to a cloud server or create a proxy server tunnel. Since we’re still in the development process, we’ll use the second option.
- Go to ngrok Signup and create a free account.
- Install ngrok locally. Depending on your system, you can use Brew, Chocolatey, or simply download and install it. See: Setup & Installation.
- After installation, add your authentication token using the following command in your terminal. Replace $YOUR-AUTHENTICATION_TOKEN with your ngrok authentication token, which can be found under “Your Authtoken” in the ngrok dashboard.
> ngrok config add-authtoken $YOUR-AUTHENTICATION_TOKEN
- Start forwarding traffic from your localhost on port 8000 by running the following command in your terminal:
> ngrok http http://localhost:8000
Your local server is now accessible via the public URLs provided by ngrok. You should see something like this:
Forwarding https://<random-string>.ngrok.io -> http://localhost:8000
Use the HTTPS URL provided by ngrok for the webhook configuration.
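If you would rather read the forwarding URL from a script than copy it from the terminal, the running ngrok agent can be queried locally. The following is a minimal sketch, assuming the agent exposes its inspection API on the default address 127.0.0.1:4040; the helper names pick_https_url and fetch_public_url are made up for this example.

```python
import json
from typing import Optional
from urllib.request import urlopen

# Assumption: the ngrok agent serves its local API on this default address.
NGROK_TUNNELS_API = "http://127.0.0.1:4040/api/tunnels"


def pick_https_url(tunnels_response: dict) -> Optional[str]:
    """Return the first HTTPS public URL from a tunnels API response, if any."""
    for tunnel in tunnels_response.get("tunnels", []):
        public_url = tunnel.get("public_url", "")
        if public_url.startswith("https://"):
            return public_url
    return None


def fetch_public_url(api_url: str = NGROK_TUNNELS_API) -> Optional[str]:
    """Ask the running ngrok agent for its HTTPS forwarding URL."""
    with urlopen(api_url, timeout=5) as response:
        return pick_https_url(json.load(response))
```

Calling fetch_public_url() only works while ngrok is running; pick_https_url can be tried on any dictionary of the same shape.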
Now let us return to Meta’s Cloud API to implement the desired webhook.
- Navigate to Meta for Developers and select the app created before.
- In the left-hand menu go to WhatsApp > Configuration.
- In the Webhook section paste your ngrok HTTPS forwarding URL into the Callback URL field and enter the VERIFICATION_TOKEN defined in main.py into the Verification Token field.
- Click on the confirm and save button and wait for the webhook to verify your backend.
- In the section Webhook Fields enable the messages toggle under Subscribed Fields.
That’s it! You should now be able to receive WhatsApp messages in your Python backend server.
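To make the verification step less of a black box: when you save the configuration, Meta sends a GET request with the query parameters hub.mode, hub.verify_token, and hub.challenge, and expects the challenge to be echoed back if the token matches. Here is a framework-free sketch of that check. The function name verify_webhook and the tuple return value are illustrative choices, not the code used in main.py.

```python
from typing import Tuple


def verify_webhook(query_params: dict, expected_token: str) -> Tuple[int, str]:
    """Return (status_code, body) for a webhook verification GET request."""
    mode = query_params.get("hub.mode")
    token = query_params.get("hub.verify_token")
    challenge = query_params.get("hub.challenge", "")
    # Echo the challenge only if the caller knows our verification token.
    if mode == "subscribe" and token == expected_token:
        return 200, challenge
    return 403, "Verification token mismatch"
```

In a FastAPI endpoint, the same logic would read the three values from the query string and return the challenge as the response body.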
Webhooks are HTTP callbacks that enable programs to receive real-time updates when certain events occur, such as a new message or a status change. Webhooks make system integrations and automation possible by delivering an HTTP request containing event data to a pre-configured URL (in our case the ngrok proxy server URL).
To understand the logic and pricing behind webhooks in the Meta cosmos, it’s helpful to understand some basic principles about conversations.
A “conversation” on the WhatsApp API starts when:
1. The User Sends a Message: This opens a 24-hour window, during which you can reply with messages including text, images, or other media without additional costs.
2. The Business Initiates Contact: If no user message has been received recently (no open 24-hour window), your AI assistant must use a pre-approved template message to start the conversation. You can add custom templates, but they need to be approved by Meta.
As long as the user keeps replying, the 24-hour window resets with each new message. This makes it possible to have continuous interaction without additional costs. A conversation costs about 0.00–0.08 USD. The concrete pricing depends on your conversation type (Marketing, Utility, or Service) and your location. FYI: Service conversations seem to be free of charge nowadays. You can find the concrete pricing here: WhatsApp Pricing
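The window rule can be expressed in a few lines. This is only a sketch of the logic described above, assuming you store the timestamp of each user’s last incoming message yourself; can_send_free_form is a hypothetical helper and not part of any Meta SDK.

```python
from datetime import datetime, timedelta
from typing import Optional

# The service window described above: 24 hours after the user's last message.
WINDOW = timedelta(hours=24)


def can_send_free_form(last_user_message_at: Optional[datetime], now: datetime) -> bool:
    """True if a free-form reply is allowed, False if a template is required."""
    if last_user_message_at is None:
        return False  # the user never wrote to us, so only templates are allowed
    return now - last_user_message_at < WINDOW
```

If the function returns False, the assistant would have to fall back to an approved template message.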
Now we’re able to receive messages in our backend. Since we have subscribed to message objects, each time a message is sent to your test number, the webhook will create a POST request to the callback URL that you defined in the previous step. What we need to do next is to build an endpoint for POST requests in our FastAPI application.
Let us first define the requirements:
- Return a 200 HTTP Status Code: This is essential to inform CloudAPI that the message has been received successfully. Failing to do so will cause CloudAPI to retry sending the message for up to 7 days.
- Extract Phone Number and Message: The payload of the incoming request contains data that includes the phone number and the message, which we need to process in the backend.
- Filter Incoming Objects: Since CloudAPI might send multiple events for the same message (such as sent, received, and read), the backend needs to ensure that only one instance of the message is processed.
- Handle Multiple Message Types: The backend can handle different types of messages, such as text, voice messages, and images. To keep the scope of the article manageable, we’ll only lay the foundation for images but not implement it to the end.
- Process with LLM-Agent Workflow: The extracted information is processed using the LLM-Agent workflow, which we have developed in previous parts of this series. You can also use another agentic implementation, e.g. Langchain or Langgraph.
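The filtering requirement is handled later by only reacting to payloads that contain a message. If you additionally want to guard against the same message being delivered twice (for example after a retry), one option is to remember recently seen message IDs. The class below is a minimal in-memory sketch of that idea; MessageDeduplicator is a name invented for this example, and a real deployment with several workers would need shared storage instead.

```python
from collections import OrderedDict


class MessageDeduplicator:
    """Remember recently seen message IDs so each message is handled once."""

    def __init__(self, max_size: int = 1000):
        self._seen = OrderedDict()
        self._max_size = max_size

    def is_new(self, message_id: str) -> bool:
        """Return True the first time an ID is seen, False on repeats."""
        if message_id in self._seen:
            return False
        self._seen[message_id] = True
        if len(self._seen) > self._max_size:
            self._seen.popitem(last=False)  # drop the oldest ID to bound memory
        return True
```

The endpoint could call is_new(message.id) and return early with a 200 response when it is False.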
We will receive a payload from a webhook. You can find example payloads in Meta’s documentation: Example Payload
I prefer to write my code with Pydantic to add type safety to my Python code. Moreover, type annotations and Pydantic are an optimal fit for FastAPI applications. So, let’s first define the models used in our endpoint:
# app/schema.py
from typing import List, Optional
from pydantic import BaseModel, Field

class Profile(BaseModel):
    name: str

class Contact(BaseModel):
    profile: Profile
    wa_id: str

class Text(BaseModel):
    body: str

class Image(BaseModel):
    mime_type: str
    sha256: str
    id: str

class Audio(BaseModel):
    mime_type: str
    sha256: str
    id: str
    voice: bool

class Message(BaseModel):
    # "from" is a reserved word in Python, so we map it via an alias
    from_: str = Field(..., alias="from")
    id: str
    timestamp: str
    text: Text | None = None
    image: Image | None = None
    audio: Audio | None = None
    type: str

class Metadata(BaseModel):
    display_phone_number: str
    phone_number_id: str

class Value(BaseModel):
    messaging_product: str
    metadata: Metadata
    contacts: List[Contact] | None = None
    messages: List[Message] | None = None

class Change(BaseModel):
    value: Value
    field: str
    statuses: List[dict] | None = None

class Entry(BaseModel):
    id: str
    changes: List[Change]

class Payload(BaseModel):
    object: str
    entry: List[Entry]

class User(BaseModel):
    id: int
    first_name: str
    last_name: str
    phone: str
    role: str

class UserMessage(BaseModel):
    user: User
    message: str | None = None
    image: Image | None = None
    audio: Audio | None = None
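To see how these models line up with the nesting of an incoming request, here is a trimmed, made-up payload in the same shape, walked with plain dictionaries. All IDs and phone numbers are placeholders I invented for illustration, and first_message is a hypothetical helper; real payloads carry more fields.

```python
import json
from typing import Optional

# Made-up example payload; every ID and number below is a placeholder.
SAMPLE_PAYLOAD = json.loads("""
{
  "object": "whatsapp_business_account",
  "entry": [{
    "id": "0000000000",
    "changes": [{
      "field": "messages",
      "value": {
        "messaging_product": "whatsapp",
        "metadata": {"display_phone_number": "15550000000", "phone_number_id": "1111111111"},
        "contacts": [{"profile": {"name": "Jane"}, "wa_id": "15551234567"}],
        "messages": [{
          "from": "15551234567",
          "id": "wamid.EXAMPLE",
          "timestamp": "1700000000",
          "type": "text",
          "text": {"body": "Hello!"}
        }]
      }
    }]
  }]
}
""")


def first_message(payload: dict) -> Optional[dict]:
    """Walk entry -> changes -> value and return the first message, if any."""
    value = payload["entry"][0]["changes"][0]["value"]
    messages = value.get("messages")
    return messages[0] if messages else None
```

Note that the sender sits under the key "from", which is why the Message model needs the from_ field with an alias.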
Next, we’re going to create some helper functions for using dependency injection in FastAPI:
# app/main.py
from app.domain import message_service

def parse_message(payload: Payload) -> Message | None:
    # Status updates arrive without messages, so there is nothing to parse
    if not payload.entry[0].changes[0].value.messages:
        return None
    return payload.entry[0].changes[0].value.messages[0]

def get_current_user(message: Annotated[Message, Depends(parse_message)]) -> User | None:
    if not message:
        return None
    return message_service.authenticate_user_by_phone_number(message.from_)

def parse_audio_file(message: Annotated[Message, Depends(parse_message)]) -> Audio | None:
    if message and message.type == "audio":
        return message.audio
    return None

def parse_image_file(message: Annotated[Message, Depends(parse_message)]) -> Image | None:
    if message and message.type == "image":
        return message.image
    return None

def message_extractor(
    message: Annotated[Message, Depends(parse_message)],
    audio: Annotated[Audio, Depends(parse_audio_file)],
):
    # Voice messages are transcribed; text messages are passed through as-is
    if audio:
        return message_service.transcribe_audio(audio)
    if message and message.text:
        return message.text.body
    return None
- Parsing the Payload: The parse_message function extracts the first message from the incoming payload if it exists. This function returns None if no messages are found, so that only valid messages are processed.
- User Authentication: The get_current_user function uses the parse_message dependency injection to extract the message and then authenticates the user based on the phone number associated with the message. Here we ensure that only authenticated users are allowed to send messages.
- Audio and Image Parsing: These functions extract audio or image files from the message if the message type is “audio” or “image,” respectively. This allows the application to handle different types of media.
- Message Extraction: The message_extractor function attempts to extract text from the message or transcribe audio into text. This ensures that regardless of the message type, the content can be processed.
Here we have one import from our domain layer. The whole script message_service is where we place all domain-specific code for this implementation, such as authenticate_user_by_phone_number and transcribe_audio.
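If the dependency chain feels abstract, it may help to look at the extraction logic without FastAPI in the way. The sketch below mirrors message_extractor on plain dictionaries and takes the transcriber as a parameter, so it can be tried with a stub. Both extract_text and fake_transcribe are names made up for this illustration.

```python
from typing import Callable, Optional


def extract_text(message: Optional[dict], transcribe: Callable[[dict], str]) -> Optional[str]:
    """Resolve a message to text: transcribe audio, otherwise use the text body."""
    if not message:
        return None
    if message.get("type") == "audio" and message.get("audio"):
        return transcribe(message["audio"])
    if message.get("text"):
        return message["text"]["body"]
    return None


def fake_transcribe(audio: dict) -> str:
    """Stand-in for the real transcription call, for local experiments only."""
    return f"<transcript of {audio['id']}>"
```

Passing the transcriber in as an argument plays the same role here that Depends plays in the endpoint: the caller decides which implementation is used.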
# app/main.py
import threading
from typing_extensions import Annotated
from fastapi import APIRouter, Query, HTTPException, Depends
from app.domain import message_service
from app.schema import Payload, Message, Audio, Image, User

# ... rest of the code ...

@app.post("/", status_code=200)
def receive_whatsapp(
    user: Annotated[User, Depends(get_current_user)],
    user_message: Annotated[str, Depends(message_extractor)],
    image: Annotated[Image, Depends(parse_image_file)],
):
    # Nothing to process (e.g. a status update): just acknowledge the webhook
    if not user and not user_message and not image:
        return {"status": "ok"}
    if not user:
        raise HTTPException(status_code=401, detail="Unauthorized")
    if image:
        # Placeholder for future image handling
        return print("Image received")
    if user_message:
        # Run the agent in the background so the 200 response is not delayed
        thread = threading.Thread(
            target=message_service.respond_and_send_message,
            args=(user_message, user)
        )
        thread.daemon = True
        thread.start()
    return {"status": "ok"}
- POST Endpoint Implementation: This endpoint handles the incoming POST request. It checks if the user, message, or image is valid. If none are valid, it simply returns a status message to CloudAPI. If the user is not authenticated, it raises an HTTPException with a 401 status code.
- Processing Images and Messages: If an image is received, we make a simple stdout print as a placeholder for future image handling. If a text message is received, it is processed asynchronously using a separate thread to avoid blocking the main application thread. The message_service.respond_and_send_message function is invoked to handle the message according to the LLM-Agent workflow.
Explanation for using a separate thread for the webhook: WhatsApp will resend the webhook until it gets a 200 response, so the message handling runs in its own thread to ensure it doesn’t block the webhook response.
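The effect of the background thread can be demonstrated without any web framework. In this sketch, the slow agent call is simulated with an event that the demo controls. The names handle_webhook, slow_reply, release, and finished are all invented for this example.

```python
import threading

release = threading.Event()   # lets the demo decide when the "slow" work may finish
finished = threading.Event()  # set by the worker once it is done


def slow_reply(user_message: str) -> None:
    """Stand-in for the agent call: blocks until released, then marks completion."""
    release.wait(timeout=5)
    finished.set()


def handle_webhook(user_message: str):
    """Start the slow work in a daemon thread and acknowledge immediately."""
    thread = threading.Thread(target=slow_reply, args=(user_message,), daemon=True)
    thread.start()
    return {"status": "ok"}, thread
```

handle_webhook returns its acknowledgement while slow_reply is still waiting, which is exactly what keeps CloudAPI from retrying.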
In our presentation layer where we previously defined our endpoint, we use some message_service functions that need to be defined next. Specifically, we need an implementation for processing and transcribing audio payloads, authenticating users, and finally invoking our agent and sending a response back. We will place all this functionality inside domain/message_service.py. In production settings, as your application grows, I would recommend splitting them further into, e.g., transcription_service.py, message_service.py, and authentication_service.py.
In multiple functions in this section, we will make requests to the Meta API "https://graph.facebook.com/...". In all of these requests, we need to include authorization headers with WHATSAPP_API_KEY, which we created in step 1.3, as the bearer token. I usually store API keys and tokens in an .env file and access them with the Python dotenv library. We also use the OpenAI client with your OPENAI_API_KEY, which can also be stored in the .env file.
But for simplicity, let’s just place and initialize them at the top of the message_service.py script as follows:
import os
import json
import requests
from typing import BinaryIO
from openai import OpenAI

WHATSAPP_API_KEY = "YOUR_ACCESS_TOKEN"
llm = OpenAI(api_key="YOUR_OPENAI_API_KEY")
Replace “YOUR_ACCESS_TOKEN” with your actual access token that you created in step 1.3.
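If you prefer not to hardcode the secrets, a small helper that reads them from the environment and fails loudly when one is missing is usually enough. This is a sketch using only the standard library; require_env is a hypothetical helper, and loading the .env file itself (for example with the dotenv library) would happen before it is called.

```python
import os
from typing import Mapping


def require_env(name: str, environ: Mapping[str, str] = os.environ) -> str:
    """Return the value of an environment variable or raise a clear error."""
    value = environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value
```

With this in place, the top of the script could read WHATSAPP_API_KEY = require_env("WHATSAPP_API_KEY") instead of containing the token itself.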
Handling voice recordings from a WhatsApp webhook is not as straightforward as it may seem. First of all, it is important to know that the incoming webhook only tells us the data type and an object ID. So it does not contain the binary audio file. We first have to download the audio file using Meta’s Graph API. To download our received audio, we need to make two sequential requests. The first one is a GET request with the object_id to obtain the download URL. This download URL is the target of our second GET request.
def download_file_from_facebook(file_id: str, file_type: str, mime_type: str) -> str | None:
    # First GET request to retrieve the download URL
    url = f"https://graph.facebook.com/v19.0/{file_id}"
    headers = {"Authorization": f"Bearer {WHATSAPP_API_KEY}"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        download_url = response.json().get('url')
        # Second GET request to download the file
        response = requests.get(download_url, headers=headers)
        if response.status_code == 200:
            # Extract file extension from mime_type
            file_extension = mime_type.split('/')[-1].split(';')[0]
            # Create file_path with extension
            file_path = f"{file_id}.{file_extension}"
            with open(file_path, 'wb') as file:
                file.write(response.content)
            if file_type == "image" or file_type == "audio":
                return file_path
        raise ValueError(f"Failed to download file. Status code: {response.status_code}")
    raise ValueError(f"Failed to retrieve download URL. Status code: {response.status_code}")
Here, we basically get the download URL and download the file to the file system using the object ID and the file extension as its file_path. If something fails, we raise a ValueError that indicates where the error occurred.
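The one-liner that derives the file extension is easy to misread, so here it is in isolation. The sketch assumes MIME types of the form "audio/ogg; codecs=opus", where everything after the semicolon is a parameter that should not end up in the file name; extension_from_mime_type is a name made up for this example.

```python
def extension_from_mime_type(mime_type: str) -> str:
    """Take the subtype after '/', and drop any ';'-separated parameters."""
    subtype = mime_type.split("/")[-1]      # "ogg; codecs=opus"
    return subtype.split(";")[0].strip()    # "ogg"
```

For a plain type such as "image/jpeg" the second split changes nothing and the result is simply "jpeg".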
Next, we simply define a function that takes the audio binary and transcribes it using Whisper:
def transcribe_audio_file(audio_file: BinaryIO) -> str:
    if not audio_file:
        return "No audio file provided"
    try:
        transcription = llm.audio.transcriptions.create(
            file=audio_file,
            model="whisper-1",
            response_format="text"
        )
        return transcription
    except Exception as e:
        raise ValueError("Error transcribing audio") from e
And finally, let’s bring the download and transcription functions together:
def transcribe_audio(audio: Audio) -> str:
    file_path = download_file_from_facebook(audio.id, "audio", audio.mime_type)
    with open(file_path, 'rb') as audio_binary:
        transcription = transcribe_audio_file(audio_binary)
    # Remove the temporary download once it has been transcribed
    try:
        os.remove(file_path)
    except Exception as e:
        print(f"Failed to delete file: {e}")
    return transcription
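One thing to be aware of: if the transcription raises, the downloaded file stays on disk. A possible variation, sketched below, moves the cleanup into a finally block so it runs in both cases. The helper process_and_cleanup and its process parameter are my own illustration, not part of the original service.

```python
import os
from typing import BinaryIO, Callable


def process_and_cleanup(file_path: str, process: Callable[[BinaryIO], str]):
    """Run process() on the opened file and delete the file afterwards, even on errors."""
    try:
        with open(file_path, "rb") as handle:
            return process(handle)
    finally:
        # The file is closed at this point, so it can be removed safely.
        try:
            os.remove(file_path)
        except OSError as error:
            print(f"Failed to delete file: {error}")
```

In transcribe_audio, the process argument would be transcribe_audio_file.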
While using the test number provided by Meta, we have to predefine which numbers our chatbot can send messages to. I am not quite sure and have not tested whether any number can send a message to our chatbot. But anyway, as soon as we switch to a custom number, we don’t want anyone to be able to execute our agent chatbot. So we need a method to authenticate the user. We have several options to do this. First of all, we have to think about where to store user information. We could use, for example, a database like PostgreSQL or a non-relational database like Firestore. We could also predefine our users in the file system in a JSON file or in an .env file. For this tutorial, I will go with the simplest way and hardcode the users within a list in our authentication function.
A list entry has the structure of the User model as defined in step 5.1. So a user consists of an ID, first name, last name, and phone number. We have not implemented a role system in our agent workflow yet. But in most use cases with different users, such as in the example case of a small business assistant, different users will have different rights and access scopes. For now, we just pass "default" as a placeholder role.
def authenticate_user_by_phone_number(phone_number: str) -> User | None:
    allowed_users = [
        {"id": 1, "phone": "+1234567890", "first_name": "John", "last_name": "Doe", "role": "default"},
        {"id": 2, "phone": "+0987654321", "first_name": "Jane", "last_name": "Smith", "role": "default"}
    ]
    for user in allowed_users:
        if user["phone"] == phone_number:
            return User(**user)
    return None
So we just verify whether the phone number is in our list of allowed_users and return the user if it is. Otherwise, we return None. If you look at our endpoint in step 5.3, you will see we raise an error if the user is None to prevent further processing of unauthorized user messages.
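One detail worth checking in your own setup: the comparison above is an exact string match. As far as I can tell from Meta’s example payloads, the sender number arrives as digits without a leading "+", so an entry stored as "+1234567890" might never match. Treat that as an assumption to verify against your own webhook logs. A defensive sketch is to compare only the digits; normalize_phone and find_user_by_phone are hypothetical helpers.

```python
import re
from typing import Optional

# Example entries only; the numbers are placeholders.
ALLOWED_USERS = [
    {"id": 1, "phone": "+1234567890", "first_name": "John", "last_name": "Doe", "role": "default"},
]


def normalize_phone(phone_number: str) -> str:
    """Keep digits only, so '+1 (234) 567-890' and '1234567890' compare equal."""
    return re.sub(r"\D", "", phone_number)


def find_user_by_phone(phone_number: str) -> Optional[dict]:
    """Return the allowed user whose number matches after normalization."""
    wanted = normalize_phone(phone_number)
    for user in ALLOWED_USERS:
        if normalize_phone(user["phone"]) == wanted:
            return user
    return None
```

The same normalization could be applied inside authenticate_user_by_phone_number before building the User object.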
Now, our last helper function before we can actually invoke our agent is send_whatsapp_message. I have included two modes in this function because of some Meta-specific WhatsApp API logic.
Basically, you are not allowed to send a custom message to a user as a conversation starter. This means you can reply with an individual text message if the user starts the conversation and writes a message to the chatbot first. Otherwise, if you want the chatbot to initiate a conversation, you are limited to approved templates, like the “Hello World” template.
Also important to mention, when we talk about Meta logic: once a conversation has been started, it opens a conversation window of 24 hours in which you can send messages to that user. This conversation window is also what gets charged, not the individual message. It gets a bit more complex based on the type of conversation, such as marketing, support, etc.
You can also define a template on your own and have it approved by Meta. I have not done that at this point, so to test if we can send a message from our backend to a user, I use the “Hello World” template. If you add some custom approved templates, you can also use this function to send them to the user.
So back to the code. To send a message, we make a POST request and define a payload that either includes the text body or the template:
def send_whatsapp_message(to, message, template=True):
    url = "https://graph.facebook.com/v18.0/289534840903017/messages"
    headers = {
        "Authorization": "Bearer " + WHATSAPP_API_KEY,
        "Content-Type": "application/json"
    }
    if not template:
        # Free-form text reply (only allowed inside an open conversation window)
        data = {
            "messaging_product": "whatsapp",
            "preview_url": False,
            "recipient_type": "individual",
            "to": to,
            "type": "text",
            "text": {
                "body": message
            }
        }
    else:
        # Pre-approved template message, required to start a conversation
        data = {
            "messaging_product": "whatsapp",
            "to": to,
            "type": "template",
            "template": {
                "name": "hello_world",
                "language": {
                    "code": "en_US"
                }
            }
        }
    response = requests.post(url, headers=headers, data=json.dumps(data))
    return response.json()
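The URL in the function contains both an API version and a phone number ID, and the download function earlier uses a different version string. If you want to keep those in one place, a small sketch like the following could help. The helper names and the default version are assumptions for illustration; use whatever version and phone number ID your Meta app shows.

```python
GRAPH_API_BASE = "https://graph.facebook.com"


def build_messages_url(phone_number_id: str, api_version: str = "v19.0") -> str:
    """Assemble the messages endpoint for a given phone number ID."""
    return f"{GRAPH_API_BASE}/{api_version}/{phone_number_id}/messages"


def build_headers(access_token: str) -> dict:
    """Authorization and content-type headers shared by all Graph API calls."""
    return {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
```

send_whatsapp_message could then call build_messages_url with the phone number ID from your configuration instead of embedding it in the string.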
Finally, we can integrate our agent from our previous examples. At this stage, you can also integrate your custom agent, a Langchain AgentExecutor, Langgraph AgentWorkflow, etc.
So our main function that will be called on each incoming message is respond_and_send_message, which takes the user_message string and passes it to our agent workflow as the input object.
# app/domain/message_service.py
import json
import requests
from app.domain.agents.routing_agent import RoutingAgent
from app.schema import User

def respond_and_send_message(user_message: str, user: User):
    agent = RoutingAgent()
    response = agent.run(user_message, user.id)
    send_whatsapp_message(user.phone, response, template=False)
After invoking our agent, we get a response message that we want to send back to the user using the send_whatsapp_message function.
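Because this function runs in a background thread, an exception inside the agent would otherwise disappear without the user hearing anything back. One way to soften that, sketched here with stand-in callables instead of the real agent and sender, is to catch the error, log it, and send a fallback text. The name respond_safely, its parameters, and the fallback wording are all assumptions for illustration.

```python
import logging
from typing import Callable

logger = logging.getLogger("message_service")


def respond_safely(
    user_message: str,
    run_agent: Callable[[str], str],
    send: Callable[[str], None],
    fallback: str = "Sorry, something went wrong. Please try again.",
) -> str:
    """Run the agent, fall back to a fixed reply on errors, and send the result."""
    try:
        reply = run_agent(user_message)
    except Exception:
        # Log the full traceback; the thread would otherwise fail silently.
        logger.exception("Agent failed for message %r", user_message)
        reply = fallback
    send(reply)
    return reply
```

In the real service, run_agent would wrap agent.run and send would wrap send_whatsapp_message with template=False.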