diff --git a/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java b/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java index 83bec4f9f..de9a9e5c0 100644 --- a/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java +++ b/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingMarketDataService.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import info.bitrich.xchangestream.bitstamp.dto.BitstampWebSocketTransaction; +import info.bitrich.xchangestream.bitstamp.v2.dto.BitstampWebSocketOrderEvent; import info.bitrich.xchangestream.core.StreamingMarketDataService; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; import io.reactivex.Observable; @@ -55,19 +56,31 @@ public Observable getTicker(CurrencyPair currencyPair, Object... args) { } @Override - public Observable getTrades(CurrencyPair currencyPair, Object... args) { - String channelName = "live_trades" + getChannelPostfix(currencyPair); + public Observable getTrades(CurrencyPair currencyPair, Object... args) { + String channelName = "live_trades" + getChannelPostfix(currencyPair); - return service - .subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE) - .map( - s -> { - ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); - BitstampWebSocketTransaction transactions = - mapper.treeToValue(s.get("data"), BitstampWebSocketTransaction.class); - return BitstampAdapters.adaptTrade(transactions, currencyPair, 1); - }); - } + return service + .subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE) + .map( + s -> { + ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); + BitstampWebSocketTransaction transactions = + mapper.treeToValue(s.get("data"), BitstampWebSocketTransaction.class); + return BitstampAdapters.adaptTrade(transactions, currencyPair, 1); + }); + } + + public Observable getOrders(CurrencyPair currencyPair, Object... args) { + String channelName = "live_orders" + getChannelPostfix(currencyPair); + + return service + .subscribeChannel(channelName, BitstampStreamingService.EVENT_ORDER_CREATED) + .map( + s -> { + ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); + return mapper.treeToValue(s, BitstampWebSocketOrderEvent.class); + }); + } private String getChannelPostfix(CurrencyPair currencyPair) { return "_" diff --git a/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java b/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java index c73e31ef3..21fe0bc08 100644 --- a/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java +++ b/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/BitstampStreamingService.java @@ -18,10 +18,13 @@ public class BitstampStreamingService extends JsonNettyStreamingService { private static final String JSON_CHANNEL = "channel"; private static final String JSON_EVENT = "event"; + public static final String EVENT_ORDER_CREATED = "order_created"; + public static final String EVENT_ORDER_CHANGED = "order_changed"; + public static final String EVENT_ORDER_DELETED = "order_deleted"; public static final String EVENT_ORDERBOOK = "data"; public static final String EVENT_TRADE = "trade"; - private static final String EVENT_SUBSCRIPTION_SUCCEEDED = "bts:subscription_succeeded"; - private static final String EVENT_UNSUBSCRIPTION_SUCCEEDED = "bts:unsubscription_succeeded"; + public static final String EVENT_SUBSCRIPTION_SUCCEEDED = "bts:subscription_succeeded"; + public static final String EVENT_UNSUBSCRIPTION_SUCCEEDED = "bts:unsubscription_succeeded"; public BitstampStreamingService(String apiUrl) { super(apiUrl, Integer.MAX_VALUE); @@ -58,6 +61,9 @@ protected void handleMessage(JsonNode message) { String event = eventJsonNode.asText(); switch (event) { + case EVENT_ORDER_CREATED: + case EVENT_ORDER_CHANGED: + case EVENT_ORDER_DELETED: case EVENT_ORDERBOOK: case EVENT_TRADE: if (!channels.containsKey(channel)) { diff --git a/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/dto/BitstampWebSocketOrderData.java b/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/dto/BitstampWebSocketOrderData.java new file mode 100644 index 000000000..bf2f84212 --- /dev/null +++ b/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/dto/BitstampWebSocketOrderData.java @@ -0,0 +1,102 @@ +package info.bitrich.xchangestream.bitstamp.v2.dto; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.math.BigDecimal; + +public class BitstampWebSocketOrderData { + + private static final String ATTR_ORDER_TYPE = "order_type"; + private static final String ATTR_PRICE = "price"; + private static final String ATTR_DATETIME = "datetime"; + private static final String ATTR_AMOUNT = "amount"; + private static final String ATTR_ID_STR = "id_str"; + private static final String ATTR_AMOUNT_STR = "amount_str"; + private static final String ATTR_PRICE_STR = "price_str"; + private static final String ATTR_ID = "id"; + private static final String ATTR_MICROTIMESTAMP = "microtimestamp"; + + private final int orderType; + private final BigDecimal price; + private final long dateTime; + private final BigDecimal amount; + private final String idStr; + private final String amountStr; + private final String priceStr; + private final long id; + private final String microTimestamp; + + @JsonCreator + public BitstampWebSocketOrderData( + @JsonProperty(ATTR_ORDER_TYPE) int orderType, + @JsonProperty(ATTR_PRICE) BigDecimal price, + @JsonProperty(ATTR_DATETIME) long dateTime, + @JsonProperty(ATTR_AMOUNT) BigDecimal amount, + @JsonProperty(ATTR_ID_STR) String idStr, + @JsonProperty(ATTR_AMOUNT_STR) String amountStr, + @JsonProperty(ATTR_PRICE_STR) String priceStr, + @JsonProperty(ATTR_ID) long id, + @JsonProperty(ATTR_MICROTIMESTAMP) String microTimestamp) { + this.orderType = orderType; + this.price = price; + this.dateTime = dateTime; + this.amount = amount; + this.idStr = idStr; + this.amountStr = amountStr; + this.priceStr = priceStr; + this.id = id; + this.microTimestamp = microTimestamp; + } + + public int getOrderType() { + return orderType; + } + + public BigDecimal getPrice() { + return price; + } + + public long getDateTime() { + return dateTime; + } + + public BigDecimal getAmount() { + return amount; + } + + public String getIdStr() { + return idStr; + } + + public String getAmountStr() { + return amountStr; + } + + public String getPriceStr() { + return priceStr; + } + + public long getId() { + return id; + } + + public String getMicroTimestamp() { + return microTimestamp; + } + + @Override + public String toString() { + return "{\"orderType\":" + orderType + + ",\"price\":" + price + + ",\"dateTime\":" + dateTime + + ",\"amount\":" + amount + + ",\"idStr\":\"" + idStr + '"' + + ",\"amountStr\":\"" + amountStr + '"' + + ",\"priceStr\":\"" + priceStr + '"' + + ",\"id\":" + id + + ",\"microTimestamp\":\"" + microTimestamp + '"' + + '}'; + } + +} diff --git a/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/dto/BitstampWebSocketOrderEvent.java b/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/dto/BitstampWebSocketOrderEvent.java new file mode 100644 index 000000000..7fff80d85 --- /dev/null +++ b/xchange-stream-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/v2/dto/BitstampWebSocketOrderEvent.java @@ -0,0 +1,43 @@ +package info.bitrich.xchangestream.bitstamp.v2.dto; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class BitstampWebSocketOrderEvent { + + private static final String DATA = "data"; + private static final String EVENT = "event"; + private static final String CHANNEL = "channel"; + + private final BitstampWebSocketOrderData data; + private final String event; + private final String channel; + + @JsonCreator + public BitstampWebSocketOrderEvent( + @JsonProperty(DATA) BitstampWebSocketOrderData data, + @JsonProperty(EVENT) String event, + @JsonProperty(CHANNEL) String channel) { + this.data = data; + this.event = event; + this.channel = channel; + } + + public BitstampWebSocketOrderData getData() { + return data; + } + + public String getEvent() { + return event; + } + + public String getChannel() { + return channel; + } + + @Override + public String toString() { + return "{\"data\":" + data + ",\"event\":\"" + event + "\",\"channel\":\"" + channel + "\"}"; + } + +} diff --git a/xchange-stream-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceBaseTest.java b/xchange-stream-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceBaseTest.java index 0f166dbbb..e5275a1b8 100644 --- a/xchange-stream-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceBaseTest.java +++ b/xchange-stream-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceBaseTest.java @@ -3,6 +3,7 @@ import static org.assertj.core.api.Assertions.assertThat; import com.fasterxml.jackson.databind.ObjectMapper; +import info.bitrich.xchangestream.bitstamp.v2.dto.BitstampWebSocketOrderEvent; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; import io.reactivex.observers.TestObserver; import java.util.List; @@ -31,6 +32,35 @@ protected void validateTrades(Trade expected, TestObserver test) { }); } + protected void validateTrades(BitstampWebSocketOrderEvent expected, TestObserver test) { + test.assertValue( + actual -> { + assertThat(actual.getData().getOrderType()).as("OrderType") + .isEqualTo(expected.getData().getOrderType()); + assertThat(actual.getData().getPrice()).as("Price") + .isEqualTo(expected.getData().getPrice()); + assertThat(actual.getData().getDateTime()).as("DateTime") + .isEqualTo(expected.getData().getDateTime()); + assertThat(actual.getData().getAmount()).as("Amount") + .isEqualTo(expected.getData().getAmount()); + assertThat(actual.getData().getIdStr()).as("IdStr") + .isEqualTo(expected.getData().getIdStr()); + assertThat(actual.getData().getAmountStr()).as("AmountStr") + .isEqualTo(expected.getData().getAmountStr()); + assertThat(actual.getData().getPriceStr()).as("PriceStr") + .isEqualTo(expected.getData().getPriceStr()); + assertThat(actual.getData().getId()).as("Id") + .isEqualTo(expected.getData().getId()); + assertThat(actual.getData().getMicroTimestamp()).as("MicroTimestamp") + .isEqualTo(expected.getData().getMicroTimestamp()); + assertThat(actual.getEvent()).as("Event") + .isEqualTo(expected.getEvent()); + assertThat(actual.getChannel()).as("Channel") + .isEqualTo(expected.getChannel()); + return true; + }); + } + protected void validateOrderBook( List bids, List asks, TestObserver test) { test.assertValue( diff --git a/xchange-stream-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceV2Test.java b/xchange-stream-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceV2Test.java index cdc5965aa..18c6a9b31 100644 --- a/xchange-stream-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceV2Test.java +++ b/xchange-stream-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataServiceV2Test.java @@ -6,6 +6,8 @@ import com.fasterxml.jackson.databind.JsonNode; import info.bitrich.xchangestream.bitstamp.v2.BitstampStreamingMarketDataService; import info.bitrich.xchangestream.bitstamp.v2.BitstampStreamingService; +import info.bitrich.xchangestream.bitstamp.v2.dto.BitstampWebSocketOrderData; +import info.bitrich.xchangestream.bitstamp.v2.dto.BitstampWebSocketOrderEvent; import io.reactivex.Observable; import io.reactivex.observers.TestObserver; import java.math.BigDecimal; @@ -136,4 +138,59 @@ public void testGetTrades() throws Exception { public void testGetTicker() throws Exception { marketDataService.getTicker(CurrencyPair.BTC_EUR).test(); } + + @Test + public void testOrders() throws Exception { + String channel = "live_orders_btcusd"; + String event = "order_created"; + + testOrdersCommons("/orders-v2-created.json", channel, event, + new BitstampWebSocketOrderEvent( + new BitstampWebSocketOrderData( + 0, + BigDecimal.valueOf(5901.51d), + 1584902114L, + BigDecimal.valueOf(0.1379724d), + "1212691004391424", + "0.13797240", + "5901.51", + 1212691004391424L, + "1584902114401000"), + "order_created", + "live_orders_btcusd" + ) + ); + + testOrdersCommons("/orders-v2-deleted.json", channel, event, + new BitstampWebSocketOrderEvent( + new BitstampWebSocketOrderData( + 0, + BigDecimal.valueOf(5868.27d), + 1584902114L, + BigDecimal.valueOf(0.06d), + "1212691001229312", + "0.06000000", + "5868.27", + 1212691001229312L, + "1584902114398000"), + "order_deleted", + "live_orders_btcusd" + ) + ); + } + + public void testOrdersCommons(String resourceName, String channel, String event, + BitstampWebSocketOrderEvent expected) throws Exception { + // Given order event in JSON + JsonNode order = mapper.readTree(this.getClass().getResource(resourceName)); + + when(streamingService.subscribeChannel(eq(channel), eq(event))).thenReturn(Observable.just(order)); + + // Call get order book observable + TestObserver test = marketDataService.getOrders(CurrencyPair.BTC_USD).test(); + + // We get order book object in correct order + validateTrades(expected, test); + } + } diff --git a/xchange-stream-bitstamp/src/test/resources/orders-v2-created.json b/xchange-stream-bitstamp/src/test/resources/orders-v2-created.json new file mode 100644 index 000000000..35676c084 --- /dev/null +++ b/xchange-stream-bitstamp/src/test/resources/orders-v2-created.json @@ -0,0 +1,15 @@ +{ + "data": { + "order_type": 0, + "price": 5901.51, + "datetime": "1584902114", + "amount": 0.1379724, + "id_str": "1212691004391424", + "amount_str": "0.13797240", + "price_str": "5901.51", + "id": 1212691004391424, + "microtimestamp": "1584902114401000" + }, + "event": "order_created", + "channel": "live_orders_btcusd" +} diff --git a/xchange-stream-bitstamp/src/test/resources/orders-v2-deleted.json b/xchange-stream-bitstamp/src/test/resources/orders-v2-deleted.json new file mode 100644 index 000000000..eec79dc26 --- /dev/null +++ b/xchange-stream-bitstamp/src/test/resources/orders-v2-deleted.json @@ -0,0 +1,15 @@ +{ + "data": { + "order_type": 0, + "price": 5868.27, + "datetime": "1584902114", + "amount": 0.06, + "id_str": "1212691001229312", + "amount_str": "0.06000000", + "price_str": "5868.27", + "id": 1212691001229312, + "microtimestamp": "1584902114398000" + }, + "event": "order_deleted", + "channel": "live_orders_btcusd" +}