-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtaskqueue.py
41 lines (32 loc) · 944 Bytes
/
taskqueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import logging
import queue
from threading import Thread
class TaskQueue(queue.Queue):
    """A queue of callables that are executed by background worker threads.

    Worker threads are started as daemon threads when the queue is
    constructed. Each worker repeatedly takes a ``(task, args, kwargs)``
    item off the queue and calls ``task(*args, **kwargs)``.

    A task that raises an exception is logged and does not stop its worker,
    so ``join()`` still returns once every queued task has been attempted.

    Example:
        q = TaskQueue(num_workers=2)
        q.add_task(task, a, b)
        q.join()  # blocks until all queued tasks have been processed
    """

    def __init__(self, num_workers=1):
        """Create the queue and immediately start its worker threads.

        Args:
            num_workers (int): Number of worker threads to start.
        """
        super().__init__()
        self.num_workers = num_workers
        self.start_workers()

    def add_task(self, task, *args, **kwargs):
        """Add a task to the queue.

        Args:
            task (callable): Function to be run by a worker thread.
            *args: Positional arguments passed to ``task``.
            **kwargs: Keyword arguments passed to ``task``.
        """
        # *args / **kwargs are always a tuple / dict, so no defaults needed.
        self.put((task, args, kwargs))

    def start_workers(self):
        """Start ``self.num_workers`` daemon threads running ``worker``."""
        for _ in range(self.num_workers):
            # Daemon threads do not keep the interpreter alive at exit.
            thread = Thread(target=self.worker, daemon=True)
            thread.start()

    def worker(self):
        """Run queued tasks forever; intended as a thread target."""
        while True:
            task, args, kwargs = self.get()
            try:
                task(*args, **kwargs)
            except Exception:
                # Keep the worker alive: a dead worker would leave the
                # remaining tasks unprocessed.
                logging.getLogger(__name__).exception(
                    "Task %r raised an exception", task
                )
            finally:
                # Always mark the item done, otherwise join() blocks forever
                # after a failing task.
                self.task_done()