Skip to content

Commit

Permalink
Switch to aiohttp and cryptography library
Browse files Browse the repository at this point in the history
  • Loading branch information
lafriks committed May 7, 2023
1 parent e6cb8ab commit 218600f
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 86 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ Commands:
```python
from karcher.karcher import KarcherHome

kh = KarcherHome()
kh.login("user@email", "password")
devices = hk.get_devices()
kh = await KarcherHome.create()
await kh.login("user@email", "password")
devices = await hk.get_devices()
```

## License
Expand Down
44 changes: 29 additions & 15 deletions karcher/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
# SPDX-License-Identifier: MIT
# -----------------------------------------------------------

import asyncio
import click
import dataclasses
import json
import logging
from functools import wraps

from karcher.exception import KarcherHomeException
from karcher.karcher import KarcherHome
Expand All @@ -18,6 +20,14 @@
echo = click.echo


def coro(f):
@wraps(f)
def wrapper(*args, **kwargs):
return asyncio.run(f(*args, **kwargs))

return wrapper


class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, o):
if dataclasses.is_dataclass(o):
Expand Down Expand Up @@ -84,11 +94,12 @@ def safe_cli():

@cli.command()
@click.pass_context
def urls(ctx: click.Context):
@coro
async def urls(ctx: click.Context):
"""Get region information."""

kh = KarcherHome(region=ctx.obj.region)
d = kh.get_urls()
kh = await KarcherHome.create(region=ctx.obj.region)
d = await kh.get_urls()

ctx.obj.print(d)

Expand All @@ -97,10 +108,11 @@ def urls(ctx: click.Context):
@click.option('--username', '-u', help='Username to login with.')
@click.option('--password', '-p', help='Password to login with.')
@click.pass_context
def login(ctx: click.Context, username: str, password: str):
@coro
async def login(ctx: click.Context, username: str, password: str):
"""Get user session tokens."""

kh = KarcherHome(region=ctx.obj.region)
kh = await KarcherHome.create(region=ctx.obj.region)
ctx.obj.print(kh.login(username, password))


Expand All @@ -109,23 +121,24 @@ def login(ctx: click.Context, username: str, password: str):
@click.option('--password', '-p', default=None, help='Password to login with.')
@click.option('--auth-token', '-t', default=None, help='Authorization token.')
@click.pass_context
def devices(ctx: click.Context, username: str, password: str, auth_token: str):
@coro
async def devices(ctx: click.Context, username: str, password: str, auth_token: str):
"""List all devices."""

kh = KarcherHome(region=ctx.obj.region)
kh = await KarcherHome.create(region=ctx.obj.region)
if auth_token is not None:
kh.login_token(auth_token, '')
elif username is not None and password is not None:
kh.login(username, password)
await kh.login(username, password)
else:
raise click.BadParameter(
'Must provide either token or username and password.')

devices = kh.get_devices()
devices = await kh.get_devices()

# Logout if we used a username and password
if auth_token is None:
kh.logout()
await kh.logout()

ctx.obj.print(devices)

