Skip to content

Commit

Permalink
Merge pull request #89 from IBMStreams/develop
Browse files Browse the repository at this point in the history
1.5.1
  • Loading branch information
markheger authored Oct 5, 2017
2 parents 45c2d27 + 9c5cbac commit 6b079fa
Show file tree
Hide file tree
Showing 67 changed files with 2,330 additions and 25 deletions.
17 changes: 12 additions & 5 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@
<property environment="env" />
<property name="streams.install" value="${env.STREAMS_INSTALL}" />
<property name="toolkit" location="com.ibm.streamsx.monitoring"/>
<property name="toolkit.test" location="tests"/>
<property name="tmp" location="tmp" />

<target name="all" depends="toolkit,samples,spldoc"/>

<target name="clean" depends="cleansamples"
description="clean up" >
<delete includeemptydirs="true">
<fileset dir="${basedir}" includes="release-*/"/>
<fileset dir="${basedir}" includes="tmp/"/>
</delete>
<delete includeemptydirs="true">
<fileset dir="${toolkit}" includes="doc/"/>
</delete>
<ant dir="${toolkit}" target="clean"/>
<ant dir="${toolkit.test}" target="clean"/>
</target>

<target name="toolkit"
Expand All @@ -33,15 +38,12 @@
<ant dir="${toolkit}" target="indexToolkit" />
</target>

<target name="spldoc" depends="samples"
<target name="spldoc" depends="toolkit"
description="Create SPLDOC">
<antcall target="spldoctoolkit">
<param name="tkdir" value="${toolkit}"/>
<param name="tktitle" value="IBMStreams streamsx.monitoring Toolkit"/>
</antcall>
<subant target="spldoctoolkit" genericantfile="${basedir}/build.xml">
<dirset dir="samples" includes="*"/>
</subant>
</target>

<target name="samples" depends="toolkit">
Expand All @@ -63,6 +65,11 @@
</subant>
</target>

<!-- Test targets -->
<target name="test" depends="build-all-samples">
<ant dir="${toolkit.test}" target="test"/>
</target>

<!-- Targets called on samples -->
<target name="buildsample">
<echo message="Sample to build: ${basedir}"/>
Expand Down
6 changes: 6 additions & 0 deletions com.ibm.streamsx.monitoring/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@
<delete dir="${gensrc.dir}" />
<delete dir="${basedir}/output" quiet="true"/>
<delete file="${impl.lib.dir}/${jarfile}"/>
<!-- toolkit clean with spl-make-toolkit does not delete generated operator xml files -->
<delete file="${basedir}/com.ibm.streamsx.monitoring.jobs/JobStatusSource/JobStatusSource.xml" quiet="true"/>
<delete file="${basedir}/com.ibm.streamsx.monitoring.metrics/MetricsMonitor/MetricsMonitor.xml" quiet="true"/>
<delete file="${basedir}/com.ibm.streamsx.monitoring.metrics/MetricsSource/MetricsSource.xml" quiet="true"/>
<delete file="${basedir}/com.ibm.streamsx.monitoring.system/LogSource/LogSource.xml" quiet="true"/>
<delete file="${basedir}/com.ibm.streamsx.monitoring.system/SystemMonitorSource/SystemMonitorSource.xml" quiet="true"/>
</target>

