Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connect to ap with pc. #9

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ Using the native AIRMX app (not working as Nov 2024):
* Follow [screenshots](./images/ios) (make sure to give geolocation permissions)
* After entering SSID and password, wait 20-30 seconds and you can close the application

Using Windows PC with ble controller.

* Env: Python >= 3.7, "pip install bleak"
* Edit SSID and PASSWOR in ./change_ap_with_pc/conn.py
* Run "pyhon ./change_ap_with_pc/conn.py", will scan nearby ble devices, choose each AIRMX device.

## Other

* Changing AP binding can be done at any time, no need to remove the device from HA
4 changes: 4 additions & 0 deletions README.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@
2. 选择“Automatic setup(AIRMX addon required)”
3. 勾选你需要添加设备对应的MAC地址【MAC的后两位可能会与路由里显示的MAC对应不上,确认前5位基本就可确认】

## 在电脑上给加湿器设置AP
1. 需要一个具有蓝牙功能的PC以及Python环境(Python >= 3.7)。
2. 按需修改conn.py中的用户名及密码。
3. 执行python ./change_ap_with_pc/conn.py即可,会扫描附近的设备并列出,在命令行选择设备。

## 其他
* 我并没把原作者的全部信息翻译和复制过来,这只是我在没有原作者硬件环境的情况下,实现的办法
136 changes: 136 additions & 0 deletions change_ap_with_pc/conn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import asyncio
from contextlib import suppress
from dataclasses import dataclass
import logging
from bleak import BleakClient, BleakError, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak import BLEDevice

# 设置日志记录,确保INFO级别的日志也能打印出来
logging.basicConfig(level=logging.DEBUG) # 设置日志记录级别为DEBUG,INFO也会打印

_LOGGER = logging.getLogger(__name__)

PACKET_SIZE = 16
NOTIFICATION_UUID = "22210002-554a-4546-5542-46534450464d"
COMMAND_UUID = "22210001-554a-4546-5542-46534450464d"


@dataclass
class BindAPRequest:
ssid: str
password: str

@property
def as_bytes(self) -> bytes:
data = b""
data += len(self.ssid).to_bytes(1, 'little') # Added '1' as the byte length argument
data += self.ssid.encode()
data += len(self.password).to_bytes(1, 'little') # Added '1' as the byte length argument
data += self.password.encode()
return data


class AirWaterBLEConnector:
_bind_ap_done = False

def _notification_handler(self, _: BleakGATTCharacteristic, data: bytearray) -> None:
_LOGGER.debug("< %s", data.hex())
if data != b"\x00\x11\x00\x15\x01":
_LOGGER.error(f"Unexpected data: {data.hex()}")
else:
self._bind_ap_done = True

async def bind_ap(self, device: BLEDevice, ssid: str, password: str) -> None:
_LOGGER.debug(f"Connecting to {device}...")

async with BleakClient(device) as client:
await client.start_notify(NOTIFICATION_UUID, self._notification_handler)

request = BindAPRequest(ssid, password).as_bytes
request_size = len(request)
packet_count = int(request_size / PACKET_SIZE)
if request_size % PACKET_SIZE > 0:
packet_count += 1

for seq in range(0, packet_count):
csum = (seq + 1 << 4) + packet_count

packet = b""
packet += seq.to_bytes(1, 'little') # Added '1' as the byte length argument
packet += csum.to_bytes(1, 'little') # Added '1' as the byte length argument
packet += b"\x00\x15"

f = seq * PACKET_SIZE
s = seq * PACKET_SIZE + PACKET_SIZE
packet += request[f:s]

_LOGGER.debug("> %s", packet.hex())
await client.write_gatt_char(COMMAND_UUID, packet, response=True)

for _ in range(0, 60):
await asyncio.sleep(0.3)
if self._bind_ap_done:
break

if not self._bind_ap_done:
raise Exception("AP binding timeout")

ack_packet = (seq + 1).to_bytes(1, 'little') + b"\x11\x00\x16"
_LOGGER.debug("> %s", ack_packet.hex())
await client.write_gatt_char(COMMAND_UUID, ack_packet, response=True)

with suppress(BleakError):
await client.stop_notify(NOTIFICATION_UUID)

_LOGGER.info(f"Successfully bound to {device.name} with SSID: {ssid}.")

# 扫描附近的蓝牙设备并列出设备供用户选择
async def scan_devices():
devices = await BleakScanner.discover()
if not devices:
_LOGGER.error("No BLE devices found.")
return []

print("Found the following BLE devices:")
for i, device in enumerate(devices):
print(f"{i + 1}: {device.name} ({device.address})")

return devices

# 让用户选择设备
def choose_device(devices):
while True:
try:
choice = int(input(f"Please choose a device by number (1-{len(devices)}): "))
if 1 <= choice <= len(devices):
return devices[choice - 1]
else:
print("Invalid choice. Please select a valid number.")
except ValueError:
print("Invalid input. Please enter a number.")

# 异步主函数
async def main():
devices = await scan_devices()
if not devices:
return # No devices found, exit

device = choose_device(devices)

# 设置Wi-Fi信息
ssid = "SSID"
password = "PASSWORD"

# 创建 AirWaterBLEConnector 实例并绑定 AP
air_water_ble_connector = AirWaterBLEConnector()

try:
# 执行连接操作
await air_water_ble_connector.bind_ap(device, ssid, password)
print(f"AP binding was successful for {device.name}!")
except Exception as e:
print(f"Failed to bind AP to {device.name}. Error: {e}")

# 运行主程序
asyncio.run(main())