Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

Commit

Permalink
Restore the Bitmex Execution reports code which was cut out of #191.
Browse files Browse the repository at this point in the history
This is has some issues with the latest codeset (in particular, it doesn't compile against XChange 4.3.11).
It needs the attention of someone who actually uses BitMex.
  • Loading branch information
Foat authored and badgerwithagun committed Oct 25, 2018
1 parent 368454c commit 04b3d10
Show file tree
Hide file tree
Showing 22 changed files with 1,591 additions and 72 deletions.
2 changes: 2 additions & 0 deletions xchange-bitmex/bitmex.secret.keys.origin
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bitmex.api.key=
bitmex.secret.key=
46 changes: 45 additions & 1 deletion xchange-bitmex/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
Expand All @@ -9,6 +10,44 @@

<artifactId>xchange-bitmex</artifactId>

<profiles>
<profile>
<id>default</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<excludes>
<exclude>**/*IT.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>integration-tests</id>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<includes>
<include>**/*IT.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>info.bitrich.xchange-stream</groupId>
Expand All @@ -25,5 +64,10 @@
<artifactId>xchange-bitmex</artifactId>
<version>${xchange.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
<version>4.5.5</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,28 @@
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import io.reactivex.Completable;
import io.reactivex.Observable;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.bitmex.BitmexExchange;
import si.mazi.rescu.SynchronizedValueFactory;

/**
* Created by Lukas Zaoralek on 12.11.17.
*/
public class BitmexStreamingExchange extends BitmexExchange implements StreamingExchange {
private static final String API_URI = "wss://www.bitmex.com/realtime";
private static final String TESTNET_API_URI = "wss://testnet.bitmex.com/realtime";

private final BitmexStreamingService streamingService;
private BitmexStreamingService streamingService;
private BitmexStreamingMarketDataService streamingMarketDataService;

public BitmexStreamingExchange() {
this.streamingService = new BitmexStreamingService(API_URI);
}

protected BitmexStreamingExchange(BitmexStreamingService streamingService) {
this.streamingService = streamingService;
}

@Override
protected void initServices() {
super.initServices();
streamingService = createStreamingService();
ExchangeSpecification exchangeSpecification = getExchangeSpecification();
streamingMarketDataService = new BitmexStreamingMarketDataService(streamingService);
}

Expand All @@ -36,14 +34,18 @@ public Completable connect(ProductSubscription... args) {
return streamingService.connect();
}

@Override
public Completable disconnect() {
return streamingService.disconnect();
private BitmexStreamingService createStreamingService() {
ExchangeSpecification exchangeSpec = getExchangeSpecification();
Boolean useSandbox = (Boolean) exchangeSpec.getExchangeSpecificParametersItem(USE_SANDBOX);
String uri = useSandbox == null || !useSandbox ? API_URI : TESTNET_API_URI;
BitmexStreamingService streamingService = new BitmexStreamingService(uri, exchangeSpec.getApiKey(), exchangeSpec.getSecretKey());
applyStreamingSpecification(exchangeSpec, streamingService);
return streamingService;
}

@Override
public SynchronizedValueFactory<Long> getNonceFactory() {
return null;
public Completable disconnect() {
return streamingService.disconnect();
}

@Override
Expand All @@ -58,6 +60,16 @@ public StreamingMarketDataService getStreamingMarketDataService() {
return streamingMarketDataService;
}

@Override
public Observable<Throwable> reconnectFailure() {
return streamingService.subscribeReconnectFailure();
}

@Override
public Observable<Object> connectionSuccess() {
return streamingService.subscribeConnectionSuccess();
}

@Override
public boolean isAlive() {
return streamingService.isSocketOpen();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,53 +1,71 @@
package info.bitrich.xchangestream.bitmex;

import info.bitrich.xchangestream.bitmex.dto.BitmexLimitOrder;
import info.bitrich.xchangestream.bitmex.dto.BitmexOrderbook;
import info.bitrich.xchangestream.bitmex.dto.BitmexTicker;
import info.bitrich.xchangestream.bitmex.dto.BitmexTrade;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitmex.dto.*;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import org.knowm.xchange.bitmex.BitmexContract;
import org.knowm.xchange.bitmex.BitmexPrompt;
import org.knowm.xchange.bitmex.BitmexUtils;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.io.IOException;
import java.util.*;

/**
* Created by Lukas Zaoralek on 13.11.17.
*/
public class BitmexStreamingMarketDataService implements StreamingMarketDataService {
private static final Logger LOG = LoggerFactory.getLogger(BitmexStreamingMarketDataService.class);

private final ObjectMapper objectMapper = StreamingObjectMapperHelper.getObjectMapper();

private final BitmexStreamingService streamingService;

private final SortedMap<CurrencyPair, BitmexOrderbook> orderbooks = new TreeMap<>();
private final SortedMap<String, BitmexOrderbook> orderbooks = new TreeMap<>();


public BitmexStreamingMarketDataService(BitmexStreamingService streamingService) {
this.streamingService = streamingService;
this.streamingService.subscribeConnectionSuccess().subscribe(o -> {
LOG.info("Bitmex connection succeeded. Clearing orderbooks.");
orderbooks.clear();
});
}

private String getBitmexSymbol(CurrencyPair currencyPair, Object... args) {
if (args.length > 0) {
BitmexPrompt prompt = (BitmexPrompt) args[0];
BitmexContract contract = new BitmexContract(currencyPair, prompt);
return BitmexUtils.translateBitmexContract(contract);
} else {
return currencyPair.base.toString() + currencyPair.counter.toString();
}
}

@Override
public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = getBitmexSymbol(currencyPair, args);
String channelName = String.format("orderBookL2:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).map(s -> {
BitmexOrderbook orderbook;
String action = s.getAction();
if (action.equals("partial")) {
orderbook = s.toBitmexOrderbook();
orderbooks.put(currencyPair, orderbook);
orderbooks.put(instrument, orderbook);
} else {
orderbook = orderbooks.get(currencyPair);
orderbook = orderbooks.get(instrument);
//ignore updates until first "partial"
if (orderbook == null) {
return null;
return new OrderBook(null, Collections.emptyList(), Collections.emptyList());
}
BitmexLimitOrder[] levels = s.toBitmexOrderbookLevels();
orderbook.updateLevels(levels, action);
Expand All @@ -58,15 +76,15 @@ public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... a
}

public Observable<BitmexTicker> getRawTicker(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = getBitmexSymbol(currencyPair, args);
String channelName = String.format("quote:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).map(s -> s.toBitmexTicker());
}

@Override
public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = getBitmexSymbol(currencyPair, args);
String channelName = String.format("quote:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).map(s -> {
Expand All @@ -77,7 +95,7 @@ public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {

@Override
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
String instrument = currencyPair.base.toString() + currencyPair.counter.toString();
String instrument = getBitmexSymbol(currencyPair, args);
String channelName = String.format("trade:%s", instrument);

return streamingService.subscribeBitmexChannel(channelName).flatMapIterable(s -> {
Expand All @@ -89,4 +107,36 @@ public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
return trades;
});
}


public Observable<BitmexExecution> getExecutions(String symbol) {
return streamingService.subscribeBitmexChannel("execution:" + symbol).flatMapIterable(s -> {
JsonNode executions = s.getData();
List<BitmexExecution> bitmexExecutions = new ArrayList<>(executions.size());
for (JsonNode execution : executions) {
bitmexExecutions.add(objectMapper.treeToValue(execution, BitmexExecution.class));
}
return bitmexExecutions;
});
}

public void enableDeadManSwitch() throws IOException {
enableDeadManSwitch(BitmexStreamingService.DMS_RESUBSCRIBE, BitmexStreamingService.DMS_CANCEL_ALL_IN);
}

/**
* @param rate in milliseconds to send updated
* @param timeout milliseconds from now after which orders will be cancelled
*/
public void enableDeadManSwitch(long rate, long timeout) throws IOException {
streamingService.enableDeadMansSwitch(rate, timeout);
}

public boolean isDeadManSwitchEnabled() throws IOException {
return streamingService.isDeadMansSwitchEnabled();
}

public void disableDeadMansSwitch() throws IOException {
streamingService.disableDeadMansSwitch();
}
}
Loading

0 comments on commit 04b3d10

Please sign in to comment.