Skip to content

Commit 384a73f

Browse files
authored
Add files via upload
1 parent 92fb65c commit 384a73f

File tree

2 files changed

+301
-0
lines changed

2 files changed

+301
-0
lines changed

miiocommand.py

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
2+
import json
3+
from miioservice import MiIOService
4+
5+
6+
def twins_split(string, sep, default=None):
7+
pos = string.find(sep)
8+
return (string, default) if pos == -1 else (string[0:pos], string[pos+1:])
9+
10+
11+
def string_to_value(string):
12+
if string == 'null' or string == 'none':
13+
return None
14+
elif string == 'false':
15+
return False
16+
elif string == 'true':
17+
return True
18+
else:
19+
return int(string)
20+
21+
22+
def string_or_value(string):
23+
return string_to_value(string[1:]) if string[0] == '#' else string
24+
25+
26+
def miio_command_help(did=None, prefix='?'):
27+
quote = '' if prefix == '?' else "'"
28+
return f'\
29+
Get Props: {prefix}<siid[-piid]>[,...]\n\
30+
{prefix}1,1-2,1-3,1-4,2-1,2-2,3\n\
31+
Set Props: {prefix}<siid[-piid]=[#]value>[,...]\n\
32+
{prefix}2=#60,2-2=#false,3=test\n\
33+
Do Action: {prefix}<siid[-piid]> <arg1|#NA> [...] \n\
34+
{prefix}2 #NA\n\
35+
{prefix}5 Hello\n\
36+
{prefix}5-4 Hello #1\n\n\
37+
Call MIoT: {prefix}<cmd=prop/get|/prop/set|action> <params>\n\
38+
{prefix}action {quote}{{"did":"{did or "267090026"}","siid":5,"aiid":1,"in":["Hello"]}}{quote}\n\n\
39+
Call MiIO: {prefix}/<uri> <data>\n\
40+
{prefix}/home/device_list {quote}{{"getVirtualModel":false,"getHuamiDevices":1}}{quote}\n\n\
41+
Devs List: {prefix}list [name=full|name_keyword] [getVirtualModel=false|true] [getHuamiDevices=0|1]\n\
42+
{prefix}list Light true 0\n\n\
43+
MIoT Spec: {prefix}spec [model_keyword|type_urn] [format=text|python|json]\n\
44+
{prefix}spec\n\
45+
{prefix}spec speaker\n\
46+
{prefix}spec xiaomi.wifispeaker.lx04\n\
47+
{prefix}spec urn:miot-spec-v2:device:speaker:0000A015:xiaomi-lx04:1\n\n\
48+
MIoT Decode: {prefix}decode <ssecurity> <nonce> <data> [gzip]\n\
49+
'
50+
51+
52+
async def miio_command(service: MiIOService, did, text, prefix='?'):
53+
cmd, arg = twins_split(text, ' ')
54+
55+
if cmd.startswith('/'):
56+
return await service.miio_request(cmd, arg)
57+
58+
if cmd.startswith('prop') or cmd == 'action':
59+
return await service.miot_request(cmd, json.loads(arg) if arg else None)
60+
61+
argv = arg.split(' ') if arg else []
62+
argc = len(argv)
63+
if cmd == 'list':
64+
return await service.device_list(argc > 0 and argv[0], argc > 1 and string_to_value(argv[1]), argc > 2 and argv[2])
65+
66+
if cmd == 'spec':
67+
return await service.miot_spec(argc > 0 and argv[0], argc > 1 and argv[1])
68+
69+
if cmd == 'decode':
70+
return MiIOService.miot_decode(argv[0], argv[1], argv[2], argc > 3 and argv[3] == 'gzip')
71+
72+
if not did or not cmd or cmd == '?' or cmd == '?' or cmd == 'help' or cmd == '-h' or cmd == '--help':
73+
return miio_command_help(did, prefix)
74+
75+
if not did.isdigit():
76+
devices = await service.device_list(did)
77+
if not devices:
78+
return "Device not found: " + did
79+
did = devices[0]['did']
80+
81+
props = []
82+
setp = True
83+
miot = True
84+
for item in cmd.split(','):
85+
key, value = twins_split(item, '=')
86+
siid, iid = twins_split(key, '-', '1')
87+
if siid.isdigit() and iid.isdigit():
88+
prop = [int(siid), int(iid)]
89+
else:
90+
prop = [key]
91+
miot = False
92+
if value is None:
93+
setp = False
94+
elif setp:
95+
prop.append(string_or_value(value))
96+
props.append(prop)
97+
98+
if miot and argc > 0:
99+
args = [string_or_value(a) for a in argv] if arg != '#NA' else []
100+
return await service.miot_action(did, props[0], args)
101+
102+
do_props = ((service.home_get_props, service.miot_get_props), (service.home_set_props, service.miot_set_props))[setp][miot]
103+
return await do_props(did, props)

miioservice.py

+198
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
import os
2+
import time
3+
import base64
4+
import hashlib
5+
import hmac
6+
import json
7+
from miaccount import MiAccount
8+
9+
# REGIONS = ['cn', 'de', 'i2', 'ru', 'sg', 'us']
10+
11+
12+
class MiIOService:
13+
14+
def __init__(self, account: MiAccount, region=None):
15+
self.account = account
16+
self.server = 'https://' + ('' if region is None or region == 'cn' else region + '.') + 'api.io.mi.com/app'
17+
18+
async def miio_request(self, uri, data):
19+
def prepare_data(token, cookies):
20+
cookies['PassportDeviceId'] = token['deviceId']
21+
return MiIOService.sign_data(uri, data, token['xiaomiio'][0])
22+
headers = {'User-Agent': 'iOS-14.4-6.0.103-iPhone12,3--D7744744F7AF32F0544445285880DD63E47D9BE9-8816080-84A3F44E137B71AE-iPhone', 'x-xiaomi-protocal-flag-cli': 'PROTOCAL-HTTP2'}
23+
resp = await self.account.mi_request('xiaomiio', self.server + uri, prepare_data, headers)
24+
if 'result' not in resp:
25+
raise Exception(f"Error {uri}: {resp}")
26+
return resp['result']
27+
28+
async def home_request(self, did, method, params):
29+
return await self.miio_request('/home/rpc/' + did, {'id': 1, 'method': method, "accessKey": "IOS00026747c5acafc2", 'params': params})
30+
31+
async def home_get_props(self, did, props):
32+
return await self.home_request(did, 'get_prop', props)
33+
34+
async def home_set_props(self, did, props):
35+
return [await self.home_set_prop(did, i[0], i[1]) for i in props]
36+
37+
async def home_get_prop(self, did, prop):
38+
return (await self.home_get_props(did, [prop]))[0]
39+
40+
async def home_set_prop(self, did, prop, value):
41+
result = (await self.home_request(did, 'set_' + prop, value if isinstance(value, list) else [value]))[0]
42+
return 0 if result == 'ok' else result
43+
44+
async def miot_request(self, cmd, params):
45+
return await self.miio_request('/miotspec/' + cmd, {'params': params})
46+
47+
async def miot_get_props(self, did, iids):
48+
params = [{'did': did, 'siid': i[0], 'piid': i[1]} for i in iids]
49+
result = await self.miot_request('prop/get', params)
50+
return [it.get('value') if it.get('code') == 0 else None for it in result]
51+
52+
async def miot_set_props(self, did, props):
53+
params = [{'did': did, 'siid': i[0], 'piid': i[1], 'value': i[2]} for i in props]
54+
result = await self.miot_request('prop/set', params)
55+
return [it.get('code', -1) for it in result]
56+
57+
async def miot_get_prop(self, did, iid):
58+
return (await self.miot_get_props(did, [iid]))[0]
59+
60+
async def miot_set_prop(self, did, iid, value):
61+
return (await self.miot_set_props(did, [(iid[0], iid[1], value)]))[0]
62+
63+
async def miot_action(self, did, iid, args=[]):
64+
result = await self.miot_request('action', {'did': did, 'siid': iid[0], 'aiid': iid[1], 'in': args})
65+
return result.get('code', -1)
66+
67+
async def device_list(self, name=None, getVirtualModel=False, getHuamiDevices=0):
68+
result = await self.miio_request('/home/device_list', {'getVirtualModel': bool(getVirtualModel), 'getHuamiDevices': int(getHuamiDevices)})
69+
result = result['list']
70+
return result if name == 'full' else [{'name': i['name'], 'model': i['model'], 'did': i['did'], 'token': i['token']} for i in result if not name or name in i['name']]
71+
72+
async def miot_spec(self, type=None, format=None):
73+
if not type or not type.startswith('urn'):
74+
def get_spec(all):
75+
if not type:
76+
return all
77+
ret = {}
78+
for m, t in all.items():
79+
if type == m:
80+
return {m: t}
81+
elif type in m:
82+
ret[m] = t
83+
return ret
84+
import tempfile
85+
path = os.path.join(tempfile.gettempdir(), 'miservice_miot_specs.json')
86+
try:
87+
with open(path) as f:
88+
result = get_spec(json.load(f))
89+
except:
90+
result = None
91+
if not result:
92+
async with self.account.session.get('http://miot-spec.org/miot-spec-v2/instances?status=all') as r:
93+
all = {i['model']: i['type'] for i in (await r.json())['instances']}
94+
with open(path, 'w') as f:
95+
json.dump(all, f)
96+
result = get_spec(all)
97+
if len(result) != 1:
98+
return result
99+
type = list(result.values())[0]
100+
101+
url = 'http://miot-spec.org/miot-spec-v2/instance?type=' + type
102+
async with self.account.session.get(url) as r:
103+
result = await r.json()
104+
105+
def parse_desc(node):
106+
desc = node['description']
107+
# pos = desc.find(' ')
108+
# if pos != -1:
109+
# return (desc[:pos], ' # ' + desc[pos + 2:])
110+
name = ''
111+
for i in range(len(desc)):
112+
d = desc[i]
113+
if d in '-—{「[【((<《':
114+
return (name, ' # ' + desc[i:])
115+
name += '_' if d == ' ' else d
116+
return (name, '')
117+
118+
def make_line(siid, iid, desc, comment, readable=False):
119+
value = f"({siid}, {iid})" if format == 'python' else iid
120+
return f" {'' if readable else '_'}{desc} = {value}{comment}\n"
121+
122+
if format != 'json':
123+
STR_HEAD, STR_SRV, STR_VALUE = ('from enum import Enum\n\n', '\nclass {}(tuple, Enum):\n', '\nclass {}(int, Enum):\n') if format == 'python' else ('', '{} = {}\n', '{}\n')
124+
text = '# Generated by https://github.com/Yonsm/MiService\n# ' + url + '\n\n' + STR_HEAD
125+
svcs = []
126+
vals = []
127+
128+
for s in result['services']:
129+
siid = s['iid']
130+
svc = s['description'].replace(' ', '_')
131+
svcs.append(svc)
132+
text += STR_SRV.format(svc, siid)
133+
for p in s.get('properties', []):
134+
name, comment = parse_desc(p)
135+
access = p['access']
136+
137+
comment += ''.join([' # ' + k for k, v in [(p['format'], 'string'), (''.join([a[0] for a in access]), 'r')] if k and k != v])
138+
text += make_line(siid, p['iid'], name, comment, 'read' in access)
139+
if 'value-range' in p:
140+
valuer = p['value-range']
141+
length = min(3, len(valuer))
142+
values = {['MIN', 'MAX', 'STEP'][i]: valuer[i] for i in range(length) if i != 2 or valuer[i] != 1}
143+
elif 'value-list' in p:
144+
values = {i['description'].replace(' ', '_') if i['description'] else str(i['value']): i['value'] for i in p['value-list']}
145+
else:
146+
continue
147+
vals.append((svc + '_' + name, values))
148+
if 'actions' in s:
149+
text += '\n'
150+
for a in s['actions']:
151+
name, comment = parse_desc(a)
152+
comment += ''.join([f" # {io}={a[io]}" for io in ['in', 'out'] if a[io]])
153+
text += make_line(siid, a['iid'], name, comment)
154+
text += '\n'
155+
for name, values in vals:
156+
text += STR_VALUE.format(name)
157+
for k, v in values.items():
158+
text += f" {'_' + k if k.isdigit() else k} = {v}\n"
159+
text += '\n'
160+
if format == 'python':
161+
text += '\nALL_SVCS = (' + ', '.join(svcs) + ')\n'
162+
result = text
163+
return result
164+
165+
@staticmethod
166+
def miot_decode(ssecurity, nonce, data, gzip=False):
167+
from Crypto.Cipher import ARC4
168+
r = ARC4.new(base64.b64decode(MiIOService.sign_nonce(ssecurity, nonce)))
169+
r.encrypt(bytes(1024))
170+
decrypted = r.encrypt(base64.b64decode(data))
171+
if gzip:
172+
try:
173+
from io import BytesIO
174+
from gzip import GzipFile
175+
compressed = BytesIO()
176+
compressed.write(decrypted)
177+
compressed.seek(0)
178+
decrypted = GzipFile(fileobj=compressed, mode='rb').read()
179+
except:
180+
pass
181+
return json.loads(decrypted.decode())
182+
183+
@staticmethod
184+
def sign_nonce(ssecurity, nonce):
185+
m = hashlib.sha256()
186+
m.update(base64.b64decode(ssecurity))
187+
m.update(base64.b64decode(nonce))
188+
return base64.b64encode(m.digest()).decode()
189+
190+
@staticmethod
191+
def sign_data(uri, data, ssecurity):
192+
if not isinstance(data, str):
193+
data = json.dumps(data)
194+
nonce = base64.b64encode(os.urandom(8) + int(time.time() / 60).to_bytes(4, 'big')).decode()
195+
snonce = MiIOService.sign_nonce(ssecurity, nonce)
196+
msg = '&'.join([uri, snonce, nonce, 'data=' + data])
197+
sign = hmac.new(key=base64.b64decode(snonce), msg=msg.encode(), digestmod=hashlib.sha256).digest()
198+
return {'_nonce': nonce, 'data': data, 'signature': base64.b64encode(sign).decode()}

0 commit comments

Comments
 (0)