<target name="indexToolkit" depends="package">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,35 +135,35 @@ public abstract class AbstractJmxSource extends AbstractOperator {

private String _domainId = null; // domainId for this PE

private Metric _isConnectedToJMX;
private Metric _nJMXConnectionAttempts;
private Metric _nBrokenJMXConnections;
private Metric isConnected;
private Metric nJMXConnectionAttempts;
private Metric nBrokenJMXConnections;

public Metric get_nJMXConnectionAttempts() {
return _nJMXConnectionAttempts;
return this.nJMXConnectionAttempts;
}

public Metric get_isConnectedToJMX() {
return _isConnectedToJMX;
public Metric get_isConnected() {
return this.isConnected;
}

public Metric get_nBrokenJMXConnections() {
return _nBrokenJMXConnections;
return this.nBrokenJMXConnections;
}

@CustomMetric(name="nBrokenJMXConnections", kind = Kind.COUNTER, description = "Number of broken JMX connections that have occurred. Notifications may have been lost.")
public void set_nConnectionLosts(Metric nBrokenJMXConnections) {
this._nBrokenJMXConnections = nBrokenJMXConnections;
this.nBrokenJMXConnections = nBrokenJMXConnections;
}

@CustomMetric(name="nJMXConnectionAttempts", kind = Kind.COUNTER, description = "Number of connection attempts to JMX service.")
public void set_nJMXConnectionAttempts(Metric nConnectionAttempts) {
this._nJMXConnectionAttempts = nConnectionAttempts;
this.nJMXConnectionAttempts = nConnectionAttempts;
}

@CustomMetric(name="isConnectedToJMX", kind = Kind.GAUGE, description = "Value 1 indicates, that this operator is connected to JMX service. Otherwise value 0 is set, if no connection is established.")
public void set_isConnectedToJMX(Metric isConnectedToJMX) {
this._isConnectedToJMX = isConnectedToJMX;
@CustomMetric(name="isConnected", kind = Kind.GAUGE, description = "Value 1 indicates, that this operator is connected to JMX service. Otherwise value 0 is set, if no connection is established.")
public void set_isConnected(Metric isConnected) {
this.isConnected = isConnected;
}

/**
Expand Down Expand Up @@ -440,12 +440,12 @@ protected void setupJMXConnection() throws Exception {
get_nJMXConnectionAttempts().increment(); // update metric
_trace.info("Connect to : " + urls[i]);
_operatorConfiguration.set_jmxConnector(JMXConnectorFactory.connect(new JMXServiceURL(urls[i]), env));
get_isConnectedToJMX().setValue(1);
get_isConnected().setValue(1);
break; // exit loop here since a valid connection is established, otherwise exception is thrown.
} catch (IOException e) {
_trace.error("connect failed: " + e.getMessage());
if (i == 0) {
get_isConnectedToJMX().setValue(0);
get_isConnected().setValue(0);
throw e;
}
}
Expand Down Expand Up @@ -571,7 +571,7 @@ private String autoDetectJmxConnect() throws Exception {
p = Runtime.getRuntime().exec(cmd);
p.waitFor();
BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line = "";
String line = "";
while ((line = br.readLine())!= null) {
output.append(line).append(",");
}
Expand Down Expand Up @@ -608,7 +608,7 @@ private String autoDetectJmxSslOption(String user, String password) {
p = Runtime.getRuntime().exec(cmd);
p.waitFor();
BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line = "";
String line = "";
while ((line = br.readLine())!= null) {
output.append(line);
}
Expand All @@ -633,11 +633,11 @@ protected void closeDomainHandler() {
catch (Exception ignore) {
}
_domainHandler = null;
if (1 == get_isConnectedToJMX().getValue()) {
if (1 == get_isConnected().getValue()) {
// update metric to indicate connection is broken
get_nBrokenJMXConnections().increment();
// update metric to indicate that we are not connected
get_isConnectedToJMX().setValue(0);
get_isConnected().setValue(0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ public class MetricsSource extends AbstractJmxSource {
* Thread for calling <code>produceTuples()</code> to produce tuples
*/
private Thread _processThread;

private boolean isShutdown = false;

/**
* Logger for tracing.
Expand Down Expand Up @@ -346,7 +348,9 @@ public void run() {
try {
produceTuples();
} catch (Exception e) {
_trace.error("Operator error", e);
if (false == isShutdown) {
_trace.error("Operator error", e);
}
}
}

Expand Down Expand Up @@ -424,6 +428,7 @@ private void produceTuples() throws Exception {
* @throws Exception Operator failure, will cause the enclosing PE to terminate.
*/
public synchronized void shutdown() throws Exception {
isShutdown = true;
if (_processThread != null) {
_processThread.interrupt();
_processThread = null;
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.monitoring/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ by toolkit with their namespaces as prefixes.
**streamtool submitjob** command or by using Streams Studio.
]]></info:description>
<info:version>1.5.0</info:version>
<info:version>1.5.1</info:version>
<info:requiredProductVersion>4.1.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down
7 changes: 7 additions & 0 deletions tests/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/runTTF/
*StreamsLogs*.tgz
*.pyc
*output/
*toolkit.xml
*done*
*__pycache__/
16 changes: 16 additions & 0 deletions tests/build.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<project name="MonitoringTests" default="test">

<target name="clean">
<ant dir="spl-test" target="clean">
</ant>
</target>

<target name="test" depends="spl-test">
</target>

<target name="spl-test">
<ant dir="spl-test">
</ant>
</target>

</project>
Empty file.
48 changes: 48 additions & 0 deletions tests/spl-test/JobStatusMonitor/test_app_config/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (C) 2017, International Business Machines Corporation.
# All Rights Reserved.

.PHONY: build all clean distributed

SPLC_FLAGS = -a
OUTPUT_DIR = output
ifeq ($(STREAMS_INSTALL),)
$(error error: environment variable STREAMS_INSTALL has to be set)
endif
STREAMSX_MONITORING_TOOLKIT ?=../../../../com.ibm.streamsx.monitoring
SPLC = $(STREAMS_INSTALL)/bin/sc
SPL_PATH = $(STREAMSX_MONITORING_TOOLKIT)
SPLC_FLAGS += -t $(SPL_PATH)
SPL_DATA_DIR = ./data

SPL_CMD_ARGS ?=
SPL_MAIN_COMPOSITE = Monitor
SPL_MAIN_COMPOSITE1 = SampleJob

build: distributed

all: clean build

distributed:
$(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS) --data-directory $(SPL_DATA_DIR) --output-directory=$(OUTPUT_DIR)/monitor
$(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE1) $(SPL_CMD_ARGS) --data-directory $(SPL_DATA_DIR) --output-directory=$(OUTPUT_DIR)/sample

clean:
$(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
rm -rf $(OUTPUT_DIR)

configure:
-streamtool rmappconfig --noprompt com.ibm.streamsx.monitoring.jobs.JobStatusMonitor.ApplicationConfiguration
streamtool mkappconfig --property user=$(JMX_USER) --property password=$(JMX_PASSWORD) com.ibm.streamsx.monitoring.jobs.JobStatusMonitor.ApplicationConfiguration

start-sample:
streamtool submitjob output/sample/SampleJob.sab --jobname sampleAppConfigJobStatus

start-monitor:
streamtool submitjob output/monitor/Monitor.sab --jobname monitorAppConfigJobStatus -P applicationConfigurationName=com.ibm.streamsx.monitoring.jobs.JobStatusMonitor.ApplicationConfiguration

stop-sample:
streamtool canceljob --jobnames sampleAppConfigJobStatus --force

stop-monitor:
streamtool canceljob --jobnames monitorAppConfigJobStatus --collectlogs

96 changes: 96 additions & 0 deletions tests/spl-test/JobStatusMonitor/test_app_config/Monitor.spl
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//
// ****************************************************************************
// * Copyright (C) 2017, International Business Machines Corporation *
// * All rights reserved. *
// ****************************************************************************
//


use com.ibm.streamsx.monitoring.jobs::*;
use com.ibm.streamsx.monitoring.jmx::ConnectionNotification;

/**
* This sample application demonstrates the usage of the JobStatusMonitor operator and monitors all jobs and reports the Job/PE status events
*
* @param applicationConfigurationName
* Specifies the name of [https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.2.0/com.ibm.streams.admin.doc/doc/creating-secure-app-configs.html|application configuration object] that can contain domainId, connectionURL, user, password, and filterDocument properties. The application configuration overrides values that are specified with the corresponding parameters.
*
* @param user
* Specifies the user that is required for the JMX connection. If the **applicationConfigurationName** parameter is specified, the application configuration can override this parameter value.
*
* @param password
* Specifies the password that is required for the JMX connection. If the **applicationConfigurationName** parameter is specified, the application configuration can override this parameter value.
*
* @param filterDocument
* Specifies the either a path to a JSON-formatted document or a JSON-formatted String that specifies the domain, instance and job filters as regular expressions. Each regular expression must follow the rules that are specified for Java [https://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html|Pattern]. If the **applicationConfigurationName** parameter is specified, the application configuration can override this parameter value.
* If parameter is not set, then all jobs in current domain and instance are monitored.
*
* @param connectionURL
* Specifies the connection URL as returned by the `streamtool getjmxconnect` command. If the **applicationConfigurationName** parameter is specified, the application configuration can override this parameter value.If not specified and the domainId parameter value equals the domain id under which this operator is running, then the operator uses the `streamtool getjmxconnect` command to get the value.
*
* @param domainId
* Specifies the domain id that is monitored. If no domain id is specified, the domain id under which this operator is running is used. If the **applicationConfigurationName** parameter is specified, the application configuration can override this parameter value.
*
* @param sslOption
* Specifies the sslOption that is required for the JMX connection. If the **applicationConfigurationName** parameter is specified, the application configuration can override this parameter value. If not specified and the domainId parameter value equals the domain id under which this operator is running, then the operator uses the `streamtool getdomainproperty` command to get the value.
*/
composite Monitor {

param
expression<rstring> $applicationConfigurationName: getSubmissionTimeValue("applicationConfigurationName", ""); // optional, if user and password are set
expression<rstring> $user: getSubmissionTimeValue("user", ""); // optional, if set in application configuration
expression<rstring> $password: getSubmissionTimeValue("password", ""); // optional, if set in application configuration
expression<rstring> $filterDocument: getSubmissionTimeValue("filterDocument", ""); // uses default, if not set
expression<rstring> $connectionURL: getSubmissionTimeValue("connectionURL", ""); // optional, if not set, then domain settings are used, where the PE is running
expression<rstring> $domainId: getSubmissionTimeValue("domainId", ""); // optional, if not set, then domain settings are used, where the PE is running
expression<rstring> $sslOption: getSubmissionTimeValue("sslOption", ""); // optional, if not set, then domain settings are used, where the PE is running

graph

/*
* The JobStatusMonitor generates a tuple for each notified PE status change.
*/
(stream<JobStatusNotification> ChangeNotifications;
stream<ConnectionNotification> ConnectionNotifications
) = JobStatusMonitor() {
param
applicationConfigurationName: $applicationConfigurationName;
user: $user;
password: $password;
filterDocument: $filterDocument;
connectionURL: $connectionURL;
domainId: $domainId;
sslOption: $sslOption;
}

/*
* Verify the received notifications.
*/
(stream <rstring result> SaveDone1) as ChangeNotificationTracer = Custom(ChangeNotifications as I; ConnectionNotifications as C) {
logic
state: {
mutable boolean done1Sent = false;
}
onTuple I: {
printStringLn((rstring)I);
if (I.notifyType == "com.ibm.streams.management.pe.changed") {
if (!done1Sent) {
submit({result="TEST_RESULT_PASS"}, SaveDone1);
submit(Sys.WindowMarker, SaveDone1);
done1Sent = true;
}
}
}
onTuple C: {
printStringLn((rstring)C);
}
}

() as Done1 = FileSink(SaveDone1 as I) {
param file: "done_1"; format: csv; flush: 1u; quoteStrings : false; writePunctuations: false; closeMode: punct; moveFileToDirectory: dataDirectory()+"/..";
}

config
placement : partitionColocation("MONITOR");

}
Loading

0 comments on commit 6b079fa

Please sign in to comment.