Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][fn] Record Pulsar Function processing time properly for asynchronous functions #23811

Open
wants to merge 41 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
ef36d8d
delete processTimeStart method in ComponentStatsManager
walkinggo Jan 6, 2025
6f9bffa
change processTimeEnd method in ComponentStatsManager
walkinggo Jan 6, 2025
b4435dd
add startTime in JavaExecutionResult and process endTime in handleResult
walkinggo Jan 6, 2025
6c4c5a1
add field JavaExecutionResult in AsyncFuncRequest and use JavaExecuti…
walkinggo Jan 6, 2025
a776932
use the same executionResult in non asyncPreserveInputOrderForOutputM…
walkinggo Jan 6, 2025
1749db7
fix the bug that do not set result
walkinggo Jan 7, 2025
493c364
fix the bug that do not set exception
walkinggo Jan 7, 2025
0344726
add test case
walkinggo Jan 7, 2025
21816d8
remove LongSupplier timeSupplier
walkinggo Jan 8, 2025
1d34566
remove useless import
walkinggo Jan 8, 2025
ae9bbc6
remove test case
walkinggo Jan 8, 2025
7aba620
pass the start time as parameter and remove result from AsyncFuncRequest
walkinggo Jan 9, 2025
d2db727
Revert "pass the start time as parameter and remove result from Async…
walkinggo Jan 10, 2025
81f2d4b
add bugsExclude to remove EI_EXPOSE_REP
walkinggo Jan 10, 2025
6ae6819
add testFunctionAsyncTime in JavaInstanceRunnableTest
walkinggo Jan 12, 2025
86b4d41
add test in PulsarFunctionE2ETest
walkinggo Jan 13, 2025
c0994d0
Modify the test case to capture the running time of `processTimeEnd`.
walkinggo Jan 13, 2025
3895bd1
Update pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctio…
walkinggo Jan 22, 2025
94454bc
Update pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctio…
walkinggo Jan 22, 2025
659d57c
Update pulsar-functions/instance/src/test/java/org/apache/pulsar/func…
walkinggo Jan 22, 2025
312245d
unformat code in JavaInstanceRunnable
walkinggo Jan 22, 2025
0c2f1f9
unformat code in JavaInstanceRunnable
walkinggo Jan 22, 2025
b958524
unformat code in PulsarFunctionE2ETest
walkinggo Jan 22, 2025
162bf20
unformat code in PulsarFunctionE2ETest
walkinggo Jan 22, 2025
6625e88
unformat code in PulsarFunctionE2ETest
walkinggo Jan 22, 2025
8d0ae62
unformat code in JavaInstance
walkinggo Jan 22, 2025
ed315b4
unformat code in ComponentStatsManager
walkinggo Jan 22, 2025
9910dd2
unformat code in SinkStatsManager
walkinggo Jan 22, 2025
56881a4
unformat code in SourceStatsManager and JavaInstanceRunnableTest
walkinggo Jan 22, 2025
cb34cf9
unformat code in JavaInstanceRunnableTest
walkinggo Jan 22, 2025
ae3ab27
unformat code in JavaInstanceTest
walkinggo Jan 22, 2025
44c8b11
unformat code in JavaInstanceTest
walkinggo Jan 22, 2025
99af7cb
unformat code in JavaInstanceRunnableTest
walkinggo Jan 22, 2025
5d74461
set stats as setMethod in JavaInstanceRunnable
walkinggo Jan 22, 2025
17ca085
use LF
walkinggo Jan 22, 2025
5c9a150
use LF
walkinggo Jan 22, 2025
be533fb
use LF
walkinggo Jan 22, 2025
2d1571b
unformat code in JavaInstanceRunnableTest
walkinggo Jan 22, 2025
4c82591
remove SpendTimeFunction class in PulsarFunctionE2ETest
walkinggo Jan 22, 2025
d1dd15b
remove unused import in JavaInstanceRunnableTest
walkinggo Jan 23, 2025
e3e0d12
fix test case
walkinggo Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
walkinggo marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.compaction.PublishingOrderCompactor;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.examples.JavaNativeAsyncExclamationFunction;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
Expand Down Expand Up @@ -296,6 +297,100 @@ public void testReadCompactedFunction() throws Exception {
producer.close();
}

