Skip to content

Commit

Permalink
Merge pull request #108 from IBMStreams/develop
Browse files Browse the repository at this point in the history
1.7.3
  • Loading branch information
markheger authored Jan 17, 2018
2 parents d4c086c + b5b57db commit 600a130
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 31 deletions.
2 changes: 1 addition & 1 deletion com.ibm.streamsx.monitoring/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ by toolkit with their namespaces as prefixes.
**streamtool submitjob** command or by using Streams Studio.
]]></info:description>
<info:version>1.7.2</info:version>
<info:version>1.7.3</info:version>
<info:requiredProductVersion>4.1.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down
26 changes: 26 additions & 0 deletions tests/cloud-test/test_monitoring/test.jobs/SampleCrashSource.spl
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace test.jobs;

public composite SampleCrashSource(output Triggers) {

graph

stream<boolean dummy> Triggers as O = Beacon() {
param
period: 30.0;
initDelay: 30.0;
}

() as CrashSink = Custom(Triggers as I) {
logic
onTuple I: {
appLog(Log.error, "Operator is requested to fail.");
abort();
}
}

config
placement : partitionExlocation("test");

}


25 changes: 6 additions & 19 deletions tests/cloud-test/test_monitoring/test.jobs/TestJobStatusSource.spl
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use com.ibm.streamsx.monitoring.jobs::*;
public composite TestJobStatusSource(output ResultStream) {

param
expression<rstring> $user;
expression<rstring> $password;
expression<rstring> $user: getSubmissionTimeValue("user", ""); // optional, if set in application configuration
expression<rstring> $password: getSubmissionTimeValue("password", ""); // optional, if set in application configuration
expression<rstring> $iamApiKey: getSubmissionTimeValue("iamApiKey", ""); // optional, if user and password are set
expression<rstring> $iamTokenEndpoint: getSubmissionTimeValue("iamTokenEndpoint", ""); // optional, used when iamApiKey is set only

graph

Expand All @@ -17,6 +19,8 @@ public composite TestJobStatusSource(output ResultStream) {
param
user: $user;
password: $password;
iamApiKey: $iamApiKey;
iamTokenEndpoint: $iamTokenEndpoint;
}

/*
Expand All @@ -35,23 +39,6 @@ public composite TestJobStatusSource(output ResultStream) {
}
}
}

stream<boolean dummy> Triggers as O = Beacon() {
param
period: 30.0;
initDelay: 30.0;
}

() as CrashSink = Custom(Triggers as I) {
logic
onTuple I: {
appLog(Log.error, "Operator is requested to fail.");
abort();
}
}

config
placement : partitionExlocation("test");

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use com.ibm.streamsx.monitoring.metrics::Notification;
public composite TestMetricsSource(output ResultStream) {

param
expression<rstring> $user;
expression<rstring> $password;
expression<rstring> $user: getSubmissionTimeValue("user", ""); // optional, if set in application configuration
expression<rstring> $password: getSubmissionTimeValue("password", ""); // optional, if set in application configuration
expression<rstring> $iamApiKey: getSubmissionTimeValue("iamApiKey", ""); // optional, if user and password are set
expression<rstring> $iamTokenEndpoint: getSubmissionTimeValue("iamTokenEndpoint", ""); // optional, used when iamApiKey is set only

graph

Expand All @@ -20,6 +22,8 @@ public composite TestMetricsSource(output ResultStream) {
param
user: $user;
password: $password;
iamApiKey: $iamApiKey;
iamTokenEndpoint: $iamTokenEndpoint;
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use com.ibm.streamsx.monitoring.system::*;
public composite TestLogsSource(output ResultStream) {

param
expression<rstring> $user;
expression<rstring> $password;
expression<rstring> $user: getSubmissionTimeValue("user", ""); // optional, if set in application configuration
expression<rstring> $password: getSubmissionTimeValue("password", ""); // optional, if set in application configuration
expression<rstring> $iamApiKey: getSubmissionTimeValue("iamApiKey", ""); // optional, if user and password are set
expression<rstring> $iamTokenEndpoint: getSubmissionTimeValue("iamTokenEndpoint", ""); // optional, used when iamApiKey is set only

graph

Expand All @@ -19,6 +21,8 @@ public composite TestLogsSource(output ResultStream) {
param
user: $user;
password: $password;
iamApiKey: $iamApiKey;
iamTokenEndpoint: $iamTokenEndpoint;
}

/*
Expand Down
62 changes: 55 additions & 7 deletions tests/cloud-test/test_streaming_analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import streamsx.spl.toolkit as tk
import os
import streamsx.rest as sr
from streamsx.topology import context

class TestCloud(unittest.TestCase):
""" Test invocations of composite operators in Streaming Analytics Service """
Expand All @@ -15,6 +16,19 @@ def setUpClass(self):
self.service_name = os.environ.get('STREAMING_ANALYTICS_SERVICE_NAME')
# Get credentials from VCAP_SERVICES env, because values are required for the test_op as parameters
self.credentials = sr._get_credentials(sr._get_vcap_services(), self.service_name)
# start streams service
connection = sr.StreamingAnalyticsConnection()
service = connection.get_streaming_analytics()
result = service.start_instance()
#print('Streaming Analytics service ' + connection.service_name + ' is ' + result['state'] + ' and ' + result['status'])

@classmethod
def tearDownClass(self):
# stop streams service
connection = sr.StreamingAnalyticsConnection()
service = connection.get_streaming_analytics()
result = service.stop_instance()
#print('Streaming Analytics service ' + connection.service_name + ' is ' + result['state'])

def setUp(self):
Tester.setup_streaming_analytics(self, force_remote_build=False)
Expand All @@ -23,19 +37,38 @@ def _add_toolkits(self, topo):
tk.add_toolkit(topo, './test_monitoring')
tk.add_toolkit(topo, '../../com.ibm.streamsx.monitoring')

def _get_expected_values(self):
expected_vals = []
expected_vals.append("TEST_RESULT_PASS")
return expected_vals
def _is_test_server(self):
res = False
# check if running on test system
v2_rest_url = self.credentials.get("v2_rest_url")
if v2_rest_url is not None:
if "stage1" in v2_rest_url:
res = True
return res

def _get_iam_endpoint(self):
iamTokenEndpoint = "" # uses operator default
if self._is_test_server():
iamTokenEndpoint = "https://iam.stage1.ng.bluemix.net/oidc/token"
return iamTokenEndpoint

def _build_launch_validate(self, name, composite_name):
topo = Topology(name)
self._add_toolkits(topo)

# Set up parameters to call the test composite
user = self.credentials['userid']
password = self.credentials['password']
params = {'user':user, 'password':password}
# It depends on the Streaming Analytics service if userId and password or iamApiKey
# needs to be provided as job submisssion parameter to the test application
user = self.credentials.get("userid")
password = self.credentials.get("password")
if user is not None:
print("Monitor application in the Streaming Analytics service uses user and password")
params = {'user':user, 'password':password}
else :
print("Monitor application in the Streaming Analytics service uses IAM API KEY")
iamApiKey = self.credentials.get("apikey") # use value from VCAP_SERVICES file
iamTokenEndpoint = self._get_iam_endpoint()
params = {'iamApiKey':iamApiKey, 'iamTokenEndpoint':iamTokenEndpoint}

# Call the test composite
test_op = op.Source(topo, composite_name, 'tuple<rstring result>', params=params)
Expand All @@ -46,13 +79,28 @@ def _build_launch_validate(self, name, composite_name):

tester.test(self.test_ctxtype, self.test_config)

def _launch_sample_job(self):
# this job is monitored by test.jobs::TestJobStatusSource application
# PE crash is forced by this application in order to trigger a notification
topo = Topology("SampleCrashApp")
self._add_toolkits(topo)
# Call the crash composite
test_op = op.Source(topo, "test.jobs::SampleCrashSource", 'tuple<boolean dummy>')
# prepare config and submit the job to Streaming Analytics service
config={}
sc = sr.StreamingAnalyticsConnection()
config[context.ConfigParams.STREAMS_CONNECTION] = sc
context.submit(context.ContextTypes.STREAMING_ANALYTICS_SERVICE, topo, config=config)


def test_metrics_monitor(self):
self._build_launch_validate("test_metrics_monitor", "test.metrics::TestMetricsSource")

def test_logs_monitor(self):
self._build_launch_validate("test_logs_monitor", "test.system::TestLogsSource")

def test_jobs_status_monitor(self):
self._launch_sample_job()
self._build_launch_validate("test_jobs_status_monitor", "test.jobs::TestJobStatusSource")


0 comments on commit 600a130

Please sign in to comment.