Expand All @@ -137,7 +150,8 @@ def devices(ctx: click.Context, username: str, password: str, auth_token: str):
@click.option('--mqtt-token', '-m', default=None, help='MQTT authorization token.')
@click.option('--device-id', '-d', required=True, help='Device ID.')
@click.pass_context
def device_properties(
@coro
async def device_properties(
ctx: click.Context,
username: str,
password: str,
Expand All @@ -146,17 +160,17 @@ def device_properties(
device_id: str):
"""Get device properties."""

kh = KarcherHome(region=ctx.obj.region)
kh = await KarcherHome.create(region=ctx.obj.region)
if auth_token is not None:
kh.login_token(auth_token, mqtt_token)
elif username is not None and password is not None:
kh.login(username, password)
await kh.login(username, password)
else:
raise click.BadParameter(
'Must provide either token or username and password.')

dev = None
for device in kh.get_devices():
for device in await kh.get_devices():
if device.device_id == device_id:
dev = device
break
Expand All @@ -168,6 +182,6 @@ def device_properties(

# Logout if we used a username and password
if auth_token is None:
kh.logout()
await kh.logout()

ctx.obj.print(props)
116 changes: 66 additions & 50 deletions karcher/karcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,51 +7,66 @@
import json
import threading
from typing import List, Any
import requests
import aiohttp
import urllib.parse


from .auth import Domains, Session
from .countries import get_country_code, get_region_by_country
from .consts import APP_VERSION_CODE, APP_VERSION_NAME, PROJECT_TYPE, \
PROTOCOL_VERSION, REGION_URLS, ROBOT_PROPERTIES, TENANT_ID, \
Language
from .consts import (
APP_VERSION_CODE, APP_VERSION_NAME, PROJECT_TYPE,
PROTOCOL_VERSION, REGION_URLS, ROBOT_PROPERTIES, TENANT_ID,
Language, Region
)
from .device import Device, DeviceProperties
from .exception import KarcherHomeAccessDenied, KarcherHomeException, handle_error_code
from .map import Map
from .mqtt import MqttClient, get_device_topic_property_get_reply, get_device_topics
from .utils import decrypt, decrypt_map, encrypt, get_nonce, get_random_string, \
from .utils import (
decrypt, decrypt_map, encrypt, get_nonce, get_random_string,
get_timestamp, get_timestamp_ms, is_email, md5
)


class KarcherHome:
"""Main class to access Karcher Home Robots API"""

def __init__(self, country: str = 'GB', language: Language = Language.EN):
"""Initialize Karcher Home Robots API"""
@classmethod
async def create(cls, country: str = 'GB', language: Language = Language.EN):
"""Create Karcher Home Robots API instance"""

super().__init__()
self = KarcherHome()
self._country = country.upper()
self._base_url = REGION_URLS[get_region_by_country(self._country)]
self._mqtt_url = None
self._language = language
self._session = None
self._mqtt = None
self._device_props = {}
self._wait_events = {}

d = self.get_urls()
d = await self.get_urls()
# Update base URLs
if d.app_api != '':
self._base_url = d.app_api
if d.mqtt != '':
self._mqtt_url = d.mqtt

def _request(self, method: str, url: str, **kwargs) -> requests.Response:
session = requests.Session()
return self

def __init__(self):
"""Initialize Karcher Home Robots API"""

super().__init__()
self._country = 'US'
self._base_url = REGION_URLS[Region.US]
self._mqtt_url = None
self._language = Language.EN
self._session = None
self._mqtt = None
self._device_props = {}
self._wait_events = {}

async def _request(self, method: str, url: str, **kwargs) -> aiohttp.ClientResponse:
session = aiohttp.ClientSession()
# TODO: Fix SSL
requests.packages.urllib3.disable_warnings()
session.verify = False
# requests.packages.urllib3.disable_warnings()
# session.skip = False

headers = {}
if kwargs.get('headers') is not None:
Expand Down Expand Up @@ -98,26 +113,27 @@ def _request(self, method: str, url: str, **kwargs) -> requests.Response:
headers['nonce'] = nonce

kwargs['headers'] = headers
return session.request(method, self._base_url + url, **kwargs)
kwargs['verify_ssl'] = False
return await session.request(method, self._base_url + url, **kwargs)

def _download(self, url) -> bytes:
session = requests.Session()
async def _download(self, url) -> bytes:
session = aiohttp.ClientSession()
headers = {
'User-Agent': 'Android_' + TENANT_ID,
}

resp = session.get(url, headers=headers)
if resp.status_code != 200:
resp = await session.get(url, headers=headers)
if resp.status != 200:
raise KarcherHomeException(-1,
'HTTP error: ' + str(resp.status_code))

return resp.content
return await resp.content.read(-1)

def _process_response(self, resp, prop=None) -> Any:
if resp.status_code != 200:
async def _process_response(self, resp: aiohttp.ClientResponse, prop=None) -> Any:
if resp.status != 200:
raise KarcherHomeException(-1,
'HTTP error: ' + str(resp.status_code))
data = resp.json()
'HTTP error: ' + str(resp.status))
data = await resp.json()
# Check for error response
if data['code'] != 0:
handle_error_code(data['code'], data['msg'])
Expand Down Expand Up @@ -161,19 +177,19 @@ def _mqtt_connect(self, wait_for_connect=False):
event.wait()
self._mqtt.on_connect = None

def get_urls(self) -> Domains:
async def get_urls(self) -> Domains:
"""Get URLs for API and MQTT."""

resp = self._request('GET', '/network-service/domains/list', params={
resp = await self._request('GET', '/network-service/domains/list', params={
'tenantId': TENANT_ID,
'productModeCode': PROJECT_TYPE,
'version': PROTOCOL_VERSION,
})

d = self._process_response(resp, 'domain')
d = await self._process_response(resp, 'domain')
return Domains(**d)

def login(self, username, password, register_id=None) -> Session:
async def login(self, username, password, register_id=None) -> Session:
"""Login using provided credentials."""

if register_id is None or register_id == '':
Expand All @@ -182,7 +198,7 @@ def login(self, username, password, register_id=None) -> Session:
if not is_email(username):
username = '86-' + username

resp = self._request('POST', '/user-center/auth/login', json={
resp = await self._request('POST', '/user-center/auth/login', json={
'tenantId': TENANT_ID,
'lang': str(self._language),
'token': None,
Expand All @@ -201,7 +217,7 @@ def login(self, username, password, register_id=None) -> Session:
},
})

d = self._process_response(resp)
d = await self._process_response(resp)
self._session = Session(**d)
self._session.register_id = register_id

Expand All @@ -222,7 +238,7 @@ def login_token(

return self._session

def logout(self):
async def logout(self):
"""End current session.
This will also reset the session object.
Expand All @@ -232,49 +248,49 @@ def logout(self):
self._session = None
return

self._process_response(self._request(
await self._process_response(await self._request(
'POST', '/user-center/auth/logout'))
self._session = None

if self._mqtt is not None:
self._mqtt.disconnect()
self._mqtt = None

def get_devices(self) -> List[Device]:
async def get_devices(self) -> List[Device]:
"""Get all user devices."""

if self._session is None \
or self._session.auth_token == '' or self._session.user_id == '':
raise KarcherHomeAccessDenied('Not authorized')

resp = self._request(
resp = await self._request(
'GET',
'/smart-home-service/smartHome/user/getDeviceInfoByUserId/'
+ self._session.user_id)

return [Device(**d) for d in self._process_response(resp)]
return [Device(**d) for d in await self._process_response(resp)]

def get_map_data(self, dev: Device, map: int = 1):
async def get_map_data(self, dev: Device, map: int = 1):
# <tenantId>/<modeType>/<deviceSn>/01-01-2022/map/temp/0046690461_<deviceSn>_1
mapDir = TENANT_ID + '/' + dev.product_mode_code + '/' +\
dev.sn + '/01-01-2022/map/temp/0046690461_' + \
dev.sn + '_' + str(map)

resp = self._request('POST',
'/storage-management/storage/aws/getAccessUrl',
json={
'dir': mapDir,
'countryCode': get_country_code(self._country),
'serviceType': 2,
'tenantId': TENANT_ID,
})
resp = await self._request('POST',
'/storage-management/storage/aws/getAccessUrl',
json={
'dir': mapDir,
'countryCode': get_country_code(self._country),
'serviceType': 2,
'tenantId': TENANT_ID,
})

d = self._process_response(resp)
d = await self._process_response(resp)
downloadUrl = d['url']
if 'cdnDomain' in d and d['cdnDomain'] != '':
downloadUrl = 'https://' + d['cdnDomain'] + '/' + d['dir']

d = self._download(downloadUrl)
d = await self._download(downloadUrl)
data = decrypt_map(dev.sn, dev.mac, dev.product_id, d)
if map == 1 or map == 2:
return Map.parse(data)
Expand Down
Loading

0 comments on commit 218600f

Please sign in to comment.