Skip to content

KdbConsumer

Jas edited this page Mar 9, 2017 · 1 revision

KdbConsumer and KdbConsumerThread

These classes provide the ability to subscribe to a kdb+ process (via .u.sub) and notify a listener when an update is received.

Please note that this consumer only supports filtering by table and does not pass a symbol parameter. It will call (`.u.sub;tables) to subscribe.

KdbConsumerThread simply wraps KdbConsumer into its own thread to take the processing of the update messages from the constructing thread.

Configuration

To configure a kdb consumer, you need to specify the following:

  • The target kdb+ process as a KdbProcess object
  • The list of tables you wish to subscribe to as a List<String>. If you want to subscribe to all tables, pass an empty list (e.g. new ArrayList<>()).
  • One of either a raw data consumer or kdb table consumer:
    • A raw data consumer (defined by implementing IKdbRawDataConsumer) will receive the object received from the kdb process unmodified. Useful if the data sent is non-standard
    • A kdb table consumer (defined by implementing IKdbTableConsumer) will receive tables that have been parsed from the received information. See the section below on how to get the consumer to parse the inbound data appropriately.

When using KdbConsumerThread, you can also optionally specify a listen when the consumer fails to successfully subscribe to the process by implementing IKdbConsumerFailedListener.

Table Consumer Support

In order for the consumer to successfully converted the received data into a KdbTable, the received data must adhere to the following rules:

  • A list of 3 elements
  • First element is either "upd" or ".u.upd"
  • Second element is the table name as a symbol
  • Third element is an unkeyed table

Consumer Failed Listener

When using the KdbConsumerThread, any exception in the consumer will simply cause the thread to die. The rest of the application doesn't have a useful way of knowing that the consumer has died and therefore IKdbConsumerFailedListener was added to allow this notification.

By supplying this listener, it will be notified when the subscription fails either due to a connection error (CONNECTION_FAILED) or a subscription error (SUBSCRIPTION_FAILED).

The subscription is classified as successfully if the consumer connects to the kdb process, calls .u.sub with the list of tables and the returned object is not null.

Example

public void startConsumer() throws KdbTargetProcessUnavailableException {
	KdbProcess target = new KdbProcess("localhost", 12345);
	
	IKdbTableConsumer processor = new IKdbTableConsumer() {
		@Override
		public void consume(KdbTable table) throws DataConsumerException {
			System.out.println("Table update received: " + table.getTableName());
		}
	};
	
	// Connects and starts listening for updates on own thread. Thread name is KdbConsumer-*host*:*port*
	new KdbConsumerThread(target, new ArrayList<>(), null, processor, null);
}
Clone this wiki locally