Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

Live Orders subscription support @ Bitstamp #562

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitstamp.dto.BitstampWebSocketTransaction;
import info.bitrich.xchangestream.bitstamp.v2.dto.BitstampWebSocketOrderEvent;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
Expand Down Expand Up @@ -55,19 +56,31 @@ public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
}

@Override
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
String channelName = "live_trades" + getChannelPostfix(currencyPair);
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
String channelName = "live_trades" + getChannelPostfix(currencyPair);

return service
.subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE)
.map(
s -> {
ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
BitstampWebSocketTransaction transactions =
mapper.treeToValue(s.get("data"), BitstampWebSocketTransaction.class);
return BitstampAdapters.adaptTrade(transactions, currencyPair, 1);
});
}
return service
.subscribeChannel(channelName, BitstampStreamingService.EVENT_TRADE)
.map(
s -> {
ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
BitstampWebSocketTransaction transactions =
mapper.treeToValue(s.get("data"), BitstampWebSocketTransaction.class);
return BitstampAdapters.adaptTrade(transactions, currencyPair, 1);
});
}

public Observable<BitstampWebSocketOrderEvent> getOrders(CurrencyPair currencyPair, Object... args) {
String channelName = "live_orders" + getChannelPostfix(currencyPair);

return service
.subscribeChannel(channelName, BitstampStreamingService.EVENT_ORDER_CREATED)
.map(
s -> {
ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
return mapper.treeToValue(s, BitstampWebSocketOrderEvent.class);
});
}

