Skip to content

Commit

Permalink
bitrich-info#446 Bitstamp v2 not getting trades - fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Chertalev committed Nov 27, 2019
1 parent 8133322 commit c20a7d4
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package info.bitrich.xchangestream.bitstamp;

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitstamp.dto.BitstampOrderBook;
import info.bitrich.xchangestream.bitstamp.dto.BitstampWebSocketTransaction;
import info.bitrich.xchangestream.bitstamp.v2.BitstampStreamingService;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import info.bitrich.xchangestream.service.pusher.PusherStreamingService;
import io.reactivex.Observable;
import org.knowm.xchange.bitstamp.BitstampAdapters;
import org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.exceptions.NotAvailableFromExchangeException;

import java.util.Date;

public class BitstampStreamingMarketDataService implements StreamingMarketDataService {
private final PusherStreamingService service;

Expand All @@ -35,16 +34,11 @@ public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... a
private Observable<OrderBook> getOrderBook(String channelPrefix, CurrencyPair currencyPair, Object... args) {
String channelName = channelPrefix + getChannelPostfix(currencyPair);

return service.subscribeChannel(channelName, "data")
return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_ORDERBOOK)
.map(s -> {
ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
BitstampOrderBook orderBook = mapper.readValue(s, BitstampOrderBook.class);
org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook bitstampOrderBook =
new org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook(
orderBook.getTimestamp(),
orderBook.getBids(),
orderBook.getAsks());
return BitstampAdapters.adaptOrderBook(bitstampOrderBook, currencyPair);
return BitstampAdapters.adaptOrderBook(orderBook, currencyPair);
});
}

Expand All @@ -58,12 +52,10 @@ public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
String channelName = "live_trades" + getChannelPostfix(currencyPair);

return service.subscribeChannel(channelName, "trade")
return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE)
.map(s -> {
ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
BitstampWebSocketTransaction transactions = mapper.readValue(s, BitstampWebSocketTransaction.class);
transactions = new BitstampWebSocketTransaction(new Date().getTime() / 1000L, transactions.getTid(),
transactions.getPrice(), transactions.getAmount(), transactions.getType());
return BitstampAdapters.adaptTrade(transactions, currencyPair, 1000);
});
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package info.bitrich.xchangestream.bitstamp.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

import org.knowm.xchange.bitstamp.dto.marketdata.BitstampTransaction;

import java.math.BigDecimal;

public class BitstampWebSocketTransaction extends BitstampTransaction {
public BitstampWebSocketTransaction(@JsonProperty("datetime") long date, @JsonProperty("id") long tid, @JsonProperty("price") BigDecimal price,
public BitstampWebSocketTransaction(@JsonProperty("microtimestamp") BigDecimal microtimestamp, @JsonProperty("id") long tid, @JsonProperty("price") BigDecimal price,
@JsonProperty("amount") BigDecimal amount, @JsonProperty("order_type") int type) {
super(date, tid, price, amount, type);

super(microtimestamp.divide(new BigDecimal(1000), BigDecimal.ROUND_DOWN).longValue(),
tid, price, amount, type);
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package info.bitrich.xchangestream.bitstamp.v2;

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitstamp.dto.BitstampOrderBook;
import info.bitrich.xchangestream.bitstamp.dto.BitstampWebSocketTransaction;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import org.knowm.xchange.bitstamp.BitstampAdapters;
import org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.exceptions.NotAvailableFromExchangeException;

import java.util.Date;

/**
* Bitstamp WebSocket V2 Streaming Market Data Service implementation
* Created by Pavel Chertalev on 15.03.2018.
Expand All @@ -38,16 +36,11 @@ public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... a
private Observable<OrderBook> getOrderBook(String channelPrefix, CurrencyPair currencyPair, Object... args) {
String channelName = channelPrefix + getChannelPostfix(currencyPair);

return service.subscribeChannel(channelName, "data")
return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_ORDERBOOK)
.map(s -> {
ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
BitstampOrderBook orderBook = mapper.treeToValue(s.get("data"), BitstampOrderBook.class);
org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook bitstampOrderBook =
new org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook(
orderBook.getTimestamp(),
orderBook.getBids(),
orderBook.getAsks());
return BitstampAdapters.adaptOrderBook(bitstampOrderBook, currencyPair);
return BitstampAdapters.adaptOrderBook(orderBook, currencyPair);
});
}

Expand All @@ -61,13 +54,11 @@ public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
String channelName = "live_trades" + getChannelPostfix(currencyPair);

return service.subscribeChannel(channelName, "trade")
return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE)
.map(s -> {
ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
BitstampWebSocketTransaction transactions = mapper.treeToValue(s.get("data"), BitstampWebSocketTransaction.class);
transactions = new BitstampWebSocketTransaction(new Date().getTime() / 1000L, transactions.getTid(),
transactions.getPrice(), transactions.getAmount(), transactions.getType());
return BitstampAdapters.adaptTrade(transactions, currencyPair, 1000);
return BitstampAdapters.adaptTrade(transactions, currencyPair, 1);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public class BitstampStreamingService extends JsonNettyStreamingService {

private static final String JSON_CHANNEL = "channel";
private static final String JSON_EVENT = "event";
private static final String JSON_DATA = "data";
public static final String EVENT_ORDERBOOK = "data";
public static final String EVENT_TRADE = "trade";

public BitstampStreamingService(String apiUrl) {
super(apiUrl, Integer.MAX_VALUE);
Expand Down Expand Up @@ -56,8 +57,10 @@ protected void handleMessage(JsonNode message) {
LOG.warn("The message has been received from disconnected channel '{}'. Skipped.", channel);
return;
}
if (event.equals(JSON_DATA)) {
super.handleMessage(message);
switch (event) {
case EVENT_ORDERBOOK:
case EVENT_TRADE:
super.handleMessage(message);
}
}

Expand Down

0 comments on commit c20a7d4

Please sign in to comment.