Skip to content

Commit

Permalink
Merge pull request #29 from ddebrunner/cmd-28
Browse files Browse the repository at this point in the history
Add scalable application support.
  • Loading branch information
ddebrunner committed May 11, 2016
2 parents 339b8ae + f257269 commit 2a5a1f1
Show file tree
Hide file tree
Showing 20 changed files with 465 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,52 +43,87 @@ use com.ibm.streamsx.topology.topic::Subscribe;
* * [com.ibm.streamsx.iot::StatusesSubscribe] - Subscribe to device status updates, with filtering on device type.
* * [com.ibm.streamsx.iot::CommandPublish] - Send a device command that will be sent to the device by this application.
*
* These submission time parameters must be defined:
* * `org` - Organization identifier
* * `authKey` - API key
* * `authToken` - API Token
* `IotPlatform` supports Watson IoT Platform's scalable application
* mode that distributes messages (device events, commands and statuses)
* across multiple MQTT subscribers. This is configured by setting the
* optional parameters `scaleEvents`, `scaleCommands` and
* `scaleStatuses`. IBM Streams parallel regions are used to instantiate
* multiple MQTT source operators and publish those events, commands or
* statuses using multiple streams. Applications that subscribe using
* `EventsSubscribe`, `CommandsSubscribe`, `StatusesSubscribe` or
* `Subscribe` will correctly see the required tuples, regardless
* of the scale factor or if the subscriber is itself using parallel regions.
*
* Scalable application mode also supports failures of the MQTT source
* operators or their connections by automatically delivering messages
* across the remaining active connections.
* Once a connection from this application
* is resetablished it will again be included in message delivery.
*
* All of this composite's parameters can be set using submission
* time values at job submission time.
*
* @param org Organization identifier (required)
* @param authKey API key (required)
* @param authToken API Token (required)
* @param scaleEvents Scaling factor for device events (optional, defaults to `1`)
* @param scaleCommands Scaling factor for sent device commands. Optional, defaults to `1`)
* @param scaleStatuses Scaling factor for device statuses (optional, defaults to `1`)
* @param allowFilters `true` to use allow applications subscribing to device events, sent device commands and device statuses to push filtering to this application. `false` to disallow execution of subscriber filters (optional defaults to `true`)
* @param encrypted `true` to use encrypted (TLSv1.2) connections to IoT Plaform, `false` to use unencrypted (optional, defaults to `false`)
*/
public composite IotPlatform {
param
expression<rstring> $org : getSubmissionTimeValue("org");
expression<rstring> $authKey : getSubmissionTimeValue("authKey");
expression<rstring> $authToken : getSubmissionTimeValue("authToken");
expression<boolean> $encrypted : (boolean) getSubmissionTimeValue("encrypted", "false");
expression<boolean> $allowFilters : true;
expression<boolean> $allowFilters : (boolean) getSubmissionTimeValue("allowFilters", "true");
expression<int32> $scaleEvents : (int32) getSubmissionTimeValue("scaleEvents", "1");
expression<int32> $scaleCommands : (int32) getSubmissionTimeValue("scaleCommands", "1");
expression<int32> $scaleStatuses : (int32) getSubmissionTimeValue("scaleStatuses", "1");

graph
(stream<DeviceStatus> Statuses; stream<DeviceEvent> Events; stream<DeviceCmd> Commands) as IotfDevices = AllDevices()
{
param
org : $org;
authKey : $authKey;
authToken : $authToken;
encrypted : $encrypted;
}

() as PublishedEvents = Publish(Events) {
param topic: "streamsx/iot/device/events";
allowFilter: $allowFilters;
}
() as PublishedCommands = Publish(Commands) {
param topic: "streamsx/iot/device/commands/sent";
allowFilter: $allowFilters;
@parallel(width=$scaleEvents)
() as Events = PublishDeviceEvents() {
param
org : $org;
authKey : $authKey;
authToken : $authToken;
encrypted : $encrypted;
allowFilter : $allowFilters;
config placement: partitionColocation(getThisCompositeInstanceName()+"_evt");
}
() as PublishedStatuses = Publish(Statuses) {
param topic: "streamsx/iot/device/statuses";
allowFilter: $allowFilters;

@parallel(width=$scaleCommands)
() as Commands = PublishDeviceCommands() {
param
org : $org;
authKey : $authKey;
authToken : $authToken;
encrypted : $encrypted;
allowFilter : $allowFilters;
config placement: partitionColocation(getThisCompositeInstanceName()+"_cmd");
}

stream<DeviceCmd> CommandsToSend = Subscribe() {
param
topic: "streamsx/iot/device/commands/send";
streamType: DeviceCmd;
@parallel(width=$scaleStatuses)
() as Status = PublishDeviceStatuses() {
param
org : $org;
authKey : $authKey;
authToken : $authToken;
encrypted : $encrypted;
allowFilter : $allowFilters;
config placement: partitionColocation(getThisCompositeInstanceName()+"_mon");
}
() as SendCommandsToDevice = SendCommandToDevice(CommandsToSend) {

() as SendCommandsToDevice = SubscribeDeviceCommands() {
param
org : $org;
authKey : $authKey;
authToken : $authToken;
encrypted : $encrypted;
config placement: partitionColocation(getThisCompositeInstanceName());
}
}
4 changes: 3 additions & 1 deletion com.ibm.streamsx.iot/com.ibm.streamsx.iot.watson/device.spl
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ composite DeviceTopic(output Messages)
{
logic state: {
rstring _topic = $topic;
rstring _clientId = "a:" + $org + ":" + $appId;
rstring _clientId =
// use scaleable connections when running in parallel
(getMaxChannels() >= 2 ? "A:" : "a:") + $org + ":" + $appId;
rstring _userId = $authKey;
rstring _password = $authToken;
rstring _serverURI = getIotfUri($org, $encrypted);
Expand Down
120 changes: 120 additions & 0 deletions com.ibm.streamsx.iot/com.ibm.streamsx.iot.watson/devicepublish.spl
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2016
*/

namespace com.ibm.streamsx.iot.watson;

use com.ibm.streamsx.iot::*;
use com.ibm.streamsx.topology.topic::Publish;

/**
* Publishes all device statuses for an organization.
*
* Publishes to the topic `streamsx/iot/device/statuses`
* with a tuple type of [DeviceStatus].
*
* @param org Organization identifier.
* @param authKey Application key.
* @param authToken Application authorization token.
* @param encrypted True to use encrypted connections to IBM Watson IoT Platform, false to use unencrypted , defaults to `true`.
* @param allowFilter True to use allow subscribers to push tuple filtering to this composite, defaults to `true`.
*
*/
public composite PublishDeviceStatuses() {
param
expression<rstring> $org;
expression<rstring> $authKey;
expression<rstring> $authToken;
expression<boolean> $encrypted : true;
expression<boolean> $allowFilter : true;

graph

stream<DeviceStatus> Statuses = DeviceStatuses() {
param
org : $org ;
authKey : $authKey ;
authToken : $authToken ;
encrypted: $encrypted;
}

() as PublishedStatuses = Publish(Statuses) {
param topic: "streamsx/iot/device/statuses";
allowFilter: $allowFilter;
}
}

/**
* Publishes all device events for an organization.
*
* Publishes to the topic `streamsx/iot/device/events`
* with a tuple type of [DeviceEvent].
*
* @param org Organization identifier.
* @param authKey Application key.
* @param authToken Application authorization token.
* @param encrypted True to use encrypted connections to IBM Watson IoT Platform, false to use unencrypted , defaults to `true`.
* @param allowFilter True to use allow subscribers to push tuple filtering to this composite, defaults to `true`.
*
*/
public composite PublishDeviceEvents() {
param
expression<rstring> $org;
expression<rstring> $authKey;
expression<rstring> $authToken;
expression<boolean> $encrypted : true;
expression<boolean> $allowFilter : true;

graph

stream<DeviceEvent> Events = DeviceEvents()
{
param
org : $org ;
authKey : $authKey ;
authToken : $authToken ;
encrypted: $encrypted;
}
() as PublishedEvents = Publish(Events) {
param topic: "streamsx/iot/device/events";
allowFilter: $allowFilter;
}
}

/**
* Publishes all device commands for an organization.
*
* Publishes to the topic `streamsx/iot/device/commands/sent`
* with a tuple type of [DeviceCmd].
*
* @param org Organization identifier.
* @param authKey Application key.
* @param authToken Application authorization token.
* @param encrypted True to use encrypted connections to IBM Watson IoT Platform, false to use unencrypted , defaults to `true`.
* @param allowFilter True to use allow subscribers to push tuple filtering to this composite, defaults to `true`.
*
*/
public composite PublishDeviceCommands() {
param
expression<rstring> $org;
expression<rstring> $authKey;
expression<rstring> $authToken;
expression<boolean> $encrypted : true;
expression<boolean> $allowFilter : true;

graph
stream<DeviceCmd> Commands = DeviceCommands()
{
param
org : $org ;
authKey : $authKey ;
authToken : $authToken ;
encrypted: $encrypted;
}

() as PublishedCommands = Publish(Commands) {
param topic: "streamsx/iot/device/commands/sent";
allowFilter: $allowFilter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2016
*/

namespace com.ibm.streamsx.iot.watson;

use com.ibm.streamsx.iot::*;
use com.ibm.streamsx.topology.topic::Subscribe;

/**
* Subsribes to device commands to be sent to devices for an organization.
*
* Subscribes to the topic `streamsx/iot/device/commands/send`
* with a tuple type of [DeviceCmd].
*
* @param org Organization identifier.
* @param authKey Application key.
* @param authToken Application authorization token.
* @param encrypted True to use encrypted connections to IBM Watson IoT Platform, false to use unencrypted , defaults to `true`.
*
*/
public composite SubscribeDeviceCommands() {
param
expression<rstring> $org;
expression<rstring> $authKey;
expression<rstring> $authToken;
expression<boolean> $encrypted : true;

graph
stream<DeviceCmd> CommandsToSend = Subscribe() {
param
topic: "streamsx/iot/device/commands/send";
streamType: DeviceCmd;
}
() as SendCommandsToDevice = SendCommandToDevice(CommandsToSend) {
param
org : $org;
authKey : $authKey;
authToken : $authToken;
encrypted : $encrypted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
*
* Connectivty to IBM Watson IoT Platform is through
* [https://docs.internetofthings.ibmcloud.com/applications/mqtt.html|MQTT].
*
* By default operators and applications in this toolkit use encrypted
* connections through port `8883` with TLSv1.2. Operator
* parameter or submission time parameter `encrypted` can be set to
Expand All @@ -56,6 +57,24 @@
* to allow them to work out of the box. The default will revert
* to encrypted in future pre-release.**
*
* The Watson Iot Platform scalable application model is used
* when operators interacting with Watson IoT Platform are in a
* parallel region of width greater that one.
* This means MQTT messages representing device events,
* commands and statuses are load balanced across
* the multiple channels created by the parallel regions.
*
* In addition if a connection breaks or the processing element
* fails messages are routed by Watson IoT Platform to the
* remaining active connections. When Streams reconnects or
* restarts the PE the new connection will be automaticallyy
* included in the load balancing and start receiving messages.
*
* [com.ibm.streamsx.iot.watson.apps::IotPlatform]
* optionally supports the scalable application mode through
* submission time values that allow independent setting a scale factor
* for device events, commands and statuses.
*
* # Device Events
* Devices publish device events that an SPL application can subscribe to
* using [EventsSubscribe] or [DeviceEvents]. The default is to subscribe to all event identifiers
Expand Down
9 changes: 8 additions & 1 deletion com.ibm.streamsx.iot/com.ibm.streamsx.iot/command.spl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

namespace com.ibm.streamsx.iot;

use com.ibm.streamsx.datetime::currentTimeMillis;
use com.ibm.streamsx.datetime.convert::toIso8601;
use com.ibm.streamsx.json::TupleToJSON;

/**
Expand Down Expand Up @@ -60,8 +62,13 @@ public composite CommandTupleToPayload(input Commands; output stream<DeviceCmd>
param attribute $payload;
graph

stream<DeviceCmd> DeviceCommands = TupleToJSON(Commands) {
stream<DeviceCmd> PreCommands = TupleToJSON(Commands) {
param
inputAttribute: $payload;
}
stream<DeviceCmd> DeviceCommands = Functor(PreCommands) {
output DeviceCommands: jsonString = '{"d":' + jsonString + ","
+ '"ts":"' + toIso8601(currentTimeMillis()) + '"'
+ "}";
}
}
10 changes: 6 additions & 4 deletions com.ibm.streamsx.iot/com.ibm.streamsx.iot/types.spl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type DeviceEventId = Device, tuple<rstring eventId>;
* * `typeId` - Type of device that issued the event.
* * `deviceId` - Identifier of device that issued the event.
* * `eventId` - Event identifier.
* * `jsonString` - Event payload in JSON IBM Watson IoT Platform event format specification.
* * `jsonString` - Event data in JSON IBM Watson IoT Platform event format specification.
*
* The event payload is a JSON object using
* [https://docs.internetofthings.ibmcloud.com/messaging/payload.html|IBM Watson IoT Platform event format specification]
Expand Down Expand Up @@ -76,8 +76,10 @@ type DeviceCmdId = Device, tuple<rstring cmdId>;
* * `cmdId` - Command identifier.
* * `jsonString` - Command data.
*
* The command data is a JSON object using
* [https://docs.internetofthings.ibmcloud.com/messaging/payload.html|IBM Watson IoT Platform event format specification]
* and thus has two elements:
* * `d` - Command data, this is the command's data.
* * `ts` - Command timestamp. This is an optional element, if included its value should be a valid ISO8601 encoded timestamp string.
*/
type DeviceCmd = DeviceCmdId, Json;



4 changes: 2 additions & 2 deletions com.ibm.streamsx.iot/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<info:toolkitInfoModel xmlns:common="http://www.ibm.com/xmlns/prod/streams/spl/common" xmlns:info="http://www.ibm.com/xmlns/prod/streams/spl/toolkitInfo">
<info:identity>
<info:name>com.ibm.streamsx.iot</info:name>
<info:description>Connectivity with IBM Watson IoT Platform.
<info:description>IoT integration including connectivity with IBM Watson IoT Platform.

IBM Watson IoT Platform provides a model around devices where devices produce events (for example, sensor data) and subscribe to commands (for example, control instructions, such as reduce maximum rpm for an engine).

Expand All @@ -17,7 +17,7 @@ This toolkit depends on these toolkits:
* `com.ibm.streamsx.topology`

</info:description>
<info:version>0.7.2.commit___dev__</info:version>
<info:version>0.8.0.commit___dev__</info:version>
<info:requiredProductVersion>4.1.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies>
Expand Down
Loading

0 comments on commit 2a5a1f1

Please sign in to comment.