Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

[BitMex] Authenticated Channels #267

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5b622dc
4.3.13-SNAPSHOT
mdvx Dec 8, 2018
dcf4ca1
Getting access to authenticated channels,
mdvx Jan 8, 2019
780c46e
4.3.14-SNAPSHOT
mdvx Jan 8, 2019
8d69f38
CoinbaseProStreamingExchange add getStreamingService()
mdvx Jan 8, 2019
585d0c6
getChannelNameFromMessage support for naked channels
mdvx Jan 8, 2019
fd1e93c
remove unnecessary API_URI_SANDBOX
mdvx Jan 8, 2019
07a62f5
remove unnecessary API_URI_SANDBOX
mdvx Jan 8, 2019
7df1989
use init service for sandbox request detect
mdvx Jan 11, 2019
ae51fb0
support naked channel Names
mdvx Jan 11, 2019
c82ca98
saving authentication message builder
mdvx Jan 12, 2019
07a8f30
send authenticationMessage for BitMex
mdvx Jan 12, 2019
37eb5c3
send authenticationMessage for BitMex
mdvx Jan 12, 2019
730f243
resubscribeChannels with authentication
mdvx Jan 12, 2019
a047c64
CoinbasePro, use_Sandbox
mdvx Jan 12, 2019
93bf0b4
BitmexDigest.createInstance null check
mdvx Jan 13, 2019
ef2632d
unique snapshot name 4.3.14-MDVX
mdvx Jan 31, 2019
dd56579
unique snapshot name 4.3.14-MDVX
mdvx Jan 31, 2019
cf395c9
Merge remote-tracking branch 'mdvx/develop' into develop
mdvx Jan 31, 2019
1d0ac63
Merge branch 'develop' of https://github.com/bitrich-info/xchange-str…
mdvx Jan 31, 2019
3714584
fixed my branch
mdvx Mar 10, 2019
a11dfe5
4.3.16-MDVX
mdvx Mar 22, 2019
856118c
remove comments
mdvx Apr 1, 2019
6832744
fix bitfinex snapshot
mdvx Apr 1, 2019
5317da3
BitMex null test
mdvx May 28, 2019
86b3e3a
POM updates to deploy maven with: mvn -U clean deploy -DskipTests -D…
mdvx May 28, 2019
6f52ed0
remove getNonceFactory()
mdvx Jun 16, 2019
f12bd8d
defend against missing levels after reconnect
mdvx Jun 16, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<groupId>info.bitrich.xchange-stream</groupId>
<artifactId>xchange-stream-parent</artifactId>
<packaging>pom</packaging>
<version>4.3.14-SNAPSHOT</version>
<version>4.3.16-MDVX</version>

<modules>
<module>xchange-stream-core</module>
Expand Down Expand Up @@ -61,12 +61,12 @@

<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<id>mainbloq-maven-repo</id>
<url>s3://mainbloq-maven-repo/snapshot</url>
</snapshotRepository>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
<id>mainbloq-maven-repo</id>
<url>s3://mainbloq-maven-repo/release</url>
</repository>
</distributionManagement>

Expand All @@ -80,7 +80,7 @@
</repositories>

<properties>
<xchange.version>4.3.14</xchange.version>
<xchange.version>4.3.16</xchange.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
Expand Down Expand Up @@ -143,6 +143,14 @@
<version>2.5.3</version>
</plugin>
</plugins>
<extensions>
<extension>
<groupId>com.gkatzioura.maven.cloud</groupId>
<artifactId>s3-storage-wagon</artifactId>
<version>1.9</version>
</extension>
</extensions>

</build>
<profiles>
<profile>
Expand Down Expand Up @@ -212,4 +220,5 @@
</build>
</profile>
</profiles>

