diff --git a/.github/workflows/deploy_brach.yml b/.github/workflows/deploy_brach.yml deleted file mode 100644 index 632387f..0000000 --- a/.github/workflows/deploy_brach.yml +++ /dev/null @@ -1,48 +0,0 @@ -name: Copy files to deploy branch - -on: - push: - branches: - - main # Rama principal - -jobs: - sync: - runs-on: ubuntu-latest - - steps: - - name: Checkout main branch - uses: actions/checkout@v2 - with: - fetch-depth: 0 - ref: main - - - name: Copy files - run: | - mkdir -p deployment_directory/pynani - cp -r pynani deployment_directory/pynani - cp pyproject.toml deployment_directory/ - cp README.md deployment_directory/ - cp LICENSE deployment_directory/ - cp .gitignore deployment_directory/ - - - name: Checkout deployment branch - run: | - git checkout deployment-branch - - - name: Configure Git - run: | - git config --global user.name "jorge-jrzz" - git config --global user.email "jorgeang33@gmail.com" - - - name: Copy to Deploy Branch - run: | - cp -r deployment_directory/* . - rm -rf deployment_directory - git add . - - - name: Commit and push changes - env: - ACTIONS_DEPLOY_TOKEN: ${{ secrets.ACTIONS_DEPLOY_TOKEN }} - run: | - git commit -m "馃殌 Deploy files" || echo "No changes to commit" - git push https://x-access-token:${{ secrets.ACTIONS_DEPLOY_TOKEN }}@github.com/jorge-jrzz/Pynani.git deployment-branch diff --git a/Pynani/Messenger.py b/Pynani/Messenger.py deleted file mode 100644 index 4436480..0000000 --- a/Pynani/Messenger.py +++ /dev/null @@ -1,302 +0,0 @@ -from typing import Union, Optional -from pathlib import Path -import mimetypes -import requests - - -class Messenger(): - def __init__(self, access_token: str, page_id: str = 'me'): - self.access_token = access_token - self.page_id = page_id - self._url = f"https://graph.facebook.com/v20.0/{page_id}/messages" - - def verify_token(self, params, token): - mode = params.get("hub.mode") - hub_token = params.get("hub.verify_token") - challenge = params.get("hub.challenge") - - if mode == "subscribe" and challenge: - if hub_token != token: - return "Verification token mismatch", 403 - return challenge, 200 - return "Hello world", 200 - - def get_sender_id(self, data: dict): - try: - return data['entry'][0]['messaging'][0]['sender']['id'] - except (IndexError, KeyError) as e: - print(f"Error accessing sender ID: {e}") - return None - - def get_message_type(self, data: dict): - messaging = data['entry'][0]['messaging'][0] - try: - if 'postback' in messaging: - return 'postback' - message_type = messaging['message'] - if 'text' in message_type: - if 'attachments' in message_type: - if message_type['attachments'][0]['type'] == 'fallback': - return 'link' - return 'text' - if 'attachments' in message_type: - attachment_type = message_type['attachments'][0]['type'] - if 'image' in attachment_type: - if 'sticker_id' in message_type['attachments'][0]['payload']: - return 'sticker' - return 'image' - else: - return attachment_type - except (IndexError, KeyError) as e: - print(f"Error accessing message type: {e}") - return None - - def get_message_text(self, data: dict): - try: - message = data['entry'][0]['messaging'][0] - if 'message' in message: - # print("Hola?") - return message['message']['text'] - elif 'postback' in message: - return message['postback']['title'] - except (IndexError, KeyError) as e: - print(f"Error accessing message text: {e}") - return None - - def send_text_message(self, sender_id, message: Union[str, int]): - header = {"Content-Type": "application/json", - "Authorization": f"Bearer {self.access_token}"} - payload = { - "recipient": { - "id": sender_id - }, - "messaging_type": "RESPONSE", - "message": { - "text": message - } - } - - try: - r = requests.post(self._url, headers=header, json=payload, timeout=10) - r.raise_for_status() - return r.json() - except requests.exceptions.RequestException as e: - print(f"Request failed: {e} \n{r.json()}") - return None - - def upload_attachment(self, attachment_type: str, attachment_path: str) -> str: - attachments_url = f"https://graph.facebook.com/v20.0/{self.page_id}/message_attachments" - attachment = Path(attachment_path) - mimetype, _ = mimetypes.guess_type(attachment) - - header = { - "Authorization": f"Bearer {self.access_token}" - } - message = { - "attachment": { - "type": attachment_type, - "payload": { - "is_reusable": "true" - } - } - } - file = { - "filedata": (attachment.name, attachment.open('rb'), mimetype) - } - body = {"message": str(message)} - - r = requests.post(attachments_url, headers=header, files=file, data=body, timeout=20) - - try : - attachment_id = r.json()["attachment_id"] - return attachment_id - except KeyError as e: - print(f"Error uploading attachment: {e}") - return None - - def get_url_attachment(self, data: dict): - try: - return data['entry'][0]['messaging'][0]['message']['attachments'][0]["payload"]["url"] - except (IndexError, KeyError) as e: - print(f"Error accessing attachment url: {e}") - return None - - def get_attachment_type(self, data: dict): - try: - return data['entry'][0]['messaging'][0]['message']['attachments'][0]["type"] - except (IndexError, KeyError) as e: - print(f"Error accessing attachment type: {e}") - return None - - def send_attachment(self, sender_id: str, attachment_type: str, attachment_url: str): - header = {"Content-Type": "application/json", - "Authorization": f"Bearer {self.access_token}"} - payload = { - "recipient":{ - "id": sender_id - }, - "messaging_type": "RESPONSE", - "message": { - "attachment": { - "type": attachment_type, - "payload": { - "url": attachment_url, - "is_reusable": True - } - } - } - } - - r = requests.post(self._url, headers=header, json=payload, timeout=10) - return r.json() - - def send_local_attachment(self, sender_id: str, attachment_type: str, attachment_path: str): - attachment = Path(attachment_path) - mimetype, _ = mimetypes.guess_type(attachment) - recipient = {"id": sender_id} - message = { - "attachment": { - "type": attachment_type, - "payload": { - "is_reusable": "true" - } - } - } - - header = {"Authorization": f"Bearer {self.access_token}"} - body = { - "recipient": str(recipient), - "message": str(message) - } - file = { - "filedata": (attachment.name, attachment.open('rb'), mimetype) - } - - r = requests.post(self._url, headers=header, data=body, files=file, timeout=10) - return r.json() - - def download_attachment(self, attachment_url: str, path_dest: str): - response = requests.get(attachment_url, stream=True, timeout=10) - if response.status_code == 200: - with open(path_dest, 'wb') as file: - for chunk in response.iter_content(1024): - file.write(chunk) - - print('Downloaded attachment successfully!') - else: - print('Error downloading attachment') - - def send_quick_reply(self, sender_id, message: str, quick_replies: list): - if len(quick_replies) > 13: - print("Quick replies should be less than 13") - quick_replies = quick_replies[:13] - header = {"Content-Type": "application/json", - "Authorization": f"Bearer {self.access_token}"} - payload = { - "recipient": { - "id": sender_id - }, - "messaging_type": "RESPONSE", - "message": { - "text": message, - "quick_replies": quick_replies - } - } - - r = requests.post(self._url, headers=header, json=payload, timeout=10) - return r.json() - - def send_button_template(self, sender_id: str, message: str, buttons: list): - header = {"Content-Type": "application/json", - "Authorization": f"Bearer {self.access_token}"} - payload = { - "recipient": { - "id": sender_id - }, - "messaging_type": "RESPONSE", - "message": { - "attachment": { - "type": "template", - "payload": { - "template_type": "button", - "text": message, - "buttons": buttons - } - } - } - } - - r = requests.post(self._url, headers=header, json=payload, timeout=10) - return r.json() - - def send_media_template(self, sender_id: str, media_type: str, attachment_id: str, buttons: list): - header = {"Content-Type": "application/json", - "Authorization": f"Bearer {self.access_token}"} - body = { - "recipient": { - "id": sender_id - }, - "messaging_type": "RESPONSE", - "message": { - "attachment": { - "type": "template", - "payload": { - "template_type": "media", - "elements": [ - { - "media_type": media_type, - "attachment_id": attachment_id, - "buttons": buttons - } - ] - } - } - } - } - - r = requests.post(self._url, headers=header, json=body, timeout=10) - return r.json() - - def send_generic_template(self, sender_id: str, title: str, image_url: Optional[str] = None, default_url: Optional[str] = None, subtitle: Optional[str] = None, buttons: Optional[list] = None): - if default_url: - default_action = { - "type": "web_url", - "url": default_url, - "messenger_extensions": "false", - "webview_height_ratio": "tall" - } - else: - default_action = None - - header = {"Content-Type": "application/json", - "Authorization": f"Bearer {self.access_token}"} - body = { - "recipient": { - "id": sender_id - }, - "message": { - "attachment": { - "type": "template", - "payload": { - "template_type": "generic", - "elements": [ - { - "title": title, - "image_url": image_url if image_url else "", - "subtitle": subtitle if subtitle else "", - "default_action": default_action, - "buttons": buttons if buttons else [] - } - ] - } - } - } - } - - try: - r = requests.post(self._url, headers=header, json=body, timeout=10) - r.raise_for_status() - return r.json() - except requests.exceptions.RequestException as e: - print(f"Request failed: {e} \n{r.json()}") - return None diff --git a/Pynani/__init__.py b/Pynani/__init__.py deleted file mode 100644 index 9ba6f29..0000000 --- a/Pynani/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .Messenger import Messenger -from .utils.QuickReply import QuickReply -from .utils.Buttons import Buttons \ No newline at end of file diff --git a/Pynani/utils/Buttons.py b/Pynani/utils/Buttons.py deleted file mode 100644 index a209955..0000000 --- a/Pynani/utils/Buttons.py +++ /dev/null @@ -1,52 +0,0 @@ -from typing import Optional, Union - - -class Buttons: - def __make_button(self, title: str, url: Optional[str] = None, call_number: Optional[str] = None) -> dict: - if url is not None and call_number is not None: - raise ValueError("You can't have both url and call_number at the same time") - - if url: - type_button = "web_url" - url_button = url - payload_button = "" - elif call_number: - type_button = "phone_number" - url_button = "" - payload_button = call_number - elif not url and not call_number: - type_button = "postback" - url_button = "" - payload_button = "DEVELOPER_DEFINED_PAYLOAD" - - return { - "type": type_button, - "title": title, - "payload": payload_button, - "url": url_button, - } - - def basic_buttons(self, buttons: Union[str, list]) -> list: - if isinstance(buttons, str): - return [self.__make_button(buttons)] - else: - if len(buttons) > 3: - print("Buttons template should be less than 3") - buttons = buttons[:3] - return [self.__make_button(button) for button in buttons] - - def leave_buttons(self, buttons: Union[dict, list]): - if isinstance(buttons, dict): - return [self.__make_button(**buttons)] - else: - if len(buttons) > 3: - print("Buttons template should be less than 3") - buttons = buttons[:3] - return [self.__make_button(**button) for button in buttons] - - -# bb = Button() -# # print(bb.basic_buttons(["Hola", "Mundo", "馃敟"])) -# print(bb.leave_buttons([{"title": "Hola", "url": "https://www.google.com"}, {"title": "Mundo", "call_number": "+525555555555"}])) - - diff --git a/Pynani/utils/QuickReply.py b/Pynani/utils/QuickReply.py deleted file mode 100644 index d07c5e5..0000000 --- a/Pynani/utils/QuickReply.py +++ /dev/null @@ -1,42 +0,0 @@ -class QuickReply(): - def __make_quick_button(self, text: str, payload: str = "", image_url: str = None) -> dict: - return { - "content_type": "text", - "title": text, - "payload": payload, - "image_url": image_url if image_url else "", - } - - def quick_buttons(self, buttons: list) -> list: - r_buttons = [] - if len(buttons) > 13: - print("Quick replies should be less than 13") - buttons = buttons[:13] - - for b in buttons: - if not isinstance(b, str): - raise ValueError("Each button should be a string") - else: - r_buttons.append(self.__make_quick_button(b)) - - return r_buttons - - def quick_buttons_image(self, buttons: list) -> list: - r_buttons = [] - if len(buttons) > 13: - print("Quick replies should be less than 13") - buttons = buttons[:13] - for b in buttons: - if not isinstance(b, dict): - raise ValueError("Each button should be a dictionary") - else: - r_buttons.append(self.__make_quick_button(**b)) - - return r_buttons - - -# botones = ['pedro', 'juan', 'popo'] -# b_imagen = [{"text": "1", "image_url": "https://upload.wikimedia.org/wikipedia/commons/b/b9/Solid_red.png"}, -# {"text": "2", "image_url": "https://upload.wikimedia.org/wikipedia/commons/b/b9/Solid_red.png"}] -# sb = qui.quick_buttons(botones) -# cb = qui.quick_buttons_image(b_imagen) diff --git a/README.md b/README.md index 5059123..eb0a65c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,141 @@ -# Pynani -Opensource python wrapper to Messenger and Instagram API +banner Pynani + +--- + +Opensource python wrapper to Messenger API + +## Features + +### API + +- Verify the webhook +- Send text messages +- Send attachments from a remote file (image, audio, video, file) +- Send attachments from a local file (image, audio, video, file) +- Send templates (generic, buttons, media, receipent) +- Send quick replies + +### Other functionalities + +- Get sender id +- Get type of message received +- Get text of the message received +- Get the url of the attachment received +- Get type of the attachment received +- Download attachments received + +## Installation + +Install Pynani with pip + +```bash + pip install pynani +``` + +Or install with pipenv (requires pipenv installed) + +```bash + pipenv install pynani +``` + +## Getting started + +### Prerequisites + +- **Python 3.8**+ installed +- To get started using this module, you will need **page access token** which you can get from the Facebook Developer Portal + +### A simple echo bot + +The Messenger class (defined in Messenger.py) encapsulates all API calls in a single class. It provides functions such as send_xyz (send_message, send_attachment etc.) and several ways to listen for incoming messages. + +Create a file called echo_bot.py. Then, open the file and create an instance of the Messenger class. + +```python +from pynani import Messenger + +PAGE_ACCESS_TOKEN = 'EAAxxxxxxx...' +mess = Messenger(PAGE_ACCESS_TOKEN) +``` + +> [!IMPORTANT] +> Make sure to actually replace PAGE_ACCESS_TOKEN with your own page access token. + +After that declaration, we need to register some message handlers. First, we need to create and verify a webhook with the help of _Flask_ or _FastAPI_. + +```python +from flask import Flask, request, jsonify + +app = Flask(__name__) + +TOKEN = "abc123" + +@app.get("/") +def meta_verify(): + return mess.verify_token(request.args, TOKEN) +``` + +Now let's define a webhook that handles certain messages + +```python +@app.post("/") +def meta_webhook(): + data = request.get_json() + sender_id = mess.get_sender_id(data) + message = mess.get_message_text(data) + if message == "Hello": + mess.send_text_message(sender_id, "Hello, World!") + if message == "Bye": + mess.send_text_message(sender_id, "Nice to meet you! 馃憤馃徑") + + return jsonify({"status": "success"}), 200 +``` + +We now have a basic bot which replies a static message to "hello" and "bye" messages. To start the bot, add the following to our source file: + +```python +if __name__ =='__main__': + app.run(port=8080, debug=True) +``` + +Alright, that's it! Our source file now looks like this: + +```python +from flask import Flask, request, jsonify +from pynani import Messenger + +PAGE_ACCESS_TOKEN = 'EAAxxxxxxx...' +TOKEN = "abc123" + +mess = Messenger(PAGE_ACCESS_TOKEN) +app = Flask(__name__) + +@app.get("/") +def meta_verify(): + return mess.verify_token(request.args, TOKEN) + +@app.post("/") +def meta_webhook(): + data = request.get_json() + sender_id = mess.get_sender_id(data) + message = mess.get_message_text(data) + if message == "Hello": + mess.send_text_message(sender_id, "Hello, World!") + if message == "Bye": + mess.send_text_message(sender_id, "Nice to meet you! 馃憤馃徑") + + return jsonify({"status": "success"}), 200 + +if __name__ =='__main__': + app.run(port=8080, debug=True) +``` + +To start the bot, simply open up a terminal and enter `python echo_bot.py` to run the bot! Test it by sending messages ("hello" and "bye"). + +## Related + +Here are some related projects that I was inspired by them. + +- [pyTelegramBotAPI](https://github.com/eternnoir/pyTelegramBotAPI?tab=readme-ov-file) +- [WhatsApp Cloud API Wrapper](https://github.com/Neurotech-HQ/heyoo) +- [Messenger API Python](https://github.com/krishna2206/messenger-api-python) diff --git a/hook.py b/hook.py deleted file mode 100644 index f5632c4..0000000 --- a/hook.py +++ /dev/null @@ -1,39 +0,0 @@ -from flask import Flask, request, jsonify -from pynani import Messenger, Buttons - -PAGE_ACCESS_TOKEN = 'EAAL4nnkuqAcBO7WJZBbXWineRCJaQv3yvPL3DfzQyrmV6PiUNYrTxxHuZC7S3bWC2fnzGwaxNR5uPJ4OhuEOKpvBensrlHPId6pXHdOZBw4J70RC1jgRhhp9137TgoGRSlpZAdZCHAeBwmloZBqGWNmLUv9CPkHYUY6iIqx23SuOQu1sdghVOnmjSDr0qBc9ol' -app = Flask(__name__) -mess = Messenger(PAGE_ACCESS_TOKEN) -b = Buttons() - -@app.route("/", methods=['GET']) -def meta_verify(): - return mess.verify_token(request.args, '12345') - -@app.route("/", methods=['POST']) -def meta_webhook(): - # print("Algo paso!!!") - data = request.get_json() - mensaje = mess.get_message_text(data) - # print(f'\nData: {data}\n') - # print(mensaje) - # print(mess.get_message_type(data)) - sender_id = mess.get_sender_id(data) - if mensaje == "hola": - mess.send_text_message(sender_id, ["Hola", "驴C贸mo est谩s?"]) - if mensaje == "botones simples": - botones = b.basic_buttons(["Opci贸n 1", "Opci贸n 2", "馃敟"]) - mess.send_button_template(sender_id, "Hola, 驴qu茅 deseas hacer?", botones) - elif mensaje == "botones complicados": - cb = b.leave_buttons([{'title': 'Opci贸n 1', 'url': 'https://www.google.com'}, {'title': 'Opci贸n 2', 'call_number': '+525555555555'}]) - mess.send_button_template(sender_id, "Hola, 驴qu茅 deseas hacer?", cb) - elif mensaje == "imagen": - r = mess.send_local_attachment(sender_id, 'image', './test.png') - print(f'\n\n{r}') - elif mensaje == "ata": - r = mess.upload_attachment('image', './test.png') - print(f'\n\n{r}') - return jsonify({"status": "success"}), 200 - -if __name__ =='__main__': - app.run(port=8080, debug=True) \ No newline at end of file diff --git a/pynani/Messenger.py b/pynani/Messenger.py new file mode 100644 index 0000000..3516b1c --- /dev/null +++ b/pynani/Messenger.py @@ -0,0 +1,797 @@ +# 鈻堚枅鈻堚枅鈻堚枅鈺椻枒鈻堚枅鈺椻枒鈻戔枒鈻堚枅鈺椻枅鈻堚枅鈺椻枒鈻戔枅鈻堚晽鈻戔枅鈻堚枅鈻堚枅鈺椻枒鈻堚枅鈻堚晽鈻戔枒鈻堚枅鈺椻枅鈻堚晽 +# 鈻堚枅鈺斺晲鈺愨枅鈻堚晽鈺氣枅鈻堚晽鈻戔枅鈻堚晹鈺濃枅鈻堚枅鈻堚晽鈻戔枅鈻堚晳鈻堚枅鈺斺晲鈺愨枅鈻堚晽鈻堚枅鈻堚枅鈺椻枒鈻堚枅鈺戔枅鈻堚晳 +# 鈻堚枅鈻堚枅鈻堚枅鈺斺暆鈻戔暁鈻堚枅鈻堚枅鈺斺暆鈻戔枅鈻堚晹鈻堚枅鈺椻枅鈻堚晳鈻堚枅鈻堚枅鈻堚枅鈻堚晳鈻堚枅鈺斺枅鈻堚晽鈻堚枅鈺戔枅鈻堚晳 +# 鈻堚枅鈺斺晲鈺愨晲鈺濃枒鈻戔枒鈺氣枅鈻堚晹鈺濃枒鈻戔枅鈻堚晳鈺氣枅鈻堚枅鈻堚晳鈻堚枅鈺斺晲鈺愨枅鈻堚晳鈻堚枅鈺戔暁鈻堚枅鈻堚枅鈺戔枅鈻堚晳 +# 鈻堚枅鈺戔枒鈻戔枒鈻戔枒鈻戔枒鈻戔枅鈻堚晳鈻戔枒鈻戔枅鈻堚晳鈻戔暁鈻堚枅鈻堚晳鈻堚枅鈺戔枒鈻戔枅鈻堚晳鈻堚枅鈺戔枒鈺氣枅鈻堚枅鈺戔枅鈻堚晳 +# 鈺氣晲鈺濃枒鈻戔枒鈻戔枒鈻戔枒鈻戔暁鈺愨暆鈻戔枒鈻戔暁鈺愨暆鈻戔枒鈺氣晲鈺愨暆鈺氣晲鈺濃枒鈻戔暁鈺愨暆鈺氣晲鈺濃枒鈻戔暁鈺愨晲鈺濃暁鈺愨暆 + + +import mimetypes +import json +from pathlib import Path +from typing import Union, Optional, Tuple, Dict, List +import requests +from requests.exceptions import RequestException +from .utils import get_logger + + +logger = get_logger(__name__) + +HOOK_PAGE = """ + + + + + Verificaci贸n de Token + + + +
+

