Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature.fan support #153

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions pywizlight/bulb.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,26 @@ def _validate_ratio_or_raise(ratio: int) -> None:
raise ValueError("Value must be between 0 and 100")


def _validate_fan_state_or_raise(state: int) -> None:
if not 0 <= state <= 1:
raise ValueError("Value must be between 0 and 1")


def _validate_fan_speed_or_raise(speed: int) -> None:
if not 1 <= speed <= 6:
raise ValueError("Value must be between 1 and 6")


def _validate_fan_mode_or_raise(mode: int) -> None:
if not 1 <= mode <= 2:
raise ValueError("Value must be between 1 and 2")


def _validate_fan_reverse_or_raise(reverse: int) -> None:
if not 0 <= reverse <= 1:
raise ValueError("Value must be between 0 and 1")


class PilotBuilder:
"""Get information from the bulb."""

Expand All @@ -127,6 +147,10 @@ def __init__(
colortemp: Optional[int] = None,
state: bool = True,
ratio: Optional[int] = None,
fan_state: Optional[int] = None,
fan_mode: Optional[int] = None,
fan_reverse: Optional[int] = None,
fan_speed: Optional[int] = None,
) -> None:
"""Set the parameter."""
self.pilot_params: Dict[str, Any] = {"state": state}
Expand All @@ -152,6 +176,14 @@ def __init__(
self._set_warm_white(warm_white)
if cold_white is not None:
self._set_cold_white(cold_white)
if fan_state is not None:
self._set_fan_state(0)
if fan_mode is not None:
self._set_fan_mode(1)
if fan_reverse is not None:
self._set_fan_reverse(0)
if fan_speed is not None:
self._set_fan_speed(1)

def set_pilot_message(self) -> Dict:
"""Return the pilot message."""
Expand Down Expand Up @@ -246,6 +278,27 @@ def _set_colortemp(self, kelvin: int) -> None:
# normalize the kelvin values - should be removed
self.pilot_params["temp"] = min(10000, max(1000, kelvin))

def _set_fan_state(self, fan_state: int) -> None:
"""Set the fan state to on or off."""
_validate_fan_state_or_raise(fan_state)
self.pilot_params["fanState"] = fan_state

def _set_fan_mode(self, fan_mode: int) -> None:
"""Set the fan mode to normal or breeze."""
_validate_fan_mode_or_raise(fan_mode)
self.pilot_params["fanMode"] = fan_mode

def _set_fan_reverse(self, fan_reverse: int) -> None:
"""Set the fan to rotate normally (summer) or reverse (winter)."""
_validate_fan_reverse_or_raise(fan_reverse)
self.pilot_params["fanRevrs"] = fan_reverse

def _set_fan_speed(self, fan_speed: int) -> None:
"""Set the fan speed."""
# TODO test the actual range from discovery
_validate_fan_speed_or_raise(fan_speed)
self.pilot_params["fanSpeed"] = fan_speed


def _extract_bool(response: BulbResponse, key: str) -> Optional[bool]:
return bool(response[key]) if key in response else None
Expand Down Expand Up @@ -363,6 +416,28 @@ def get_colortemp(self) -> Optional[int]:
"""Get the color temperature from the bulb."""
return _extract_int(self.pilotResult, "temp")

def get_fan_state(self) -> Optional[int]:
"""Get the the fan state."""
return _extract_int(self.pilotResult, "fanState")

def get_fan_mode(self) -> Optional[int]:
"""Get the the fan mode."""
return _extract_int(self.pilotResult, "fanMode")

def get_fan_reverse(self) -> Optional[int]:
"""Get the the fan rotation."""
return _extract_int(self.pilotResult, "fanRevrs")

def get_fan_speed(self) -> Optional[int]:
"""Get the the fan speed."""
return _extract_int(self.pilotResult, "fanSpeed")

def get_fan_speed_range(self) -> Optional[List[int]]:
"""Get the value of the fanSpeed range property."""
if "fanSpeed" in self.pilotResult:
return list(range(1, (self.pilotResult["fanSpeed"] + 1)))
return None


async def _send_udp_message_with_retry(
message: str,
Expand Down Expand Up @@ -434,6 +509,7 @@ def __init__(
self.modelConfig: Optional[Dict] = None
self.whiteRange: Optional[List[float]] = None
self.extwhiteRange: Optional[List[float]] = None
self.fanSpeedRange: Optional[List[int]] = None
self.transport: Optional[asyncio.DatagramTransport] = None
self.protocol: Optional[WizProtocol] = None
self.history = WizHistory()
Expand All @@ -456,6 +532,7 @@ def diagnostics(self) -> dict:
"state": self.state.pilotResult if self.state else None,
"white_range": self.whiteRange,
"extended_white_range": self.extwhiteRange,
"fan_speed_range": self.fanSpeedRange,
"bulb_type": self.bulbtype.as_dict() if self.bulbtype else None,
"last_push": self.last_push,
"push_running": self.push_running,
Expand Down Expand Up @@ -567,6 +644,7 @@ async def get_bulbtype(self) -> BulbType:
bulb_config = await self.getBulbConfig()
result = bulb_config["result"]
white_range = await self.getExtendedWhiteRange()
fan_speed_range = await self.getFanSpeedRange()
white_to_color_ratio = None
white_channels = None
if "drvConf" in result:
Expand All @@ -581,6 +659,7 @@ async def get_bulbtype(self) -> BulbType:
self.bulbtype = BulbType.from_data(
module_name,
white_range,
fan_speed_range,
fw_version,
white_channels,
white_to_color_ratio,
Expand Down Expand Up @@ -615,6 +694,22 @@ async def getExtendedWhiteRange(self) -> Optional[List[float]]:

return self.extwhiteRange

async def getFanSpeedRange(self) -> Optional[List[int]]:
"""Read fan speed range from the bulb."""
if self.fanSpeedRange is not None:
return self.fanSpeedRange

# First for FW > 1.22
resp = await self.getModelConfig()
if resp is None or "result" not in resp:
# For old FW < 1.22
resp = await self.getUserConfig()

if "result" in resp:
self.fanSpeedRange = PilotParser(resp["result"]).get_fan_speed_range()

return self.fanSpeedRange

async def getSupportedScenes(self) -> List[str]:
"""Return the supported scenes based on type.

Expand Down Expand Up @@ -656,6 +751,33 @@ async def turn_on(self, pilot_builder: PilotBuilder = PilotBuilder()) -> None:
"""
await self.send(pilot_builder.set_pilot_message())

# ---------- Fan Functions ------------
async def turn_fan_off(self) -> None:
"""Turn the fan off."""
await self.send({"method": "setPilot", "params": {"fanState": 0}})

async def turn_fan_on(self) -> None:
"""Turn the fan off."""
await self.send({"method": "setPilot", "params": {"fanState": 1}})

async def set_fan_speed(self, speed: int) -> None:
"""Set the fan speed."""
# If we have state: True in the setPilot, the speed does not change
_validate_fan_speed_or_raise(speed)
await self.send({"method": "setPilot", "params": {"fanSpeed": speed}})

async def set_fan_mode(self, mode: int) -> None:
"""Set the fan mode to breeze or normal."""
# If we have state: True in the setPilot, the speed does not change
_validate_fan_mode_or_raise(mode)
await self.send({"method": "setPilot", "params": {"fanMode": mode}})

async def set_fan_reverse(self, reverse: int) -> None:
"""Set the fan rotation to reverse (winter mode) or normal."""
# If we have state: True in the setPilot, the speed does not change
_validate_fan_reverse_or_raise(reverse)
await self.send({"method": "setPilot", "params": {"fanRevrs": reverse}})

async def set_state(self, pilot_builder: PilotBuilder = PilotBuilder()) -> None:
"""Set the state of the bulb with defined message. Doesn't turn on the light.

Expand Down Expand Up @@ -740,6 +862,19 @@ async def lightSwitch(self) -> None:
# if the light is off - turn on
await self.turn_on()

async def fanSwitch(self) -> None:
"""Turn the fan on or off like a switch."""
# first get the status
state = await self.updateState()
if not state: # Did not get state, nothing to do
return
if state.get_fan_state():
# if the light is on - switch off
await self.turn_fan_off()
else:
# if the light is off - turn on
await self.turn_fan_on()

async def send(self, msg_dict: Dict) -> BulbResponse:
"""Serialize a dict to json and send it to device over UDP."""
self.history.message(HISTORY_SEND, msg_dict)
Expand Down
36 changes: 36 additions & 0 deletions pywizlight/bulblibrary.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class Features:
brightness: bool
dual_head: bool

fan: bool
fan_breeze_mode: bool
fan_reverse: bool


@dataclasses.dataclass(frozen=True)
class KelvinRange:
Expand All @@ -40,6 +44,14 @@ class KelvinRange:
min: int


@dataclasses.dataclass(frozen=True)
class FanSpeedRange:
"""Defines the fan speed range."""

max: int
min: int


class BulbClass(Enum):
"""Bulb Types."""

Expand All @@ -51,6 +63,8 @@ class BulbClass(Enum):
"""Have RGB LEDs."""
SOCKET = "Socket"
"""Smart socket with only on/off."""
FANDIM = "Fan Dimmable"
"""Smart fan with only Dimmable white LEDs."""


KNOWN_TYPE_IDS = {0: BulbClass.DW}
Expand Down Expand Up @@ -80,6 +94,15 @@ class BulbClass(Enum):
"color": False,
"color_tmp": False,
},
# Fan with dimmable white only supports brightness and some basic effects
BulbClass.FANDIM: {
"brightness": True,
"color": False,
"color_tmp": False,
"fan": True,
"fan_breeze_mode": True,
"fan_reverse": True,
},
}


Expand All @@ -90,6 +113,7 @@ class BulbType:
features: Features
name: Optional[str]
kelvin_range: Optional[KelvinRange]
fan_speed_range: Optional[FanSpeedRange]
bulb_type: BulbClass
fw_version: Optional[str]
white_channels: Optional[int]
Expand All @@ -105,6 +129,7 @@ def as_dict(self):
def from_data(
module_name: str,
kelvin_list: Optional[List[float]],
fan_speed_list: Optional[List[int]],
fw_version: Optional[str],
white_channels: Optional[int],
white_to_color_ratio: Optional[int],
Expand All @@ -128,6 +153,9 @@ def from_data(
elif "SOCKET" in _identifier: # A smart socket
bulb_type = BulbClass.SOCKET
effect = False
elif "FANDIM" in _identifier: # A Fan with dimmable light
bulb_type = BulbClass.FANDIM
effect = False
else: # Plain brightness-only bulb
bulb_type = BulbClass.DW
effect = "DH" in _identifier or "SH" in _identifier
Expand Down Expand Up @@ -158,6 +186,13 @@ def from_data(
else:
kelvin_range = None

if fan_speed_list:
fan_speed_range: Optional[FanSpeedRange] = FanSpeedRange(
min=int(min(fan_speed_list)), max=int(max(fan_speed_list))
)
else:
fan_speed_range = None

features = Features(
**_BASE_FEATURE_MAP[bulb_type], dual_head=dual_head, effect=effect
)
Expand All @@ -167,6 +202,7 @@ def from_data(
name=module_name,
features=features,
kelvin_range=kelvin_range,
fan_speed_range=fan_speed_range,
fw_version=fw_version,
white_channels=white_channels,
white_to_color_ratio=white_to_color_ratio,
Expand Down
59 changes: 59 additions & 0 deletions pywizlight/tests/test_bulb_fan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Tests for the Bulb API with a fan."""
from typing import AsyncGenerator

import pytest

from pywizlight import wizlight
from pywizlight.bulblibrary import BulbClass, BulbType, Features, KelvinRange
from pywizlight.tests.fake_bulb import startup_bulb


@pytest.fixture()
async def fan() -> AsyncGenerator[wizlight, None]:
shutdown, port = await startup_bulb(
module_name="ESP03_FANDIMS_31", firmware_version="1.28.0"
)
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()


@pytest.mark.asyncio
async def test_model_description_fan(fan: wizlight) -> None:
"""Test fetching the model description of a socket is None."""
bulb_type = await fan.get_bulbtype()
assert bulb_type == BulbType(
features=Features(
color=False,
color_tmp=False,
effect=False,
brightness=True,
dual_head=False,
fan=True,
fan_breeze_mode=True,
fan_reverse=True,
),
name="ESP03_FANDIMS_31",
kelvin_range=KelvinRange(max=2700, min=2700),
bulb_type=BulbClass.FANDIM,
fw_version="1.28.0",
white_channels=1,
white_to_color_ratio=20,
)


@pytest.mark.asyncio
async def test_diagnostics(fan: wizlight) -> None:
"""Test fetching diagnostics."""
await fan.get_bulbtype()
diagnostics = fan.diagnostics
assert diagnostics["bulb_type"]["bulb_type"] == "FANDIM"
assert diagnostics["history"]["last_error"] is None
assert diagnostics["push_running"] is False


@pytest.mark.asyncio
async def test_supported_scenes(fan: wizlight) -> None:
"""Test supported scenes."""
assert await fan.getSupportedScenes() == []