Skip to content

Commit

Permalink
Merge pull request #71 from harbby/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
harbby authored Mar 11, 2019
2 parents 7e2e3be + f6ef183 commit c40d6d1
Show file tree
Hide file tree
Showing 15 changed files with 267 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ protected void prepare(Request baseRequest, ServletRequest request, ServletRespo
super.prepare(baseRequest, request, response);
}
};
servlet.getRegistration().setMultipartConfig(new MultipartConfigElement("data/tmp", 1048576, 1048576, 262144));
//1M = 1048576
servlet.getRegistration().setMultipartConfig(new MultipartConfigElement("data/tmp", 1048576_00, 1048576_00, 262144));

//--------------------plblic----------------------
ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); //NO_SESSIONS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@
package ideal.sylph.controller.action;

import com.github.harbby.gadtry.base.Throwables;
import com.github.harbby.gadtry.io.IOUtils;
import com.github.harbby.gadtry.jvm.JVMException;
import com.google.common.collect.ImmutableMap;
import ideal.sylph.spi.SylphContext;
import ideal.sylph.spi.exception.SylphException;
import ideal.sylph.spi.job.Job;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.Part;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand All @@ -38,7 +42,9 @@

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -84,9 +90,14 @@ public Map saveJob(@Context HttpServletRequest request)
jobId = requireNonNull(request.getParameter("jobId"), "job jobId is not empty");
String flow = request.getParameter("query");
String configString = request.getParameter("config");
String jobType = request.getParameter("jobType");
checkState(isNotBlank(jobId), "JobId IS NULL");
checkState(isNotBlank(flow), "SQL query IS NULL");
sylphContext.saveJob(jobId, flow, ImmutableMap.of("type", "StreamSql", "config", parserJobConfig(configString)));

File workDir = new File("jobs/" + jobId); //工作目录
saveFiles(workDir, request);

sylphContext.saveJob(jobId, flow, ImmutableMap.of("type", jobType, "config", parserJobConfig(configString)));
Map out = ImmutableMap.of(
"jobId", jobId,
"type", "save",
Expand All @@ -95,16 +106,14 @@ public Map saveJob(@Context HttpServletRequest request)
logger.info("save job {}", jobId);
return out;
}
catch (JVMException e) {
logger.warn("save job {} failed: {}", jobId, e.getMessage());
String message = e.getMessage();
return ImmutableMap.of("type", "save",
"status", "error",
"msg", message);
}
catch (Exception e) {
logger.warn("save job {} failed: {}", jobId, e);
String message = Throwables.getStackTraceAsString(e);
Throwable error = e;
if (e instanceof InvocationTargetException) {
error = ((InvocationTargetException) e).getTargetException();
}
String message = error instanceof JVMException ? error.getMessage() : Throwables.getStackTraceAsString(error);

logger.warn("save job {} failed: {}", jobId, message);
return ImmutableMap.of("type", "save",
"status", "error",
"msg", message);
Expand Down Expand Up @@ -133,12 +142,36 @@ public Map getJob(@QueryParam("jobId") String jobId)
.put("query", job.getFlow().toString())
.put("config", job.getConfig())
.put("msg", "Get job successfully")
.put("jobType", job.getActuatorName())
.put("status", "ok")
.put("files", files)
.put("jobId", jobId)
.build();
}

private void saveFiles(File workDir, HttpServletRequest request)
throws IOException, ServletException
{
//通过表单中的name属性获取表单域中的文件
//name 为 selectFile(file的里面可能有删除的)的文件
List<Part> parts = request.getParts().stream()
.filter(part -> "file".equals(part.getName()) && isNotBlank(part.getSubmittedFileName()))
.collect(Collectors.toList());
if (parts.isEmpty()) {
return;
}

File downDir = new File(workDir, "files");
FileUtils.deleteDirectory(downDir);
downDir.mkdirs();
for (Part part : parts) {
IOUtils.copyBytes(part.getInputStream(),
new FileOutputStream(new File(downDir, part.getSubmittedFileName()), false),
1024,
true);
}
}

