Skip to content

Commit

Permalink
add test case
Browse files Browse the repository at this point in the history
  • Loading branch information
walkinggo committed Jan 7, 2025
1 parent 493c364 commit 0344726
Showing 1 changed file with 54 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Function;
Expand All @@ -43,6 +44,7 @@ public class JavaInstanceTest {

/**
* Verify that be able to run lambda functions.
*
* @throws Exception
*/
@Test
Expand All @@ -59,32 +61,32 @@ public void testLambda() throws Exception {
}

@Test
public void testNullReturningFunction() throws Exception {
JavaInstance instance = new JavaInstance(
public void testNullReturningFunction() throws Exception {
JavaInstance instance = new JavaInstance(
mock(ContextImpl.class),
(Function<String, String>) (input, context) -> null,
new InstanceConfig());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString);
assertNull(result.getResult());
instance.close();
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString);
assertNull(result.getResult());
instance.close();
}

@Test
public void testUserExceptionThrowingFunction() throws Exception {
final UserException userException = new UserException("Boom");
Function<String, String> func = (input, context) -> {
throw userException;
};
public void testUserExceptionThrowingFunction() throws Exception {
final UserException userException = new UserException("Boom");
Function<String, String> func = (input, context) -> {
throw userException;
};

JavaInstance instance = new JavaInstance(
JavaInstance instance = new JavaInstance(
mock(ContextImpl.class),
func,
new InstanceConfig());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString);
assertSame(userException, result.getUserException());
instance.close();
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString);
assertSame(userException, result.getUserException());
instance.close();
}

@Test
Expand All @@ -95,7 +97,7 @@ public void testAsyncFunction() throws Exception {

Function<String, CompletableFuture<String>> function = (input, context) -> {
log.info("input string: {}", input);
CompletableFuture<String> result = new CompletableFuture<>();
CompletableFuture<String> result = new CompletableFuture<>();
executor.submit(() -> {
try {
Thread.sleep(500);
Expand All @@ -115,8 +117,9 @@ public void testAsyncFunction() throws Exception {
String testString = "ABC123";
CompletableFuture<JavaExecutionResult> resultHolder = new CompletableFuture<>();
JavaExecutionResult result = instance.handleMessage(
mock(Record.class), testString,
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {});
mock(Record.class), testString,
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {
});
assertNull(result);
assertNotNull(resultHolder.get());
assertEquals(testString + "-lambda", resultHolder.get().getResult());
Expand All @@ -131,7 +134,7 @@ public void testNullReturningAsyncFunction() throws Exception {

Function<String, CompletableFuture<String>> function = (input, context) -> {
log.info("input string: {}", input);
CompletableFuture<String> result = new CompletableFuture<>();
CompletableFuture<String> result = new CompletableFuture<>();
executor.submit(() -> {
try {
Thread.sleep(500);
Expand All @@ -151,24 +154,25 @@ public void testNullReturningAsyncFunction() throws Exception {
String testString = "ABC123";
CompletableFuture<JavaExecutionResult> resultHolder = new CompletableFuture<>();
JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString,
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {});
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {
});
assertNull(result);
assertNotNull(resultHolder.get());
instance.close();
}

@Test
public void testUserExceptionThrowingAsyncFunction() throws Exception {
final UserException userException = new UserException("Boom");
final UserException userException = new UserException("Boom");
InstanceConfig instanceConfig = new InstanceConfig();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();

Function<String, CompletableFuture<String>> function = (input, context) -> {
log.info("input string: {}", input);
CompletableFuture<String> result = new CompletableFuture<>();
CompletableFuture<String> result = new CompletableFuture<>();
executor.submit(() -> {
result.completeExceptionally(userException);
result.completeExceptionally(userException);
});

return result;
Expand All @@ -181,7 +185,8 @@ public void testUserExceptionThrowingAsyncFunction() throws Exception {
String testString = "ABC123";
CompletableFuture<JavaExecutionResult> resultHolder = new CompletableFuture<>();
JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString,
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {});
(record, javaResult) -> resultHolder.complete(javaResult), cause -> {
});
assertNull(result);
assertSame(userException, resultHolder.get().getUserException());
instance.close();
Expand All @@ -198,7 +203,7 @@ public void testAsyncFunctionMaxPending() throws Exception {

Function<String, CompletableFuture<String>> function = (input, context) -> {
log.info("input string: {}", input);
CompletableFuture<String> result = new CompletableFuture<>();
CompletableFuture<String> result = new CompletableFuture<>();
executor.submit(() -> {
try {
count.await();
Expand Down Expand Up @@ -243,10 +248,10 @@ public void testAsyncFunctionMaxPending() throws Exception {
instance.close();
}

private static class UserException extends Exception {
public UserException(String msg) {
super(msg);
}
private static class UserException extends Exception {
public UserException(String msg) {
super(msg);
}
}

@Test
Expand All @@ -264,7 +269,7 @@ public void testAsyncFunctionMaxPendingVoidResult() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();

Function<String, CompletableFuture<Void>> function = (input, context) -> {
CompletableFuture<Void> result = new CompletableFuture<>();
CompletableFuture<Void> result = new CompletableFuture<>();
executor.submit(() -> {
try {
count.await();
Expand Down Expand Up @@ -309,4 +314,22 @@ public void testAsyncFunctionMaxPendingVoidResult() throws Exception {
log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime);
instance.close();
}

@Test
public void testAsyncFunctionTime() {
JavaInstance instance = new JavaInstance(
mock(ContextImpl.class),
(Function<String, String>) (input, context) -> {
Thread.sleep(500);
return input;
},
new InstanceConfig());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString);
LongSupplier timeSupplier = () -> System.nanoTime() - 500_000_000L;
assertNotNull(result.getResult());
long beforeTime = timeSupplier.getAsLong();
assertTrue(Math.abs(beforeTime - result.getStartTime()) <= 20_000_000);
instance.close();
}
}

0 comments on commit 0344726

Please sign in to comment.