-
Notifications
You must be signed in to change notification settings - Fork 1
/
parallel.rb
executable file
·157 lines (132 loc) · 4.9 KB
/
parallel.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env ruby
module Enumerable
  # Set to true to trace process/thread activity of #parallel on $stderr.
  FEACH_DEBUG = false

  # Iterates over an enumerable in parallel given a number of processes and a
  # block to call. The parallel iteration is done by a pool of workers created
  # using fork. An enumerator is returned whose results come out in the same
  # order as the input elements. Calling parallel with :processes => 0 (the
  # default) disables forking for debugging purposes.
  #
  # Elements are dispatched in batches of +processes+ elements. Element i of a
  # batch goes to worker i, and a whole batch is computed before its results
  # are yielded. Elements and results cross process boundaries via Marshal, so
  # both must be marshallable. The worker processes are shut down and reaped
  # when iteration finishes, is stopped early (e.g. +break+ or +first+), or
  # raises.
  #
  # @param options [Hash] :processes - number of worker processes; a value
  #   <= 0 (or nil) runs the block in the current process.
  # @return [Enumerator] enumerator over the block results, in input order.
  #
  # @example - process 10 elements using 4 processes:
  #
  #   (0 ... 10).parallel(processes: 4) { |i| sleep 3; i }.each { |e| puts e }
  def parallel(options = {}, &block)
    $stderr.puts "Parent pid: #{Process.pid}" if FEACH_DEBUG
    procs = options[:processes] || 0
    Enumerator.new do |yielder|
      if procs > 0
        workers = spawn_workers(procs, &block)
        begin
          each_slice(procs).with_index do |batch, batch_no|
            # Element i of a batch always goes to worker i, so no worker pipe
            # is ever used by two threads at the same time.
            threads = batch.each_with_index.map do |elem, i|
              Thread.new do
                $stderr.puts "elem: #{elem} index: #{batch_no * procs + i}" if FEACH_DEBUG
                workers[i].process(elem)
              end
            end
            # Thread#value joins and re-raises worker errors. Collect the whole
            # batch before yielding so no thread still uses a pipe while control
            # is handed to the consumer.
            threads.map(&:value).each { |result| yielder << result }
          end
        ensure
          # Runs on normal completion, early break and exceptions alike.
          workers.each(&:terminate)
        end
      else
        each { |elem| yielder << block.call(elem) }
      end
    end
  end

  private

  # Creates _procs_ Worker objects, each backed by a forked child process that
  # runs the given _block_ on elements sent to it. Every worker gets two pipes:
  # one carrying elements from the parent to the child, and one carrying
  # results from the child back to the parent.
  #
  # @param procs [Integer] number of worker processes to fork.
  # @return [Array<Worker>] the workers, in spawn order.
  def spawn_workers(procs, &block)
    workers = []
    procs.times do
      child_read, parent_write = IO.pipe
      parent_read, child_write = IO.pipe
      pid = Process.fork do
        ok = true
        begin
          parent_write.close
          parent_read.close
          # The fork also inherited the parent-side ends of every earlier
          # worker's pipes. Close them; otherwise those workers never see EOF
          # on their input while this child lives, and cannot shut down.
          workers.each do |worker|
            worker.parent_read.close
            worker.parent_write.close
          end
          call(child_read, child_write, &block)
        rescue StandardError => e
          ok = false
          $stderr.puts "Worker #{Process.pid} failed: #{e.class}: #{e.message}"
        ensure
          child_read.close
          child_write.close
        end
        # exit! skips the at_exit handlers and finalizers inherited from the
        # parent; those must only run in the parent process.
        Process.exit!(ok)
      end
      child_read.close
      child_write.close
      $stderr.puts "Spawning worker with pid: #{pid}" if FEACH_DEBUG
      workers << Worker.new(parent_read, parent_write, pid)
    end
    workers
  end

  # Worker loop run inside a forked child. Reads elements from _child_read_
  # until the parent closes its end of the pipe, calls _block_ with each
  # element, and writes each result to _child_write_.
  def call(child_read, child_write, &block)
    until child_read.eof?
      elem = IPC.load(child_read)
      $stderr.puts " call with Process.pid: #{Process.pid}" if FEACH_DEBUG
      IPC.dump(block.call(elem), child_write)
    end
  end

  # Inter Process Communication (IPC) between forked processes. Each object is
  # Marshalled and written to an IO pipe, prefixed with its byte length.
  # Marshal.load is acceptable here only because both ends are processes
  # forked by this library; do not use it on untrusted streams.
  class IPC
    # Reads, unmarshals and returns one object from _io_.
    #
    # @raise [EOFError] if the stream ends before a complete message was read.
    def self.load(io)
      header = io.read(4)
      raise EOFError unless header && header.bytesize == 4
      size = header.unpack("I").first
      marshalled = io.read(size)
      raise EOFError unless marshalled && marshalled.bytesize == size
      Marshal.load(marshalled)
    end

    # Marshals _obj_ and writes it, length-prefixed, to _io_. Returns nil.
    def self.dump(obj, io)
      marshalled = Marshal.dump(obj)
      io.write([marshalled.bytesize].pack("I"))
      io.write(marshalled)
      nil # Save GC
    end
  end

  # Parent-side handle to one worker child process. The parent talks to the
  # child by writing to and reading from the two IPC pipes.
  class Worker
    attr_reader :parent_read, :parent_write, :pid

    # @param parent_read [IO] pipe end the parent reads results from.
    # @param parent_write [IO] pipe end the parent writes elements to.
    # @param pid [Integer] process id of the worker child.
    def initialize(parent_read, parent_write, pid)
      @parent_read = parent_read
      @parent_write = parent_write
      @pid = pid
    end

    # Sends _elem_ to the child process, blocks until the child answers, and
    # returns the result.
    def process(elem)
      IPC.dump(elem, @parent_write)
      $stderr.puts " process with worker pid: #{@pid} and parent pid: #{Process.pid}" if FEACH_DEBUG
      IPC.load(@parent_read)
    end

    # Shuts the worker down:
    # 1. Close the write end, which signals EOF and ends the child's loop.
    # 2. Wait for the child to exit and reap it (no zombie is left behind).
    # 3. Close the read end.
    # Blocks until the child finishes the element it is working on, if any.
    # Safe to call more than once.
    def terminate
      $stderr.puts "Terminating worker with pid: #{@pid}" if FEACH_DEBUG
      @parent_write.close unless @parent_write.closed?
      begin
        Process.wait(@pid)
      rescue Errno::ECHILD
        nil # child already reaped
      end
      @parent_read.close unless @parent_read.closed?
    end
  end
end
#def fib(n) n < 2 ? n : fib(n-1)+fib(n-2); end # Lousy Fibonacci calculator <- heavy job
#(0 ... 10).parallel(processes: 2) { |i| "#{i}: #{fib(38)}" }.each { |e| puts e }