diff --git a/dinky-admin/src/main/java/org/dinky/controller/APIController.java b/dinky-admin/src/main/java/org/dinky/controller/APIController.java index 6b0ac4c0fc..10708f2ae6 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/APIController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/APIController.java @@ -201,7 +201,7 @@ public Result exportSql(@RequestParam Integer id) { dataType = "Integer", paramType = "query", dataTypeClass = Integer.class) - public Result getTaskLineage(@RequestParam Integer id) { + public Result getTaskLineage(@RequestParam Integer id) throws NotSupportExplainExcepition { taskService.initTenantByTaskId(id); return Result.succeed(taskService.getTaskLineage(id), Status.QUERY_SUCCESS); } diff --git a/dinky-admin/src/main/java/org/dinky/service/TaskService.java b/dinky-admin/src/main/java/org/dinky/service/TaskService.java index c0453567dc..08ddfbb760 100644 --- a/dinky-admin/src/main/java/org/dinky/service/TaskService.java +++ b/dinky-admin/src/main/java/org/dinky/service/TaskService.java @@ -280,7 +280,7 @@ public interface TaskService extends ISuperService { * @param id The id of the task to get. * @return A {@link LineageResult} object representing the found task lineage. */ - LineageResult getTaskLineage(Integer id); + LineageResult getTaskLineage(Integer id) throws NotSupportExplainExcepition; /** * Build the job submit config with the given task diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java index 75454bb37e..bda815ac2e 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java @@ -41,7 +41,6 @@ import org.dinky.data.model.mapping.ClusterInstanceMapping; import org.dinky.data.result.ProTableResult; import org.dinky.data.vo.task.JobInstanceVo; -import org.dinky.executor.ExecutorConfig; import org.dinky.explainer.lineage.LineageBuilder; import org.dinky.explainer.lineage.LineageResult; import org.dinky.job.FlinkJobTask; @@ -289,7 +288,7 @@ public void refreshJobByTaskIds(Integer... taskIds) { @Override public LineageResult getLineage(Integer id) { History history = getJobInfoDetail(id).getHistory(); - return LineageBuilder.getColumnLineageByLogicalPlan(history.getStatement(), ExecutorConfig.DEFAULT); + return LineageBuilder.getColumnLineageByLogicalPlan(history.getStatement()); } @Override diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index a87d3a1d69..5cb7d11007 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -59,9 +59,7 @@ import org.dinky.data.model.udf.UDFTemplate; import org.dinky.data.result.Result; import org.dinky.data.result.SqlExplainResult; -import org.dinky.explainer.lineage.LineageBuilder; import org.dinky.explainer.lineage.LineageResult; -import org.dinky.explainer.sqllineage.SQLLineageBuilder; import org.dinky.function.FunctionFactory; import org.dinky.function.compiler.CustomStringJavaCompiler; import org.dinky.function.data.model.UDF; @@ -980,27 +978,10 @@ public Result> queryAllCatalogue() { } @Override - public LineageResult getTaskLineage(Integer id) { + public LineageResult getTaskLineage(Integer id) throws NotSupportExplainExcepition { TaskDTO task = getTaskInfoById(id); - if (!Dialect.isCommonSql(task.getDialect())) { - if (Asserts.isNull(task.getDatabaseId())) { - return null; - } - DataBase dataBase = dataBaseService.getById(task.getDatabaseId()); - if (Asserts.isNull(dataBase)) { - return null; - } - if (task.getDialect().equalsIgnoreCase("doris") || task.getDialect().equalsIgnoreCase("starrocks")) { - return SQLLineageBuilder.getSqlLineage(task.getStatement(), "mysql", dataBase.getDriverConfig()); - } else { - return SQLLineageBuilder.getSqlLineage( - task.getStatement(), task.getDialect().toLowerCase(), dataBase.getDriverConfig()); - } - } else { - task.setStatement(buildEnvSql(task) + task.getStatement()); - JobConfig jobConfig = task.getJobConfig(); - return LineageBuilder.getColumnLineageByLogicalPlan(task.getStatement(), jobConfig); - } + BaseTask baseTask = BaseTask.getTask(task); + return baseTask.getColumnLineage(); } private List> dealWithCatalogue(List catalogueList) { diff --git a/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java b/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java index bb41ea9db1..948a59e52c 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java @@ -25,6 +25,7 @@ import org.dinky.data.dto.TaskDTO; import org.dinky.data.exception.NotSupportExplainExcepition; import org.dinky.data.result.SqlExplainResult; +import org.dinky.explainer.lineage.LineageResult; import org.dinky.job.JobResult; import java.util.List; @@ -57,6 +58,11 @@ public ObjectNode getJobPlan() throws NotSupportExplainExcepition { StrFormatter.format("task [{}] dialect [{}] is can not getJobPlan", task.getName(), task.getDialect())); } + public LineageResult getColumnLineage() throws NotSupportExplainExcepition { + throw new NotSupportExplainExcepition(StrFormatter.format( + "task [{}] dialect [{}] is can not get column lineage", task.getName(), task.getDialect())); + } + public static BaseTask getTask(TaskDTO taskDTO) { Set> classes = ClassUtil.scanPackageBySuper(BaseTask.class.getPackage().getName(), BaseTask.class); diff --git a/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java b/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java index 760e46dae7..b6ecae4a44 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/CommonSqlTask.java @@ -19,13 +19,18 @@ package org.dinky.service.task; +import org.dinky.assertion.Asserts; import org.dinky.config.Dialect; import org.dinky.data.annotations.SupportDialect; import org.dinky.data.dto.SqlDTO; import org.dinky.data.dto.TaskDTO; +import org.dinky.data.model.DataBase; import org.dinky.data.result.SqlExplainResult; +import org.dinky.explainer.lineage.LineageResult; +import org.dinky.explainer.sqllineage.SQLLineageBuilder; import org.dinky.job.JobResult; import org.dinky.service.DataBaseService; +import org.dinky.service.impl.DataBaseServiceImpl; import java.util.List; @@ -80,4 +85,21 @@ public JobResult StreamExecute() { public boolean stop() { return false; } + + public LineageResult getColumnLineage() { + if (Asserts.isNull(task.getDatabaseId())) { + return null; + } + DataBaseService dataBaseService = SpringUtil.getBean(DataBaseServiceImpl.class); + DataBase dataBase = dataBaseService.getById(task.getDatabaseId()); + if (Asserts.isNull(dataBase)) { + return null; + } + if (task.getDialect().equalsIgnoreCase("doris") || task.getDialect().equalsIgnoreCase("starrocks")) { + return SQLLineageBuilder.getSqlLineage(task.getStatement(), "mysql", dataBase.getDriverConfig()); + } else { + return SQLLineageBuilder.getSqlLineage( + task.getStatement(), task.getDialect().toLowerCase(), dataBase.getDriverConfig()); + } + } } diff --git a/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java b/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java index 481f25fe41..1ddbbb2b2d 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java @@ -25,6 +25,7 @@ import org.dinky.data.dto.TaskDTO; import org.dinky.data.enums.GatewayType; import org.dinky.data.result.SqlExplainResult; +import org.dinky.explainer.lineage.LineageResult; import org.dinky.job.JobManager; import org.dinky.job.JobResult; import org.dinky.service.TaskService; @@ -62,6 +63,10 @@ public ObjectNode getJobPlan() { return JsonUtils.parseObject(planJson); } + public LineageResult getColumnLineage() { + return jobManager.getColumnLineage(task.getStatement()); + } + @Override public JobResult execute() throws Exception { log.info("Initializing Flink job config..."); diff --git a/dinky-client/dinky-client-1.18/src/main/java/org/dinky/utils/LineageContext.java b/dinky-client/dinky-client-1.18/src/main/java/org/dinky/utils/LineageContext.java index c40bdab84c..f8574d0d27 100644 --- a/dinky-client/dinky-client-1.18/src/main/java/org/dinky/utils/LineageContext.java +++ b/dinky-client/dinky-client-1.18/src/main/java/org/dinky/utils/LineageContext.java @@ -19,32 +19,23 @@ package org.dinky.utils; -import org.dinky.data.model.FunctionResult; import org.dinky.data.model.LineageRel; -import org.dinky.executor.CustomParser; import org.dinky.executor.CustomTableEnvironment; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.metadata.RelColumnOrigin; import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.calcite.sql.SqlNode; import org.apache.commons.collections.CollectionUtils; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.ContextResolvedFunction; -import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.catalog.UnresolvedIdentifier; -import org.apache.flink.table.functions.FunctionIdentifier; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.SinkModifyOperation; -import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.operations.PlannerQueryOperation; import org.apache.flink.table.planner.plan.schema.TableSourceTable; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -85,7 +76,6 @@ private Tuple2 parseStatement(String sql) { Operation operation = operations.get(0); if (operation instanceof SinkModifyOperation) { SinkModifyOperation sinkOperation = (SinkModifyOperation) operation; - PlannerQueryOperation queryOperation = (PlannerQueryOperation) sinkOperation.getChild(); RelNode relNode = queryOperation.getCalciteTree(); return new Tuple2<>( @@ -156,50 +146,4 @@ private List buildFiledLineageResult(String sinkTable, RelNode optRe } return resultList; } - - /** - * Analyze custom functions from SQL, does not contain system functions. - * - * @param singleSql the SQL statement to analyze - * @return custom functions set - */ - public Set analyzeFunction(String singleSql) { - LOG.info("Analyze function Sql: \n {}", singleSql); - CustomParser parser = (CustomParser) tableEnv.getParser(); - - // parsing sql and return the abstract syntax tree - SqlNode sqlNode = parser.parseSql(singleSql); - - // validate the query - SqlNode validated = parser.validate(sqlNode); - - // look for all functions - FunctionVisitor visitor = new FunctionVisitor(); - validated.accept(visitor); - List fullFunctionList = visitor.getFunctionList(); - - // filter custom functions - Set resultSet = new HashSet<>(); - for (UnresolvedIdentifier unresolvedIdentifier : fullFunctionList) { - getFunctionCatalog() - .lookupFunction(unresolvedIdentifier) - .flatMap(ContextResolvedFunction::getIdentifier) - // the objectIdentifier of the built-in function is null - .flatMap(FunctionIdentifier::getIdentifier) - .ifPresent(identifier -> { - FunctionResult functionResult = new FunctionResult() - .setCatalogName(identifier.getCatalogName()) - .setDatabase(identifier.getDatabaseName()) - .setFunctionName(identifier.getObjectName()); - LOG.debug("analyzed function: {}", functionResult); - resultSet.add(functionResult); - }); - } - return resultSet; - } - - private FunctionCatalog getFunctionCatalog() { - PlannerBase planner = (PlannerBase) tableEnv.getPlanner(); - return planner.getFlinkContext().getFunctionCatalog(); - } } diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 3203b9ac1c..10909383ce 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -67,21 +67,15 @@ public class Explainer { private Executor executor; - private boolean useStatementSet; private JobManager jobManager; - public Explainer(Executor executor, boolean useStatementSet, JobManager jobManager) { + public Explainer(Executor executor, JobManager jobManager) { this.executor = executor; - this.useStatementSet = useStatementSet; this.jobManager = jobManager; } public static Explainer build(JobManager jobManager) { - return new Explainer(jobManager.getExecutor(), true, jobManager); - } - - public static Explainer build(Executor executor, boolean useStatementSet, JobManager jobManager) { - return new Explainer(executor, useStatementSet, jobManager); + return new Explainer(jobManager.getExecutor(), jobManager); } public Explainer initialize(JobConfig config, String statement) { @@ -229,7 +223,6 @@ public List getLineage(String statement) { .type(GatewayType.LOCAL.getLongValue()) .useRemote(false) .fragment(true) - .statementSet(useStatementSet) .parallelism(1) .configJson(executor.getTableConfig().getConfiguration().toMap()) .build(); diff --git a/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java b/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java index 8dcb03e069..acdbca0856 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java +++ b/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java @@ -20,9 +20,6 @@ package org.dinky.explainer.lineage; import org.dinky.data.model.LineageRel; -import org.dinky.executor.Executor; -import org.dinky.executor.ExecutorConfig; -import org.dinky.executor.ExecutorFactory; import org.dinky.explainer.Explainer; import org.dinky.job.JobConfig; import org.dinky.job.JobManager; @@ -41,20 +38,17 @@ public class LineageBuilder { public static LineageResult getColumnLineageByLogicalPlan(String statement, JobConfig jobConfig) { JobManager jobManager = JobManager.buildPlanMode(jobConfig); - Explainer explainer = new Explainer(jobManager.getExecutor(), false, jobManager); - return getColumnLineageByLogicalPlan(statement, explainer); + Explainer explainer = Explainer.build(jobManager); + return getColumnLineageByLogicalPlan(explainer.getLineage(statement)); } - public static LineageResult getColumnLineageByLogicalPlan(String statement, ExecutorConfig executorConfig) { + public static LineageResult getColumnLineageByLogicalPlan(String statement) { JobManager jobManager = JobManager.buildPlanMode(JobConfig.buildPlanConfig()); - Executor executor = ExecutorFactory.buildExecutor(executorConfig, jobManager.getDinkyClassLoader()); - jobManager.setExecutor(executor); - Explainer explainer = new Explainer(executor, false, jobManager); - return getColumnLineageByLogicalPlan(statement, explainer); + Explainer explainer = Explainer.build(jobManager); + return getColumnLineageByLogicalPlan(explainer.getLineage(statement)); } - public static LineageResult getColumnLineageByLogicalPlan(String statement, Explainer explainer) { - List lineageRelList = explainer.getLineage(statement); + public static LineageResult getColumnLineageByLogicalPlan(List lineageRelList) { List relations = new ArrayList<>(); Map tableMap = new HashMap<>(); int tableIndex = 1; diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index 2e6cfd7749..3d85ca2f5b 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -43,6 +43,8 @@ import org.dinky.executor.ExecutorConfig; import org.dinky.executor.ExecutorFactory; import org.dinky.explainer.Explainer; +import org.dinky.explainer.lineage.LineageBuilder; +import org.dinky.explainer.lineage.LineageResult; import org.dinky.function.util.UDFUtil; import org.dinky.gateway.Gateway; import org.dinky.gateway.config.FlinkConfig; @@ -382,6 +384,11 @@ public String getJobPlanJson(String statement) { return Explainer.build(this).getJobPlanInfo(statement).getJsonPlan(); } + public LineageResult getColumnLineage(String statement) { + return LineageBuilder.getColumnLineageByLogicalPlan( + Explainer.build(this).getLineage(statement)); + } + public boolean cancelNormal(String jobId) { try { return FlinkAPI.build(config.getAddress()).stop(jobId); diff --git a/dinky-core/src/test/java/org/dinky/core/LineageTest.java b/dinky-core/src/test/java/org/dinky/core/LineageTest.java index c7528848a0..b208812e8a 100644 --- a/dinky-core/src/test/java/org/dinky/core/LineageTest.java +++ b/dinky-core/src/test/java/org/dinky/core/LineageTest.java @@ -19,7 +19,6 @@ package org.dinky.core; -import org.dinky.executor.ExecutorConfig; import org.dinky.explainer.lineage.LineageBuilder; import org.dinky.explainer.lineage.LineageResult; @@ -56,7 +55,7 @@ public void sumTest() { + " 'connector' = 'print'\n" + ");\n" + "insert into TT select a||c A ,b||c B from ST"; - LineageResult result = LineageBuilder.getColumnLineageByLogicalPlan(sql, ExecutorConfig.DEFAULT); + LineageResult result = LineageBuilder.getColumnLineageByLogicalPlan(sql); LOGGER.info("end"); } } diff --git a/dinky-core/src/test/java/org/dinky/job/JobManagerTest.java b/dinky-core/src/test/java/org/dinky/job/JobManagerTest.java index ace3b28d94..5a1a38b076 100644 --- a/dinky-core/src/test/java/org/dinky/job/JobManagerTest.java +++ b/dinky-core/src/test/java/org/dinky/job/JobManagerTest.java @@ -23,7 +23,6 @@ import org.dinky.data.enums.GatewayType; import org.dinky.data.result.ExplainResult; -import org.dinky.executor.ExecutorConfig; import org.dinky.explainer.lineage.LineageBuilder; import org.dinky.explainer.lineage.LineageResult; @@ -111,7 +110,7 @@ void testExecuteSql() throws Exception { void testLineageSqlSingle() throws Exception { String statement = IOUtils.toString(Resources.getResource("flink/sql/single-insert.sql"), StandardCharsets.UTF_8); - LineageResult result = LineageBuilder.getColumnLineageByLogicalPlan(statement, ExecutorConfig.DEFAULT); + LineageResult result = LineageBuilder.getColumnLineageByLogicalPlan(statement); assertNotNull(result); assertEquals(2, result.getTables().size()); assertEquals(4, result.getRelations().size());