Kai Waehner <[email protected]> 22 Apr 2020
mvn clean package
Copy ksql-udf-deep-learning-mqtt-iot-2.0-jar-with-dependencies.jar from the target folder to the ext folder of your KSQL installation (you will need to create the ext folder).
So if your Confluent Platform installation is at /Users/kai.waehner/confluent-5.4.0, then copy the JAR to /Users/kai.waehner/confluent-5.4.0/etc/ksql/ext
Set ksql.extension.dir in etc/ksql/ksql-server.properties:
ksql.extension.dir=/Users/kai.waehner/confluent-5.4.0/etc/ksql/ext
Make sure the Confluent bin folder is on your PATH. Otherwise, go to $CONFLUENT_INSTALL/bin to execute these commands.
Start the KSQL server and dependencies (Kafka, Zookeeper, Schema Registry).
Please note that the Confluent CLI changed with Confluent Platform 5.3+: use 'confluent local start' (Confluent Platform 5.2 and earlier used 'confluent start'). This guide uses Confluent Platform 5.4. Versions older than Confluent Platform 5.4 might not be compatible, as KSQL / ksqlDB changed slightly.
confluent local start ksql-server
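If you want to double-check that the KSQL server is up before continuing, one option is to query its REST endpoint (assuming the default listener on port 8088):
curl http://localhost:8088/info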
Create a topic for the sensor readings:
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic temperature
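To verify the topic was created as expected, you can describe it with the same kafka-topics CLI:
kafka-topics --describe --bootstrap-server localhost:9092 --topic temperature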
Now configure the MQTT proxy by creating a file called kafka-mqtt-quickstart.properties:
topic.regex.list=temperature:.*temperature
listeners=0.0.0.0:1883
bootstrap.servers=PLAINTEXT://localhost:9092
confluent.topic.replication.factor=1
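Each topic.regex.list entry has the form <kafka topic>:<regex>, so any MQTT topic matching .*temperature (for example car/engine/temperature, used below) is written to the Kafka topic temperature. If you later wanted to route other sensor types, additional mappings could be added as a comma-separated list, for example (the pressure topics here are purely hypothetical, just to illustrate the format):
topic.regex.list=temperature:.*temperature,pressure:.*pressure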
Launch the MQTT proxy:
kafka-mqtt-start kafka-mqtt-quickstart.properties
You don’t need to start the MQTT broker (in this example, the Mosquitto server)! Mosquitto is only used later to produce MQTT messages via the mosquitto_pub command. If you do start the Mosquitto server, you might run into a port conflict with the MQTT Proxy; in that case, change the Mosquitto server’s default port.
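To confirm that the MQTT proxy is actually listening on port 1883, a quick port probe works (assuming netcat is installed):
nc -vz localhost 1883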
To run mosquitto_pub on the Mac, install it first with Homebrew: brew install mosquitto. You’ll need this if you want to use the sensor_generator.sh script later on.
Start KSQL CLI:
ksql http://localhost:8088
Confirm that the UDF has been successfully registered (check the KSQL server log if not):
ksql> LIST FUNCTIONS;
Function Name | Type
-------------------------------------
ABS | SCALAR
ANOMALY | SCALAR
[...]
Register the sensor topic’s schema with KSQL:
CREATE STREAM carsensor (eventid integer, sensorinput varchar) WITH (kafka_topic='temperature', value_format='DELIMITED');
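To confirm the stream was registered with the expected schema (the two columns declared above, plus KSQL’s implicit columns), you can describe it:
ksql> DESCRIBE CARSENSOR;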
Set a continuous query running in KSQL:
SELECT EVENTID, ANOMALY(SENSORINPUT) FROM CARSENSOR EMIT CHANGES;
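By default this push query only shows messages that arrive after it starts, which is fine for this walkthrough. If you would rather replay anything already sitting in the topic, you can set the offset reset in the CLI first (a standard KSQL session property):
ksql> SET 'auto.offset.reset'='earliest';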
Send a sample message to the Kafka topic, from a separate shell prompt (i.e. not KSQL):
echo -e "99999,2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08" | kafkacat -b localhost:9092 -P -t temperature
In KSQL you should see the message displayed with the UDF output:
ksql> SELECT EVENTID, ANOMALY(SENSORINPUT) FROM CARSENSOR EMIT CHANGES;
99999 | 1.2104138026620321
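If you don’t have kafkacat installed, the same payload can be sent with the kafka-console-producer that ships with Confluent Platform (a sketch; substitute the full reading from the echo command above for [...]):
echo -e "99999,2.10# 2.13# [...] 2.08" | kafka-console-producer --broker-list localhost:9092 --topic temperature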
Send a message via MQTT, using mosquitto_pub and the MQTT proxy:
mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "99999,2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08"
You should see KSQL show another result from this new message.
Now run a script to generate a stream of MQTT messages:
./sensor_generator.sh
You should see more messages being printed to the KSQL console:
[...]
99999 | 4.03436391020442
99999 | 4.821044621825
99999 | 7.466964581447454
99999 | 4.47345489707657
99999 | 4.388292923118983
[...]
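If you are not using the repository’s script, a minimal stand-in could look like the sketch below. It simply republishes the sample reading through the MQTT proxy once per second (illustrative only; the real sensor_generator.sh varies the readings, which is why the anomaly scores above differ):
#!/bin/bash
# Illustrative sketch only -- not the repository's sensor_generator.sh.
# Replace [...] with the full reading used in the mosquitto_pub command above.
while true; do
  mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "99999,2.10# 2.13# [...] 2.08"
  sleep 1
done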
Now persist the results of the UDF applied to the data into a new Kafka topic:
CREATE STREAM ANOMALYDETECTION AS \
SELECT EVENTID, CAST (ANOMALY(SENSORINPUT) AS DOUBLE) AS ANOMALY_VAL \
FROM CARSENSOR;
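To sanity-check the new stream you can peek at a few rows before filtering (LIMIT stops the push query after five results):
ksql> SELECT * FROM ANOMALYDETECTION EMIT CHANGES LIMIT 5;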
From this new stream, create a derived stream that will include only events breaching a given threshold:
CREATE STREAM ANOMALYDETECTIONWITHFILTER AS \
SELECT EVENTID, ANOMALY_VAL \
FROM ANOMALYDETECTION \
WHERE ANOMALY_VAL > 3;
Now you have a KSQL stream showing the breaches, against which you can run a new SELECT query or CREATE STREAM:
ksql> SELECT * FROM ANOMALYDETECTIONWITHFILTER EMIT CHANGES;
21/08/18 12:36:09 BST , car/engine/temperature , 99999,4.193955593608823
Ctrl-C
You also have a Kafka topic, whose contents you can see using KSQL’s PRINT command:
ksql> PRINT ANOMALYDETECTIONWITHFILTER FROM BEGINNING;
21/08/18 12:36:09 BST , car/engine/temperature , 99999,4.193955593608823
21/08/18 12:36:12 BST , car/engine/temperature , 99999,5.363750640274894
21/08/18 12:36:20 BST , car/engine/temperature , 99999,7.292092517069437
21/08/18 12:36:23 BST , car/engine/temperature , 99999,5.230135737069109
or using a CLI tool such as kafka-console-consumer from the shell prompt:
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic ANOMALYDETECTIONWITHFILTER --from-beginning
99999,4.193955593608823
99999,5.363750640274894
99999,7.292092517069437
[...]