-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLVC_Camera.py
135 lines (111 loc) · 4.49 KB
/
LVC_Camera.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
from fcntl import ioctl
import v4l2
import mmap
import numpy as np
from threading import Thread, Event
from sys import stderr
from ev3dev2.button import Button
from time import sleep
class MonitorButton(Thread):
def __init__(self, parent):
Thread.__init__(self)
self.parent = parent
self.shutdown_event = Event()
self.btn = Button()
def __str__(self):
return "MonitorButton"
def check_exposure(self, exposure):
if exposure > 32768:
exposure = 32768
if exposure < 0:
exposure = 0
return exposure
def check_gain(self, gain):
if gain > 15:
gain = 15
if gain < 0:
gain = 0
return gain
def run(self):
while True:
if self.btn.up:
self.parent.exposure += 500
self.parent.exposure = self.check_exposure(self.parent.exposure)
self.parent.set_param()
print('exposure: ' + str(self.parent.exposure))
print('exposure: ' + str(self.parent.exposure), file=stderr)
if self.btn.down:
self.parent.exposure -= 500
self.parent.exposure = self.check_exposure(self.parent.exposure)
self.parent.set_param()
print('exposure: ' + str(self.parent.exposure))
print('exposure: ' + str(self.parent.exposure), file=stderr)
if self.btn.left:
self.parent.gain -= 1
self.parent.gain = self.check_gain(self.parent.gain)
self.parent.set_param()
print('gain: ' + str(self.parent.gain))
print('gain: ' + str(self.parent.gain), file=stderr)
if self.btn.right:
self.parent.gain += 1
self.parent.gain = self.check_gain(self.parent.gain)
self.parent.set_param()
print('gain: ' + str(self.parent.gain))
print('gain: ' + str(self.parent.gain), file=stderr)
if self.shutdown_event.is_set():
break
sleep(0.1)
class LegoVisionCommandCamera(object):
def __init__(self, device_name):
self.device_name = device_name
self.exposure = 20000 # min=0 max=32768 step=1 default=20000
self.gain = 10 # min=0 max=15 step=1 default=10
self.open_device()
self.set_param()
self.init_mmap()
self.mb = MonitorButton(self)
self.shutdown_event = Event()
def __del__(self):
buf_type = v4l2.v4l2_buf_type(v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE)
ioctl(self.fd, v4l2.VIDIOC_STREAMOFF, buf_type)
self.fd.close()
def open_device(self):
self.fd = open(self.device_name, 'rb+', buffering=0)
def set_param(self):
#fmt = v4l2.v4l2_format()
#fmt.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
#ioctl(self.fd, v4l2.VIDIOC_G_FMT, fmt)
#ioctl(self.fd, v4l2.VIDIOC_S_FMT, fmt)
control = v4l2.v4l2_control()
control.id = v4l2.V4L2_CID_EXPOSURE
control.value = self.exposure
ioctl(self.fd, v4l2.VIDIOC_S_CTRL, control)
control = v4l2.v4l2_control()
control.id = v4l2.V4L2_CID_GAIN
control.value = self.gain
ioctl(self.fd, v4l2.VIDIOC_S_CTRL, control)
def init_mmap(self):
req = v4l2.v4l2_requestbuffers()
req.count = 1
req.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
req.memory = v4l2.V4L2_MEMORY_MMAP
ioctl(self.fd, v4l2.VIDIOC_REQBUFS, req)
self.buf = v4l2.v4l2_buffer()
self.buf.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
self.buf.memory = v4l2.V4L2_MEMORY_MMAP
self.buf.index = 0
ioctl(self.fd, v4l2.VIDIOC_QUERYBUF, self.buf)
self.buf.buffer = mmap.mmap(self.fd.fileno(), self.buf.length, mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE, offset=self.buf.m.offset)
def start_capturing(self):
ioctl(self.fd, v4l2.VIDIOC_QBUF, self.buf)
buf_type = v4l2.v4l2_buf_type(v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE)
ioctl(self.fd, v4l2.VIDIOC_STREAMON, buf_type)
self.mb.start()
#self.shutdown_event.wait()
def read(self):
ioctl(self.fd, v4l2.VIDIOC_DQBUF, self.buf)
raw_data = self.buf.buffer.read(self.buf.bytesused)
self.buf.buffer.seek(0)
ioctl(self.fd, v4l2.VIDIOC_QBUF, self.buf)
return np.frombuffer(raw_data, dtype=np.uint8).reshape((292, 356))