通过接收原生 Flink SQL 语句或者 SQL 文本的路径，执行 Flink SQL 任务。
-- Set the checkpointing interval option to 3s (three seconds).
set execution.checkpointing.interval = 3s;
-- Kafka-backed source table: reads JSON records from topic 'mqtest02' on
-- localhost:9092, starting from the latest offset.
create table user_info (
    userid bigint,
    -- string instead of bare varchar: in Flink a varchar without a length is varchar(1)
    username string,
    -- computed column populated by the built-in proctime() function
    proctime as proctime()
) with (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'topic' = 'mqtest02',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);
-- Sink table using the 'print' connector, keyed by userid
-- (the primary key is declared but not enforced).
create table user_sink (
    userid bigint,
    -- string instead of bare varchar: in Flink a varchar without a length is varchar(1)
    username string,
    primary key (userid) not enforced
) with (
    'connector' = 'print'
);
-- View exposing only the userid and username columns of user_info.
create view user_info_v as
select
    userid,
    username
from user_info;
  
-- Write userid and username from user_info into user_sink.
insert into user_sink
select
    src.userid,
    src.username
from user_info as src;
SQL 文本示例：
-- Register the function str2low, backed by the Scala class named below.
create function str2low
    as 'cn.todd.flink.udf.strtolower'
    language scala;
-- Source table fed by the 'datagen' connector at 2 rows per second.
create table source_table (
    id      int,
    score   int,
    address string
) with (
    'connector' = 'datagen',
    'rows-per-second' = '2',
    -- id: sequence from 1 to 100000
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '100000',
    -- score: values bounded by min 1 and max 100
    'fields.score.min' = '1',
    'fields.score.max' = '100',
    -- address: generated with length 10
    'fields.address.length' = '10'
);
-- Sink table using the 'print' connector; same columns as source_table.
create table console_table (
    id      int,
    score   int,
    address string
) with (
    'connector' = 'print'
);
-- Copy rows from source_table into console_table, passing address through the str2low function.
insert into console_table
select
    id,
    score,
    str2low(address)
from source_table;
通过 Properties 参数设置 external.jars：
// Store the jar path under the EXTERNAL_JARS key and hand the properties,
// together with the SQL text, to the executor.
Properties jobProps = new Properties();
jobProps.setProperty(
    StreamEnvConfigManager.EXTERNAL_JARS,
    "/Users/tal/code/flink-taste/flink-demo/target/flink-demo-1.0.jar");
FlinkSqlExecutor executor = new FlinkSqlExecutor();
executor.executeSqlText(testSql, jobProps);