Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events #41

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ jobs:
cache-dependency-path: pyproject.toml
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e .[dev]
pip install --disable-pip-version-check -e .[dev]
- name: Linting tests
run: |
black --check --diff v4l2py tests examples
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ checks for `MenuItem`, these should be changed to `LegacyMenuItem`.
See the ``linux/videodev2.h`` header file for details.


* `Video for Linux Two Specification <https://www.kernel.org/doc/html/v6.2/userspace-api/media/v4l/v4l2.html>`
* [V4L2 (Latest)](https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/v4l2.html) ([videodev.h](https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/videodev.html))
* [V4L2 6.2](https://www.kernel.org/doc/html/v6.2/userspace-api/media/v4l/v4l2.html) ([videodev.h](https://www.kernel.org/doc/html/v6.2/userspace-api/media/v4l/videodev.html))

[pypi-python-versions]: https://img.shields.io/pypi/pyversions/v4l2py.svg
[pypi-version]: https://img.shields.io/pypi/v/v4l2py.svg
Expand Down
2 changes: 1 addition & 1 deletion tests/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def mmap(self, fd, length, offset):
assert self.fd == fd
return MemoryMap(self)

def select(self, readers, writers, other):
def select(self, readers, writers, other, timeout=None):
assert readers[0].fileno() == self.fd
return readers, writers, other

Expand Down
196 changes: 176 additions & 20 deletions v4l2py/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import mmap
import os
import pathlib
import select
import typing
from io import IOBase
from collections import UserDict
Expand Down Expand Up @@ -64,6 +65,10 @@ def _enum(name, prefix, klass=enum.IntEnum):
Priority = _enum("Priority", "V4L2_PRIORITY_")
TimeCode = _enum("TimeCode", "V4L2_TC_TYPE_")
TimeFlag = _enum("TimeFlag", "V4L2_TC_FLAG_", klass=enum.IntFlag)
EventType = _enum("EventType", "V4L2_EVENT_")
EventSubscriptionFlag = _enum(
"EventSubscriptionFlag", "V4L2_EVENT_SUB_FL_", klass=enum.IntFlag
)


def human_pixel_format(ifmt):
Expand Down Expand Up @@ -578,6 +583,32 @@ def set_priority(fd, priority: Priority):
ioctl(fd, IOC.S_PRIORITY, priority)


def subscribe_event(
fd,
event_type: EventType = EventType.ALL,
id: int = 0,
flags: EventSubscriptionFlag = 0,
):
sub = raw.v4l2_event_subscription()
sub.type = event_type
sub.id = id
sub.flags = flags
ioctl(fd, IOC.SUBSCRIBE_EVENT, sub)


def unsubscribe_event(fd, event_type: EventType = EventType.ALL, id: int = 0):
sub = raw.v4l2_event_subscription()
sub.type = event_type
sub.id = id
ioctl(fd, IOC.UNSUBSCRIBE_EVENT, sub)


def deque_event(fd):
event = raw.v4l2_event()
ioctl(fd, IOC.DQEVENT, event)
return event


# Helpers


Expand Down Expand Up @@ -783,6 +814,20 @@ def stream_off(self, buffer_type):
def write(self, data: bytes) -> None:
self._fobj.write(data)

def subscribe_event(
self,
event_type: EventType = EventType.ALL,
id: int = 0,
flags: EventSubscriptionFlag = 0,
):
return subscribe_event(self.fileno(), event_type, id, flags)

def unsubscribe_event(self, event_type: EventType = EventType.ALL, id: int = 0):
return unsubscribe_event(self.fileno(), event_type, id)

def deque_event(self):
return deque_event(self.fileno())


class Controls(dict):
@classmethod
Expand Down Expand Up @@ -1464,35 +1509,21 @@ def __init__(self, buffer_manager: BufferManager):
self.buffer_manager = buffer_manager
self.buffers = None
self.reader = QueueReader(buffer_manager, Memory.MMAP)
self.frame_reader = FrameReader(self.device, self.raw_read)

@property
def device(self) -> Device:
return self.buffer_manager.device

def __iter__(self):
while True:
yield self.read()
with self.frame_reader:
while True:
yield self.frame_reader.read()

async def __aiter__(self):
device = self.device.fileno()
loop = asyncio.get_event_loop()
event = asyncio.Event()
frame = None

def cb():
nonlocal frame
frame = self.raw_read()
event.set()

loop.add_reader(device, cb)
try:
async with self.frame_reader:
while True:
await event.wait()
event.clear()
yield frame
frame = None
finally:
loop.remove_reader(device)
yield await self.frame_reader.aread()

def open(self):
if self.buffers is None:
Expand Down Expand Up @@ -1539,6 +1570,131 @@ def read(self):
return self.read()


class EventReader:
def __init__(self, device: Device, max_queue_size=100):
self.device = device
self._loop = None
self._selector = None
self._buffer = None
self._max_queue_size = max_queue_size

async def __aenter__(self):
if self.device.is_blocking:
raise V4L2Error("Cannot use async event reader on blocking device")
self._buffer = asyncio.Queue(maxsize=self._max_queue_size)
self._selector = select.epoll()
self._loop = asyncio.get_event_loop()
self._loop.add_reader(self._selector.fileno(), self._on_event)
self._selector.register(self.device.fileno(), select.EPOLLPRI)
return self

async def __aexit__(self, exc_type, exc_value, traceback):
self._selector.unregister(self.device.fileno())
self._loop.remove_reader(self._selector.fileno())
self._selector.close()
self._selector = None
self._loop = None
self._buffer = None

async def __aiter__(self):
while True:
yield await self.aread()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, tb):
pass

