From 0f54474c7c20d5d6b0ca2af726baf10215b6720a Mon Sep 17 00:00:00 2001 From: ideal Date: Wed, 6 Mar 2019 16:02:20 +0800 Subject: [PATCH] support Flink MainClass Job --- .../ideal/sylph/controller/JettyServer.java | 3 +- .../controller/action/StreamSqlResource.java | 53 ++++-- .../src/main/webapp/app/index.html | 4 +- .../src/main/webapp/app/js/list.js | 7 +- .../src/main/webapp/app/js/stream_sql.js | 17 +- .../src/main/webapp/app/stream_sql.html | 25 +-- .../sylph/main/service/RunnerManager.java | 7 +- .../ideal/sylph/runner/flink/FlinkRunner.java | 4 +- .../flink/actuator/FlinkEnvFactory.java | 2 +- .../actuator/FlinkMainClassActuator.java | 165 ++++++++++++++++++ .../actuator/FlinkStreamSqlActuator.java | 11 +- .../runner/flink/yarn/YarnJobDescriptor.java | 6 +- .../main/java/ideal/sylph/spi/NodeLoader.java | 9 +- .../sylph/spi/job/EtlJobActuatorHandle.java | 9 +- .../sylph/spi/job/JobActuatorHandle.java | 4 +- 15 files changed, 267 insertions(+), 59 deletions(-) create mode 100644 sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkMainClassActuator.java diff --git a/sylph-controller/src/main/java/ideal/sylph/controller/JettyServer.java b/sylph-controller/src/main/java/ideal/sylph/controller/JettyServer.java index 3574dedbc..542c76997 100644 --- a/sylph-controller/src/main/java/ideal/sylph/controller/JettyServer.java +++ b/sylph-controller/src/main/java/ideal/sylph/controller/JettyServer.java @@ -84,7 +84,8 @@ protected void prepare(Request baseRequest, ServletRequest request, ServletRespo super.prepare(baseRequest, request, response); } }; - servlet.getRegistration().setMultipartConfig(new MultipartConfigElement("data/tmp", 1048576, 1048576, 262144)); + //1M = 1048576 + servlet.getRegistration().setMultipartConfig(new MultipartConfigElement("data/tmp", 1048576_00, 1048576_00, 262144)); //--------------------plblic---------------------- ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); //NO_SESSIONS diff --git a/sylph-controller/src/main/java/ideal/sylph/controller/action/StreamSqlResource.java b/sylph-controller/src/main/java/ideal/sylph/controller/action/StreamSqlResource.java index d708c90f8..1f96a780a 100644 --- a/sylph-controller/src/main/java/ideal/sylph/controller/action/StreamSqlResource.java +++ b/sylph-controller/src/main/java/ideal/sylph/controller/action/StreamSqlResource.java @@ -16,16 +16,20 @@ package ideal.sylph.controller.action; import com.github.harbby.gadtry.base.Throwables; +import com.github.harbby.gadtry.io.IOUtils; import com.github.harbby.gadtry.jvm.JVMException; import com.google.common.collect.ImmutableMap; import ideal.sylph.spi.SylphContext; import ideal.sylph.spi.exception.SylphException; import ideal.sylph.spi.job.Job; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.servlet.ServletContext; +import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.Part; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -38,7 +42,9 @@ import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -84,9 +90,14 @@ public Map saveJob(@Context HttpServletRequest request) jobId = requireNonNull(request.getParameter("jobId"), "job jobId is not empty"); String flow = request.getParameter("query"); String configString = request.getParameter("config"); + String jobType = request.getParameter("jobType"); checkState(isNotBlank(jobId), "JobId IS NULL"); checkState(isNotBlank(flow), "SQL query IS NULL"); - sylphContext.saveJob(jobId, flow, ImmutableMap.of("type", "StreamSql", "config", parserJobConfig(configString))); + + File workDir = new File("jobs/" + jobId); //工作目录 + saveFiles(workDir, request); + + sylphContext.saveJob(jobId, flow, ImmutableMap.of("type", jobType, "config", parserJobConfig(configString))); Map out = ImmutableMap.of( "jobId", jobId, "type", "save", @@ -95,16 +106,14 @@ public Map saveJob(@Context HttpServletRequest request) logger.info("save job {}", jobId); return out; } - catch (JVMException e) { - logger.warn("save job {} failed: {}", jobId, e.getMessage()); - String message = e.getMessage(); - return ImmutableMap.of("type", "save", - "status", "error", - "msg", message); - } catch (Exception e) { - logger.warn("save job {} failed: {}", jobId, e); - String message = Throwables.getStackTraceAsString(e); + Throwable error = e; + if (e instanceof InvocationTargetException) { + error = ((InvocationTargetException) e).getTargetException(); + } + String message = error instanceof JVMException ? error.getMessage() : Throwables.getStackTraceAsString(error); + + logger.warn("save job {} failed: {}", jobId, message); return ImmutableMap.of("type", "save", "status", "error", "msg", message); @@ -133,12 +142,36 @@ public Map getJob(@QueryParam("jobId") String jobId) .put("query", job.getFlow().toString()) .put("config", job.getConfig()) .put("msg", "Get job successfully") + .put("jobType", job.getActuatorName()) .put("status", "ok") .put("files", files) .put("jobId", jobId) .build(); } + private void saveFiles(File workDir, HttpServletRequest request) + throws IOException, ServletException + { + //通过表单中的name属性获取表单域中的文件 + //name 为 selectFile(file的里面可能有删除的)的文件 + List parts = request.getParts().stream() + .filter(part -> "file".equals(part.getName()) && isNotBlank(part.getSubmittedFileName())) + .collect(Collectors.toList()); + if (parts.isEmpty()) { + return; + } + + File downDir = new File(workDir, "files"); + FileUtils.deleteDirectory(downDir); + downDir.mkdirs(); + for (Part part : parts) { + IOUtils.copyBytes(part.getInputStream(), + new FileOutputStream(new File(downDir, part.getSubmittedFileName()), false), + 1024, + true); + } + } + static Map parserJobConfig(String configString) throws IOException { diff --git a/sylph-controller/src/main/webapp/app/index.html b/sylph-controller/src/main/webapp/app/index.html index e747c9d8d..09b5307f6 100644 --- a/sylph-controller/src/main/webapp/app/index.html +++ b/sylph-controller/src/main/webapp/app/index.html @@ -40,9 +40,9 @@