Hello, World!

+

This is the endpoint to verify the token 馃攼馃敆

+
+ +""" + + +def jsonify(data: Union[Dict, str], status_code: int) -> Tuple: + """ + Converts the given data to a JSON response with the specified status code. + + Args: + data (Union[Dict, str]): The data to be converted to JSON. It can be a dictionary or a string. + status_code (int): The HTTP status code to be returned with the response. + + Returns: + Tuple: A tuple containing the JSON response, the status code, and the headers. + """ + + if isinstance(data, dict): + return json.dumps(data), status_code, {'Content-Type': 'application/json'} + elif isinstance(data, str): + return data.encode('utf-8'), status_code, {'Content-Type': 'text/html'} + + +def get_long_lived_token(app_id: str, app_secret: str, short_lived_token: str, save_env: Optional[bool] = False) -> Optional[str]: + """ + Obtains a long-lived access token using the provided short-lived token. + + Args: + app_id (str): The application ID. + app_secret (str): The application secret. + short_lived_token (str): The short-lived page access token. + save_env (Optional[bool], optional): Whether to save the long-lived token to the .env file. Defaults to False. + + Returns: + Optional[str]: The long-lived access token if successful, otherwise None. + + Exception: + RequestException: If an error occurs during the request. + + Example: + >>> get_long_lived_token('765xxx', 'e0fcxxx', 'EAAK4AXXXX', True) + "EAAK4XXXX" + """ + + url = f"https://graph.facebook.com/v20.0/oauth/access_token?grant_type=fb_exchange_token&client_id={app_id}&client_secret={app_secret}&fb_exchange_token={short_lived_token}" + try: + r = requests.get(url, timeout=10) + r.raise_for_status() + logger.info("Long-lived token obtained successfully - %d", 200) + if save_env: + with open('.env', 'a', encoding='utf-8') as file: + file.write(f'\nACCESS_TOKEN={r.json()["access_token"]}') + return r.json()['access_token'] + except RequestException as e: + logger.error("%s - %d", e, 403) + return None + + +class Messenger(): + """ + Initializes the Messenger class with the provided access token and page ID. + + Args: + access_token (str): The access token for authenticating API requests. + page_id (str, optional): The page ID for the Facebook page. Defaults to 'me'. + """ + + def __init__(self, access_token: str, page_id: str = 'me') -> None: + self.access_token = access_token + self.page_id = page_id + self.__url = f"https://graph.facebook.com/v20.0/{page_id}/messages" + + + def verify_token(self, params: Dict, token: str) -> Tuple: + """ + Verifies the provided token against the expected token. + + Args: + params (Dict): The parameters received in the verification request. + token (str): The expected verification token. + + Returns: + Tuple: A tuple containing the JSON response, the status code, and the headers. + + Example: + >>> verify_token({"hub.mode": "subscribe", "hub.challenge": "1950414725", "hub.verify_token": "1234", }, "1234") + Whit a Webhook made with Flask, the function verify_token() will be used as follows: + >>> @app.get("/") + >>> def meta_verify(): + >>> return mess.verify_token(request.args, TOKEN) + """ + + mode = params.get("hub.mode") + hub_token = params.get("hub.verify_token") + challenge = params.get("hub.challenge") + + if mode == "subscribe" and challenge: + if hub_token != token: + logger.error('Verification token mismatch - %d', 403) + return jsonify({"Error": "Verification token mismatch"}, 403) + logger.info('Verification successful - %d', 200) + return jsonify(challenge, 200) + logger.warning('This endpoint is to verify token - %d', 200,) + return jsonify(HOOK_PAGE, 200) + + + def get_sender_id(self, data: dict) -> Optional[str]: + """ + Extracts the sender ID from the provided data. + + Args: + data (dict): The data received from the webhook event. + + Returns: + Optional[str]: The sender ID if found, otherwise None. + + Example with a Webhook made with Flask, will be used as follows: + >>> get_sender_id(request.get_json()) + "1234567897654321" + """ + + try: + return data['entry'][0]['messaging'][0]['sender']['id'] + except (IndexError, KeyError) as e: + logger.error("Error accessing sender ID: %s", e) + return None + + + def get_message_type(self, data: Dict) -> Optional[str]: + """ + Determines the type of message received from the webhook event. + + Args: + data (Dict): The data received from the webhook event. + + Returns: + Optional[str]: The type of message if found, otherwise None. + + Example with a Webhook made with Flask, will be used as follows: + >>> get_message_type(request.get_json()) + "text" + """ + + messaging = data['entry'][0]['messaging'][0] + try: + if 'postback' in messaging: + return 'postback' + message_type = messaging['message'] + if 'text' in message_type: + if 'attachments' in message_type: + if message_type['attachments'][0]['type'] == 'fallback': + return 'link' + return 'text' + if 'attachments' in message_type: + attachment_type = message_type['attachments'][0]['type'] + if 'image' in attachment_type: + if 'sticker_id' in message_type['attachments'][0]['payload']: + return 'sticker' + return 'image' + else: + return attachment_type + except (IndexError, KeyError) as e: + logger.error("Error accessing message type: %s", e) + return None + + + def get_message_text(self, data: Dict) -> Optional[str]: + """ + Extracts the text message from the provided data. + + Args: + data (Dict): The data received from the webhook event. + + Returns: + Optional[str]: The text message if found, otherwise None. + + Example with a Webhook made with Flask, will be used as follows: + >>> get_message_text(request.get_json()) + "Hello 馃憢馃徑" + """ + + try: + message = data['entry'][0]['messaging'][0] + if 'message' in message: + return message['message']['text'] + elif 'postback' in message: + return message['postback']['title'] + except (IndexError, KeyError) as e: + logger.error("Error accessing message text: %s", e) + return None + + + def send_text_message(self, sender_id: str, message: Union[str, int]) -> Optional[Dict]: + """ + Sends a text message to the specified sender. + + Args: + sender_id (str): The ID of the recipient. + message (Union[str, int]): The message to be sent. + + Returns: + Optional[Dict]: The response from the server if the request was successful, otherwise None. + + Example with a Webhook made with Flask, will be used as follows: + >>> send_text_message(sender_id, "Hello, how can I help you?") + """ + + header = {"Content-Type": "application/json", + "Authorization": f"Bearer {self.access_token}"} + body = { + "recipient": { + "id": sender_id + }, + "messaging_type": "RESPONSE", + "message": { + "text": message + } + } + + try: + r = requests.post(self.__url, headers=header, json=body, timeout=10) + r.raise_for_status() + logger.info("Message sent successfully - %d", 200) + return jsonify(r.json(), 200) + except RequestException as e: + logger.error("%s - %d", e, 403) + return None + + + def upload_attachment(self, attachment_type: str, attachment_path: str) -> str: + """ + Uploads an attachment to the server and returns the attachment ID. + + Args: + attachment_type (str): The type of the attachment (e.g., 'image', 'video', 'audio', 'file'). + attachment_path (str): The local file path to the attachment. + + Returns: + str: The ID of the uploaded attachment if successful, otherwise None. + + Example with a Webhook made with Flask, will be used as follows: + >>> upload_attachment("image", "path/to/image.png") + "1234567897654321" + """ + + attachments_url = f"https://graph.facebook.com/v20.0/{self.page_id}/message_attachments" + attachment = Path(attachment_path) + mimetype, _ = mimetypes.guess_type(attachment) + + header = { + "Authorization": f"Bearer {self.access_token}" + } + message = { + "attachment": { + "type": attachment_type, + "payload": { + "is_reusable": "true" + } + } + } + file = { + "filedata": (attachment.name, attachment.open('rb'), mimetype) + } + body = {"message": str(message)} + + try: + r = requests.post(attachments_url, headers=header, files=file, data=body, timeout=20) + r.raise_for_status() + logger.info("Attachment uploaded successfully - %d", 200) + attachment_id = r.json()["attachment_id"] + return attachment_id + except (RequestException, IndexError, KeyError) as e: + logger.error("%s - %d", e, 403) + return None + + + def get_url_attachment(self, data: Dict) -> Optional[str]: + """ + Extracts the URL of an attachment from the provided data. + + Args: + data (Dict): The data containing the attachment information. + + Returns: + Optional[str]: The URL of the attachment if found, otherwise None. + + Example with a Webhook made with Flask, will be used as follows: + >>> get_url_attachment(request.get_json()) + "https://example.com/image.png" + """ + + try: + return data['entry'][0]['messaging'][0]['message']['attachments'][0]["payload"]["url"] + except (IndexError, KeyError) as e: + logger.error("Error accessing attachment url: %s", e) + return None + + + def get_attachment_type(self, data: Dict) -> Optional[str]: + """ + Extracts the type of an attachment from the provided data. + + Args: + data (Dict): The data containing the attachment information. + + Returns: + Optional[str]: The type of the attachment if found, otherwise None. + + Example with a Webhook made with Flask, will be used as follows: + >>> get_attachment_type(request.get_json()) + "image" + """ + + try: + return data['entry'][0]['messaging'][0]['message']['attachments'][0]["type"] + except (IndexError, KeyError) as e: + logger.error("Error accessing attachment type: %s", e) + return None + + + def send_attachment(self, sender_id: str, attachment_type: str, attachment_url: str) -> Optional[Dict]: + """ + Sends an attachment to a user. + + Args: + sender_id (str): The ID of the recipient. + attachment_type (str): The type of the attachment (e.g., 'image', 'video', 'audio', 'file'). + attachment_url (str): The URL of the attachment to be sent. + + Returns: + Optional[Dict]: The response from the server if the request is successful, otherwise None. + + Example: + >>> send_attachment(sender_id, "image", "https://example.com/image.png") + """ + + header = {"Content-Type": "application/json", + "Authorization": f"Bearer {self.access_token}"} + body = { + "recipient": { + "id": sender_id + }, + "messaging_type": "RESPONSE", + "message": { + "attachment": { + "type": attachment_type, + "payload": { + "url": attachment_url, + "is_reusable": True + } + } + } + } + + try: + r = requests.post(self.__url, headers=header, json=body, timeout=15) + r.raise_for_status() + logger.info("Attachment sent successfully - %d", 200) + return jsonify(r.json(), 200) + except RequestException as e: + logger.error("%s - %d", e, 403) + return None + + + def send_local_attachment(self, sender_id: str, attachment_type: str, attachment_path: str) -> Optional[Dict]: + """ + Sends a local attachment to a user. + + Args: + sender_id (str): The ID of the recipient. + attachment_type (str): The type of the attachment (e.g., 'image', 'video', 'audio', 'file'). + attachment_path (str): The local path to the attachment to be sent. + + Returns: + Optional[Dict]: The response from the server if the request is successful, otherwise None. + + Example: + >>> send_local_attachment(sender_id, "image", "path/to/image.png") + """ + + attachment = Path(attachment_path) + mimetype, _ = mimetypes.guess_type(attachment) + + recipient = {"id": sender_id} + message = { + "attachment": { + "type": attachment_type, + "payload": { + "is_reusable": "true" + } + } + } + + header = {"Authorization": f"Bearer {self.access_token}"} + body = { + "recipient": str(recipient), + "message": str(message) + } + file = { + "filedata": (attachment.name, attachment.open('rb'), mimetype) + } + + try: + r = requests.post(self.__url, headers=header, data=body, files=file, timeout=15) + r.raise_for_status() + logger.info("Attachment sent successfully - %d", 200) + return jsonify(r.json(), 200) + except RequestException as e: + logger.error("%s - %d", e, 403) + return None + + + def download_attachment(self, attachment_url: str, path_dest: str) -> None: + """ + Downloads an attachment from the given URL and saves it to the specified destination path. + + Args: + attachment_url (str): The URL of the attachment to be downloaded. + path_dest (str): The local file path where the attachment will be saved. + + Returns: + None + + Example: + >>> download_attachment("https://example.com/image.png", "path/to/image.png") + """ + + try: + r = requests.get(attachment_url, stream=True, timeout=10) + r.raise_for_status() + with open(path_dest, 'wb') as file: + for chunk in r.iter_content(1024): + file.write(chunk) + logger.info("Downloaded attachment successfully to \"%s\" - %d", path_dest, 200) + except RequestException as e: + logger.error("%s - %d", e, 403) + return None + + + def send_quick_reply(self, sender_id: str, message: Union[str, int], quick_replies: List[Dict]) -> Optional[Dict]: + """ + Sends a quick reply message to the specified sender. + + Args: + sender_id (str): The ID of the recipient. + message (Union[str, int]): The message to be sent. + quick_replies (list): A list of quick reply options. The list should contain less than 13 items. + + Returns: + Optional[Dict]: The response from the server if the request was successful, otherwise None. + + Example: + >>> send_quick_reply(sender_id, "Select an option", [{'content_type': 'text', 'title': 'Hello', 'payload': '', 'image_url': None}]) + Using the quick_buttons() function from pynani: + >>> send_quick_reply(sender_id, "Select an option", quick_buttons(["Hello", "World", "馃挬"])) + Usiing the quick_buttons_image() function from pynani: + >>> send_quick_reply(sender_id, "Select an option", quick_buttons_image(["Hello", "World", "馃挬"], ["https://example.com/hello.png", "https://example.com/world.png", "https://example.com/poop.png"])) + """ + + if len(quick_replies) > 13: + logger.warning("Quick replies should be less than 13") + quick_replies = quick_replies[:13] + + header = {"Content-Type": "application/json", + "Authorization": f"Bearer {self.access_token}"} + body = { + "recipient": { + "id": sender_id + }, + "messaging_type": "RESPONSE", + "message": { + "text": message, + "quick_replies": quick_replies + } + } + + try: + r = requests.post(self.__url, headers=header, json=body, timeout=10) + r.raise_for_status() + logger.info("Quick reply sent successfully - %d", 200) + return jsonify(r.json(), 200) + except RequestException as e: + logger.error("%s - %d", e, 403) + return None + + + def send_button_template(self, sender_id: str, message: str, buttons: List[Dict]) -> Optional[Dict]: + """ + Sends a button template message to the specified sender. + + Args: + sender_id (str): The ID of the recipient. + message (str): The message to be sent. + buttons (list): A list of button options. The list should contain less than 3 items. + + Returns: + Optional[Dict]: The response from the server if the request was successful, otherwise None. + + Example: + >>> send_button_template(sender_id, "Select an option", [{'type': 'postback', 'title': 'Hello', 'payload': 'DEVELOPER_DEFINED_PAYLOAD', 'url': ''}]) + Using the basic_buttons() function from pynani: + >>> send_button_template(sender_id, "Select an option", basic_buttons(["Hello", "World", "馃"])) + Using the exit_buttons() function from pynani: + >>> send_button_template(sender_id, "Select an option", exit_buttons([{"title": "Exit", "url": "https://google.com"}, {"title": "Call me", "call_number": "+525555555555"}])) + """ + + if len(buttons) > 3: + logger.warning("Buttons template should be less than 3") + buttons = buttons[:3] + + header = {"Content-Type": "application/json", + "Authorization": f"Bearer {self.access_token}"} + body = { + "recipient": { + "id": sender_id + }, + "messaging_type": "RESPONSE", + "message": { + "attachment": { + "type": "template", + "payload": { + "template_type": "button", + "text": message, + "buttons": buttons + } + } + } + } + + try: + r = requests.post(self.__url, headers=header, json=body, timeout=10) + r.raise_for_status() + logger.info("Button template sent successfully - %d", 200) + return jsonify(r.json(), 200) + except RequestException as e: + logger.error("%s - %d", e, 403) + return None + + + def send_media_template(self, sender_id: str, media_type: str, attachment_id: str, buttons: List[Dict]) -> Optional[Dict]: + """ + Sends a media template message to the specified sender. + + Args: + sender_id (str): The ID of the recipient. + media_type (str): The type of media to be sent (e.g., 'image', 'video', 'audio', 'file'). + attachment_id (str): The ID of the attachment to be sent. + buttons (list): A list of button options. The list should contain less than 3 items. + + Returns: + Optional[Dict]: The response from the server if the request was successful, otherwise None. + + Example: + >>> send_media_template(sender_id, "image", "1234567897654321", [{'type': 'web_url', 'title': 'Hello', 'payload': '', 'url': 'https://example.com/hello.png'}]) + Using the basic_buttons() or exit_buttons() functions from pynani: + >>> send_media_template(sender_id, "image", "1234567897654321", basic_buttons(["Hello", "World", "馃懟"])) + """ + + header = {"Content-Type": "application/json", + "Authorization": f"Bearer {self.access_token}"} + body = { + "recipient": { + "id": sender_id + }, + "messaging_type": "RESPONSE", + "message": { + "attachment": { + "type": "template", + "payload": { + "template_type": "media", + "elements": [ + { + "media_type": media_type, + "attachment_id": attachment_id, + "buttons": buttons + } + ] + } + } + } + } + + try: + r = requests.post(self.__url, headers=header, json=body, timeout=10) + r.raise_for_status() + logger.info("Media template sent successfully - %d", 200) + return jsonify(r.json(), 200) + except RequestException as e: + logger.error("%s - %d", e, 403) + return None + + + def send_generic_template(self, sender_id: str, title: str, image_url: Optional[str] = None, default_url: Optional[str] = None, + subtitle: Optional[str] = None, buttons: Optional[List] = None) -> Optional[Dict]: + """ + Sends a generic template message to the specified sender. + + Args: + sender_id (str): The ID of the recipient. + title (str): The title of the template. + image_url (Optional[str], optional): The URL of the image to be displayed. Defaults to None. + default_url (Optional[str], optional): The URL for the default action. Defaults to None. + subtitle (Optional[str], optional): The subtitle of the template. Defaults to None. + buttons (Optional[List], optional): A list of button options. Defaults to None. + + Returns: + Optional[Dict]: The response from the server if the request was successful, otherwise None. + + Example: + >>> send_generic_template(sender_id, "Hello", "https://example.com/hello.png", "https://example.com", "World", [{'type': 'web_url', 'title': 'Hello', 'payload': '', 'url': 'https://example.com/hello.png'}]) + Using the basic_buttons() or exit_buttons() functions from pynani: + >>> send_generic_template(sender_id, "Hello", "https://example.com/hello.png", "https://example.com", "World", basic_buttons(["Hello", "World", "馃懡"])) + """ + + if default_url: + default_action = { + "type": "web_url", + "url": default_url, + "messenger_extensions": "false", + "webview_height_ratio": "tall" + } + else: + default_action = None + + header = {"Content-Type": "application/json", + "Authorization": f"Bearer {self.access_token}"} + body = { + "recipient": { + "id": sender_id + }, + "message": { + "attachment": { + "type": "template", + "payload": { + "template_type": "generic", + "elements": [ + { + "title": title, + "image_url": image_url if image_url else "", + "subtitle": subtitle if subtitle else "", + "default_action": default_action, + "buttons": buttons if buttons else [] + } + ] + } + } + } + } + + try: + r = requests.post(self.__url, headers=header, json=body, timeout=10) + r.raise_for_status() + logger.info("Generic template sent successfully - %d", 200) + return jsonify(r.json(), 200) + except RequestException as e: + logger.error("%s - %d", e, 403) + return None + + + def send_receipt_template(self, sender_id: str, order_number: str, payment_method: str, summary: Dict, currency: str = 'USD', + order_url: Optional[str] = None, timestamp: Optional[str] = None, address: Optional[Dict] = None, + adjustments: Optional[List] = None, elements: Optional[List] = None) -> Optional[Dict]: + """ + Sends a receipt template message to the specified sender. + + Args: + sender_id (str): The ID of the recipient. + order_number (str): The order number of the transaction. + payment_method (str): The payment method used. + summary (Dict): A dictionary containing the summary of the transaction. + currency (str, optional): The currency used in the transaction. Defaults to 'USD'. + order_url (Optional[str], optional): The URL of the order. Defaults to None. + timestamp (Optional[str], optional): The timestamp of the transaction. Defaults to None. + address (Optional[Dict], optional): The address of the recipient. Defaults to None. + adjustments (Optional[List], optional): A list of adjustments made to the order. Defaults to None. + elements (Optional[List], optional): A list of elements in the order. Defaults to None. + + Returns: + Optional[Dict]: The response from the server if the request was successful, otherwise None. + + Example: + >>> send_receipt_template(sender_id, "123456789", "Credit Card", + {"subtotal": 75.00, + "shipping_cost": 4.95, + "total_tax": 6.19, + "total_cost": 56.14}, + "USD", "https://example.com/order/123456789", "123456789", + {"street_1": "1 Hacker Way", + "city": "Menlo Park", + "postal_code": "94025", + "state": "CA", + "country": "US"}, + [{"name": "New Customer Discount", "amount": 20}], + [{"title": "Classic White T-Shirt", + "subtitle": "100% Soft and Luxurious Cotton", + "quantity": 2, "price": 50, + "currency": "USD", + "image_url": "https://example.com/classic-white-t-shirt"}]) + Using the get_address(), get_summary(), get_adjustments(), and get_elements() functions from pynani: + >>> address = get_address("123 Main St", "Springfield", "12345", "IL", "US") + >>> adjustments = get_adjustments("New Customer Discount", 20, "Black Friday", 34) + >>> summary = get_summary(56.14) + >>> elements = get_elements("T-Shirt", 20.0) + >>> send_receipt_template(sender_id, "123456789", "Credit Card", + summary=summary, + currency="USD", + order_url="https://example.com/order/123456789", + timestamp="123456789", + address=address, + adjustments=adjustments, + elements=elements) + """ + + header = {"Content-Type": "application/json", + "Authorization": f"Bearer {self.access_token}"} + body = { + "recipient": { + "id": sender_id + }, + "message": { + "attachment": { + "type": "template", + "payload": { + "template_type": "receipt", + "recipient_name": "Stephane Crozatier", + "order_number": order_number, + "currency": currency, + "payment_method": payment_method, + "order_url": order_url, + "timestamp": timestamp, + "address": address, + "summary": summary, + "adjustments": adjustments, + "elements": elements + } + } + } + } + + try: + r = requests.post(self.__url, headers=header, json=body, timeout=10) + r.raise_for_status() + logger.info("Receipt template sent successfully - %d", 200) + return jsonify(r.json(), 200) + except RequestException as e: + logger.error("%s - %d", e, 403) + return None diff --git a/pynani/__init__.py b/pynani/__init__.py new file mode 100644 index 0000000..f6bd973 --- /dev/null +++ b/pynani/__init__.py @@ -0,0 +1,4 @@ +from .Messenger import Messenger, get_long_lived_token +from .utils.quick_reply import quick_buttons, quick_image_buttons +from .utils.buttons import basic_buttons, exit_buttons +from .utils.receipt import get_address, get_elements, get_adjustments, get_summary diff --git a/pynani/utils/__init__.py b/pynani/utils/__init__.py new file mode 100644 index 0000000..b252b9f --- /dev/null +++ b/pynani/utils/__init__.py @@ -0,0 +1 @@ +from .logs import get_logger \ No newline at end of file diff --git a/pynani/utils/buttons.py b/pynani/utils/buttons.py new file mode 100644 index 0000000..76a50aa --- /dev/null +++ b/pynani/utils/buttons.py @@ -0,0 +1,106 @@ +from typing import Optional, Union, List, Dict +from .logs import get_logger + + +logger = get_logger(__name__) + +def __make_button( title: str, url: Optional[str] = None, call_number: Optional[str] = None) -> Dict: + """ + Creates a button with the specified title and optional URL or call number. + + Args: + title (str): The title of the button. + url (Optional[str], optional): The URL associated with the button. Defaults to None. + call_number (Optional[str], optional): The call number associated with the button. Defaults to None. + + Returns: + Dict: A dictionary representing the button with its properties. + + Raises: + ValueError: If both URL and call number are provided. + + Example: + >>> __make_button("Hola", "https://www.google.com") + {'type': 'web_url', 'title': 'Hola', 'payload': '', 'url': 'https://www.google.com'} + >>> __make_button("Mundo", call_number="+525555555555") + {'type': 'phone_number', 'title': 'Mundo', 'payload': '+525555555555', 'url': ''} + >>> __make_button("Hello") + {'type': 'postback', 'title': 'Hello', 'payload': 'DEVELOPER_DEFINED_PAYLOAD', 'url': ''} + """ + + if url is not None and call_number is not None: + logger.error("You can't have both url and call_number at the same time") + raise ValueError("You can't have both url and call_number at the same time") + + if url: + type_button = "web_url" + url_button = url + payload_button = "" + elif call_number: + type_button = "phone_number" + url_button = "" + payload_button = call_number + elif not url and not call_number: + type_button = "postback" + url_button = "" + payload_button = "DEVELOPER_DEFINED_PAYLOAD" + + return { + "type": type_button, + "title": title, + "payload": payload_button, + "url": url_button, + } + +def basic_buttons( buttons: Union[str, List[Union[str, int]]]) -> List: + """ + Creates a list of basic buttons. + + Args: + buttons (Union[str, List[Union[str, int]]]): The buttons to be created. It can be a string or a list of strings or integers. + + Returns: + List: A list of dictionaries representing the basic buttons with their properties. + + Example: + >>> basic_buttons(["Hello", "World", "馃敟"]) + [{'type': 'postback', 'title': 'Hello', 'payload': 'DEVELOPER_DEFINED_PAYLOAD', 'url': ''}, + {'type': 'postback', 'title': 'World', 'payload': 'DEVELOPER_DEFINED_PAYLOAD', 'url': ''}, + {'type': 'postback', 'title': '馃敟', 'payload': 'DEVELOPER_DEFINED_PAYLOAD', 'url': ''}] + >>> basic_buttons("Hello") + [{'type': 'postback', 'title': 'Hello', 'payload': 'DEVELOPER_DEFINED_PAYLOAD', 'url': ''}] + """ + + if isinstance(buttons, str): + return [__make_button(buttons)] + else: + if len(buttons) > 3: + logger.warning("Buttons template should be less than 3") + buttons = buttons[:3] + return [__make_button(button) for button in buttons] + +def exit_buttons( buttons: Union[Dict, List[Dict]]) -> List: + """ + Creates a list of leave buttons. + + Args: + buttons (Union[Dict, List[Dict]]): The buttons to be created. It can be a dictionary or a list of dictionaries. + + Returns: + List: A list of dictionaries representing the leave buttons with their properties. + + Example: + >>> leave_buttons([{"title": "Hello", "url": "https://www.google.com"}, {"title": "World", "call_number": "+525555555555"}]) + [{'type': 'web_url', 'title': 'Hello', 'payload': '', 'url': 'https://www.google.com'}, + {'type': 'phone_number', 'title': 'World', 'payload': '+525555555555', 'url': ''}] + >>> leave_buttons({"title": "Hello", "url": "https://www.google.com"}) + [{'type': 'web_url', 'title': 'Hello', 'payload': '', 'url': 'https://www.google.com'}] + """ + + if isinstance(buttons, dict): + return [__make_button(**buttons)] + else: + if len(buttons) > 3: + logger.warning("Buttons template should be less than 3") + buttons = buttons[:3] + return [__make_button(**button) for button in buttons] diff --git a/pynani/utils/logs.py b/pynani/utils/logs.py new file mode 100644 index 0000000..c7237a9 --- /dev/null +++ b/pynani/utils/logs.py @@ -0,0 +1,26 @@ +"""This file is used to configure the logger for the project""" + +import logging +from colorlog import ColoredFormatter + + +def get_logger(name: str) -> logging.Logger: + logger = logging.getLogger(name) + logger.setLevel(logging.DEBUG) + formatter = ColoredFormatter( + "%(log_color)s%(levelname)s: %(name)s [%(asctime)s] -- %(message)s", + datefmt='%d/%m/%Y %H:%M:%S', + log_colors={ + 'DEBUG': 'cyan', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'bold_red', + } + ) + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.DEBUG) + console_handler.setFormatter(formatter) + + logger.addHandler(console_handler) + return logger diff --git a/pynani/utils/quick_reply.py b/pynani/utils/quick_reply.py new file mode 100644 index 0000000..a8d74eb --- /dev/null +++ b/pynani/utils/quick_reply.py @@ -0,0 +1,89 @@ +from typing import Optional, Union, List, Dict +from .logs import get_logger + + +logger = get_logger(__name__) + +def __make_quick_button(text: Union[str, int], image_url: Optional[str] = None) -> Dict: + """ + Creates a quick reply button with the specified text, optional image URL, and payload. + + Args: + text (Union[str, int]): The text or integer to be displayed on the button. + image_url (Optional[str], optional): The URL of the image to be displayed on the button. Defaults to None. + + Returns: + Dict: A dictionary representing the quick reply button with its properties. + + Example: + >>> __make_quick_button("Hello") + {'content_type': 'text', 'title': 'Hello', 'payload': '', 'image_url': ''} + >>> __make_quick_button("World", "https://photos.com/world.jpg") + {'content_type': 'text', 'title': 'World', 'payload': '', 'image_url': 'https://photos.com/world.jpg'} + """ + + return { + "content_type": "text", + "title": text, + "payload": "", + "image_url": image_url, + } + +def quick_buttons(buttons: List[Union[str, int]]) -> List[Dict]: + """ + Prepares a list of quick reply buttons from a list of strings. + + Args: + buttons (List[Union[str, int]]): A list of strings or integers representing the text for each quick reply button. + + Returns: + List: A list of dictionaries representing the quick reply buttons with their properties. + + Example: + >>> quick_buttons(["Hello", "World"]) + [{'content_type': 'text', 'title': 'Hello', 'payload': '', 'image_url': None}, + {'content_type': 'text', 'title': 'World', 'payload': '', 'image_url': None}] + >>> quick_buttons([1, 2, 3]) + [{'content_type': 'text', 'title': '1', 'payload': '', 'image_url': None}, + {'content_type': 'text', 'title': '2', 'payload': '', 'image_url': None}, + {'content_type': 'text', 'title': '3', 'payload': '', 'image_url': None}] + """ + + r_buttons = [] + if len(buttons) > 13: + logger.warning("Quick replies should be less than 13") + buttons = buttons[:13] + + for b in buttons: + r_buttons.append(__make_quick_button(b)) + + return r_buttons + +def quick_image_buttons(buttons: List[Union[str, int]], images: List[str]) -> List[Dict]: + """ + Prepares a list of quick reply buttons from a list of strings. + + Args: + buttons (List[str]): A list of strings or integers representing the text for each quick reply button. + images (List[str]): A list of strings representing the image URL for each quick reply button. + + Returns: + List: A list of dictionaries representing the quick reply buttons with their properties. + + Example: + >>> quick_image_buttons(["Hello", "World"], ["https://photos.com/hello.jpg", "https://photos.com/world.jpg"]) + [{'content_type': 'text', 'title': 'Hello', 'payload': '', 'image_url': 'https://photos.com/hello.jpg'}, + {'content_type': 'text', 'title': 'World', 'payload': '', 'image_url': 'https://photos.com/world.jpg'}] + If a button does not have an image, the image URL should be None or an empty string. + >>> quick_image_buttons(["Hello", "World", "Yes"], ["https://photos.com/hello.jpg", "https://photos.com/world.jpg", None]) + """ + + r_buttons = [] + if len(buttons) > 13: + logger.warning("Quick replies should be less than 13") + buttons = buttons[:13] + + for b, i in zip(buttons, images): + r_buttons.append(__make_quick_button(b, i)) + + return r_buttons diff --git a/pynani/utils/receipt.py b/pynani/utils/receipt.py new file mode 100644 index 0000000..7aa415b --- /dev/null +++ b/pynani/utils/receipt.py @@ -0,0 +1,126 @@ +from typing import Optional, Union, List, Dict + + +def get_address(street_1: str, city: str, postal_code: str, state: str, + country: str, street_2: Optional[str] = None) -> Dict: + """ + Constructs an address dictionary from the provided parameters. + + Args: + street_1 (str): The first line of the street address. + city (str): The city of the address. + postal_code (str): The postal code of the address. + state (str): The state or province of the address. + country (str): The country of the address. + street_2 (Optional[str], optional): The second line of the street address. Defaults to None. + + Returns: + Dict: A dictionary representing the address with its components. + + Example: + >>> get_address("123 Main St", "Springfield", "12345", "IL", "US", "Apt 1") + {'street_1': '123 Main St', 'street_2': "Apt 1", 'city': 'Springfield', 'postal_code': '12345', 'state': 'IL', 'country': 'US'} + Optional parameters can be omitted: + >>> get_address("123 Main St", "Springfield", "12345", "IL", "US") + {'street_1': '123 Main St', 'street_2': None, 'city': 'Springfield', 'postal_code': '12345', 'state': 'IL', 'country': 'US'} + """ + + return { + "street_1": street_1, + "street_2": street_2, + "city": city, + "postal_code": postal_code, + "state": state, + "country": country + } + + +def get_summary(total_cost: float, subtotal: Optional[float] = None, + shipping_cost: Optional[float] = None, total_tax: Optional[float] = None) -> Dict: + """ + Constructs a summary dictionary from the provided parameters. + + Args: + total_cost (float): The total cost of the order. + subtotal (Optional[float], optional): The subtotal of the order. Defaults to None. + shipping_cost (Optional[float], optional): The shipping cost of the order. Defaults to None. + total_tax (Optional[float], optional): The total tax of the order. Defaults to None. + + Returns: + Dict: A dictionary representing the summary with its components. + + Example: + >>> get_summary(100.0, 90.0, 5.0, 5.0) + {'subtotal': 90.0, 'shipping_cost': 5.0, 'total_tax': 5.0, 'total_cost': 100.0} + Optional parameters can be omitted: + >>> get_summary(100.0) + {'subtotal': None, 'shipping_cost': None, 'total_tax': None, 'total_cost': 100.0} + """ + + return { + "subtotal": subtotal, + "shipping_cost": shipping_cost, + "total_tax": total_tax, + "total_cost": total_cost + } + + +def get_adjustments(*args: Union[str, int, float]) -> List[Dict]: + """ + Constructs a list of adjustments from the provided arguments. + + Args: + *args (Union[str, int, float]): A variable number of arguments, where each pair represents an adjustment name and amount. + + Returns: + List[Dict]: A list of dictionaries, each representing an adjustment with its name and amount. + + Example: + >>> get_adjustments("Discount", 10, "Tax", 20, "Shipping", 30) + [{'name': 'Discount', 'amount': 10}, {'name': 'Tax', 'amount': 20}, {'name': 'Shipping', 'amount': 30}] + """ + + adjustments = [] + for i in range(0, len(args), 2): + adjustment = { + "name": args[i], + "amount": args[i + 1] + } + adjustments.append(adjustment) + return adjustments + + +def get_elements(title: str, price: float, subtitle: Optional[str] = None, + quantity: Optional[int] = None, currency: Optional[str] = 'USD', + image_url: Optional[str] = None) -> List[Dict]: + """ + Constructs a list of elements from the provided parameters. + + Args: + title (str): The title of the item. + price (float): The price of the item. + subtitle (Optional[str], optional): The subtitle of the item. Defaults to None. + quantity (Optional[int], optional): The quantity of the item. Defaults to None. + currency (Optional[str], optional): The currency of the item. Defaults to 'USD'. + image_url (Optional[str], optional): The URL of the item's image. Defaults to None. + + Returns: + List: A list of dictionaries representing the elements with their titles, prices, subtitles, quantities, currencies, and image URLs. + + Example: + >>> get_elements("T-Shirt", 20.0, "Blue", 2, "USD", "https://example.com/tshirt.jpg") + [{'title': 'T-Shirt', 'subtitle': 'Blue', 'quantity': 2, 'price': 20.0, 'currency': 'USD', 'image_url': 'https://example.com/tshirt.jpg'}] + Optional parameters can be omitted: + >>> get_elements("T-Shirt", 20.0) + [{'title': 'T-Shirt', 'subtitle': None, 'quantity': None, 'price': 20.0, 'currency': 'USD', 'image_url': None}] + """ + + item = { + "title": title, + "subtitle": subtitle, + "quantity": quantity, + "price": price, + "currency": currency, + "image_url": image_url + } + return [item] diff --git a/pyproject.toml b/pyproject.toml index d5dc5d4..b16b949 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,23 +1,40 @@ [build-system] -requires = ["setuptools>=42", "wheel"] build-backend = "setuptools.build_meta" +requires = ["setuptools >= 61.0", "wheel"] [project] -name = "pynani" -version = "1.1.0" -description = "A package to wrap the Messenger API" -readme = "README.md" authors = [ - { name = "Jorge Juarez", email = "jorgeang33@gmail.com" }, + {name = "Jorge Juarez", email = "jorgeang33@gmail.com"}, ] -license = { text = "MIT" } classifiers = [ - "Programming Language :: Python :: 3", - "License :: OSI Approved :: MIT License", - "Operating System :: OS Independent", + "Intended Audience :: Developers", + "Intended Audience :: Customer Service", + "Topic :: Software Development :: Build Tools", + "Topic :: Communications :: Chat", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: Software Development :: Version Control :: Git", + "Topic :: Utilities", + "Topic :: Documentation :: Sphinx", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", ] dependencies = [ - "requests>=2.32.3", + "requests>=2.32.3", + "colorlog>=6.8.2", ] +description = "A package to wrap the Messenger API" +keywords = ["Messenger", "wrapper", "Meta", "API", "Facebook", "Pynani", "PyMessenger"] +license = {file = "LICENSE"} +name = "pynani" +readme = "README.md" +requires-python = ">=3.10" +version = "1.4.0" + [project.urls] -"Homepage" = "https://github.com/jorge-jrzz/Pynani/tree/main" \ No newline at end of file +"Bug Tracker" = "https://github.com/jorge-jrzz/Pynani/issues" +Repository = "https://github.com/jorge-jrzz/Pynani"