</project>
2 changes: 1 addition & 1 deletion service-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.14-SNAPSHOT</version>
<version>4.3.16-MDVX</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion service-netty/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.14-SNAPSHOT</version>
<version>4.3.16-MDVX</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,43 +1,16 @@
package info.bitrich.xchangestream.service.netty;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import info.bitrich.xchangestream.service.ConnectableService;
import info.bitrich.xchangestream.service.exception.NotConnectedException;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
Expand All @@ -50,8 +23,21 @@
import io.reactivex.CompletableEmitter;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NettyStreamingService<T> extends ConnectableService {
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public abstract class NettyStreamingService<T> {
private final Logger LOG = LoggerFactory.getLogger(this.getClass());
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(10);
private static final Duration DEFAULT_RETRY_DURATION = Duration.ofSeconds(15);
Expand Down Expand Up @@ -107,8 +93,7 @@ public NettyStreamingService(String apiUrl, int maxFramePayloadLength, Duration
}
}

@Override
protected Completable openConnection() {
public Completable connect() {
return Completable.create(completable -> {
try {

Expand Down Expand Up @@ -157,7 +142,6 @@ uri, WebSocketVersion.V13, null, true, getCustomHeaders(), maxFramePayloadLength
eventLoopGroup = new NioEventLoopGroup(2);
b.group(eventLoopGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, java.lang.Math.toIntExact(connectionTimeout.toMillis()))
.option(ChannelOption.SO_KEEPALIVE, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
Expand Down Expand Up @@ -207,11 +191,7 @@ protected void initChannel(SocketChannel ch) {
handleError(completable, throwable);
}
}).doOnError(t -> {
if (t instanceof WebSocketHandshakeException) {
LOG.warn("Problem with connection: {} - {}", t.getClass(), t.getMessage());
} else {
LOG.warn("Problem with connection", t);
}
LOG.warn("Problem with connection", t);
reconnFailEmitters.forEach(emitter -> emitter.onNext(t));
}).retryWhen(new RetryWithDelay(retryDuration.toMillis()))
.doOnComplete(() -> {
Expand Down Expand Up @@ -330,7 +310,20 @@ public Observable<T> subscribeChannel(String channelName, Object... args) {
}).share();
}

public String getAuthenticateMessage() throws IOException {
return null;
}

public void resubscribeChannels() {
try {
final String authenticateMessage = getAuthenticateMessage();
if (authenticateMessage != null)
sendMessage(authenticateMessage);

} catch (IOException e) {
LOG.error("Failed to sendMessage getAuthenticateMessage");
}

for (String channelId : channels.keySet()) {
try {
Subscription subscription = channels.get(channelId);
Expand Down Expand Up @@ -365,6 +358,9 @@ protected void handleError(T message, Throwable t) {

protected void handleChannelMessage(String channel, T message) {
NettyStreamingService<T>.Subscription subscription = channels.get(channel);
if (subscription == null)
subscription = channels.get(channel.split(":")[0]);

if (subscription == null) {
LOG.debug("Channel has been closed {}.", channel);
return;
Expand Down
2 changes: 1 addition & 1 deletion service-pubnub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.14-SNAPSHOT</version>
<version>4.3.16-MDVX</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion service-pusher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.14-SNAPSHOT</version>
<version>4.3.16-MDVX</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion service-wamp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.14-SNAPSHOT</version>
<version>4.3.16-MDVX</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-binance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.14-SNAPSHOT</version>
<version>4.3.16-MDVX</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;

import io.reactivex.Completable;
import io.reactivex.Observable;

import si.mazi.rescu.RestProxyFactory;

import org.knowm.xchange.binance.BinanceAuthenticated;
import org.knowm.xchange.binance.BinanceExchange;
import org.knowm.xchange.binance.service.BinanceMarketDataService;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.service.BaseExchangeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import si.mazi.rescu.RestProxyFactory;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,15 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction;
import info.bitrich.xchangestream.binance.dto.BinanceRawTrade;
import info.bitrich.xchangestream.binance.dto.BinanceWebsocketTransaction;
import info.bitrich.xchangestream.binance.dto.DepthBinanceWebSocketTransaction;
import info.bitrich.xchangestream.binance.dto.ExecutionReportBinanceUserTransaction;
import info.bitrich.xchangestream.binance.dto.*;
import info.bitrich.xchangestream.binance.dto.ExecutionReportBinanceUserTransaction.ExecutionType;
import info.bitrich.xchangestream.binance.dto.TickerBinanceWebsocketTransaction;
import info.bitrich.xchangestream.binance.dto.TradeBinanceWebsocketTransaction;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;

import org.knowm.xchange.binance.BinanceAdapters;
import org.knowm.xchange.binance.dto.marketdata.BinanceOrderbook;
import org.knowm.xchange.binance.dto.marketdata.BinanceTicker24h;
Expand All @@ -43,9 +34,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.DEPTH_UPDATE;
import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.TICKER_24_HR;
import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.TRADE;
import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.*;

public class BinanceStreamingMarketDataService implements StreamingMarketDataService {
private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingMarketDataService.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package info.bitrich.xchangestream.binance;

import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;

import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;

import java.io.IOException;

public class BinanceStreamingService extends JsonNettyStreamingService {

private final ProductSubscription productSubscription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;

import org.knowm.xchange.binance.BinanceAuthenticated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package info.bitrich.xchangestream.binance;

import com.fasterxml.jackson.databind.JsonNode;

import info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;

import io.reactivex.Observable;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package info.bitrich.xchangestream.binance.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.math.BigDecimal;

import org.knowm.xchange.binance.BinanceAdapters;
import org.knowm.xchange.currency.Currency;
import org.knowm.xchange.dto.trade.UserTrade;

import java.math.BigDecimal;

public class ExecutionReportBinanceUserTransaction extends ProductBinanceWebSocketTransaction {

public enum ExecutionType {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package info.bitrich.xchangestream.binance;

import info.bitrich.xchangestream.core.ProductSubscription;

import org.junit.Assert;
import org.junit.Test;
import org.knowm.xchange.currency.CurrencyPair;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.junit.BeforeClass;
import org.junit.Test;
import org.knowm.xchange.binance.dto.marketdata.BinanceOrderbook;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.junit.BeforeClass;
import org.junit.Test;

Expand Down
Loading