A lightweight library for running Siddhi CEP within an Apache Flink streaming application.
Siddhi CEP is a lightweight and easy-to-use open source Complex Event Processing (CEP) engine, released as a Java library under the Apache Software License v2.0.
Siddhi CEP processes events generated by various event sources, analyzes them, and emits the appropriate complex events according to user-specified queries.
This project provides a lightweight library that makes it easy to run Siddhi CEP within a Flink streaming application.
- Integrate Siddhi CEP as a stream operator (i.e. TupleStreamSiddhiOperator), supporting rich CEP features like:
  - Filter
  - Join
  - Aggregation
  - Group by
  - Having
  - Window
  - Conditions and Expressions
  - Pattern processing
  - Sequence processing
  - Event Tables
  - ...
- Provide an easy-to-use Siddhi CEP API that integrates with the Flink DataStream API (see SiddhiCEP and SiddhiStream):
  - Register a Flink DataStream, associating its native type information with the Siddhi stream schema; POJO, Tuple, primitive types, etc. are supported.
  - Connect single or multiple Flink DataStreams with a Siddhi CEP execution plan.
  - Return the output stream as a DataStream whose type is inferred from the Siddhi stream schema.
- Integrate Siddhi runtime state management with Flink state (see AbstractSiddhiOperator).
- Support Siddhi plugin management to extend CEP functions (see SiddhiCEP#registerExtension).
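To make the idea of associating native type information with a Siddhi stream schema more concrete, here is a minimal sketch in plain Java. It is not the library's implementation: the class `SiddhiSchemaSketch` and its methods `siddhiType` and `defineStream` are hypothetical names introduced only for illustration. It assumes the usual Siddhi attribute types (string, int, long, float, double, bool, object) and builds a `define stream` statement from an ordered set of field names and Java types.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Illustrative only: a hypothetical helper, not part of flink-siddhi.
final class SiddhiSchemaSketch {

    // Map a Java field type to a Siddhi attribute type name.
    static String siddhiType(Class<?> javaType) {
        if (javaType == String.class) return "string";
        if (javaType == Integer.class || javaType == int.class) return "int";
        if (javaType == Long.class || javaType == long.class) return "long";
        if (javaType == Float.class || javaType == float.class) return "float";
        if (javaType == Double.class || javaType == double.class) return "double";
        if (javaType == Boolean.class || javaType == boolean.class) return "bool";
        return "object"; // fallback for any other type
    }

    // Build a "define stream" statement from ordered (field name, Java type) pairs.
    static String defineStream(String streamId, Map<String, Class<?>> fields) {
        StringJoiner attrs = new StringJoiner(", ", "(", ")");
        fields.forEach((name, type) -> attrs.add(name + " " + siddhiType(type)));
        return "define stream " + streamId + " " + attrs + ";";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the declared field order, which matters for tuples.
        Map<String, Class<?>> fields = new LinkedHashMap<>();
        fields.put("id", Integer.class);
        fields.put("name", String.class);
        fields.put("price", Double.class);
        fields.put("timestamp", Long.class);
        System.out.println(defineStream("inputStream1", fields));
    }
}
```

The field order is kept explicit because Tuple-based streams identify fields by position, so the schema has to list attributes in the same order as the registered field names.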
- Java (Version: 1.8)
- Apache Maven
- Apache Flink (Version: 1.7.0)
git clone git@github.com:haoch/flink-siddhi.git
mvn clean install -DskipTests
mvn clean test
- Add com.github.haoch:flink-siddhi in project dependencies:

      <dependencies>
        <dependency>
          <groupId>com.github.haoch</groupId>
          <artifactId>flink-siddhi_2.11</artifactId>
          <version>LATEST</version>
        </dependency>
      </dependencies>

      <repositories>
        <repository>
          <id>clojars</id>
          <url>http://clojars.org/repo/</url>
        </repository>
      </repositories>
- Execute SiddhiQL with the SiddhiCEP API, for example:

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

      // Register a custom function extension under the name used in the query.
      cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);

      // input1 and input2 are existing Flink DataStreams; the strings name their fields.
      cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
      cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");

      DataStream<Tuple5<Integer, String, Integer, String, Double>> output = cep
          .from("inputStream1").union("inputStream2")
          .cql(
              // Each fragment ends with a space so that clauses do not run together.
              "from every s1 = inputStream1[id == 2] "
                  + " -> s2 = inputStream2[id == 3] "
                  + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, custom:plus(s1.price, s2.price) as price "
                  + "insert into outputStream")
          .returns("outputStream");

      env.execute();
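SiddhiQL queries passed as concatenated Java strings are easy to break when one fragment lacks a leading or trailing space, since two clauses then fuse into a single token. A minimal sketch of one way to avoid this, using only the JDK: the class `SiddhiQuerySketch` and its `query` helper are hypothetical and not part of flink-siddhi.

```java
// Illustrative only: plain string handling, no flink-siddhi calls.
final class SiddhiQuerySketch {

    // Join query clauses with a single space so adjacent clauses never fuse.
    static String query(String... clauses) {
        return String.join(" ", clauses);
    }

    public static void main(String[] args) {
        String cql = query(
            "from every s1 = inputStream1[id == 2]",
            "-> s2 = inputStream2[id == 3]",
            "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2,",
            "custom:plus(s1.price, s2.price) as price",
            "insert into outputStream");
        System.out.println(cql);
    }
}
```

The resulting string can then be handed to the query method in the same way as a hand-concatenated one.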
For more examples, please see org.apache.flink.contrib.siddhi.SiddhiCEPITCase.
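For readers new to Siddhi patterns, the query `from every s1 = inputStream1[id == 2] -> s2 = inputStream2[id == 3]` can be read as "each qualifying event on inputStream1, followed later by a qualifying event on inputStream2". The sketch below imitates that reading in plain Java. It is a simplified illustration, not how Siddhi implements pattern matching: it assumes that every qualifying s1 event opens a new partial match and that a qualifying s2 event completes all open partial matches, and it ignores time constraints and state checkpointing. The class `FollowedBySketch` and its `Event` type are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a simplified reading of "every s1 -> s2", not Siddhi's engine.
final class FollowedBySketch {

    // Minimal event: the stream it arrived on plus two attributes.
    static final class Event {
        final String stream;
        final int id;
        final String name;

        Event(String stream, int id, String name) {
            this.stream = stream;
            this.id = id;
            this.name = name;
        }
    }

    // s1 candidates that are still waiting for a matching s2.
    private final List<Event> pending = new ArrayList<>();

    // Feed one event; returns the (s1, s2) pairs completed by this event.
    List<Event[]> onEvent(Event e) {
        List<Event[]> matches = new ArrayList<>();
        if (e.stream.equals("inputStream1") && e.id == 2) {
            pending.add(e); // "every": each qualifying s1 starts a new partial match
        } else if (e.stream.equals("inputStream2") && e.id == 3) {
            for (Event s1 : pending) {
                matches.add(new Event[] {s1, e});
            }
            pending.clear(); // completed partial matches are not reused
        }
        return matches;
    }

    public static void main(String[] args) {
        FollowedBySketch matcher = new FollowedBySketch();
        matcher.onEvent(new Event("inputStream1", 2, "first"));
        matcher.onEvent(new Event("inputStream1", 2, "second"));
        for (Event[] m : matcher.onEvent(new Event("inputStream2", 3, "closing"))) {
            System.out.println(m[0].name + " -> " + m[1].name);
        }
    }
}
```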
- Site: https://haoch.github.io/flink-siddhi/
- Wiki: https://github.com/haoch/flink-siddhi/wiki
- Siddhi Query Language: https://docs.wso2.com/display/CEP300/Introduction+to+Siddhi+Query+Language
- Apache Flink: http://flink.apache.org/
- Issues: https://github.com/haoch/flink-siddhi/issues
- Documents: https://github.com/haoch/flink-siddhi/wiki
- Hao Chen @haoch (Author, hao AT apache DOT org)
- Aparup Banerjee @aparup
- Blues Zheng @kisimple
- @aagupta1
- @wittyameta
- Jinhu Wu @wujinhu
- Pi Jing @tammypi
- Jayant Ameta @wittyameta
- Drona @Dr0na
Contributions to the code or documentation are welcome: send a pull request, or report issues and bugs.
Licensed under the Apache License, Version 2.0. For more details, please refer to the LICENSE file.