From c20a7d40b06f56ae94cd69cbf0c193022e73c7f1 Mon Sep 17 00:00:00 2001 From: Pavel Chertalev Date: Wed, 27 Nov 2019 17:13:44 +0300 Subject: [PATCH] #446 Bitstamp v2 not getting trades - fixed --- .../BitstampStreamingMarketDataService.java | 18 ++++------- .../bitstamp/dto/BitstampOrderBook.java | 30 ------------------- .../dto/BitstampWebSocketTransaction.java | 7 +++-- .../BitstampStreamingMarketDataService.java | 19 ++++-------- .../bitstamp/v2/BitstampStreamingService.java | 9 ++++-- 5 files changed, 20 insertions(+), 63 deletions(-) delete mode 100644 xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampOrderBook.java diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java index b7d7e8d35..a6a265c69 100644 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java +++ b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java @@ -1,21 +1,20 @@ package info.bitrich.xchangestream.bitstamp; import com.fasterxml.jackson.databind.ObjectMapper; -import info.bitrich.xchangestream.bitstamp.dto.BitstampOrderBook; import info.bitrich.xchangestream.bitstamp.dto.BitstampWebSocketTransaction; +import info.bitrich.xchangestream.bitstamp.v2.BitstampStreamingService; import info.bitrich.xchangestream.core.StreamingMarketDataService; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; import info.bitrich.xchangestream.service.pusher.PusherStreamingService; import io.reactivex.Observable; import org.knowm.xchange.bitstamp.BitstampAdapters; +import org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.marketdata.OrderBook; import org.knowm.xchange.dto.marketdata.Ticker; import org.knowm.xchange.dto.marketdata.Trade; import org.knowm.xchange.exceptions.NotAvailableFromExchangeException; -import java.util.Date; - public class BitstampStreamingMarketDataService implements StreamingMarketDataService { private final PusherStreamingService service; @@ -35,16 +34,11 @@ public Observable getOrderBook(CurrencyPair currencyPair, Object... a private Observable getOrderBook(String channelPrefix, CurrencyPair currencyPair, Object... args) { String channelName = channelPrefix + getChannelPostfix(currencyPair); - return service.subscribeChannel(channelName, "data") + return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_ORDERBOOK) .map(s -> { ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); BitstampOrderBook orderBook = mapper.readValue(s, BitstampOrderBook.class); - org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook bitstampOrderBook = - new org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook( - orderBook.getTimestamp(), - orderBook.getBids(), - orderBook.getAsks()); - return BitstampAdapters.adaptOrderBook(bitstampOrderBook, currencyPair); + return BitstampAdapters.adaptOrderBook(orderBook, currencyPair); }); } @@ -58,12 +52,10 @@ public Observable getTicker(CurrencyPair currencyPair, Object... args) { public Observable getTrades(CurrencyPair currencyPair, Object... args) { String channelName = "live_trades" + getChannelPostfix(currencyPair); - return service.subscribeChannel(channelName, "trade") + return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE) .map(s -> { ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); BitstampWebSocketTransaction transactions = mapper.readValue(s, BitstampWebSocketTransaction.class); - transactions = new BitstampWebSocketTransaction(new Date().getTime() / 1000L, transactions.getTid(), - transactions.getPrice(), transactions.getAmount(), transactions.getType()); return BitstampAdapters.adaptTrade(transactions, currencyPair, 1000); }); } diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampOrderBook.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampOrderBook.java deleted file mode 100644 index bdb7d1122..000000000 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampOrderBook.java +++ /dev/null @@ -1,30 +0,0 @@ -package info.bitrich.xchangestream.bitstamp.dto; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.math.BigDecimal; -import java.util.List; - -public class BitstampOrderBook { - private final long timestamp; - private final List> bids; - private final List> asks; - - public BitstampOrderBook(@JsonProperty("timestamp") long timestamp, @JsonProperty("bids") List> bids, @JsonProperty("asks") List> asks) { - this.timestamp = timestamp; - this.bids = bids; - this.asks = asks; - } - - public List> getBids() { - return bids; - } - - public List> getAsks() { - return asks; - } - - public long getTimestamp() { - return timestamp; - } -} diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampWebSocketTransaction.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampWebSocketTransaction.java index 35d4dfc0d..8701aeb84 100644 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampWebSocketTransaction.java +++ b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampWebSocketTransaction.java @@ -1,14 +1,15 @@ package info.bitrich.xchangestream.bitstamp.dto; import com.fasterxml.jackson.annotation.JsonProperty; - import org.knowm.xchange.bitstamp.dto.marketdata.BitstampTransaction; import java.math.BigDecimal; public class BitstampWebSocketTransaction extends BitstampTransaction { - public BitstampWebSocketTransaction(@JsonProperty("datetime") long date, @JsonProperty("id") long tid, @JsonProperty("price") BigDecimal price, + public BitstampWebSocketTransaction(@JsonProperty("microtimestamp") BigDecimal microtimestamp, @JsonProperty("id") long tid, @JsonProperty("price") BigDecimal price, @JsonProperty("amount") BigDecimal amount, @JsonProperty("order_type") int type) { - super(date, tid, price, amount, type); + + super(microtimestamp.divide(new BigDecimal(1000), BigDecimal.ROUND_DOWN).longValue(), + tid, price, amount, type); } } diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java index 84f51f917..910679c1b 100644 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java +++ b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java @@ -1,20 +1,18 @@ package info.bitrich.xchangestream.bitstamp.v2; import com.fasterxml.jackson.databind.ObjectMapper; -import info.bitrich.xchangestream.bitstamp.dto.BitstampOrderBook; import info.bitrich.xchangestream.bitstamp.dto.BitstampWebSocketTransaction; import info.bitrich.xchangestream.core.StreamingMarketDataService; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; import io.reactivex.Observable; import org.knowm.xchange.bitstamp.BitstampAdapters; +import org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.marketdata.OrderBook; import org.knowm.xchange.dto.marketdata.Ticker; import org.knowm.xchange.dto.marketdata.Trade; import org.knowm.xchange.exceptions.NotAvailableFromExchangeException; -import java.util.Date; - /** * Bitstamp WebSocket V2 Streaming Market Data Service implementation * Created by Pavel Chertalev on 15.03.2018. @@ -38,16 +36,11 @@ public Observable getOrderBook(CurrencyPair currencyPair, Object... a private Observable getOrderBook(String channelPrefix, CurrencyPair currencyPair, Object... args) { String channelName = channelPrefix + getChannelPostfix(currencyPair); - return service.subscribeChannel(channelName, "data") + return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_ORDERBOOK) .map(s -> { ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); BitstampOrderBook orderBook = mapper.treeToValue(s.get("data"), BitstampOrderBook.class); - org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook bitstampOrderBook = - new org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook( - orderBook.getTimestamp(), - orderBook.getBids(), - orderBook.getAsks()); - return BitstampAdapters.adaptOrderBook(bitstampOrderBook, currencyPair); + return BitstampAdapters.adaptOrderBook(orderBook, currencyPair); }); } @@ -61,13 +54,11 @@ public Observable getTicker(CurrencyPair currencyPair, Object... args) { public Observable getTrades(CurrencyPair currencyPair, Object... args) { String channelName = "live_trades" + getChannelPostfix(currencyPair); - return service.subscribeChannel(channelName, "trade") + return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE) .map(s -> { ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); BitstampWebSocketTransaction transactions = mapper.treeToValue(s.get("data"), BitstampWebSocketTransaction.class); - transactions = new BitstampWebSocketTransaction(new Date().getTime() / 1000L, transactions.getTid(), - transactions.getPrice(), transactions.getAmount(), transactions.getType()); - return BitstampAdapters.adaptTrade(transactions, currencyPair, 1000); + return BitstampAdapters.adaptTrade(transactions, currencyPair, 1); }); } diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java index b0aa9384a..67fa6afce 100644 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java +++ b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java @@ -19,7 +19,8 @@ public class BitstampStreamingService extends JsonNettyStreamingService { private static final String JSON_CHANNEL = "channel"; private static final String JSON_EVENT = "event"; - private static final String JSON_DATA = "data"; + public static final String EVENT_ORDERBOOK = "data"; + public static final String EVENT_TRADE = "trade"; public BitstampStreamingService(String apiUrl) { super(apiUrl, Integer.MAX_VALUE); @@ -56,8 +57,10 @@ protected void handleMessage(JsonNode message) { LOG.warn("The message has been received from disconnected channel '{}'. Skipped.", channel); return; } - if (event.equals(JSON_DATA)) { - super.handleMessage(message); + switch (event) { + case EVENT_ORDERBOOK: + case EVENT_TRADE: + super.handleMessage(message); } }