@Test(timeOut = 20000)
public void testPulsarFunctionAsyncStatTime() throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String functionName = "JavaNativeAsyncExclamationFunction";
final String subscriptionName = "test-sub";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespacePortion);
functionConfig.setName(functionName);
functionConfig.setParallelism(1);
functionConfig.setSubName(subscriptionName);
functionConfig.setInputSpecs(Collections.singletonMap(sourceTopic,
ConsumerConfig.builder().poolMessages(true).build()));
functionConfig.setAutoAck(true);
functionConfig.setClassName(JavaNativeAsyncExclamationFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput(sinkTopic);
functionConfig.setCleanupSubscription(true);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);

admin.functions().createFunctionWithUrl(functionConfig,
PulsarFunctionE2ETest.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString());

// create a producer that creates a topic at broker
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName(subscriptionName).subscribe();

retryStrategically((test) -> {
try {
return admin.topics().getStats(sourceTopic).getSubscriptions().size() == 1;
} catch (PulsarAdminException e) {
return false;
}
}, 50, 150);
retryStrategically((test) -> {
try {
return admin.topics().getStats(sinkTopic).getSubscriptions().size() == 1;
} catch (PulsarAdminException e) {
return false;
}
}, 50, 150);
// validate pulsar sink consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1);
assertEquals(admin.topics().getStats(sinkTopic).getSubscriptions().size(), 1);

int cntMsg = 5;
for (int i = 0; i < cntMsg; i++) {
producer.newMessage().value("it is the " + i + "th message , it will spend 500ms").send();
}
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
assertEquals(subStats.getUnackedMessages(), 0);
});
int count = 0;
while (true) {
Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
}
consumer.acknowledge(message);
count++;
}
Assert.assertEquals(count, cntMsg);

String prometheusMetrics = TestPulsarFunctionUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheus metrics: {}", prometheusMetrics);
Map<String, TestPulsarFunctionUtils.Metric> statsMetrics =
TestPulsarFunctionUtils.parseMetrics(prometheusMetrics);

assertEquals(statsMetrics.get("pulsar_function_process_latency_ms").value, 500.0, 100.0);
admin.functions().deleteFunction(tenant, namespacePortion, functionName);

retryStrategically((test) -> {
try {
return admin.topics().getStats(sourceTopic).getSubscriptions().size() == 0;
} catch (PulsarAdminException e) {
return false;
}
}, 50, 150);

// make sure subscriptions are cleanup
assertEquals(admin.topics().getStats(sourceTopic).getSubscriptions().size(), 0);

tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
}

