diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java index b7d7e8d35..a6a265c69 100644 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java +++ b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java @@ -1,21 +1,20 @@ package info.bitrich.xchangestream.bitstamp; import com.fasterxml.jackson.databind.ObjectMapper; -import info.bitrich.xchangestream.bitstamp.dto.BitstampOrderBook; import info.bitrich.xchangestream.bitstamp.dto.BitstampWebSocketTransaction; +import info.bitrich.xchangestream.bitstamp.v2.BitstampStreamingService; import info.bitrich.xchangestream.core.StreamingMarketDataService; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; import info.bitrich.xchangestream.service.pusher.PusherStreamingService; import io.reactivex.Observable; import org.knowm.xchange.bitstamp.BitstampAdapters; +import org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.marketdata.OrderBook; import org.knowm.xchange.dto.marketdata.Ticker; import org.knowm.xchange.dto.marketdata.Trade; import org.knowm.xchange.exceptions.NotAvailableFromExchangeException; -import java.util.Date; - public class BitstampStreamingMarketDataService implements StreamingMarketDataService { private final PusherStreamingService service; @@ -35,16 +34,11 @@ public Observable getOrderBook(CurrencyPair currencyPair, Object... a private Observable getOrderBook(String channelPrefix, CurrencyPair currencyPair, Object... args) { String channelName = channelPrefix + getChannelPostfix(currencyPair); - return service.subscribeChannel(channelName, "data") + return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_ORDERBOOK) .map(s -> { ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); BitstampOrderBook orderBook = mapper.readValue(s, BitstampOrderBook.class); - org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook bitstampOrderBook = - new org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook( - orderBook.getTimestamp(), - orderBook.getBids(), - orderBook.getAsks()); - return BitstampAdapters.adaptOrderBook(bitstampOrderBook, currencyPair); + return BitstampAdapters.adaptOrderBook(orderBook, currencyPair); }); } @@ -58,12 +52,10 @@ public Observable getTicker(CurrencyPair currencyPair, Object... args) { public Observable getTrades(CurrencyPair currencyPair, Object... args) { String channelName = "live_trades" + getChannelPostfix(currencyPair); - return service.subscribeChannel(channelName, "trade") + return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE) .map(s -> { ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); BitstampWebSocketTransaction transactions = mapper.readValue(s, BitstampWebSocketTransaction.class); - transactions = new BitstampWebSocketTransaction(new Date().getTime() / 1000L, transactions.getTid(), - transactions.getPrice(), transactions.getAmount(), transactions.getType()); return BitstampAdapters.adaptTrade(transactions, currencyPair, 1000); }); } diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampOrderBook.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampOrderBook.java deleted file mode 100644 index bdb7d1122..000000000 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampOrderBook.java +++ /dev/null @@ -1,30 +0,0 @@ -package info.bitrich.xchangestream.bitstamp.dto; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.math.BigDecimal; -import java.util.List; - -public class BitstampOrderBook { - private final long timestamp; - private final List> bids; - private final List> asks; - - public BitstampOrderBook(@JsonProperty("timestamp") long timestamp, @JsonProperty("bids") List> bids, @JsonProperty("asks") List> asks) { - this.timestamp = timestamp; - this.bids = bids; - this.asks = asks; - } - - public List> getBids() { - return bids; - } - - public List> getAsks() { - return asks; - } - - public long getTimestamp() { - return timestamp; - } -} diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampWebSocketTransaction.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampWebSocketTransaction.java index 35d4dfc0d..4aa13ae23 100644 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampWebSocketTransaction.java +++ b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampWebSocketTransaction.java @@ -1,14 +1,14 @@ package info.bitrich.xchangestream.bitstamp.dto; import com.fasterxml.jackson.annotation.JsonProperty; - import org.knowm.xchange.bitstamp.dto.marketdata.BitstampTransaction; import java.math.BigDecimal; public class BitstampWebSocketTransaction extends BitstampTransaction { - public BitstampWebSocketTransaction(@JsonProperty("datetime") long date, @JsonProperty("id") long tid, @JsonProperty("price") BigDecimal price, + public BitstampWebSocketTransaction(@JsonProperty("microtimestamp") BigDecimal microtimestamp, @JsonProperty("id") long tid, @JsonProperty("price") BigDecimal price, @JsonProperty("amount") BigDecimal amount, @JsonProperty("order_type") int type) { - super(date, tid, price, amount, type); + + super(microtimestamp.longValue() / 1000, tid, price, amount, type); } } diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java index 84f51f917..910679c1b 100644 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java +++ b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java @@ -1,20 +1,18 @@ package info.bitrich.xchangestream.bitstamp.v2; import com.fasterxml.jackson.databind.ObjectMapper; -import info.bitrich.xchangestream.bitstamp.dto.BitstampOrderBook; import info.bitrich.xchangestream.bitstamp.dto.BitstampWebSocketTransaction; import info.bitrich.xchangestream.core.StreamingMarketDataService; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; import io.reactivex.Observable; import org.knowm.xchange.bitstamp.BitstampAdapters; +import org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.marketdata.OrderBook; import org.knowm.xchange.dto.marketdata.Ticker; import org.knowm.xchange.dto.marketdata.Trade; import org.knowm.xchange.exceptions.NotAvailableFromExchangeException; -import java.util.Date; - /** * Bitstamp WebSocket V2 Streaming Market Data Service implementation * Created by Pavel Chertalev on 15.03.2018. @@ -38,16 +36,11 @@ public Observable getOrderBook(CurrencyPair currencyPair, Object... a private Observable getOrderBook(String channelPrefix, CurrencyPair currencyPair, Object... args) { String channelName = channelPrefix + getChannelPostfix(currencyPair); - return service.subscribeChannel(channelName, "data") + return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_ORDERBOOK) .map(s -> { ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); BitstampOrderBook orderBook = mapper.treeToValue(s.get("data"), BitstampOrderBook.class); - org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook bitstampOrderBook = - new org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook( - orderBook.getTimestamp(), - orderBook.getBids(), - orderBook.getAsks()); - return BitstampAdapters.adaptOrderBook(bitstampOrderBook, currencyPair); + return BitstampAdapters.adaptOrderBook(orderBook, currencyPair); }); } @@ -61,13 +54,11 @@ public Observable getTicker(CurrencyPair currencyPair, Object... args) { public Observable getTrades(CurrencyPair currencyPair, Object... args) { String channelName = "live_trades" + getChannelPostfix(currencyPair); - return service.subscribeChannel(channelName, "trade") + return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE) .map(s -> { ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); BitstampWebSocketTransaction transactions = mapper.treeToValue(s.get("data"), BitstampWebSocketTransaction.class); - transactions = new BitstampWebSocketTransaction(new Date().getTime() / 1000L, transactions.getTid(), - transactions.getPrice(), transactions.getAmount(), transactions.getType()); - return BitstampAdapters.adaptTrade(transactions, currencyPair, 1000); + return BitstampAdapters.adaptTrade(transactions, currencyPair, 1); }); } diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java index b0aa9384a..b8bb1121d 100644 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java +++ b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java @@ -19,7 +19,11 @@ public class BitstampStreamingService extends JsonNettyStreamingService { private static final String JSON_CHANNEL = "channel"; private static final String JSON_EVENT = "event"; - private static final String JSON_DATA = "data"; + + public static final String EVENT_ORDERBOOK = "data"; + public static final String EVENT_TRADE = "trade"; + private static final String EVENT_SUBSCRIPTION_SUCCEEDED = "bts:subscription_succeeded"; + private static final String EVENT_UNSUBSCRIPTION_SUCCEEDED = "bts:unsubscription_succeeded"; public BitstampStreamingService(String apiUrl) { super(apiUrl, Integer.MAX_VALUE); @@ -52,12 +56,23 @@ protected void handleMessage(JsonNode message) { String channel = channelJsonNode.asText(); String event = eventJsonNode.asText(); - if (!channels.containsKey(channel)) { - LOG.warn("The message has been received from disconnected channel '{}'. Skipped.", channel); - return; - } - if (event.equals(JSON_DATA)) { - super.handleMessage(message); + switch (event) { + case EVENT_ORDERBOOK: + case EVENT_TRADE: + if (!channels.containsKey(channel)) { + LOG.warn("The message has been received from disconnected channel '{}'. Skipped.", channel); + return; + } + super.handleMessage(message); + break; + case EVENT_SUBSCRIPTION_SUCCEEDED: + LOG.info("Channel {} has been successfully subscribed", channel); + break; + case EVENT_UNSUBSCRIPTION_SUCCEEDED: + LOG.info("Channel {} has been successfully unsubscribed", channel); + break; + default: + LOG.warn("Unsupported event type {} in message {}", event, message.toString()); } } diff --git a/xchange-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceTest.java b/xchange-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceTest.java index f519068b2..d84f3b5ca 100644 --- a/xchange-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceTest.java +++ b/xchange-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceTest.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.function.Supplier; -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; diff --git a/xchange-bitstamp/src/test/resources/order-book.json b/xchange-bitstamp/src/test/resources/order-book.json index 8f0a2faef..edc5d267f 100644 --- a/xchange-bitstamp/src/test/resources/order-book.json +++ b/xchange-bitstamp/src/test/resources/order-book.json @@ -1,4 +1,6 @@ { + "timestamp": "1574865238", + "microtimestamp": "1574865238689641", "bids": [ [ "819.9", diff --git a/xchange-bitstamp/src/test/resources/trade-v2.json b/xchange-bitstamp/src/test/resources/trade-v2.json index e32718d9d..aeb2b1e74 100644 --- a/xchange-bitstamp/src/test/resources/trade-v2.json +++ b/xchange-bitstamp/src/test/resources/trade-v2.json @@ -1,5 +1,6 @@ { "data": { + "microtimestamp": 1484858423000, "price": 914.38999999999999, "amount": 34.390000000000001, "datetime": "1484858423", diff --git a/xchange-bitstamp/src/test/resources/trade.json b/xchange-bitstamp/src/test/resources/trade.json index 13a2956d5..0aeebba8b 100644 --- a/xchange-bitstamp/src/test/resources/trade.json +++ b/xchange-bitstamp/src/test/resources/trade.json @@ -1,4 +1,5 @@ { + "microtimestamp": "1574865139243529", "price": 914.38999999999999, "amount": 34.390000000000001, "datetime": "1484858423",