Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

[BitMex] Restore the Bitmex Execution reports code which was cut out of #191. #244

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions xchange-bitmex/bitmex.secret.keys.origin
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bitmex.api.key=
bitmex.secret.key=
46 changes: 45 additions & 1 deletion xchange-bitmex/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
Expand All @@ -9,6 +10,44 @@

<artifactId>xchange-bitmex</artifactId>

<profiles>
<profile>
<id>default</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<excludes>
<exclude>**/*IT.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>integration-tests</id>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<includes>
<include>**/*IT.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>info.bitrich.xchange-stream</groupId>
Expand All @@ -25,5 +64,10 @@
<artifactId>xchange-bitmex</artifactId>
<version>${xchange.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
<version>4.5.5</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,28 @@
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import io.reactivex.Completable;
import io.reactivex.Observable;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.bitmex.BitmexExchange;
import si.mazi.rescu.SynchronizedValueFactory;

/**
* Created by Lukas Zaoralek on 12.11.17.
*/
public class BitmexStreamingExchange extends BitmexExchange implements StreamingExchange {
private static final String API_URI = "wss://www.bitmex.com/realtime";
private static final String TESTNET_API_URI = "wss://testnet.bitmex.com/realtime";

private final BitmexStreamingService streamingService;
private BitmexStreamingService streamingService;
private BitmexStreamingMarketDataService streamingMarketDataService;

public BitmexStreamingExchange() {
this.streamingService = new BitmexStreamingService(API_URI);
}

protected BitmexStreamingExchange(BitmexStreamingService streamingService) {
this.streamingService = streamingService;
}

@Override
protected void initServices() {
super.initServices();
streamingService = createStreamingService();
ExchangeSpecification exchangeSpecification = getExchangeSpecification();
streamingMarketDataService = new BitmexStreamingMarketDataService(streamingService);
}

Expand All @@ -36,14 +34,18 @@ public Completable connect(ProductSubscription... args) {
return streamingService.connect();
}

@Override
public Completable disconnect() {
return streamingService.disconnect();
private BitmexStreamingService createStreamingService() {
ExchangeSpecification exchangeSpec = getExchangeSpecification();
Boolean useSandbox = (Boolean) exchangeSpec.getExchangeSpecificParametersItem(USE_SANDBOX);
String uri = useSandbox == null || !useSandbox ? API_URI : TESTNET_API_URI;
BitmexStreamingService streamingService = new BitmexStreamingService(uri, exchangeSpec.getApiKey(), exchangeSpec.getSecretKey());
applyStreamingSpecification(exchangeSpec, streamingService);
return streamingService;
}

@Override
public SynchronizedValueFactory<Long> getNonceFactory() {
return null;
public Completable disconnect() {
return streamingService.disconnect();
}

@Override
Expand All @@ -58,6 +60,16 @@ public StreamingMarketDataService getStreamingMarketDataService() {
return streamingMarketDataService;
}

@Override
public Observable<Throwable> reconnectFailure() {
return streamingService.subscribeReconnectFailure();
}

@Override
public Observable<Object> connectionSuccess() {
return streamingService.subscribeConnectionSuccess();
}

@Override
public boolean isAlive() {
return streamingService.isSocketOpen();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,53 +1,71 @@
package info.bitrich.xchangestream.bitmex;

import info.bitrich.xchangestream.bitmex.dto.BitmexLimitOrder;
import info.bitrich.xchangestream.bitmex.dto.BitmexOrderbook;
import info.bitrich.xchangestream.bitmex.dto.BitmexTicker;
import info.bitrich.xchangestream.bitmex.dto.BitmexTrade;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitmex.dto.*;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import org.knowm.xchange.bitmex.BitmexContract;
import org.knowm.xchange.bitmex.BitmexPrompt;
import org.knowm.xchange.bitmex.BitmexUtils;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.io.IOException;
import java.util.*;

/**
* Created by Lukas Zaoralek on 13.11.17.
*/
public class BitmexStreamingMarketDataService implements StreamingMarketDataService {
private static final Logger LOG = LoggerFactory.getLogger(BitmexStreamingMarketDataService.class);

private final ObjectMapper objectMapper = StreamingObjectMapperHelper.getObjectMapper();

private final BitmexStreamingService streamingService;

private final SortedMap<CurrencyPair, BitmexOrderbook> orderbooks = new TreeMap<>();
private final SortedMap<String, BitmexOrderbook> orderbooks = new TreeMap<>();


public BitmexStreamingMarketDataService(BitmexStreamingService streamingService) {
this.streamingService = streamingService;
this.streamingService.subscribeConnectionSuccess().subscribe(o -> {
LOG.info("Bitmex connection succeeded. Clearing orderbooks.");
orderbooks.clear();
});
}

private String getBitmexSymbol(CurrencyPair currencyPair, Object... args) {
if (args.length > 0) {
BitmexPrompt prompt = (BitmexPrompt) args[0];
BitmexContract contract = new BitmexContract(currencyPair, prompt);
return BitmexUtils.translateBitmexContract(contract);
} else {
return currencyPair.base.toString() + currencyPair.counter.toString();
}
}

@Override
public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = getBitmexSymbol(currencyPair, args);
String channelName = String.format("orderBookL2:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).map(s -> {
BitmexOrderbook orderbook;
String action = s.getAction();
if (action.equals("partial")) {
orderbook = s.toBitmexOrderbook();
orderbooks.put(currencyPair, orderbook);
orderbooks.put(instrument, orderbook);
} else {
orderbook = orderbooks.get(currencyPair);
orderbook = orderbooks.get(instrument);
//ignore updates until first "partial"
if (orderbook == null) {
return null;
return new OrderBook(null, Collections.emptyList(), Collections.emptyList());
}
BitmexLimitOrder[] levels = s.toBitmexOrderbookLevels();
orderbook.updateLevels(levels, action);
Expand All @@ -58,15 +76,15 @@ public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... a
}

public Observable<BitmexTicker> getRawTicker(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = getBitmexSymbol(currencyPair, args);
String channelName = String.format("quote:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).map(s -> s.toBitmexTicker());
}

@Override
public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = getBitmexSymbol(currencyPair, args);
String channelName = String.format("quote:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).map(s -> {
Expand All @@ -77,7 +95,7 @@ public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {

@Override
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = getBitmexSymbol(currencyPair, args);
String channelName = String.format("trade:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).flatMapIterable(s -> {
Expand All @@ -89,4 +107,36 @@ public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
return trades;
});
}


public Observable<BitmexExecution> getExecutions(String symbol) {
return streamingService.subscribeBitmexChannel("execution:" + symbol).flatMapIterable(s -> {
JsonNode executions = s.getData();
List<BitmexExecution> bitmexExecutions = new ArrayList<>(executions.size());
for (JsonNode execution : executions) {
bitmexExecutions.add(objectMapper.treeToValue(execution, BitmexExecution.class));
}
return bitmexExecutions;
});
}

public void enableDeadManSwitch() throws IOException {
enableDeadManSwitch(BitmexStreamingService.DMS_RESUBSCRIBE, BitmexStreamingService.DMS_CANCEL_ALL_IN);
}

/**
* @param rate in milliseconds to send updated
* @param timeout milliseconds from now after which orders will be cancelled
*/
public void enableDeadManSwitch(long rate, long timeout) throws IOException {
streamingService.enableDeadMansSwitch(rate, timeout);
}

public boolean isDeadManSwitchEnabled() throws IOException {
return streamingService.isDeadMansSwitchEnabled();
}

public void disableDeadMansSwitch() throws IOException {
streamingService.disableDeadMansSwitch();
}
}
Loading