Skip to content

Commit

Permalink
Merge pull request #48 from harbby/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
harbby authored Dec 27, 2018
2 parents 7693515 + 3b392de commit aa9cc01
Show file tree
Hide file tree
Showing 30 changed files with 985 additions and 229 deletions.
15 changes: 15 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,19 @@ subprojects {
//assemble.dependsOn 'licenseMain','licenseTest'
//licenseMain.includes
//license.mapping('javascript', 'JAVADOC_STYLE')

// Packages every main source file into a jar published with the 'sources' classifier.
task sourcesJar(type: Jar) {
    dependsOn classes
    classifier = 'sources'
    from(sourceSets.main.allSource)
}

// Packages the output directory of the javadoc task into a jar with the 'javadoc' classifier.
task javadocJar(type: Jar) {
    dependsOn javadoc
    classifier = 'javadoc'
    from(javadoc.destinationDir)
    // Disabled toggle kept for reference:
    //javadoc.failOnError = false
}

// Register both auxiliary jars as artifacts of the 'archives' configuration.
artifacts {
    archives sourcesJar
    archives javadocJar
}
}
6 changes: 4 additions & 2 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ include 'sylph-connectors:sylph-hdfs'
include 'sylph-connectors:sylph-kafka09'
include 'sylph-connectors:sylph-hbase'
include 'sylph-connectors:sylph-elasticsearch6'

include 'sylph-connectors:sylph-elasticsearch5'
include 'sylph-connectors:sylph-clickhouse'
//----
include 'sylph-dist'
include 'sylph-parser'
include 'sylph-docs'
include 'sylph-yarn'

//include 'sylph-clickhouse'
//include 'sylph-elasticsearch5'

