-
-
Notifications
You must be signed in to change notification settings - Fork 70
/
PowerPlugins.py
148 lines (132 loc) · 5.97 KB
/
PowerPlugins.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright (c) 2022 Aldo Hoeben / fieldOfView
# OctoPrintPlugin is released under the terms of the AGPLv3 or higher.
from collections import OrderedDict
from typing import Any, Tuple, List, Dict
class PowerPlugins:
    """Registry of controllable power plugs derived from plugin settings.

    ``parsePluginData`` inspects a settings dictionary keyed by plugin id and
    records every plug that has the configuration it needs.
    ``getSetStateCommand`` then turns a recorded plug id plus a desired state
    into an ``(end_point, command)`` pair.
    """

    def __init__(self) -> None:
        # Maps a generated plug id (see _createPlugId) to that plug's data.
        self._available_plugs = OrderedDict()  # type: Dict[str, Any]

    def parsePluginData(self, plugin_data: Dict[str, Any]) -> None:
        """Rebuild the table of available plugs from ``plugin_data``.

        Any previously parsed plugs are discarded first.

        :param plugin_data: Settings keyed by plugin id; each value is the
            settings dictionary of that plugin.
        """
        self._available_plugs = OrderedDict()  # type: Dict[str, Any]

        # plugins that only support a single plug
        simple_plugins = [
            ("psucontrol", "PSU Control", []),
            ("mystromswitch", "MyStrom Switch", ["ip"]),
            ("ikea_tradfri", "IKEA Trådfri", ["gateway_ip", "selected_outlet"]),
        ]  # type: List[Tuple[str, str, List[str]]]
        for (plugin_id, plugin_name, additional_data) in simple_plugins:
            if plugin_id not in plugin_data:
                continue
            plug_data = plugin_data[plugin_id]
            # Only offer the plug when every required setting has a truthy value.
            if all(plug_data.get(config_item, None) for config_item in additional_data):
                plug = OrderedDict([("plugin", plugin_id), ("name", plugin_name)])
                self._available_plugs[self._createPlugId(plug)] = plug

        # plugins that have a `label` and `ip` specified in `arrSmartplugs`
        common_api_plugins = [
            ("tplinksmartplug", "TP-Link Smartplug", []),  # ip
            ("orvibos20", "Orvibo S20", []),  # ip
            ("wemoswitch", "Wemo Switch", []),  # ip
            ("tuyasmartplug", "Tuya Smartplug", []),  # label
            (
                "domoticz",
                "Domoticz",
                ["idx", "username", "password"],
            ),  # ip, idx, username, password
            (
                "tasmota",
                "Tasmota",
                ["idx"],
            ),  # ip, idx, username, password, backlog_delay
        ]  # type: List[Tuple[str, str, List[str]]]
        for (plugin_id, plugin_name, additional_data) in common_api_plugins:
            if plugin_id in plugin_data and "arrSmartplugs" in plugin_data[plugin_id]:
                for plug_data in plugin_data[plugin_id]["arrSmartplugs"]:
                    # Entries without an ip are skipped.
                    if not plug_data.get("ip", ""):
                        continue
                    plug_label = plug_data.get("label", "")
                    plug = OrderedDict(
                        [
                            ("plugin", plugin_id),
                            (
                                "name",
                                "%s (%s)" % (plug_label, plugin_name)
                                if plug_label
                                else plugin_name,
                            ),
                            ("label", plug_label),
                            ("ip", plug_data["ip"]),
                        ]
                    )
                    for key in additional_data:
                        plug[key] = plug_data.get(key, "")
                    self._available_plugs[self._createPlugId(plug)] = plug

        # `tasmota_mqtt` has a slightly different settings dialect
        plugin_id = "tasmota_mqtt"
        plugin_name = "Tasmota MQTT"
        # Guard against a missing `arrRelays` list the same way the
        # `arrSmartplugs` plugins are guarded above, instead of raising KeyError.
        if plugin_id in plugin_data and "arrRelays" in plugin_data[plugin_id]:
            for plug_data in plugin_data[plugin_id]["arrRelays"]:
                if plug_data.get("topic", "") and plug_data.get("relayN", ""):
                    plug = OrderedDict(
                        [
                            ("plugin", plugin_id),
                            (
                                "name",
                                "%s/%s (%s)"
                                % (
                                    plug_data["topic"],
                                    plug_data["relayN"],
                                    plugin_name,
                                ),
                            ),
                            ("topic", plug_data["topic"]),
                            ("relayN", plug_data["relayN"]),
                        ]
                    )
                    self._available_plugs[self._createPlugId(plug)] = plug

    def _createPlugId(self, plug_data: Dict[str, Any]) -> str:
        """Build a plug id by joining every value except ``name`` with ``/``.

        Values are converted with ``str`` so that non-string settings (for
        example a numeric ``idx`` or ``relayN``) do not make ``join`` raise
        ``TypeError``; ids of all-string plugs are unaffected.
        """
        interesting_bits = [str(v) for (k, v) in plug_data.items() if k != "name"]
        return "/".join(interesting_bits)

    def getAvailablePowerPlugs(self) -> Dict[str, Any]:
        """Return the plug table built by the last ``parsePluginData`` call.

        The internal dictionary itself is returned, not a copy.
        """
        return self._available_plugs

    def getSetStateCommand(
        self, plug_id: str, state: bool
    ) -> Tuple[str, Dict[str, Any]]:
        """Build the end point and command payload that switch a plug.

        :param plug_id: An id as produced for the plugs in
            ``getAvailablePowerPlugs``.
        :param state: ``True`` to switch the plug on, ``False`` to switch it off.
        :return: ``(end_point, command)``; ``("", {})`` when ``plug_id`` is
            not a known plug.
        """
        if plug_id not in self._available_plugs:
            return ("", {})

        plug_data = self._available_plugs[plug_id]
        plugin_id = plug_data["plugin"]
        end_point = "plugin/" + plugin_id

        # These two plugins use their own command names and take no arguments.
        if plugin_id == "psucontrol":
            return (
                end_point,
                OrderedDict([("command", "turnPSUOn" if state else "turnPSUOff")]),
            )
        if plugin_id == "mystromswitch":
            return (
                end_point,
                OrderedDict(
                    [("command", "enableRelais" if state else "disableRelais")]
                ),
            )

        # Plug settings copied into the command, per plugin; plugins that are
        # not listed here get the bare on/off command.
        arguments_by_plugin = {
            "tplinksmartplug": ["ip"],
            "orvibos20": ["ip"],
            "wemoswitch": ["ip"],
            "domoticz": ["ip", "idx", "username", "password"],
            "tasmota_mqtt": ["topic", "relayN"],
            "tasmota": ["ip", "idx"],
            "tuyasmartplug": ["label"],
        }  # type: Dict[str, List[str]]

        command = OrderedDict([("command", "turnOn" if state else "turnOff")])
        for key in arguments_by_plugin.get(plugin_id, []):
            command[key] = plug_data[key]
        return (end_point, command)