diff --git a/com.ibm.streamsx.monitoring/ext.libs/com.ibm.streams.management.jmxmp.jar b/com.ibm.streamsx.monitoring/ext.libs/com.ibm.streams.management.jmxmp.jar index 0c3441c..42760f6 100644 Binary files a/com.ibm.streamsx.monitoring/ext.libs/com.ibm.streams.management.jmxmp.jar and b/com.ibm.streamsx.monitoring/ext.libs/com.ibm.streams.management.jmxmp.jar differ diff --git a/com.ibm.streamsx.monitoring/ext.libs/com.ibm.streams.management.mx.jar b/com.ibm.streamsx.monitoring/ext.libs/com.ibm.streams.management.mx.jar index 2e0a852..c4cef49 100644 Binary files a/com.ibm.streamsx.monitoring/ext.libs/com.ibm.streams.management.mx.jar and b/com.ibm.streamsx.monitoring/ext.libs/com.ibm.streams.management.mx.jar differ diff --git a/com.ibm.streamsx.monitoring/impl/java/src/com/ibm/streamsx/monitoring/jmx/internal/InstanceHandler.java b/com.ibm.streamsx.monitoring/impl/java/src/com/ibm/streamsx/monitoring/jmx/internal/InstanceHandler.java index e049cb7..fc41ffc 100644 --- a/com.ibm.streamsx.monitoring/impl/java/src/com/ibm/streamsx/monitoring/jmx/internal/InstanceHandler.java +++ b/com.ibm.streamsx.monitoring/impl/java/src/com/ibm/streamsx/monitoring/jmx/internal/InstanceHandler.java @@ -130,36 +130,45 @@ public void handleNotification(Notification notification, Object handback) { boolean isInfoEnabled = _trace.isInfoEnabled(); if (notification.getType().equals(Notifications.JOB_ADDED)) { + String jobId = null; if(notification.getUserData() instanceof BigInteger) { - /* - * Register existing jobs. - */ - BigInteger jobIdLong = (BigInteger)notification.getUserData(); - String jobId = jobIdLong.toString(); - + jobId = jobIdLong.toString(); + } + else if(notification.getUserData() instanceof String) { + jobId = (String) notification.getUserData(); + } + else { + _trace.error("received JOB_ADDED notification: user data is not an instance of BigInteger or String"); + } + if (null != jobId) { String jobName = addValidJob(jobId); if (null != _operatorConfiguration.get_tupleContainerJobStatusSource()) { final Tuple tuple = _operatorConfiguration.get_tupleContainerJobStatusSource().getTuple(notification, handback, _instanceId, jobId, jobName, null, null, null, null); _operatorConfiguration.get_tupleContainerJobStatusSource().submit(tuple); - } - + } if (isInfoEnabled) { _trace.info("received JOB_ADDED notification: jobId=" + jobId); } } - else { - _trace.error("received JOB_ADDED notification: user data is not an instance of BigInteger"); - } } else if (notification.getType().equals(Notifications.JOB_REMOVED)) { + String jobId = null; if(notification.getUserData() instanceof BigInteger) { + BigInteger jobIdLong = (BigInteger)notification.getUserData(); + jobId = jobIdLong.toString(); + } + else if(notification.getUserData() instanceof String) { + jobId = (String) notification.getUserData(); + } + else { + _trace.error("received JOB_REMOVED notification: user data is not an instance of BigInteger or String"); + } + if (null != jobId) { /* * Unregister existing jobs. */ - BigInteger jobIdLong = (BigInteger)notification.getUserData(); - String jobId = jobIdLong.toString(); if (_jobHandlers.containsKey(jobId)) { String jobName = _jobHandlers.get(jobId).getJobName(); @@ -179,9 +188,6 @@ else if (isInfoEnabled) { _trace.info("received JOB_REMOVED notification for job that is not monitored: jobId=" + jobId); } } - else { - _trace.error("received JOB_REMOVED notification: user data is not an instance of BigInteger"); - } } else { if ((OpType.LOG_SOURCE == _operatorConfiguration.get_OperatorType()) && diff --git a/com.ibm.streamsx.monitoring/info.xml b/com.ibm.streamsx.monitoring/info.xml index 3893110..d50016a 100644 --- a/com.ibm.streamsx.monitoring/info.xml +++ b/com.ibm.streamsx.monitoring/info.xml @@ -29,7 +29,7 @@ Find a description of each sample in the README.md file of the sample directory. To launch the samples to a remote Streams instance, for example IBM Cloud Pak Data, the python scripts in the samples directory can be used. ]]> - 3.0.2 + 3.0.3 4.3.0.0 diff --git a/samples/launch_job_status_monitor_sample.py b/samples/launch_job_status_monitor_sample.py index d662415..cc2a351 100644 --- a/samples/launch_job_status_monitor_sample.py +++ b/samples/launch_job_status_monitor_sample.py @@ -4,6 +4,7 @@ from streamsx.topology.context import * from streamsx.topology.schema import CommonSchema, StreamSchema from streamsx.topology import context +from streamsx.spl.toolkit import add_toolkit import streamsx.spl.op as op @@ -12,20 +13,20 @@ def _launch(main): cfg = {} - cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False - rc = streamsx.topology.context.submit('DISTRIBUTED', main, cfg) + cfg[context.ConfigParams.SSL_VERIFY] = False + rc = context.submit('DISTRIBUTED', main, cfg) def monitor_app(): topo = Topology('JobStatusMonitorSample') - streamsx.spl.toolkit.add_toolkit(topo, monitoring_toolkit) - streamsx.spl.toolkit.add_toolkit(topo, sample_toolkit) + add_toolkit(topo, monitoring_toolkit) + add_toolkit(topo, sample_toolkit) r = op.main_composite(kind='com.ibm.streamsx.monitoring.jobs.sample.JobStatusMonitor::Monitor', toolkits=[monitoring_toolkit,sample_toolkit]) _launch(r[0]) def sample_app(): topo = Topology('CrashSample') - streamsx.spl.toolkit.add_toolkit(topo, monitoring_toolkit) - streamsx.spl.toolkit.add_toolkit(topo, sample_toolkit) + add_toolkit(topo, monitoring_toolkit) + add_toolkit(topo, sample_toolkit) r = op.main_composite(kind='com.ibm.streamsx.monitoring.jobs.sample.JobStatusMonitor::SampleJob', toolkits=[monitoring_toolkit,sample_toolkit]) _launch(r[0]) diff --git a/samples/launch_logs_monitor_sample.py b/samples/launch_logs_monitor_sample.py index c45c312..8581656 100644 --- a/samples/launch_logs_monitor_sample.py +++ b/samples/launch_logs_monitor_sample.py @@ -4,6 +4,7 @@ from streamsx.topology.context import * from streamsx.topology.schema import CommonSchema, StreamSchema from streamsx.topology import context +from streamsx.spl.toolkit import add_toolkit import streamsx.spl.op as op @@ -12,20 +13,20 @@ def _launch(main): cfg = {} - cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False - rc = streamsx.topology.context.submit('DISTRIBUTED', main, cfg) + cfg[context.ConfigParams.SSL_VERIFY] = False + rc = context.submit('DISTRIBUTED', main, cfg) def monitor_app(): topo = Topology('LogsMonitorSample') - streamsx.spl.toolkit.add_toolkit(topo, monitoring_toolkit) - streamsx.spl.toolkit.add_toolkit(topo, sample_toolkit) + add_toolkit(topo, monitoring_toolkit) + add_toolkit(topo, sample_toolkit) r = op.main_composite(kind='com.ibm.streamsx.monitoring.system.sample.LogSource::Monitor', toolkits=[monitoring_toolkit,sample_toolkit]) _launch(r[0]) def sample_app(): topo = Topology('LogsSample') - streamsx.spl.toolkit.add_toolkit(topo, monitoring_toolkit) - streamsx.spl.toolkit.add_toolkit(topo, sample_toolkit) + add_toolkit(topo, monitoring_toolkit) + add_toolkit(topo, sample_toolkit) r = op.main_composite(kind='com.ibm.streamsx.monitoring.system.sample.LogSource::SampleJob', toolkits=[monitoring_toolkit,sample_toolkit]) _launch(r[0]) diff --git a/samples/launch_metrics_monitor_sample.py b/samples/launch_metrics_monitor_sample.py index f391aff..e9bc41e 100644 --- a/samples/launch_metrics_monitor_sample.py +++ b/samples/launch_metrics_monitor_sample.py @@ -4,6 +4,7 @@ from streamsx.topology.context import * from streamsx.topology.schema import CommonSchema, StreamSchema from streamsx.topology import context +from streamsx.spl.toolkit import add_toolkit import streamsx.spl.op as op @@ -12,20 +13,20 @@ def _launch(main): cfg = {} - cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False - rc = streamsx.topology.context.submit('DISTRIBUTED', main, cfg) + cfg[context.ConfigParams.SSL_VERIFY] = False + rc = context.submit('DISTRIBUTED', main, cfg) def monitor_app(): topo = Topology('MetricsMonitorSample') - streamsx.spl.toolkit.add_toolkit(topo, monitoring_toolkit) - streamsx.spl.toolkit.add_toolkit(topo, sample_toolkit) + add_toolkit(topo, monitoring_toolkit) + add_toolkit(topo, sample_toolkit) r = op.main_composite(kind='com.ibm.streamsx.monitoring.metrics.sample.MetricsSource::Monitor', toolkits=[monitoring_toolkit,sample_toolkit]) _launch(r[0]) def sample_app(): topo = Topology('MetricsSample') - streamsx.spl.toolkit.add_toolkit(topo, monitoring_toolkit) - streamsx.spl.toolkit.add_toolkit(topo, sample_toolkit) + add_toolkit(topo, monitoring_toolkit) + add_toolkit(topo, sample_toolkit) r = op.main_composite(kind='com.ibm.streamsx.monitoring.metrics.sample.MetricsSource::SampleJob', toolkits=[monitoring_toolkit,sample_toolkit]) _launch(r[0]) diff --git a/samples/launch_system_monitor_sample.py b/samples/launch_system_monitor_sample.py index 9b2da1d..7ea0079 100644 --- a/samples/launch_system_monitor_sample.py +++ b/samples/launch_system_monitor_sample.py @@ -4,6 +4,7 @@ from streamsx.topology.context import * from streamsx.topology.schema import CommonSchema, StreamSchema from streamsx.topology import context +from streamsx.spl.toolkit import add_toolkit import streamsx.spl.op as op @@ -12,13 +13,13 @@ def _launch(main): cfg = {} - cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False - rc = streamsx.topology.context.submit('DISTRIBUTED', main, cfg) + cfg[context.ConfigParams.SSL_VERIFY] = False + rc = context.submit('DISTRIBUTED', main, cfg) def sample_app(): topo = Topology('SystemMonitorSample') - streamsx.spl.toolkit.add_toolkit(topo, monitoring_toolkit) - streamsx.spl.toolkit.add_toolkit(topo, sample_toolkit) + add_toolkit(topo, monitoring_toolkit) + add_toolkit(topo, sample_toolkit) r = op.main_composite(kind='com.ibm.streamsx.monitoring.system.sample.SystemMonitorSource::Monitor', toolkits=[monitoring_toolkit,sample_toolkit]) _launch(r[0]) diff --git a/tests/cloud-test/README.md b/tests/cloud-test/README.md index 4abad2b..c8b65b8 100644 --- a/tests/cloud-test/README.md +++ b/tests/cloud-test/README.md @@ -18,7 +18,7 @@ Test applications require an application configuration with the name "monitoring ant test ``` - python3 -u -m unittest test_monitoring_icp.TestDistributed.test_metrics_monitor + python3 -u -m unittest test_monitoring.TestDistributed.test_metrics_monitor # Clean-up diff --git a/tests/cloud-test/build.xml b/tests/cloud-test/build.xml index 9305d37..7512660 100644 --- a/tests/cloud-test/build.xml +++ b/tests/cloud-test/build.xml @@ -33,7 +33,7 @@ - + diff --git a/tests/cloud-test/test_monitoring_icp.py b/tests/cloud-test/test_monitoring.py similarity index 90% rename from tests/cloud-test/test_monitoring_icp.py rename to tests/cloud-test/test_monitoring.py index 55e5a4d..6375bc4 100644 --- a/tests/cloud-test/test_monitoring_icp.py +++ b/tests/cloud-test/test_monitoring.py @@ -32,8 +32,8 @@ def _build_launch_validate(self, name, composite_name): tester.tuple_count(test_op.stream, 1, exact=True) tester.contents(test_op.stream, [{'result':'TEST_RESULT_PASS'}] ) - self.test_config[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False - job_config = streamsx.topology.context.JobConfig(tracing='info') + self.test_config[context.ConfigParams.SSL_VERIFY] = False + job_config = context.JobConfig(tracing='info') job_config.add(self.test_config) tester.test(self.test_ctxtype, self.test_config, always_collect_logs=True) @@ -45,7 +45,7 @@ def _launch_sample_job(self): # Call the crash composite test_op = op.Source(topo, "test.jobs::SampleCrashSource", 'tuple') config={} - config[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False + config[context.ConfigParams.SSL_VERIFY] = False return context.submit(context.ContextTypes.DISTRIBUTED, topo, config=config) diff --git a/tests/monitoring_microservices.py b/tests/monitoring_microservices.py index a9b423b..63fab09 100644 --- a/tests/monitoring_microservices.py +++ b/tests/monitoring_microservices.py @@ -4,6 +4,7 @@ from streamsx.topology.context import * from streamsx.topology.schema import CommonSchema, StreamSchema from streamsx.topology import context +from streamsx.spl.toolkit import add_toolkit import streamsx.spl.op as op @@ -11,24 +12,26 @@ def _launch(main): cfg = {} - cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False - rc = streamsx.topology.context.submit('DISTRIBUTED', main, cfg) + job_config = context.JobConfig(tracing='info') + job_config.add(cfg) + cfg[context.ConfigParams.SSL_VERIFY] = False + rc = context.submit('DISTRIBUTED', main, cfg) def metrics_ingest_service(): topo = Topology('MetricsIngestService') - streamsx.spl.toolkit.add_toolkit(topo, monitoring_toolkit) + add_toolkit(topo, monitoring_toolkit) r = op.main_composite(kind='com.ibm.streamsx.monitoring.metrics.services::MetricsIngestService', toolkits=[monitoring_toolkit]) _launch(r[0]) def job_status_service(): topo = Topology('JobStatusService') - streamsx.spl.toolkit.add_toolkit(topo, monitoring_toolkit) + add_toolkit(topo, monitoring_toolkit) r = op.main_composite(kind='com.ibm.streamsx.monitoring.jobs.services::JobStatusService', toolkits=[monitoring_toolkit]) _launch(r[0]) def failed_pe_service(): topo = Topology('FailedPEService') - streamsx.spl.toolkit.add_toolkit(topo, monitoring_toolkit) + add_toolkit(topo, monitoring_toolkit) r = op.main_composite(kind='com.ibm.streamsx.monitoring.jobs.services::FailedPEService', toolkits=[monitoring_toolkit]) _launch(r[0])