Skip to content

Commit

Permalink
New upstream version 0.0.8.79.gd61cf06
Browse files Browse the repository at this point in the history
  • Loading branch information
sjoerdsimons committed Oct 29, 2023
1 parent bf3162b commit dd37dbf
Show file tree
Hide file tree
Showing 16 changed files with 783 additions and 70 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ Configure the PDU in the config file as usual, then launch pdudaemon with the fo
$ pdudaemon --conf=share/pdudaemon.conf --drive --hostname pdu01 --port 1 --request reboot
```

If requesting reboot, the delay between turning the port off and on can be modified with `--delay`
and is by default 5 seconds.

## Adding drivers
Drivers are implemented children of the "PDUDriver" class and many example
implementations can be found inside the
Expand Down
2 changes: 2 additions & 0 deletions pdudaemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ async def main_async():
drive.add_argument("--drive", action="store_true", default=False)
drive.add_argument("--request", dest="driverequest", action="store", type=str)
drive.add_argument("--retries", dest="driveretries", action="store", type=int, default=5)
drive.add_argument("--delay", dest="drivedelay", action="store", type=int, default=5)
drive.add_argument("--port", dest="driveport", action="store", type=str)

# Parse the command line
Expand Down Expand Up @@ -181,6 +182,7 @@ async def main_async():
runner = PDURunner(config, options.drivehostname, options.driveretries)
if options.driverequest == "reboot":
result = await runner.do_job_async(options.driveport, "off")
await asyncio.sleep(int(options.drivedelay))
result = await runner.do_job_async(options.driveport, "on")
else:
result = await runner.do_job_async(options.driveport, options.driverequest)
Expand Down
103 changes: 66 additions & 37 deletions pdudaemon/drivers/cleware.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/python3

# Copyright 2022 Sjoerd Simons <[email protected]>
# Copyright 2023 Sietze van Buuren <[email protected]>
#
# Based on PDUDriver:
# Copyright 2013 Linaro Limited
Expand All @@ -22,33 +23,82 @@
# MA 02110-1301, USA.

import logging
from pdudaemon.drivers.driver import PDUDriver
import hid
import os
import time
import hid
from pdudaemon.drivers.driver import PDUDriver
from pdudaemon.drivers.hiddevice import HIDDevice


log = logging.getLogger("pdud.drivers." + os.path.basename(__file__))

CLEWARE_VID = 0x0d50
CLEWARE_SWITCH1_PID = 0x0008
CLEWARE_CONTACT00_PID = 0x0030
CLEWARE_NEW_SWITCH_SERIAL = 0x63813


class ClewareSwitch1Base(PDUDriver):
class ClewareBase(PDUDriver):
""" Base class for Cleware USB-Switch drivers """
switch_pid = None
connection = None
port_count = 0

def __init__(self, hostname, settings):
self.hostname = hostname
self.settings = settings
self.serial = settings.get("serial", u"")
log.debug("serial: %s" % self.serial)

self.serial = int(settings.get("serial", u""))
log.debug("serial: %s", self.serial)
super().__init__()

def new_switch_serial(self, device_path):
""" Find the correct serial for novel Cleware USB Switch devices """
with HIDDevice(path=device_path) as dev:
serial = 0
for i in range(8, 15):
b = int(chr(self.read_byte(dev, i)), 16)
serial *= 16
serial += b
return serial

def device_path(self):
""" Search and return the matching device path """
for dev_dict in hid.enumerate(CLEWARE_VID, self.switch_pid):
device_path = dev_dict['path']
serial_compare = int(dev_dict["serial_number"], 16)
if self.serial == serial_compare:
return device_path
if serial_compare == CLEWARE_NEW_SWITCH_SERIAL:
serial_candidate = self.new_switch_serial(device_path)
log.debug("Considering serial number match: %s", serial_candidate)
if self.serial == serial_candidate:
return device_path
continue
err = f"Cleware device with serial number {self.serial} not found!"
log.error(err)
raise RuntimeError(err)

@staticmethod
def read_byte(dev, addr):
dev.write([0, 2, addr])
while True:
data = dev.read(16)
if data[4] == addr:
return data[5]
time.sleep(0.01)

@classmethod
def accepts(cls, drivername):
return drivername.lower() == cls.__name__.lower()


class ClewareSwitch1Base(ClewareBase):
switch_pid = CLEWARE_SWITCH1_PID

def port_interaction(self, command, port_number):
port_number = int(port_number)
if port_number > self.port_count or port_number < 1:
err = "Port should be in the range 1 - %d" % (self.port_count)
err = f"Port should be in the range 1 - {self.port_count}"
log.error(err)
raise RuntimeError(err)

Expand All @@ -58,39 +108,24 @@ def port_interaction(self, command, port_number):
elif command == "off":
on = 0
else:
log.error("Unknown command %s." % (command))
log.error("Unknown command %s.", (command))
return

d = hid.device()
d.open(CLEWARE_VID, CLEWARE_SWITCH1_PID, serial_number=self.serial)
d.write([0, 0, port, on])
d.close()

@classmethod
def accepts(cls, drivername):
return drivername.lower() == cls.__name__.lower()
with HIDDevice(path=self.device_path()) as dev:
dev.write([0, 0, port, on])


class ClewareUsbSwitch4(ClewareSwitch1Base):
port_count = 4


class ClewareContact00Base(PDUDriver):
connection = None
port_count = 0

def __init__(self, hostname, settings):
self.hostname = hostname
self.settings = settings
self.serial = settings.get("serial", u"")
log.debug("serial: %s" % self.serial)

super().__init__()
class ClewareContact00Base(ClewareBase):
switch_pid = CLEWARE_CONTACT00_PID

def port_interaction(self, command, port_number):
port_number = int(port_number)
if port_number > self.port_count or port_number < 1:
err = "Port should be in the range 1 - %d" % (self.port_count)
err = f"Port should be in the range 1 - {self.port_count}"
log.error(err)
raise RuntimeError(err)

Expand All @@ -100,17 +135,11 @@ def port_interaction(self, command, port_number):
elif command == "off":
on = 0
else:
log.error("Unknown command %s." % (command))
log.error("Unknown command %s.", (command))
return

d = hid.device()
d.open(CLEWARE_VID, CLEWARE_CONTACT00_PID, serial_number=self.serial)
d.write([0, 3, on >> 8, on & 0xff, port >> 8, port & 0xff])
d.close()

@classmethod
def accepts(cls, drivername):
return drivername.lower() == cls.__name__.lower()
with HIDDevice(path=self.device_path()) as dev:
dev.write([0, 3, on >> 8, on & 0xff, port >> 8, port & 0xff])


class ClewareUsbSwitch8(ClewareContact00Base):
Expand Down
Loading

0 comments on commit dd37dbf

Please sign in to comment.