Skip to content

Commit

Permalink
Merge pull request #149 from IBMStreams/develop
Browse files Browse the repository at this point in the history
3.0.3
  • Loading branch information
markheger authored Oct 31, 2019
2 parents c7e5020 + d99bc24 commit dd85eb7
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 49 deletions.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -130,36 +130,45 @@ public void handleNotification(Notification notification, Object handback) {
boolean isInfoEnabled = _trace.isInfoEnabled();

if (notification.getType().equals(Notifications.JOB_ADDED)) {
String jobId = null;
if(notification.getUserData() instanceof BigInteger) {
/*
* Register existing jobs.
*/

BigInteger jobIdLong = (BigInteger)notification.getUserData();
String jobId = jobIdLong.toString();

jobId = jobIdLong.toString();
}
else if(notification.getUserData() instanceof String) {
jobId = (String) notification.getUserData();
}
else {
_trace.error("received JOB_ADDED notification: user data is not an instance of BigInteger or String");
}
if (null != jobId) {
String jobName = addValidJob(jobId);
if (null != _operatorConfiguration.get_tupleContainerJobStatusSource()) {
final Tuple tuple = _operatorConfiguration.get_tupleContainerJobStatusSource().getTuple(notification, handback, _instanceId, jobId, jobName, null, null, null, null);
_operatorConfiguration.get_tupleContainerJobStatusSource().submit(tuple);
}

}

if (isInfoEnabled) {
_trace.info("received JOB_ADDED notification: jobId=" + jobId);
}
}
else {
_trace.error("received JOB_ADDED notification: user data is not an instance of BigInteger");
}
}
else if (notification.getType().equals(Notifications.JOB_REMOVED)) {
String jobId = null;
if(notification.getUserData() instanceof BigInteger) {
BigInteger jobIdLong = (BigInteger)notification.getUserData();
jobId = jobIdLong.toString();
}
else if(notification.getUserData() instanceof String) {
jobId = (String) notification.getUserData();
}
else {
_trace.error("received JOB_REMOVED notification: user data is not an instance of BigInteger or String");
}
if (null != jobId) {
/*
* Unregister existing jobs.
*/
BigInteger jobIdLong = (BigInteger)notification.getUserData();
String jobId = jobIdLong.toString();
if (_jobHandlers.containsKey(jobId)) {

String jobName = _jobHandlers.get(jobId).getJobName();
Expand All @@ -179,9 +188,6 @@ else if (isInfoEnabled) {
_trace.info("received JOB_REMOVED notification for job that is not monitored: jobId=" + jobId);
}
}
else {
_trace.error("received JOB_REMOVED notification: user data is not an instance of BigInteger");
}
}
else {
if ((OpType.LOG_SOURCE == _operatorConfiguration.get_OperatorType()) &&
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.monitoring/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Find a description of each sample in the README.md file of the sample directory.
To launch the samples to a remote Streams instance, for example IBM Cloud Pak Data, the python scripts in the samples directory can be used.
]]></info:description>
<info:version>3.0.2</info:version>
<info:version>3.0.3</info:version>
<info:requiredProductVersion>4.3.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down
13 changes: 7 additions & 6 deletions samples/launch_job_status_monitor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from streamsx.topology.context import *
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology import context
from streamsx.spl.toolkit import add_toolkit
import streamsx.spl.op as op


Expand All @@ -12,20 +13,20 @@

def _launch(main):
    """Submit *main* to the configured Streams instance.

    Uses the DISTRIBUTED submission context. SSL certificate verification
    is disabled because the target instance (e.g. IBM Cloud Pak for Data)
    typically presents a self-signed certificate.
    """
    cfg = {}
    # 'context' is imported at the top of the file (from streamsx.topology import context)
    cfg[context.ConfigParams.SSL_VERIFY] = False
    rc = context.submit('DISTRIBUTED', main, cfg)

def monitor_app():
    """Build and launch the JobStatusMonitor sample's Monitor application.

    Registers the monitoring and sample toolkits with the topology once
    each, then submits the SPL main composite via _launch.
    """
    topo = Topology('JobStatusMonitorSample')
    # add_toolkit is imported at the top of the file (from streamsx.spl.toolkit)
    add_toolkit(topo, monitoring_toolkit)
    add_toolkit(topo, sample_toolkit)
    r = op.main_composite(kind='com.ibm.streamsx.monitoring.jobs.sample.JobStatusMonitor::Monitor', toolkits=[monitoring_toolkit,sample_toolkit])
    _launch(r[0])

def sample_app():
    """Build and launch the sample job that the monitor observes.

    The SampleJob composite deliberately crashes so the JobStatusMonitor
    has status-change events to report.
    """
    topo = Topology('CrashSample')
    # add_toolkit is imported at the top of the file (from streamsx.spl.toolkit)
    add_toolkit(topo, monitoring_toolkit)
    add_toolkit(topo, sample_toolkit)
    r = op.main_composite(kind='com.ibm.streamsx.monitoring.jobs.sample.JobStatusMonitor::SampleJob', toolkits=[monitoring_toolkit,sample_toolkit])
    _launch(r[0])

Expand Down
13 changes: 7 additions & 6 deletions samples/launch_logs_monitor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from streamsx.topology.context import *
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology import context
from streamsx.spl.toolkit import add_toolkit
import streamsx.spl.op as op


Expand All @@ -12,20 +13,20 @@

def _launch(main):
    """Submit *main* to the configured Streams instance.

    Uses the DISTRIBUTED submission context. SSL certificate verification
    is disabled because the target instance (e.g. IBM Cloud Pak for Data)
    typically presents a self-signed certificate.
    """
    cfg = {}
    # 'context' is imported at the top of the file (from streamsx.topology import context)
    cfg[context.ConfigParams.SSL_VERIFY] = False
    rc = context.submit('DISTRIBUTED', main, cfg)

def monitor_app():
    """Build and launch the LogSource sample's Monitor application.

    Registers the monitoring and sample toolkits with the topology once
    each, then submits the SPL main composite via _launch.
    """
    topo = Topology('LogsMonitorSample')
    # add_toolkit is imported at the top of the file (from streamsx.spl.toolkit)
    add_toolkit(topo, monitoring_toolkit)
    add_toolkit(topo, sample_toolkit)
    r = op.main_composite(kind='com.ibm.streamsx.monitoring.system.sample.LogSource::Monitor', toolkits=[monitoring_toolkit,sample_toolkit])
    _launch(r[0])

def sample_app():
    """Build and launch the sample job that emits log entries for the monitor."""
    topo = Topology('LogsSample')
    # add_toolkit is imported at the top of the file (from streamsx.spl.toolkit)
    add_toolkit(topo, monitoring_toolkit)
    add_toolkit(topo, sample_toolkit)
    r = op.main_composite(kind='com.ibm.streamsx.monitoring.system.sample.LogSource::SampleJob', toolkits=[monitoring_toolkit,sample_toolkit])
    _launch(r[0])

Expand Down
13 changes: 7 additions & 6 deletions samples/launch_metrics_monitor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from streamsx.topology.context import *
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology import context
from streamsx.spl.toolkit import add_toolkit
import streamsx.spl.op as op


Expand All @@ -12,20 +13,20 @@

def _launch(main):
    """Submit *main* to the configured Streams instance.

    Uses the DISTRIBUTED submission context. SSL certificate verification
    is disabled because the target instance (e.g. IBM Cloud Pak for Data)
    typically presents a self-signed certificate.
    """
    cfg = {}
    # 'context' is imported at the top of the file (from streamsx.topology import context)
    cfg[context.ConfigParams.SSL_VERIFY] = False
    rc = context.submit('DISTRIBUTED', main, cfg)

def monitor_app():
    """Build and launch the MetricsSource sample's Monitor application.

    Registers the monitoring and sample toolkits with the topology once
    each, then submits the SPL main composite via _launch.
    """
    topo = Topology('MetricsMonitorSample')
    # add_toolkit is imported at the top of the file (from streamsx.spl.toolkit)
    add_toolkit(topo, monitoring_toolkit)
    add_toolkit(topo, sample_toolkit)
    r = op.main_composite(kind='com.ibm.streamsx.monitoring.metrics.sample.MetricsSource::Monitor', toolkits=[monitoring_toolkit,sample_toolkit])
    _launch(r[0])

def sample_app():
    """Build and launch the sample job that produces metrics for the monitor."""
    topo = Topology('MetricsSample')
    # add_toolkit is imported at the top of the file (from streamsx.spl.toolkit)
    add_toolkit(topo, monitoring_toolkit)
    add_toolkit(topo, sample_toolkit)
    r = op.main_composite(kind='com.ibm.streamsx.monitoring.metrics.sample.MetricsSource::SampleJob', toolkits=[monitoring_toolkit,sample_toolkit])
    _launch(r[0])

Expand Down
9 changes: 5 additions & 4 deletions samples/launch_system_monitor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from streamsx.topology.context import *
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology import context
from streamsx.spl.toolkit import add_toolkit
import streamsx.spl.op as op


Expand All @@ -12,13 +13,13 @@

def _launch(main):
    """Submit *main* to the configured Streams instance.

    Uses the DISTRIBUTED submission context. SSL certificate verification
    is disabled because the target instance (e.g. IBM Cloud Pak for Data)
    typically presents a self-signed certificate.
    """
    cfg = {}
    # 'context' is imported at the top of the file (from streamsx.topology import context)
    cfg[context.ConfigParams.SSL_VERIFY] = False
    rc = context.submit('DISTRIBUTED', main, cfg)

def sample_app():
    """Build and launch the SystemMonitorSource sample application.

    Registers the monitoring and sample toolkits with the topology once
    each, then submits the SPL main composite via _launch.
    """
    topo = Topology('SystemMonitorSample')
    # add_toolkit is imported at the top of the file (from streamsx.spl.toolkit)
    add_toolkit(topo, monitoring_toolkit)
    add_toolkit(topo, sample_toolkit)
    r = op.main_composite(kind='com.ibm.streamsx.monitoring.system.sample.SystemMonitorSource::Monitor', toolkits=[monitoring_toolkit,sample_toolkit])
    _launch(r[0])

Expand Down
2 changes: 1 addition & 1 deletion tests/cloud-test/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Test applications require an application configuration with the name "monitoring
ant test
```

python3 -u -m unittest test_monitoring_icp.TestDistributed.test_metrics_monitor
python3 -u -m unittest test_monitoring.TestDistributed.test_metrics_monitor

# Clean-up

Expand Down
2 changes: 1 addition & 1 deletion tests/cloud-test/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<arg value="-u"/>
<arg value="-m"/>
<arg value="unittest"/>
<arg value="test_monitoring_icp.TestDistributed"/>
<arg value="test_monitoring.TestDistributed"/>
</exec>
</target>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def _build_launch_validate(self, name, composite_name):
tester.tuple_count(test_op.stream, 1, exact=True)
tester.contents(test_op.stream, [{'result':'TEST_RESULT_PASS'}] )

self.test_config[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
job_config = streamsx.topology.context.JobConfig(tracing='info')
self.test_config[context.ConfigParams.SSL_VERIFY] = False
job_config = context.JobConfig(tracing='info')
job_config.add(self.test_config)
tester.test(self.test_ctxtype, self.test_config, always_collect_logs=True)

Expand All @@ -45,7 +45,7 @@ def _launch_sample_job(self):
# Call the crash composite
test_op = op.Source(topo, "test.jobs::SampleCrashSource", 'tuple<boolean dummy>')
config={}
config[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
config[context.ConfigParams.SSL_VERIFY] = False
return context.submit(context.ContextTypes.DISTRIBUTED, topo, config=config)


Expand Down
13 changes: 8 additions & 5 deletions tests/monitoring_microservices.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,34 @@
from streamsx.topology.context import *
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology import context
from streamsx.spl.toolkit import add_toolkit
import streamsx.spl.op as op


monitoring_toolkit = '../com.ibm.streamsx.monitoring'

def _launch(main):
    """Submit *main* to the configured Streams instance.

    Uses the DISTRIBUTED submission context with 'info'-level tracing.
    SSL certificate verification is disabled because the target instance
    typically presents a self-signed certificate.
    """
    cfg = {}
    # Enable info-level application tracing for the submitted job.
    job_config = context.JobConfig(tracing='info')
    job_config.add(cfg)
    cfg[context.ConfigParams.SSL_VERIFY] = False
    rc = context.submit('DISTRIBUTED', main, cfg)

def metrics_ingest_service():
    """Build and launch the MetricsIngestService microservice."""
    topo = Topology('MetricsIngestService')
    # add_toolkit is imported at the top of the file (from streamsx.spl.toolkit)
    add_toolkit(topo, monitoring_toolkit)
    r = op.main_composite(kind='com.ibm.streamsx.monitoring.metrics.services::MetricsIngestService', toolkits=[monitoring_toolkit])
    _launch(r[0])

def job_status_service():
    """Build and launch the JobStatusService microservice."""
    topo = Topology('JobStatusService')
    # add_toolkit is imported at the top of the file (from streamsx.spl.toolkit)
    add_toolkit(topo, monitoring_toolkit)
    r = op.main_composite(kind='com.ibm.streamsx.monitoring.jobs.services::JobStatusService', toolkits=[monitoring_toolkit])
    _launch(r[0])

def failed_pe_service():
    """Build and launch the FailedPEService microservice."""
    topo = Topology('FailedPEService')
    # add_toolkit is imported at the top of the file (from streamsx.spl.toolkit)
    add_toolkit(topo, monitoring_toolkit)
    r = op.main_composite(kind='com.ibm.streamsx.monitoring.jobs.services::FailedPEService', toolkits=[monitoring_toolkit])
    _launch(r[0])

Expand Down

0 comments on commit dd85eb7

Please sign in to comment.