import collections
import json
import numpy
import concurrency
from task import Task

class Stage:
  def __init__(self):
    self.start_time = -1
    self.tasks = []

  def average_task_runtime(self):
    return sum([t.runtime() for t in self.tasks]) * 1.0 / len(self.tasks)

  def __str__(self):
    max_task_runtime = max([t.runtime() for t in self.tasks])
    if self.tasks[0].has_fetch:
      input_method = "shuffle"
    else:
      input_method = self.tasks[0].input_read_method
    return (("%s tasks (avg runtime: %s, max runtime: %s) Start: %s, runtime: %s, "
             "Max concurrency: %s, "
             "Input MB: %s (from %s), Output MB: %s, Stragglers: %s, Progress rate stragglers: %s, "
             "Progress rate stragglers explained by scheduler delay (%s), HDFS read (%s), "
             "HDFS read and scheduler delay (%s), GC (%s), Network (%s), JIT (%s), "
             "output rate stragglers: %s") %
            (len(self.tasks), self.average_task_runtime(), max_task_runtime, self.start_time,
             self.finish_time() - self.start_time, concurrency.get_max_concurrency(self.tasks),
             self.input_mb(), input_method, self.output_mb(),
             self.traditional_stragglers(), self.progress_rate_stragglers()[0],
             self.scheduler_delay_stragglers()[0], self.hdfs_read_stragglers()[0],
             self.hdfs_read_and_scheduler_delay_stragglers()[0], self.gc_stragglers()[0],
             # Do not compute the JIT stragglers here! It screws up the calculation.
             self.network_stragglers()[0], -1,
             self.output_progress_rate_stragglers()[0]))

  def verbose_str(self):
    # Get info about the longest task.
    max_index = -1
    max_runtime = -1
    for i, task in enumerate(self.tasks):
      if task.runtime() > max_runtime:
        max_runtime = task.runtime()
        max_index = i
    return "%s\n Longest Task: %s" % (self, self.tasks[max_index])

  def has_shuffle_read(self):
    total_shuffle_read_bytes = sum(
      [t.remote_mb_read + t.local_mb_read for t in self.tasks if t.has_fetch])
    return total_shuffle_read_bytes > 0

  def conservative_finish_time(self):
    # Subtract scheduler delay to account for asynchrony in the scheduler where sometimes tasks
    # aren't marked as finished until a few ms later.
    return max([(t.finish_time - t.scheduler_delay) for t in self.tasks])

  def finish_time(self):
    return max([t.finish_time for t in self.tasks])

  def total_runtime(self):
    return sum([t.finish_time - t.start_time for t in self.tasks])

  def total_fetch_wait(self):
    return sum([t.fetch_wait for t in self.tasks if t.has_fetch])

  def total_runtime_no_remote_shuffle_read(self):
    return sum([t.runtime_no_remote_shuffle_read() for t in self.tasks])

  def total_time_fetching(self):
    return sum([t.total_time_fetching for t in self.tasks if t.has_fetch])

  def traditional_stragglers(self):
    """ Returns the total number of straggler tasks for this stage using the traditional metric.
    This method considers a task a straggler if its runtime is at least 1.5 times the median
    runtime for other tasks in the stage. """
    median_task_duration = numpy.median([t.runtime() for t in self.tasks])
    stragglers = [t for t in self.tasks if t.runtime() >= 1.5*median_task_duration]
    return len(stragglers)

  def total_traditional_straggler_runtime(self):
    median_task_duration = numpy.median([t.runtime() for t in self.tasks])
    return sum([t.runtime() for t in self.tasks if t.runtime() >= 1.5*median_task_duration])

  def traditional_stragglers_explained_by_progress_rate(self):
    """ Returns the number of stragglers that can be explained by the amount of data they process.
    We describe a straggler as "explainable" by the amount of data it processes if it is not
    a straggler based on its progress rate.
    """
    progress_rates = [t.runtime() * 1.0 / t.input_size_mb()
      for t in self.tasks if t.input_size_mb() > 0]
    median_progress_rate = numpy.median(progress_rates)
    median_task_duration = numpy.median([t.runtime() for t in self.tasks])
    progress_rate_stragglers = 0
    progress_rate_stragglers_total_time = 0
    for task in self.tasks:
      if task.runtime() >= 1.5*median_task_duration and task.input_size_mb() > 0:
        progress_rate = task.runtime() * 1.0 / task.input_size_mb()
        if progress_rate < 1.5*median_progress_rate:
          progress_rate_stragglers += 1
          progress_rate_stragglers_total_time += task.runtime()
    return progress_rate_stragglers, progress_rate_stragglers_total_time

  def progress_rate_stragglers(self):
    stragglers = self.get_progress_rate_stragglers()
    return len(stragglers), sum([t.runtime() for t in stragglers])

  def get_tasks_with_non_zero_input(self):
    return [t for t in self.tasks if t.input_size_mb() > 0]

  def get_progress_rate_stragglers(self):
    progress_rates = [t.runtime() * 1.0 / t.input_size_mb()
      for t in self.get_tasks_with_non_zero_input()]
    median_progress_rate = numpy.median(progress_rates)
    progress_rate_stragglers = [t for t in self.get_tasks_with_non_zero_input()
      if t.runtime() * 1.0 / t.input_size_mb() >= 1.5*median_progress_rate]
    return progress_rate_stragglers

  def get_attributable_stragglers(self, progress_rate_fn):
    """ Returns the progress rate stragglers that are no longer stragglers
    when the provided function is used to compute the progress rate. """
    new_progress_rates = [progress_rate_fn(t) for t in self.get_tasks_with_non_zero_input()]
    median_new_progress_rate = numpy.median(new_progress_rates)
    attributable_stragglers = [t for t in self.get_progress_rate_stragglers()
      if progress_rate_fn(t) < 1.5 * median_new_progress_rate]
    for t in attributable_stragglers:
      t.straggler_behavior_explained = True
    return attributable_stragglers

  def get_attributable_stragglers_stats(self, progress_rate_fn):
    """ Returns the number of and total time taken by attributable stragglers. """
    attributable_stragglers = self.get_attributable_stragglers(progress_rate_fn)
    return len(attributable_stragglers), sum([t.runtime() for t in attributable_stragglers])
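
  # The cause-specific straggler methods below (output size, HDFS read, HDFS write, network,
  # scheduler delay, GC, shuffle write) all follow the same pattern: subtract the time charged
  # to one cause from the task runtime, recompute the progress rate, and count how many progress
  # rate stragglers disappear. A sketch for some other hypothetical cause (illustrative only;
  # `broadcast_wait_time` is not a real Task attribute in this codebase) would look like:
  #
  #   def broadcast_stragglers(self):
  #     def progress_rate_wo_broadcast(task):
  #       return (task.runtime() - task.broadcast_wait_time) * 1.0 / task.input_size_mb()
  #     return self.get_attributable_stragglers_stats(progress_rate_wo_broadcast)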

  def output_progress_rate_stragglers(self):
    """ Returns stats about stragglers that can be attributed to output data size. """
    def progress_rate_based_on_output(task):
      return task.runtime() * 1.0 / (task.shuffle_mb_written + task.output_mb)
    non_zero_output_tasks = [t for t in self.get_tasks_with_non_zero_input()
      if t.shuffle_mb_written + t.output_mb > 0]
    new_progress_rates = [progress_rate_based_on_output(t) for t in non_zero_output_tasks]
    median_new_progress_rate = numpy.median(new_progress_rates)
    attributable_stragglers = []
    for t in self.get_progress_rate_stragglers():
      if (t.shuffle_mb_written + t.output_mb > 0 and
          progress_rate_based_on_output(t) < 1.5 * median_new_progress_rate):
        attributable_stragglers.append(t)
        t.straggler_behavior_explained = True
    return len(attributable_stragglers), sum([t.runtime() for t in attributable_stragglers])

  def hdfs_read_stragglers(self):
    """ Returns the number of and total time taken by stragglers caused by slow HDFS reads,
    as well as the number of those stragglers that had non-local reads.
    Considers a task a straggler if its processing rate is more than 1.5x the median. """
    def progress_rate_wo_hdfs_read(task):
      return (task.runtime() - task.input_read_time) * 1.0 / task.input_size_mb()
    attributable_stragglers = self.get_attributable_stragglers(progress_rate_wo_hdfs_read)
    non_local = len([t for t in attributable_stragglers if not t.data_local])
    straggler_time = sum([t.runtime() for t in attributable_stragglers])
    return len(attributable_stragglers), straggler_time, non_local

  def hdfs_write_stragglers(self):
    """ Returns the number of and total time taken by stragglers caused by slow HDFS writes.
    Considers a task a straggler if its processing rate is more than 1.5x the median. """
    def progress_rate_wo_hdfs_write(task):
      return (task.runtime() - task.output_write_time) * 1.0 / task.input_size_mb()
    attributable_stragglers = self.get_attributable_stragglers(progress_rate_wo_hdfs_write)
    straggler_time = sum([t.runtime() for t in attributable_stragglers])
    return len(attributable_stragglers), straggler_time

  def network_stragglers(self):
    """ Returns the number of and total time taken by stragglers caused by poor network performance.
    Does not account for tasks that have poor network performance because they read input data from
    a remote data node; only accounts for tasks with slow networks during the shuffle phase.
    """
    def progress_rate_wo_network(task):
      return (task.runtime() - task.fetch_wait) * 1.0 / task.input_size_mb()
    if not self.has_shuffle_read():
      # If the job doesn't read any shuffle data over the network, there can't be any network
      # stragglers.
      return 0, 0
    return self.get_attributable_stragglers_stats(progress_rate_wo_network)

  def scheduler_delay_stragglers(self):
    def progress_rate_wo_scheduler_delay(task):
      return (task.runtime() - task.scheduler_delay) * 1.0 / task.input_size_mb()
    return self.get_attributable_stragglers_stats(progress_rate_wo_scheduler_delay)

  def hdfs_read_and_scheduler_delay_stragglers(self):
    def progress_rate_wo_read_and_sched(task):
      return ((task.runtime() - task.scheduler_delay - task.input_read_time) * 1.0 /
        task.input_size_mb())
    return self.get_attributable_stragglers_stats(progress_rate_wo_read_and_sched)

  def gc_stragglers(self):
    def progress_rate_wo_gc(task):
      return (task.runtime() - task.gc_time) * 1.0 / task.input_size_mb()
    return self.get_attributable_stragglers_stats(progress_rate_wo_gc)

  def shuffle_write_stragglers(self):
    def progress_rate_wo_shuffle_write(task):
      return (task.runtime() - task.shuffle_write_time) * 1.0 / task.input_size_mb()
    return self.get_attributable_stragglers_stats(progress_rate_wo_shuffle_write)

  def jit_stragglers(self):
    """ THIS SHOULD BE CALLED AFTER ALL OTHER STRAGGLER FUNCTIONS.
    Tasks won't be classified as JIT stragglers if they have already
    been classified as another kind of straggler.
    """
    executor_to_task_finish_times = collections.defaultdict(list)
    for task in self.tasks:
      executor_to_task_finish_times[task.executor].append(task.finish_time)
    # Tasks where no other task in the same stage completed on the executor
    # before this task started.
    virgin_tasks = []
    for task in self.get_tasks_with_non_zero_input():
      first_task_finish_on_executor = min(executor_to_task_finish_times[task.executor])
      if task.start_time < first_task_finish_on_executor:
        virgin_tasks.append(task)
    def progress_rate(task):
      return task.runtime() * 1.0 / task.input_size_mb()
    # For JIT effects, only look at compute time (other times aren't affected by JIT).
    def compute_progress_rate(task):
      return task.compute_time() * 1.0 / task.input_size_mb()
    median_task_progress_rate = numpy.median([progress_rate(t)
      for t in self.get_tasks_with_non_zero_input()])
    median_virgin_task_progress_rate = numpy.median(
      [compute_progress_rate(t) for t in virgin_tasks])
    jit_stragglers = 0
    total_time = 0
    print "Median virgin rate", median_virgin_task_progress_rate
    for task in virgin_tasks:
      print compute_progress_rate(task)
      task_progress_rate = progress_rate(task)
      if task_progress_rate >= 1.5*median_task_progress_rate:
        if (not task.straggler_behavior_explained and
            compute_progress_rate(task) < 1.5*median_virgin_task_progress_rate):
          jit_stragglers += 1
          total_time += task.runtime()
          task.straggler_behavior_explained = True
    return jit_stragglers, total_time

  def task_runtimes_with_median_progress_rate(self):
    """ Returns task runtimes using the Dolly method to eliminate stragglers.
    Replaces each task's runtime with the runtime based on the median progress rate
    for the stage. """
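    # Worked example: if the stage's median progress rate is 2 ms per input MB, a 100 MB task
    # that actually ran for 500 ms gets an idealized runtime of 100 * 2 = 200 ms, while tasks
    # already at or below the median rate keep their measured runtimes.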
    # Ignore tasks with 0 input bytes when computing progress rates. These seem to occur for the
    # big shuffle to partition the data (even though the blocks read are non-zero for those tasks).
    progress_rates = [t.runtime() * 1.0 / t.input_size_mb() for t in self.tasks
      if t.input_size_mb() > 0]
    median_progress_rate = numpy.median(progress_rates)
    def new_runtime(task):
      if (task.input_size_mb() > 0 and
          task.runtime() * 1.0 / task.input_size_mb() > median_progress_rate):
        return task.input_size_mb() * median_progress_rate
      return task.runtime()
    runtimes = [new_runtime(t) for t in self.tasks]
    return runtimes

  def input_mb(self):
    """ Returns the total input size for this stage.
    This is only valid if the stage read data from a shuffle.
    """
    total_input_bytes = sum([t.remote_mb_read + t.local_mb_read for t in self.tasks if t.has_fetch])
    total_input_bytes += sum([t.input_mb for t in self.tasks])
    return total_input_bytes

  def output_mb(self):
    """ Returns the total output size for this stage.
    This is only valid if the output data was written for a shuffle.
    TODO: Add HDFS / in-memory RDD output size.
    """
    total_output_size = sum([t.shuffle_mb_written for t in self.tasks])
    return total_output_size

  def add_event(self, data, is_json):
    task = Task(data, is_json)
    if self.start_time == -1:
      self.start_time = task.start_time
    else:
      self.start_time = min(self.start_time, task.start_time)
    self.tasks.append(task)
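

# Minimal usage sketch (illustrative only): the event file name and per-line JSON format below are
# assumptions, not defined in this file -- `data` must be whatever Task(data, is_json) expects.
#
#   stage = Stage()
#   for line in open("task_events.log"):  # hypothetical per-task event log
#     stage.add_event(json.loads(line), is_json=True)
#   print stage                                  # summary string from __str__
#   print "Progress rate stragglers:", stage.progress_rate_stragglers()
#   print "JIT stragglers:", stage.jit_stragglers()  # call last; see the jit_stragglers docstring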