[BEAM-4130] Add job submission capabilities to Flink runner. #5493
f819d57 to 421b31f
421b31f to 575b342
Thanks!
public static InMemoryJobService create(
    Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
    Function<String, String> stagingServiceTokenProvider,
Maybe document that this function expects a preparation ID as input.
Good point.
Thanks!
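For illustration, a minimal sketch of how the documented contract could look. `JobServiceSketch`, `stagingTokenFor`, and the token format are hypothetical names made up for this example, and `java.util.function.Function` is assumed; only the `Function<String, String>` shape and the preparation-ID input come from the thread.

```java
import java.util.function.Function;

/** Hypothetical stand-in for the job service under review; not Beam's actual class. */
final class JobServiceSketch {
  private final Function<String, String> stagingServiceTokenProvider;

  /**
   * @param stagingServiceTokenProvider maps a preparation ID (the input) to the
   *     staging service token for that preparation (the output)
   */
  JobServiceSketch(Function<String, String> stagingServiceTokenProvider) {
    this.stagingServiceTokenProvider = stagingServiceTokenProvider;
  }

  /** Looks up the staging token for the given preparation ID. */
  String stagingTokenFor(String preparationId) {
    return stagingServiceTokenProvider.apply(preparationId);
  }

  public static void main(String[] args) {
    // The token format here is an assumption, chosen only to make the mapping visible.
    JobServiceSketch service = new JobServiceSketch(id -> "token-for-" + id);
    System.out.println(service.stagingTokenFor("prep-1"));
  }
}
```

Stating the input in the Javadoc keeps callers from passing a job ID or session ID by mistake, which the bare `Function<String, String>` type cannot prevent.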
}

private PipelineResult runPipeline() throws Exception {
  LOG.trace("Translating pipeline from proto");
This trace statement is unnecessary.
Done.
LOG.info("Translating pipeline to Flink program.");
// Fused pipeline proto.
RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
Should we pass the fuser function in as a constructor parameter?
Until we want/have more than one fuser, I think it's better to leave it internal for now.
// batch translation
FlinkBatchPortablePipelineTranslator translator =
    FlinkBatchPortablePipelineTranslator.createTranslator();
FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
Should we only create the context in the if-else block and do the remaining things outside?
I actually wanted to do this, but the problem is that the (Flink-defined) streaming and batch execution environments don't share a common base class.
synchronized (this) {
  setState(Enum.STARTING);
  invocationFuture = executorService.submit(this::runPipeline);
  setState(Enum.RUNNING);
We should set the state to RUNNING only once we have submitted the job to Flink.
Should we set the state to RUNNING in runPipeline?
We could defer it until after the pipeline translation, but that doesn't seem to be a significant difference, and it's cleaner to have all the state transitions locally here. I added a TODO to actually do this right (which we can't do from runPipeline alone).
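As a rough sketch of the trade-off discussed here: the class below keeps both transitions in `start()`, so RUNNING means "handed to the executor", not "accepted by Flink". `InvocationSketch`, its `State` enum, and the `history()` helper are hypothetical and stand in for the real invocation class and its job-state enum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for the job invocation wrapper; not the PR's actual class. */
final class InvocationSketch {
  enum State { STARTING, RUNNING }

  private final ExecutorService executorService;
  private final Callable<String> runPipeline;
  private final List<State> history = new ArrayList<>();
  private Future<String> invocationFuture;

  InvocationSketch(ExecutorService executorService, Callable<String> runPipeline) {
    this.executorService = executorService;
    this.runPipeline = runPipeline;
  }

  /** Performs both state transitions locally, mirroring the snippet under review. */
  synchronized void start() {
    setState(State.STARTING);
    invocationFuture = executorService.submit(runPipeline);
    // At this point the work is only queued; it may not have reached the runner yet.
    setState(State.RUNNING);
  }

  private void setState(State newState) {
    history.add(newState);
  }

  synchronized List<State> history() {
    return new ArrayList<>(history);
  }

  String awaitResult() throws Exception {
    Future<String> future;
    synchronized (this) {
      future = invocationFuture;
    }
    return future.get();
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      InvocationSketch invocation = new InvocationSketch(executor, () -> "done");
      invocation.start();
      System.out.println(invocation.history() + " -> " + invocation.awaitResult());
    } finally {
      executor.shutdown();
    }
  }
}
```

Moving the RUNNING transition into the submitted task would report submission more accurately, at the cost of spreading state changes across two threads.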
new FutureCallback<PipelineResult>() {
  @Override
  public void onSuccess(
      @Nullable PipelineResult pipelineResult) {
We should check pipelineResult.getState() to make sure the pipeline actually finished successfully.
Done.
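One way such a check could look, sketched with the JDK's `CompletableFuture` in place of Guava's `FutureCallback` so the example stays self-contained. `CompletionSketch`, `Result`, and `JobState` are hypothetical, and mapping a non-DONE result to FAILED is an assumption of this sketch, not necessarily what the PR does.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: derive the job state from how the invocation future completed. */
final class CompletionSketch {
  enum JobState { DONE, FAILED, UNKNOWN }

  /** Stand-in for a pipeline result that exposes its terminal state. */
  interface Result {
    JobState getState();
  }

  /** A successfully completed future is not enough; the result's own state is checked too. */
  static JobState stateFor(Result result, Throwable error) {
    if (error != null) {
      return JobState.FAILED;
    }
    if (result == null) {
      // The callback parameter is nullable, so a missing result is reported as UNKNOWN.
      return JobState.UNKNOWN;
    }
    return result.getState() == JobState.DONE ? JobState.DONE : JobState.FAILED;
  }

  static CompletableFuture<JobState> track(CompletableFuture<Result> invocation) {
    return invocation.handle(CompletionSketch::stateFor);
  }

  public static void main(String[] args) {
    Result done = () -> JobState.DONE;
    System.out.println(track(CompletableFuture.completedFuture(done)).join());
  }
}
```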
  return new FlinkJobServerDriver(configuration, executor, serverFactory);
}

private final ListeningExecutorService executor;
We should move these variables to the top.
Done.
ArtifactApi.ProxyManifest.Builder builder = ArtifactApi.ProxyManifest.newBuilder();
JsonFormat.parser().merge(Channels.newReader(
    FileSystems.open(FileSystems.matchNewResource(retrievalToken, false /* is directory */)),
    StandardCharsets.UTF_8.name()), builder);
Shall we standardize the character set for all BeamFileSystemArtifact* classes?
Yes, just like we're standardizing on JSON (and JSON is always supposed to be UTF-8).
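A small sketch of the decoding step in isolation, using only JDK classes: `Channels.newReader` wraps a byte channel with an explicitly named charset, so the manifest is read as UTF-8 regardless of the platform default. `Utf8ManifestReaderSketch` and `readAll` are hypothetical names, and the in-memory channel stands in for the `FileSystems.open(...)` call in the snippet.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.Reader;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: reads a whole channel as UTF-8 text. */
final class Utf8ManifestReaderSketch {
  static String readAll(ReadableByteChannel channel) throws IOException {
    // Naming the charset explicitly avoids depending on the JVM's default encoding.
    try (Reader reader = Channels.newReader(channel, StandardCharsets.UTF_8.name())) {
      StringBuilder contents = new StringBuilder();
      char[] buffer = new char[1024];
      int read;
      while ((read = reader.read(buffer)) != -1) {
        contents.append(buffer, 0, read);
      }
      return contents.toString();
    }
  }

  public static void main(String[] args) throws IOException {
    // A made-up JSON document containing a non-ASCII character.
    byte[] bytes = "{\"name\": \"caf\u00e9\"}".getBytes(StandardCharsets.UTF_8);
    ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(bytes));
    System.out.println(readAll(channel));
  }
}
```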
@@ -60,17 +60,36 @@

public static InMemoryJobService create(
Can we remove this method?
  }
});

return stagingTokenHolder[0];
We should wait for the commit call to finish before returning.
This is a direct call, nothing asynchronous to wait on.
  @Override public void onCompleted() {
  }
});
return contents.toString();
We should wait for the call to finish before returning the contents.
Same.
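To make the "direct call" argument concrete, here is a hedged sketch of the holder pattern with an in-process service. `DirectCallSketch`, its `Observer` interface, and `commit` are hypothetical and only loosely modeled on a gRPC-style stream observer; the point is that the callbacks run on the caller's thread before `commit` returns.

```java
/** Hypothetical sketch of reading a value out of a synchronously invoked callback. */
final class DirectCallSketch {
  /** Minimal observer shape, assumed for this example. */
  interface Observer<T> {
    void onNext(T value);

    void onCompleted();
  }

  /** In-process service method: invokes the observer directly, with no other threads. */
  static void commit(String manifest, Observer<String> observer) {
    observer.onNext("token:" + manifest);
    observer.onCompleted();
  }

  static String commitAndGetToken(String manifest) {
    // A one-element array lets the anonymous class write to a local variable.
    final String[] stagingTokenHolder = new String[1];
    commit(manifest, new Observer<String>() {
      @Override
      public void onNext(String token) {
        stagingTokenHolder[0] = token;
      }

      @Override
      public void onCompleted() {}
    });
    // Safe only because commit() has already run both callbacks by the time it returns.
    return stagingTokenHolder[0];
  }

  public static void main(String[] args) {
    System.out.println(commitAndGetToken("manifest-1"));
  }
}
```

If the same service were ever reached over a real channel, the callbacks could arrive later on another thread, and the method would need to block on a latch or future before reading the holder.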
LGTM
@robertwb Based on the discussion in the chat thread, should we remove BeamFileSystemArtifactSource, since ArtifactRetrievalService is meant to do the same job?
This implements job invocation wrappers for the Flink runner, as well as a standalone driver for a JobService daemon. Together these complete the job submission story for the portability framework on the Flink runner.
0a16b42 to c5456c4
It's just hitting org.apache.beam.runners.direct.portable.ReferenceRunnerTest.pipelineExecution, which is unrelated and flaky at HEAD too.
In that case we can go ahead with the merge.
This is a rebase of the submission service in the hacking branch, updated to reflect the new state of artifact staging.
Follow this checklist to help us incorporate your contribution quickly and easily:
Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.