Skip to content

Commit

Permalink
[Community] init TV email config
Browse files Browse the repository at this point in the history
  • Loading branch information
GuillaumeDSM committed Oct 3, 2024
1 parent 6341d36 commit 011303e
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 16 deletions.
48 changes: 39 additions & 9 deletions octobot/community/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import typing

import octobot.constants as constants
import octobot.enums as enums
import octobot.community.errors as errors
import octobot.community.identifiers_provider as identifiers_provider
import octobot.community.models.community_supports as community_supports
Expand Down Expand Up @@ -180,6 +181,11 @@ def get_user_id(self):
raise authentication.AuthenticationRequired()
return self.user_account.get_user_id()

def get_last_email_address_confirm_code_email_content(self) -> typing.Optional[str]:
if not self.user_account.has_user_data():
raise authentication.AuthenticationRequired()
return self.user_account.last_email_address_confirm_code_email_content

async def get_deployment_url(self):
deployment_url_data = await self.supabase_client.fetch_deployment_url(
self.user_account.get_selected_bot_deployment_id()
Expand Down Expand Up @@ -291,12 +297,15 @@ async def _create_community_feed_if_necessary(self) -> bool:
return True
return False

async def _ensure_init_community_feed(self):
async def _ensure_init_community_feed(
self,
stop_on_cfg_action: typing.Optional[enums.CommunityConfigurationActions]=None
):
await self._create_community_feed_if_necessary()
if not self._community_feed.is_connected() and self._community_feed.can_connect():
if self.initialized_event is not None and not self.initialized_event.is_set():
await asyncio.wait_for(self.initialized_event.wait(), self.LOGIN_TIMEOUT)
await self._community_feed.start()
await self._community_feed.start(stop_on_cfg_action)

async def register_feed_callback(self, channel_type: commons_enums.CommunityChannelTypes, callback, identifier=None):
try:
Expand All @@ -305,6 +314,14 @@ async def register_feed_callback(self, channel_type: commons_enums.CommunityChan
except errors.BotError as e:
self.logger.error(f"Impossible to connect to community signals: {e}")

async def trigger_wait_for_email_address_confirm_code_email(self):
if not self.get_owned_packages():
raise errors.ExtensionRequiredError(
f"The {constants.OCTOBOT_EXTENSION_PACKAGE_1_NAME} is required to use TradingView email alerts"
)
await self._ensure_init_community_feed(enums.CommunityConfigurationActions.EMAIL_CONFIRM_CODE)


async def send(self, message, channel_type, identifier=None):
"""
Sends a message
Expand Down Expand Up @@ -506,6 +523,8 @@ def remove_login_detail(self):
self.user_account.flush()
self._reset_login_token()
self._save_bot_id("")
self.save_tradingview_email("")
self.save_mqtt_device_uuid("")
self.logger.debug("Removed community login data")

async def stop(self):
Expand Down Expand Up @@ -606,7 +625,9 @@ async def fetch_private_data(self, reset=False):
self.logger.info("Community extension check is disabled")
elif reset or (not self.user_account.community_package_urls or not mqtt_uuid):
self.successfully_fetched_tentacles_package_urls = False
packages, package_urls, fetched_mqtt_uuid = await self._fetch_package_urls(mqtt_uuid)
packages, package_urls, fetched_mqtt_uuid, tradingview_email = (
await self._fetch_extensions_details(mqtt_uuid)
)
self.successfully_fetched_tentacles_package_urls = True
self.user_account.owned_packages = packages
self.save_installed_package_urls(package_urls)
Expand All @@ -620,6 +641,8 @@ async def fetch_private_data(self, reset=False):
self.user_account.has_pending_packages_to_install = True
if fetched_mqtt_uuid and fetched_mqtt_uuid != mqtt_uuid:
self.save_mqtt_device_uuid(fetched_mqtt_uuid)
if tradingview_email and tradingview_email != self.get_saved_tradingview_email():
self.save_tradingview_email(tradingview_email)
except Exception as err:
self.logger.exception(err, True, f"Unexpected error when fetching package urls: {err}")
finally:
Expand All @@ -630,12 +653,12 @@ async def fetch_private_data(self, reset=False):
# fetch indexes as well
await self._refresh_products()

async def _fetch_package_urls(self, mqtt_uuid: typing.Optional[str]) -> (list[str], str):
async def _fetch_extensions_details(self, mqtt_uuid: typing.Optional[str]) -> (list[str], list[str], str, str):
self.logger.debug(f"Fetching extension package details")
extensions_details = await self.supabase_client.fetch_extensions(mqtt_uuid)
self.logger.debug("Fetched extension package details")
if not extensions_details:
return None, None, None
return None, None, None, None
packages = [
package
for package in extensions_details["paid_package_slugs"]
Expand All @@ -647,7 +670,8 @@ async def _fetch_package_urls(self, mqtt_uuid: typing.Optional[str]) -> (list[st
if url
]
mqtt_id = extensions_details["mqtt_id"]
return packages, urls, mqtt_id
tradingview_email = extensions_details["tradingview_email"]
return packages, urls, mqtt_id, tradingview_email

async def fetch_checkout_url(self, payment_method: str, redirect_url: str):
try:
Expand Down Expand Up @@ -680,21 +704,27 @@ def _reset_login_token(self):
def save_installed_package_urls(self, package_urls: list[str]):
self._save_value_in_config(constants.CONFIG_COMMUNITY_PACKAGE_URLS, package_urls)

def save_mqtt_device_uuid(self, mqtt_uuid):
def save_tradingview_email(self, tradingview_email: str):
self._save_value_in_config(constants.CONFIG_COMMUNITY_TRADINGVIEW_EMAIL, tradingview_email)

def save_mqtt_device_uuid(self, mqtt_uuid: str):
self._save_value_in_config(constants.CONFIG_COMMUNITY_MQTT_UUID, mqtt_uuid)

def get_saved_package_urls(self) -> list[str]:
return self._get_value_in_config(constants.CONFIG_COMMUNITY_PACKAGE_URLS) or []

def get_saved_mqtt_device_uuid(self):
def get_saved_mqtt_device_uuid(self) -> str:
if mqtt_uuid := self._get_value_in_config(constants.CONFIG_COMMUNITY_MQTT_UUID):
return mqtt_uuid
raise errors.NoBotDeviceError("No MQTT device ID has been set")

def get_saved_tradingview_email(self) -> str:
return self._get_value_in_config(constants.CONFIG_COMMUNITY_TRADINGVIEW_EMAIL)

def _save_bot_id(self, bot_id):
self._save_value_in_config(constants.CONFIG_COMMUNITY_BOT_ID, bot_id)

def _get_saved_bot_id(self):
def _get_saved_bot_id(self) -> str:
return constants.COMMUNITY_BOT_ID or self._get_value_in_config(constants.CONFIG_COMMUNITY_BOT_ID)

def _save_value_in_config(self, key, value):
Expand Down
4 changes: 3 additions & 1 deletion octobot/community/feeds/abstract_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
# You should have received a copy of the GNU General Public
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
import time
import typing

import octobot.enums as enums
import octobot_commons.logging as bot_logging


Expand All @@ -35,7 +37,7 @@ def __init__(self, feed_url, authenticator):
def has_registered_feed(self) -> bool:
return bool(self.feed_callbacks)

async def start(self):
async def start(self, stop_on_cfg_action: typing.Optional[enums.CommunityConfigurationActions]):
raise NotImplementedError("start is not implemented")

async def stop(self):
Expand Down
55 changes: 52 additions & 3 deletions octobot/community/feeds/community_mqtt_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import octobot.community.errors as errors
import octobot.community.feeds.abstract_feed as abstract_feed
import octobot.constants as constants
import octobot.enums as enums


def _disable_gmqtt_info_loggers():
Expand Down Expand Up @@ -69,12 +70,19 @@ def __init__(self, feed_url, authenticator):
self._connected_at_least_once = False
self._processed_messages = []

async def start(self):
self._default_callbacks_by_subscription_topic = self._build_default_callbacks_by_subscription_topic()
self._stop_on_cfg_action: typing.Optional[enums.CommunityConfigurationActions] = None

async def start(self, stop_on_cfg_action: typing.Optional[enums.CommunityConfigurationActions]):
if self.is_connected():
self.logger.info("Already connected")
return
self.should_stop = False
try:
await self._connect()
if self.is_connected():
self.logger.info("Successful connection request to mqtt device")
self._stop_on_cfg_action = stop_on_cfg_action
else:
self.logger.info("Failed to connect to mqtt device")
except asyncio.TimeoutError as err:
Expand Down Expand Up @@ -103,6 +111,7 @@ async def restart(self):

def _reset(self):
self._connected_at_least_once = False
self._stop_on_cfg_action = None
self._subscription_attempts = 0
self._connect_task = None
self._valid_auth = True
Expand All @@ -112,6 +121,41 @@ async def _stop_mqtt_client(self):
if self.is_connected():
await self._mqtt_client.disconnect()

def _get_default_subscription_topics(self) -> set:
"""
topics that are always to be subscribed
"""
return set(self._default_callbacks_by_subscription_topic)

def _build_default_callbacks_by_subscription_topic(self) -> dict:
return {
self._build_topic(
commons_enums.CommunityChannelTypes.CONFIGURATION,
self.authenticator.get_saved_mqtt_device_uuid()
): [self._config_feed_callback, ]
}

async def _config_feed_callback(self, data: dict):
"""
format:
{
"u": "ABCCD "v": "1.0.0",
-D11 ...",
"s": '{"action": "email_confirm_code", "code_email": "hello 123-1232"}',
}
"""
parsed_message = data[commons_enums.CommunityFeedAttrs.VALUE.value]
action = parsed_message["action"]
if action == enums.CommunityConfigurationActions.EMAIL_CONFIRM_CODE.value:
email_body = parsed_message["code_email"]
self.logger.info(f"Received email address confirm code:\n{email_body}")
self.authenticator.user_account.last_email_address_confirm_code_email_content = email_body
else:
self.logger.error(f"Unknown cfg message action: {action=}")
if action and self._stop_on_cfg_action and self._stop_on_cfg_action.value == action:
self.logger.info(f"Stopping after expected {action} configuration action.")
await self.stop()

def is_connected(self):
return self._mqtt_client is not None and self._mqtt_client.is_connected and not self._disconnected

Expand Down Expand Up @@ -182,9 +226,12 @@ async def send(self, message, channel_type, identifier, **kwargs):
raise NotImplementedError("Sending is not implemented")

def _get_callbacks(self, topic):
for callback in self.feed_callbacks.get(topic, ()):
for callback in self._get_feed_callbacks(topic):
yield callback

def _get_feed_callbacks(self, topic) -> list:
return self._default_callbacks_by_subscription_topic.get(topic, []) + self.feed_callbacks.get(topic, [])

def _get_channel_type(self, message):
return commons_enums.CommunityChannelTypes(message[commons_enums.CommunityFeedAttrs.CHANNEL_TYPE.value])

Expand All @@ -206,7 +253,7 @@ def _on_connect(self, client, flags, rc, properties):
# There are no subscription when we just connected
self.subscribed = False
# Auto subscribe to known topics (mainly used in case of reconnection)
self._subscribe(self._subscription_topics)
self._subscribe(self._subscription_topics.union(self._get_default_subscription_topics()))

def _try_reconnect_if_necessary(self, client):
if self._reconnect_task is None or self._reconnect_task.done():
Expand Down Expand Up @@ -312,6 +359,8 @@ def _get_username(client: gmqtt.Client) -> str:

async def _connect(self):
device_uuid = self.authenticator.get_saved_mqtt_device_uuid()
# ensure _default_callbacks_by_subscription_topic is up to date
self._default_callbacks_by_subscription_topic = self._build_default_callbacks_by_subscription_topic()
if device_uuid is None:
self._valid_auth = False
raise errors.BotError("mqtt device uuid is None, impossible to connect client")
Expand Down
4 changes: 3 additions & 1 deletion octobot/community/feeds/community_supabase_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import uuid
import json
import realtime
import typing
import packaging.version as packaging_version

import octobot_commons.enums as commons_enums
Expand All @@ -24,6 +25,7 @@
import octobot.community.supabase_backend.enums as enums
import octobot.community.feeds.abstract_feed as abstract_feed
import octobot.constants as constants
import octobot.enums as enums


class CommunitySupabaseFeed(abstract_feed.AbstractFeed):
Expand Down Expand Up @@ -76,7 +78,7 @@ async def _process_message(self, table: str, message: dict):
except Exception as err:
self.logger.exception(err, True, f"Unexpected error when processing message: {err}")

async def start(self):
async def start(self, stop_on_cfg_action: typing.Optional[enums.CommunityConfigurationActions]):
# handled in supabase client directly, just ensure no subscriptions are pending
for table in self.feed_callbacks:
await self._subscribe_to_table_if_necessary(table)
Expand Down
5 changes: 3 additions & 2 deletions octobot/community/feeds/community_ws_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
import random
import time

import typing
import websockets
import asyncio
import enum
Expand All @@ -26,6 +26,7 @@
import octobot_commons.enums as commons_enums
import octobot_commons.authentication as authentication
import octobot.constants as constants
import octobot.enums as enums
import octobot.community.feeds.abstract_feed as abstract_feed
import octobot.community.identifiers_provider as identifiers_provider

Expand Down Expand Up @@ -55,7 +56,7 @@ def __init__(self, feed_url, authenticator):
self._reconnect_attempts = 0
self._last_ping_time = None

async def start(self):
async def start(self, stop_on_cfg_action: typing.Optional[enums.CommunityConfigurationActions]):
await self._ensure_connection()
if self.consumer_task is None or self.consumer_task.done():
self.consumer_task = asyncio.create_task(self.start_consumer())
Expand Down
2 changes: 2 additions & 0 deletions octobot/community/models/community_user_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def __init__(self):
self.owned_packages: list[str] = []
self.has_pending_packages_to_install = False

self.last_email_address_confirm_code_email_content: typing.Optional[str] = None

self._profile_raw_data = None
self._selected_bot_raw_data = None
self._all_user_bots_raw_data = []
Expand Down
1 change: 1 addition & 0 deletions octobot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
CONFIG_COMMUNITY = "community"
CONFIG_COMMUNITY_BOT_ID = "bot_id"
CONFIG_COMMUNITY_MQTT_UUID = "mqtt_uuid"
CONFIG_COMMUNITY_TRADINGVIEW_EMAIL = "tradingview_email"
CONFIG_COMMUNITY_PACKAGE_URLS = "package_urls"
CONFIG_COMMUNITY_ENVIRONMENT = "environment"
USE_BETA_EARLY_ACCESS = os_util.parse_boolean_environment_var("USE_BETA_EARLY_ACCESS", "false")
Expand Down
4 changes: 4 additions & 0 deletions octobot/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class CommunityEnvironments(enum.Enum):
Production = "Production"


class CommunityConfigurationActions(enum.Enum):
EMAIL_CONFIRM_CODE = "email_confirm_code"


class OptimizerModes(enum.Enum):
NORMAL = "normal"
GENETIC = "genetic"
Expand Down

0 comments on commit 011303e

Please sign in to comment.