@Test(timeOut = 20000)
public void testPulsarFunctionStats() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public class JavaExecutionResult {
private Throwable userException;
private Object result;
private final long startTime = System.nanoTime();

public void reset() {
setUserException(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class JavaInstance implements AutoCloseable {
public static class AsyncFuncRequest {
private final Record record;
private final CompletableFuture processResult;
private final JavaExecutionResult result;
}

@Getter(AccessLevel.PACKAGE)
Expand Down Expand Up @@ -136,7 +137,7 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
if (asyncPreserveInputOrderForOutputMessages) {
// Function is in format: Function<I, CompletableFuture<O>>
AsyncFuncRequest request = new AsyncFuncRequest(
record, (CompletableFuture) output
record, (CompletableFuture) output, executionResult
);
pendingAsyncRequests.put(request);
} else {
Expand All @@ -148,13 +149,12 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
processAsyncResultsInInputOrder(asyncResultConsumer);
} else {
try {
JavaExecutionResult execResult = new JavaExecutionResult();
if (cause != null) {
execResult.setUserException(FutureUtil.unwrapCompletionException(cause));
executionResult.setUserException(FutureUtil.unwrapCompletionException(cause));
} else {
execResult.setResult(res);
executionResult.setResult(res);
}
asyncResultConsumer.accept(record, execResult);
asyncResultConsumer.accept(record, executionResult);
} finally {
asyncRequestsConcurrencyLimiter.release();
}
Expand Down Expand Up @@ -187,7 +187,7 @@ private void processAsyncResultsInInputOrder(JavaInstanceRunnable.AsyncResultCon
while (asyncResult != null && asyncResult.getProcessResult().isDone()) {
pendingAsyncRequests.remove(asyncResult);

JavaExecutionResult execResult = new JavaExecutionResult();
JavaExecutionResult execResult = asyncResult.getResult();
try {
Object result = asyncResult.getProcessResult().get();
execResult.setResult(result);
Expand Down
walkinggo marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,6 @@ public void run() {
// set last invocation time
stats.setLastInvocation(System.currentTimeMillis());

// start time for process latency stat
stats.processTimeStart();

// process the message
Thread.currentThread().setContextClassLoader(functionClassLoader);
Expand All @@ -346,9 +344,6 @@ public void run() {
asyncErrorHandler);
Thread.currentThread().setContextClassLoader(instanceClassLoader);

// register end time
stats.processTimeEnd();

if (result != null) {
// process the synchronous results
handleResult(currentRecord, result);
Expand Down Expand Up @@ -448,6 +443,8 @@ void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception
// increment total successfully processed
stats.incrTotalProcessedSuccessfully();
}
// handle endTime here
stats.processTimeEnd(result.getStartTime());
}

private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
Expand Down Expand Up @@ -631,6 +628,11 @@ public String getStatsAsString() throws IOException {
return "";
}

@VisibleForTesting
void setStats(ComponentStatsManager stats) {
this.stats = stats;
}

public InstanceCommunication.MetricsData getAndResetMetrics() {
if (isInitialized) {
statsLock.writeLock().lock();
Expand Down
walkinggo marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,8 @@ public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry,

public abstract void setLastInvocation(long ts);

public abstract void processTimeStart();

public abstract void processTimeEnd();
public abstract void processTimeEnd(long startTime);

public abstract double getTotalProcessedSuccessfully();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,20 +336,13 @@ public void setLastInvocation(long ts) {
statlastInvocationChild.set(ts);
}

private Long processTimeStart;

@Override
public void processTimeStart() {
processTimeStart = System.nanoTime();
}

@Override
public void processTimeEnd() {
if (processTimeStart != null) {
double endTimeMs = ((double) System.nanoTime() - processTimeStart) / 1.0E6D;
public void processTimeEnd(long startTime) {
double endTimeMs = ((double) System.nanoTime() - startTime) / 1.0E6D;
statProcessLatencyChild.observe(endTimeMs);
statProcessLatency1minChild.observe(endTimeMs);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,9 @@ public void setLastInvocation(long ts) {
statlastInvocationChild.set(ts);
}

@Override
public void processTimeStart() {
//no-op
}

@Override
public void processTimeEnd() {
public void processTimeEnd(long startTime) {
//no-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,9 @@ public void setLastInvocation(long ts) {
statlastInvocationChild.set(ts);
}

@Override
public void processTimeStart() {
//no-op
}

@Override
public void processTimeEnd() {
public void processTimeEnd(long startTime) {
//no-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,4 +557,9 @@
<Method name="setSourceInputSpecs"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
<Match>
<Class name="org.apache.pulsar.functions.instance.JavaInstance$AsyncFuncRequest"/>
<Method name="getResult"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.annotation.JsonIgnore;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
Expand All @@ -36,6 +38,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -50,6 +53,7 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
Expand All @@ -61,6 +65,7 @@
import org.apache.pulsar.io.core.SourceContext;
import org.awaitility.Awaitility;
import org.jetbrains.annotations.NotNull;
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -173,6 +178,24 @@ public Void process(String input, Context context) throws Exception {
}
}

@Test
public void testFunctionAsyncTime() throws Exception {
FunctionDetails functionDetails = FunctionDetails.newBuilder()
.setAutoAck(true)
.setProcessingGuarantees(org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.MANUAL)
.build();
JavaInstanceRunnable javaInstanceRunnable = createRunnable(functionDetails);
FunctionStatsManager manager = mock(FunctionStatsManager.class);
javaInstanceRunnable.setStats(manager);
JavaExecutionResult javaExecutionResult = new JavaExecutionResult();
Thread.sleep(500);
Record record = mock(Record.class);
javaInstanceRunnable.handleResult(record, javaExecutionResult);
ArgumentCaptor<Long> timeCaptor = ArgumentCaptor.forClass(Long.class);
verify(manager).processTimeEnd(timeCaptor.capture());
Assert.assertEquals(timeCaptor.getValue(), javaExecutionResult.getStartTime());
}

@Test
public void testFunctionResultNull() throws Exception {
JavaExecutionResult javaExecutionResult = new JavaExecutionResult();
Expand Down
Loading