Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

[BitMex] Authenticated Channels #267

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5b622dc
4.3.13-SNAPSHOT
mdvx Dec 8, 2018
dcf4ca1
Getting access to authenticated channels,
mdvx Jan 8, 2019
780c46e
4.3.14-SNAPSHOT
mdvx Jan 8, 2019
8d69f38
CoinbaseProStreamingExchange add getStreamingService()
mdvx Jan 8, 2019
585d0c6
getChannelNameFromMessage support for naked channels
mdvx Jan 8, 2019
fd1e93c
remove unnecessary API_URI_SANDBOX
mdvx Jan 8, 2019
07a62f5
remove unnecessary API_URI_SANDBOX
mdvx Jan 8, 2019
7df1989
use init service for sandbox request detect
mdvx Jan 11, 2019
ae51fb0
support naked channel Names
mdvx Jan 11, 2019
c82ca98
saving authentication message builder
mdvx Jan 12, 2019
07a8f30
send authenticationMessage for BitMex
mdvx Jan 12, 2019
37eb5c3
send authenticationMessage for BitMex
mdvx Jan 12, 2019
730f243
resubscribeChannels with authentication
mdvx Jan 12, 2019
a047c64
CoinbasePro, use_Sandbox
mdvx Jan 12, 2019
93bf0b4
BitmexDigest.createInstance null check
mdvx Jan 13, 2019
ef2632d
unique snapshot name 4.3.14-MDVX
mdvx Jan 31, 2019
dd56579
unique snapshot name 4.3.14-MDVX
mdvx Jan 31, 2019
cf395c9
Merge remote-tracking branch 'mdvx/develop' into develop
mdvx Jan 31, 2019
1d0ac63
Merge branch 'develop' of https://github.com/bitrich-info/xchange-str…
mdvx Jan 31, 2019
3714584
fixed my branch
mdvx Mar 10, 2019
a11dfe5
4.3.16-MDVX
mdvx Mar 22, 2019
856118c
remove comments
mdvx Apr 1, 2019
6832744
fix bitfinex snapshot
mdvx Apr 1, 2019
5317da3
BitMex null test
mdvx May 28, 2019
86b3e3a
POM updates to deploy maven with: mvn -U clean deploy -DskipTests -D…
mdvx May 28, 2019
6f52ed0
remove getNonceFactory()
mdvx Jun 16, 2019
f12bd8d
defend against missing levels after reconnect
mdvx Jun 16, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<groupId>info.bitrich.xchange-stream</groupId>
<artifactId>xchange-stream-parent</artifactId>
<packaging>pom</packaging>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mdvx! Thanks for contributing, and welcome. A few review comments from me. Firstly, could you remove the version updates? They make the merge harder - we already have #268 lined up to do this next version update.

Copy link
Contributor Author

@mdvx mdvx Jan 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it affect the normal market data channels (e.g. adding more information to trades), or does it make more channels available

It make more channels available, beyond orderbook and klines, I can now subscribe to position, wallet, order and execution channels

My goal is to provide all data required for trading as streams, for every exchange that can do it.

Another avenue I have been looking at is Manifold, but I would rather we have a proper supported api (I have several streaming exchanges in the works)

Please feel free to take what you want can from this PR (I am not a git guru)


<modules>
<module>xchange-stream-core</module>
Expand Down Expand Up @@ -80,7 +80,7 @@
</repositories>

<properties>
<xchange.version>4.3.13</xchange.version>
<xchange.version>4.3.14-SNAPSHOT</xchange.version>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you change this to 4.3.14? It's been released now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
Expand Down
2 changes: 1 addition & 1 deletion service-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion service-netty/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,15 @@
package info.bitrich.xchangestream.service.netty;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import info.bitrich.xchangestream.service.exception.NotConnectedException;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.logging.LogLevel;
Expand All @@ -48,6 +23,19 @@
import io.reactivex.CompletableEmitter;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public abstract class NettyStreamingService<T> {
private final Logger LOG = LoggerFactory.getLogger(this.getClass());
Expand Down Expand Up @@ -322,7 +310,20 @@ public Observable<T> subscribeChannel(String channelName, Object... args) {
}).share();
}