static Map parserJobConfig(String configString)
throws IOException
{
Expand Down
4 changes: 2 additions & 2 deletions sylph-controller/src/main/webapp/app/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ <h2>JobManager</h2>
</div>

<div class="row" id="rowHead">
<div class="col-md-2">job</div>
<div class="col-md-3">job</div>
<div class="col-md-3">runId</div>
<div class="col-md-1">type</div>
<div class="col-md-2">type</div>
<!--<div class="col-md-2">create_time</div>-->
<div class="col-md-1">status</div>
<div class="col-md-3">click</div>
Expand Down
7 changes: 3 additions & 4 deletions sylph-controller/src/main/webapp/app/js/list.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ $(function () {
}
var tmp =
'<div class="row">' +
'<div class="col-md-2">' + jobId + '</div>' +
'<div class="col-md-3">' + jobId + '</div>' +
'<div class="col-md-3">' + yarnId + '</div>' +
'<div class="col-md-1">' + type + '</div>' +
'<div class="col-md-2">' + type + '</div>' +
// '<div class="col-md-2">' + create_time + '</div>' +
'<div class="col-md-1">' + status + '</div>' +
'<div class="col-md-3" jobId="' + jobId + '">' + button + '</div>' +
Expand Down Expand Up @@ -103,13 +103,12 @@ $(function () {
$(document).on("click", ".btn_edit", function () {
var id = $(this).attr("data-id");
var type = $(this).attr("data-type");
if (type == 'StreamSql') {
if (type == 'StreamSql' || type == 'FlinkMainClass') {
window.location.href = "stream_sql.html?type=edit&jobId=" + id;
}
else {
window.location.href = "etl.html?jobId=" + id;
}

});

});
17 changes: 16 additions & 1 deletion sylph-controller/src/main/webapp/app/js/stream_sql.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ function getUrlParam(paramName) {

/*页面加载*/
$(function () {
var sql_editor = CodeMirror.fromTextArea(document.getElementById("query"), {
mode: 'text/x-sql',
lineNumbers: true,
styleActiveLine: true,
matchBrackets: true
});
sql_editor.on('change', editor => {
document.getElementById('query').value = editor.getValue();
console.log('change up value:'+ editor.getValue());
});


/*add or edit*/
var type = getUrlParam("type");
if (type === "add") {
Expand All @@ -38,8 +50,11 @@ $(function () {
data: {},
cache: false,
success: function (result) {
$("textarea[name=jobId]").val(result.jobId);
$("input[name=jobId]").val(result.jobId);
$("select[name=jobType]").val(result.jobType)
$("textarea[name=query]").val(result.query);
sql_editor.setValue(result.query);

var congfigString = "";
$.each(result.config.config, function (key, value) {
congfigString += key + "= " + value + "\n"
Expand Down
25 changes: 9 additions & 16 deletions sylph-controller/src/main/webapp/app/stream_sql.html
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,15 @@ <h2>StreamSql</h2>
<input class="form-control" rows="1" name="type" style="display: none" value="add"/>
<div class="form-group">
<label for="inputEmail3" class="col-sm-2 control-label">jobId:</label>
<div class="col-sm-10">
<textarea class="form-control" rows="1" name="jobId"></textarea>
<div class="col-sm-10" style="width: 40%">
<input class="form-control" type="text" rows="1" name="jobId"/>
</div>
<label for="inputEmail3" class="col-sm-2 control-label">type:</label>
<div class="col-sm-10" style="width: 20%">
<select class="form-control" type="text" rows="1" name="jobType">
<option value="StreamSql">StreamingSql</option>
<option value="FlinkMainClass">FlinkMainClass</option>
</select>
</div>
</div>
<div class="form-group">
Expand Down Expand Up @@ -89,18 +96,4 @@ <h2>StreamSql</h2>
</body>

<script type="text/javascript" src="./js/stream_sql.js"></script>
<script>
window.onload = function () {
CodeMirror.fromTextArea(document.getElementById("query"), {
mode: 'text/x-sql',
lineNumbers: true,
styleActiveLine: true,
matchBrackets: true
}).on('change', editor => {
document.getElementById('query').value = editor.getValue();
console.log('change up value:'+ editor.getValue());
});
};

</script>
</html>
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import ideal.sylph.spi.job.JobContainer;
import ideal.sylph.spi.job.JobHandle;
import ideal.sylph.spi.model.PipelinePluginInfo;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -210,7 +211,11 @@ private Job formJobWithFlow(String jobId, byte[] flowBytes, JobActuator jobActua
jobClassLoader.addDir(jobWorkDir);

Flow flow = jobActuatorHandle.formFlow(flowBytes);
jobClassLoader.addJarFiles(jobActuatorHandle.parserFlowDepends(flow));

Set<File> files = jobActuatorHandle.parserFlowDepends(flow).stream().flatMap(plugin ->
FileUtils.listFiles(plugin.getPluginFile(), null, true).stream()
).collect(Collectors.toSet());
jobClassLoader.addJarFiles(files);
JobHandle jobHandle = jobActuatorHandle.formJob(jobId, flow, jobConfig, jobClassLoader);
Collection<URL> dependFiles = getJobDependFiles(jobClassLoader);
return new Job()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.github.harbby.gadtry.classloader.DirClassLoader;
import com.github.harbby.gadtry.ioc.IocFactory;
import ideal.sylph.runner.flink.actuator.FlinkMainClassActuator;
import ideal.sylph.runner.flink.actuator.FlinkStreamEtlActuator;
import ideal.sylph.runner.flink.actuator.FlinkStreamSqlActuator;
import ideal.sylph.spi.Runner;
Expand Down Expand Up @@ -63,13 +64,14 @@ public Set<JobActuatorHandle> create(RunnerContext context)
((DirClassLoader) classLoader).addDir(new File(flinkHome, "lib"));
}
IocFactory injector = IocFactory.create(binder -> {
binder.bind(FlinkMainClassActuator.class).withSingle();
binder.bind(FlinkStreamEtlActuator.class).withSingle();
binder.bind(FlinkStreamSqlActuator.class).withSingle();
//----------------------------------
binder.bind(PipelinePluginManager.class).byCreator(() -> createPipelinePluginManager(context)).withSingle();
});

return Stream.of(FlinkStreamEtlActuator.class, FlinkStreamSqlActuator.class)
return Stream.of(FlinkMainClassActuator.class, FlinkStreamEtlActuator.class, FlinkStreamSqlActuator.class)
.map(injector::getInstance).collect(Collectors.toSet());
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private FlinkEnvFactory() {}

public static StreamExecutionEnvironment getStreamEnv(JobParameter jobConfig, String jobId)
{
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();

return setJobConfig(execEnv, jobConfig, jobId);
}
Expand Down
Loading

0 comments on commit c40d6d1

Please sign in to comment.