Skip to content

Commit

Permalink
Merge pull request bitrich-info#538 from badgerwithagun/run-coveo-fmt
Browse files Browse the repository at this point in the history
Run coveo fmt
  • Loading branch information
badgerwithagun authored Mar 2, 2020
2 parents e01eba1 + aac7518 commit 0c5fa85
Show file tree
Hide file tree
Showing 277 changed files with 17,648 additions and 16,072 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,63 +9,64 @@
import org.knowm.xchange.bankera.BankeraExchange;
import org.knowm.xchange.bankera.service.BankeraMarketDataService;


public class BankeraStreamingExchange extends BankeraExchange implements StreamingExchange {

private static final String WS_URI = "wss://api-exchange.bankera.com/ws";
private BankeraStreamingService streamingService;
private static final String WS_URI = "wss://api-exchange.bankera.com/ws";
private BankeraStreamingService streamingService;
private BankeraStreamingMarketDataService streamingMarketDataService;

public BankeraStreamingExchange() {
public BankeraStreamingExchange() {
this.streamingService = new BankeraStreamingService(WS_URI);
}

@Override
protected void initServices() {
super.initServices();
streamingMarketDataService = new BankeraStreamingMarketDataService(
streamingService, (BankeraMarketDataService) marketDataService);
}

@Override
public Completable connect(ProductSubscription... args) {
return streamingService.connect();
}
@Override
protected void initServices() {
super.initServices();
streamingMarketDataService =
new BankeraStreamingMarketDataService(
streamingService, (BankeraMarketDataService) marketDataService);
}

@Override
public Completable disconnect() {
return streamingService.disconnect();
}
@Override
public Completable connect(ProductSubscription... args) {
return streamingService.connect();
}

@Override
public boolean isAlive() {
return streamingService.isSocketOpen();
}
@Override
public Completable disconnect() {
return streamingService.disconnect();
}

@Override
public Observable<Throwable> reconnectFailure() {
return streamingService.subscribeReconnectFailure();
}
@Override
public boolean isAlive() {
return streamingService.isSocketOpen();
}

@Override
public Observable<Object> connectionSuccess() {
return streamingService.subscribeConnectionSuccess();
}
@Override
public Observable<Throwable> reconnectFailure() {
return streamingService.subscribeReconnectFailure();
}

@Override
public ExchangeSpecification getDefaultExchangeSpecification() {
ExchangeSpecification spec = super.getDefaultExchangeSpecification();
spec.setShouldLoadRemoteMetaData(false);
@Override
public Observable<Object> connectionSuccess() {
return streamingService.subscribeConnectionSuccess();
}

return spec;
}
@Override
public ExchangeSpecification getDefaultExchangeSpecification() {
ExchangeSpecification spec = super.getDefaultExchangeSpecification();
spec.setShouldLoadRemoteMetaData(false);

@Override
public StreamingMarketDataService getStreamingMarketDataService() {
return streamingMarketDataService;
}
return spec;
}

@Override
public void useCompressedMessages(boolean compressedMessages) { streamingService.useCompressedMessages(compressedMessages); }
@Override
public StreamingMarketDataService getStreamingMarketDataService() {
return streamingMarketDataService;
}

@Override
public void useCompressedMessages(boolean compressedMessages) {
streamingService.useCompressedMessages(compressedMessages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import info.bitrich.xchangestream.core.StreamingMarketDataService;
import io.reactivex.Observable;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import org.knowm.xchange.bankera.BankeraAdapters;
import org.knowm.xchange.bankera.dto.BankeraException;
import org.knowm.xchange.bankera.dto.marketdata.*;
Expand All @@ -13,65 +16,77 @@
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.exceptions.NotAvailableFromExchangeException;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;


public class BankeraStreamingMarketDataService implements StreamingMarketDataService {

private final BankeraStreamingService service;
private final BankeraMarketDataService marketDataService;

public BankeraStreamingMarketDataService(BankeraStreamingService service, BankeraMarketDataService marketDataService) {
public BankeraStreamingMarketDataService(
BankeraStreamingService service, BankeraMarketDataService marketDataService) {
this.service = service;
this.marketDataService = marketDataService;
}

@Override
public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... args) {
BankeraMarket market = getMarketInfo(currencyPair);
return service.subscribeChannel("market-orderbook", market.getId())
.map(o -> {
List<BankeraOrderBook.OrderBookOrder> listBids = new ArrayList<>();
List<BankeraOrderBook.OrderBookOrder> listAsks = new ArrayList<>();
o.get("data").get("bids")
.forEach(b -> listBids.add(new BankeraOrderBook.OrderBookOrder(
0, b.get("price").asText(), b.get("amount").asText())));
o.get("data").get("asks")
.forEach(b -> listAsks.add(new BankeraOrderBook.OrderBookOrder(
0, b.get("price").asText(), b.get("amount").asText())));
return BankeraAdapters.adaptOrderBook(new BankeraOrderBook(listBids, listAsks), currencyPair);
});
return service
.subscribeChannel("market-orderbook", market.getId())
.map(
o -> {
List<BankeraOrderBook.OrderBookOrder> listBids = new ArrayList<>();
List<BankeraOrderBook.OrderBookOrder> listAsks = new ArrayList<>();
o.get("data")
.get("bids")
.forEach(
b ->
listBids.add(
new BankeraOrderBook.OrderBookOrder(
0, b.get("price").asText(), b.get("amount").asText())));
o.get("data")
.get("asks")
.forEach(
b ->
listAsks.add(
new BankeraOrderBook.OrderBookOrder(
0, b.get("price").asText(), b.get("amount").asText())));
return BankeraAdapters.adaptOrderBook(
new BankeraOrderBook(listBids, listAsks), currencyPair);
});
}

@Override
public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
throw new NotAvailableFromExchangeException();
}


@Override
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
BankeraMarket market = getMarketInfo(currencyPair);
return service.subscribeChannel("market-trade", market.getId())
.map(t -> new Trade.Builder()
.currencyPair(currencyPair)
.id("-1")
.price(new BigDecimal(t.get("data").get("price").asText()))
.originalAmount(new BigDecimal(t.get("data").get("amount").asText()))
.timestamp(new Date(t.get("data").get("time").asLong()))
.type(t.get("data").get("side").asText().equals("SELL") ? Order.OrderType.ASK : Order.OrderType.BID)
.build()
);
return service
.subscribeChannel("market-trade", market.getId())
.map(
t ->
new Trade.Builder()
.currencyPair(currencyPair)
.id("-1")
.price(new BigDecimal(t.get("data").get("price").asText()))
.originalAmount(new BigDecimal(t.get("data").get("amount").asText()))
.timestamp(new Date(t.get("data").get("time").asLong()))
.type(
t.get("data").get("side").asText().equals("SELL")
? Order.OrderType.ASK
: Order.OrderType.BID)
.build());
}

private BankeraMarket getMarketInfo(CurrencyPair currencyPair) {
try {
BankeraMarketInfo info = this.marketDataService.getMarketInfo();
Optional<BankeraMarket> market = info.getMarkets().stream().filter(
m -> m.getName().equals(currencyPair.toString().replace("/", "-"))
).findFirst();
Optional<BankeraMarket> market =
info.getMarkets().stream()
.filter(m -> m.getName().equals(currencyPair.toString().replace("/", "-")))
.findFirst();

if (market.isPresent()) {
return market.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,34 @@
import info.bitrich.xchangestream.bankera.dto.BankeraWebSocketSubscriptionMessage;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;

import java.io.IOException;

public class BankeraStreamingService extends JsonNettyStreamingService {

public BankeraStreamingService(String uri) {
super(uri, Integer.MAX_VALUE);
}
public BankeraStreamingService(String uri) {
super(uri, Integer.MAX_VALUE);
}

@Override
protected String getChannelNameFromMessage(JsonNode message) throws IOException {
@Override
protected String getChannelNameFromMessage(JsonNode message) throws IOException {
return message.get("type").asText();
}
}

@Override
public String getSubscribeMessage(String channelName, Object... args) throws IOException {
@Override
public String getSubscribeMessage(String channelName, Object... args) throws IOException {
if (args.length != 1) throw new IOException("SubscribeMessage: Insufficient arguments");
BankeraWebSocketSubscriptionMessage subscribeMessage =
new BankeraWebSocketSubscriptionMessage(String.valueOf(args[0]));
return objectMapper.writeValueAsString(subscribeMessage);
}

@Override
public String getUnsubscribeMessage(String channelName) throws IOException {
return null;
}
}

@Override
protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
return null;
}
@Override
public String getUnsubscribeMessage(String channelName) throws IOException {
return null;
}

@Override
protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,33 @@
import org.slf4j.LoggerFactory;

public class BankeraManualExample {
private static final Logger LOGGER = LoggerFactory.getLogger(BankeraManualExample.class);

public static void main(String[] args) {
StreamingExchange exchange = StreamingExchangeFactory.INSTANCE
.createExchange(BankeraStreamingExchange.class.getName());

exchange.connect().blockingAwait();
exchange.getStreamingMarketDataService()
.getOrderBook(CurrencyPair.ETH_BTC)
.subscribe(orderBook -> LOGGER.debug("ORDERBOOK: {}", orderBook.toString()),
throwable -> LOGGER.error("ERROR in getting order book: ", throwable));

exchange.getStreamingMarketDataService()
.getTrades(CurrencyPair.ETH_BTC)
.subscribe(trade ->LOGGER.debug("TRADES: {}", trade.toString()),
throwable -> LOGGER.error("ERROR in getting trade ", throwable));

try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}

exchange.disconnect().subscribe(() -> LOGGER.info("Disconnected"));

private static final Logger LOGGER = LoggerFactory.getLogger(BankeraManualExample.class);

public static void main(String[] args) {
StreamingExchange exchange =
StreamingExchangeFactory.INSTANCE.createExchange(BankeraStreamingExchange.class.getName());

exchange.connect().blockingAwait();
exchange
.getStreamingMarketDataService()
.getOrderBook(CurrencyPair.ETH_BTC)
.subscribe(
orderBook -> LOGGER.debug("ORDERBOOK: {}", orderBook.toString()),
throwable -> LOGGER.error("ERROR in getting order book: ", throwable));

exchange
.getStreamingMarketDataService()
.getTrades(CurrencyPair.ETH_BTC)
.subscribe(
trade -> LOGGER.debug("TRADES: {}", trade.toString()),
throwable -> LOGGER.error("ERROR in getting trade ", throwable));

try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}

exchange.disconnect().subscribe(() -> LOGGER.info("Disconnected"));
}
}
Loading

0 comments on commit 0c5fa85

Please sign in to comment.