Skip to content

Commit

Permalink
add stream sql
Browse files Browse the repository at this point in the history
  • Loading branch information
ideal committed Aug 16, 2018
1 parent 6b4f9dd commit cd6f5d2
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 175 deletions.
60 changes: 60 additions & 0 deletions ideal-common/src/main/java/ideal/sylph/common/base/Suppliers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package ideal.sylph.common.base;

import java.io.Serializable;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

public class Suppliers
{
private Suppliers() {}

public static <T> Supplier<T> memoize(Supplier<T> delegate)
{
return delegate instanceof Suppliers.MemoizingSupplier ?
delegate :
new Suppliers.MemoizingSupplier<>(requireNonNull(delegate));
}

public static <T> Supplier<T> goLazy(Supplier<T> delegate)
{
return delegate instanceof Suppliers.MemoizingSupplier ?
delegate :
new Suppliers.MemoizingSupplier<>(requireNonNull(delegate));
}

static class MemoizingSupplier<T>
implements Supplier<T>, Serializable
{
private final Supplier<T> delegate;
private transient volatile boolean initialized = false;
private transient T value;
private static final long serialVersionUID = 0L;

MemoizingSupplier(Supplier<T> delegate)
{
this.delegate = delegate;
}

public T get()
{
if (!this.initialized) {
synchronized (this) {
if (!this.initialized) {
T t = this.delegate.get();
this.value = t;
this.initialized = true;
return t;
}
}
}

return this.value;
}

public String toString()
{
return "Suppliers.memoize(" + this.delegate + ")";
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableMap;
import ideal.sylph.spi.SylphContext;
import ideal.sylph.spi.exception.SylphException;
import ideal.sylph.spi.job.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,10 +19,10 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriInfo;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static ideal.sylph.spi.exception.StandardErrorCode.ILLEGAL_OPERATION;
import static java.util.Objects.requireNonNull;

@javax.inject.Singleton
Expand Down Expand Up @@ -82,19 +83,14 @@ public Map saveJob(@Context HttpServletRequest request)
public Map getJob(@QueryParam("jobId") String jobId)
{
requireNonNull(jobId, "jobId is null");
Optional<Job> job = sylphContext.getJob(jobId);
Optional<Job> jobOptional = sylphContext.getJob(jobId);
Job job = jobOptional.orElseThrow(() -> new SylphException(ILLEGAL_OPERATION, "job " + jobId + " not found"));

final Map<String, Object> out = new HashMap<>();
if (job.isPresent()) {
out.put("graph", job.get().getFlow());
out.put("msg", "获取任务成功");
out.put("status", "ok");
}
else {
out.put("msg", "jobid:" + jobId + "不存在");
out.put("status", "error");
}
out.put("jobId", jobId);
return ImmutableMap.copyOf(out);
return ImmutableMap.builder()
.put("graph", job.getFlow())
.put("msg", "获取任务成功")
.put("status", "ok")
.put("jobId", jobId)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableMap;
import ideal.sylph.spi.SylphContext;
import ideal.sylph.spi.exception.SylphException;
import ideal.sylph.spi.job.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,10 +19,15 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriInfo;

import java.util.HashMap;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static ideal.sylph.spi.exception.StandardErrorCode.ILLEGAL_OPERATION;
import static java.util.Objects.requireNonNull;

@javax.inject.Singleton
Expand Down Expand Up @@ -53,6 +59,8 @@ public Map saveJob(@Context HttpServletRequest request)
try {
String jobId = requireNonNull(request.getParameter("jobId"), "job jobId 不能为空");
String flow = request.getParameter("query");
String config = request.getParameter("config");

sylphContext.saveJob(jobId, flow, "StreamSql");
Map out = ImmutableMap.of(
"jobId", jobId,
Expand All @@ -61,15 +69,15 @@ public Map saveJob(@Context HttpServletRequest request)
"msg", "编译过程:..."
);
logger.info("save job {}", jobId);
return ImmutableMap.copyOf(out);
return out;
}
catch (Exception e) {
Map out = ImmutableMap.of("type", "save",
"status", "error",
"msg", "任务创建失败: " + e.toString()
);
logger.warn("job 创建失败", e);
return ImmutableMap.copyOf(out);
return out;
}
}

Expand All @@ -82,19 +90,21 @@ public Map saveJob(@Context HttpServletRequest request)
public Map getJob(@QueryParam("jobId") String jobId)
{
requireNonNull(jobId, "jobId is null");
Optional<Job> job = sylphContext.getJob(jobId);
Optional<Job> jobOptional = sylphContext.getJob(jobId);
Job job = jobOptional.orElseThrow(() -> new SylphException(ILLEGAL_OPERATION, "job " + jobId + " not found"));

final Map<String, Object> out = new HashMap<>();
if (job.isPresent()) {
out.put("graph", job.get().getFlow());
out.put("msg", "获取任务成功");
out.put("status", "ok");
}
else {
out.put("msg", "jobid:" + jobId + "不存在");
out.put("status", "error");
}
out.put("jobId", jobId);
return ImmutableMap.copyOf(out);
File userFilesDir = new File(job.getWorkDir(), "files");
File[] userFiles = userFilesDir.listFiles();
List<String> files = userFilesDir.exists() && userFiles != null ?
Arrays.stream(userFiles).map(File::getName).collect(Collectors.toList())
: Collections.emptyList();

return ImmutableMap.of(
"graph", job.getFlow(),
"msg", "获取任务成功",
"status", "ok",
"files", files,
"jobId", jobId
);
}
}
4 changes: 2 additions & 2 deletions sylph-dist/src/webapps/js/list.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ $(function () {
$(document).on("click", ".btn_edit", function () {
var id = $(this).attr("data-id");
var type = $(this).attr("data-type");
if (type == 'flink_sql') {
window.location.href = "edit.html?type=edit&jobId=" + id;
if (type == 'StreamSql') {
window.location.href = "stream_sql.html?type=edit&jobId=" + id;
} else {
window.location.href = "etl.html?jobId=" + id;
}
Expand Down
7 changes: 2 additions & 5 deletions sylph-dist/src/webapps/js/stream_sql.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ $(function () {
data: {},
cache: false,
success: function (result) {
$("textarea[name=name]").val(result.jobId);
$("textarea[name=source]").val(JSON.stringify(result.dag.source, null, 2));
$("textarea[name=transform]").val(result.dag.transform);
$("textarea[name=sink]").val(result.dag.sink);
$("textarea[name=jobId]").val(result.jobId);
$("textarea[name=query]").val(result.graph.flowString);
var files = result.files;
for(var i = 0; i < files.length; i++) {
$('#fileList').append(
Expand All @@ -64,7 +62,6 @@ $(function () {
processData: false,
contentType: false
}).done(function(data) {
var data = JSON.parse(data);
if (data.status == "ok") {
alert("保存成功");
window.location.href = "index.html";
Expand Down
8 changes: 4 additions & 4 deletions sylph-dist/src/webapps/stream_sql.html
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
</style>
</head>
<body>
<h2>任务编辑</h2>
<h2>StreamSql</h2>
<div style="margin: 0 auto;width:1000px;">
<form class="form-horizontal" enctype="multipart/form-data">
<input class="form-control" rows="1" name="type" style="display: none" value="add"/>
<div class="form-group">
<label for="inputEmail3" class="col-sm-2 control-label">Name</label>
<label for="inputEmail3" class="col-sm-2 control-label">jobId</label>
<div class="col-sm-10">
<textarea class="form-control" rows="1" name="jobId"></textarea>
</div>
Expand All @@ -41,8 +41,8 @@ <h2>任务编辑</h2>
<i class="fa fa-cog" onclick="openConfigSetLayer();"></i>
</div>
<div class="col-sm-2">
<button id="submit" type="button" class="btn btn-primary">保存</button>
<button type="button" class="btn btn-primary" onclick="window.location.href='./index.html'">返回</button>
<button id="submit" type="button" class="btn btn-primary">Save</button>
<button type="button" class="btn btn-primary" onclick="window.location.href='./index.html'">取消</button>
</div>
</div>
<div id="upload-files" style="display: none">
Expand Down
Loading

0 comments on commit cd6f5d2

Please sign in to comment.