Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

#446 Bitstamp v2 not getting trades - fixed #456

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package info.bitrich.xchangestream.bitstamp;

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitstamp.dto.BitstampOrderBook;
import info.bitrich.xchangestream.bitstamp.dto.BitstampWebSocketTransaction;
import info.bitrich.xchangestream.bitstamp.v2.BitstampStreamingService;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import info.bitrich.xchangestream.service.pusher.PusherStreamingService;
import io.reactivex.Observable;
import org.knowm.xchange.bitstamp.BitstampAdapters;
import org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.exceptions.NotAvailableFromExchangeException;

import java.util.Date;

public class BitstampStreamingMarketDataService implements StreamingMarketDataService {
private final PusherStreamingService service;

Expand All @@ -35,16 +34,11 @@ public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... a
private Observable<OrderBook> getOrderBook(String channelPrefix, CurrencyPair currencyPair, Object... args) {
String channelName = channelPrefix + getChannelPostfix(currencyPair);

return service.subscribeChannel(channelName, "data")
return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_ORDERBOOK)
.map(s -> {
ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
BitstampOrderBook orderBook = mapper.readValue(s, BitstampOrderBook.class);
org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook bitstampOrderBook =
new org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook(
orderBook.getTimestamp(),
orderBook.getBids(),
orderBook.getAsks());
return BitstampAdapters.adaptOrderBook(bitstampOrderBook, currencyPair);
return BitstampAdapters.adaptOrderBook(orderBook, currencyPair);
});
}

Expand All @@ -58,12 +52,10 @@ public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
String channelName = "live_trades" + getChannelPostfix(currencyPair);

return service.subscribeChannel(channelName, "trade")
return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE)
.map(s -> {
ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
BitstampWebSocketTransaction transactions = mapper.readValue(s, BitstampWebSocketTransaction.class);
transactions = new BitstampWebSocketTransaction(new Date().getTime() / 1000L, transactions.getTid(),
transactions.getPrice(), transactions.getAmount(), transactions.getType());
return BitstampAdapters.adaptTrade(transactions, currencyPair, 1000);
});
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package info.bitrich.xchangestream.bitstamp.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

import org.knowm.xchange.bitstamp.dto.marketdata.BitstampTransaction;

import java.math.BigDecimal;

public class BitstampWebSocketTransaction extends BitstampTransaction {
public BitstampWebSocketTransaction(@JsonProperty("datetime") long date, @JsonProperty("id") long tid, @JsonProperty("price") BigDecimal price,
public BitstampWebSocketTransaction(@JsonProperty("microtimestamp") BigDecimal microtimestamp, @JsonProperty("id") long tid, @JsonProperty("price") BigDecimal price,
@JsonProperty("amount") BigDecimal amount, @JsonProperty("order_type") int type) {
super(date, tid, price, amount, type);

super(microtimestamp.longValue() / 1000, tid, price, amount, type);
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package info.bitrich.xchangestream.bitstamp.v2;

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitstamp.dto.BitstampOrderBook;
import info.bitrich.xchangestream.bitstamp.dto.BitstampWebSocketTransaction;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import org.knowm.xchange.bitstamp.BitstampAdapters;
import org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.exceptions.NotAvailableFromExchangeException;

import java.util.Date;

/**
* Bitstamp WebSocket V2 Streaming Market Data Service implementation
* Created by Pavel Chertalev on 15.03.2018.
Expand All @@ -38,16 +36,11 @@ public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... a
private Observable<OrderBook> getOrderBook(String channelPrefix, CurrencyPair currencyPair, Object... args) {
String channelName = channelPrefix + getChannelPostfix(currencyPair);

return service.subscribeChannel(channelName, "data")
return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_ORDERBOOK)
.map(s -> {
ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
BitstampOrderBook orderBook = mapper.treeToValue(s.get("data"), BitstampOrderBook.class);
org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook bitstampOrderBook =
new org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook(
orderBook.getTimestamp(),
orderBook.getBids(),
orderBook.getAsks());
return BitstampAdapters.adaptOrderBook(bitstampOrderBook, currencyPair);
return BitstampAdapters.adaptOrderBook(orderBook, currencyPair);
});
}

Expand All @@ -61,13 +54,11 @@ public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
String channelName = "live_trades" + getChannelPostfix(currencyPair);

return service.subscribeChannel(channelName, "trade")
return service.subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE)
.map(s -> {
ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
BitstampWebSocketTransaction transactions = mapper.treeToValue(s.get("data"), BitstampWebSocketTransaction.class);
transactions = new BitstampWebSocketTransaction(new Date().getTime() / 1000L, transactions.getTid(),
transactions.getPrice(), transactions.getAmount(), transactions.getType());
return BitstampAdapters.adaptTrade(transactions, currencyPair, 1000);
return BitstampAdapters.adaptTrade(transactions, currencyPair, 1);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ public class BitstampStreamingService extends JsonNettyStreamingService {

private static final String JSON_CHANNEL = "channel";
private static final String JSON_EVENT = "event";
private static final String JSON_DATA = "data";

public static final String EVENT_ORDERBOOK = "data";
public static final String EVENT_TRADE = "trade";
private static final String EVENT_SUBSCRIPTION_SUCCEEDED = "bts:subscription_succeeded";
private static final String EVENT_UNSUBSCRIPTION_SUCCEEDED = "bts:unsubscription_succeeded";

public BitstampStreamingService(String apiUrl) {
super(apiUrl, Integer.MAX_VALUE);
Expand Down Expand Up @@ -52,12 +56,23 @@ protected void handleMessage(JsonNode message) {
String channel = channelJsonNode.asText();
String event = eventJsonNode.asText();

if (!channels.containsKey(channel)) {
LOG.warn("The message has been received from disconnected channel '{}'. Skipped.", channel);
return;
}
if (event.equals(JSON_DATA)) {
super.handleMessage(message);
switch (event) {
case EVENT_ORDERBOOK:
case EVENT_TRADE:
if (!channels.containsKey(channel)) {
LOG.warn("The message has been received from disconnected channel '{}'. Skipped.", channel);
return;
}
super.handleMessage(message);
badgerwithagun marked this conversation as resolved.
Show resolved Hide resolved
break;
case EVENT_SUBSCRIPTION_SUCCEEDED:
LOG.info("Channel {} has been successfully subscribed", channel);
break;
case EVENT_UNSUBSCRIPTION_SUCCEEDED:
LOG.info("Channel {} has been successfully unsubscribed", channel);
break;
default:
LOG.warn("Unsupported event type {} in message {}", event, message.toString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

Expand Down
2 changes: 2 additions & 0 deletions xchange-bitstamp/src/test/resources/order-book.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{
"timestamp": "1574865238",
"microtimestamp": "1574865238689641",
"bids": [
[
"819.9",
Expand Down
1 change: 1 addition & 0 deletions xchange-bitstamp/src/test/resources/trade-v2.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"data": {
"microtimestamp": 1484858423000,
"price": 914.38999999999999,
"amount": 34.390000000000001,
"datetime": "1484858423",
Expand Down
1 change: 1 addition & 0 deletions xchange-bitstamp/src/test/resources/trade.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"microtimestamp": "1574865139243529",
"price": 914.38999999999999,
"amount": 34.390000000000001,
"datetime": "1484858423",
Expand Down