Skip to content

Commit

Permalink
asynchronously obtain the processing time of a pulsar function
Browse files Browse the repository at this point in the history
  • Loading branch information
walkinggo committed Dec 26, 2024
1 parent 14129e3 commit 1ecf62e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;

/**
* This is the Java Instance. This is started by the runtimeSpawner using the JavaInstanceClient
Expand Down Expand Up @@ -106,12 +107,18 @@ private boolean resolveAsyncPreserveInputOrderForOutputMessages(InstanceConfig i
public JavaExecutionResult handleMessage(Record<?> record, Object input) {
return handleMessage(record, input, (rec, result) -> {
}, cause -> {
});
},null);
}

// register end time
public void ProcessEndTime(ComponentStatsManager statsManager){
if(statsManager != null)
statsManager.processTimeEnd();
}

public JavaExecutionResult handleMessage(Record<?> record, Object input,
JavaInstanceRunnable.AsyncResultConsumer asyncResultConsumer,
Consumer<Throwable> asyncFailureHandler) {
Consumer<Throwable> asyncFailureHandler, ComponentStatsManager stats) {
if (context != null) {
context.setCurrentMessageContext(record);
}
Expand All @@ -128,6 +135,7 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
}
} catch (Exception ex) {
executionResult.setUserException(ex);
ProcessEndTime(stats);
return executionResult;
}

Expand Down Expand Up @@ -164,17 +172,20 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
asyncFailureHandler.accept(innerException);
}
}, executor);
ProcessEndTime(stats);
return null;
} catch (InterruptedException ie) {
log.warn("Exception while put Async requests", ie);
executionResult.setUserException(ie);
ProcessEndTime(stats);
return executionResult;
}
} else {
if (log.isDebugEnabled()) {
log.debug("Got result: object: {}", output);
}
executionResult.setResult(output);
ProcessEndTime(stats);
return executionResult;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,10 @@ public void run() {
currentRecord,
currentRecord.getValue(),
asyncResultConsumer,
asyncErrorHandler);
asyncErrorHandler,
stats);
Thread.currentThread().setContextClassLoader(instanceClassLoader);

// register end time
stats.processTimeEnd();

if (result != null) {
// process the synchronous results
handleResult(currentRecord, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void testAsyncFunction() throws Exception {
CompletableFuture<JavaExecutionResult> resultHolder = new CompletableFuture<>();
JavaExecutionResult result = instance.handleMessage(
mock(Record.class), testString,
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {});
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {},null);
assertNull(result);
assertNotNull(resultHolder.get());
assertEquals(testString + "-lambda", resultHolder.get().getResult());
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testNullReturningAsyncFunction() throws Exception {
String testString = "ABC123";
CompletableFuture<JavaExecutionResult> resultHolder = new CompletableFuture<>();
JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString,
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {});
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {},null);
assertNull(result);
assertNotNull(resultHolder.get());
instance.close();
Expand Down Expand Up @@ -181,7 +181,7 @@ public void testUserExceptionThrowingAsyncFunction() throws Exception {
String testString = "ABC123";
CompletableFuture<JavaExecutionResult> resultHolder = new CompletableFuture<>();
JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString,
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {});
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {},null);
assertNull(result);
assertSame(userException, resultHolder.get().getUserException());
instance.close();
Expand Down Expand Up @@ -292,11 +292,11 @@ public void testAsyncFunctionMaxPendingVoidResult() throws Exception {
};
Consumer<Throwable> asyncFailureHandler = cause -> {
};
assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler));
assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler,null));
assertEquals(pendingQueueSize - 1, instance.getAsyncRequestsConcurrencyLimiter().availablePermits());
assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler));
assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler,null));
assertEquals(pendingQueueSize - 2, instance.getAsyncRequestsConcurrencyLimiter().availablePermits());
assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler));
assertNull(instance.handleMessage(mock(Record.class), testString, asyncResultConsumer, asyncFailureHandler,null));
// no space left
assertEquals(0, instance.getAsyncRequestsConcurrencyLimiter().availablePermits());

Expand Down

0 comments on commit 1ecf62e

Please sign in to comment.