diff --git a/build.gradle b/build.gradle index d13b82ca1..26d568511 100644 --- a/build.gradle +++ b/build.gradle @@ -104,4 +104,19 @@ subprojects { //assemble.dependsOn 'licenseMain','licenseTest' //licenseMain.includes //license.mapping('javascript', 'JAVADOC_STYLE') + + task sourcesJar(type: Jar, dependsOn: classes) { + classifier = 'sources' + from sourceSets.main.allSource + } + + task javadocJar(type: Jar, dependsOn: javadoc) { + classifier = 'javadoc' + from javadoc.destinationDir + //javadoc.failOnError = false + } + + artifacts { + archives sourcesJar, javadocJar + } } diff --git a/settings.gradle b/settings.gradle index e1f5d5a39..d8850fb81 100644 --- a/settings.gradle +++ b/settings.gradle @@ -28,11 +28,13 @@ include 'sylph-connectors:sylph-hdfs' include 'sylph-connectors:sylph-kafka09' include 'sylph-connectors:sylph-hbase' include 'sylph-connectors:sylph-elasticsearch6' - +include 'sylph-connectors:sylph-elasticsearch5' +include 'sylph-connectors:sylph-clickhouse' //---- include 'sylph-dist' include 'sylph-parser' include 'sylph-docs' include 'sylph-yarn' - +//include 'sylph-clickhouse' +//include 'sylph-elasticsearch5' diff --git a/sylph-connectors/sylph-clickhouse/build.gradle b/sylph-connectors/sylph-clickhouse/build.gradle new file mode 100644 index 000000000..4f4520160 --- /dev/null +++ b/sylph-connectors/sylph-clickhouse/build.gradle @@ -0,0 +1,9 @@ +dependencies { + compileOnly(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) { + exclude(module: 'flink-shaded-hadoop2') + } + compileOnly group: 'org.apache.flink', name: 'flink-table_2.11', version: deps.flink + compileOnly group: 'org.slf4j', name: 'slf4j-api', version: deps.log4j12 + compile group: 'com.github.housepower', name: 'clickhouse-native-jdbc', version: '1.5-stable' + +} diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java new file mode 100644 index 000000000..28bd11c87 --- /dev/null +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ideal.sylph.plugins.clickhouse; + +import ideal.sylph.annotation.Description; +import ideal.sylph.annotation.Name; +import ideal.sylph.etl.PluginConfig; +import ideal.sylph.etl.Row; +import ideal.sylph.etl.SinkContext; +import ideal.sylph.etl.api.RealTimeSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.calcite.shaded.com.google.common.base.Preconditions.checkState; + +@Name("ClickHouseSink") +@Description("this is ClickHouseSink sink plugin") +public class ClickHouseSink + implements RealTimeSink +{ + private static final Logger logger = LoggerFactory.getLogger(ClickHouseSink.class); + + private final ClickHouseSinkConfig config; + private final String prepareStatementQuery; + private final Row.Schema schema; + private int idIndex = -1; + private transient Connection connection; + private transient PreparedStatement statement; + private int num = 0; + private final Map nametypes; + + public ClickHouseSink(SinkContext context, ClickHouseSinkConfig clickHouseSinkConfig) + { + this.config = clickHouseSinkConfig; + checkState(config.getQuery() != null, "insert into query not setting"); + this.prepareStatementQuery = config.getQuery().replaceAll("\\$\\{.*?}", "?"); + schema = context.getSchema(); + Map nt = new HashMap(); + for (int i = 0; i < schema.getFieldNames().size(); i++) { + nt.put(schema.getFieldNames().get(i), schema.getFieldTypes().get(i).toString().split(" ")[1]); + } + this.nametypes = nt; + } + + @Override + public void process(Row row) + { + int ith = 1; + try { + for (String fieldName : schema.getFieldNames()) { + //Byte Double String Date Long ..... 
+ if (nametypes.get(fieldName).equals("java.sql.Date")) { + statement.setDate(ith, java.sql.Date.valueOf(row.getAs(fieldName).toString())); + } + else if (nametypes.get(fieldName).equals("java.lang.Long")) { + statement.setLong(ith, row.getAs(fieldName)); + } + else if (nametypes.get(fieldName).equals("java.lang.Double")) { + statement.setDouble(ith, row.getAs(fieldName)); + } + else if (nametypes.get(fieldName).equals("java.lang.Integer")) { + statement.setInt(ith, row.getAs(fieldName)); + } + else { + statement.setString(ith, row.getAs(fieldName)); + } + ith += 1; + } + statement.addBatch(); + if (num++ >= config.bulkSize) { + statement.executeBatch(); + num = 0; + } + } + catch (SQLException e) { + logger.error("write to clickhouse failed", e); + } + } + + @Override + public boolean open(long partitionId, long version) + throws SQLException, ClassNotFoundException + { + Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); + this.connection = DriverManager.getConnection(config.jdbcUrl, config.user, config.password); + this.statement = connection.prepareStatement(prepareStatementQuery); + return true; + } + + @Override + public void close(Throwable errorOrNull) + { + try (Connection conn = connection) { + try (Statement stmt = statement) { + if (stmt != null) { + stmt.executeBatch(); + } + } + catch (SQLException e) { + logger.error("close executeBatch fail", e); + } + } + catch (SQLException e) { + logger.error("close connection fail", e); + } + } + + public static class ClickHouseSinkConfig + extends PluginConfig + { + @Name("url") + @Description("clickhouse jdbc url") + private String jdbcUrl = "jdbc:clickhouse://localhost:9000"; + + @Name("userName") + @Description("clickhouse user name") + private String user = "default"; + + @Name("password") + @Description("clickhouse password") + private String password = "default"; + + @Name("query") + @Description("the insert query used to write rows") + private String query = null; + + @Name("bulkSize") + @Description("the number of rows per bulk insert") + private int bulkSize = 20000; + + @Name("eventDate_field") + @Description("the event date field of your data; its value must be a yyyy-MM-dd date") + private String eventTimeName; + + public String getJdbcUrl() + { + return jdbcUrl; + } + + public String getUser() + { + return user; + } + + public String getPassword() + { + return password; + } + + public String getQuery() + { + return query; + } + } +} diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java new file mode 100644 index 000000000..fb35392e8 --- /dev/null +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package ideal.sylph.plugins.clickhouse; + +import ideal.sylph.annotation.Description; +import ideal.sylph.annotation.Name; +import ideal.sylph.annotation.Version; +import ideal.sylph.etl.api.Source; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.shaded.guava18.com.google.common.base.Supplier; +import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Random; + +/** + * test source + **/ +@Name("testCK") +@Description("this flink test source inputStream") +@Version("1.0.0") +public class TestCKSource + implements Source> +{ + private static final long serialVersionUID = 2L; + private static final Logger logger = LoggerFactory.getLogger(TestCKSource.class); + private final transient Supplier> loadStream; + + public TestCKSource(StreamExecutionEnvironment execEnv) + { + this.loadStream = Suppliers.memoize(() -> execEnv.addSource(new MyDataSource())); + } + + @Override + public DataStream getSource() + { + return loadStream.get(); + } + + public static class MyDataSource + extends RichParallelSourceFunction + implements ResultTypeQueryable + { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private volatile boolean running = true; + + @Override + public void run(SourceContext sourceContext) + throws Exception + { + Random random = new Random(1000000); + int numKeys = 10; + while (running) { + java.time.LocalDate date = java.time.LocalDate.now(); + java.sql.Date now = java.sql.Date.valueOf(date); + String msg = "https://github.com/harbby/sylph/" + random.nextLong(); + Row row = Row.of("github.com" + random.nextLong(), msg, now); + sourceContext.collect(row); + } + } + + @Override + public TypeInformation getProducedType() + { + TypeInformation[] types = new TypeInformation[] { + TypeExtractor.createTypeInfo(String.class), + TypeExtractor.createTypeInfo(String.class), + TypeExtractor.createTypeInfo(java.sql.Date.class) + }; + + RowTypeInfo rowTypeInfo = new RowTypeInfo(types, new String[] {"key", "message", "mes_time"}); + return rowTypeInfo; + } + + @Override + public void cancel() + { + running = false; + } + + @Override + public void close() + throws Exception + { + this.cancel(); + super.close(); + } + } +} diff --git a/sylph-connectors/sylph-elasticsearch5/build.gradle b/sylph-connectors/sylph-elasticsearch5/build.gradle new file mode 100644 index 000000000..b17d19810 --- /dev/null +++ b/sylph-connectors/sylph-elasticsearch5/build.gradle @@ -0,0 +1,28 @@ +plugins { + id "com.github.johnrengelman.shadow" version "4.0.3" +} + +dependencies { + shadow group: 'org.apache.flink', name: 'flink-shaded-guava', version: '18.0-4.0' + compile 'org.elasticsearch.client:transport:5.6.0' +} + +shadowJar { + baseName = project.name + classifier = 'shaded' + version = project.version + + configurations = [project.configurations.compile] + + dependencies { + exclude(dependency('junit:junit:')) + } + + //relocate 
'com.google.protobuf', 'shaded.com.google.protobuf' + relocate 'com.google.common', 'shaded.elasticsearch5.com.google.common' + relocate 'io.netty', 'shaded.elasticsearch5.io.netty' + relocate 'org.apache.logging', 'shaded.elasticsearch5.org.apache.logging' +} +assemble.dependsOn shadowJar +buildPlugins.dependsOn shadowJar \ No newline at end of file diff --git a/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java b/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java new file mode 100644 index 000000000..85752cfb1 --- /dev/null +++ b/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ideal.sylph.plugins.elasticsearch5; + +import ideal.sylph.annotation.Description; +import ideal.sylph.annotation.Name; +import ideal.sylph.etl.PluginConfig; +import ideal.sylph.etl.Row; +import ideal.sylph.etl.SinkContext; +import ideal.sylph.etl.api.RealTimeSink; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.transport.client.PreBuiltTransportClient; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState; + +@Name("elasticsearch5") +@Description("this is elasticsearch5 sink plugin") +public class Elasticsearch5Sink + implements RealTimeSink +{ + private static final int MAX_BATCH_BULK = 50; + private final Row.Schema schema; + private final ElasticsearchSinkConfig config; + + private TransportClient client; + private int idIndex = -1; + private final AtomicInteger cnt = new AtomicInteger(0); + private BulkRequestBuilder bulkBuilder; + + public Elasticsearch5Sink(SinkContext context, ElasticsearchSinkConfig config) + { + schema = context.getSchema(); + this.config = config; + if (!Strings.isNullOrEmpty(config.idField)) { + int fieldIndex = schema.getFieldIndex(config.idField); + checkState(fieldIndex != -1, config.idField + " does not exist, only " + schema.getFields()); + this.idIndex = fieldIndex; + } + if (config.update) { + checkState(idIndex != -1, "This is Update mode, `id_field` must be set"); + } + } + + @Override + public void process(Row value) + { + Map map = new HashMap<>(); + for (String fieldName : schema.getFieldNames()) { + map.put(fieldName, value.getAs(fieldName)); + } + if
(config.update) { //is update + Object id = value.getAs(idIndex); + if (id == null) { + return; + } + UpdateRequestBuilder requestBuilder = client.prepareUpdate(config.index, config.type, id.toString()); + requestBuilder.setDoc(map); + requestBuilder.setDocAsUpsert(true); + bulkBuilder.add(requestBuilder.request()); + } + else { + IndexRequestBuilder requestBuilder = client.prepareIndex(config.index, config.type); + if (idIndex != -1) { + Object id = value.getAs(idIndex); + if (id != null) { + requestBuilder.setId(id.toString()); + } + } + + requestBuilder.setSource(map); + bulkBuilder.add(requestBuilder.request()); + } + if (cnt.getAndIncrement() > MAX_BATCH_BULK) { + client.bulk(bulkBuilder.request()).actionGet(); + cnt.set(0); + bulkBuilder = client.prepareBulk(); + } + } + + @Override + public boolean open(long partitionId, long version) + throws Exception + { + String clusterName = config.clusterName; + String hosts = config.hosts; + Settings settings = Settings.builder().put("cluster.name", clusterName) + .put("client.transport.sniff", true).build(); + + TransportClient client = new PreBuiltTransportClient(settings); + for (String ip : hosts.split(",")) { + client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip.split(":")[0]), Integer.valueOf(ip.split(":")[1]))); + } + this.client = client; + this.bulkBuilder = client.prepareBulk(); + return true; + } + + @Override + public void close(Throwable errorOrNull) + { + try (TransportClient closeClient = client) { + if (bulkBuilder != null && closeClient != null) { + closeClient.bulk(bulkBuilder.request()); + } + } + } + + public static class ElasticsearchSinkConfig + extends PluginConfig + { + @Name("cluster_name") + @Description("this is es cluster name") + private String clusterName; + + @Name("cluster_hosts") + @Description("this is es cluster hosts") + private String hosts; + + @Name("es_index") + @Description("this is es index") + private String index; + + @Name("id_field") + @Description("this is es id_field") + private String idField; + + @Name("update") + @Description("update or insert") + private boolean update = false; + + @Name("index_type") + @Description("this is es index_type, Do not set") + private String type = "default"; + } +} diff --git a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java index 91c98c4d2..c0fdae1cf 100644 --- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java +++ b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java @@ -124,9 +124,6 @@ public long getDataSize() return writer.getDataSize(); } - /** - * 入参list - */ @Override public void writeLine(List evalRow) { @@ -146,9 +143,6 @@ public void writeLine(List evalRow) } } - /** - * 入参list - */ @Override public void writeLine(Row row) { diff --git a/sylph-connectors/sylph-kafka09/build.gradle b/sylph-connectors/sylph-kafka09/build.gradle index b3a0879db..b7eac4a21 100644 --- a/sylph-connectors/sylph-kafka09/build.gradle +++ b/sylph-connectors/sylph-kafka09/build.gradle @@ -5,6 +5,8 @@ dependencies { //--table sql--- compileOnly group: 'org.apache.flink', name: 'flink-table_2.11', version: deps.flink - compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version: deps.flink + compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.9.0.1' + compile group: 
'org.apache.curator', name: 'curator-framework', version: '2.12.0' + compile group: 'com.google.code.gson', name: 'gson', version: '2.2.4' } diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSink09.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSink09.java new file mode 100644 index 000000000..56fa34b1e --- /dev/null +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSink09.java @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ideal.sylph.plugins.kafka.flink; + +import com.google.gson.Gson; +import ideal.sylph.annotation.Description; +import ideal.sylph.annotation.Name; +import ideal.sylph.etl.PluginConfig; +import ideal.sylph.etl.Row; +import ideal.sylph.etl.SinkContext; +import ideal.sylph.etl.api.RealTimeSink; +import ideal.sylph.plugins.kafka.flink.utils.KafkaProducer; +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState; + +@Name("kafka09") +@Description("this is kafka09 Sink plugin") +public class KafkaSink09 + implements RealTimeSink +{ + private static final Logger logger = LoggerFactory.getLogger(KafkaSink09.class); + private final Kafka09SinkConfig config; + private final Row.Schema schema; + private int idIndex = -1; + private KafkaProducer kafkaProducer; + private final String topic; + + public KafkaSink09(SinkContext context, Kafka09SinkConfig config) + { + schema = context.getSchema(); + if (!Strings.isNullOrEmpty(config.idField)) { + int fieldIndex = schema.getFieldIndex(config.idField); + checkState(fieldIndex != -1, config.idField + " does not exist, only " + schema.getFields()); + this.idIndex = fieldIndex; + } + this.config = config; + this.topic = config.topics; + } + + @Override + public void process(Row value) + { + Gson gson = new Gson(); + Map map = new HashMap<>(); + for (String fieldName : schema.getFieldNames()) { + map.put(fieldName, value.getAs(fieldName)); + } + String message = gson.toJson(map); + kafkaProducer.send(message); + } + + @Override + public boolean open(long partitionId, long version) + throws Exception + { + //at least one of config.zookeeper and config.brokers must be set; for now only zookeeper is used + this.kafkaProducer = new KafkaProducer(config.zookeeper, config.topics); + return true; + } + + @Override + public void close(Throwable errorOrNull) + { + kafkaProducer.close(); + } + + public static class Kafka09SinkConfig + extends PluginConfig + { + private static final long serialVersionUID = 2L; + @Name("kafka_topic") + @Description("this is kafka topic list") + private String topics; + + @Name("kafka_broker") + @Description("this is kafka broker list") + private String brokers = "localhost:6667"; + + @Name("zookeeper.connect") + @Description("this is kafka
zk list") + private String zookeeper; + + @Name("id_field") + @Description("this is kafka id_field") + private String idField; + } +} diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/IProducer.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/IProducer.java new file mode 100644 index 000000000..ef9d7b862 --- /dev/null +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/IProducer.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ideal.sylph.plugins.kafka.flink.utils; + +public interface IProducer +{ + void send(String message); + + void close(); +} diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/KafkaProducer.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/KafkaProducer.java new file mode 100644 index 000000000..73a611195 --- /dev/null +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/KafkaProducer.java @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ideal.sylph.plugins.kafka.flink.utils; + +import com.google.common.base.Joiner; +import com.google.gson.Gson; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public class KafkaProducer + implements IProducer +{ + private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class); + + private String brokersString; + private String topic; + private String partitionKey = ""; + private org.apache.kafka.clients.producer.KafkaProducer producer; + + public KafkaProducer(String zkConnect, String topic) + { + this.topic = topic; + + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + CuratorFramework client = CuratorFrameworkFactory.newClient(zkConnect, retryPolicy); + client.start(); + + // Get the current kafka brokers and its IDs from ZK + List ids = Collections.emptyList(); + List hosts = new ArrayList<>(); + + try { + ids = client.getChildren().forPath("/brokers/ids"); + } + catch (Exception ex) { + log.error("Couldn't get brokers ids", ex); + } + + // Get the host and port from each of the brokers + for (String id : ids) { + String jsonString = null; + + try { + jsonString = new String(client.getData().forPath("/brokers/ids/" + id), "UTF-8"); + } + catch (Exception ex) { + log.error("Couldn't parse brokers data", ex); + } + + if (jsonString != null) { + try { + Gson gson = new Gson(); + Map json = gson.fromJson(jsonString, Map.class); + Double port = (Double) json.get("port"); + String host = json.get("host") + ":" + port.intValue(); + hosts.add(host); + } + catch (NullPointerException e) { + log.error("Failed converting a JSON tuple to a Map class", e); + } + } + } + + // Close the zookeeper connection + client.close(); + + brokersString = Joiner.on(',').join(hosts); + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.ACKS_CONFIG, "1"); + props.put(ProducerConfig.RETRIES_CONFIG, "60"); + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000"); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, "10000"); + props.put(ProducerConfig.LINGER_MS_CONFIG, "500"); + props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "ideal.sylph.plugins.kafka.flink.utils.SimplePartitioner"); + + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + } + + @Override + public void close() + { + producer.close(); + } + + @Override + public void send(String message) + { + ProducerRecord record = new ProducerRecord<>(topic, message); + producer.send(record); + } +} diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/SimplePartitioner.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/SimplePartitioner.java new file mode 100644 index 000000000..df780e2f2 --- /dev/null +++ 
b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/SimplePartitioner.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ideal.sylph.plugins.kafka.flink.utils; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; + +import java.util.Map; + +public class SimplePartitioner + implements Partitioner +{ + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) + { + if (key != null) { + String stringKey = key.toString(); + int offset = stringKey.hashCode(); + return Math.abs(offset % cluster.partitionCountForTopic(topic)); + } + else { + return 0; + } + } + + @Override + public void close() + { + } + + @Override + public void configure(Map configs) + { + } +} diff --git a/sylph-connectors/sylph-mysql/src/main/java/ideal/sylph/plugins/mysql/utils/JdbcUtils.java b/sylph-connectors/sylph-mysql/src/main/java/ideal/sylph/plugins/mysql/utils/JdbcUtils.java index 6a7df18b3..b98a2803a 100644 --- a/sylph-connectors/sylph-mysql/src/main/java/ideal/sylph/plugins/mysql/utils/JdbcUtils.java +++ b/sylph-connectors/sylph-mysql/src/main/java/ideal/sylph/plugins/mysql/utils/JdbcUtils.java @@ -29,10 +29,10 @@ public class JdbcUtils private JdbcUtils() {} /** - * jdbc ResultSet to List + * jdbc ResultSet to List[Map] * * @param rs input jdbc ResultSet - * @return List + * @return List[Map] */ public static List> resultToList(ResultSet rs) throws SQLException diff --git a/sylph-controller/src/main/java/ideal/sylph/controller/action/JobMangerResurce.java b/sylph-controller/src/main/java/ideal/sylph/controller/action/JobManagerResurce.java similarity index 98% rename from sylph-controller/src/main/java/ideal/sylph/controller/action/JobMangerResurce.java rename to sylph-controller/src/main/java/ideal/sylph/controller/action/JobManagerResurce.java index 6672e9892..f4e9533bf 100644 --- a/sylph-controller/src/main/java/ideal/sylph/controller/action/JobMangerResurce.java +++ b/sylph-controller/src/main/java/ideal/sylph/controller/action/JobManagerResurce.java @@ -45,15 +45,15 @@ @javax.inject.Singleton @Path("/job_manger") -public class JobMangerResurce +public class JobManagerResurce { - private static final Logger logger = LoggerFactory.getLogger(JobMangerResurce.class); + private static final Logger logger = LoggerFactory.getLogger(JobManagerResurce.class); @Context private ServletContext servletContext; @Context private UriInfo uriInfo; private SylphContext sylphContext; - public JobMangerResurce( + public JobManagerResurce( @Context ServletContext servletContext, @Context UriInfo uriInfo) { diff --git a/sylph-controller/src/main/java/ideal/sylph/controller/utils/JsonFormatUtil.java b/sylph-controller/src/main/java/ideal/sylph/controller/utils/JsonFormatUtil.java index 1dba62bd3..9b040e623 100644 --- a/sylph-controller/src/main/java/ideal/sylph/controller/utils/JsonFormatUtil.java +++ 
b/sylph-controller/src/main/java/ideal/sylph/controller/utils/JsonFormatUtil.java @@ -26,9 +26,7 @@ private JsonFormatUtil() {} /** * 打印输入到控制台 * - * @param jsonStr - * @author lizhgb - * @Date 2015-10-14 下午1:17:22 + * @param jsonStr json text */ public static String printJson(String jsonStr) { @@ -38,10 +36,8 @@ public static String printJson(String jsonStr) /** * 格式化 * - * @param jsonStr - * @return - * @author lizhgb - * @Date 2015-10-14 下午1:17:35 + * @param jsonStr json text + * @return String */ public static String formatJson(String jsonStr) { @@ -85,14 +81,6 @@ public static String formatJson(String jsonStr) return sb.toString(); } - /** - * 添加space - * - * @param sb - * @param indent - * @author lizhgb - * @Date 2015-10-14 上午10:38:04 - */ private static void addIndentBlank(StringBuilder sb, int indent) { for (int i = 0; i < indent; i++) { diff --git a/sylph-main/src/main/java/ideal/sylph/main/SylphMaster.java b/sylph-main/src/main/java/ideal/sylph/main/SylphMaster.java index cc7a0e75d..f0ad5fdda 100644 --- a/sylph-main/src/main/java/ideal/sylph/main/SylphMaster.java +++ b/sylph-main/src/main/java/ideal/sylph/main/SylphMaster.java @@ -58,16 +58,16 @@ public static void main(String[] args) /*2 Initialize Guice Injector */ try { logger.info("========={} Bootstrap initialize...========", SylphMaster.class.getCanonicalName()); - IocFactory injector = IocFactory.create(sylphBean, + IocFactory app = IocFactory.create(sylphBean, binder -> binder.bind(ControllerApp.class).withSingle() ); - injector.getInstance(PipelinePluginLoader.class).loadPlugins(); - injector.getInstance(RunnerLoader.class).loadPlugins(); - injector.getInstance(JobStore.class).loadJobs(); + app.getInstance(PipelinePluginLoader.class).loadPlugins(); + app.getInstance(RunnerLoader.class).loadPlugins(); + app.getInstance(JobStore.class).loadJobs(); - injector.getInstance(JobManager.class).start(); - injector.getInstance(ControllerApp.class).start(); + app.getInstance(JobManager.class).start(); + app.getInstance(ControllerApp.class).start(); //ProcessHandle.current().pid() logger.info("\n" + logo); logger.info("======== SERVER STARTED this pid is {}========"); diff --git a/sylph-main/src/main/java/ideal/sylph/main/service/JobManager.java b/sylph-main/src/main/java/ideal/sylph/main/service/JobManager.java index 82cfbc90a..2f64a9859 100644 --- a/sylph-main/src/main/java/ideal/sylph/main/service/JobManager.java +++ b/sylph-main/src/main/java/ideal/sylph/main/service/JobManager.java @@ -33,14 +33,15 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static ideal.sylph.spi.exception.StandardErrorCode.ILLEGAL_OPERATION; import static ideal.sylph.spi.exception.StandardErrorCode.JOB_START_ERROR; -import static ideal.sylph.spi.job.Job.Status.KILLING; import static ideal.sylph.spi.job.Job.Status.RUNNING; import static ideal.sylph.spi.job.Job.Status.STARTED_ERROR; import static ideal.sylph.spi.job.Job.Status.STARTING; +import static ideal.sylph.spi.job.Job.Status.STOP; /** * JobManager @@ -54,53 +55,35 @@ public final class JobManager @Autowired private RunnerManager runnerManger; @Autowired private MetadataManager metadataManager; - private final ConcurrentMap runningContainers = new ConcurrentHashMap<>(); - - private volatile boolean run; + private final ConcurrentMap containers = new ConcurrentHashMap<>(); /** - * 用来做耗时的->任务启动提交到yarn的操作 + * Used to do time-consuming task 
submit operations */ private ExecutorService jobStartPool = Executors.newFixedThreadPool(MaxSubmitJobNum); private final Thread monitorService = new Thread(() -> { - while (run) { + while (true) { Thread.currentThread().setName("job_monitor"); - runningContainers.forEach((jobId, container) -> { - try { - Job.Status status = container.getStatus(); - switch (status) { - case STOP: { - jobStartPool.submit(() -> { - try { - Thread.currentThread().setName("job_submit_" + jobId); - logger.warn("Job {}[{}] Status is {}, Soon to start", jobId, - container.getRunId(), status); - container.setStatus(STARTING); - Optional runId = container.run(); - if (container.getStatus() == KILLING) { - container.shutdown(); - } - else { - container.setStatus(RUNNING); - runId.ifPresent(result -> metadataManager.addMetadata(jobId, result)); - } - } - catch (Exception e) { - container.setStatus(STARTED_ERROR); - logger.warn("job {} start error", jobId, e); - } - }); //需要重启 Job + containers.forEach((jobId, container) -> { + Job.Status status = container.getStatus(); + if (status == STOP) { + Future future = jobStartPool.submit(() -> { + try { + Thread.currentThread().setName("job_submit_" + jobId); + logger.warn("Job {}[{}] Status is {}, Soon to start", jobId, + container.getRunId(), status); + container.setStatus(STARTING); + Optional runId = container.run(); + container.setStatus(RUNNING); + runId.ifPresent(result -> metadataManager.addMetadata(jobId, result)); } - case RUNNING: - case STARTED_ERROR: - case STARTING: - case KILLING: - default: - } - } - catch (Exception e) { - logger.warn("Check job {} status error", jobId, e); + catch (Exception e) { + container.setStatus(STARTED_ERROR); + logger.warn("job {} start error", jobId, e); + } + }); + container.setFuture(future); } }); @@ -118,12 +101,12 @@ public final class JobManager */ public synchronized void startJob(String jobId) { - if (runningContainers.containsKey(jobId)) { + if (containers.containsKey(jobId)) { throw new SylphException(JOB_START_ERROR, "Job " + jobId + " already started"); } Job job = this.getJob(jobId).orElseThrow(() -> new SylphException(JOB_START_ERROR, "Job " + jobId + " not found with jobStore")); - runningContainers.computeIfAbsent(jobId, k -> runnerManger.createJobContainer(job, null)); - logger.info("runningContainers size:{}", runningContainers.size()); + containers.computeIfAbsent(jobId, k -> runnerManger.createJobContainer(job, null)); + logger.info("deploy job :{}", jobId); } /** @@ -132,8 +115,9 @@ public synchronized void startJob(String jobId) public synchronized void stopJob(String jobId) throws Exception { - JobContainer container = runningContainers.remove(jobId); + JobContainer container = containers.remove(jobId); if (container != null) { + logger.warn("job {} Cancel submission", jobId); metadataManager.removeMetadata(jobId); container.shutdown(); } @@ -147,7 +131,7 @@ public void saveJob(@NotNull Job job) public void removeJob(String jobId) throws IOException { - if (runningContainers.containsKey(jobId)) { + if (containers.containsKey(jobId)) { throw new SylphException(ILLEGAL_OPERATION, "Can only delete tasks that have been offline"); } jobStore.removeJob(jobId); @@ -176,15 +160,13 @@ public Collection listJobs() public void start() throws IOException { - this.run = true; monitorService.setDaemon(false); monitorService.start(); //--------- init read metadata job status --------------- Map metadatas = metadataManager.loadMetadata(); metadatas.forEach((jobId, jobInfo) -> this.getJob(jobId).ifPresent(job -> { JobContainer 
container = runnerManger.createJobContainer(job, jobInfo); - runningContainers.put(job.getId(), container); - logger.info("runningContainers size:{}", runningContainers.size()); + containers.put(job.getId(), container); })); } @@ -193,7 +175,7 @@ public void start() */ public Optional getJobContainer(@NotNull String jobId) { - return Optional.ofNullable(runningContainers.get(jobId)); + return Optional.ofNullable(containers.get(jobId)); } /** @@ -201,7 +183,7 @@ public Optional getJobContainer(@NotNull String jobId) */ public Optional getJobContainerWithRunId(@NotNull String runId) { - for (JobContainer container : runningContainers.values()) { + for (JobContainer container : containers.values()) { if (runId.equals(container.getRunId())) { return Optional.ofNullable(container); } diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkContainerFactory.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkContainerFactory.java index 5e6a94daf..f65ebf9a3 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkContainerFactory.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkContainerFactory.java @@ -30,7 +30,6 @@ import ideal.sylph.spi.job.JobContainer; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,20 +64,7 @@ public JobContainer getYarnContainer(Job job, String lastRunid) { FlinkYarnJobLauncher jobLauncher = yarnLauncher.get(); - JobContainer yarnJobContainer = new YarnJobContainer(jobLauncher.getYarnClient(), lastRunid) - { - @Override - public Optional run() - throws Exception - { - logger.info("Instantiating SylphFlinkJob {} at yarnId {}", job.getId()); - this.setYarnAppId(null); - ApplicationId applicationId = jobLauncher.start(job); - this.setYarnAppId(applicationId); - return Optional.of(applicationId.toString()); - } - }; - return YarnJobContainer.proxy(yarnJobContainer); + return YarnJobContainer.of(jobLauncher.getYarnClient(), lastRunid, () -> jobLauncher.start(job)); } @Override diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/FlinkYarnJobLauncher.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/FlinkYarnJobLauncher.java index 62d1612e8..91b321f00 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/FlinkYarnJobLauncher.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/FlinkYarnJobLauncher.java @@ -15,6 +15,7 @@ */ package ideal.sylph.runner.flink.yarn; +import com.github.harbby.gadtry.base.Throwables; import com.github.harbby.gadtry.ioc.Autowired; import ideal.sylph.runner.flink.FlinkJobConfig; import ideal.sylph.runner.flink.FlinkJobHandle; @@ -22,7 +23,6 @@ import ideal.sylph.runner.flink.actuator.JobParameter; import ideal.sylph.spi.job.Job; import org.apache.flink.api.common.JobID; -import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -38,10 +38,12 @@ import java.io.File; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.URI; import java.net.URL; import java.util.Collection; import java.util.Objects; +import java.util.Optional; import 
java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -65,7 +67,7 @@ public YarnClient getYarnClient() return yarnClient; } - public ApplicationId start(Job job) + public Optional start(Job job) throws Exception { FlinkJobHandle jobHandle = (FlinkJobHandle) job.getJobHandle(); @@ -81,31 +83,35 @@ public ApplicationId start(Job job) JobGraph jobGraph = jobHandle.getJobGraph(); //todo: How to use `savepoints` to restore a job //jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("hdfs:///tmp/sylph/apps/savepoints")); - return start(descriptor, jobGraph).getClusterId(); + return start(descriptor, jobGraph); } - private ClusterClient start(YarnClusterDescriptor descriptor, JobGraph job) + private Optional start(YarnClusterDescriptor descriptor, JobGraph job) throws Exception { ApplicationId applicationId = null; try { - ClusterClient client = descriptor.deploy(); //create app master + ClusterClient client = descriptor.deploy(); //create yarn appMaster applicationId = client.getClusterId(); - ClusterSpecification specification = new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(1024) - .setNumberTaskManagers(2) - .setSlotsPerTaskManager(2) - .setTaskManagerMemoryMB(1024) - .createClusterSpecification(); - client.runDetached(job, null); //submit graph to yarn appMaster 并运行分离 + client.runDetached(job, null); //submit graph to appMaster 并分离 stopAfterJob(client, job.getJobID()); - return client; + client.shutdown(); + return Optional.of(applicationId); } catch (Exception e) { if (applicationId != null) { yarnClient.killApplication(applicationId); } - throw e; + Thread thread = Thread.currentThread(); + if (e instanceof InterruptedIOException || + thread.isInterrupted() || + Throwables.getRootCause(e) instanceof InterruptedException) { + logger.warn("job {} Canceled submission", job.getJobID()); + return Optional.empty(); + } + else { + throw e; + } } finally { //Clear temporary directory @@ -114,6 +120,7 @@ private ClusterClient start(YarnClusterDescriptor descriptor, Job FileSystem hdfs = FileSystem.get(clusterConf.yarnConf()); Path appDir = new Path(clusterConf.appRootDir(), applicationId.toString()); hdfs.delete(appDir, true); + logger.info("clear tmp dir: {}", appDir); } } catch (IOException e) { diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnClusterDescriptor.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnClusterDescriptor.java index 03f593dca..4ae7df1d9 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnClusterDescriptor.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnClusterDescriptor.java @@ -131,27 +131,23 @@ public YarnClient getYarnClient() } public ClusterClient deploy() + throws Exception { - try { - YarnClientApplication application = yarnClient.createApplication(); - ApplicationReport report = startAppMaster(application); - - Configuration flinkConfiguration = getFlinkConfiguration(); - flinkConfiguration.setString(JobManagerOptions.ADDRESS.key(), report.getHost()); - flinkConfiguration.setInteger(JobManagerOptions.PORT.key(), report.getRpcPort()); - - flinkConfiguration.setString(RestOptions.ADDRESS, report.getHost()); - flinkConfiguration.setInteger(RestOptions.PORT, report.getRpcPort()); - - //return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()).getMaxSlots(); - return new YarnClusterClient(this, - appConf.getTaskManagerCount(), - appConf.getTaskManagerSlots(), - report, 
clusterConf.flinkConfiguration(), false); - } - catch (Exception e) { - throw new RuntimeException(e); - } + YarnClientApplication application = yarnClient.createApplication(); + ApplicationReport report = startAppMaster(application); + + Configuration flinkConfiguration = getFlinkConfiguration(); + flinkConfiguration.setString(JobManagerOptions.ADDRESS.key(), report.getHost()); + flinkConfiguration.setInteger(JobManagerOptions.PORT.key(), report.getRpcPort()); + + flinkConfiguration.setString(RestOptions.ADDRESS, report.getHost()); + flinkConfiguration.setInteger(RestOptions.PORT, report.getRpcPort()); + + //return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()).getMaxSlots(); + return new YarnClusterClient(this, + appConf.getTaskManagerCount(), + appConf.getTaskManagerSlots(), + report, clusterConf.flinkConfiguration(), false); } private ApplicationReport startAppMaster(YarnClientApplication application) diff --git a/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/SparkContainerFactory.java b/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/SparkContainerFactory.java index 53d6823f3..da5941549 100644 --- a/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/SparkContainerFactory.java +++ b/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/SparkContainerFactory.java @@ -26,13 +26,11 @@ import ideal.sylph.spi.job.ContainerFactory; import ideal.sylph.spi.job.Job; import ideal.sylph.spi.job.JobContainer; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.StreamingContext; -import java.util.Optional; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkArgument; @@ -50,20 +48,8 @@ public class SparkContainerFactory public JobContainer getYarnContainer(Job job, String lastRunid) { SparkAppLauncher appLauncher = yarnLauncher.get(); - final JobContainer yarnJobContainer = new YarnJobContainer(appLauncher.getYarnClient(), lastRunid) - { - @Override - public Optional run() - throws Exception - { - this.setYarnAppId(null); - ApplicationId yarnAppId = appLauncher.run(job); - this.setYarnAppId(yarnAppId); - return Optional.of(yarnAppId.toString()); - } - }; //----create JobContainer Proxy - return YarnJobContainer.proxy(yarnJobContainer); + return YarnJobContainer.of(appLauncher.getYarnClient(), lastRunid, () -> appLauncher.run(job)); } @Override diff --git a/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/yarn/SparkAppLauncher.java b/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/yarn/SparkAppLauncher.java index 391b1e594..8731777c5 100644 --- a/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/yarn/SparkAppLauncher.java +++ b/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/yarn/SparkAppLauncher.java @@ -16,8 +16,9 @@ package ideal.sylph.runner.spark.yarn; import com.github.harbby.gadtry.base.Serializables; +import com.github.harbby.gadtry.base.Throwables; +import com.github.harbby.gadtry.ioc.Autowired; import com.google.common.collect.ImmutableList; -import com.google.inject.Inject; import ideal.sylph.runner.spark.SparkJobHandle; import ideal.sylph.spi.job.Job; import org.apache.commons.lang3.StringUtils; @@ -27,17 +28,22 @@ import org.apache.spark.deploy.yarn.Client; import org.apache.spark.deploy.yarn.ClientArguments; import org.apache.spark.ideal.deploy.yarn.SylphSparkYarnClient; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; public class SparkAppLauncher { - @Inject private YarnClient yarnClient; + private static final Logger logger = LoggerFactory.getLogger(SparkAppLauncher.class); + + @Autowired private YarnClient yarnClient; private static final String sparkHome = System.getenv("SPARK_HOME"); public YarnClient getYarnClient() @@ -45,7 +51,7 @@ public YarnClient getYarnClient() return yarnClient; } - public ApplicationId run(Job job) + public Optional run(Job job) throws Exception { System.setProperty("SPARK_YARN_MODE", "true"); @@ -63,7 +69,19 @@ public ApplicationId run(Job job) ClientArguments clientArguments = new ClientArguments(args); // spark-2.0.0 //yarnClient.getConfig().iterator().forEachRemaining(x -> sparkConf.set("spark.hadoop." + x.getKey(), x.getValue())); Client appClient = new SylphSparkYarnClient(clientArguments, sparkConf, yarnClient); - return appClient.submitApplication(); + try { + return Optional.of(appClient.submitApplication()); + } + catch (Exception e) { + Thread thread = Thread.currentThread(); + if (thread.isInterrupted() || Throwables.getRootCause(e) instanceof InterruptedException) { + logger.warn("job {} Canceled submission", job.getId()); + return Optional.empty(); + } + else { + throw e; + } + } } private static void setDistJars(Job job, SparkConf sparkConf) diff --git a/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java b/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java index d16ed5bda..dd10840d4 100644 --- a/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java +++ b/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java @@ -27,8 +27,6 @@ import java.lang.reflect.Field; -import static org.apache.spark.launcher.SparkLauncher.DRIVER_MEMORY; - public class SylphSparkYarnClient extends Client { @@ -37,7 +35,7 @@ public SylphSparkYarnClient(ClientArguments clientArgs, SparkConf spConf, YarnCl throws NoSuchFieldException, IllegalAccessException { super(clientArgs, spConf); - String key = DRIVER_MEMORY; //test + //String key = DRIVER_MEMORY; //test Field field = this.getClass().getSuperclass().getDeclaredField("org$apache$spark$deploy$yarn$Client$$hadoopConf"); field.setAccessible(true); YarnConfiguration yarnConfiguration = new YarnConfiguration(yarnClient.getConfig()); diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/job/Job.java b/sylph-spi/src/main/java/ideal/sylph/spi/job/Job.java index 72abc8f9e..af710317b 100644 --- a/sylph-spi/src/main/java/ideal/sylph/spi/job/Job.java +++ b/sylph-spi/src/main/java/ideal/sylph/spi/job/Job.java @@ -52,8 +52,7 @@ public enum Status RUNNING(0), //运行中 STARTING(1), // 启动中 STOP(2), // 停止运行 - STARTED_ERROR(3), // 启动失败 - KILLING(4); // Killing job + STARTED_ERROR(3); // 启动失败 private final int status; diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainer.java b/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainer.java index 06ad5c328..5e277391b 100644 --- a/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainer.java +++ b/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainer.java @@ -18,6 +18,7 @@ import javax.validation.constraints.NotNull; import java.util.Optional; +import java.util.concurrent.Future; 
/** * Job Container @@ -44,6 +45,8 @@ Optional run() */ void shutdown(); + void setFuture(Future future); + /** * 获取job的状态 */ diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainerAbs.java b/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainerAbs.java deleted file mode 100644 index 8b77ab1ce..000000000 --- a/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainerAbs.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (C) 2018 The Sylph Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ideal.sylph.spi.job; - -import javax.validation.constraints.NotNull; - -import static ideal.sylph.spi.job.Job.Status.RUNNING; -import static ideal.sylph.spi.job.Job.Status.STOP; -import static java.util.Objects.requireNonNull; - -public abstract class JobContainerAbs - implements JobContainer -{ - private volatile Job.Status status = STOP; - - @Override - public synchronized void setStatus(Job.Status status) - { - this.status = requireNonNull(status, "status is null"); - } - - @NotNull - @Override - public synchronized Job.Status getStatus() - { - if (status == RUNNING) { - return isRunning() ? RUNNING : STOP; - } - return status; - } - - public abstract boolean isRunning(); -} diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/utils/GenericTypeReference.java b/sylph-spi/src/main/java/ideal/sylph/spi/utils/GenericTypeReference.java index 2bc29331e..cdce7ef99 100644 --- a/sylph-spi/src/main/java/ideal/sylph/spi/utils/GenericTypeReference.java +++ b/sylph-spi/src/main/java/ideal/sylph/spi/utils/GenericTypeReference.java @@ -16,14 +16,14 @@ package ideal.sylph.spi.utils; import com.fasterxml.jackson.core.type.TypeReference; -import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; +import com.github.harbby.gadtry.base.JavaType; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; /** * demo: - * Map config = MAPPER.readValue(json, new GenericTypeReference(Map.class, String.class, Object.class)); + * Map[String, Object] config = MAPPER.readValue(json, new GenericTypeReference(Map.class, String.class, Object.class)); */ public class GenericTypeReference extends TypeReference @@ -33,7 +33,8 @@ public class GenericTypeReference public GenericTypeReference(Class rawType, Type... 
typeArguments) { //this.type = new MoreTypes.ParameterizedTypeImpl(null, rawType, typeArguments); - this.type = ParameterizedTypeImpl.make(rawType, typeArguments, null); + //sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl.make(rawType, typeArguments, null); + this.type = JavaType.make(rawType, typeArguments, null); } @Override diff --git a/sylph-yarn/src/main/java/ideal/sylph/runtime/local/LocalContainer.java b/sylph-yarn/src/main/java/ideal/sylph/runtime/local/LocalContainer.java index fc9824427..afbe503d0 100644 --- a/sylph-yarn/src/main/java/ideal/sylph/runtime/local/LocalContainer.java +++ b/sylph-yarn/src/main/java/ideal/sylph/runtime/local/LocalContainer.java @@ -26,13 +26,14 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; public class LocalContainer implements JobContainer { private static final Logger logger = LoggerFactory.getLogger(LocalContainer.class); - private final ExecutorService pool = Executors.newSingleThreadExecutor(); + private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final JVMLaunchers.VmBuilder vmBuilder; @@ -74,7 +75,7 @@ public String getRunId() public synchronized Optional run() throws Exception { - pool.submit(() -> { + executor.submit(() -> { launcher.startAndGet(); return true; }); @@ -91,6 +92,11 @@ public void shutdown() } } + @Override + public void setFuture(Future future) + { + } + @Override public Job.Status getStatus() { diff --git a/sylph-yarn/src/main/java/ideal/sylph/runtime/yarn/YarnJobContainer.java b/sylph-yarn/src/main/java/ideal/sylph/runtime/yarn/YarnJobContainer.java index 620fbbc84..a2d41c9d1 100644 --- a/sylph-yarn/src/main/java/ideal/sylph/runtime/yarn/YarnJobContainer.java +++ b/sylph-yarn/src/main/java/ideal/sylph/runtime/yarn/YarnJobContainer.java @@ -18,8 +18,8 @@ import com.github.harbby.gadtry.aop.AopFactory; import com.github.harbby.gadtry.classloader.ThreadContextClassLoader; import ideal.sylph.spi.exception.SylphException; +import ideal.sylph.spi.job.Job; import ideal.sylph.spi.job.JobContainer; -import ideal.sylph.spi.job.JobContainerAbs; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -31,20 +31,29 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import static ideal.sylph.spi.exception.StandardErrorCode.CONNECTION_ERROR; -import static ideal.sylph.spi.job.Job.Status.KILLING; import static ideal.sylph.spi.job.Job.Status.RUNNING; +import static ideal.sylph.spi.job.Job.Status.STOP; +import static java.util.Objects.requireNonNull; -public abstract class YarnJobContainer - extends JobContainerAbs +public class YarnJobContainer + implements JobContainer { private static final Logger logger = LoggerFactory.getLogger(YarnJobContainer.class); private ApplicationId yarnAppId; private YarnClient yarnClient; + private volatile Job.Status status = STOP; + private volatile Future future; - protected YarnJobContainer(YarnClient yarnClient, String jobInfo) + private final Callable> runnable; + + private YarnJobContainer(YarnClient yarnClient, String jobInfo, Callable> runnable) { + this.runnable = runnable; this.yarnClient = yarnClient; if (jobInfo != null) { this.yarnAppId = Apps.toAppID(jobInfo); @@ -55,8 +64,11 @@ protected 
YarnJobContainer(YarnClient yarnClient, String jobInfo) @Override public synchronized void shutdown() { + if (future != null && !future.isDone() && !future.isCancelled()) { + future.cancel(true); + } + try { - this.setStatus(KILLING); if (yarnAppId != null) { yarnClient.killApplication(yarnAppId); } @@ -66,6 +78,16 @@ public synchronized void shutdown() } } + @Override + public Optional run() + throws Exception + { + this.setYarnAppId(null); + Optional applicationId = runnable.call(); + applicationId.ifPresent(this::setYarnAppId); + return applicationId.map(ApplicationId::toString); + } + @Override public String getRunId() { @@ -82,13 +104,6 @@ public ApplicationId getYarnAppId() return yarnAppId; } - @Override - public boolean isRunning() - { - YarnApplicationState yarnAppStatus = getYarnAppStatus(yarnAppId); - return YarnApplicationState.ACCEPTED.equals(yarnAppStatus) || YarnApplicationState.RUNNING.equals(yarnAppStatus); - } - @Override public String getJobUrl() { @@ -101,35 +116,59 @@ public String getJobUrl() } } + @Override + public synchronized void setStatus(Job.Status status) + { + this.status = requireNonNull(status, "status is null"); + } + + @Override + public synchronized Job.Status getStatus() + { + if (status == RUNNING) { + return isRunning() ? RUNNING : STOP; + } + return status; + } + + @Override + public void setFuture(Future future) + { + this.future = future; + } + /** * Get the current yarn application state of this job */ - private YarnApplicationState getYarnAppStatus(ApplicationId applicationId) + private boolean isRunning() { try { - ApplicationReport app = yarnClient.getApplicationReport(applicationId); //获取某个指定的任务 - return app.getYarnApplicationState(); + ApplicationReport app = yarnClient.getApplicationReport(getYarnAppId()); //look up the report of this application + YarnApplicationState state = app.getYarnApplicationState(); + return YarnApplicationState.ACCEPTED.equals(state) || YarnApplicationState.RUNNING.equals(state); } catch (ApplicationNotFoundException e) { //the application no longer exists on yarn - return null; + return false; } catch (YarnException | IOException e) { throw new SylphException(CONNECTION_ERROR, e); } } - public static JobContainer proxy(JobContainer yarnJobContainer) + public static JobContainer of(YarnClient yarnClient, String jobInfo, Callable> runnable) { + JobContainer container = new YarnJobContainer(yarnClient, jobInfo, runnable); + //----create JobContainer Proxy return AopFactory.proxy(JobContainer.class) - .byInstance(yarnJobContainer) + .byInstance(container) .around(proxyContext -> { /* * Switch the ClassLoader seen by YarnClient to this sdk's loader. * By default hadoop Configuration uses the jvm AppClassLoader, which leads to an "akka.version not setting" error because the akka jars cannot be found. * The cause is how hadoop Configuration is initialized: this.classLoader = Thread.currentThread().getContextClassLoader(); * */ - try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(yarnJobContainer.getClass().getClassLoader())) { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(YarnJobContainer.class.getClassLoader())) { proxyContext.proceed(); } });
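For reference, the submission lifecycle that ties these changes together: the JobManager monitor thread submits container.run() to jobStartPool, hands the resulting Future to the container through setFuture(), and YarnJobContainer.shutdown() now cancels that Future before killing the yarn application, while the Flink and Spark launchers treat an interrupted submit as a canceled submission and return Optional.empty() instead of failing. Below is a minimal, self-contained sketch of that pattern; the class name, the sleep, and the sample application id are illustrative only, not project code.

import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelableSubmitSketch
{
    // stand-in for the slow yarn submission performed inside container.run()
    static Optional<String> submitToCluster()
    {
        try {
            Thread.sleep(5000); // pretend the deploy takes a while
            return Optional.of("application_0000000000000_0001"); // hypothetical application id
        }
        catch (InterruptedException e) {
            // mirrors the patch: an interrupted submit is a canceled submission, not a failure
            return Optional.empty();
        }
    }

    public static void main(String[] args)
    {
        ExecutorService jobStartPool = Executors.newFixedThreadPool(1);
        Callable<Optional<String>> startTask = CancelableSubmitSketch::submitToCluster;
        Future<Optional<String>> future = jobStartPool.submit(startTask);
        // JobManager stores this future on the container via container.setFuture(future);
        // YarnJobContainer.shutdown() then cancels it before killing any yarn app that was already started
        if (!future.isDone() && !future.isCancelled()) {
            future.cancel(true);
        }
        jobStartPool.shutdown();
    }
}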