Skip to content

Commit

Permalink
Enable reassign_worker for reducers
Browse files Browse the repository at this point in the history
  • Loading branch information
继盛 authored and xuye.qin committed Jun 8, 2022
1 parent 9964ffc commit f320c0f
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions mars/core/operand/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ class MapReduceOperand(Operand):
reducer_ordinal = Int32Field("reducer_ordinal")
reducer_phase = StringField("reducer_phase", default=None)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.stage == OperandStage.reduce:
# for reducer, we assign worker at first
self.scheduling_hint.reassign_worker = True

def _new_chunks(self, inputs, kws=None, **kw):
if getattr(self, "reducer_index", None) is None:
index = None
Expand Down

0 comments on commit f320c0f

Please sign in to comment.