This repository contains a few examples for getting started with the fiware-cosmos-orion-flink-connector:
- Setup
- Example 1: Receiving simulated notifications
- Example 2: Complete Orion Scenario with docker-compose
- Example 3: Packaging the code and submitting it to the Flink Job Manager
- Example 4: Other operations
- Example 5: Structured values for attributes
In order to run the examples, first you need to clone the repository:
git clone https://github.com/ging/fiware-cosmos-orion-flink-connector-examples
cd fiware-cosmos-orion-flink-connector-examples
Next, download the connector JAR from the fiware-cosmos-orion-flink-connector repository and run:
mvn install:install-file -Dfile=${PATH_DOWNLOAD}/orion.flink.connector-1.2.4.jar -DgroupId=org.fiware.cosmos -DartifactId=orion.flink.connector -Dversion=1.2.4 -Dpackaging=jar
where PATH_DOWNLOAD is the path where you downloaded the JAR.
The first example makes use of the OrionSource in order to receive notifications from the Orion Context Broker. For simplicity, in this example the notifications are simulated with a curl command.
Specifically, the example receives a notification every second that a node changed its temperature, and calculates the minimum temperature in a given interval.
In order to simulate the notifications coming from the Context Broker you can run the following script (available at files/example1/curl_Notification.sh):
while true
do
temp=$(shuf -i 18-53 -n 1)
number=$(shuf -i 1-3113 -n 1)
curl -v -s -S -X POST http://localhost:9001 \
--header 'Content-Type: application/json; charset=utf-8' \
--header 'Accept: application/json' \
--header 'User-Agent: orion/0.10.0' \
--header "Fiware-Service: demo" \
--header "Fiware-ServicePath: /test" \
-d '{
"data": [
{
"id": "R1","type": "Node",
"co": {"type": "Float","value": 0,"metadata": {}},
"co2": {"type": "Float","value": 0,"metadata": {}},
"humidity": {"type": "Float","value": 40,"metadata": {}},
"pressure": {"type": "Float","value": '$number',"metadata": {}},
"temperature": {"type": "Float","value": '$temp',"metadata": {}},
"wind_speed": {"type": "Float","value": 1.06,"metadata": {}}
}
],
"subscriptionId": "57458eb60962ef754e7c0998"
}'
sleep 1
done
This is the code of the example which is explained step by step below:
package org.fiware.cosmos.orion.flink.connector.examples.example1
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector.{OrionSource}
object Example1{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create Orion Source. Receive notifications on port 9001
val eventStream = env.addSource(new OrionSource(9001))
// Process event stream
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
val temp = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
new Temp_Node( entity.id, temp)
})
.keyBy("id")
.timeWindow(Time.seconds(5), Time.seconds(2))
.min("temperature")
// Print the results with a single thread, rather than in parallel
processedDataStream.print().setParallelism(1)
env.execute("FIWARE Cosmos Example")
}
case class Temp_Node(id: String, temperature: Float)
}
After importing the necessary dependencies, the first step is creating the source and adding it to the environment.
val eventStream = env.addSource(new OrionSource(9001))
The OrionSource accepts a port number as a parameter. The data received from this source is a DataStream of NgsiEvent objects.
You can check the details of this object in the connector docs.
In the example, the first step of the processing is flat-mapping the entities. This operation gathers the entity objects of several NGSI events into a single stream.
val processedDataStream = eventStream
.flatMap(event => event.entities)
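To see what the flat-map step does independently of Flink, here is a minimal sketch using plain Scala collections. The Attribute, Entity and Event case classes are hypothetical stand-ins that only mirror the fields used in this example (entities, id, attrs, value); they are not the connector's actual classes.

```scala
object FlatMapSketch {
  // Hypothetical stand-ins for the connector's event model; only the
  // fields used in Example 1 are reproduced here.
  final case class Attribute(value: Any)
  final case class Entity(id: String, attrs: Map[String, Attribute])
  final case class Event(entities: Seq[Entity])

  // Turns a sequence of events into one flat sequence of entities.
  def allEntities(events: Seq[Event]): Seq[Entity] =
    events.flatMap(event => event.entities)

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event(Seq(Entity("R1", Map("temperature" -> Attribute(21))))),
      Event(Seq(
        Entity("R2", Map("temperature" -> Attribute(19))),
        Entity("R3", Map("temperature" -> Attribute(25)))
      ))
    )
    // Two events carrying three entities become one sequence of three ids.
    println(allEntities(events).map(_.id).mkString(", "))
  }
}
```

The same shape applies to the DataStream version: each notification may carry several entities, and flatMap emits them one by one so that later steps work on individual entities.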
Once you have all the entities, you can iterate over them (with map) and extract the desired attributes; in this case, it is the temperature.
// ...
.map(entity => {
val temp = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
new Temp_Node( entity.id, temp)
})
In each iteration you create a custom object with the properties you need: the entity id and the temperature. For this purpose, you can define a case class like so:
case class Temp_Node(id: String, temperature: Float)
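The cast to Number in the map step suggests that the attribute value arrives as an untyped value whose concrete numeric type depends on the JSON that was parsed. As a minimal sketch of a more defensive conversion, assuming the value is typed as Any, a pattern match avoids a ClassCastException when the attribute is not numeric. The toFloat helper is hypothetical and not part of the connector.

```scala
object AttributeValueSketch {
  // Converts an untyped attribute value to Float when it is numeric.
  // Returns None instead of throwing when the value has another type.
  def toFloat(value: Any): Option[Float] = value match {
    case n: Number => Some(n.floatValue())
    case _         => None
  }

  def main(args: Array[String]): Unit = {
    println(toFloat(23))         // an Int, boxed as java.lang.Integer
    println(toFloat(40.5))       // a Double
    println(toFloat(BigInt(18))) // scala.math.BigInt also extends Number
    println(toFloat("n/a"))      // not numeric, so None
  }
}
```

In a stream, such a helper could be combined with flatMap so that entities without a usable temperature are dropped rather than failing the job.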
Now you can group the created objects by entity id and perform operations on them:
// ...
.keyBy("id")
You can provide a custom processing window, like so:
// ...
.timeWindow(Time.seconds(5), Time.seconds(2))
And then specify the operation to perform in said time interval:
// ...
.min("temperature")
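The two arguments of timeWindow define a sliding window: each window covers 5 seconds and a new one starts every 2 seconds, so a reading can fall into more than one window. The following sketch reproduces that grouping with plain Scala collections. It is a simplification under stated assumptions: readings carry explicit, non-negative timestamps, and windows are aligned to multiples of the slide. Reading, windowStarts and slidingMin are hypothetical names used for illustration only.

```scala
object SlidingWindowSketch {
  final case class Reading(id: String, timestampMs: Long, temperature: Float)

  // Start times of every window that contains timestamp t, assuming windows
  // are aligned to multiples of slideMs and t is non-negative.
  def windowStarts(t: Long, sizeMs: Long, slideMs: Long): List[Long] = {
    val lastStart = t - (t % slideMs)
    Iterator.iterate(lastStart)(_ - slideMs).takeWhile(_ > t - sizeMs).toList.reverse
  }

  // Minimum temperature per (entity id, window start).
  def slidingMin(readings: Seq[Reading], sizeMs: Long, slideMs: Long): Map[(String, Long), Float] =
    readings
      .flatMap(r => windowStarts(r.timestampMs, sizeMs, slideMs).map(start => ((r.id, start), r.temperature)))
      .groupBy(_._1)
      .map { case (key, hits) => key -> hits.map(_._2).reduce((a, b) => math.min(a, b)) }

  def main(args: Array[String]): Unit = {
    val readings = Seq(
      Reading("R1", 0L, 20f),
      Reading("R1", 1000L, 18f),
      Reading("R1", 3000L, 25f)
    )
    // 5-second windows that start every 2 seconds, as in the example.
    slidingMin(readings, sizeMs = 5000L, slideMs = 2000L).toSeq
      .sortBy(_._1)
      .foreach { case ((id, start), min) => println(s"$id window@$start min=$min") }
  }
}
```

Because the windows overlap, the reading taken at 1000 ms contributes to two windows and the one taken at 0 ms to three, which is why the same minimum can be reported several times in the console output of the example.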
After the processing, you can print the results on the console:
processedDataStream.print().setParallelism(1)
Or you can persist them using the sink of your choice.
The second example does the same processing as the previous one, but it writes the processed data back to the Context Broker.
In order to test this feature, we need to have a Context Broker up and running. For this purpose, a docker-compose file is provided under files/example2, which deploys all the necessary containers for this scenario.
You just need to run the following command (probably with sudo):
docker-compose up
Once you have the Context Broker and the rest of the containers up and running, you need to create some entities and subscribe to them in order to get a notification whenever their values change.
First, let's create a room entity (you can find the script under files/example2/curl_CreateNewEntity.sh):
curl localhost:1026/v2/entities -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
"id": "Room1",
"type": "Room",
"temperature": {
"value": 23,
"type": "Float"
},
"pressure": {
"value": 720,
"type": "Integer"
},
"temperature_min": {
"value": 0,
"type": "Float"
}
}
EOF
Now you can subscribe to any changes in the attributes you are interested in. Again, you can find this script under files/example2/curl_SubscribeToEntityNotifications.sh. Do not forget to change $MY_IP to your machine's IP address (it must be accessible from the docker container):
curl -v localhost:1026/v2/subscriptions -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
"description": "A subscription to get info about Room1",
"subject": {
"entities": [
{
"id": "Room1",
"type": "Room"
}
],
"condition": {
"attrs": [
"pressure",
"temperature"
]
}
},
"notification": {
"http": {
"url": "http://$MY_IP:9001/notify"
},
"attrs": [
"temperature",
"pressure"
]
},
"expires": "2040-01-01T14:00:00.00Z",
"throttling": 5
}
EOF
You might want to check that you have created it correctly by running:
curl localhost:1026/v2/subscriptions
Now you may start performing changes in the entity's attributes. For that, you can use the following script (files/example2/curl_ChangeAttributes.sh):
while true
do
timestamp=$(shuf -i 1-100000000 -n 1)
temp=$(shuf -i 18-53 -n 1)
number=$(shuf -i 1-3113 -n 1)
# echo
curl localhost:1026/v2/entities/Room1/attrs -s -S -H 'Content-Type: application/json' -X PATCH -d '{
"temperature": {
"value": '$temp',
"type": "Float"
},
"pressure": {
"value": '$number',
"type": "Float"
}
}'
sleep 1
done
Let's take a look at the Example2 code now:
package org.fiware.cosmos.orion.flink.connector.examples.example2
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector._
object Example2 {
final val URL_CB = "http://flinkexample_orion_1:1026/v2/entities/"
final val CONTENT_TYPE = ContentType.JSON
final val METHOD = HTTPMethod.POST
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create Orion Source. Receive notifications on port 9001
val eventStream = env.addSource(new OrionSource(9001))
// Process event stream
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
val temp = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
new Temp_Node(entity.id, temp)
})
.keyBy("id")
.timeWindow(Time.seconds(5), Time.seconds(2))
.min("temperature")
.map(tempNode => {
val url = URL_CB + tempNode.id + "/attrs"
OrionSinkObject(tempNode.toString, url, CONTENT_TYPE, METHOD)
})
// Add Orion Sink
OrionSink.addSink( processedDataStream )
// print the results with a single thread, rather than in parallel
processedDataStream.map(orionSinkObject => orionSinkObject.content).print().setParallelism(1)
env.execute("FIWARE Cosmos Example")
}
case class Temp_Node(id: String, temperature: Float) extends Serializable {
override def toString :String = { "{\"temperature_min\": { \"value\":" + temperature + ", \"type\": \"Float\"}}" }
}
}
As you can see, it is very similar to the previous example. The main difference is that it writes the processed data back to the Context Broker through the OrionSink.
After calculating the minimum temperature, the output data needs to be adapted to the format accepted by the OrionSink. It accepts a stream of OrionSinkObject, which takes four arguments plus an optional fifth:
- Message: The content of the update that is going to be sent to the Context Broker. It can be a single value converted to a string or, more commonly, a stringified JSON containing the new data. The connector does not build the JSON from a Scala object for us; we need to build it ourselves. We may want to override the toString method of the case class we've created in order to facilitate this process, as seen in the example.
- URL: The URL to which the message will be posted. Normally it has a common base, but it varies depending on the entity we're receiving data from.
- Content Type: Whether the message is in JSON format (ContentType.JSON) or in plain text (ContentType.Plain).
- HTTP Method: The HTTP method used for sending the update. It can be HTTPMethod.POST, HTTPMethod.PUT or HTTPMethod.PATCH.
- Headers: Optional parameter. You can add the headers you need in a Map[String,String] object.
In the example, an OrionSinkObject is built from the Temp_Node object converted to JSON. Thus, the specified content type is JSON. The URL is formed from the hostname of the docker container where the Context Broker runs and the id of the specific entity we are receiving data from. It uses the HTTP POST method to send the message to the Context Broker.
// ...
.map(tempNode => {
val url = URL_CB + tempNode.id + "/attrs"
OrionSinkObject(tempNode.toString, url, CONTENT_TYPE, METHOD)
})
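Since the JSON body and the target URL have to be assembled by hand, it can help to keep that logic in small functions that can be checked without a running Context Broker. The following is a minimal sketch; TempNode, toUpdateJson and attrsUrl are hypothetical names, and the JSON shape simply mirrors the toString override shown in the example.

```scala
object SinkPayloadSketch {
  final case class TempNode(id: String, temperature: Float)

  // Hypothetical helper: JSON body that updates only the temperature_min attribute.
  def toUpdateJson(node: TempNode): String =
    s"""{"temperature_min": {"value": ${node.temperature}, "type": "Float"}}"""

  // Hypothetical helper: attribute-update URL for one entity.
  // Tolerates a base URL with or without a trailing slash.
  def attrsUrl(baseUrl: String, entityId: String): String =
    baseUrl.stripSuffix("/") + "/" + entityId + "/attrs"

  def main(args: Array[String]): Unit = {
    val node = TempNode("Room1", 18.5f)
    println(attrsUrl("http://flinkexample_orion_1:1026/v2/entities/", node.id))
    println(toUpdateJson(node))
  }
}
```

Keeping the serialization out of toString also leaves toString free for logging. Entity ids are assumed here to need no URL encoding.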
Finally, we send the processed DataStream through the OrionSink.
OrionSink.addSink( processedDataStream )
If you run the example, you will see that the minimum temperature calculated is displayed in the console.
You can test that it has been changed in the Context Broker as well by running the following command several times and checking that the temperature_min attribute is constantly changing:
curl localhost:1026/v2/entities/Room1
In the previous examples, we've seen how to get the connector up and running from an IDE like IntelliJ. In a real-world scenario, we might want to package our code and submit it to a Flink cluster in order to run our operations in parallel.
If you haven't already, follow the Setting up the scenario section to deploy the necessary containers. After that, we need to make some changes to our code.
First, we need to change the notification URL of our subscription to point to our Flink node, like so (files/example3/curl_SubscribeToEntityNotifications.sh):
curl -v localhost:1026/v2/subscriptions -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
"description": "A subscription to get info about Room1",
"subject": {
"entities": [
{
"id": "Room1",
"type": "Room"
}
],
"condition": {
"attrs": [
"pressure",
"temperature"
]
}
},
"notification": {
"http": {
"url": "http://taskmanager:9001/notify"
},
"attrs": [
"temperature",
"pressure"
]
},
"expires": "2040-01-01T14:00:00.00Z",
"throttling": 5
}
EOF
Since we are going to run the code from inside a Flink Task Manager container, we can no longer refer to the Context Broker as "http://localhost:1026". The code in Example 3 differs from the previous example only in the URL specified for the Context Broker: "http://orion:1026".
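Because the Context Broker address depends on where the job runs, one option is to read it from the environment instead of hard-coding it. This is only a sketch of that idea and not something the examples do: the ORION_URL variable name is hypothetical, and the default assumes the "orion" hostname mentioned above combined with the /v2/entities/ path used in Example 2.

```scala
object BrokerUrlSketch {
  // Assumed default: the "orion" hostname with the /v2/entities/ path.
  val DefaultUrl = "http://orion:1026/v2/entities/"

  // Returns the value of the hypothetical ORION_URL variable when it is set
  // and non-empty, otherwise the default.
  def contextBrokerUrl(env: Map[String, String]): String =
    env.get("ORION_URL").filter(_.nonEmpty).getOrElse(DefaultUrl)

  def main(args: Array[String]): Unit =
    println(contextBrokerUrl(sys.env))
}
```

Passing the environment as a plain Map keeps the function easy to check with fixed inputs.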
Let's build a JAR package of the example. In it, we need to include all the dependencies we have used, such as the connector, but exclude some of the dependencies provided by the environment (Flink, Scala...).
This can be done through the maven package command without the add-dependencies-for-IDEA profile checked.
This will build a JAR file under target/orion.flink.connector.examples-1.2.4.jar.
Let's submit the Example 3 code to the Flink cluster we have deployed. In order to do this, open the Flink GUI on the browser (http://localhost:8081) and select the Submit new Job section on the left menu. Click the Add New button and upload the JAR. Once uploaded, select it from the Uploaded JARs list and specify the class to execute:
org.fiware.cosmos.orion.flink.connector.examples.example3.Example3
Once you have filled in this field, click the Submit button and you will see that your job is running. Then follow the Triggering notifications section.
You can check that the value of temperature_min is changing in the Context Broker by running:
curl localhost:1026/v2/entities/Room1
The previous examples focus on how to get the connector up and running but do not give much importance to the actual operations performed on the data received. In fact, the only operation done is calculating the minimum temperature on a time window, which is already available with Flink.
Nevertheless, Flink lets you perform custom operations such as calculating the average. For this, we need to define an AggregateFunction that performs this operation.
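import org.apache.flink.api.common.functions.AggregateFunction
// The accumulator is a tuple: (sum of temperatures, number of readings)
class AverageAggregate extends AggregateFunction[Temp_Node, (Float, Float), Float] {
// Start with an empty sum and an empty count
override def createAccumulator(): (Float, Float) = (0f, 0f)
override def add(value: Temp_Node, accumulator: (Float, Float)): (Float, Float) =
(accumulator._1 + value.temperature, accumulator._2 + 1f)
override def getResult(accumulator: (Float, Float)): Float = accumulator._1 / accumulator._2
// Combine two partial accumulators
override def merge(a: (Float, Float), b: (Float, Float)): (Float, Float) =
(a._1 + b._1, a._2 + b._2)
}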
This aggregator function takes one Temp_Node at a time as input, accumulates a tuple containing the sum of the values and the count of the values, and outputs a number with the average of the temperatures in the given timespan.
To apply it to our Flink DataStream, we pass it to the aggregate function.
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
val temp = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
new Temp_Node( entity.id, temp)
})
.keyBy("id")
.timeWindow(Time.seconds(5), Time.seconds(2))
.aggregate(new AverageAggregate)
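The accumulator logic of AverageAggregate can be tried out without a Flink cluster. The sketch below restates the four operations as plain functions over a (sum, count) tuple and shows that merging two partial accumulators gives the same average as processing all readings in one pass. The object and function names are hypothetical and only mirror the methods of the class above.

```scala
object AverageSketch {
  // (sum of temperatures, number of readings), as in AverageAggregate
  type Acc = (Float, Float)

  val empty: Acc = (0f, 0f)

  def add(temperature: Float, acc: Acc): Acc = (acc._1 + temperature, acc._2 + 1f)

  def merge(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 + b._2)

  // For an empty accumulator this is 0/0, which yields NaN for floats.
  def result(acc: Acc): Float = acc._1 / acc._2

  def main(args: Array[String]): Unit = {
    val all = Seq(20f, 22f, 27f).foldLeft(empty)((acc, t) => add(t, acc))
    // The same readings split into two partial accumulators, then merged.
    val left  = Seq(20f, 22f).foldLeft(empty)((acc, t) => add(t, acc))
    val right = Seq(27f).foldLeft(empty)((acc, t) => add(t, acc))
    println(result(all))
    println(result(merge(left, right)))
  }
}
```

Keeping the sum and the count rather than a running average is what makes merge possible: two averages cannot be combined correctly without knowing how many readings each one represents.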
So far, the examples provided have dealt with simple attribute values such as numbers or strings. Some use cases require more complex attributes, such as objects (https://fiware-orion.readthedocs.io/en/master/user/structured_attribute_valued/index.html). The connector parses this sort of value as a Scala Map[String,Any], which eases the process of iterating through its properties.
Example 5 provides an example in which the data received is a list of bus schedules and their prices. These prices are constantly changing and Flink is used in order to calculate the minimum prices within a time window.
The simulated Orion notification is as follows (available at files/example5/curl_Notification.sh):
while true
do
bus1=$(shuf -i 10-53 -n 1)
bus2=$(shuf -i 10-44 -n 1)
curl -v -s -S -X POST http://localhost:9001 \
--header 'Content-Type: application/json; charset=utf-8' \
--header 'Accept: application/json' \
--header 'User-Agent: orion/0.10.0' \
--header "Fiware-Service: demo" \
--header "Fiware-ServicePath: /test" \
-d '{
"data": [
{
"id": "R1",
"type": "Node",
"information": {
"type": "object",
"value": {
"buses":[
{
"name": "BusCompany1",
"schedule": {
"morning": [7,9,11],
"afternoon": [13,15,17,19],
"night" : [23,1,5]
},
"price": '$bus1'
},
{
"name": "BusCompany2",
"schedule": {
"morning": [8,10,12],
"afternoon": [16,20],
"night" : [23]
},
"price": '$bus2'
}
]
},
"metadata": {}
}
}
],
"subscriptionId": "57458eb60962ef754e7c0998"
}'
sleep 1
done
The code for Example 5 is similar to the previous examples. The only difference is that it is necessary to manually parse every item of the object attribute in order to make use of it.
object Example5{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create Orion Source. Receive notifications on port 9001
val eventStream = env.addSource(new OrionSource(9001))
// Process event stream
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
entity.attrs("information").value.asInstanceOf[Map[String, Any]]
})
.map(list => list("buses").asInstanceOf[List[Map[String,Any]]])
.flatMap(bus => bus )
.map(bus =>
new Bus(bus("name").asInstanceOf[String], bus("price").asInstanceOf[ scala.math.BigInt].intValue()))
.keyBy("name")
.timeWindow(Time.seconds(5), Time.seconds(2))
.min("price")
// print the results with a single thread, rather than in parallel
processedDataStream.print().setParallelism(1)
env.execute("FIWARE Cosmos Example")
}
case class Bus(name: String, price: Int)
}
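The chain of asInstanceOf calls in Example 5 throws a ClassCastException as soon as a notification has an unexpected shape. As a hedged alternative, the parsing can be written with pattern matching so that malformed items are skipped. This sketch assumes the structured value has already been turned into nested Scala Map and List objects, as described above; parseBuses is a hypothetical helper, not part of the connector.

```scala
object StructuredValueSketch {
  final case class Bus(name: String, price: Int)

  // Extracts the buses from the value of the "information" attribute,
  // skipping entries that lack a string name or a numeric price.
  def parseBuses(information: Map[String, Any]): List[Bus] =
    information.get("buses") match {
      case Some(buses: List[_]) =>
        buses
          .collect { case bus: Map[_, _] =>
            // Keys are assumed to be strings, as in the JSON notification.
            val fields = bus.asInstanceOf[Map[String, Any]]
            (fields.get("name"), fields.get("price"))
          }
          .collect { case (Some(name: String), Some(price: Number)) =>
            Bus(name, price.intValue())
          }
      case _ => Nil
    }

  def main(args: Array[String]): Unit = {
    val information: Map[String, Any] = Map(
      "buses" -> List(
        Map("name" -> "BusCompany1", "price" -> BigInt(12)),
        Map("name" -> "BusCompany2", "price" -> 30),
        Map("name" -> "NoPrice")
      )
    )
    parseBuses(information).foreach(println)
  }
}
```

Matching on Number rather than on scala.math.BigInt keeps the helper working whichever numeric type the price is parsed into.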