diff --git a/.github/workflows/python-package-conda.yml b/.github/workflows/python-package-conda.yml
new file mode 100644
index 0000000..7adb2dc
--- /dev/null
+++ b/.github/workflows/python-package-conda.yml
@@ -0,0 +1,24 @@
+name: Python Package using Conda
+
+on: [push]
+
+jobs:
+ build-linux:
+ runs-on: ubuntu-latest
+ strategy:
+ max-parallel: 5
+
+ steps:
+ - uses: actions/checkout@v3
+ - name: Set up Python 3.10
+ uses: actions/setup-python@v3
+ with:
+ python-version: '3.10'
+ - name: Add conda to system path
+ run: |
+ # $CONDA is an environment variable pointing to the root of the miniconda directory
+ echo $CONDA/bin >> $GITHUB_PATH
+ - name: Install dependencies
+ run: |
+ conda update conda
+ conda env update --file environment.yml --name base
diff --git a/.gitignore b/.gitignore
index e9e1e9b..67d184f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
# Temporary and binary files
+./build-and-install.sh
*~
*.py[cod]
*.so
@@ -22,6 +23,7 @@ __pycache__/*
.idea
.vscode
tags
+config.ini
# Package files
*.egg
diff --git a/.readthedocs.yml b/.readthedocs.yml
index a2bcab3..65bdb8f 100644
--- a/.readthedocs.yml
+++ b/.readthedocs.yml
@@ -19,7 +19,7 @@ formats:
build:
os: ubuntu-22.04
tools:
- python: "3.11"
+ python: "3.10"
python:
install:
diff --git a/GShockTimeServer.iml b/GShockTimeServer.iml
new file mode 100644
index 0000000..ac86b81
--- /dev/null
+++ b/GShockTimeServer.iml
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Pipfile.lock b/Pipfile.lock
deleted file mode 100644
index eb6410c..0000000
--- a/Pipfile.lock
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "_meta": {
- "hash": {
- "sha256": "fedbd2ab7afd84cf16f128af0619749267b62277b4cb6989ef16d4bef6e4eef2"
- },
- "pipfile-spec": 6,
- "requires": {
- "python_version": "3.10"
- },
- "sources": [
- {
- "name": "pypi",
- "url": "https://pypi.org/simple",
- "verify_ssl": true
- }
- ]
- },
- "default": {},
- "develop": {}
-}
diff --git a/README.rst b/README.rst
index 114f218..e2b8d8e 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@ GShockTimeServer
Overview:
=========
-This project allows you to setup time on your Casio G-Shock B5000/B5600/B2100 watches.
+This project allows you to set the correct time to your Casio G-Shock `B5600 `__ / `B5000 `_ / `B2100 `__ watches.
In addition, this repository provides an API for developing application for the above watches. This is WIP,
but you can take a look at the ``api_tests.py`` file on how to use the API.
@@ -14,14 +14,22 @@ Usage:
This app can run on any device with Python and Bluetooth capabilities - from a desktop to a Raspberry Pi Zero.
It has been tested on Linux OS only, but should be compatible with Windows as well.
+.. figure:: images/pizero.jpg
+ :alt: Pi Zero
+ :align: center
+ :width: 200px
+
+ The server running on a Pi Zero.
+
+Here is how to use it:
+
1. Go to /src/gshocktimeserver directory.
2. run:
- **python3 gshock_server.py [----multi-watch]**
- (the --multi-watch parameter is used if you have multiple watches)
+ **python3 gshock_server.py [--multi-watch]** (the --multi-watch parameter is used if you have multiple watches)
-3. To set the time on your G-Shock, press the ``lower-right`` button and the watch will connect to the app, allowing the app to set the watch's time.
+3. To set the time on your G-Shock, short-press the ``lower-right`` button and the watch will connect to the app, allowing the app to set the watch's time.
4. If AUTO TIME ADJUSTEMENT is enabled on the watch, it will sync up to 4 times daily with the app and adjust its time accordingly.
@@ -34,13 +42,11 @@ Install the following dependencies:
pip3 install bleak
- pip3 install reactivex
-
Troubleshooting:
================
If your watch cannot connect, and the ``--multi-watch`` parameter is not used, remove the "config.ini" file and try again.
To Do:
======
-We are working on a professianal installition.
+We are working on a professional installation.
diff --git a/build-and-install.sh b/build-and-install.sh
new file mode 100755
index 0000000..66eeddf
--- /dev/null
+++ b/build-and-install.sh
@@ -0,0 +1,6 @@
+rm -rf ./dist
+python3 setup.py bdist_wheel
+scp ./dist/*.whl pi@pizero:/tmp/gshocktimeserver.zip
+ssh pi@pizero /home/pi/update.sh
+
+
diff --git a/config.ini b/config.ini
deleted file mode 100644
index 2373f6b..0000000
--- a/config.ini
+++ /dev/null
@@ -1,4 +0,0 @@
-[main]
-device.address = DD:85:03:25:62:17
-device.name = CASIO GW-B5600
-
diff --git a/environment.yml b/environment.yml
new file mode 100644
index 0000000..cf823ce
--- /dev/null
+++ b/environment.yml
@@ -0,0 +1,24 @@
+name: gshock-time-server
+channels:
+ - defaults
+ - conda-forge
+ - pytorch
+dependencies:
+ - python=3.10
+ - pip
+ - pip:
+ - -e . # install git checkout of demo-dsproject in editable mode
+ - pytz
+ - bleak
+ - reactivex
+
+ # DEVELOPMENT ONLY PACKAGES (could also be kept in a separate environment file)
+ - jupyterlab
+ - pytest
+ - pytest-cov
+ - tox
+ - pre_commit
+ - nbdime
+ - nbstripout
+ - sphinx
+ - recommonmark
\ No newline at end of file
diff --git a/images/pizero.jpg b/images/pizero.jpg
new file mode 100644
index 0000000..fab83a5
Binary files /dev/null and b/images/pizero.jpg differ
diff --git a/protocol.md b/protocol.md
new file mode 100644
index 0000000..16ec41f
--- /dev/null
+++ b/protocol.md
@@ -0,0 +1,215 @@
+# GShock GAB-2100 BLE protocol
+
+## Overview
+Casio bluetooth watches protocol has not been published by the constructor. The following description comes from experiments on the watch and analysis of the code of open source softwares dedicated to communicate with these watches :
+* [Ivo Zivkov's GShock API](https://github.com/izivkov/GShockAPI)
+* [Ivo Zivkov's GShock time server](https://github.com/izivkov/GShockTimeServer)
+* [Gadgetbridge](https://codeberg.org/Freeyourgadget/Gadgetbridge)
+* [Gadgetbridge time zone description](https://codeberg.org/johannesk/Gadgetbridge/src/branch/casio-gw-b5600/app/src/main/java/nodomain/freeyourgadget/gadgetbridge/service/devices/casio/gwb5600/CasioGWB5600TimeZone.java)
+
+Syntax and examples come from [gatttool](http://tvaira.free.fr/flower-power/gatttool.txt).
+
+## Commands
+### Read (char-read-hnd)
+* 0x4 : watch name
+* 0x6 : device type
+* 0x9 : TX Power level
+
+### Write command (char-write-cmd)
+#### 0xc
+* 10 : Get button pressed (and other informations ?)
+* 22 : Get app info
+* 1D00 : Get time zones and DST state of watch
+* 1e00 : Get local time zone parameters
+* 1e01 : Get WT time zone parameters
+* 1f00 : Get local time zone name
+* 1f01 : Get WT time zone name
+
+### Write request (char-write-req)
+#### 0xe
+* 1D00... : Set time zones and DST state of watch
+* 1e00... : Set local time zone parameters
+* 1e01... : Set WT time zone parameters
+* 1f00... : Set local time zone name
+* 1f01... : Set WT time zone name
+
+#### 0xf
+* 100 : ?
+
+## Data packets
+
+### 10 : button pressed
+```
+0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
+10 29 33 9a 4f 60 d3 7f 04 03 0f ff ff ff ff 24 00 00 00
+```
+8 : 04 : RIGHT BUTTON, 01 : LEFT BUTTON
+
+
+### 1D00 : time zones and DST state of watch
+
+```
+0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
+1d 00 01 02 03 5b 00 dc 00 ff ff ff ff ff ff
+```
+
+0 : 1d
+
+1 : local TZ index
+
+2 : WT TZ index
+
+3 : local TZ DST :
+* 02 : No DST
+* 03 : DST
+
+4 : WT TZ DST
+
+5-6 : 2 bytes integer (little-endian) : local City numeric identifier = b06 × 256 + b05
+
+7-8 : WT city numeric identifier
+
+### 1E : Time zone parameters
+
+```
+0 1 2 3 4 5 6
+1e 01 52 00 16 04 00
+```
+
+0 : 1e
+
+1 : TZ index :
+
+* 00 : local
+* 01 : WT
+
+2-3 : city numeric identifier
+
+4 : signed byte : time difference in quarter of an hour (divide by 4 to get it in hour)
+
+5 : DST offset in quarter of hour : 00 for UTC and 04 for other TZ
+
+6 : DST rules ? :
+
+* 00 : UTC and cities without DST
+* 01 : USA cities
+* 02 : European cities (LON, PAR, ATH)
+* 04 : Australia
+* 05 : New Zealand (Wellington)
+* 12 : Lord Howe Island
+* 17 : Chatam Islands
+* 2b : Teheran
+
+## Examples with gatttool
+### Installation
+gatttool is part of the bluez package. On Debian and derivatives, it could be installed with :
+
+`sudo apt install bluez`
+
+### gatttool usage
+
+Gatttool syntax with examples is described [here](http://tvaira.free.fr/flower-power/gatttool.txt).
+
+Beware of handle and data values, gatttool documentation is confusing. Write command syntax in interactive mode is :
+
+`char-write-cmd `
+
+* Handle value should be writen in hexadecimal base with 0x prefix.
+* Data value should be writen in hexadecimal base **without** 0x prefix.
+
+Example : `char-write-cmd 0xc 1d00`
+
+In non interactive mode, the syntax is :
+
+`gatttool -b --char-write-req -a -n --listen`
+
+Here handle value can be written either in hexadecimal with prefix or decimal, but data value should be in hexadecimal base **without** prefix.
+
+### Examples
+
+```shell
+$ gatttool -b D3:60:4F:9A:33:29 -I -t random
+[D3:60:4F:9A:33:29] connect
+Attempting to connect to D3:60:4F:9A:33:29
+Connection successful
+[D3:60:4F:9A:33:29][LE]> char-write-cmd 0xc 10
+Notification handle = 0x000e value: 10 29 33 9a 4f 60 d3 7f 04 03 0f ff ff ff ff 24 00 00 00
+[D3:60:4F:9A:33:29][LE]> char-read-hnd 0x04
+Characteristic value/descriptor: 43 41 53 49 4f 20 47 41 2d 42 32 31 30 30 00 00
+[D3:60:4F:9A:33:29][LE]> disconnect
+[D3:60:4F:9A:33:29][LE]> quit
+```
+
+## Character table
+
+The GAB-2100 character table is a mix of [ASCII](https://en.wikipedia.org/wiki/ASCII) and [JIS X 0201](https://en.wikipedia.org/wiki/JIS_X_0201) with additions:
+
+
+| | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | a | b | c | d | e | f |
+|-------|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
+| **0** | | | | | | | | | | | | | | | | |
+| **1** | | | | | | | | | | | | | | | | |
+| **2** | | | | # | $ | % | & | ' | ( | ) | * | + | , |---| . | / |
+| **3** | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | : | ; | < | = | > | ? |
+| **4** | @ | A | B | C | D | E | F | G | H | I | J | K | L | M | N | O |
+| **5** | P | Q | R | S | T | U | V | W | X | Y | Z | [ | \ | ] | ^ | _ |
+| **6** | ` | a | b | c | d | e | f | g | h | i | j | k | l | m | n | o |
+| **7** | p | q | r | s | t | u | v | w | x | y | z | { | \| | } | ~ | |
+| **8** | ¥ | ╏ | « | ¬ |---| ⎺ | ° | ± | ´ | · | ¸ | » | ♦ | ♪ | ■ | < |
+| **9** | 【 | 】| ◀ | ▶ | √ | y | | | | | | | | | | |
+| **a** | . | 。 | 「 | 」 | 、 | ・ | ヲ | ァ | ィ | ゥ | ェ | ォ | ャ | ュ | ョ | ッ |
+| **b** | ー | ア | イ | ウ | エ | オ | カ | キ | ク | ケ | コ | サ | シ | ス | セ | ソ |
+| **c** | タ | チ | ツ | テ | ト | ナ | ニ | ヌ | ネ | ノ | ハ | ヒ | フ | ヘ | ホ | マ |
+| **d** | ミ | ム | メ | モ | ヤ | ユ | ヨ | ラ | リ | ル | レ | ロ | ワ | ン | ~ | ▫ |
+| **e** | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . |
+| **f** | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . |
+
+To be completed :
+- empty cells are non used blocks
+- dots are fields to be completed.
+
+ ## City list
+
+ | Index | HB | LB | NAME | OFFSET | DST_OFFSET | DST_RULE |
+| ------|----|----|------|--------|------------|----------|
+| 313 | 1 | 39 | BAKER_ISLAND | -12 | 1 | 00 |
+| 215 | 0 | D7 | PAGO_PAGO | -11 | 1 | 00 |
+| 123 | 0 | 7B | HONOLULU | -10 | 1 | 00 |
+| 314 | 1 | 3A | MARQUESAS_ISLANDS | -9.5 | 1 | 00 |
+| 12 | 0 | 0C | ANCHORAGE | -9 | 1 | 01 |
+| 161 | 0 | A1 | LOS_ANGELES | -8 | 1 | 01 |
+| 84 | 0 | 54 | DENVER | -7 | 1 | 01 |
+| 66 | 0 | 42 | CHICAGO | -6 | 1 | 01 |
+| 202 | 0 | CA | NEW_YORK | -5 | 1 | 01 |
+| 113 | 0 | 71 | HALIFAX | -4 | 1 | 01 |
+| 268 | 1 | 0C | ST.JOHN'S | -3.5 | 1 | 01 |
+| 241 | 0 | F1 | RIO_DE_JANEIRO | -3 | 1 | 00 |
+| 98 | 0 | 62 | F.DE_NORONHA | -2 | 1 | 00 |
+| 233 | 0 | E9 | PRAIA | -1 | 1 | 00 |
+| 0 | 0 | 00 | UTC | 0 | 0 | 00 |
+| 160 | 0 | A0 | LONDON | 0 | 1 | 02 |
+| 220 | 0 | DC | PARIS | 1 | 1 | 02 |
+| 19 | 0 | 13 | ATHENS | 2 | 1 | 02 |
+| 133 | 0 | 85 | JEDDAH | 3 | 1 | 00 |
+| 278 | 1 | 16 | TEHRAN | 3.5 | 1 | 2B |
+| 91 | 0 | 5B | DUBAI | 4 | 1 | 00 |
+| 136 | 0 | 88 | KABUL | 4.5 | 1 | 00 |
+| 139 | 0 | 8B | KARACHI | 5 | 1 | 00 |
+| 82 | 0 | 52 | DELHI | 5.5 | 1 | 00 |
+| 140 | 0 | 8C | KATHMANDU | 5.75 | 1 | 00 |
+| 86 | 0 | 56 | DHAKA | 6 | 1 | 00 |
+| 303 | 1 | 2F | YANGON | 6.5 | 1 | 00 |
+| 28 | 0 | 1C | BANGKOK | 7 | 1 | 00 |
+| 122 | 0 | 7A | HONG_KONG | 8 | 1 | 00 |
+| 234 | 0 | EA | PYONGYANG | 9 | 1 | 00 |
+| 310 | 1 | 36 | EUCLA | 8.75 | 1 | 00 |
+| 281 | 1 | 19 | TOKYO | 9 | 1 | 00 |
+| 5 | 0 | 05 | ADELAIDE | 9.5 | 1 | 04 |
+| 271 | 1 | 0F | SYDNEY | 10 | 1 | 04 |
+| 311 | 1 | 37 | LORD_HOWE_ISLAND | 10.5 | 0.5 | 12 |
+| 205 | 0 | CD | NOUMEA | 11 | 1 | 00 |
+| 299 | 1 | 2B | WELLINGTON | 12 | 1 | 05 |
+| 63 | 0 | 3F | CHATHAM_ISLANDS | 12.75 | 1 | 17 |
+| 208 | 0 | D0 | NUKUALOFA | 13 | 1 | 00 |
+| 147 | 0 | 93 | KIRITIMATI | 14 | 1 | 00 |
+
diff --git a/setup.cfg b/setup.cfg
index 62eef66..7408ce7 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -84,6 +84,7 @@ testing =
# in order to write a coverage file that can be read by Jenkins.
# CAUTION: --cov flags may prohibit setting breakpoints while debugging.
# Comment those flags to avoid this pytest issue.
+
addopts =
--cov gshocktimeserver --cov-report term-missing
--verbose
@@ -92,6 +93,7 @@ norecursedirs =
build
.tox
testpaths = tests
+
# Use pytest markers to select/deselect specific tests
# markers =
# slow: mark tests as slow (deselect with '-m "not slow"')
diff --git a/setup.py b/setup.py
index 3286c2f..e2e6173 100644
--- a/setup.py
+++ b/setup.py
@@ -6,16 +6,30 @@
PyScaffold helps you to put up the scaffold of your new Python project.
Learn more under: https://pyscaffold.org/
"""
-from setuptools import setup
+from setuptools import setup, find_packages
-if __name__ == "__main__":
- try:
- setup(use_scm_version={"version_scheme": "no-guess-dev"})
- except: # noqa
- print(
- "\n\nAn error occurred while building the project, "
- "please ensure you have the most updated version of setuptools, "
- "setuptools_scm and wheel with:\n"
- " pip install -U setuptools setuptools_scm wheel\n\n"
- )
- raise
+setup(
+ name='gshocktimeserver',
+ version='0.1',
+ use_scm_version=True,
+ packages=find_packages(),
+ install_requires=[
+ 'pytz', 'bleak', 'reactivex'
+ ],
+
+ author='Ivo Zivkov',
+ author_email='izivkov@gmail.com',
+ description='This library allows you to interact with your G-Shock watch from Python',
+ long_description='This library allows you to interact with your G-Shock watch from Python...',
+ url='https://github.com/izivkov/GShockTimeServer',
+ license='MIT', # Choose an appropriate license
+ classifiers=[
+ 'Development Status :: 3 - Alpha',
+ 'Intended Audience :: Developers',
+ 'License :: OSI Approved :: MIT License',
+ 'Programming Language :: Python :: 3',
+ 'Programming Language :: Python :: 3.6',
+ 'Programming Language :: Python :: 3.7',
+ 'Programming Language :: Python :: 3.8',
+ ],
+)
\ No newline at end of file
diff --git a/src/gshocktimeserver/__init__.py b/src/gshocktimeserver/__init__.py
index c8ace5a..0f76b25 100644
--- a/src/gshocktimeserver/__init__.py
+++ b/src/gshocktimeserver/__init__.py
@@ -1,5 +1,7 @@
import sys
+sys.path.append("gshocktimeserver/io")
+
if sys.version_info[:2] >= (3, 8):
# TODO: Import directly (no need for conditional) when `python_requires = >= 3.8`
from importlib.metadata import PackageNotFoundError, version # pragma: no cover
diff --git a/src/gshocktimeserver/alarms.py b/src/gshocktimeserver/alarms.py
index 764381d..34b9fc8 100644
--- a/src/gshocktimeserver/alarms.py
+++ b/src/gshocktimeserver/alarms.py
@@ -1,11 +1,10 @@
import json
-import logging
from casio_constants import CasioConstants
-from utils import to_int_array, to_compact_string, to_hex_string
+from utils import to_int_array
from logger import logger
-HOURLY_CHIME_MASK = 0b10000000
-ENABLED_MASK = 0b01000000
+HOURLY_CHIME_MASK = 0b10000000
+ENABLED_MASK = 0b01000000
ALARM_CONSTANT_VALUE = 0x40
CHARACTERISTICS = CasioConstants.CHARACTERISTICS
@@ -89,16 +88,13 @@ def to_json(self, command: str):
elif int_array[0] == CHARACTERISTICS["CASIO_SETTING_FOR_ALM2"]:
int_array.pop(0)
- # for item in np.array_split(int_array, 4):
- # alarms.append(self.create_json_alarm(item))
-
# replacement to above 2 lines
alarms = []
# split int_array into 4 subarrays
- subarr1 = int_array[:len(int_array)//4]
- subarr2 = int_array[len(int_array)//4:len(int_array)//2]
- subarr3 = int_array[len(int_array)//2:len(int_array)*3//4]
- subarr4 = int_array[len(int_array)*3//4:]
+ subarr1 = int_array[: len(int_array) // 4]
+ subarr2 = int_array[len(int_array) // 4 : len(int_array) // 2]
+ subarr3 = int_array[len(int_array) // 2 : len(int_array) * 3 // 4]
+ subarr4 = int_array[len(int_array) * 3 // 4 :]
# create json alarms for each subarray
alarms.append(self.create_json_alarm(subarr1))
diff --git a/src/gshocktimeserver/api_tests.py b/src/gshocktimeserver/api_tests.py
index 4ef5e28..57c90d6 100644
--- a/src/gshocktimeserver/api_tests.py
+++ b/src/gshocktimeserver/api_tests.py
@@ -1,43 +1,53 @@
-import logging
+import calendar
import json
import pytz
+import time
from datetime import datetime, timezone
from connection import Connection
from gshock_api import GshockAPI
-from casio_watch import settings
-from event import Event, create_event_date, RepeatPeriod, day_of_week
-from casio_watch import WatchButton
-from utils import (
- to_ascii_string,
- clean_str,
-)
-from configurator import conf
+from event import Event, create_event_date, RepeatPeriod
from scanner import scanner
from logger import logger
-async def run_api_tests(args):
+
+def prompt():
+ logger.info(
+ "========================================================================"
+ )
+ logger.info(
+ "Press and hold lower-left button on your watch for 3 seconds to start..."
+ )
+ logger.info(
+ "========================================================================"
+ )
+ logger.info("")
+
+
+async def run_api_tests():
+ prompt()
+
device = await scanner.scan()
- logger.debug("Found: {}".format(device))
+ logger.info("Found: {}".format(device))
connection = Connection(device)
await connection.connect()
api = GshockAPI(connection)
- await api.get_app_info()
+ app_info = await api.get_app_info()
+ logger.info("app info: {}".format(app_info))
pressed_button = await api.get_pressed_button()
- logger.debug("pressed button: {}".format(pressed_button))
+ logger.info("pressed button: {}".format(pressed_button))
watch_name = await api.get_watch_name()
- logger.debug("got watch name: {}".format(watch_name))
+ logger.info("got watch name: {}".format(watch_name))
await api.set_time()
- # await api.reset_hand_to_12()
alarms = await api.get_alarms()
- logger.debug("alarms: {}".format(alarms))
+ logger.info("alarms: {}".format(alarms))
alarms[3]["enabled"] = True
alarms[3]["hour"] = 7
@@ -46,24 +56,29 @@ async def run_api_tests(args):
await api.set_alarms(alarms)
seconds = await api.get_timer()
- logger.debug("timer: {} seconds".format(seconds))
+ logger.info("timer: {} seconds".format(seconds))
await api.set_timer(seconds + 10)
time_adjstment = await api.get_time_adjustment()
- logger.debug("time_adjstment: {}".format(time_adjstment))
+ logger.info("time_adjstment: {}".format(time_adjstment))
+
+ await api.set_time_adjustment(time_adjustement=True, minutes_after_hour=10)
- settings.timeAdjustment = True
- await api.set_time_adjustment(settings)
+ condition = await api.get_watch_condition()
+ logger.info(f"condition: {condition}")
settings_local = await api.get_basic_settings()
- logger.debug("settings: {}".format(settings_local))
+ logger.info("settings: {}".format(settings_local))
settings_local["button_tone"] = True
- settings_local["language"] = "Engish"
- settings_local["time_format"] = "12h"
+ settings_local["language"] = "French"
+ settings_local["time_format"] = "24h"
await api.set_settings(settings_local)
+ settings_local = await api.get_basic_settings()
+ logger.info("After update: settings: {}".format(settings_local))
+
# Create a single event
tz = pytz.timezone("America/Toronto")
dt = datetime.now(timezone.utc)
@@ -85,15 +100,32 @@ async def run_api_tests(args):
+ event_date_str
+ """}}"""
)
- event = Event().create_event(json.loads(event_json_str))
+ Event().create_event(json.loads(event_json_str))
+ logger.info("Created event: {}".format(event_json_str))
reminders = await api.get_reminders()
for reminder in reminders:
- logger.debug("reminder: {}".format(reminder.__str__()))
+ logger.info("reminder: {}".format(reminder.__str__()))
+
+ reminders[3]["title"] = "Test Event"
await api.set_reminders(reminders)
- # input("Hit any key to disconnect")
+ input("Hit any key to disconnect")
await connection.disconnect()
- logger.debug("--- END OF TESTS ---")
+ logger.info("--- END OF TESTS ---")
+
+
+def convert_time_string_to_epoch(time_string):
+ try:
+ # Create a datetime object with today's date and the provided time
+ time_object = datetime.strptime(time_string, "%H:%M:%S")
+
+ # Get the timestamp in seconds since the epoch
+ timestamp = time_object.timestamp()
+
+ return timestamp
+ except ValueError:
+ logger.info("Invalid time format. Please use the format HH:MM:SS.")
+ return None
diff --git a/src/gshocktimeserver/args.py b/src/gshocktimeserver/args.py
index f1dea30..ec1f842 100644
--- a/src/gshocktimeserver/args.py
+++ b/src/gshocktimeserver/args.py
@@ -1,6 +1,7 @@
import argparse
import sys
+
class Args:
def __init__(self):
self.parse_and_store(sys.argv[1:])
@@ -8,17 +9,19 @@ def __init__(self):
def parse_and_store(self, args):
parser = argparse.ArgumentParser(description="Parser")
parser.add_argument(
- "--multi-watch", action='store_true', help="--multi-watch allows use of multimple watches"
- )
+ "--multi-watch",
+ action='store_true',
+ help="--multi-watch allows use of multimple watches")
parser.add_argument(
"--mailto", help="email when time set to email address", required=False
)
parser.add_argument(
"-l", "--log_level", default="INFO", help="Sets log level", required=False
)
- self.args = parser.parse_args(args)
+ self.args = parser.parse_args(args)
def get(self):
return self.args
+
args = Args()
diff --git a/src/gshocktimeserver/cancelable_result.py b/src/gshocktimeserver/cancelable_result.py
new file mode 100644
index 0000000..d76cb81
--- /dev/null
+++ b/src/gshocktimeserver/cancelable_result.py
@@ -0,0 +1,38 @@
+import asyncio
+from typing import Any
+
+class CancelableResult:
+ def __init__(self, timeout: float = 5.0):
+ self._timeout = timeout
+ self._future: asyncio.Future[Any] = asyncio.Future()
+
+ async def get_result(self) -> Any:
+ try:
+ return await asyncio.wait_for(self._future, timeout=self._timeout)
+ except asyncio.TimeoutError:
+ if not self._future.done():
+ self._future.set_result('')
+ return await self._future
+
+ def set_result(self, result: Any) -> None:
+ if not self._future.done():
+ self._future.set_result(result)
+
+# Example usage
+async def main():
+ cancelable_result = CancelableResult(timeout=5)
+
+ # Simulate some async work that might or might not set a result
+ async def some_async_work():
+ await asyncio.sleep(3) # Change this to 6 to test the timeout
+ cancelable_result.set_result('Result after async work')
+
+ await asyncio.gather(some_async_work(), asyncio.sleep(6))
+
+ final_result = await cancelable_result.get_result()
+ print(f'Final result: "{final_result}"')
+
+# Run the main coroutine
+# asyncio.run(main())
+
+cancelable_result = CancelableResult()
diff --git a/src/gshocktimeserver/casio_constants.py b/src/gshocktimeserver/casio_constants.py
index 97d2fdc..1bb2a7b 100644
--- a/src/gshocktimeserver/casio_constants.py
+++ b/src/gshocktimeserver/casio_constants.py
@@ -35,4 +35,10 @@ class CasioConstants:
"CASIO_REMINDER_TITLE": 0x30,
"CASIO_REMINDER_TIME": 0x31,
"CASIO_TIMER": 0x18,
+ "ERROR": 0xFF,
+ "UNKNOWN": 0x0A,
+
+ # ECB-30
+ "CMD_SET_TIMEMODE": 0x47,
+ "FIND_PHONE": 0x0A,
}
diff --git a/src/gshocktimeserver/casio_watch.py b/src/gshocktimeserver/casio_watch.py
deleted file mode 100644
index 620b1f3..0000000
--- a/src/gshocktimeserver/casio_watch.py
+++ /dev/null
@@ -1,668 +0,0 @@
-import json
-import datetime
-import logging
-from settings import settings
-from utils import (
- to_int_array,
- to_compact_string,
- to_hex_string,
- clean_str,
- to_ascii_string,
- to_byte_array,
- dec_to_hex,
-)
-from casio_constants import CasioConstants
-from enum import IntEnum
-from alarms import alarms_inst, alarm_decoder
-from logger import logger
-
-CHARACTERISTICS = CasioConstants.CHARACTERISTICS
-
-
-class WatchButton(IntEnum):
- UPPER_LEFT = 1
- LOWER_LEFT = 2
- UPPER_RIGHT = 3
- LOWER_RIGHT = 4
- NO_BUTTON = 5
- INVALID = 6
-
-
-class DtsState(IntEnum):
- ZERO = 0
- TWO = 2
- FOUR = 4
-
-
-class ReminderMasks:
- YEARLY_MASK = 0b00001000
- MONTHLY_MASK = 0b00010000
- WEEKLY_MASK = 0b00000100
-
- SUNDAY_MASK = 0b00000001
- MONDAY_MASK = 0b00000010
- TUESDAY_MASK = 0b00000100
- WEDNESDAY_MASK = 0b00001000
- THURSDAY_MASK = 0b00010000
- FRIDAY_MASK = 0b00100000
- SATURDAY_MASK = 0b01000000
-
- ENABLED_MASK = 0b00000001
-
-
-class SettingsDecoder:
- def to_json_time_adjustment(settings):
- return {"timeAdjustment": settings.time_adjustment}
-
-
-class ReminderDecoder:
- def reminder_title_to_json(title_byte: str) -> dict:
- int_arr = to_int_array(title_byte)
- if int_arr[2] == 0xFF:
- # 0XFF indicates end of reminders
- return {"end": ""}
- reminder_json = {}
-
- reminder_json["title"] = clean_str(to_ascii_string(title_byte, 2))
- return reminder_json
-
-
-def create_key(data):
- short_str = to_compact_string(data)
- key_length = 2
- # get the first byte of the returned data, which indicates the data content.
- start_of_data = short_str[0:2].upper()
- if start_of_data in ["1D", "1E", "1F", "30", "31"]:
- key_length = 4
- key = short_str[0:key_length].upper()
- return key
-
-
-def to_json(_data):
- data = to_hex_string(_data)
- int_array = to_int_array(data)
- json_obj = {}
- if int_array[0] == CHARACTERISTICS["CASIO_SETTING_FOR_ALM"]:
- return {
- "ALARMS": {
- "value": alarm_decoder.to_json(data)["ALARMS"],
- "key": "GET_ALARMS",
- }
- }
-
- if int_array[0] == CHARACTERISTICS["CASIO_SETTING_FOR_ALM2"]:
- return {
- "ALARMS2": {
- "value": alarm_decoder.to_json(data)["ALARMS"],
- "key": "GET_ALARMS2",
- }
- }
-
- # Add topics so the right component will receive data
- elif int_array[0] == CHARACTERISTICS["CASIO_DST_SETTING"]:
- data_json = {"key": create_key(data), "value": data}
- json_obj["CASIO_DST_SETTING"] = data_json
-
- elif int_array[0] == CHARACTERISTICS["CASIO_SETTING_FOR_BASIC"]:
-
- def create_json_settings(setting_string):
- mask_24_hours = 0b00000001
- MASK_BUTTON_TONE_OFF = 0b00000010
- MASK_LIGHT_OFF = 0b00000100
- POWER_SAVING_MODE = 0b00010000
-
- setting_array = to_int_array(setting_string)
-
- if setting_array[1] & mask_24_hours != 0:
- settings.time_format = "24h"
- else:
- settings.time_format = "12h"
- settings.button_tone = setting_array[1] & MASK_BUTTON_TONE_OFF == 0
- settings.auto_light = setting_array[1] & MASK_LIGHT_OFF == 0
- settings.power_saving_mode = setting_array[1] & POWER_SAVING_MODE == 0
-
- if setting_array[4] == 1:
- settings.date_format = "DD:MM"
- else:
- settings.date_format = "MM:DD"
-
- if setting_array[5] == 0:
- settings.language = "English"
- if setting_array[5] == 1:
- settings.language = "Spanish"
- if setting_array[5] == 2:
- settings.language = "French"
- if setting_array[5] == 3:
- settings.language = "German"
- if setting_array[5] == 4:
- settings.language = "Italian"
- if setting_array[5] == 5:
- settings.language = "Russian"
-
- if setting_array[2] == 1:
- settings.light_duration = "4s"
- else:
- settings.light_duration = "2s"
-
- return json.dumps(settings.__dict__)
-
- data_json = {"key": create_key(data), "value": create_json_settings(data)}
- settings_json = {"SETTINGS": data_json}
- return settings_json
-
- elif int_array[0] == CHARACTERISTICS["CASIO_SETTING_FOR_BLE"]:
- settings.CasioIsAutoTimeOriginalValue = data
- value_json = SettingsDecoder.to_json_time_adjustment(settings)
- data_json = {"key": create_key(data), "value": value_json}
- return {"TIME_ADJUSTMENT": data_json}
-
- elif int_array[0] == CHARACTERISTICS["CASIO_REMINDER_TIME"]:
- reminder_json = {}
-
- def reminder_time_to_json(reminder_str):
- def convert_array_list_to_json_array(array_list):
- json_array = []
- for item in array_list:
- json_array.append(item)
-
- return json_array
-
- def decode_time_period(time_period: int) -> tuple:
- enabled = False
- repeat_period = ""
-
- if (
- time_period & ReminderMasks.ENABLED_MASK
- == ReminderMasks.ENABLED_MASK
- ):
- enabled = True
-
- if time_period & ReminderMasks.WEEKLY_MASK == ReminderMasks.WEEKLY_MASK:
- repeat_period = "WEEKLY"
- elif (
- time_period & ReminderMasks.MONTHLY_MASK
- == ReminderMasks.MONTHLY_MASK
- ):
- repeat_period = "MONTHLY"
- elif (
- time_period & ReminderMasks.YEARLY_MASK == ReminderMasks.YEARLY_MASK
- ):
- repeat_period = "YEARLY"
- else:
- repeat_period = "NEVER"
-
- return (enabled, repeat_period)
-
- def decode_time_detail(time_detail):
- def decode_date(time_detail):
- def int_to_month_str(month_int):
- months = [
- "JANUARY",
- "FEBRUARY",
- "MARCH",
- "APRIL",
- "MAY",
- "JUNE",
- "JULY",
- "AUGUST",
- "SEPTEMBER",
- "OCTOBER",
- "NOVEMBER",
- "DECEMBER",
- ]
- if month_int < 1 or month_int > 12:
- return ""
- else:
- return months[month_int - 1]
-
- date = json.loads("{}")
-
- date["year"] = dec_to_hex(time_detail[0]) + 2000
- date["month"] = int_to_month_str(dec_to_hex(time_detail[1]))
- date["day"] = dec_to_hex(time_detail[2])
-
- return date
-
- result = {}
-
- # 00 23 02 21 23 02 21 00 00
- # start from here: ^
- # so, skip 1
- start_date = decode_date(time_detail[1:])
-
- result["start_date"] = start_date
-
- # 00 23 02 21 23 02 21 00 00
- # start from here: ^
- # so, skip 4
- end_date = decode_date(time_detail[4:])
-
- result["end_date"] = end_date
-
- day_of_week = time_detail[7]
- days_of_week = []
- if day_of_week & ReminderMasks.SUNDAY_MASK == ReminderMasks.SUNDAY_MASK:
- days_of_week.append("SUNDAY")
- if day_of_week & ReminderMasks.MONDAY_MASK == ReminderMasks.MONDAY_MASK:
- days_of_week.append("MONDAY")
- if (
- day_of_week & ReminderMasks.TUESDAY_MASK
- == ReminderMasks.TUESDAY_MASK
- ):
- days_of_week.append("TUESDAY")
- if (
- day_of_week & ReminderMasks.WEDNESDAY_MASK
- == ReminderMasks.WEDNESDAY_MASK
- ):
- days_of_week.append("WEDNESDAY")
- if (
- day_of_week & ReminderMasks.THURSDAY_MASK
- == ReminderMasks.THURSDAY_MASK
- ):
- days_of_week.append("THURSDAY")
- if day_of_week & ReminderMasks.FRIDAY_MASK == ReminderMasks.FRIDAY_MASK:
- days_of_week.append("FRIDAY")
- if (
- day_of_week & ReminderMasks.SATURDAY_MASK
- == ReminderMasks.SATURDAY_MASK
- ):
- days_of_week.append("SATURDAY")
- result["days_of_week"] = days_of_week
- return result
-
- int_arr = to_int_array(reminder_str)
- if int_arr[3] == 0xFF:
- # 0XFF indicates end of reminders
- return json.dumps({"end": ""})
-
- short_str = to_compact_string(reminder_str)
- # get the first byte of the returned data, which indicates the data content.
- key = short_str[:4].upper()
-
- reminder_all = to_int_array(reminder_str)
- # Remove the first 2 chars:
- # 0x31 05 <--- 00 23 02 21 23 02 21 00 00
- reminder = reminder_all[2:]
-
- reminder_json = {}
-
- time_period = decode_time_period(reminder[0])
- reminder_json["enabled"] = time_period[0]
- reminder_json["repeat_period"] = time_period[1]
-
- time_detail_map = decode_time_detail(reminder)
-
- reminder_json["start_date"] = time_detail_map["start_date"]
- reminder_json["end_date"] = time_detail_map["end_date"]
- reminder_json["days_of_week"] = convert_array_list_to_json_array(
- time_detail_map["days_of_week"]
- )
-
- return json.dumps({"time": reminder_json})
-
- value = reminder_time_to_json(data[2:])
- reminder_json["REMINDERS"] = {
- "key": create_key(data),
- "value": json.loads(value),
- }
- return reminder_json
-
- elif int_array[0] == CHARACTERISTICS["CASIO_REMINDER_TITLE"]:
- return {
- "REMINDERS": {
- "key": create_key(data),
- "value": ReminderDecoder.reminder_title_to_json(data),
- }
- }
- elif int_array[0] == CHARACTERISTICS["CASIO_TIMER"]:
- data_json = {"key": create_key(data), "value": data}
- json_obj["CASIO_TIMER"] = data_json
-
- elif int_array[0] == CHARACTERISTICS["CASIO_WORLD_CITIES"]:
- data_json = {"key": create_key(data), "value": data}
- characteristics_array = to_int_array(data)
- json_obj["CASIO_WORLD_CITIES"] = data_json
- elif int_array[0] == CHARACTERISTICS["CASIO_DST_WATCH_STATE"]:
- data_json = {"key": create_key(data), "value": data}
- json_obj["CASIO_DST_WATCH_STATE"] = data_json
- elif int_array[0] == CHARACTERISTICS["CASIO_WATCH_NAME"]:
- data_json = {"key": create_key(data), "value": data}
- json_obj["CASIO_WATCH_NAME"] = data_json
- elif int_array[0] == CHARACTERISTICS["CASIO_WATCH_CONDITION"]:
- data_json = {"key": create_key(data), "value": data}
- json_obj["CASIO_WATCH_CONDITION"] = data_json
- elif int_array[0] == CHARACTERISTICS["CASIO_APP_INFORMATION"]:
- data_json = {"key": create_key(data), "value": data}
- json_obj["CASIO_APP_INFORMATION"] = data_json
- elif int_array[0] == CHARACTERISTICS["CASIO_BLE_FEATURES"]:
- data_json = {"key": create_key(data), "value": data}
- json_obj["BUTTON_PRESSED"] = data_json
- return json_obj
-
-
-async def callWriter(connection, message: str):
- logger.debug(message)
- action_json = json.loads(message)
- action = action_json.get("action")
-
- if action == "GET_ALARMS":
- alarm_command = to_compact_string(
- to_hex_string(bytearray([CHARACTERISTICS["CASIO_SETTING_FOR_ALM"]]))
- )
-
- await connection.write(0x000C, alarm_command)
-
- elif action == "GET_ALARMS2":
- # get the rest of the alarms
- alarm_command2 = to_compact_string(
- to_hex_string(bytearray([CHARACTERISTICS["CASIO_SETTING_FOR_ALM2"]]))
- )
- await connection.write(0x000C, alarm_command2)
-
- elif action == "SET_ALARMS":
- alarms_json_arr = json.loads(message).get("value")
- alarm_casio0 = to_compact_string(
- to_hex_string(
- alarms_inst.from_json_alarm_first_alarm(alarms_json_arr[0])
- )
- )
- await connection.write(0x000E, alarm_casio0)
- alarm_casio = to_compact_string(
- to_hex_string(alarms_inst.from_json_alarm_secondary_alarms(alarms_json_arr))
- )
- await connection.write(0x000E, alarm_casio)
-
- elif action == "SET_REMINDERS":
-
- def reminder_title_from_json(reminder_json):
- title_str = reminder_json.get("title")
- return to_byte_array(title_str, 18)
-
- def reminder_time_from_json(reminder_json):
- def create_time_detail(
- repeat_period, start_date, end_date, days_of_week
- ):
- def encode_date(time_detail, start_date, end_date):
- class Month:
- JANUARY = 1
- FEBRUARY = 2
- MARCH = 3
- APRIL = 4
- MAY = 5
- JUNE = 6
- JULY = 7
- AUGUST = 8
- SEPTEMBER = 9
- OCTOBER = 10
- NOVEMBER = 11
- DECEMBER = 12
-
- def __init__(self):
- pass
-
- def string_to_month(month_str):
- months = {
- "january": Month.JANUARY,
- "february": Month.FEBRUARY,
- "march": Month.MARCH,
- "april": Month.APRIL,
- "may": Month.MAY,
- "june": Month.JUNE,
- "july": Month.JULY,
- "august": Month.AUGUST,
- "september": Month.SEPTEMBER,
- "october": Month.OCTOBER,
- "november": Month.NOVEMBER,
- "december": Month.DECEMBER,
- }
- return months.get(month_str.lower(), Month.JANUARY)
-
- def hex_to_dec(hex):
- return int(str(hex), 16)
-
- # take the last 2 digits only
- time_detail[0] = hex_to_dec(start_date["year"] % 2000)
- time_detail[1] = hex_to_dec(
- string_to_month(start_date["month"])
- )
- time_detail[2] = hex_to_dec(start_date["day"])
- time_detail[3] = hex_to_dec(
- end_date["year"] % 2000
- ) # get the last 2 gits only
- time_detail[4] = hex_to_dec(string_to_month(end_date["month"]))
- time_detail[5] = hex_to_dec(end_date["day"])
- time_detail[6], time_detail[7] = 0, 0
-
- time_detail = [0] * 8
-
- if repeat_period == "NEVER":
- encode_date(time_detail, start_date, end_date)
-
- elif repeat_period == "WEEKLY":
- encode_date(time_detail, start_date, end_date)
-
- day_of_week = 0
- if days_of_week is not None:
- for i in range(len(days_of_week)):
- if days_of_week[i] == "SUNDAY":
- day_of_week = (
- day_of_week | ReminderMasks.SUNDAY_MASK
- )
- elif days_of_week[i] == "MONDAY":
- day_of_week = (
- day_of_week | ReminderMasks.MONDAY_MASK
- )
- elif days_of_week[i] == "TUESDAY":
- day_of_week = (
- day_of_week | ReminderMasks.TUESDAY_MASK
- )
- elif days_of_week[i] == "WEDNESDAY":
- day_of_week = (
- day_of_week | ReminderMasks.WEDNESDAY_MASK
- )
- elif days_of_week[i] == "THURSDAY":
- day_of_week = (
- day_of_week | ReminderMasks.THURSDAY_MASK
- )
- elif days_of_week[i] == "FRIDAY":
- day_of_week = (
- day_of_week | ReminderMasks.FRIDAY_MASK
- )
- elif days_of_week[i] == "SATURDAY":
- day_of_week = (
- day_of_week | ReminderMasks.SATURDAY_MASK
- )
-
- time_detail[6] = day_of_week
- time_detail[7] = 0
-
- elif repeat_period == "MONTHLY":
- encode_date(time_detail, start_date, end_date)
-
- elif repeat_period == "YEARLY":
- encode_date(time_detail, start_date, end_date)
- else:
- logger.info(
- "Cannot handle Repeat Period: {}".format(repeat_period)
- )
-
- return time_detail
-
- def create_time_period(enabled: bool, repeat_period: str) -> int:
- time_period = 0
-
- if enabled:
- time_period = time_period | ReminderMasks.ENABLED_MASK
- if repeat_period == "WEEKLY":
- time_period = time_period | ReminderMasks.WEEKLY_MASK
- elif repeat_period == "MONTHLY":
- time_period = time_period | ReminderMasks.MONTHLY_MASK
- elif repeat_period == "YEARLY":
- time_period = time_period | ReminderMasks.YEARLY_MASK
- return time_period
-
- enabled = reminder_json.get("enabled")
- repeat_period = reminder_json.get("repeat_period")
- start_date = reminder_json.get("start_date")
- end_date = reminder_json.get("end_date")
- days_of_week = reminder_json.get("days_of_week")
-
- reminder_cmd = bytearray()
-
- reminder_cmd += bytearray([create_time_period(enabled, repeat_period)])
- reminder_cmd += bytearray(
- create_time_detail(
- repeat_period, start_date, end_date, days_of_week
- )
- )
-
- return reminder_cmd
-
- reminders_json_arr = json.loads(message).get("value")
- for index, element in enumerate(reminders_json_arr):
- reminder_json = element
- title = reminder_title_from_json(reminder_json)
-
- title_byte_arr = bytearray([CHARACTERISTICS["CASIO_REMINDER_TITLE"]])
- title_byte_arr += bytearray([index + 1])
- title_byte_arr += title
- title_byte_arr_to_send = to_compact_string(
- to_hex_string(title_byte_arr)
- )
- await connection.write(0x000E, title_byte_arr_to_send)
-
- reminder_time_byte_arr = bytearray([])
- reminder_time_byte_arr += bytearray(
- [CHARACTERISTICS["CASIO_REMINDER_TIME"]]
- )
- reminder_time_byte_arr += bytearray([index + 1])
- reminder_time_byte_arr += reminder_time_from_json(reminder_json)
- reminder_time_byte_arr_to_send = to_compact_string(
- to_hex_string(bytearray(reminder_time_byte_arr))
- )
- logger.info(reminder_time_byte_arr_to_send)
- await connection.write(0x000E, reminder_time_byte_arr_to_send)
-
- elif action == "GET_SETTINGS":
- await connection.write(
- 0x000C, bytearray([CHARACTERISTICS["CASIO_SETTING_FOR_BASIC"]])
- )
-
- elif action == "SET_SETTINGS":
-
- def encode(settings: dict):
- mask_24_hours = 0b00000001
- MASK_BUTTON_TONE_OFF = 0b00000010
- MASK_LIGHT_OFF = 0b00000100
- POWER_SAVING_MODE = 0b00010000
-
- arr = bytearray(12)
- arr[0] = CHARACTERISTICS["CASIO_SETTING_FOR_BASIC"]
- if settings.time_format == "24h":
- arr[1] = arr[1] | mask_24_hours
- if not settings.button_tone:
- arr[1] = arr[1] | MASK_BUTTON_TONE_OFF
- if not settings.auto_light:
- arr[1] = arr[1] | MASK_LIGHT_OFF
- if not settings.power_saving_mode:
- arr[1] = arr[1] | POWER_SAVING_MODE
-
- if settings.light_duration == "4s":
- arr[2] = 1
- if settings.date_format == "DD:MM":
- arr[4] = 1
-
- language_index = {
- "English": 0,
- "Spanish": 1,
- "French": 2,
- "German": 3,
- "Italian": 4,
- "Russian": 5,
- }
- arr[5] = language_index.get(settings.language, 0)
-
- return arr
-
- encoded_settings = encode(settings)
- await connection.write(
- 0x000E, to_compact_string(to_hex_string(encoded_settings))
- )
-
- elif action == "GET_TIME_ADJUSTMENT":
- await connection.write(
- 0x000C,
- to_compact_string(
- to_hex_string(bytearray([CHARACTERISTICS["CASIO_SETTING_FOR_BLE"]]))
- ),
- )
-
- elif action == "SET_TIME_ADJUSTMENT":
- value = json.loads(message).get("value")
-
- def encode_time_adjustment(time_adjustment):
- raw_string = settings.CasioIsAutoTimeOriginalValue
- int_array = to_int_array(raw_string)
-
- int_array[12] = 0x80 if time_adjustment == "True" else 0x00
- return bytes(int_array)
-
- encoded_time_adj = encode_time_adjustment(value)
- write_cmd = to_compact_string(to_hex_string(encoded_time_adj))
- await connection.write(0x000E, write_cmd)
-
- elif action == "GET_TIMER":
- connection.write(0x000C, bytearray([CHARACTERISTICS["CASIO_TIMER"]]))
-
- elif action == "SET_TIMER":
- seconds = json.loads(message).get("value")
-
- def encode(seconds_str):
- in_seconds = int(seconds_str)
- hours = in_seconds // 3600
- minutes_and_seconds = in_seconds % 3600
- minutes = minutes_and_seconds // 60
- seconds = minutes_and_seconds % 60
-
- arr = bytearray(7)
- arr[0] = 0x18
- arr[1] = hours
- arr[2] = minutes
- arr[3] = seconds
- return arr
-
- seconds_as_byte_arr = encode(seconds)
- seconds_as_compact_str = to_compact_string(
- to_hex_string(seconds_as_byte_arr)
- )
- await connection.write(0x000E, seconds_as_compact_str)
-
- elif action == "SET_TIME":
- date_time_ms = int(json.loads(message).get("value"))
- logger.debug("date_time_ms: {}".format(date_time_ms))
- date_time = datetime.datetime.fromtimestamp(date_time_ms / 1000.0)
- time_data = TimeEncoder.prepare_current_time(date_time)
- time_command = to_hex_string(
- bytearray([CHARACTERISTICS["CASIO_CURRENT_TIME"]]) + time_data
- )
- await connection.write(0xE, to_compact_string(time_command))
-
- else:
- print("callWriter: Unhandled command", action)
-
-
-class TimeEncoder:
- def prepare_current_time(date: datetime.datetime):
- arr = bytearray(10)
- year = date.year
- arr[0] = year >> 0 & 0xFF
- arr[1] = year >> 8 & 0xFF
- arr[2] = date.month
- arr[3] = date.day
- arr[4] = date.hour
- arr[5] = date.minute
- arr[6] = date.second
- arr[7] = date.weekday()
- arr[8] = 0
- arr[9] = 1
- return arr
diff --git a/src/gshocktimeserver/config.ini b/src/gshocktimeserver/config.ini
deleted file mode 100644
index 2373f6b..0000000
--- a/src/gshocktimeserver/config.ini
+++ /dev/null
@@ -1,4 +0,0 @@
-[main]
-device.address = DD:85:03:25:62:17
-device.name = CASIO GW-B5600
-
diff --git a/src/gshocktimeserver/configurator.py b/src/gshocktimeserver/configurator.py
index c1495c8..0b7eaa2 100644
--- a/src/gshocktimeserver/configurator.py
+++ b/src/gshocktimeserver/configurator.py
@@ -12,7 +12,7 @@ def get(self, key):
try:
value = self.config.get("main", key)
- except Exception as e:
+ except Exception:
return None
else:
return value
diff --git a/src/gshocktimeserver/connection.py b/src/gshocktimeserver/connection.py
index f72ae73..4d307ed 100644
--- a/src/gshocktimeserver/connection.py
+++ b/src/gshocktimeserver/connection.py
@@ -1,13 +1,12 @@
-import logging
-
+import asyncio
from bleak import BleakClient
from bleak.backends.characteristic import BleakGATTCharacteristic
from casio_constants import CasioConstants
+import message_dispatcher
from utils import to_casio_cmd
-from data_watcher import data_watcher
-from casio_watch import to_json, callWriter
from logger import logger
+
class Connection:
def __init__(self, device):
self.handles_map = self.init_handles_map()
@@ -17,11 +16,7 @@ def __init__(self, device):
def notification_handler(
self, characteristic: BleakGATTCharacteristic, data: bytearray
):
- """Simple notification handler which prints the data received."""
- json = to_json(data)
- name = list(dict(json).keys())[0]
- value = list(dict(json).values())[0]
- data_watcher.emit_event(name, value)
+ message_dispatcher.MessageDispatcher.on_received(data)
async def connect(self):
try:
@@ -32,22 +27,22 @@ async def connect(self):
)
return True
except Exception as e:
- self.logger.warning(f"Cannot connect: {e}")
+ logger.debug(f"Cannot connect: {e}")
return False
async def disconnect(self):
await self.client.disconnect()
async def write(self, handle, data):
- logger.info("write: {}".format(data))
try:
await self.client.write_gatt_char(
self.handles_map[handle], to_casio_cmd(data)
)
except Exception as e:
- logger.info("write failed with exception: {}".format(e))
+ logger.debug("write failed with exception: {}".format(e))
async def request(self, request):
+ logger.info("write: {}".format(request))
await self.write(0xC, request)
def init_handles_map(self):
@@ -56,7 +51,9 @@ def init_handles_map(self):
handles_map[0x04] = CasioConstants.CASIO_GET_DEVICE_NAME
handles_map[0x06] = CasioConstants.CASIO_APPEARANCE
handles_map[0x09] = CasioConstants.TX_POWER_LEVEL_CHARACTERISTIC_UUID
- handles_map[0x0C] = CasioConstants.CASIO_READ_REQUEST_FOR_ALL_FEATURES_CHARACTERISTIC_UUID
+ handles_map[
+ 0x0C
+ ] = CasioConstants.CASIO_READ_REQUEST_FOR_ALL_FEATURES_CHARACTERISTIC_UUID
handles_map[0x0E] = CasioConstants.CASIO_ALL_FEATURES_CHARACTERISTIC_UUID
handles_map[0x11] = CasioConstants.CASIO_DATA_REQUEST_SP_CHARACTERISTIC_UUID
handles_map[0x14] = CasioConstants.CASIO_CONVOY_CHARACTERISTIC_UUID
@@ -65,4 +62,5 @@ def init_handles_map(self):
return handles_map
async def sendMessage(self, message):
- await callWriter(self, message)
+ # await callWriter(self, message)
+ await message_dispatcher.MessageDispatcher.send_to_watch(message)
diff --git a/src/gshocktimeserver/data_watcher.py b/src/gshocktimeserver/data_watcher.py
deleted file mode 100644
index a5a355c..0000000
--- a/src/gshocktimeserver/data_watcher.py
+++ /dev/null
@@ -1,58 +0,0 @@
-from reactivex.subject import Subject
-import logging
-
-
-class DataWatcher:
- logger = logging.getLogger("DataWatcher")
-
- def __init__(self):
- self.subjects = {}
- self.subscribers = {}
-
- def add_subject(self, name: str):
- if name in self.subjects:
- return
- subject = Subject()
- self.subjects[name] = subject
-
- def add_subscriber_and_subject(self, subscriber, subject):
- if subscriber not in self.subscribers:
- subjects_for_this_subscriber = set()
- self.subscribers[subscriber] = subjects_for_this_subscriber
- subjects_for_this_subscriber = self.subscribers[subscriber]
- if subject not in subjects_for_this_subscriber:
- subjects_for_this_subscriber.add(subject)
- else:
- self.logger("Subject {} alreadt added...".format(subject))
-
- def subscriber_already_subscribed(self, subscriber, subject):
- if subscriber not in self.subscribers:
- return False
- subjects_for_this_subscriber = self.subscribers[subscriber]
- if (
- subjects_for_this_subscriber is None
- or subject not in subjects_for_this_subscriber
- ):
- return False
- return True
-
- def subscribe(self, subscriber_name, subject, on_next):
- if not self.subscriber_already_subscribed(subscriber_name, subject):
- self.get_processor(subject).subscribe(on_next)
- self.add_subscriber_and_subject(subscriber_name, subject)
-
- def subscribe_with_deferred(self, subscriber_name, subject, on_next):
- if not self.subscriber_already_subscribed(subscriber_name, subject):
- self.get_processor(subject).subscribe(on_next)
- self.add_subscriber_and_subject(subscriber_name, subject)
-
- def get_processor(self, name):
- return self.subjects.get(name)
-
- def emit_event(self, name, event):
- subject = self.subjects[name]
- if subject is not None:
- subject.on_next(event)
-
-
-data_watcher = DataWatcher()
diff --git a/src/gshocktimeserver/device_queue.py b/src/gshocktimeserver/device_queue.py
deleted file mode 100644
index 4f8c4b5..0000000
--- a/src/gshocktimeserver/device_queue.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import queue
-
-
-class DeviceQueue:
- def __init__(self):
- self.q = queue.Queue()
-
- def get(self):
- return self.q.get(block=True, timeout=None)
-
- def put(self, item):
- self.q.put(item)
-
-
-device_queue = DeviceQueue()
diff --git a/src/gshocktimeserver/event.py b/src/gshocktimeserver/event.py
index 81cd35a..9488c9b 100644
--- a/src/gshocktimeserver/event.py
+++ b/src/gshocktimeserver/event.py
@@ -76,7 +76,14 @@ def __init__(self):
self.selected = False
def __str__(self):
- return f"Title: {self.title}, startDate: {self.start_date.__str__()}, endDate: {self.end_date.__str__()}, repeatPeriod: {self.repeat_period}, daysOfWeek: {self.days_of_week}, enabled: {self.enabled}, incompatible: {self.incompatible}, selected: {self.selected}"
+ return f"""Title: {self.title},
+ startDate: {self.start_date.__str__()},
+ endDate: {self.end_date.__str__()},
+ repeatPeriod: {self.repeat_period},
+ daysOfWeek: {self.days_of_week},
+ enabled: {self.enabled},
+ incompatible: {self.incompatible},
+ selected: {self.selected}"""
def create_event(self, event_jsn: dict):
def get_array_list_from_json_array(json_array: list):
diff --git a/src/gshocktimeserver/gshock_api.py b/src/gshocktimeserver/gshock_api.py
index 2c2d85a..d7436b2 100644
--- a/src/gshocktimeserver/gshock_api.py
+++ b/src/gshocktimeserver/gshock_api.py
@@ -3,25 +3,27 @@
import json
import time
-from connection import Connection
-from data_watcher import data_watcher
+# from data_watcher import data_watcher
+from iolib.dst_watch_state_io import DtsState
+from iolib.button_pressed_io import WatchButton
+import message_dispatcher
from utils import (
to_ascii_string,
+ to_hex_string,
to_int_array,
to_compact_string,
clean_str,
- encode_string
)
-from result_queue import result_queue, KeyedResult
-from casio_watch import WatchButton, DtsState
from alarms import alarms_inst
from event import Event
from watch_info import watch_info
-
+from tz_helper import sys_tz, casio_city_name
class GshockAPI:
"""
- This class contains all the API functions. This should the the main interface to the library.
+ This class contains all the API functions. This should the the main interface to the
+ library.
+
Here is how to use it:
api = GshockAPI(connection)
@@ -48,27 +50,10 @@ async def get_watch_name(self):
-------
name : String, i.e: "GW-B5600"
"""
- return await self._get_watch_name("23")
-
- async def _get_watch_name(self, key):
- await self.connection.request(key)
-
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult(key, result))
-
- def on_data_received(keyed_data):
- _key = "23"
-
- result_value = keyed_data.get("value")
- result_key = keyed_data.get("key")
+ return await self._get_watch_name()
- if result_key == _key:
- result_str = clean_str(to_ascii_string(result_value, 1))
- res = result_queue.dequeue(_key)
- res.set_result(result_str)
-
- self.subscribe("CASIO_WATCH_NAME", lambda data: on_data_received(data))
+ async def _get_watch_name(self):
+ result = await message_dispatcher.WatchNameIO.request(self.connection)
return await result
async def get_pressed_button(self) -> WatchButton:
@@ -78,12 +63,13 @@ async def get_pressed_button(self) -> WatchButton:
The return values are interpreted as follows:
- - `LOWER_LEFT` - this connection is initiated by a long-press of the lower-left button on the watch.
- The app receiving this type of connection can now send and receive commands to the watch.
+ - `LOWER_LEFT` - this connection is initiated by a long-press of the lower-left
+ button on the watch.The app receiving this type of connection can now send and
+ receive commands to the watch.
- `LOWER_RIGHT` - this connection is initiated by a short-press of the lower-right button,
- which is usually used to set time. But the app can use this signal to perform other arbitrary functions.
- Therefore, this button is also referred as `ACTION BUTTON`.
+ which is usually used to set time. But the app can use this signal to perform other
+ arbitrary functions. Therefore, this button is also referred as `ACTION BUTTON`.
The connection will automatically disconnect in about 20 seconds.
- `NO_BUTTON` - this connection is initiated automatically, periodically
@@ -97,50 +83,7 @@ async def get_pressed_button(self) -> WatchButton:
-------
button: WATCH_BUTTON
"""
- return await self._get_pressed_button("10")
-
- async def _get_pressed_button(self, key: str) -> WatchButton:
- await self.connection.request(key)
-
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult(key, result))
-
- def button_pressed_callback(keyed_data):
- """
- RIGHT BUTTON: 0x10 17 62 07 38 85 CD 7F ->04<- 03 0F FF FF FF FF 24 00 00 00
- LEFT BUTTON: 0x10 17 62 07 38 85 CD 7F ->01<- 03 0F FF FF FF FF 24 00 00 00
- RESET: 0x10 17 62 16 05 85 dd 7f ->00<- 03 0f ff ff ff ff 24 00 00 00 // after watch reset
- AUTO-TIME: 0x10 17 62 16 05 85 dd 7f ->03<- 03 0f ff ff ff ff 24 00 00 00 // no button pressed
- """
-
- result_value = keyed_data.get("value")
- result_key = keyed_data.get("key")
-
- ret = WatchButton.INVALID
-
- if (
- result_key == "10"
- and result_value != ""
- and len(to_int_array(result_value)) >= 19
- ):
- ble_int_arr = to_int_array(result_value)
- button_indicator = ble_int_arr[8]
- ret = (
- WatchButton.LOWER_LEFT
- if (button_indicator == 0 or button_indicator == 1)
- else WatchButton.LOWER_RIGHT
- if button_indicator == 4
- else WatchButton.NO_BUTTON
- if button_indicator == 3
- else WatchButton.INVALID
- )
-
- res = result_queue.dequeue("10")
- res.set_result(ret)
-
- self.subscribe("BUTTON_PRESSED", button_pressed_callback)
-
+ result = await message_dispatcher.ButtonPressedIO.request(self.connection)
return await result
async def get_world_cities(self, cityNumber: int):
@@ -154,30 +97,17 @@ async def get_world_cities(self, cityNumber: int):
-------
name : String, The name of the requested World City as a String.
"""
- key = "1f0{}".format(cityNumber)
- city = await self._get_world_cities(key)
+ city = await self._get_world_cities(cityNumber)
return city
- async def _get_world_cities(self, key: str):
- await self.connection.request(key)
-
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult(key, result))
-
- def casio_world_cities_callback(keyed_data):
- value = keyed_data.get("value")
- key = keyed_data.get("key")
-
- res = result_queue.dequeue(key)
- res.set_result(value)
-
- def process_home_time(keyed_data):
- value = keyed_data.get("value")
- key = keyed_data.get("key")
+ async def get_world_cities_TZ(self, cityNumber: int):
+ city = await self.get_world_cities(cityNumber)
+ city = casio_city_name(city[0:2], sys_tz.get_name())
+ self.logger.info(f"Current city name : {city}")
+ return city
- self.subscribe("CASIO_WORLD_CITIES", casio_world_cities_callback)
- # self.subscribe("HOME_TIME", process_home_time)
+ async def _get_world_cities(self, key: str):
+ result = await message_dispatcher.WorldCitiesIO.request(self.connection, key)
return await result
async def get_dst_for_world_cities(self, cityNumber: int) -> str:
@@ -192,25 +122,24 @@ async def get_dst_for_world_cities(self, cityNumber: int) -> str:
-------
name : String, Daylight Saving Time state of the requested World City as a String.
"""
- key = "1e0{}".format(cityNumber)
- return await self._get_dst_for_world_cities(key)
-
- async def _get_dst_for_world_cities(self, key: str) -> str:
- await self.connection.request(key)
-
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult(key, result))
+ return await self._get_dst_for_world_cities(cityNumber)
- def casio_dts_world_cities_callback(keyed_data):
- value = keyed_data.get("value")
- key = keyed_data.get("key")
+ async def get_dst_for_world_cities_TZ(self, cityNumber: int) -> str:
+ val = await self._get_dst_for_world_cities(cityNumber)
- res = result_queue.dequeue(key)
- res.set_result(value)
+ tz = sys_tz.casiotz
+ val[2] = tz["A"]
+ val[3] = tz["B"]
+ val[4] = tz["OFF"]
+ val[5] = tz["DOFF"]
+ val[6] = tz["DST"]
- self.subscribe("CASIO_DST_SETTING", casio_dts_world_cities_callback)
+ return val
+ async def _get_dst_for_world_cities(self, key: str) -> str:
+ result = await message_dispatcher.DstForWorldCitiesIO.request(
+ self.connection, key
+ )
return await result
async def get_dst_watch_state(self, state: DtsState) -> str:
@@ -224,77 +153,71 @@ async def get_dst_watch_state(self, state: DtsState) -> str:
-------
dst: String, the Daylight Saving Time state of the watch as a String.
"""
- key = f"1d0{state.value}"
- return await self._get_dst_watch_state(key)
-
- async def _get_dst_watch_state(self, key: str) -> str:
- await self.connection.request(key)
+ return await self._get_dst_watch_state(state)
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult(key, result))
+ async def get_dst_watch_state_TZ(self, state: DtsState) -> str:
+ val = await self.get_dst_watch_state(state)
- def handle_message(keyed_data):
- value = keyed_data.get("value")
- key = keyed_data.get("key")
+ tz = sys_tz.casiotz
+ val[3] = tz["DST"]
+ val[5] = tz["A"]
+ val[6] = tz["B"]
- res = result_queue.dequeue(key)
- res.set_result(value)
-
- self.subscribe("CASIO_DST_WATCH_STATE", handle_message)
+ return val
+ async def _get_dst_watch_state(self, state: DtsState) -> str:
+ result = await message_dispatcher.DstWatchStateIO.request(
+ self.connection, state
+ )
return await result
- # async def reset_hand_to_12 (self):
- # await self.connection.write(0xE, "1a0412000000")
- # await self.connection.write(0xE, "1a0418000000")
-
-
-
async def initialize_for_setting_time(self):
- # Before we can set time, we must read and write back these values.
- # Why? Not sure, ask Casio
-
- async def read_and_rite(function, param):
- ret = await function(param)
- short_str = to_compact_string(ret)
- await self.connection.write(0xE, short_str)
-
- await read_and_rite(self.get_dst_watch_state, DtsState.ZERO)
- await read_and_rite(self.get_dst_watch_state, DtsState.TWO)
- await read_and_rite(self.get_dst_watch_state, DtsState.FOUR)
-
- await read_and_rite(self.get_dst_for_world_cities, 0)
- await read_and_rite(self.get_dst_for_world_cities, 1)
- await read_and_rite(self.get_dst_for_world_cities, 2)
- await read_and_rite(self.get_dst_for_world_cities, 3)
- await read_and_rite(self.get_dst_for_world_cities, 4)
- await read_and_rite(self.get_dst_for_world_cities, 5)
-
- await read_and_rite(self.get_world_cities, 0)
- await read_and_rite(self.get_world_cities, 1)
- await read_and_rite(self.get_world_cities, 2)
- await read_and_rite(self.get_world_cities, 3)
- await read_and_rite(self.get_world_cities, 4)
- await read_and_rite(self.get_world_cities, 5)
-
- async def initialize_for_setting_time_b2100(self):
- # Before we can set time, we must read and write back these values.
- # Why? Not sure, ask Casio
-
- async def read_and_rite(function, param):
- ret = await function(param)
- short_str = to_compact_string(ret)
- await self.connection.write(0xE, short_str)
-
- await read_and_rite(self.get_dst_watch_state, DtsState.ZERO)
- await read_and_rite(self.get_dst_for_world_cities, 0)
- await read_and_rite(self.get_dst_for_world_cities, 1)
-
- await read_and_rite(self.get_world_cities, 0)
- await read_and_rite(self.get_world_cities, 1)
-
- async def set_time(self):
+ await self.read_write_dst_watch_states()
+ await self.read_write_dst_for_world_cities()
+ await self.read_write_world_cities()
+
+ async def read_and_write(self, function, param):
+ ret = await function(param)
+ short_str = to_compact_string(to_hex_string(ret))
+ await self.connection.write(0xE, short_str)
+
+ async def read_write_dst_watch_states(self):
+ array_of_dst_watch_state = [
+ {"function": self.get_dst_watch_state_TZ, "state": DtsState.ZERO},
+ {"function": self.get_dst_watch_state, "state": DtsState.TWO},
+ {"function": self.get_dst_watch_state, "state": DtsState.FOUR},
+ ]
+
+ for item in array_of_dst_watch_state[: watch_info.dstCount]:
+ await self.read_and_write(item["function"], item["state"])
+
+ async def read_write_dst_for_world_cities(self):
+ array_of_get_dst_for_world_cities = [
+ {"function": self.get_dst_for_world_cities_TZ, "city_number": 0},
+ {"function": self.get_dst_for_world_cities, "city_number": 1},
+ {"function": self.get_dst_for_world_cities, "city_number": 2},
+ {"function": self.get_dst_for_world_cities, "city_number": 3},
+ {"function": self.get_dst_for_world_cities, "city_number": 4},
+ {"function": self.get_dst_for_world_cities, "city_number": 5},
+ ]
+
+ for item in array_of_get_dst_for_world_cities[: watch_info.worldCitiesCount]:
+ await self.read_and_write(item["function"], item["city_number"])
+
+ async def read_write_world_cities(self):
+ array_of_world_cities = [
+ {"function": self.get_world_cities_TZ, "city_number": 0},
+ {"function": self.get_world_cities, "city_number": 1},
+ {"function": self.get_world_cities, "city_number": 2},
+ {"function": self.get_world_cities, "city_number": 3},
+ {"function": self.get_world_cities, "city_number": 4},
+ {"function": self.get_world_cities, "city_number": 5},
+ ]
+
+ for item in array_of_world_cities[: watch_info.worldCitiesCount]:
+ await self.read_and_write(item["function"], item["city_number"])
+
+ async def set_time(self, current_time=None):
"""Sets the current time on the watch from the time on the phone. In addition, it can optionally set the Home Time
to the current time zone. If timezone changes during travel, the watch will automatically be set to the
correct time and timezone after running this function.
@@ -307,17 +230,21 @@ async def set_time(self):
-------
None
"""
+ sys_tz.refresh ()
+ if current_time == None:
+ current_time = time.time()
+
+ time_diff = current_time - time.time()
- if watch_info.model == watch_info.model.B2100:
- await self.initialize_for_setting_time_b2100()
- else:
- await self.initialize_for_setting_time()
+ self.logger.info(f"=======> passed: ${current_time}, ${time.time()}")
- message = {
- "action": "SET_TIME",
- "value": "{}".format(round(time.time() * 1000)),
- }
- await self.connection.sendMessage(json.dumps(message))
+ await self.initialize_for_setting_time()
+ self.logger.info(f"Time is now {time.time()}, diff : {time.time()-current_time}")
+ await self._set_time(time.time() + time_diff + 1) # 1s for xmit
+ current_time = None
+
+ async def _set_time(self, current_time):
+ await message_dispatcher.TimeIO.request(self.connection, current_time)
async def get_alarms(self):
"""Gets the current alarms from the watch. Up to 5 alarms are supported on the watch.
@@ -332,49 +259,10 @@ async def get_alarms(self):
"""
alarms_inst.clear()
await self._get_alarms()
- await self._get_alarms2()
return alarms_inst.alarms
async def _get_alarms(self):
- await self.connection.sendMessage("""{ "action": "GET_ALARMS"}""")
- key = "GET_ALARMS"
-
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult(key, result))
-
- def alarms_received(keyed_data):
- data = keyed_data.get("value")
- key = "GET_ALARMS"
-
- alarms_inst.add_alarms(data)
-
- res = result_queue.dequeue(key)
- res.set_result(data)
-
- self.subscribe("ALARMS", alarms_received)
- return await result
-
- async def _get_alarms2(self):
- await self.connection.sendMessage("""{ "action": "GET_ALARMS2"}""")
- key = "GET_ALARMS2"
-
- # Alarm.alarms.clear()
-
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult(key, result))
-
- def alarms_received2(keyed_data):
- data = keyed_data.get("value")
- key = "GET_ALARMS2"
-
- alarms_inst.add_alarms(data)
-
- res = result_queue.dequeue(key)
- res.set_result(data)
-
- self.subscribe("ALARMS2", alarms_received2)
+ result = await message_dispatcher.AlarmsIO.request(self.connection)
return await result
async def set_alarms(self, alarms):
@@ -389,13 +277,12 @@ async def set_alarms(self, alarms):
None
"""
if not alarms:
- self.logger.info("Alarm model not initialised! Cannot set alarm")
+ self.logger.debug("Alarm model not initialised! Cannot set alarm")
return
alarms_str = json.dumps(alarms)
set_action_cmd = '{{"action":"SET_ALARMS", "value":{} }}'.format(alarms_str)
await self.connection.sendMessage(set_action_cmd)
- self.logger.info("Returning from setAlarms")
async def get_timer(self):
"""Get Timer value in seconds.
@@ -408,34 +295,10 @@ async def get_timer(self):
-------
timer_value: Integer, the timer number in seconds as an Int. E.g.: 180 means the timer is set for 3 minutes.
"""
- return await self._get_timer("18")
-
- async def _get_timer(self, key):
- await self.connection.request(key)
+ return await self._get_timer()
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult(key, result))
-
- def decode_value(data: str) -> str:
- timer_int_array = to_int_array(data)
-
- hours = timer_int_array[1]
- minutes = timer_int_array[2]
- seconds = timer_int_array[3]
-
- in_seconds = hours * 3600 + minutes * 60 + seconds
- return in_seconds
-
- def get_timer(keyed_data):
- value = keyed_data.get("value")
- key = keyed_data.get("key")
-
- seconds = decode_value(value)
- res = result_queue.dequeue(key)
- res.set_result(seconds)
-
- self.subscribe("CASIO_TIMER", get_timer)
+ async def _get_timer(self):
+ result = await message_dispatcher.TimerIO.request(self.connection)
return await result
async def set_timer(self, timerValue):
@@ -453,6 +316,10 @@ async def set_timer(self, timerValue):
"""{"action": "SET_TIMER", "value": """ + str(timerValue) + """ }"""
)
+ async def get_watch_condition(self):
+ result = await message_dispatcher.WatchConditionIO.request(self.connection)
+ return await result
+
async def get_time_adjustment(self):
"""Determine if auto-tame adjustment is set or not
@@ -464,42 +331,25 @@ async def get_time_adjustment(self):
-------
is_time_adjustement_set: Boolean, True if time-adjustement is set.
"""
- key = "11"
- await self.connection.request(key)
-
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult(key, result))
-
- def get_time_adjustment(keyed_data):
- value = keyed_data.get("value")
- key = keyed_data.get("key")
- if key != "11":
- return
-
- res = result_queue.dequeue(key)
- time_adjustment = value.get("timeAdjustment", False)
- res.set_result(time_adjustment)
-
- self.subscribe("TIME_ADJUSTMENT", get_time_adjustment)
+ result = await message_dispatcher.TimeAdjustmentIO.request(self.connection)
return await result
- async def set_time_adjustment(self, settings):
+ async def set_time_adjustment(
+ self, time_adjustement: bool, minutes_after_hour: int
+ ):
"""Sets auto-tame adjustment for the watch
Parameters
----------
- settings: Settings
+ time_adjustement: bool, True if time-adjustement is set
+ minutes_after_hour: int, minutes after hour
Returns
-------
None
"""
- await self.connection.sendMessage(
- """{"action": "SET_TIME_ADJUSTMENT", "value": \""""
- + str(settings.timeAdjustment)
- + """\" }"""
- )
+ message = f"""{{"action": "SET_TIME_ADJUSTMENT", "timeAdjustment": "{time_adjustement}", "minutesAfterHour": "{minutes_after_hour}" }}"""
+ await self.connection.sendMessage(message)
async def get_basic_settings(self):
"""Get settings from the watch. Example:
@@ -516,22 +366,8 @@ async def get_basic_settings(self):
-------
settings: a list of `Settings`
"""
- key = "13"
- await self.connection.request(key)
-
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult(key, result))
-
- def _get_settings(keyed_data):
- value = keyed_data.get("value")
- key = keyed_data.get("key")
- settings = json.loads(value)
- res = result_queue.dequeue(key)
- res.set_result(settings)
-
- self.subscribe("SETTINGS", _get_settings)
+ result = await message_dispatcher.SettingsIO.request(self.connection)
return await result
async def set_settings(self, settings):
@@ -579,7 +415,7 @@ async def get_reminders(self):
return reminders
- async def get_event_from_watch(self, eventNumber: int):
+ async def get_event_from_watch(self, event_number: int):
"""Gets a single event (reminder) from the watch.
Parameters
@@ -590,32 +426,9 @@ async def get_event_from_watch(self, eventNumber: int):
-------
event: `Event`
"""
- await self.connection.request("30{}".format(eventNumber)) # reminder title
- await self.connection.request("31{}".format(eventNumber)) # reminder time
-
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult("310{}".format(eventNumber), result))
-
- def get_reminders(keyed_data):
- value = keyed_data.get("value")
- key = keyed_data.get("key")
-
- for obj_key in value:
- if obj_key == "title":
- self.title = value[obj_key]
-
- elif obj_key == "time":
- json_obj = json.loads("{}")
- json_obj["title"] = self.title
- json_obj["time"] = value.get("time")
- event_obj = Event()
- event = event_obj.create_event(json_obj)
-
- res = result_queue.dequeue(key)
- res.set_result(event)
-
- self.subscribe("REMINDERS", get_reminders)
+ result = await message_dispatcher.EventsIO.request(
+ self.connection, event_number
+ )
return await result
async def set_reminders(self, events: list):
@@ -635,23 +448,38 @@ async def set_reminders(self, events: list):
def to_json(events: list):
events_json = json.loads("[]")
for event in events:
- event_json = json.loads(json.dumps(event.__dict__))
+ event_json = event # json.loads(json.dumps(event.__dict__))
events_json.append(event_json)
return events_json
- def get_selected_events(events: list):
- selected_events = [event for event in events if event.selected]
- return to_json(selected_events)
-
- selected = get_selected_events(events)
-
await self.connection.sendMessage(
"""{{\"action\": \"SET_REMINDERS\", \"value\": {}}}""".format(
- json.dumps(selected)
+ json.dumps(events)
)
)
+ async def set_enabled_reminders(self, events: list):
+ """Sets events (reminders) to the watch. Up to 5 events are supported.
+
+ Only sends enabled events
+
+ Parameters
+ ----------
+ events: list of `Event`
+
+ Returns
+ -------
+ None
+ """
+ def get_enabled_events(events: list):
+ enabled_events = [event for event in events if event["time"]["enabled"]]
+ return enabled_events # to_json(enabled_events)
+
+ enabled = get_enabled_events(events)
+
+ await set_reminders(enabled)
+
async def get_app_info(self):
"""Gets and internally sets app info to the watch.
This is needed to re-enable lower-right button after the watch has been reset or BLE has been cleared.
@@ -665,37 +493,10 @@ async def get_app_info(self):
-------
app_info: String
"""
- key = "22"
- await self.connection.request(key)
-
- def set_app_info(data: str):
- # App info:
- # This is needed to re-enable button D (Lower-right) after the watch has been reset or BLE has been cleared.
- # It is a hard-coded value, which is what the official app does as well.
-
- # If watch was reset, the app info will come as:
- # 0x22 FF FF FF FF FF FF FF FF FF FF 00
- # In this case, set it to the hardcoded value bellow, so 'D' button will
- # work again.
- app_info_compact_str = to_compact_string(data)
- if app_info_compact_str == "22FFFFFFFFFFFFFFFFFFFF00":
- self.connection.write(0xE, "223488F4E5D5AFC829E06D02")
-
- loop = asyncio.get_running_loop()
- result = loop.create_future()
- result_queue.enqueue(KeyedResult(key, result))
-
- def subscribe_casio_app_information(keyed_data):
- data = keyed_data.get("value")
- key = keyed_data.get("key")
-
- set_app_info(data)
- res = result_queue.dequeue(key)
- res.set_result("")
-
- self.subscribe("CASIO_APP_INFORMATION", subscribe_casio_app_information)
+
+ result = await message_dispatcher.AppInfoIO.request(self.connection)
return await result
- def subscribe(self, subject_name, on_next) -> None:
- data_watcher.add_subject(subject_name)
- data_watcher.subscribe("GshockAPI", subject_name, on_next)
+ # def subscribe(self, subject_name, on_next) -> None:
+ # data_watcher.add_subject(subject_name)
+ # data_watcher.subscribe("GshockAPI", subject_name, on_next)
diff --git a/src/gshocktimeserver/gshock_server.py b/src/gshocktimeserver/gshock_server.py
index ae6cb5a..c4cb941 100644
--- a/src/gshocktimeserver/gshock_server.py
+++ b/src/gshocktimeserver/gshock_server.py
@@ -1,46 +1,55 @@
import asyncio
import sys
-from bleak import BleakClient, BleakScanner
-from bleak.backends.characteristic import BleakGATTCharacteristic
-from bleak.backends.device import BLEDevice
-from bleak.backends.scanner import AdvertisementData
-from scanner import Scanner
+from datetime import datetime
+
from connection import Connection
from gshock_api import GshockAPI
-from casio_watch import settings
-from event import Event, create_event_date, RepeatPeriod, day_of_week
-from casio_watch import WatchButton
+from iolib.button_pressed_io import WatchButton
from scanner import scanner
-from utils import (
- to_ascii_string,
- clean_str,
-)
from configurator import conf
-from api_tests import run_api_tests
-from mailsener import send_mail_notification
from logger import logger
from args import args
+from api_tests import run_api_tests
+from watch_info import watch_info
__author__ = "Ivo Zivkov"
__copyright__ = "Ivo Zivkov"
__license__ = "MIT"
+
async def main(argv):
- await run_time_server()
- # await run_api_tests(args)
+ #await run_time_server()
+ await run_api_tests()
+
+
+def prompt():
+ logger.info(
+ "=============================================================================================="
+ )
+ logger.info("Short-press lower-right button on your watch to set time...")
+ logger.info("")
+ logger.info(
+ "If Auto-time set on watch, the watch will connect and run automatically up to 4 times per day."
+ )
+ logger.info(
+ "=============================================================================================="
+ )
+ logger.info("")
async def run_time_server():
+ prompt()
+
while True:
try:
- if args.get().multi_watch == True:
+ if args.get().multi_watch:
address = None
else:
address = conf.get("device.address")
device = await scanner.scan(address)
- logger.info("Found: {}".format(device))
+ logger.debug("Found: {}".format(device))
connection = Connection(device)
await connection.connect()
@@ -56,16 +65,18 @@ async def run_time_server():
await api.get_app_info()
await api.set_time()
+ logger.info(f"Time set at {datetime.now()} on {watch_info.name}")
# You can add mail notification here if you run your mail server.
# send_mail_notification(args.mailto)
- await connection.disconnect()
+ if watch_info.alwaysConnected == False:
+ await connection.disconnect()
except Exception as e:
- logger.error (f"Got error: {e}")
+ logger.error(f"Got error: {e}")
continue
+
if __name__ == "__main__":
asyncio.run(main(sys.argv[1:]))
-
diff --git a/src/gshocktimeserver/iolib/__init__.py b/src/gshocktimeserver/iolib/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/gshocktimeserver/iolib/alarms_io.py b/src/gshocktimeserver/iolib/alarms_io.py
new file mode 100644
index 0000000..d1d18b4
--- /dev/null
+++ b/src/gshocktimeserver/iolib/alarms_io.py
@@ -0,0 +1,64 @@
+import asyncio
+import json
+from typing import Any
+from alarms import alarms_inst, alarm_decoder
+
+from cancelable_result import CancelableResult
+from utils import to_compact_string, to_hex_string
+from casio_constants import CasioConstants
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class AlarmsIO:
+ result: CancelableResult = None
+ connection = None
+
+ @staticmethod
+ async def request(connection):
+ await connection.request("GET_ALARMS")
+ AlarmsIO.connection = connection
+
+ alarms_inst.clear()
+ await AlarmsIO._get_alarms(connection)
+ return AlarmsIO.result
+
+ @staticmethod
+ async def _get_alarms(connection):
+ await connection.sendMessage("""{ "action": "GET_ALARMS"}""")
+
+ loop = asyncio.get_running_loop()
+ AlarmsIO.result = loop.create_future()
+ return AlarmsIO.result
+
+ @staticmethod
+ async def send_to_watch(message=""):
+ alarm_command = to_compact_string(
+ to_hex_string(bytearray([CHARACTERISTICS["CASIO_SETTING_FOR_ALM"]]))
+ )
+ await AlarmsIO.connection.write(0x000C, alarm_command)
+
+ alarm_command_2 = to_compact_string(
+ to_hex_string(bytearray([CHARACTERISTICS["CASIO_SETTING_FOR_ALM2"]]))
+ )
+ await AlarmsIO.connection.write(0x000C, alarm_command_2)
+
+ @staticmethod
+ async def send_to_watch_set(message):
+ alarms_json_arr = json.loads(message).get("value")
+ alarm_casio0 = to_compact_string(
+ to_hex_string(alarms_inst.from_json_alarm_first_alarm(alarms_json_arr[0]))
+ )
+ await AlarmsIO.connection.write(0x000E, alarm_casio0)
+ alarm_casio = to_compact_string(
+ to_hex_string(alarms_inst.from_json_alarm_secondary_alarms(alarms_json_arr))
+ )
+ await AlarmsIO.connection.write(0x000E, alarm_casio)
+
+ @staticmethod
+ def on_received(data):
+ decoded = alarm_decoder.to_json(to_hex_string(data))["ALARMS"]
+ alarms_inst.add_alarms(decoded)
+
+ if len(alarms_inst.alarms) == 5:
+ AlarmsIO.result.set_result(alarms_inst.alarms)
diff --git a/src/gshocktimeserver/iolib/app_info_io.py b/src/gshocktimeserver/iolib/app_info_io.py
new file mode 100644
index 0000000..bbec512
--- /dev/null
+++ b/src/gshocktimeserver/iolib/app_info_io.py
@@ -0,0 +1,48 @@
+import asyncio
+from typing import Any
+from cancelable_result import CancelableResult
+from logger import logger
+
+from utils import to_compact_string, to_hex_string
+from casio_constants import CasioConstants
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class AppInfoIO:
+ result: CancelableResult = None
+ connection = None
+
+ @staticmethod
+ async def request(connection):
+ logger.info(f"AppInfoIO request")
+ AppInfoIO.connection = connection
+ await connection.request("22")
+
+ loop = asyncio.get_running_loop()
+ AppInfoIO.result = loop.create_future()
+ return AppInfoIO.result
+
+ @staticmethod
+ async def send_to_watch(connection):
+ connection.write(0x000C, bytearray([CHARACTERISTICS["CASIO_APP_INFORMATION"]]))
+
+ @staticmethod
+ def on_received(data):
+ logger.info(f"AppInfoIO onReceived")
+
+ def set_app_info(data: str):
+ # App info:
+ # This is needed to re-enable button D (Lower-right) after the watch has been reset or BLE has been cleared.
+ # It is a hard-coded value, which is what the official app does as well.
+
+ # If watch was reset, the app info will come as:
+ # 0x22 FF FF FF FF FF FF FF FF FF FF 00
+ # In this case, set it to the hardcoded value bellow, so 'D' button will
+ # work again.
+ app_info_compact_str = to_compact_string(data)
+ if app_info_compact_str == "22FFFFFFFFFFFFFFFFFFFF00":
+ AppInfoIO.connection.write(0xE, "223488F4E5D5AFC829E06D02")
+
+ set_app_info(to_hex_string(data))
+ AppInfoIO.result.set_result("OK")
diff --git a/src/gshocktimeserver/iolib/button_pressed_io.py b/src/gshocktimeserver/iolib/button_pressed_io.py
new file mode 100644
index 0000000..0c03b04
--- /dev/null
+++ b/src/gshocktimeserver/iolib/button_pressed_io.py
@@ -0,0 +1,78 @@
+import asyncio
+from enum import IntEnum
+from typing import Any
+from cancelable_result import CancelableResult
+from logger import logger
+from utils import to_compact_string, to_hex_string, to_int_array
+from casio_constants import CasioConstants
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class WatchButton(IntEnum):
+ UPPER_LEFT = 1
+ LOWER_LEFT = 2
+ UPPER_RIGHT = 3
+ LOWER_RIGHT = 4
+ NO_BUTTON = 5
+ INVALID = 6
+ FIND = 7
+
+class ButtonPressedIO:
+ result: CancelableResult = None
+ connection = None
+
+ @staticmethod
+ async def request(connection):
+ ButtonPressedIO.connection = connection
+ await connection.request("10")
+
+ loop = asyncio.get_running_loop()
+ ButtonPressedIO.result = loop.create_future()
+ return ButtonPressedIO.result
+
+ @staticmethod
+ async def send_to_watch(connection):
+ connection.write(0x000C, bytearray([CHARACTERISTICS["CASIO_BLE_FEATURES"]]))
+
+ @staticmethod
+ async def send_to_watch_set(data):
+ logger.info(f"TimerIO sendToWatchSet: {data}")
+
+ await ButtonPressedIO.connection.write(0x000E, data)
+
+ @staticmethod
+ def on_received(data):
+ logger.info(f"ButtonPressedIO onReceived")
+
+ def button_pressed_callback(data):
+ """
+ RIGHT BUTTON: 0x10 17 62 07 38 85 CD 7F ->04<- 03 0F FF FF FF FF 24 00 00 00
+ LEFT BUTTON: 0x10 17 62 07 38 85 CD 7F ->01<- 03 0F FF FF FF FF 24 00 00 00
+ RESET: 0x10 17 62 16 05 85 dd 7f ->00<- 03 0f ff ff ff ff 24 00 00 00 // after watch reset
+ AUTO-TIME: 0x10 17 62 16 05 85 dd 7f ->03<- 03 0f ff ff ff ff 24 00 00 00 // no button pressed
+ """
+
+ ret = WatchButton.INVALID
+
+ if len(data) >= 19:
+ ble_int_arr = to_int_array(to_hex_string(data))
+ button_indicator = ble_int_arr[8]
+ logger.info(f"Buttom code pressed: ${button_indicator}")
+ ret = (
+ WatchButton.LOWER_LEFT
+ if (button_indicator == 0 or button_indicator == 1)
+ else WatchButton.LOWER_RIGHT
+ if button_indicator == 4
+ else WatchButton.NO_BUTTON
+ if button_indicator == 3
+ else WatchButton.FIND
+ if button_indicator == 2
+ # assime that all other buttons from watches such as the ECB-30 are for time set
+ else WatchButton.LOWER_RIGHT
+ )
+
+ return ret
+
+ button = button_pressed_callback(data)
+ ButtonPressedIO.result.set_result(button)
diff --git a/src/gshocktimeserver/iolib/dst_for_world_cities_io.py b/src/gshocktimeserver/iolib/dst_for_world_cities_io.py
new file mode 100644
index 0000000..4b784e8
--- /dev/null
+++ b/src/gshocktimeserver/iolib/dst_for_world_cities_io.py
@@ -0,0 +1,34 @@
+import asyncio
+from typing import Any
+from cancelable_result import CancelableResult
+from logger import logger
+
+from utils import to_compact_string, to_hex_string
+from casio_constants import CasioConstants
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class DstForWorldCitiesIO:
+ result: CancelableResult = None
+ connection = None
+
+ @staticmethod
+ async def request(connection, city_number: int):
+ logger.info(f"DstForWorldCitiesIO request")
+ DstForWorldCitiesIO.connection = connection
+ key = "1e0{}".format(city_number)
+ await connection.request(key)
+
+ loop = asyncio.get_running_loop()
+ DstForWorldCitiesIO.result = loop.create_future()
+ return DstForWorldCitiesIO.result
+
+ @staticmethod
+ async def send_to_watch(connection):
+ connection.write(0x000C, bytearray([CHARACTERISTICS["CASIO_DST_SETTING"]]))
+
+ @staticmethod
+ def on_received(data):
+ logger.info(f"DstForWorldCitiesIO onReceived")
+ DstForWorldCitiesIO.result.set_result(data)
diff --git a/src/gshocktimeserver/iolib/dst_watch_state_io.py b/src/gshocktimeserver/iolib/dst_watch_state_io.py
new file mode 100644
index 0000000..2e6ad1a
--- /dev/null
+++ b/src/gshocktimeserver/iolib/dst_watch_state_io.py
@@ -0,0 +1,40 @@
+import asyncio
+from enum import IntEnum
+from typing import Any
+from cancelable_result import CancelableResult
+from logger import logger
+from utils import to_compact_string, to_hex_string
+from casio_constants import CasioConstants
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class DtsState(IntEnum):
+ ZERO = 0
+ TWO = 2
+ FOUR = 4
+
+
+class DstWatchStateIO:
+ result: CancelableResult = None
+ connection = None
+
+ @staticmethod
+ async def request(connection, state: DtsState):
+ logger.info(f"DstWatchStateIO request")
+ DstWatchStateIO.connection = connection
+ key = f"1d0{state.value}"
+ await connection.request(key)
+
+ loop = asyncio.get_running_loop()
+ DstWatchStateIO.result = loop.create_future()
+ return DstWatchStateIO.result
+
+ @staticmethod
+ async def send_to_watch(connection):
+ connection.write(0x000C, bytearray([CHARACTERISTICS["CASIO_DST_WATCH_STATE"]]))
+
+ @staticmethod
+ def on_received(data):
+ logger.info(f"DstWatchStateIO onReceived")
+ DstWatchStateIO.result.set_result(data)
diff --git a/src/gshocktimeserver/iolib/error_io.py b/src/gshocktimeserver/iolib/error_io.py
new file mode 100644
index 0000000..acbc156
--- /dev/null
+++ b/src/gshocktimeserver/iolib/error_io.py
@@ -0,0 +1,7 @@
+from logger import logger
+
+
+class ErrorIO:
+ @staticmethod
+ async def on_received(message):
+ logger.info(f"ErrorIO onReceived: {message}")
diff --git a/src/gshocktimeserver/iolib/events_io.py b/src/gshocktimeserver/iolib/events_io.py
new file mode 100644
index 0000000..0eae232
--- /dev/null
+++ b/src/gshocktimeserver/iolib/events_io.py
@@ -0,0 +1,369 @@
+import asyncio
+import json
+from typing import Any
+from cancelable_result import CancelableResult
+from logger import logger
+
+from utils import (
+ clean_str,
+ dec_to_hex,
+ to_ascii_string,
+ to_byte_array,
+ to_compact_string,
+ to_hex_string,
+ to_int_array,
+)
+from casio_constants import CasioConstants
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class ReminderMasks:
+ YEARLY_MASK = 0b00001000
+ MONTHLY_MASK = 0b00010000
+ WEEKLY_MASK = 0b00000100
+
+ SUNDAY_MASK = 0b00000001
+ MONDAY_MASK = 0b00000010
+ TUESDAY_MASK = 0b00000100
+ WEDNESDAY_MASK = 0b00001000
+ THURSDAY_MASK = 0b00010000
+ FRIDAY_MASK = 0b00100000
+ SATURDAY_MASK = 0b01000000
+
+ ENABLED_MASK = 0b00000001
+
+
+class EventsIO:
+ result: CancelableResult = None
+ connection = None
+ title = None
+
+ @staticmethod
+ async def set_connection(connection):
+ EventsIO.connection = connection
+
+ @staticmethod
+ async def request(connection, event_number):
+ logger.info(f"EventsIO request")
+ EventsIO.connection = connection
+
+ await connection.request("30{}".format(event_number)) # reminder title
+ await connection.request("31{}".format(event_number)) # reminder time
+
+ loop = asyncio.get_running_loop()
+ EventsIO.result = loop.create_future()
+ return EventsIO.result
+
+ @staticmethod
+ async def send_to_watch_set(message):
+ logger.info(f"EventsIO sendToWatchSet: {message}")
+
+ def reminder_title_from_json(reminder_json):
+ title_str = reminder_json.get("title")
+ return to_byte_array(title_str, 18)
+
+ def reminder_time_from_json(reminder_json):
+ def create_time_detail(repeat_period, start_date, end_date, days_of_week):
+ def encode_date(time_detail, start_date, end_date):
+ class Month:
+ JANUARY = 1
+ FEBRUARY = 2
+ MARCH = 3
+ APRIL = 4
+ MAY = 5
+ JUNE = 6
+ JULY = 7
+ AUGUST = 8
+ SEPTEMBER = 9
+ OCTOBER = 10
+ NOVEMBER = 11
+ DECEMBER = 12
+
+ def __init__(self):
+ pass
+
+ def string_to_month(month_str):
+ months = {
+ "january": Month.JANUARY,
+ "february": Month.FEBRUARY,
+ "march": Month.MARCH,
+ "april": Month.APRIL,
+ "may": Month.MAY,
+ "june": Month.JUNE,
+ "july": Month.JULY,
+ "august": Month.AUGUST,
+ "september": Month.SEPTEMBER,
+ "october": Month.OCTOBER,
+ "november": Month.NOVEMBER,
+ "december": Month.DECEMBER,
+ }
+ return months.get(month_str.lower(), Month.JANUARY)
+
+ def hex_to_dec(hex):
+ return int(str(hex), 16)
+
+ # take the last 2 digits only
+ time_detail[0] = hex_to_dec(start_date["year"] % 2000)
+ time_detail[1] = hex_to_dec(string_to_month(start_date["month"]))
+ time_detail[2] = hex_to_dec(start_date["day"])
+ time_detail[3] = hex_to_dec(
+ end_date["year"] % 2000
+ ) # get the last 2 gits only
+ time_detail[4] = hex_to_dec(string_to_month(end_date["month"]))
+ time_detail[5] = hex_to_dec(end_date["day"])
+ time_detail[6], time_detail[7] = 0, 0
+
+ time_detail = [0] * 8
+
+ if repeat_period == "NEVER":
+ encode_date(time_detail, start_date, end_date)
+
+ elif repeat_period == "WEEKLY":
+ encode_date(time_detail, start_date, end_date)
+
+ day_of_week = 0
+ if days_of_week is not None:
+ for i in range(len(days_of_week)):
+ if days_of_week[i] == "SUNDAY":
+ day_of_week = day_of_week | ReminderMasks.SUNDAY_MASK
+ elif days_of_week[i] == "MONDAY":
+ day_of_week = day_of_week | ReminderMasks.MONDAY_MASK
+ elif days_of_week[i] == "TUESDAY":
+ day_of_week = day_of_week | ReminderMasks.TUESDAY_MASK
+ elif days_of_week[i] == "WEDNESDAY":
+ day_of_week = day_of_week | ReminderMasks.WEDNESDAY_MASK
+ elif days_of_week[i] == "THURSDAY":
+ day_of_week = day_of_week | ReminderMasks.THURSDAY_MASK
+ elif days_of_week[i] == "FRIDAY":
+ day_of_week = day_of_week | ReminderMasks.FRIDAY_MASK
+ elif days_of_week[i] == "SATURDAY":
+ day_of_week = day_of_week | ReminderMasks.SATURDAY_MASK
+
+ time_detail[6] = day_of_week
+ time_detail[7] = 0
+
+ elif repeat_period == "MONTHLY":
+ encode_date(time_detail, start_date, end_date)
+
+ elif repeat_period == "YEARLY":
+ encode_date(time_detail, start_date, end_date)
+ else:
+ logger.debug(
+ "Cannot handle Repeat Period: {}".format(repeat_period)
+ )
+
+ return time_detail
+
+ def create_time_period(enabled: bool, repeat_period: str) -> int:
+ time_period = 0
+
+ if enabled:
+ time_period = time_period | ReminderMasks.ENABLED_MASK
+ if repeat_period == "WEEKLY":
+ time_period = time_period | ReminderMasks.WEEKLY_MASK
+ elif repeat_period == "MONTHLY":
+ time_period = time_period | ReminderMasks.MONTHLY_MASK
+ elif repeat_period == "YEARLY":
+ time_period = time_period | ReminderMasks.YEARLY_MASK
+ return time_period
+
+ enabled = reminder_json.get("enabled")
+ repeat_period = reminder_json.get("repeat_period")
+ start_date = reminder_json.get("start_date")
+ end_date = reminder_json.get("end_date")
+ days_of_week = reminder_json.get("days_of_week")
+
+ reminder_cmd = bytearray()
+
+ reminder_cmd += bytearray([create_time_period(enabled, repeat_period)])
+ reminder_cmd += bytearray(
+ create_time_detail(repeat_period, start_date, end_date, days_of_week)
+ )
+
+ return reminder_cmd
+
+ reminders_json_arr = json.loads(message).get("value")
+ for index, element in enumerate(reminders_json_arr):
+ reminder_json = element
+ title = reminder_title_from_json(reminder_json)
+
+ title_byte_arr = bytearray([CHARACTERISTICS["CASIO_REMINDER_TITLE"]])
+ title_byte_arr += bytearray([index + 1])
+ title_byte_arr += title
+ title_byte_arr_to_send = to_compact_string(to_hex_string(title_byte_arr))
+ await EventsIO.connection.write(0x000E, title_byte_arr_to_send)
+
+ reminder_time_byte_arr = bytearray([])
+ reminder_time_byte_arr += bytearray(
+ [CHARACTERISTICS["CASIO_REMINDER_TIME"]]
+ )
+ reminder_time_byte_arr += bytearray([index + 1])
+ reminder_time_byte_arr += reminder_time_from_json(reminder_json.get("time"))
+ reminder_time_byte_arr_to_send = to_compact_string(
+ to_hex_string(bytearray(reminder_time_byte_arr))
+ )
+ await EventsIO.connection.write(0x000E, reminder_time_byte_arr_to_send)
+
+ @staticmethod
+ def on_received(message):
+ data = to_hex_string(message)
+
+ def reminder_time_to_json(reminder_str):
+ def convert_array_list_to_json_array(array_list):
+ json_array = []
+ for item in array_list:
+ json_array.append(item)
+
+ return json_array
+
+ def decode_time_period(time_period: int) -> tuple:
+ enabled = False
+ repeat_period = ""
+
+ if (
+ time_period & ReminderMasks.ENABLED_MASK
+ == ReminderMasks.ENABLED_MASK
+ ):
+ enabled = True
+
+ if time_period & ReminderMasks.WEEKLY_MASK == ReminderMasks.WEEKLY_MASK:
+ repeat_period = "WEEKLY"
+ elif (
+ time_period & ReminderMasks.MONTHLY_MASK
+ == ReminderMasks.MONTHLY_MASK
+ ):
+ repeat_period = "MONTHLY"
+ elif (
+ time_period & ReminderMasks.YEARLY_MASK == ReminderMasks.YEARLY_MASK
+ ):
+ repeat_period = "YEARLY"
+ else:
+ repeat_period = "NEVER"
+
+ return (enabled, repeat_period)
+
+ def decode_time_detail(time_detail):
+ def decode_date(time_detail):
+ def int_to_month_str(month_int):
+ months = [
+ "JANUARY",
+ "FEBRUARY",
+ "MARCH",
+ "APRIL",
+ "MAY",
+ "JUNE",
+ "JULY",
+ "AUGUST",
+ "SEPTEMBER",
+ "OCTOBER",
+ "NOVEMBER",
+ "DECEMBER",
+ ]
+ if month_int < 1 or month_int > 12:
+ return ""
+ else:
+ return months[month_int - 1]
+
+ date = json.loads("{}")
+
+ date["year"] = dec_to_hex(time_detail[0]) + 2000
+ date["month"] = int_to_month_str(dec_to_hex(time_detail[1]))
+ date["day"] = dec_to_hex(time_detail[2])
+
+ return date
+
+ result = {}
+
+ # 00 23 02 21 23 02 21 00 00
+ # start from here: ^
+ # so, skip 1
+ start_date = decode_date(time_detail[1:])
+
+ result["start_date"] = start_date
+
+ # 00 23 02 21 23 02 21 00 00
+ # start from here: ^
+ # so, skip 4
+ end_date = decode_date(time_detail[4:])
+
+ result["end_date"] = end_date
+
+ day_of_week = time_detail[7]
+ days_of_week = []
+ if day_of_week & ReminderMasks.SUNDAY_MASK == ReminderMasks.SUNDAY_MASK:
+ days_of_week.append("SUNDAY")
+ if day_of_week & ReminderMasks.MONDAY_MASK == ReminderMasks.MONDAY_MASK:
+ days_of_week.append("MONDAY")
+ if (
+ day_of_week & ReminderMasks.TUESDAY_MASK
+ == ReminderMasks.TUESDAY_MASK
+ ):
+ days_of_week.append("TUESDAY")
+ if (
+ day_of_week & ReminderMasks.WEDNESDAY_MASK
+ == ReminderMasks.WEDNESDAY_MASK
+ ):
+ days_of_week.append("WEDNESDAY")
+ if (
+ day_of_week & ReminderMasks.THURSDAY_MASK
+ == ReminderMasks.THURSDAY_MASK
+ ):
+ days_of_week.append("THURSDAY")
+ if day_of_week & ReminderMasks.FRIDAY_MASK == ReminderMasks.FRIDAY_MASK:
+ days_of_week.append("FRIDAY")
+ if (
+ day_of_week & ReminderMasks.SATURDAY_MASK
+ == ReminderMasks.SATURDAY_MASK
+ ):
+ days_of_week.append("SATURDAY")
+ result["days_of_week"] = days_of_week
+ return result
+
+ int_arr = to_int_array(reminder_str)
+ if int_arr[3] == 0xFF:
+ # 0XFF indicates end of reminders
+ return json.dumps({"end": ""})
+
+ reminder_all = to_int_array(reminder_str)
+ # Remove the first 2 chars:
+ # 0x31 05 <--- 00 23 02 21 23 02 21 00 00
+ reminder = reminder_all[2:]
+ reminder_json = {}
+ time_period = decode_time_period(reminder[0])
+ reminder_json["enabled"] = time_period[0]
+ reminder_json["repeat_period"] = time_period[1]
+
+ time_detail_map = decode_time_detail(reminder)
+
+ reminder_json["start_date"] = time_detail_map["start_date"]
+ reminder_json["end_date"] = time_detail_map["end_date"]
+ reminder_json["days_of_week"] = convert_array_list_to_json_array(
+ time_detail_map["days_of_week"]
+ )
+
+ return json.dumps({"time": reminder_json})
+
+ reminder_json = json.loads(reminder_time_to_json(data[2:]))
+ reminder_json.update(EventsIO.title)
+ logger.info(reminder_json)
+ EventsIO.result.set_result(reminder_json)
+
+ @staticmethod
+ def on_received_title(message):
+ EventsIO.title = ReminderDecoder.reminder_title_to_json(message)
+
+
+class ReminderDecoder:
+ def reminder_title_to_json(title_byte: str) -> dict:
+ hex_str = to_hex_string(title_byte)
+ # ascii_str = to_ascii_string(hex_str, 2)
+
+ int_arr = to_int_array(hex_str)
+ if int_arr[2] == 0xFF:
+ # 0XFF indicates end of reminders
+ return {"end": ""}
+ reminder_json = {}
+
+ reminder_json["title"] = clean_str(to_ascii_string(hex_str, 2))
+ return reminder_json
diff --git a/src/gshocktimeserver/iolib/settings_io.py b/src/gshocktimeserver/iolib/settings_io.py
new file mode 100644
index 0000000..79bed86
--- /dev/null
+++ b/src/gshocktimeserver/iolib/settings_io.py
@@ -0,0 +1,136 @@
+import asyncio
+import json
+from typing import Any
+from cancelable_result import CancelableResult
+from settings import settings
+from utils import to_compact_string, to_hex_string, to_int_array
+from casio_constants import CasioConstants
+from logger import logger
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class SettingsIO:
+ result: CancelableResult = None
+ connection = None
+
+ @staticmethod
+ async def request(connection):
+ logger.info(f"TimerIO request")
+ SettingsIO.connection = connection
+ await connection.request("13")
+
+ loop = asyncio.get_running_loop()
+ SettingsIO.result = loop.create_future()
+ return SettingsIO.result
+
+ @staticmethod
+ def send_to_watch(message):
+ logger.info(f"SettingsIO sendToWatch: {message}")
+ SettingsIO.connection.write(
+ 0x000C, bytearray([CHARACTERISTICS["CASIO_SETTING_FOR_BASIC"]])
+ )
+
+ @staticmethod
+ async def send_to_watch_set(message):
+ logger.info(f"SettingsIO sendToWatchSet: {message}")
+
+ def encode(settings):
+ mask_24_hours = 0b00000001
+ MASK_BUTTON_TONE_OFF = 0b00000010
+ MASK_LIGHT_OFF = 0b00000100
+ POWER_SAVING_MODE = 0b00010000
+
+ arr = bytearray(12)
+ arr[0] = CHARACTERISTICS["CASIO_SETTING_FOR_BASIC"]
+ if settings.time_format == "24h":
+ arr[1] = arr[1] | mask_24_hours
+ if not settings.button_tone:
+ arr[1] = arr[1] | MASK_BUTTON_TONE_OFF
+ if not settings.auto_light:
+ arr[1] = arr[1] | MASK_LIGHT_OFF
+ if not settings.power_saving_mode:
+ arr[1] = arr[1] | POWER_SAVING_MODE
+
+ if settings.light_duration == "4s":
+ arr[2] = 1
+ if settings.date_format == "DD:MM":
+ arr[4] = 1
+
+ language_index = {
+ "English": 0,
+ "Spanish": 1,
+ "French": 2,
+ "German": 3,
+ "Italian": 4,
+ "Russian": 5,
+ }
+ arr[5] = language_index.get(settings.language, 0)
+
+ return arr
+
+ class DotDict(dict):
+ def __getattr__(self, attr):
+ if attr in self:
+ return self[attr]
+ else:
+ raise AttributeError(f"'DotDict' object has no attribute '{attr}'")
+
+ __setattr__ = dict.__setitem__
+ __delattr__ = dict.__delitem__
+
+ json_setting = json.loads(message).get("value")
+ # dict_setting = json.load(json_setting)
+ encoded_stiing = encode(DotDict(json_setting))
+ setting_to_set = to_compact_string(to_hex_string(encoded_stiing))
+
+ await SettingsIO.connection.write(0x000E, setting_to_set)
+
+ @staticmethod
+ def on_received(message):
+ logger.info(f"SettingsIO onReceived: {message}")
+
+ def create_json_settings(setting_string):
+ mask_24_hours = 0b00000001
+ MASK_BUTTON_TONE_OFF = 0b00000010
+ MASK_LIGHT_OFF = 0b00000100
+ POWER_SAVING_MODE = 0b00010000
+
+ setting_array = to_int_array(setting_string)
+
+ if setting_array[1] & mask_24_hours != 0:
+ settings.time_format = "24h"
+ else:
+ settings.time_format = "12h"
+ settings.button_tone = setting_array[1] & MASK_BUTTON_TONE_OFF == 0
+ settings.auto_light = setting_array[1] & MASK_LIGHT_OFF == 0
+ settings.power_saving_mode = setting_array[1] & POWER_SAVING_MODE == 0
+
+ if setting_array[4] == 1:
+ settings.date_format = "DD:MM"
+ else:
+ settings.date_format = "MM:DD"
+
+ if setting_array[5] == 0:
+ settings.language = "English"
+ if setting_array[5] == 1:
+ settings.language = "Spanish"
+ if setting_array[5] == 2:
+ settings.language = "French"
+ if setting_array[5] == 3:
+ settings.language = "German"
+ if setting_array[5] == 4:
+ settings.language = "Italian"
+ if setting_array[5] == 5:
+ settings.language = "Russian"
+
+ if setting_array[2] == 1:
+ settings.light_duration = "4s"
+ else:
+ settings.light_duration = "2s"
+
+ return json.dumps(settings.__dict__)
+
+ data = to_hex_string(message)
+ json_data = json.loads(create_json_settings(data))
+ SettingsIO.result.set_result(json_data)
diff --git a/src/gshocktimeserver/iolib/time_adjustement_io.py b/src/gshocktimeserver/iolib/time_adjustement_io.py
new file mode 100644
index 0000000..96a8792
--- /dev/null
+++ b/src/gshocktimeserver/iolib/time_adjustement_io.py
@@ -0,0 +1,92 @@
+import asyncio
+import json
+import logging
+from typing import Any
+from cancelable_result import CancelableResult
+from settings import settings
+from utils import to_compact_string, to_hex_string, to_int_array
+from casio_constants import CasioConstants
+from iolib.error_io import ErrorIO
+from logger import logger
+
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class TimeAdjustmentIO:
+ result: CancelableResult = None
+ connection = None
+ original_value = None
+
+ @staticmethod
+ async def request(connection):
+ logger.info(f"TimerIO request")
+ TimeAdjustmentIO.connection = connection
+ await connection.request("11")
+
+ loop = asyncio.get_running_loop()
+ TimeAdjustmentIO.result = loop.create_future()
+ return TimeAdjustmentIO.result
+
+ @staticmethod
+ def send_to_watch(message):
+ logger.info(f"TimeAdjustmentIO sendToWatch: {message}")
+ TimeAdjustmentIO.connection.write(
+ 0x000C, bytearray([CHARACTERISTICS["TIME_ADJUSTMENT"]])
+ )
+
+ @staticmethod
+ async def send_to_watch_set(message):
+ logger.info(f"TimeAdjustmentIO sendToWatchSet: {message}")
+
+ if TimeAdjustmentIO.original_value == None:
+ logging.error("Error: Must call get before set")
+ return ErrorIO.request("Error: Must call get before set")
+
+ time_adjustment = json.loads(message).get("timeAdjustment") == "True"
+ minutes_after_hour = int(json.loads(message).get("minutesAfterHour"))
+
+ def encode_time_adjustment(time_adjustment, minutes_after_hour):
+ raw_string = TimeAdjustmentIO.original_value
+ "0x11 0F 0F 0F 06 00 00 00 00 00 01 00 80 30 30"
+ int_array = to_int_array(raw_string)
+ int_array[12] = 0x80 if time_adjustment == False else 0x00
+ int_array[13] = int(minutes_after_hour)
+ return bytes(int_array)
+
+ encoded_time_adj = encode_time_adjustment(time_adjustment, minutes_after_hour)
+
+ write_cmd = to_compact_string(to_hex_string(encoded_time_adj))
+ await TimeAdjustmentIO.connection.write(0x000E, write_cmd)
+
+ @staticmethod
+ def on_received(message):
+ logger.info(f"TimeAdjustmentIO onReceived: {message}")
+ TimeAdjustmentIO.original_value = to_hex_string(
+ message
+ ) # save original message
+
+ def is_time_adjustment_set(data) -> bool:
+ # syncing off: 110f0f0f0600500004000100->80<-10d2
+ # syncing on: 110f0f0f0600500004000100->00<-10d2
+
+ # CasioIsAutoTimeOriginalValue.value = data # save original data for future use
+ return int(data[12]) == 0x00
+
+ def get_minutes_after_hour(data) -> int:
+ # syncing off: 110f0f0f060050000400010080->10<-d2
+
+ # CasioIsAutoTimeOriginalValue.value = data # save original data for future use
+ return int(data[13])
+
+ timeAdjusted = is_time_adjustment_set(message)
+ munutesAfterHour = get_minutes_after_hour(message)
+ valueToSetStr = f"""{{"timeAdjusment": "{timeAdjusted}",
+ "minutesAfterHour": "{munutesAfterHour}" }}"""
+
+ value = json.loads(valueToSetStr)
+ TimeAdjustmentIO.result.set_result(value)
+
+ @staticmethod
+ async def on_received_set(message):
+ logger.info(f"TimeAdjustmentIO onReceivedSet: {message}")
diff --git a/src/gshocktimeserver/iolib/time_io.py b/src/gshocktimeserver/iolib/time_io.py
new file mode 100644
index 0000000..5485a00
--- /dev/null
+++ b/src/gshocktimeserver/iolib/time_io.py
@@ -0,0 +1,58 @@
+import asyncio
+import datetime
+import json
+import time
+from typing import Any
+from cancelable_result import CancelableResult
+from logger import logger
+
+from utils import to_compact_string, to_hex_string
+from casio_constants import CasioConstants
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class TimeIO:
+ result: CancelableResult = None
+ connection = None
+
+ @staticmethod
+ async def request(connection, current_time):
+ TimeIO.connection = connection
+
+ if current_time is None:
+ current_time = time.time()
+
+ message = {
+ "action": "SET_TIME",
+ "value": "{}".format(round(current_time * 1000)),
+ }
+ await connection.sendMessage(json.dumps(message))
+
+ @staticmethod
+ async def send_to_watch_set(message):
+ date_time_ms = int(json.loads(message).get("value"))
+ logger.info("date_time_ms: {}".format(date_time_ms))
+ date_time = datetime.datetime.fromtimestamp(date_time_ms / 1000.0)
+ time_data = TimeEncoder.prepare_current_time(date_time)
+ time_command = to_hex_string(
+ bytearray([CHARACTERISTICS["CASIO_CURRENT_TIME"]]) + time_data
+ )
+ await TimeIO.connection.write(0xE, to_compact_string(time_command))
+
+
+class TimeEncoder:
+ def prepare_current_time(date: datetime.datetime):
+ arr = bytearray(10)
+ year = date.year
+ arr[0] = year >> 0 & 0xFF
+ arr[1] = year >> 8 & 0xFF
+ arr[2] = date.month
+ arr[3] = date.day
+ arr[4] = date.hour
+ arr[5] = date.minute
+ arr[6] = date.second
+ arr[7] = date.weekday()
+ arr[8] = 0
+ arr[9] = 1
+ return arr
diff --git a/src/gshocktimeserver/iolib/timer_io.py b/src/gshocktimeserver/iolib/timer_io.py
new file mode 100644
index 0000000..bffd764
--- /dev/null
+++ b/src/gshocktimeserver/iolib/timer_io.py
@@ -0,0 +1,70 @@
+import asyncio
+import json
+from typing import Any
+from cancelable_result import CancelableResult
+from logger import logger
+
+from utils import to_compact_string, to_hex_string
+from casio_constants import CasioConstants
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class TimerIO:
+ result: CancelableResult = None
+ connection = None
+
+ @staticmethod
+ async def request(connection):
+ logger.info(f"TimerIO request")
+ TimerIO.connection = connection
+ await connection.request("18")
+
+ loop = asyncio.get_running_loop()
+ TimerIO.result = loop.create_future()
+ return TimerIO.result
+
+ @staticmethod
+ async def send_to_watch(connection):
+ connection.write(0x000C, bytearray([CHARACTERISTICS["CASIO_TIMER"]]))
+
+ @staticmethod
+ async def send_to_watch_set(data):
+ logger.info(f"TimerIO sendToWatchSet: {data}")
+
+ def encode(seconds_str):
+ in_seconds = int(seconds_str)
+ hours = in_seconds // 3600
+ minutes_and_seconds = in_seconds % 3600
+ minutes = minutes_and_seconds // 60
+ seconds = minutes_and_seconds % 60
+
+ arr = bytearray(7)
+ arr[0] = 0x18
+ arr[1] = hours
+ arr[2] = minutes
+ arr[3] = seconds
+ return arr
+
+ data_obj = json.loads(data)
+ seconds_as_byte_arr = encode(data_obj.get("value"))
+ seconds_as_compact_str = to_compact_string(to_hex_string(seconds_as_byte_arr))
+ await TimerIO.connection.write(0x000E, seconds_as_compact_str)
+
+ @staticmethod
+ def on_received(data):
+ logger.info(f"TimerIO onReceived")
+
+ def decode_value(data: str) -> str:
+ timer_int_array = data
+
+ hours = timer_int_array[1]
+ minutes = timer_int_array[2]
+ seconds = timer_int_array[3]
+
+ in_seconds = hours * 3600 + minutes * 60 + seconds
+ return in_seconds
+
+ decoded = decode_value(data)
+ seconds = int(decoded)
+ TimerIO.result.set_result(seconds)
diff --git a/src/gshocktimeserver/iolib/unknown_io.py b/src/gshocktimeserver/iolib/unknown_io.py
new file mode 100644
index 0000000..cc49178
--- /dev/null
+++ b/src/gshocktimeserver/iolib/unknown_io.py
@@ -0,0 +1,7 @@
+from logger import logger
+
+
+class UnknownIO:
+ @staticmethod
+ def on_received(message):
+ logger.info(f"UnknownIO onReceived: {message}")
diff --git a/src/gshocktimeserver/iolib/watch_condition_io.py b/src/gshocktimeserver/iolib/watch_condition_io.py
new file mode 100644
index 0000000..c165235
--- /dev/null
+++ b/src/gshocktimeserver/iolib/watch_condition_io.py
@@ -0,0 +1,62 @@
+import asyncio
+from typing import Any
+from cancelable_result import CancelableResult
+from watch_info import watch_info
+from casio_constants import CasioConstants
+from logger import logger
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class WatchConditionIO:
+ result: CancelableResult = None
+ connection = None
+
+ class WatchConditionValue:
+ def __init__(self, battery_level_percent: int, temperature: int):
+ self.battery_level_percent = battery_level_percent
+ self.temperature = temperature
+
+ @staticmethod
+ async def request(connection):
+ logger.info(f"WatchConditionIO request")
+ WatchConditionIO.connection = connection
+ await connection.request("28")
+
+ loop = asyncio.get_running_loop()
+ WatchConditionIO.result = loop.create_future()
+ return WatchConditionIO.result
+
+ @staticmethod
+ async def send_to_watch(connection):
+ connection.write(0x000C, bytearray([CHARACTERISTICS["CASIO_WATCH_CONDITION"]]))
+
+ @staticmethod
+ def on_received(data):
+ logger.info(f"WatchConditionIO onReceived")
+
+ def decode_value(data: str) -> WatchConditionIO.WatchConditionValue:
+ int_arr = list(map(int, data))
+ bytes_data = bytes(int_arr[1:])
+
+ if len(bytes_data) >= 2:
+ # Battery level between 15 and 20 for B2100 and between 12 and 19 for B5600. Scale accordingly to %
+ logger.info(f"battery level row value: {int(bytes_data[0])}")
+
+ battery_level_lower_limit = watch_info.batteryLevelLowerLimit
+ battery_level_upper_limit = watch_info.batteryLevelUpperLimit
+
+ multiplier = round(
+ 100.0 / (battery_level_upper_limit - battery_level_lower_limit)
+ )
+ battery_level = int(bytes_data[0]) - battery_level_lower_limit
+ battery_level_percent = min(max(battery_level * multiplier, 0), 100)
+ temperature = int(bytes_data[1])
+
+ return WatchConditionIO.WatchConditionValue(
+ battery_level_percent, temperature
+ )
+
+ return WatchConditionIO.WatchConditionValue(0, 0)
+
+ WatchConditionIO.result.set_result(decode_value(data).__dict__)
diff --git a/src/gshocktimeserver/iolib/watch_name_io.py b/src/gshocktimeserver/iolib/watch_name_io.py
new file mode 100644
index 0000000..1dbd4c3
--- /dev/null
+++ b/src/gshocktimeserver/iolib/watch_name_io.py
@@ -0,0 +1,31 @@
+import asyncio
+from typing import Any
+
+from casio_constants import CasioConstants
+from utils import clean_str, to_ascii_string, to_hex_string
+from cancelable_result import CancelableResult
+
+
+class WatchNameIO:
+ result: CancelableResult = None
+ connection = None
+
+ @staticmethod
+ async def request(connection):
+ WatchNameIO.connection = connection
+ await connection.request("23")
+
+ loop = asyncio.get_running_loop()
+ WatchNameIO.result = loop.create_future()
+ return WatchNameIO.result
+
+ @staticmethod
+ def on_received(data):
+ hex_str = to_hex_string(data)
+ ascii_str = to_ascii_string(hex_str, 1)
+ clean_data = clean_str(ascii_str)
+ WatchNameIO.result.set_result(clean_data)
+
+ @staticmethod
+ async def send_to_watch():
+ pass
diff --git a/src/gshocktimeserver/iolib/world_cities_io.py b/src/gshocktimeserver/iolib/world_cities_io.py
new file mode 100644
index 0000000..63e8eba
--- /dev/null
+++ b/src/gshocktimeserver/iolib/world_cities_io.py
@@ -0,0 +1,33 @@
+import asyncio
+from typing import Any
+from casio_constants import CasioConstants
+from cancelable_result import CancelableResult
+from logger import logger
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+
+class WorldCitiesIO:
+ result: CancelableResult = None
+ connection = None
+
+ @staticmethod
+ async def request(connection, cityNumber: int):
+ logger.info(f"DstWatchStateIO request")
+ WorldCitiesIO.connection = connection
+ key = "1f0{}".format(cityNumber)
+
+ await connection.request(key)
+
+ loop = asyncio.get_running_loop()
+ WorldCitiesIO.result = loop.create_future()
+ return WorldCitiesIO.result
+
+ @staticmethod
+ async def send_to_watch(connection):
+ connection.write(0x000C, bytearray([CHARACTERISTICS["CASIO_WORLD_CITIES"]]))
+
+ @staticmethod
+ def on_received(data):
+ logger.info(f"WorldCitiesIO onReceived")
+ WorldCitiesIO.result.set_result(data)
diff --git a/src/gshocktimeserver/logger.py b/src/gshocktimeserver/logger.py
index 93bca6b..92527ce 100644
--- a/src/gshocktimeserver/logger.py
+++ b/src/gshocktimeserver/logger.py
@@ -1,32 +1,34 @@
-from configparser import ConfigParser
-from enum import Enum
import logging
from args import args
_logger = logging.getLogger(__name__)
+
class Logger:
log_level = args.get().log_level
logging.basicConfig(
- encoding='utf-8',
- level=logging.INFO, # log_level,
- handlers=[
- logging.StreamHandler()
- ],
+ level=logging.INFO, # log_level,
+ handlers=[logging.StreamHandler()],
format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s",
)
- def error (self, *args):
+ logging.basicConfig(level=logging.INFO)
+
+ def error(self, *args):
_logger.error(args)
- def info (self, *args):
+ def info(self, *args):
_logger.info(args)
- def debug (self, *args):
+ def debug(self, *args):
_logger.debug(args)
- def warn (self, *args):
+ def warn(self, *args):
_logger.warn(args)
+ def warning(self, *args):
+ _logger.warn(args)
+
+
logger = Logger()
diff --git a/src/gshocktimeserver/mailsener.py b/src/gshocktimeserver/mailsener.py
index ae0acac..210815d 100644
--- a/src/gshocktimeserver/mailsener.py
+++ b/src/gshocktimeserver/mailsener.py
@@ -1,6 +1,7 @@
import smtplib
from datetime import date
+
def send_mail_notification(to_address):
# Import the email modules we'll need
from email.mime.text import MIMEText
@@ -16,4 +17,4 @@ def send_mail_notification(to_address):
# envelope header.
s = smtplib.SMTP('localhost')
s.sendmail(me, [you], msg.as_string())
- s.quit()
\ No newline at end of file
+ s.quit()
diff --git a/src/gshocktimeserver/message_dispatcher.py b/src/gshocktimeserver/message_dispatcher.py
new file mode 100644
index 0000000..030510d
--- /dev/null
+++ b/src/gshocktimeserver/message_dispatcher.py
@@ -0,0 +1,88 @@
+import sys
+import asyncio
+import json
+from typing import Any
+import connection
+from logger import logger
+from casio_constants import CasioConstants
+from iolib.app_info_io import AppInfoIO
+from iolib.dst_watch_state_io import DstWatchStateIO
+from iolib.world_cities_io import WorldCitiesIO
+from iolib.dst_for_world_cities_io import DstForWorldCitiesIO
+
+from iolib.time_io import TimeIO
+from iolib.timer_io import TimerIO
+from iolib.watch_name_io import WatchNameIO
+from iolib.alarms_io import AlarmsIO
+from iolib.events_io import EventsIO
+from iolib.settings_io import SettingsIO
+from iolib.time_adjustement_io import TimeAdjustmentIO
+from iolib.watch_condition_io import WatchConditionIO
+from iolib.error_io import ErrorIO
+from iolib.unknown_io import UnknownIO
+from iolib.button_pressed_io import ButtonPressedIO
+
+CHARACTERISTICS = CasioConstants.CHARACTERISTICS
+
+class MessageDispatcher:
+ watch_senders = {
+ "GET_ALARMS": AlarmsIO.send_to_watch,
+ "SET_ALARMS": AlarmsIO.send_to_watch_set,
+ "SET_REMINDERS": EventsIO.send_to_watch_set,
+ "GET_SETTINGS": SettingsIO.send_to_watch,
+ "SET_SETTINGS": SettingsIO.send_to_watch_set,
+ "GET_TIME_ADJUSTMENT": TimeAdjustmentIO.send_to_watch,
+ "SET_TIME_ADJUSTMENT": TimeAdjustmentIO.send_to_watch_set,
+ "GET_TIMER": TimerIO.send_to_watch,
+ "SET_TIMER": TimerIO.send_to_watch_set,
+ "SET_TIME": TimeIO.send_to_watch_set,
+ }
+
+ data_received_messages = {
+ CHARACTERISTICS["CASIO_SETTING_FOR_ALM"]: AlarmsIO.on_received,
+ CHARACTERISTICS["CASIO_SETTING_FOR_ALM2"]: AlarmsIO.on_received,
+ CHARACTERISTICS["CASIO_TIMER"]: TimerIO.on_received,
+ CHARACTERISTICS["CASIO_WATCH_NAME"]: WatchNameIO.on_received,
+ CHARACTERISTICS["CASIO_DST_SETTING"]: DstForWorldCitiesIO.on_received,
+ CHARACTERISTICS["CASIO_REMINDER_TIME"]: EventsIO.on_received,
+ CHARACTERISTICS["CASIO_REMINDER_TITLE"]: EventsIO.on_received_title,
+ CHARACTERISTICS["CASIO_WORLD_CITIES"]: WorldCitiesIO.on_received,
+ CHARACTERISTICS["CASIO_DST_WATCH_STATE"]: DstWatchStateIO.on_received,
+ CHARACTERISTICS["CASIO_WATCH_CONDITION"]: WatchConditionIO.on_received,
+ CHARACTERISTICS["CASIO_APP_INFORMATION"]: AppInfoIO.on_received,
+ CHARACTERISTICS["CASIO_BLE_FEATURES"]: ButtonPressedIO.on_received,
+ CHARACTERISTICS["CASIO_SETTING_FOR_BASIC"]: SettingsIO.on_received,
+ CHARACTERISTICS["CASIO_SETTING_FOR_BLE"]: TimeAdjustmentIO.on_received,
+ CHARACTERISTICS["ERROR"]: ErrorIO.on_received,
+ CHARACTERISTICS["UNKNOWN"]: UnknownIO.on_received,
+
+ # ECB-30
+ CHARACTERISTICS["CMD_SET_TIMEMODE"]: UnknownIO.on_received,
+ CHARACTERISTICS["FIND_PHONE"]: UnknownIO.on_received,
+ }
+
+ @staticmethod
+ async def send_to_watch(message):
+ json_message = json.loads(message)
+ action = json_message.get("action")
+ await MessageDispatcher.watch_senders[action](message)
+
+ @staticmethod
+ def on_received(data):
+ key = data[0]
+ if key not in MessageDispatcher.data_received_messages:
+ logger.info(f"Unknown key: {key}")
+ else:
+ logger.info(f"Found key: {MessageDispatcher.data_received_messages[key]}")
+ MessageDispatcher.data_received_messages[key](data)
+
+
+# Usage example
+if __name__ == "__main__":
+ # Simulated messages
+ sample_message = {"action": "GET_SETTINGS"}
+ sample_data = "1,2,3,4,5"
+
+ # Simulated message dispatching
+ MessageDispatcher.send_to_watch(sample_message)
+ MessageDispatcher.on_received(sample_data)
diff --git a/src/gshocktimeserver/result_queue.py b/src/gshocktimeserver/result_queue.py
deleted file mode 100644
index f652a28..0000000
--- a/src/gshocktimeserver/result_queue.py
+++ /dev/null
@@ -1,36 +0,0 @@
-class KeyedResult:
- def __init__(self, key, result):
- self.key = key
- self.result = result
-
- def __str__(self):
- return "KeyedResult(key='{}', result={})".format(self.key, self.result)
-
-
-class ResultQueue:
- def __init__(self):
- self.keyed_result_map = {}
-
- def enqueue(self, element):
- self.keyed_result_map[element.key.upper()] = element.result
-
- def dequeue(self, _key):
- if not self.keyed_result_map:
- return None
- else:
- key = _key.upper()
- value = self.keyed_result_map[key]
- del self.keyed_result_map[key]
- return value
-
- def is_empty(self):
- return not bool(self.keyed_result_map)
-
- def size(self):
- return len(self.keyed_result_map)
-
- def clear(self):
- self.keyed_result_map.clear()
-
-
-result_queue = ResultQueue()
diff --git a/src/gshocktimeserver/scanner.py b/src/gshocktimeserver/scanner.py
index 723bf50..6523147 100644
--- a/src/gshocktimeserver/scanner.py
+++ b/src/gshocktimeserver/scanner.py
@@ -1,4 +1,3 @@
-import logging
import sys
from bleak import BleakScanner
@@ -6,27 +5,28 @@
from watch_info import watch_info
from logger import logger
+
class Scanner:
CASIO_SERVICE_UUID = "00001804-0000-1000-8000-00805f9b34fb"
- async def scan(self, device_address):
+ async def scan(self, device_address=None):
scanner = BleakScanner()
logger.info("Scanning for devices...")
if device_address is None:
while True:
- filter = {"name": "CASIO*"}
device = await scanner.find_device_by_filter(
lambda d, ad: d.name and d.name.lower().startswith("casio"),
- timeout=5*60.0
+ timeout=5 * 60.0,
)
- logger.debug (f"device: {device}")
+ logger.info(f"device: {device}")
if device is None:
continue
- watch_info.set_name(device.name)
- watch_info.set_address(device.address)
-
+ # watch_info.set_name(device.name)
+ # watch_info.set_address(device.address)
+ watch_info.set_name_and_model(device.name)
+
conf.put("device.address", device.address)
conf.put("device.name", device.name)
break
@@ -38,4 +38,5 @@ async def scan(self, device_address):
return device
-scanner = Scanner()
\ No newline at end of file
+
+scanner = Scanner()
diff --git a/src/gshocktimeserver/utils.py b/src/gshocktimeserver/utils.py
index 80d0ed3..85c39ae 100644
--- a/src/gshocktimeserver/utils.py
+++ b/src/gshocktimeserver/utils.py
@@ -3,7 +3,7 @@
def to_casio_cmd(bytesStr):
- parts = [bytesStr[i : i + 2] for i in range(0, len(bytesStr), 2)]
+ parts = [bytesStr[i: i + 2] for i in range(0, len(bytesStr), 2)]
hexArr = [int(str, 16) for str in parts]
return bytes(hexArr)
@@ -34,14 +34,14 @@ def to_hex_string(byte_arr):
def remove_prefix(string, prefix):
- return string[len(prefix) :] if string.startswith(prefix) else string
+ return string[len(prefix):] if string.startswith(prefix) else string
def to_ascii_string(hexStr, commandLengthToSkip):
asciiStr = ""
strArrayWithCommand = hexStr.split(" ")
if len(strArrayWithCommand) == 1: # no spaces between hex values, i.e. 4C4F4E444F4E
- strArrayWithCommand = [hexStr[i : i + 2] for i in range(0, len(hexStr), 2)]
+ strArrayWithCommand = [hexStr[i: i + 2] for i in range(0, len(hexStr), 2)]
# skip command
strArray = strArrayWithCommand[commandLengthToSkip:]
@@ -49,6 +49,7 @@ def to_ascii_string(hexStr, commandLengthToSkip):
asciiStr = bytes.fromhex(asc).decode("ASCII")
return asciiStr
+
def trimNonAsciiCharacters(string):
return string.replace("\0", "")
@@ -72,7 +73,6 @@ def to_byte_array(string, maxLen):
return retArr
-
def to_hex_string_compact(asciiStr, maxLen):
byteArr = bytearray(asciiStr, 'ascii')
hexStr = ""
@@ -80,21 +80,22 @@ def to_hex_string_compact(asciiStr, maxLen):
hexStr += "{:02x}".format(byte)
return hexStr
+
def dec_to_hex(dec):
return int(str(hex(dec))[2:])
+
def encode_string(ascii_string, maxlen):
- # Convert the ascii string into an array of integers
- int_arr = [ord(c) for c in ascii_string]
-
- # Pad the array up to maxlen with zeroes
- while len(int_arr) < maxlen:
- int_arr.append(0)
-
- # Convert the array back into a string
- hex_string = ''
- for i in int_arr:
- hex_string += '{:02X}'.format(i)
-
- return hex_string
+ # Convert the ascii string into an array of integers
+ int_arr = [ord(c) for c in ascii_string]
+
+ # Pad the array up to maxlen with zeroes
+ while len(int_arr) < maxlen:
+ int_arr.append(0)
+
+ # Convert the array back into a string
+ hex_string = ''
+ for i in int_arr:
+ hex_string += '{:02X}'.format(i)
+ return hex_string
diff --git a/src/gshocktimeserver/watch_info.py b/src/gshocktimeserver/watch_info.py
index 62edf46..0ee26a9 100644
--- a/src/gshocktimeserver/watch_info.py
+++ b/src/gshocktimeserver/watch_info.py
@@ -1,22 +1,272 @@
-from configparser import ConfigParser
from enum import Enum
+from logger import logger
-class WatchInfo:
-
- class model(Enum):
- B5600 = 1
- B2100 = 2
- UNKNOWN = 3
+class WATCH_MODEL(Enum):
+ GA = 1
+ GW = 2
+ DW = 3
+ GMW = 4
+ GPR = 5
+ GST = 6
+ MSG = 7
+ GB001 = 8
+ GBD = 9
+ ECB = 10
+ UNKNOWN = 11
- def __init__(self) -> None:
+class WatchInfo:
+ def __init__(self):
self.name = ""
+ self.shortName = ""
self.address = ""
+ self.model = WATCH_MODEL.UNKNOWN
+ self.worldCitiesCount = 2
+ self.dstCount = 3
+ self.alarmCount = 5
+ self.hasAutoLight = False
+ self.hasReminders = False
+ self.shortLightDuration = ""
+ self.longLightDuration = ""
+ self.weekLanguageSupported = True
+ self.worldCities: True
+ self.temperature: True
+ self.batteryLevelLowerLimit: 15
+ self.batteryLevelUpperLimit: 20
+
+ self.alwaysConnected = False,
+ self.findButtonUserDefined = False,
+ self.hasPowerSavingMode = True,
+ self.hasDnD = False,
+
+ self.models = [
+ {
+ "model": WATCH_MODEL.GW,
+ "worldCitiesCount": 6,
+ "dstCount": 3,
+ "alarmCount": 5,
+ "hasAutoLight": False,
+ "hasReminders": True,
+ "shortLightDuration": "2s",
+ "longLightDuration": "4s",
+ "batteryLevelLowerLimit": 9,
+ "batteryLevelUpperLimit": 19,
+ },
+ {
+ "model": WATCH_MODEL.GST,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": True,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ },
+ {
+ "model": WATCH_MODEL.GMW,
+ "worldCitiesCount": 6,
+ "dstCount": 3,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": True,
+ "shortLightDuration": "2s",
+ "longLightDuration": "4s",
+ },
+ {
+ "model": WATCH_MODEL.GA,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": True,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ },
+ {
+ "model": WATCH_MODEL.GB001,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": False,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ },
+ {
+ "model": WATCH_MODEL.MSG,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": True,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ },
+ {
+ "model": WATCH_MODEL.GST,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": True,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ },
+ {
+ "model": WATCH_MODEL.GPR,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": False,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ },
+ {
+ "model": WATCH_MODEL.DW,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": False,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ },
+ {
+ "model": WATCH_MODEL.GPR,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": False,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ "weekLanguageSupported": False,
+ },
+ {
+ "model": WATCH_MODEL.DW,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": False,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ },
+ {
+ "model": WATCH_MODEL.GBD,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": False,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ "worldCities": False,
+ "temperature": False,
+ },
+ {
+ "model": WATCH_MODEL.ECB,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": False,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ "worldCities": True,
+ "hasTemperature": False,
+ "hasBatteryLevel": False,
+ "alwaysConnected": True,
+ "findButtonUserDefined": True,
+ "hasPowerSavingMode": False,
+ "hasDnD": True
+ },
+ {
+ "model": WATCH_MODEL.UNKNOWN,
+ "worldCitiesCount": 2,
+ "dstCount": 1,
+ "alarmCount": 5,
+ "hasAutoLight": True,
+ "hasReminders": False,
+ "shortLightDuration": "1.5s",
+ "longLightDuration": "3s",
+ },
+ ]
- def set_name (self, name):
+ self.model_map = {model["model"]: model for model in self.models}
+
+ def set_name_and_model(self, name):
self.name = name
- self.model = self.model.B2100 if "2100" in name else self.model.B5600
- def set_address (self, address):
+ parts = self.name.split(" ")
+ if len(parts) > 1:
+ self.shortName = parts[1]
+
+ logger.info(f"============> self.shortName: ${self.shortName}")
+
+ # *** Order matters. Start with the longest shortName first. ***
+ if self.shortName.startswith("MSG"):
+ self.model = WATCH_MODEL.MSG
+ elif self.shortName.startswith("GPR"):
+ self.model = WATCH_MODEL.GPR
+ elif self.shortName.startswith("GST"):
+ self.model = WATCH_MODEL.GST
+ elif self.shortName.startswith("GBD"):
+ self.model = WATCH_MODEL.GBD
+ elif self.shortName.startswith("GBM"):
+ self.model = WATCH_MODEL.GA
+ elif self.shortName.startswith("GMW"):
+ self.model = WATCH_MODEL.GMW
+ elif self.shortName.startswith("DW"):
+ self.model = WATCH_MODEL.DW
+ elif self.shortName.startswith("GA"):
+ self.model = WATCH_MODEL.GA
+ elif self.shortName.startswith("GB"):
+ self.model = WATCH_MODEL.GB
+ elif self.shortName.startswith("GM"):
+ self.model = WATCH_MODEL.GM
+ elif self.shortName.startswith("GW"):
+ self.model = WATCH_MODEL.GW
+ elif self.shortName == "ECB-10" or self.shortName == "ECB-20" or self.shortName == "ECB-30":
+ self.model = WATCH_MODEL.ECB
+ else:
+ self.model = WATCH_MODEL.UNKNOWN
+
+ model_info = self.model_map.get(self.model)
+ if model_info:
+ self.hasReminders = model_info["hasReminders"]
+ self.hasAutoLight = model_info["hasAutoLight"]
+ self.alarmCount = model_info["alarmCount"]
+ self.worldCitiesCount = model_info["worldCitiesCount"]
+ self.dstCount = model_info["dstCount"]
+ self.shortLightDuration = model_info["shortLightDuration"]
+ self.longLightDuration = model_info["longLightDuration"]
+ self.weekLanguageSupported = model_info.get("weekLanguageSupported", True)
+ self.worldCities = model_info.get("worldCities", True)
+ self.temperature = model_info.get("temperature", True)
+ self.batteryLevelLowerLimit = model_info.get("batteryLevelLowerLimit", 15)
+ self.batteryLevelUpperLimit = model_info.get("batteryLevelUpperLimit", 20)
+
+ self.alwaysConnected = model_info.get("alwaysConnected", False)
+ self.findButtonUserDefined = model_info.get("findButtonUserDefined", False)
+ self.hasPowerSavingMode = model_info.get("hasPowerSavingMode", False)
+ self.hasDnD = model_info.get("hasDnD", False)
+ self.hasBatteryLevel = model_info.get("hasBatteryLevel", False)
+
+ def set_address(self, address):
self.address = address
+ def get_address(self):
+ return self.address
+
+ def get_model(self):
+ return self.model
+
+ def reset(self):
+ self.address = ""
+ self.name = ""
+ self.shortName = ""
+ self.model = WATCH_MODEL.UNKNOWN
+
+
watch_info = WatchInfo()
diff --git a/tests/conftest.py b/tests/conftest.py
index e9fb1c6..e14abf6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -8,3 +8,4 @@
"""
# import pytest
+