Skip to content

Commit

Permalink
[hotfix] Fix in-place rescaling for removed overrides
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Jul 13, 2023
1 parent 98e7a67 commit ef2d21d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,26 @@ public boolean scale(FlinkResourceContext<?> ctx) throws Exception {

var deployConfig = ctx.getDeployConfig(spec);
var newOverrides = deployConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
var previousOverrides = observeConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
if (newOverrides.isEmpty() && previousOverrides.isEmpty()) {
return false;
}

try (var client = getClusterClient(observeConfig)) {
var currentReqs = getVertexResources(client, resource);

for (String previousOverrideVertex : previousOverrides.keySet()) {
// Check if this is an active vertex (present in the current jobgraph)
if (currentReqs.containsKey(JobVertexID.fromHexString(previousOverrideVertex))) {
if (!newOverrides.containsKey(previousOverrideVertex)) {
LOG.info(
"Parallelism override for {} has been removed, falling back to regular upgrade.",
previousOverrideVertex);
return false;
}
}
}

var newReqs = new HashMap<>(currentReqs);
newOverrides.forEach(
(vs, ps) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,50 @@ protected void updateVertexResources(
flinkDep, service, d -> d.getStatus().getJobStatus().setState("RUNNING"), true);

testScaleConditionDep(flinkDep, service, d -> d.getSpec().setJob(null), false);

// Do not scale if parallelism overrides were removed from an active vertex
testScaleConditionLastSpec(
flinkDep,
service,
s ->
s.getFlinkConfiguration()
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v2 + ":3"),
false);

// Scale if parallelism overrides were removed only from a non-active vertex
testScaleConditionLastSpec(
flinkDep,
service,
s ->
s.getFlinkConfiguration()
.put(
PipelineOptions.PARALLELISM_OVERRIDES.key(),
v1 + ":1," + new JobVertexID() + ":5"),
true);

// Do not scale if parallelism overrides were completely removed
var flinkDep2 = ReconciliationUtils.clone(flinkDep);
flinkDep2
.getSpec()
.getFlinkConfiguration()
.remove(PipelineOptions.PARALLELISM_OVERRIDES.key());
testScaleConditionLastSpec(
flinkDep2,
service,
s ->
s.getFlinkConfiguration()
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v2 + ":3"),
false);

// Do not scale if overrides were never set
testScaleConditionDep(
flinkDep2,
service,
d ->
flinkDep.getSpec()
.getFlinkConfiguration()
.remove(PipelineOptions.PARALLELISM_OVERRIDES.key()),
false);
}

private void testScaleConditionDep(
Expand Down

0 comments on commit ef2d21d

Please sign in to comment.