Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Pulsar Functions async function concurrency limit handling is not optimal when return type is CompletableFuture<Void> #23706

Open
2 of 3 tasks
lhotari opened this issue Dec 10, 2024 · 0 comments
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@lhotari
Copy link
Member

lhotari commented Dec 10, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

all released versions

Minimal reproduce step / Problem description

When analyzing the Pulsar Functions async function handling code, there are performance concerns around this code

private void processAsyncResults(JavaInstanceRunnable.AsyncResultConsumer resultConsumer) throws Exception {
AsyncFuncRequest asyncResult = pendingAsyncRequests.peek();
while (asyncResult != null && asyncResult.getProcessResult().isDone()) {
pendingAsyncRequests.remove(asyncResult);
JavaExecutionResult execResult = new JavaExecutionResult();
try {
Object result = asyncResult.getProcessResult().get();
execResult.setResult(result);
} catch (ExecutionException e) {
if (e.getCause() instanceof Exception) {
execResult.setUserException((Exception) e.getCause());
} else {
execResult.setUserException(new Exception(e.getCause()));
}
}
resultConsumer.accept(asyncResult.getRecord(), execResult);
// peek the next result
asyncResult = pendingAsyncRequests.peek();
}
}

The problem this introduces is that in the case of an asynchronous function that has a CompletableFuture<Void> return type and doesn't return results, there will be unnecessary processing delays when the concurrency limit is reached. Whenever there's a slow response in the head of the queue, the request processing will pause. This will introduce unnecessary latency for functions which use Context.newOutputMessage(...).sendAsync() for sending messages to multiple topics.

What did you expect to see?

The Pulsar Functions async function handling code should be optimal from performance perspective when using async functions with CompletableFuture<Void> return type.

What did you see instead?

Based on the code, it could lead into unnecessary CPU compared since when the result is CompletableFuture<Void>, no messages will be sent to an output topic and there's no need for preserving the ordering of processing the async results.

Anything else?

Instead, a simple java.util.concurrent.Semaphore based implementation should be used as a solution for limiting concurrency for implementing maxPendingAsyncRequests when the return type is CompletableFuture<Void>.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@lhotari lhotari added the type/bug The PR fixed a bug or issue reported a bug label Dec 10, 2024
@lhotari lhotari changed the title [Bug] Pulsar Functions async function handling is inefficient and could potentially get stuck [Bug] Pulsar Functions async function concurrency limit handling is inefficient and could potentially get stuck Dec 10, 2024
@lhotari lhotari changed the title [Bug] Pulsar Functions async function concurrency limit handling is inefficient and could potentially get stuck [Bug] Pulsar Functions async function concurrency limit handling is inefficient and could potentially cause processing to get stuck Dec 10, 2024
@lhotari lhotari changed the title [Bug] Pulsar Functions async function concurrency limit handling is inefficient and could potentially cause processing to get stuck [Bug] Pulsar Functions async function concurrency limit handling is not optimal when return type is CompletableFuture<Void> Dec 10, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

No branches or pull requests

1 participant