[Optimize][E2E] Optimize e2e code structure (#4130)
Co-authored-by: zackyoungh <[email protected]>
zackyoungh authored Jan 9, 2025
1 parent 06744f1 commit 4b60ab2
Showing 15 changed files with 521 additions and 27 deletions.
17 changes: 16 additions & 1 deletion .github/workflows/backend.yaml
@@ -205,13 +205,28 @@ jobs:
localhost:5000/dinky/flink:flink
- name: Init Env Jar
run: |
mkdir -p e2e_test/docker-compose-env/dinky/jars
wget -O e2e_test/docker-compose-env/dinky/mysql-connector-java-8.0.30.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar &&
wget -O e2e_test/docker-compose-env/flink/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar &&
wget -O e2e_test/docker-compose-env/dinky/javax.ws.rs-api-2.1.1.jar https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.jar
wget -O e2e_test/docker-compose-env/dinky/jars/flink-doris-connector.jar https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-${{ matrix.flink }}/24.0.1/flink-doris-connector-${{ matrix.flink }}-24.0.1.jar
wget -O e2e_test/docker-compose-env/dinky/jars/flink-sql-connector-mysql-cdc.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.0/flink-sql-connector-mysql-cdc-3.2.0.jar
cp e2e_test/docker-compose-env/dinky/mysql-connector-java-8.0.30.jar e2e_test/docker-compose-env/dinky/jars/mysql-connector-java.jar
- name: Init Run Docker MySQL
uses: hoverkraft-tech/[email protected]
with:
compose-file: ./e2e_test/docker-compose-env/mysql/docker-compose.yml
# - name: Init System Env And Clear Docker Build Cache
# run: |
# echo y | docker builder prune
# df -h
# ulimit -a
# sudo swapoff -a
# sudo sysctl -w vm.max_map_count=2000000
# - name: Init Run Docker Doris
# uses: hoverkraft-tech/[email protected]
# with:
# compose-file: ./e2e_test/docker-compose-env/doris/docker-compose.yml
- name: Init Run Docker Dinky
uses: hoverkraft-tech/[email protected]
with:
@@ -228,7 +243,7 @@ jobs:
- name: Init k3s
uses: nolar/setup-k3d-k3s@v1
with:
version: v1.25.16+k3s4
version: latest
k3d-args: -s 1 --network dinky_net --api-port 172.28.0.1:6550
- name: Get k3s kube config
run: k3d kubeconfig get --all && mkdir ./kube && k3d kubeconfig get --all > ./kube/k3s.yaml && sed -i 's/0.0.0.0/172.28.0.1/g' ./kube/k3s.yaml
@@ -134,6 +134,7 @@

import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.tree.Tree;
import cn.hutool.core.lang.tree.TreeNode;
@@ -587,7 +588,8 @@ public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) thro
} catch (Throwable e) {
throw new BusException(
"UDF compilation failed and cannot be published. The error message is as follows:"
+ e.getMessage());
+ ExceptionUtil.stacktraceToOneLineString(e),
e);
}
}
} else {
@@ -49,6 +49,14 @@ public class BusException extends RuntimeException {
/** Exception parameters */
private Object[] errorArgs;

/**
 * Constructs a BusException with the specified message and cause.
 *
 * @param message the detail message
 * @param cause the underlying cause of this exception
 */
public BusException(String message, Throwable cause) {
    super(message, cause);
}

/**
* Constructs a BusException with the specified message.
*
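A minimal usage sketch of the new (message, cause) constructor (illustrative only, not part of this commit; the import path and the simulated failure are assumptions), showing that the original stack trace is preserved rather than flattened into the message:

    import org.dinky.data.exception.BusException;  // assumed package path

    public class BusExceptionUsageSketch {
        public static void main(String[] args) {
            try {
                // Simulate a failure such as a UDF compilation error.
                throw new IllegalStateException("simulated UDF compile failure");
            } catch (Exception e) {
                // Pass the cause through so logs still show where the failure originated,
                // not just the flattened message text.
                throw new BusException("operation failed: " + e.getMessage(), e);
            }
        }
    }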
7 changes: 5 additions & 2 deletions dinky-core/src/main/java/org/dinky/executor/Executor.java
@@ -217,9 +217,12 @@ public JobStatementPlan parseStatementIntoJobStatementPlan(String[] statements)
jobStatementPlan.addJobStatement(statement, JobStatementType.SET, operationType);
} else if (operationType.equals(SqlType.EXECUTE_JAR)) {
JarSubmitParam jarSubmitParam = JarSubmitParam.build(statement);
jarSubmitParam.setUri("base64@" + Base64.encode(pretreatStatement(jarSubmitParam.getArgs())));
String args = jarSubmitParam.getArgs();
jarSubmitParam.setArgs("base64@"
+ Base64.encode(
isUseSqlFragment() ? getVariableManager().parseVariable(args) : args));
jobStatementPlan.addJobStatement(
jarSubmitParam.toString(), JobStatementType.EXECUTE_JAR, operationType);
jarSubmitParam.getStatement(), JobStatementType.EXECUTE_JAR, operationType);
} else if (operationType.equals(SqlType.EXECUTE)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.PIPELINE, operationType);
} else if (operationType.equals(SqlType.PRINT)) {
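A small, self-contained sketch of the args round-trip that the new EXECUTE JAR handling relies on (assuming hutool's cn.hutool.core.codec.Base64, which the surrounding code appears to use; class and variable names here are illustrative, not from this commit):

    import cn.hutool.core.codec.Base64;

    public class ExecuteJarArgsSketch {
        public static void main(String[] args) {
            // The planner stores EXECUTE JAR args as "base64@<payload>" so whitespace
            // and quotes survive later statement parsing.
            String rawArgs = "mysql-sync-database --database dinky --mysql-conf hostname=mysql";
            String encoded = "base64@" + Base64.encode(rawArgs);
            System.out.println(encoded);

            // On the submit side, strip the prefix and decode to recover the original string.
            String payload = encoded.substring("base64@".length());
            System.out.println(Base64.decodeStr(payload));
        }
    }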
1 change: 1 addition & 0 deletions e2e_test/docker-compose-env/dinky/docker-compose.yml
Expand Up @@ -19,5 +19,6 @@ services:
- ../flink/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/dinky/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
- ../flink/conf/log4j-console.properties:/opt/flink/conf/log4j-console.properties
- ../hadoop:/opt/flink/conf
- ./jars:/dinky
networks:
- dinky_net
11 changes: 11 additions & 0 deletions e2e_test/docker-compose-env/doris/docker-compose.yml
@@ -0,0 +1,11 @@
version: "3"
networks:
dinky_net:
external: true
services:
doris:
image: yagagagaga/doris-standalone:2.1.7
hostname: doris
container_name: doris
networks:
- dinky_net
9 changes: 9 additions & 0 deletions e2e_test/tools/dinky_task/flink_jar_sql/mysql2doris.sql
@@ -0,0 +1,9 @@
set 'execution.checkpointing.interval'='5 s';
ADD CUSTOMJAR 'rs:/flink-sql-connector-mysql-cdc.jar';
ADD CUSTOMJAR 'rs:/mysql-connector-java.jar';
EXECUTE JAR WITH (
'uri'='rs:/flink-doris-connector.jar',
'main-class'='org.apache.doris.flink.tools.cdc.CdcTools',
'args'='base64@bXlzcWwtc3luYy1kYXRhYmFzZSAgICAgLS1kYXRhYmFzZSBkaW5reSAgICAgLS1teXNxbC1jb25mIGhvc3RuYW1lPW15c3FsICAgICAtLW15c3FsLWNvbmYgcG9ydD0zMzA2ICAgICAtLW15c3FsLWNvbmYgdXNlcm5hbWU9cm9vdCAgICAgLS1teXNxbC1jb25mIHBhc3N3b3JkPWRpbmt5ICAgICAtLW15c3FsLWNvbmYgZGF0YWJhc2UtbmFtZT1kaW5reSAgICAgLS1teXNxbC1jb25mIHNlcnZlci10aW1lLXpvbmU9QXNpYS9TaGFuZ2hhaSAgICAgLS1pbmNsdWRpbmctdGFibGVzICJkaW5reV90YXNrIiAgICAgLS1zaW5rLWNvbmYgZmVub2Rlcz1kb3Jpcy1mZTo4MDMwICAgICAtLXNpbmstY29uZiB1c2VybmFtZT1yb290ICAgICAtLXNpbmstY29uZiBqZGJjLXVybD1qZGJjOm15c3FsOi8vZG9yaXMtZmU6OTAzMCAgICAgLS1zaW5rLWNvbmYgc2luay5sYWJlbC1wcmVmaXg9bGFiZWwtMSAgICAgLS10YWJsZS1jb25mIHJlcGxpY2F0aW9uX251bT0xIA==',
'allowNonRestoredState'='false'
);
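For readability, the base64 payload in 'args' above decodes (standard Base64; decoded here as a convenience, with the runs of spaces between flags collapsed) to roughly the following CdcTools command line:

    mysql-sync-database
      --database dinky
      --mysql-conf hostname=mysql
      --mysql-conf port=3306
      --mysql-conf username=root
      --mysql-conf password=dinky
      --mysql-conf database-name=dinky
      --mysql-conf server-time-zone=Asia/Shanghai
      --including-tables "dinky_task"
      --sink-conf fenodes=doris-fe:8030
      --sink-conf username=root
      --sink-conf jdbc-url=jdbc:mysql://doris-fe:9030
      --sink-conf sink.label-prefix=label-1
      --table-conf replication_num=1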
59 changes: 59 additions & 0 deletions e2e_test/tools/dinky_task/flink_sql/flink-sql-datagen-test.sql
@@ -0,0 +1,59 @@
DROP TABLE IF EXISTS source_table3;

CREATE TABLE IF NOT EXISTS source_table3 (
-- order id
`order_id` BIGINT,
-- product
`product` BIGINT,
-- amount
`amount` BIGINT,
-- payment time
`order_time` AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
-- WATERMARK
WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
)
WITH
(
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '10',
'fields.product.min' = '1',
'fields.product.max' = '2'
);

-- SELECT * FROM source_table3 LIMIT 10;
DROP TABLE IF EXISTS sink_table5;

CREATE TABLE IF NOT EXISTS sink_table5 (
-- product
`product` BIGINT,
-- amount
`amount` BIGINT,
-- payment time
`order_time` TIMESTAMP(3),
-- 1-minute aggregated sum
`one_minute_sum` BIGINT
)
WITH
('connector' = 'print');

INSERT INTO
sink_table5
SELECT
product,
amount,
order_time,
SUM(amount) OVER (
PARTITION BY
product
ORDER BY
order_time
-- the window covers the most recent 1 minute of data for each product
RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING
AND CURRENT ROW
) as one_minute_sum
FROM
source_table3;