9 changes: 9 additions & 0 deletions sylph-connectors/sylph-clickhouse/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Dependencies of the ClickHouse connector.
// Flink and slf4j are compileOnly; only the ClickHouse native JDBC driver is a `compile` dependency.
dependencies {
    compileOnly("org.apache.flink:flink-streaming-scala_2.11:${deps.flink}") {
        exclude module: 'flink-shaded-hadoop2'
    }
    compileOnly "org.apache.flink:flink-table_2.11:${deps.flink}"
    // NOTE(review): the slf4j-api version is read from deps.log4j12 - confirm that key holds an slf4j version.
    compileOnly "org.slf4j:slf4j-api:${deps.log4j12}"
    compile 'com.github.housepower:clickhouse-native-jdbc:1.5-stable'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright (C) 2018 The Sylph Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.plugins.clickhouse;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.etl.PluginConfig;
import ideal.sylph.etl.Row;
import ideal.sylph.etl.SinkContext;
import ideal.sylph.etl.api.RealTimeSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.calcite.shaded.com.google.common.base.Preconditions.checkState;

/**
 * Real-time sink that writes every incoming {@link Row} to ClickHouse over JDBC.
 *
 * <p>The configured insert query may contain <code>${...}</code> placeholders; each one is replaced
 * by a JDBC <code>?</code> parameter. Parameters are bound in the order of the schema's field names.
 * Rows are accumulated with {@link PreparedStatement#addBatch()} and sent once the number of
 * pending rows reaches the configured bulk size; any remainder is sent in {@link #close(Throwable)}.
 */
@Name("ClickHouseSink")
@Description("this is ClickHouseSink sink plugin")
public class ClickHouseSink
        implements RealTimeSink
{
    private static final Logger logger = LoggerFactory.getLogger(ClickHouseSink.class);

    private final ClickHouseSinkConfig config;
    /** The configured query with every <code>${...}</code> placeholder replaced by <code>?</code>. */
    private final String prepareStatementQuery;
    private final Row.Schema schema;
    private int idIndex = -1;
    private transient Connection connection;
    private transient PreparedStatement statement;
    /** Number of rows added to the current batch and not yet executed. */
    private int num = 0;
    /** Field name -> type name (e.g. "java.lang.Long") used to pick the JDBC setter. */
    private final Map<String, String> nametypes;

    /**
     * Creates the sink and pre-computes the prepared-statement text and the field type lookup.
     *
     * @param context supplies the schema (field names and types) of the incoming rows
     * @param clickHouseSinkConfig connection and query settings
     * @throws IllegalStateException if no insert query is configured
     */
    public ClickHouseSink(SinkContext context, ClickHouseSinkConfig clickHouseSinkConfig)
    {
        this.config = clickHouseSinkConfig;
        checkState(config.getQuery() != null, "insert into query not setting");
        this.prepareStatementQuery = config.getQuery().replaceAll("\\$\\{.*?}", "?");
        this.schema = context.getSchema();

        Map<String, String> nt = new HashMap<>();
        for (int i = 0; i < schema.getFieldNames().size(); i++) {
            nt.put(schema.getFieldNames().get(i), typeNameOf(schema.getFieldTypes().get(i)));
        }
        this.nametypes = nt;
    }

    /**
     * Extracts the type name from a field type's string form.
     *
     * <p>For a description such as "class java.lang.Long" the second token is returned. A
     * description without a space (e.g. "int") is returned unchanged instead of failing with an
     * {@link ArrayIndexOutOfBoundsException}.
     */
    private static String typeNameOf(Object fieldType)
    {
        String[] tokens = fieldType.toString().split(" ");
        return tokens.length > 1 ? tokens[1] : tokens[0];
    }

    /**
     * Binds the row's fields to the prepared statement, adds it to the batch and executes the
     * batch once {@code bulkSize} rows are pending.
     *
     * @param row the record to write
     * @throws IllegalStateException if binding or executing the batch fails; the original
     *         {@link SQLException} is kept as the cause so the failure is not silently dropped
     */
    @Override
    public void process(Row row)
    {
        try {
            int parameterIndex = 1;
            for (String fieldName : schema.getFieldNames()) {
                bindField(parameterIndex++, fieldName, row);
            }
            statement.addBatch();
            // Pre-increment: flush when exactly bulkSize rows are pending, not bulkSize + 1.
            if (++num >= config.getBulkSize()) {
                statement.executeBatch();
                num = 0;
            }
        }
        catch (SQLException e) {
            throw new IllegalStateException("ClickHouse write failed for query: " + prepareStatementQuery, e);
        }
    }

    /** Binds one field of the row to the given JDBC parameter using the setter matching its type name. */
    private void bindField(int parameterIndex, String fieldName, Row row)
            throws SQLException
    {
        //Byte Double String Date Long .....
        switch (nametypes.get(fieldName)) {
            case "java.sql.Date":
                statement.setDate(parameterIndex, java.sql.Date.valueOf(row.getAs(fieldName).toString()));
                break;
            case "java.lang.Long":
                statement.setLong(parameterIndex, row.getAs(fieldName));
                break;
            case "java.lang.Double":
                statement.setDouble(parameterIndex, row.getAs(fieldName));
                break;
            case "java.lang.Integer":
                // Bind as int; going through Byte would fail or truncate Integer values.
                statement.setInt(parameterIndex, row.getAs(fieldName));
                break;
            default:
                statement.setString(parameterIndex, row.getAs(fieldName));
                break;
        }
    }

    /**
     * Loads the ClickHouse JDBC driver, opens the connection and prepares the insert statement.
     *
     * @return always {@code true}
     * @throws SQLException if the connection or statement cannot be created
     * @throws ClassNotFoundException if the driver class is not available
     */
    @Override
    public boolean open(long partitionId, long version)
            throws SQLException, ClassNotFoundException
    {
        Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
        this.connection = DriverManager.getConnection(config.getJdbcUrl(), config.getUser(), config.getPassword());
        this.statement = connection.prepareStatement(prepareStatementQuery);
        return true;
    }

    /**
     * Executes any pending batch, then closes the statement and the connection.
     * Failures are logged rather than thrown.
     *
     * @param errorOrNull not inspected by this implementation
     */
    @Override
    public void close(Throwable errorOrNull)
    {
        // try-with-resources accepts null resources, so this is safe even if open() never ran.
        try (Connection conn = connection) {
            try (Statement stmt = statement) {
                if (stmt != null) {
                    stmt.executeBatch();
                }
            }
            catch (SQLException e) {
                logger.error("close executeBatch fail", e);
            }
        }
        catch (SQLException e) {
            logger.error("close connection fail", e);
        }
    }

    /**
     * User-facing settings of {@link ClickHouseSink}; each field carries its option name in
     * {@code @Name} and its default value in the initializer.
     */
    public static class ClickHouseSinkConfig
            extends PluginConfig
    {
        @Name("url")
        @Description("this is ck jdbc url")
        private String jdbcUrl = "jdbc:clickhouse://localhost:9000";

        @Name("userName")
        @Description("this is ck userName")
        private String user = "default";

        @Name("password")
        @Description("this is ck password")
        private String password = "default";

        @Name("query")
        @Description("this is ck save query")
        private String query = null;

        @Name("bulkSize")
        @Description("this is ck bulkSize")
        private int bulkSize = 20000;

        // Not read anywhere in this class.
        @Name("eventDate_field")
        @Description("this is your data eventDate_field, 必须是 YYYY-mm--dd位时间戳")
        private String eventTimeName;

        public String getJdbcUrl()
        {
            return jdbcUrl;
        }

        public String getUser()
        {
            return user;
        }

        public String getPassword()
        {
            return password;
        }

        public String getQuery()
        {
            return query;
        }

        /** @return the number of pending rows at which the batch is executed */
        public int getBulkSize()
        {
            return bulkSize;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (C) 2018 The Sylph Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.plugins.clickhouse;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.api.Source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.guava18.com.google.common.base.Supplier;
import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;

/**
 * Test source for the ClickHouse connector: exposes a Flink stream of generated rows
 * with the fields (key, message, mes_time).
 **/
@Name("testCK")
@Description("this flink test source inputStream")
@Version("1.0.0")
public class TestCKSource
        implements Source<DataStream<Row>>
{
    private static final long serialVersionUID = 2L;
    private static final Logger logger = LoggerFactory.getLogger(TestCKSource.class);
    private final transient Supplier<DataStream<Row>> loadStream;

    /**
     * @param execEnv environment the generated source is registered with; registration is
     *        deferred until {@link #getSource()} is first called and then memoized
     */
    public TestCKSource(StreamExecutionEnvironment execEnv)
    {
        Supplier<DataStream<Row>> streamFactory = () -> execEnv.addSource(new MyDataSource());
        this.loadStream = Suppliers.memoize(streamFactory);
    }

    /** Returns the memoized stream. */
    @Override
    public DataStream<Row> getSource()
    {
        return loadStream.get();
    }

    /**
     * Parallel source function that keeps emitting rows until it is cancelled or closed.
     */
    public static class MyDataSource
            extends RichParallelSourceFunction<Row>
            implements ResultTypeQueryable<Row>
    {
        private static final ObjectMapper MAPPER = new ObjectMapper();
        private volatile boolean running = true;

        /**
         * Emits rows in a loop while {@code running} is true. Each row holds a generated key,
         * a generated message and the current date.
         */
        @Override
        public void run(SourceContext<Row> sourceContext)
                throws Exception
        {
            Random generator = new Random(1000000);
            while (running) {
                java.sql.Date today = java.sql.Date.valueOf(java.time.LocalDate.now());
                // The message consumes the first random value of the iteration, the key the second.
                String message = "https://github.com/harbby/sylph/" + generator.nextLong();
                String key = "github.com" + generator.nextLong();
                sourceContext.collect(Row.of(key, message, today));
            }
        }

        /** Describes the emitted rows: two String fields followed by a java.sql.Date field. */
        @Override
        public TypeInformation<Row> getProducedType()
        {
            TypeInformation<?>[] fieldTypes = {
                    TypeExtractor.createTypeInfo(String.class),
                    TypeExtractor.createTypeInfo(String.class),
                    TypeExtractor.createTypeInfo(java.sql.Date.class)
            };
            String[] fieldNames = {"key", "message", "mes_time"};
            return new RowTypeInfo(fieldTypes, fieldNames);
        }

        /** Signals the emit loop in {@code run} to stop. */
        @Override
        public void cancel()
        {
            running = false;
        }

        /** Stops the emit loop, then delegates to the superclass. */
        @Override
        public void close()
                throws Exception
        {
            cancel();
            super.close();
        }
    }
}
28 changes: 28 additions & 0 deletions sylph-connectors/sylph-elasticsearch5/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Applies the Shadow plugin (version 4.0.3); the shadowJar task is configured further down in this file.
plugins {
id "com.github.johnrengelman.shadow" version "4.0.3"
}

// Dependencies of the Elasticsearch 5 connector.
dependencies {
    // Declared on the `shadow` configuration rather than `compile`.
    shadow 'org.apache.flink:flink-shaded-guava:18.0-4.0'
    // Elasticsearch 5.6.0 transport client.
    compile group: 'org.elasticsearch.client', name: 'transport', version: '5.6.0'
}

// Builds the shaded jar of this connector: bundles the `compile` configuration and relocates
// the listed packages under the `shaded.elasticsearch5` prefix.
shadowJar {
    baseName = project.name
    classifier = 'shaded'
    version = project.version

    // Only the `compile` configuration is bundled.
    configurations = [project.configurations.compile]

    dependencies {
        exclude(dependency('junit:junit:'))
    }

    //relocate 'com.google.protobuf', 'shaded.com.google.protobuf'
    // All relocations use this module's own prefix (the Guava one previously said elasticsearch6),
    // and the duplicated io.netty relocation has been removed.
    relocate 'com.google.common', 'shaded.elasticsearch5.com.google.common'
    relocate 'io.netty', 'shaded.elasticsearch5.io.netty'
    relocate 'org.apache.logging', 'shaded.elasticsearch5.org.apache.logging'
}
// Make both the standard assemble task and the buildPlugins task depend on the shaded jar.
assemble.dependsOn(shadowJar)
buildPlugins.dependsOn(shadowJar)
Loading

0 comments on commit aa9cc01

Please sign in to comment.