forked from bbx10/artnet-unicorn-hat
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathartnet-server.py
136 lines (127 loc) · 5.41 KB
/
artnet-server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Art-Net protocol for Pimoroni Unicorn Hat
# Open Pixel Control protocol for Pimoroni Unicorn Hat
# License: MIT
import unicornhat as unicorn
from twisted.internet import protocol, endpoints
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
# Adjust the LED brightness as needed. This runs once at start-up, before
# the network listeners below are created, so it applies to every frame
# drawn by either protocol handler.
unicorn.brightness(0.5)
class ArtNet(DatagramProtocol):
    """Render Art-Net DMX datagrams on the Unicorn Hat LED matrix.

    A datagram is accepted when it starts with the 8-byte "Art-Net" id,
    carries opcode 0x5000 and reports protocol version 14 or newer. Its
    payload is read as consecutive (red, green, blue) byte triples and
    painted row by row starting at pixel (0, 0).
    """

    HEADER_ID = b"Art-Net\x00"   # first 8 bytes of every accepted packet
    HEADER_SIZE = 18             # offset of the first payload byte
    OP_DMX = 0x5000              # the only opcode this handler accepts
    MIN_PROTOCOL_VERSION = 14
    WIDTH = 8                    # pixels per row
    HEIGHT = 8                   # rows

    def datagramReceived(self, data, addr):
        """Decode one datagram and, if it is a DMX frame, display it.

        data -- raw datagram payload.
        addr -- (host, port) of the sender; not used.

        Datagrams that are too short, carry a different id or opcode, or
        report an older protocol version are ignored silently.
        """
        if len(data) <= self.HEADER_SIZE or data[0:8] != self.HEADER_ID:
            return

        # bytearray yields integers on both Python 2 and Python 3.
        rawbytes = bytearray(data)
        opcode = rawbytes[8] | (rawbytes[9] << 8)              # low byte first
        protocolVersion = (rawbytes[10] << 8) | rawbytes[11]   # high byte first
        if opcode != self.OP_DMX:
            return
        if protocolVersion < self.MIN_PROTOCOL_VERSION:
            return

        # Bytes 12-15 (sequence, physical, sub-net/universe, net) are not
        # needed: every DMX frame is drawn regardless of its address.
        rgb_length = (rawbytes[16] << 8) | rawbytes[17]

        # The length field comes from the network and may claim more bytes
        # than were received. Use only what is present, only whole RGB
        # triples, and no more pixels than the matrix has.
        available = min(rgb_length, len(rawbytes) - self.HEADER_SIZE)
        pixel_total = min(available // 3, self.WIDTH * self.HEIGHT)

        for n in range(pixel_total):
            base = self.HEADER_SIZE + 3 * n
            unicorn.set_pixel(n % self.WIDTH, n // self.WIDTH,
                              rawbytes[base],
                              rawbytes[base + 1],
                              rawbytes[base + 2])
        unicorn.show()
class OPC(protocol.Protocol):
    """Parse Open Pixel Control protocol. See http://openpixelcontrol.org/.

    Each message is a 4-byte header (channel, command, 16-bit length with
    the high byte first) followed by `length` payload bytes. Payloads with
    command 0 on channel 0 or 1 are treated as (red, green, blue) triples
    and painted row by row starting at pixel (0, 0); every other payload is
    skipped. Only the first 3 * MAX_LEDS payload bytes are used, the rest
    of an oversized payload is discarded.

    The parser is a byte-driven state machine whose state lives on the
    instance, so each TCP connection is decoded independently and a message
    may arrive split across any number of dataReceived() calls.

    parseState values:
        0 -- expecting the channel byte
        1 -- expecting the command byte
        2 -- expecting the high length byte
        3 -- expecting the low length byte
        4 -- collecting payload bytes (up to pixelLimit)
        5 -- discarding payload bytes beyond pixelLimit
    """

    MAX_LEDS = 64
    WIDTH = 8    # pixels per row
    HEIGHT = 8   # rows

    # Defaults only. The first assignment through `self` creates a
    # per-connection attribute that shadows the class value.
    parseState = 0
    pktChannel = 0
    pktCommand = 0
    pktLength = 0
    pixelCount = 0        # payload bytes consumed so far for this message
    pixelLimit = 0        # payload bytes that will be used for pixels
    _pixelBuffer = None   # bytearray of pixel bytes for the current message

    def dataReceived(self, data):
        """Feed a chunk of the TCP stream to the parser.

        data -- bytes received on the connection; may hold part of a
                message, exactly one message, or several messages.
        """
        # bytearray yields integers on both Python 2 and Python 3.
        rawbytes = bytearray(data)
        total = len(rawbytes)
        i = 0
        while i < total:
            if self.parseState == 0:      # get pktChannel
                self.pktChannel = rawbytes[i]
                i += 1
                self.parseState = 1
            elif self.parseState == 1:    # get pktCommand
                self.pktCommand = rawbytes[i]
                i += 1
                self.parseState = 2
            elif self.parseState == 2:    # get pktLength high byte
                self.pktLength = rawbytes[i] << 8
                i += 1
                self.parseState = 3
            elif self.parseState == 3:    # get pktLength low byte
                self.pktLength |= rawbytes[i]
                i += 1
                self._beginPayload()
            elif self.parseState == 4:    # collect pixel payload
                i += self._collectPayload(rawbytes, i)
            elif self.parseState == 5:    # discard oversized remainder
                i += self._discardPayload(total - i)
            else:
                print("Invalid OPC.parseState %d" % (self.parseState))
                # Resynchronise instead of spinning on the same byte.
                self.parseState = 0

    def _isPixelMessage(self):
        """Return True when the current header addresses this display."""
        return self.pktCommand == 0 and self.pktChannel <= 1

    def _beginPayload(self):
        """Prepare the counters once a complete header has been read."""
        capacity = 3 * self.MAX_LEDS
        self.pixelCount = 0
        self.pixelLimit = min(capacity, self.pktLength)
        self._pixelBuffer = bytearray()
        if self.pktLength > capacity:
            print("Received pixel packet exeeds size of buffer! Data discarded.")
        # A zero-length message has no payload: go straight to the next header.
        if self.pixelLimit == 0:
            self.parseState = 0
        else:
            self.parseState = 4

    def _collectPayload(self, rawbytes, start):
        """Consume payload bytes from rawbytes[start:]; return the count used.

        Bytes are buffered rather than drawn immediately, so a payload that
        is split across reads is painted once, from the first pixel, when
        its last usable byte has arrived.
        """
        take = min(self.pixelLimit - self.pixelCount, len(rawbytes) - start)
        if self._isPixelMessage():
            self._pixelBuffer += rawbytes[start:start + take]
        self.pixelCount += take

        if self.pixelCount >= self.pixelLimit:
            if self._isPixelMessage():
                self._showPixels()
            if self.pixelCount >= self.pktLength:
                self.parseState = 0
            else:
                self.parseState = 5
        return take

    def _discardPayload(self, remaining):
        """Skip unused payload bytes; return how many were skipped.

        remaining -- number of unread bytes left in the current chunk.
        """
        # Measure against what is still outstanding for this message, not
        # the total excess, so a split tail never eats the next header.
        skip = min(self.pktLength - self.pixelCount, remaining)
        self.pixelCount += skip
        if self.pixelCount >= self.pktLength:
            self.parseState = 0
        return skip

    def _showPixels(self):
        """Paint the buffered RGB triples and refresh the display."""
        pixels = self._pixelBuffer
        # Whole triples only; a trailing partial triple is ignored.
        pixel_total = min(len(pixels) // 3, self.WIDTH * self.HEIGHT)
        for n in range(pixel_total):
            base = 3 * n
            unicorn.set_pixel(n % self.WIDTH, n // self.WIDTH,
                              pixels[base], pixels[base + 1], pixels[base + 2])
        unicorn.show()
class OPCFactory(protocol.Factory):
    """Factory that supplies an OPC protocol object for each connection."""

    def buildProtocol(self, addr):
        """Return a new OPC instance.

        addr -- address of the connecting peer; not used.
        """
        opc_protocol = OPC()
        return opc_protocol
# Art-Net: hand every UDP datagram arriving on port 6454 to an ArtNet instance.
reactor.listenUDP(6454, ArtNet())
# Open Pixel Control: listen for TCP connections on port 7890; OPCFactory
# builds the protocol object that parses each connection's byte stream.
endpoints.serverFromString(reactor, "tcp:7890").listen(OPCFactory())
# Start the Twisted reactor so both listeners begin handling traffic.
reactor.run()