Skip to content

Commit

Permalink
Add async function test (#986)
Browse files Browse the repository at this point in the history
  • Loading branch information
henneberger authored Nov 9, 2024
1 parent a6ec6a5 commit 9748d08
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ LogicalProject(val=[$0], myFnc=[MyScalarFunction(CAST($0):BIGINT, CAST($0):BIGIN
>>>flink.json
{
"flinkSql" : [
"CREATE TEMPORARY FUNCTION IF NOT EXISTS `MyAsyncScalarFunction` AS 'com.myudf.MyAsyncScalarFunction' LANGUAGE JAVA;",
"CREATE TEMPORARY FUNCTION IF NOT EXISTS `MyScalarFunction` AS 'com.myudf.MyScalarFunction' LANGUAGE JAVA;",
"CREATE TEMPORARY TABLE `mytable_1` (\n `val` INTEGER NOT NULL,\n `myFnc` BIGINT,\n PRIMARY KEY (`val`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'mytable_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);",
"CREATE VIEW `table$1`\nAS\nSELECT `val`, `default_catalog`.`default_database`.`MyScalarFunction`(CAST(`val` AS BIGINT), CAST(`val` AS BIGINT)) AS `myFnc`\nFROM (VALUES (1),\n (2),\n (3),\n (4),\n (5),\n (6),\n (7),\n (8),\n (9),\n (10)) AS `t` (`val`);",
Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.myudf;

import com.google.auto.service.AutoService;
import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.FunctionContext;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.table.functions.ScalarFunction;

@AutoService(AsyncScalarFunction.class)
public class MyAsyncScalarFunction extends AsyncScalarFunction {

private transient ExecutorService executor;

@Override
public void open(FunctionContext context) throws Exception {
// Configure the thread pool to handle asynchronous calls
this.executor = Executors.newFixedThreadPool(10);
}

@Override
public void close() throws Exception {
// Properly shut down the executor service
if (executor != null && !executor.isShutdown()) {
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
}
}

public void eval(CompletableFuture<String> result, String param1, int param2) {
executor.submit(() -> {
try {
// Simulate a delay to mimic an I/O-bound operation
Thread.sleep(1000);
String response = "Processed " + param1 + " with " + param2;
result.complete(response); // Complete the future with the response
} catch (Exception e) {
result.completeExceptionally(e); // Complete exceptionally if an error occurs
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
IMPORT myjavafunction.build.libs.MyScalarFunction;
IMPORT myjavafunction.build.libs.MyAsyncScalarFunction;

MyTable := SELECT val, MyScalarFunction(val, val) AS myFnc
FROM (VALUES ((1)), ((2)), ((3)), ((4)), ((5)),
((6)), ((7)), ((8)), ((9)), ((10))) AS t(val);
((6)), ((7)), ((8)), ((9)), ((10))) AS t(val);

/*+test*/
MyAsyncTable := SELECT val, MyAsyncScalarFunction(val, ival) AS myFnc
FROM (VALUES (('1'), (1)), (('2'), (2)), (('3'), (3))) AS t(val, ival);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"data":{"MyAsyncTable":[{"val":"1","myFnc":"Processed 1 with 1"},{"val":"2","myFnc":"Processed 2 with 2"},{"val":"3","myFnc":"Processed 3 with 3"}]}}

0 comments on commit 9748d08

Please sign in to comment.