Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

Commit

Permalink
Merge pull request #329 from henryxwong/bitmex-merge
Browse files Browse the repository at this point in the history
Bitmex misc merge
  • Loading branch information
badgerwithagun authored Jun 17, 2019
2 parents ba23805 + b180f1b commit 9de93b2
Show file tree
Hide file tree
Showing 28 changed files with 2,010 additions and 77 deletions.
2 changes: 2 additions & 0 deletions xchange-bitmex/bitmex.secret.keys.origin
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bitmex.api.key=
bitmex.secret.key=
46 changes: 45 additions & 1 deletion xchange-bitmex/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
Expand All @@ -9,6 +10,44 @@

<artifactId>xchange-bitmex</artifactId>

<profiles>
<profile>
<id>default</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<excludes>
<exclude>**/*IT.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>integration-tests</id>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<includes>
<include>**/*IT.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>info.bitrich.xchange-stream</groupId>
Expand All @@ -25,5 +64,10 @@
<artifactId>xchange-bitmex</artifactId>
<version>${xchange.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
<version>4.5.5</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package info.bitrich.xchangestream.bitmex;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.Charset;

/**
* Created by heath on 2018/3/1.
*/
public class BitmexAuthenticator {

public static String getSHA256String(String str, String key) {

try {
Charset asciiCs = Charset.forName("US-ASCII");
SecretKeySpec signingKey = new SecretKeySpec(asciiCs.encode(key).array(), "HmacSHA256");
Mac sha256_HMAC = Mac.getInstance("HmacSHA256");
sha256_HMAC.init(signingKey);
byte[] mac_data = sha256_HMAC.doFinal(asciiCs.encode(str).array());
StringBuilder result = new StringBuilder();
for (final byte element : mac_data) {
result.append(Integer.toString((element & 0xff) + 0x100, 16).substring(1));
}
// System.out.println("SHA256String Result:[" + result + "]");
return result.toString().toUpperCase();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

public static String generateSignature(String secret, String verb, String url, String nonce, String data) {
String message = verb + url + nonce + data;
// System.out.println(message);
return getSHA256String(message, secret);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,46 @@
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import io.netty.channel.ChannelHandlerContext;
import io.reactivex.Completable;
import io.reactivex.Observable;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.bitmex.BitmexExchange;
import si.mazi.rescu.SynchronizedValueFactory;

/**
* Created by Lukas Zaoralek on 12.11.17.
*/
public class BitmexStreamingExchange extends BitmexExchange implements StreamingExchange {
private static final String API_URI = "wss://www.bitmex.com/realtime";
private static final String TESTNET_API_URI = "wss://testnet.bitmex.com/realtime";

private final BitmexStreamingService streamingService;
private BitmexStreamingService streamingService;
private BitmexStreamingMarketDataService streamingMarketDataService;

public BitmexStreamingExchange() {
this.streamingService = new BitmexStreamingService(API_URI);
}

protected BitmexStreamingExchange(BitmexStreamingService streamingService) {
this.streamingService = streamingService;
}

@Override
protected void initServices() {
super.initServices();
streamingMarketDataService = new BitmexStreamingMarketDataService(streamingService);
streamingService = createStreamingService();
streamingMarketDataService = new BitmexStreamingMarketDataService(streamingService, this);
}

@Override
public Completable connect(ProductSubscription... args) {
return streamingService.connect();
}

private BitmexStreamingService createStreamingService() {
ExchangeSpecification exchangeSpec = getExchangeSpecification();
Boolean useSandbox = (Boolean) exchangeSpec.getExchangeSpecificParametersItem(USE_SANDBOX);
String uri = useSandbox == null || !useSandbox ? API_URI : TESTNET_API_URI;
BitmexStreamingService streamingService = new BitmexStreamingService(uri, exchangeSpec.getApiKey(), exchangeSpec.getSecretKey());
applyStreamingSpecification(exchangeSpec, streamingService);
return streamingService;
}

@Override
public Completable disconnect() {
return streamingService.disconnect();
Expand All @@ -53,11 +60,33 @@ public StreamingMarketDataService getStreamingMarketDataService() {
return streamingMarketDataService;
}

@Override
public Observable<Throwable> reconnectFailure() {
return streamingService.subscribeReconnectFailure();
}

@Override
public Observable<Object> connectionSuccess() {
return streamingService.subscribeConnectionSuccess();
}

@Override
public boolean isAlive() {
return streamingService.isSocketOpen();
}

@Override
public void useCompressedMessages(boolean compressedMessages) { streamingService.useCompressedMessages(compressedMessages); }

@Override
public Observable<Long> messageDelay() {
return Observable.create(delayEmitter -> {
streamingService.addDelayEmitter(delayEmitter);
});
}

@Override
public void resubscribeChannels() {
streamingService.resubscribeChannels();
}
}
Original file line number Diff line number Diff line change
@@ -1,53 +1,65 @@
package info.bitrich.xchangestream.bitmex;

import info.bitrich.xchangestream.bitmex.dto.BitmexLimitOrder;
import info.bitrich.xchangestream.bitmex.dto.BitmexOrderbook;
import info.bitrich.xchangestream.bitmex.dto.BitmexTicker;
import info.bitrich.xchangestream.bitmex.dto.BitmexTrade;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitmex.dto.*;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import org.knowm.xchange.bitmex.BitmexExchange;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.io.IOException;
import java.util.*;

/**
* Created by Lukas Zaoralek on 13.11.17.
*/
public class BitmexStreamingMarketDataService implements StreamingMarketDataService {
private static final Logger LOG = LoggerFactory.getLogger(BitmexStreamingMarketDataService.class);

private final ObjectMapper objectMapper = StreamingObjectMapperHelper.getObjectMapper();

private final BitmexStreamingService streamingService;

private final SortedMap<CurrencyPair, BitmexOrderbook> orderbooks = new TreeMap<>();
private final BitmexExchange bitmexExchange;

private final SortedMap<String, BitmexOrderbook> orderbooks = new TreeMap<>();

public BitmexStreamingMarketDataService(BitmexStreamingService streamingService) {
public BitmexStreamingMarketDataService(BitmexStreamingService streamingService, BitmexExchange bitmexExchange) {
this.streamingService = streamingService;
this.streamingService.subscribeConnectionSuccess().subscribe(o -> {
LOG.info("Bitmex connection succeeded. Clearing orderbooks.");
orderbooks.clear();
});
this.bitmexExchange = bitmexExchange;
}

private String getBitmexSymbol(CurrencyPair currencyPair) {
return currencyPair.base.toString() + currencyPair.counter.toString();
}

@Override
public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = getBitmexSymbol(currencyPair);
String channelName = String.format("orderBookL2:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).map(s -> {
BitmexOrderbook orderbook;
String action = s.getAction();
if (action.equals("partial")) {
orderbook = s.toBitmexOrderbook();
orderbooks.put(currencyPair, orderbook);
orderbooks.put(instrument, orderbook);
} else {
orderbook = orderbooks.get(currencyPair);
orderbook = orderbooks.get(instrument);
//ignore updates until first "partial"
if (orderbook == null) {
return null;
return new OrderBook(new Date(), Collections.emptyList(), Collections.emptyList());
}
BitmexLimitOrder[] levels = s.toBitmexOrderbookLevels();
orderbook.updateLevels(levels, action);
Expand All @@ -57,16 +69,16 @@ public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... a
});
}

public Observable<BitmexTicker> getRawTicker(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
public Observable<BitmexTicker> getRawTicker(CurrencyPair currencyPair) {
String instrument = getBitmexSymbol(currencyPair);
String channelName = String.format("quote:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).map(s -> s.toBitmexTicker());
}

@Override
public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = getBitmexSymbol(currencyPair);
String channelName = String.format("quote:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).map(s -> {
Expand All @@ -77,7 +89,7 @@ public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {

@Override
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = getBitmexSymbol(currencyPair);
String channelName = String.format("trade:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).flatMapIterable(s -> {
Expand All @@ -89,4 +101,42 @@ public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
return trades;
});
}


public Observable<BitmexExecution> getRawExecutions(String symbol) {
return streamingService.subscribeBitmexChannel("execution:" + symbol).flatMapIterable(s -> {
JsonNode executions = s.getData();
List<BitmexExecution> bitmexExecutions = new ArrayList<>(executions.size());
for (JsonNode execution : executions) {
bitmexExecutions.add(objectMapper.treeToValue(execution, BitmexExecution.class));
}
return bitmexExecutions;
});
}

public void enableDeadManSwitch() throws IOException {
enableDeadManSwitch(BitmexStreamingService.DMS_RESUBSCRIBE, BitmexStreamingService.DMS_CANCEL_ALL_IN);
}

/**
* @param rate in milliseconds to send updated
* @param timeout milliseconds from now after which orders will be cancelled
*/
public void enableDeadManSwitch(long rate, long timeout) throws IOException {
streamingService.enableDeadMansSwitch(rate, timeout);
}

public boolean isDeadManSwitchEnabled() throws IOException {
return streamingService.isDeadMansSwitchEnabled();
}

public void disableDeadMansSwitch() throws IOException {
streamingService.disableDeadMansSwitch();
}

public Observable<BitmexFunding> getRawFunding() {
String channelName = "funding";
return streamingService.subscribeBitmexChannel(channelName).map(BitmexWebSocketTransaction::toBitmexFunding);
}

}
Loading

0 comments on commit 9de93b2

Please sign in to comment.