JobManager

-
job
+
job
runId
-
type
+
type
status
click
diff --git a/sylph-controller/src/main/webapp/app/js/list.js b/sylph-controller/src/main/webapp/app/js/list.js index 2db8bbe8d..7f0d7ab2b 100644 --- a/sylph-controller/src/main/webapp/app/js/list.js +++ b/sylph-controller/src/main/webapp/app/js/list.js @@ -49,9 +49,9 @@ $(function () { } var tmp = '
' + - '
' + jobId + '
' + + '
' + jobId + '
' + '
' + yarnId + '
' + - '
' + type + '
' + + '
' + type + '
' + // '
' + create_time + '
' + '
' + status + '
' + '
' + button + '
' + @@ -103,13 +103,12 @@ $(function () { $(document).on("click", ".btn_edit", function () { var id = $(this).attr("data-id"); var type = $(this).attr("data-type"); - if (type == 'StreamSql') { + if (type == 'StreamSql' || type == 'FlinkMainClass') { window.location.href = "stream_sql.html?type=edit&jobId=" + id; } else { window.location.href = "etl.html?jobId=" + id; } - }); }); diff --git a/sylph-controller/src/main/webapp/app/js/stream_sql.js b/sylph-controller/src/main/webapp/app/js/stream_sql.js index 89b697f6c..ba6c63862 100644 --- a/sylph-controller/src/main/webapp/app/js/stream_sql.js +++ b/sylph-controller/src/main/webapp/app/js/stream_sql.js @@ -26,6 +26,18 @@ function getUrlParam(paramName) { /*页面加载*/ $(function () { + var sql_editor = CodeMirror.fromTextArea(document.getElementById("query"), { + mode: 'text/x-sql', + lineNumbers: true, + styleActiveLine: true, + matchBrackets: true + }); + sql_editor.on('change', editor => { + document.getElementById('query').value = editor.getValue(); + console.log('change up value:'+ editor.getValue()); + }); + + /*add or edit*/ var type = getUrlParam("type"); if (type === "add") { @@ -38,8 +50,11 @@ $(function () { data: {}, cache: false, success: function (result) { - $("textarea[name=jobId]").val(result.jobId); + $("input[name=jobId]").val(result.jobId); + $("select[name=jobType]").val(result.jobType) $("textarea[name=query]").val(result.query); + sql_editor.setValue(result.query); + var congfigString = ""; $.each(result.config.config, function (key, value) { congfigString += key + "= " + value + "\n" diff --git a/sylph-controller/src/main/webapp/app/stream_sql.html b/sylph-controller/src/main/webapp/app/stream_sql.html index 4a7e8cdfd..7baaa264f 100755 --- a/sylph-controller/src/main/webapp/app/stream_sql.html +++ b/sylph-controller/src/main/webapp/app/stream_sql.html @@ -45,8 +45,15 @@

StreamSql

-
- +
+ +
+ +
+
@@ -89,18 +96,4 @@

StreamSql

- \ No newline at end of file diff --git a/sylph-main/src/main/java/ideal/sylph/main/service/RunnerManager.java b/sylph-main/src/main/java/ideal/sylph/main/service/RunnerManager.java index 93367ba15..a7f0808f6 100644 --- a/sylph-main/src/main/java/ideal/sylph/main/service/RunnerManager.java +++ b/sylph-main/src/main/java/ideal/sylph/main/service/RunnerManager.java @@ -36,6 +36,7 @@ import ideal.sylph.spi.job.JobContainer; import ideal.sylph.spi.job.JobHandle; import ideal.sylph.spi.model.PipelinePluginInfo; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,7 +211,11 @@ private Job formJobWithFlow(String jobId, byte[] flowBytes, JobActuator jobActua jobClassLoader.addDir(jobWorkDir); Flow flow = jobActuatorHandle.formFlow(flowBytes); - jobClassLoader.addJarFiles(jobActuatorHandle.parserFlowDepends(flow)); + + Set files = jobActuatorHandle.parserFlowDepends(flow).stream().flatMap(plugin -> + FileUtils.listFiles(plugin.getPluginFile(), null, true).stream() + ).collect(Collectors.toSet()); + jobClassLoader.addJarFiles(files); JobHandle jobHandle = jobActuatorHandle.formJob(jobId, flow, jobConfig, jobClassLoader); Collection dependFiles = getJobDependFiles(jobClassLoader); return new Job() diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkRunner.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkRunner.java index 890b2ed32..627957a0a 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkRunner.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkRunner.java @@ -17,6 +17,7 @@ import com.github.harbby.gadtry.classloader.DirClassLoader; import com.github.harbby.gadtry.ioc.IocFactory; +import ideal.sylph.runner.flink.actuator.FlinkMainClassActuator; import ideal.sylph.runner.flink.actuator.FlinkStreamEtlActuator; import ideal.sylph.runner.flink.actuator.FlinkStreamSqlActuator; import ideal.sylph.spi.Runner; @@ -63,13 +64,14 @@ public Set create(RunnerContext context) ((DirClassLoader) classLoader).addDir(new File(flinkHome, "lib")); } IocFactory injector = IocFactory.create(binder -> { + binder.bind(FlinkMainClassActuator.class).withSingle(); binder.bind(FlinkStreamEtlActuator.class).withSingle(); binder.bind(FlinkStreamSqlActuator.class).withSingle(); //---------------------------------- binder.bind(PipelinePluginManager.class).byCreator(() -> createPipelinePluginManager(context)).withSingle(); }); - return Stream.of(FlinkStreamEtlActuator.class, FlinkStreamSqlActuator.class) + return Stream.of(FlinkMainClassActuator.class, FlinkStreamEtlActuator.class, FlinkStreamSqlActuator.class) .map(injector::getInstance).collect(Collectors.toSet()); } catch (Exception e) { diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkEnvFactory.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkEnvFactory.java index 7294bebb2..be60726b6 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkEnvFactory.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkEnvFactory.java @@ -40,7 +40,7 @@ private FlinkEnvFactory() {} public static StreamExecutionEnvironment getStreamEnv(JobParameter jobConfig, String jobId) { - StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.createLocalEnvironment(); + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); return setJobConfig(execEnv, jobConfig, jobId); } diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkMainClassActuator.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkMainClassActuator.java new file mode 100644 index 000000000..27da9b88e --- /dev/null +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkMainClassActuator.java @@ -0,0 +1,165 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ideal.sylph.runner.flink.actuator; + +import com.github.harbby.gadtry.jvm.JVMException; +import com.github.harbby.gadtry.jvm.JVMLauncher; +import com.github.harbby.gadtry.jvm.JVMLaunchers; +import ideal.sylph.annotation.Description; +import ideal.sylph.annotation.Name; +import ideal.sylph.runner.flink.FlinkJobConfig; +import ideal.sylph.runner.flink.FlinkJobHandle; +import ideal.sylph.spi.job.Flow; +import ideal.sylph.spi.job.JobConfig; +import ideal.sylph.spi.job.JobHandle; +import ideal.sylph.spi.model.PipelinePluginInfo; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironmentFactory; +import org.apache.flink.client.program.OptimizerPlanEnvironment; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.fusesource.jansi.Ansi; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.URLClassLoader; +import java.util.Collection; +import java.util.Collections; + +import static com.github.harbby.gadtry.base.Throwables.throwsException; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.fusesource.jansi.Ansi.Color.GREEN; +import static org.fusesource.jansi.Ansi.Color.YELLOW; + +/** + * 通过main class 加载Job和编译 + *

+ * flink submit通过{@link org.apache.flink.client.program.OptimizerPlanEnvironment#getOptimizedPlan} 加载和编译 + * 具体思路是1: setAsContext(); 设置创建静态env(session) + * 2, 反射执行 用户main()方法 + * 3, return plan JobGraph + */ +@Name("FlinkMainClass") +@Description("this is FlinkMainClassActuator Actuator") +public class FlinkMainClassActuator + extends FlinkStreamEtlActuator +{ + @Override + public Flow formFlow(byte[] flowBytes) + throws IOException + { + return new StringFlow(flowBytes); + } + + @Override + public Collection parserFlowDepends(Flow inFlow) + throws IOException + { + return Collections.emptyList(); + } + + @Override + public JobHandle formJob(String jobId, Flow flow, JobConfig jobConfig, URLClassLoader jobClassLoader) + throws Exception + { + FlinkJobConfig flinkJobConfig = (FlinkJobConfig) jobConfig; + JobGraph jobGraph = compile(jobId, (StringFlow) flow, flinkJobConfig.getConfig(), jobClassLoader); + + return new FlinkJobHandle(jobGraph); + } + + private static JobGraph compile(String jobId, StringFlow flow, JobParameter jobConfig, URLClassLoader jobClassLoader) + throws JVMException + { + JVMLauncher launcher = JVMLaunchers.newJvm() + .setConsole((line) -> System.out.println(new Ansi().fg(YELLOW).a("[" + jobId + "] ").fg(GREEN).a(line).reset())) + .setCallable(() -> { + //---set env + Configuration configuration = new Configuration(); + OptimizerPlanEnvironment planEnvironment = new OptimizerPlanEnvironment(new Optimizer(configuration)); + ExecutionEnvironmentFactory factory = () -> planEnvironment; + Method method = ExecutionEnvironment.class.getDeclaredMethod("initializeContextEnvironment", ExecutionEnvironmentFactory.class); + method.setAccessible(true); + method.invoke(null, factory); + + //--set streamEnv + StreamExecutionEnvironment streamExecutionEnvironment = FlinkEnvFactory.getStreamEnv(jobConfig, jobId); + StreamExecutionEnvironmentFactory streamFactory = () -> streamExecutionEnvironment; + Method m1 = StreamExecutionEnvironment.class.getDeclaredMethod("initializeContextEnvironment", StreamExecutionEnvironmentFactory.class); + m1.setAccessible(true); + m1.invoke(null, streamFactory); + //--- + Class mainClass = Class.forName(flow.mainClass); + System.out.println("this flink job Main class: " + mainClass); + Method main = mainClass.getMethod("main", String[].class); + try { + main.invoke(null, (Object) new String[0]); + throwsException(ProgramInvocationException.class); + } + catch (ProgramInvocationException e) { + throw e; + } + catch (Throwable t) { + Field field = OptimizerPlanEnvironment.class.getDeclaredField("optimizerPlan"); + field.setAccessible(true); + FlinkPlan flinkPlan = (FlinkPlan) field.get(planEnvironment); + if (flinkPlan == null) { + throw new ProgramInvocationException("The program caused an error: ", t); + } + if (flinkPlan instanceof StreamGraph) { + return ((StreamGraph) flinkPlan).getJobGraph(); + } + else { + final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration); + return jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, null); + } + } + + throw new ProgramInvocationException("The program plan could not be fetched - the program aborted pre-maturely."); + }) + .setClassLoader(jobClassLoader) + .addUserURLClassLoader(jobClassLoader) + .build(); + + return launcher.startAndGet(); + } + + public static class StringFlow + extends Flow + { + private final String mainClass; + + public StringFlow(byte[] flowBytes) + { + this.mainClass = new String(flowBytes, UTF_8).trim(); + } + + @Override + public String toString() + { + return mainClass; + } + } +} diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkStreamSqlActuator.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkStreamSqlActuator.java index 43c49fa3a..2a3f966b7 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkStreamSqlActuator.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/actuator/FlinkStreamSqlActuator.java @@ -30,8 +30,8 @@ import ideal.sylph.spi.job.Flow; import ideal.sylph.spi.job.JobConfig; import ideal.sylph.spi.job.JobHandle; +import ideal.sylph.spi.model.PipelinePluginInfo; import ideal.sylph.spi.model.PipelinePluginManager; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -44,7 +44,6 @@ import javax.validation.constraints.NotNull; -import java.io.File; import java.net.URLClassLoader; import java.util.Arrays; import java.util.Collection; @@ -73,10 +72,10 @@ public Flow formFlow(byte[] flowBytes) @NotNull @Override - public Collection parserFlowDepends(Flow inFlow) + public Collection parserFlowDepends(Flow inFlow) { SqlFlow flow = (SqlFlow) inFlow; - ImmutableSet.Builder builder = ImmutableSet.builder(); + ImmutableSet.Builder builder = ImmutableSet.builder(); AntlrSqlParser parser = new AntlrSqlParser(); Stream.of(flow.getSqlSplit()) @@ -94,9 +93,7 @@ public Collection parserFlowDepends(Flow inFlow) Map withConfig = createTable.getWithConfig(); String driverOrName = (String) requireNonNull(withConfig.get("type"), "driver is null"); pluginManager.findPluginInfo(driverOrName, getPipeType(createTable.getType())) - .ifPresent(plugin -> FileUtils - .listFiles(plugin.getPluginFile(), null, true) - .forEach(builder::add)); + .ifPresent(builder::add); }); return builder.build(); } diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnJobDescriptor.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnJobDescriptor.java index 272bab701..7c447ecd3 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnJobDescriptor.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnJobDescriptor.java @@ -136,7 +136,9 @@ public YarnClient getYarnClient() public ClusterClient deploy(JobGraph jobGraph, boolean detached) throws Exception { - //jobGraph.setAllowQueuedScheduling(true); + // this is required because the slots are allocated lazily + jobGraph.setAllowQueuedScheduling(true); + // YarnClientApplication application = yarnClient.createApplication(); ApplicationReport report = startAppMaster(application, jobGraph); @@ -392,7 +394,7 @@ private Map setUpAmEnvironment( } /** - * flink 1.5 add + * flink1.5 add */ @Override public ClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/NodeLoader.java b/sylph-spi/src/main/java/ideal/sylph/spi/NodeLoader.java index 7fd9042d6..7ecc7478f 100644 --- a/sylph-spi/src/main/java/ideal/sylph/spi/NodeLoader.java +++ b/sylph-spi/src/main/java/ideal/sylph/spi/NodeLoader.java @@ -77,14 +77,13 @@ static PluginConfig getPipeConfigInstance(Class type, Cl //Ignore the constructor in the configuration class try { - Constructor pluginConfigConstructor = type.getDeclaredConstructor(); - logger.info("[PluginConfig] find 'no parameter' constructor with [{}]", type); + Constructor pluginConfigConstructor = type.getDeclaredConstructor(); + logger.debug("find 'no parameter' constructor with [{}]", type); pluginConfigConstructor.setAccessible(true); - PluginConfig pluginConfig = (PluginConfig) pluginConfigConstructor.newInstance(); - return pluginConfig; + return pluginConfigConstructor.newInstance(); } catch (NoSuchMethodException e) { - logger.warn("[PluginConfig] not find 'no parameter' constructor, use javassist inject with [{}]", type); + logger.info("Not find 'no parameter' constructor, use javassist inject with [{}]", type); ClassPool classPool = new ClassPool(); classPool.appendClassPath(new LoaderClassPath(classLoader)); diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/job/EtlJobActuatorHandle.java b/sylph-spi/src/main/java/ideal/sylph/spi/job/EtlJobActuatorHandle.java index b245bddcf..00eb4e9aa 100644 --- a/sylph-spi/src/main/java/ideal/sylph/spi/job/EtlJobActuatorHandle.java +++ b/sylph-spi/src/main/java/ideal/sylph/spi/job/EtlJobActuatorHandle.java @@ -19,11 +19,9 @@ import ideal.sylph.etl.PipelinePlugin; import ideal.sylph.spi.model.NodeInfo; import ideal.sylph.spi.model.PipelinePluginInfo; -import org.apache.commons.io.FileUtils; import javax.validation.constraints.NotNull; -import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.Optional; @@ -41,18 +39,17 @@ public Flow formFlow(byte[] flowBytes) @NotNull @Override - public Collection parserFlowDepends(Flow inFlow) + public Collection parserFlowDepends(Flow inFlow) throws IOException { EtlFlow flow = (EtlFlow) inFlow; //---- flow parser depends ---- - ImmutableSet.Builder builder = ImmutableSet.builder(); + ImmutableSet.Builder builder = ImmutableSet.builder(); for (NodeInfo nodeInfo : flow.getNodes()) { String driverOrName = nodeInfo.getDriverClass(); PipelinePlugin.PipelineType type = PipelinePlugin.PipelineType.valueOf(nodeInfo.getNodeType()); Optional pluginInfo = this.getPluginManager().findPluginInfo(driverOrName, type); - pluginInfo.ifPresent(plugin -> FileUtils.listFiles(plugin.getPluginFile(), null, true) - .forEach(builder::add)); + pluginInfo.ifPresent(builder::add); } return builder.build(); } diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/job/JobActuatorHandle.java b/sylph-spi/src/main/java/ideal/sylph/spi/job/JobActuatorHandle.java index a3a11c100..75f0eb7ff 100644 --- a/sylph-spi/src/main/java/ideal/sylph/spi/job/JobActuatorHandle.java +++ b/sylph-spi/src/main/java/ideal/sylph/spi/job/JobActuatorHandle.java @@ -16,11 +16,11 @@ package ideal.sylph.spi.job; import com.github.harbby.gadtry.jvm.JVMException; +import ideal.sylph.spi.model.PipelinePluginInfo; import ideal.sylph.spi.model.PipelinePluginManager; import javax.validation.constraints.NotNull; -import java.io.File; import java.io.IOException; import java.net.URLClassLoader; import java.util.Collection; @@ -50,7 +50,7 @@ Flow formFlow(byte[] flowBytes) throws IOException; @NotNull - default Collection parserFlowDepends(Flow flow) + default Collection parserFlowDepends(Flow flow) throws IOException { return Collections.emptyList();