Skip to content

Commit ea192fa

Browse files
committed
consumer: Add support for standard .u.sub[tables; syms] [#1]
1 parent 9735252 commit ea192fa

File tree

3 files changed

+62
-27
lines changed

3 files changed

+62
-27
lines changed

pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@
6262
<version>2.0.0.0</version>
6363
<scope>test</scope>
6464
</dependency>
65+
<dependency>
66+
<groupId>org.slf4j</groupId>
67+
<artifactId>slf4j-simple</artifactId>
68+
<version>1.7.22</version>
69+
<scope>test</scope>
70+
</dependency>
6571
</dependencies>
6672

6773
<build>

src/main/java/com/buabook/kdb/consumer/KdbConsumer.java

+56-26
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
* <h3>KDB Data Consumer</h3>
2727
* <p>Provides the ability to consume real-time streaming data from a KDB process
2828
* into a {@link KdbTable} for use within a Java application</p>
29-
* (c) 2014 - 2015 Sport Trades Ltd
29+
* (c) 2014 - 2019 Sport Trades Ltd
3030
*
3131
* @author Jas Rajasansir
32-
* @version 1.1.0
32+
* @version 1.2.0
3333
* @since 23 Apr 2014
3434
*/
3535
public class KdbConsumer extends KdbConnection {
@@ -62,6 +62,9 @@ public class KdbConsumer extends KdbConnection {
6262
private KdbConsumer(KdbProcess server, List<String> tables, KdbDict subscriptionConfiguration, IKdbRawDataConsumer rawDataConsumer, IKdbTableConsumer tableConsumer) throws KdbTargetProcessUnavailableException {
6363
super(server);
6464

65+
if(tables == null && subscriptionConfiguration == null)
66+
throw new NullPointerException("Must provide at least one subscription method - list of tables or dictionary");
67+
6568
if(rawDataConsumer == null && tableConsumer == null)
6669
throw new NullPointerException("Must provied either a raw data or KdbTable consuming object, or both to this object");
6770

@@ -74,15 +77,16 @@ private KdbConsumer(KdbProcess server, List<String> tables, KdbDict subscription
7477
log.info("Connected to kdb process [ Target: " + server.toString() + " ]");
7578
}
7679

80+
7781
/**
78-
* Generates a new kdb consumer (which is generally a consumer from a kdb TickerPlant)
82+
* Generates a new kdb consumer (which is generally a consumer from a kdb TickerPlant). By specifying a list of tables, the subscription API
83+
* is assumed to be <code>.u.sub[tables; syms]</code> where <code>syms</code> is hardcoded to null symbol
7984
* @param server The kdb process to connect to
8085
* @param tables The list of tables that should be subscribed to. <b>NOTE</b>: This cannot be null, pass an empty list
8186
* @param rawDataConsumer A listener object that will consume every message from the kdb process
8287
* @param tableConsumer A listener object that will consume only table messages from the kdb process
8388
* @throws KdbTargetProcessUnavailableException If the consumer cannot connect to the target kdb process
84-
* @throws NullPointerException If either <code>tables</code> or <code>syms</code> is null. Also if both <code>rawDataConsumer</code>
85-
* and <code>tableConsumer</code> null, the constructor must be passed one or the other
89+
* @throws NullPointerException If <code>tables</code> is null or if both <code>rawDataConsumer</code> and <code>tableConsumer</code> are null
8690
*/
8791
protected KdbConsumer(KdbProcess server, List<String> tables, IKdbRawDataConsumer rawDataConsumer, IKdbTableConsumer tableConsumer) throws KdbTargetProcessUnavailableException {
8892
this(server, tables, null, rawDataConsumer, tableConsumer);
@@ -91,18 +95,41 @@ protected KdbConsumer(KdbProcess server, List<String> tables, IKdbRawDataConsume
9195
throw new NullPointerException("Tables for a consumer cannot be null. Provide an empty list for ALL tables.");
9296
}
9397

98+
/**
99+
* Generates a new kdb consumer (which is generally a consumer from a kdb TickerPlant). By specifying a dictionary subscription configuration
100+
* the subscription API is assumed to be <code>.u.sub[subDict]</code>.
101+
* @param server The kdb process to connect to
102+
* @param subscriptionConfiguration The dictionary configuration for the subscription
103+
* @param rawDataConsumer A listener object that will consume every message from the kdb process
104+
* @param tableConsumer A listener object that will consume only table messages from the kdb process
105+
* @throws KdbTargetProcessUnavailableException If the consumer cannot connect to the target kdb process
106+
* @throws NullPointerException If dictionary configuration object is null or empty or if both <code>rawDataConsumer</code> and
107+
* <code>tableConsumer</code> are null
108+
*/
94109
protected KdbConsumer(KdbProcess server, KdbDict subscriptionConfiguration, IKdbRawDataConsumer rawDataConsumer, IKdbTableConsumer tableConsumer) throws KdbTargetProcessUnavailableException {
95110
this(server, null, subscriptionConfiguration, rawDataConsumer, tableConsumer);
96111

97112
if(subscriptionConfiguration == null || subscriptionConfiguration.isEmpty())
98113
throw new NullPointerException("No subscription configuration supplied. Cannot subscribe to process");
99114
}
100115

101-
/** @see #KdbConsumer(KdbProcess, List, IKdbRawDataConsumer, IKdbTableConsumer) */
116+
117+
/**
118+
* Table list-based kdb consumer with only a table ({@link IKdbTableConsumer}) interface specified
119+
* @see #KdbConsumer(KdbProcess, List, IKdbRawDataConsumer, IKdbTableConsumer)
120+
*/
102121
public KdbConsumer(KdbProcess server, List<String> tables, IKdbTableConsumer tableConsumer) throws KdbTargetProcessUnavailableException {
103122
this(server, tables, null, tableConsumer);
104123
}
105124

125+
/**
126+
* Dict-based kdb consumer with only a table ({@link IKdbTableConsumer}) interface specified
127+
* @see #KdbConsumer(KdbProcess, KdbDict, IKdbRawDataConsumer, IKdbTableConsumer)
128+
*/
129+
public KdbConsumer(KdbProcess server, KdbDict subscriptionConfiguration, IKdbTableConsumer tableConsumer) throws KdbTargetProcessUnavailableException {
130+
this(server, subscriptionConfiguration, null, tableConsumer);
131+
}
132+
106133

107134
/**
108135
* Once connection to the process has been established (performed during object construction), then this function
@@ -153,31 +180,34 @@ public void reconnect() {
153180
* @return <code>True</code> if the subscription result from the kdb process is not null, <code>false</code> otherwise
154181
*/
155182
private Boolean subscribe() throws UnsupportedOperationException {
156-
Object subscribeObject = null;
183+
Object subscribeResult = null;
157184

158185
if(subscriptionTables != null) {
159-
if(subscriptionTables.isEmpty())
160-
subscribeObject = "";
161-
else
162-
subscribeObject = subscriptionTables.toArray();
186+
// Assume subscription API is '.u.sub[tables; syms]' where syms is always a null symbol
187+
Object tableSub = "";
163188

189+
if(! subscriptionTables.isEmpty())
190+
tableSub = subscriptionTables.toArray();
191+
164192
log.info("Attempting to subscribe to kdb process [ Process: {} ] [ Standard Table Subscription: {} ]", getRemoteProcess(), Printers.listToString(subscriptionTables));
165-
} else if(subscriptionConfiguration != null) {
166-
subscribeObject = subscriptionConfiguration.convertToDict();
167193

194+
try {
195+
subscribeResult = getConnection().k(SUB_FUNCTION, tableSub, "");
196+
} catch (KException | IOException e) {
197+
log.error("Subscription to kdb process failed [ Process: {} ]. Error - {}", getRemoteProcess(), e.getMessage());
198+
return false;
199+
}
200+
201+
} else if(subscriptionConfiguration != null) {
202+
// Assume subscription API is '.u.sub[subDict]' where subDict is a dictionary
168203
log.info("Attempting to subscribe to kdb process [ Process: {} ] [ Dict Config Subscription: {} ]", getRemoteProcess(), subscriptionConfiguration);
169-
} else {
170-
log.error("No subscription configuration or subscription tables specified. Cannot subscribe to process!");
171-
throw new UnsupportedOperationException("No subscription configuration or subscription tables");
172-
}
173-
174-
Object subscribeResult = null;
175-
176-
try {
177-
subscribeResult = getConnection().k(SUB_FUNCTION, subscribeObject);
178-
} catch (KException | IOException e) {
179-
log.error("Subscription to kdb process failed [ Process: {} ]. Error - {}", getRemoteProcess(), e.getMessage());
180-
return false;
204+
205+
try {
206+
subscribeResult = getConnection().k(SUB_FUNCTION, subscriptionConfiguration.convertToDict());
207+
} catch (KException | IOException e) {
208+
log.error("Subscription to kdb process failed [ Process: {} ]. Error - {}", getRemoteProcess(), e.getMessage());
209+
return false;
210+
}
181211
}
182212

183213
if(subscribeResult instanceof Dict) {
@@ -193,7 +223,7 @@ private Boolean subscribe() throws UnsupportedOperationException {
193223
}
194224
}
195225

196-
return subscribeResult != null;
226+
return ( subscribeResult instanceof Boolean ) || ( subscribeResult instanceof Dict );
197227
}
198228

199229
/**

src/main/java/com/buabook/kdb/publisher/KdbPublisherThread.java

-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ public KdbPublisherThread(KdbProcess server, Long threadSleepMs) throws KdbTarge
6161
this(server, threadSleepMs, null);
6262
}
6363

64-
@SuppressWarnings("resource")
6564
public KdbPublisherThread(KdbProcess server, Long threadSleepMs, Duration resetConnectionDuration) throws KdbTargetProcessUnavailableException {
6665
this(new KdbPublisher(server, resetConnectionDuration), threadSleepMs);
6766
}

0 commit comments

Comments
 (0)