public String getAuthenticateMessage() throws IOException {
return null;
}

public void resubscribeChannels() {
try {
final String authenticateMessage = getAuthenticateMessage();
if (authenticateMessage != null)
sendMessage(authenticateMessage);

} catch (IOException e) {
LOG.error("Failed to sendMessage getAuthenticateMessage");
}

for (String channelId : channels.keySet()) {
try {
Subscription subscription = channels.get(channelId);
Expand Down Expand Up @@ -357,6 +358,9 @@ protected void handleError(T message, Throwable t) {

protected void handleChannelMessage(String channel, T message) {
NettyStreamingService<T>.Subscription subscription = channels.get(channel);
if (subscription == null)
subscription = channels.get(channel.split(":")[0]);

if (subscription == null) {
LOG.debug("Channel has been closed {}.", channel);
return;
Expand Down
2 changes: 1 addition & 1 deletion service-pubnub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion service-pusher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion service-wamp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-binance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-bitfinex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-bitflyer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-bitmex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
*/
public class BitmexStreamingExchange extends BitmexExchange implements StreamingExchange {
private static final String API_URI = "wss://www.bitmex.com/realtime";
private static final String API_SANDBOX_URI = "wss://testnet.bitmex.com/realtime";

private final BitmexStreamingService streamingService;
private BitmexStreamingService streamingService;
private BitmexStreamingMarketDataService streamingMarketDataService;

public BitmexStreamingExchange() {
this.streamingService = new BitmexStreamingService(API_URI);
}

protected BitmexStreamingExchange(BitmexStreamingService streamingService) {
Expand All @@ -28,6 +28,11 @@ protected BitmexStreamingExchange(BitmexStreamingService streamingService) {
@Override
protected void initServices() {
super.initServices();
if (exchangeSpecification.getExchangeSpecificParametersItem(USE_SANDBOX).equals(true))
this.streamingService = new BitmexStreamingService(API_SANDBOX_URI, exchangeSpecification);
else
this.streamingService = new BitmexStreamingService(API_URI, exchangeSpecification);

streamingMarketDataService = new BitmexStreamingMarketDataService(streamingService);
}

Expand Down Expand Up @@ -65,4 +70,8 @@ public boolean isAlive() {

@Override
public void useCompressedMessages(boolean compressedMessages) { streamingService.useCompressedMessages(compressedMessages); }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than exposing the streaming service directly, perhaps it might be cleaner to add some specialised methods to BitfinexStreamingExchange to expose what you need as proper streams? I have an example in my (still unmerged PR for Binance: #246

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree, I wanted to open this up for discussuion

public BitmexStreamingService getStreamingService() {
return streamingService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public BitmexStreamingMarketDataService(BitmexStreamingService streamingService)

@Override
public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = currencyPair.base.getCurrencyCode() + currencyPair.counter.getCurrencyCode();
String channelName = String.format("orderBookL2:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).map(s -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,41 +1,56 @@
package info.bitrich.xchangestream.bitmex;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import info.bitrich.xchangestream.bitmex.dto.BitmexWebSocketSubscriptionMessage;
import info.bitrich.xchangestream.bitmex.dto.BitmexWebSocketTransaction;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.reactivex.Observable;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.bitmex.service.BitmexDigest;
import org.knowm.xchange.utils.nonce.ExpirationTimeFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import si.mazi.rescu.SynchronizedValueFactory;

import java.io.IOException;

/**
* Created by Lukas Zaoralek on 13.11.17.
*/
public class BitmexStreamingService extends JsonNettyStreamingService {
private static final Logger LOG = LoggerFactory.getLogger(BitmexStreamingService.class);
private ExchangeSpecification exchangeSpecification;

public BitmexStreamingService(String apiUrl) {
super(apiUrl, Integer.MAX_VALUE);
this.exchangeSpecification = null;
}
public BitmexStreamingService(String apiUrl, ExchangeSpecification exchangeSpecification) {
super(apiUrl, Integer.MAX_VALUE);
this.exchangeSpecification = exchangeSpecification;
}

@Override
@Override
protected void handleMessage(JsonNode message) {
// if (message.has("info") && message.get("info").asText().startsWith("Welcome ")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this commented-out code be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

// if (exchangeSpecification.getApiKey() != null && exchangeSpecification.getSecretKey() != null) {
// try {
//TODO: send this onConnect
// sendMessage(getAuthenticateMessage());
// } catch (Exception e) {
// handleError(message, e);
// }
// }
// return;
// }
if (message.has("info") || message.has("success")) {
return;
}
if (message.has("error")) {
String error = message.get("error").asText();
LOG.error("Error with message: " + error);
LOG.debug("Error with message: " + message.toString());
return;
}

Expand All @@ -52,14 +67,19 @@ public Observable<BitmexWebSocketTransaction> subscribeBitmexChannel(String chan
BitmexWebSocketTransaction transaction = objectMapper.treeToValue(s, BitmexWebSocketTransaction.class);
return transaction;
})
.share();
.share();
}

@Override
protected String getChannelNameFromMessage(JsonNode message) throws IOException {
String instrument = message.get("data").get(0).get("symbol").asText();
String table = message.get("table").asText();
return String.format("%s:%s", table, instrument);
final JsonNode data0 = message.get("data").get(0);
final JsonNode symbolNode = data0 == null ? null : data0.get("symbol");
final JsonNode tableNode = message.get("table");

if (symbolNode == null)
return tableNode.asText();
else
return String.format("%s:%s", tableNode.asText(), symbolNode.asText());
}

@Override
Expand All @@ -73,4 +93,30 @@ public String getUnsubscribeMessage(String channelName) throws IOException {
BitmexWebSocketSubscriptionMessage subscribeMessage = new BitmexWebSocketSubscriptionMessage("unsubscribe", new String[]{});
return objectMapper.writeValueAsString(subscribeMessage);
}

@Override // called by NettyStreamingService.resubscribeChannels
public String getAuthenticateMessage() throws IOException {
// connect().blockingAwait();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented out code


BitmexDigest bitmexDigest = BitmexDigest.createInstance(exchangeSpecification.getSecretKey(), exchangeSpecification.getApiKey() );
if (bitmexDigest == null)
return null;

SynchronizedValueFactory<Long> nonceFactory = new ExpirationTimeFactory(30);

long nonce = nonceFactory.createValue();
String payload = "GET/realtime" + nonce;
String digestString = bitmexDigest.digestString(payload);

BitmexWebSocketSubscriptionMessage subscribeMessage =
new BitmexWebSocketSubscriptionMessage("authKeyExpires",
new Object[]{exchangeSpecification.getApiKey(), nonce, digestString});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't you just use new String[] here rather than new Object[], and avoid the change to BitmexWebSocketSubscriptionMessage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i first iteration passed a digest object, but a string is suffecient now


//sendMessage( );
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented out code


return objectMapper.writeValueAsString(subscribeMessage);

//streamingService.sendAuthKeyExpires(new Object[]{apiKey, nonce, digestString});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented out code

//Thread.sleep(1500);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ public class BitmexWebSocketSubscriptionMessage {
private String op;

@JsonProperty(ARGS)
private String[] args;
private Object[] args;

public BitmexWebSocketSubscriptionMessage(String op, String[] args) {
this.op = op;
this.args = args;
}
public BitmexWebSocketSubscriptionMessage(String op, Object[] args) {
this.op = op;
this.args = args;
}
}
2 changes: 1 addition & 1 deletion xchange-bitstamp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-cexio/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.13-SNAPSHOT</version>
<version>4.3.14-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Loading