def _on_event(self):
task = self._loop.create_future()
try:
self._selector.poll(0) # avoid blocking
event = self.device.deque_event()
task.set_result(event)
except Exception as error:
task.set_exception(error)

buffer = self._buffer
if buffer.full():
self.device.log.debug("missed event")
buffer.popleft()
buffer.put_nowait(task)

def read(self, timeout=None):
if not self.device.is_blocking:
_, _, exc = self.device.io.select((), (), (self.device,), timeout)
if not exc:
return
return self.device.deque_event()

async def aread(self):
"""Wait for next event or return last event in queue"""
task = await self._buffer.get()
return await task


class FrameReader:
def __init__(self, device: Device, raw_read, max_queue_size=1):
self.device = device
self.raw_read = raw_read
self._loop = None
self._selector = None
self._buffer = None
self._max_queue_size = max_queue_size

async def __aenter__(self):
if self.device.is_blocking:
raise V4L2Error("Cannot use async frame reader on blocking device")
self._buffer = asyncio.Queue(maxsize=self._max_queue_size)
self._selector = select.epoll()
self._loop = asyncio.get_event_loop()
self._loop.add_reader(self._selector.fileno(), self._on_event)
self._selector.register(self.device.fileno(), select.POLLIN)
return self

async def __aexit__(self, exc_type, exc_value, traceback):
self._selector.unregister(self.device.fileno())
self._loop.remove_reader(self._selector.fileno())
self._selector.close()
self._selector = None
self._loop = None
self._buffer = None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, tb):
pass

def _on_event(self):
task = self._loop.create_future()
try:
self._selector.poll(0) # avoid blocking
data = self.raw_read()
task.set_result(data)
except Exception as error:
task.set_exception(error)

buffer = self._buffer
if buffer.full():
self.device.log.warn("missed frame")
buffer.get_nowait()
buffer.put_nowait(task)

def read(self, timeout=None):
if not self.device.is_blocking:
read, _, _ = self.device.io.select((self.device,), (), (), timeout)
if not read:
return
return self.raw_read()

async def aread(self):
"""Wait for next frame or return last frame"""
task = await self._buffer.get()
return await task


class QueueReader:
def __init__(self, buffer_manager: BufferManager, memory: Memory):
self.buffer_manager = buffer_manager
Expand Down
Loading