private String getChannelPostfix(CurrencyPair currencyPair) {
return "_"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ public class BitstampStreamingService extends JsonNettyStreamingService {
private static final String JSON_CHANNEL = "channel";
private static final String JSON_EVENT = "event";

public static final String EVENT_ORDER_CREATED = "order_created";
public static final String EVENT_ORDER_CHANGED = "order_changed";
public static final String EVENT_ORDER_DELETED = "order_deleted";
public static final String EVENT_ORDERBOOK = "data";
public static final String EVENT_TRADE = "trade";
private static final String EVENT_SUBSCRIPTION_SUCCEEDED = "bts:subscription_succeeded";
private static final String EVENT_UNSUBSCRIPTION_SUCCEEDED = "bts:unsubscription_succeeded";
public static final String EVENT_SUBSCRIPTION_SUCCEEDED = "bts:subscription_succeeded";
public static final String EVENT_UNSUBSCRIPTION_SUCCEEDED = "bts:unsubscription_succeeded";

public BitstampStreamingService(String apiUrl) {
super(apiUrl, Integer.MAX_VALUE);
Expand Down Expand Up @@ -58,6 +61,9 @@ protected void handleMessage(JsonNode message) {
String event = eventJsonNode.asText();

switch (event) {
case EVENT_ORDER_CREATED:
case EVENT_ORDER_CHANGED:
case EVENT_ORDER_DELETED:
case EVENT_ORDERBOOK:
case EVENT_TRADE:
if (!channels.containsKey(channel)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package info.bitrich.xchangestream.bitstamp.v2.dto;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.math.BigDecimal;

public class BitstampWebSocketOrderData {

private static final String ATTR_ORDER_TYPE = "order_type";
private static final String ATTR_PRICE = "price";
private static final String ATTR_DATETIME = "datetime";
private static final String ATTR_AMOUNT = "amount";
private static final String ATTR_ID_STR = "id_str";
private static final String ATTR_AMOUNT_STR = "amount_str";
private static final String ATTR_PRICE_STR = "price_str";
private static final String ATTR_ID = "id";
private static final String ATTR_MICROTIMESTAMP = "microtimestamp";

private final int orderType;
private final BigDecimal price;
private final long dateTime;
private final BigDecimal amount;
private final String idStr;
private final String amountStr;
private final String priceStr;
private final long id;
private final String microTimestamp;

@JsonCreator
public BitstampWebSocketOrderData(
@JsonProperty(ATTR_ORDER_TYPE) int orderType,
@JsonProperty(ATTR_PRICE) BigDecimal price,
@JsonProperty(ATTR_DATETIME) long dateTime,
@JsonProperty(ATTR_AMOUNT) BigDecimal amount,
@JsonProperty(ATTR_ID_STR) String idStr,
@JsonProperty(ATTR_AMOUNT_STR) String amountStr,
@JsonProperty(ATTR_PRICE_STR) String priceStr,
@JsonProperty(ATTR_ID) long id,
@JsonProperty(ATTR_MICROTIMESTAMP) String microTimestamp) {
this.orderType = orderType;
this.price = price;
this.dateTime = dateTime;
this.amount = amount;
this.idStr = idStr;
this.amountStr = amountStr;
this.priceStr = priceStr;
this.id = id;
this.microTimestamp = microTimestamp;
}

public int getOrderType() {
return orderType;
}

public BigDecimal getPrice() {
return price;
}

public long getDateTime() {
return dateTime;
}

public BigDecimal getAmount() {
return amount;
}

public String getIdStr() {
return idStr;
}

public String getAmountStr() {
return amountStr;
}

public String getPriceStr() {
return priceStr;
}

public long getId() {
return id;
}

public String getMicroTimestamp() {
return microTimestamp;
}

@Override
public String toString() {
return "{\"orderType\":" + orderType +
",\"price\":" + price +
",\"dateTime\":" + dateTime +
",\"amount\":" + amount +
",\"idStr\":\"" + idStr + '"' +
",\"amountStr\":\"" + amountStr + '"' +
",\"priceStr\":\"" + priceStr + '"' +
",\"id\":" + id +
",\"microTimestamp\":\"" + microTimestamp + '"' +
'}';
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package info.bitrich.xchangestream.bitstamp.v2.dto;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class BitstampWebSocketOrderEvent {

private static final String DATA = "data";
private static final String EVENT = "event";
private static final String CHANNEL = "channel";

private final BitstampWebSocketOrderData data;
private final String event;
private final String channel;

@JsonCreator
public BitstampWebSocketOrderEvent(
@JsonProperty(DATA) BitstampWebSocketOrderData data,
@JsonProperty(EVENT) String event,
@JsonProperty(CHANNEL) String channel) {
this.data = data;
this.event = event;
this.channel = channel;
}

public BitstampWebSocketOrderData getData() {
return data;
}

public String getEvent() {
return event;
}

public String getChannel() {
return channel;
}

@Override
public String toString() {
return "{\"data\":" + data + ",\"event\":\"" + event + "\",\"channel\":\"" + channel + "\"}";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitstamp.v2.dto.BitstampWebSocketOrderEvent;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.observers.TestObserver;
import java.util.List;
Expand Down Expand Up @@ -31,6 +32,35 @@ protected void validateTrades(Trade expected, TestObserver<Trade> test) {
});
}

protected void validateTrades(BitstampWebSocketOrderEvent expected, TestObserver<BitstampWebSocketOrderEvent> test) {
test.assertValue(
actual -> {
assertThat(actual.getData().getOrderType()).as("OrderType")
.isEqualTo(expected.getData().getOrderType());
assertThat(actual.getData().getPrice()).as("Price")
.isEqualTo(expected.getData().getPrice());
assertThat(actual.getData().getDateTime()).as("DateTime")
.isEqualTo(expected.getData().getDateTime());
assertThat(actual.getData().getAmount()).as("Amount")
.isEqualTo(expected.getData().getAmount());
assertThat(actual.getData().getIdStr()).as("IdStr")
.isEqualTo(expected.getData().getIdStr());
assertThat(actual.getData().getAmountStr()).as("AmountStr")
.isEqualTo(expected.getData().getAmountStr());
assertThat(actual.getData().getPriceStr()).as("PriceStr")
.isEqualTo(expected.getData().getPriceStr());
assertThat(actual.getData().getId()).as("Id")
.isEqualTo(expected.getData().getId());
assertThat(actual.getData().getMicroTimestamp()).as("MicroTimestamp")
.isEqualTo(expected.getData().getMicroTimestamp());
assertThat(actual.getEvent()).as("Event")
.isEqualTo(expected.getEvent());
assertThat(actual.getChannel()).as("Channel")
.isEqualTo(expected.getChannel());
return true;
});
}

protected void validateOrderBook(
List<LimitOrder> bids, List<LimitOrder> asks, TestObserver<OrderBook> test) {
test.assertValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import info.bitrich.xchangestream.bitstamp.v2.BitstampStreamingMarketDataService;
import info.bitrich.xchangestream.bitstamp.v2.BitstampStreamingService;
import info.bitrich.xchangestream.bitstamp.v2.dto.BitstampWebSocketOrderData;
import info.bitrich.xchangestream.bitstamp.v2.dto.BitstampWebSocketOrderEvent;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import java.math.BigDecimal;
Expand Down Expand Up @@ -136,4 +138,59 @@ public void testGetTrades() throws Exception {
public void testGetTicker() throws Exception {
marketDataService.getTicker(CurrencyPair.BTC_EUR).test();
}

@Test
public void testOrders() throws Exception {
String channel = "live_orders_btcusd";
String event = "order_created";

testOrdersCommons("/orders-v2-created.json", channel, event,
new BitstampWebSocketOrderEvent(
new BitstampWebSocketOrderData(
0,
BigDecimal.valueOf(5901.51d),
1584902114L,
BigDecimal.valueOf(0.1379724d),
"1212691004391424",
"0.13797240",
"5901.51",
1212691004391424L,
"1584902114401000"),
"order_created",
"live_orders_btcusd"
)
);

testOrdersCommons("/orders-v2-deleted.json", channel, event,
new BitstampWebSocketOrderEvent(
new BitstampWebSocketOrderData(
0,
BigDecimal.valueOf(5868.27d),
1584902114L,
BigDecimal.valueOf(0.06d),
"1212691001229312",
"0.06000000",
"5868.27",
1212691001229312L,
"1584902114398000"),
"order_deleted",
"live_orders_btcusd"
)
);
}

public void testOrdersCommons(String resourceName, String channel, String event,
BitstampWebSocketOrderEvent expected) throws Exception {
// Given order event in JSON
JsonNode order = mapper.readTree(this.getClass().getResource(resourceName));

when(streamingService.subscribeChannel(eq(channel), eq(event))).thenReturn(Observable.just(order));

// Call get order book observable
TestObserver<BitstampWebSocketOrderEvent> test = marketDataService.getOrders(CurrencyPair.BTC_USD).test();

// We get order book object in correct order
validateTrades(expected, test);
}

}
15 changes: 15 additions & 0 deletions xchange-stream-bitstamp/src/test/resources/orders-v2-created.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"order_type": 0,
"price": 5901.51,
"datetime": "1584902114",
"amount": 0.1379724,
"id_str": "1212691004391424",
"amount_str": "0.13797240",
"price_str": "5901.51",
"id": 1212691004391424,
"microtimestamp": "1584902114401000"
},
"event": "order_created",
"channel": "live_orders_btcusd"
}
15 changes: 15 additions & 0 deletions xchange-stream-bitstamp/src/test/resources/orders-v2-deleted.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"order_type": 0,
"price": 5868.27,
"datetime": "1584902114",
"amount": 0.06,
"id_str": "1212691001229312",
"amount_str": "0.06000000",
"price_str": "5868.27",
"id": 1212691001229312,
"microtimestamp": "1584902114398000"
},
"event": "order_deleted",
"channel": "live_orders_btcusd"
}