Skip to content

Commit ad8819e

Browse files
authored
Merge pull request #46 from butaneprotocol/more-sources
More sources
2 parents 5578242 + 01621bc commit ad8819e

11 files changed

+666
-206
lines changed

Cargo.lock

+180-124
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ cron = "0.15"
2424
dashmap = "6"
2525
ed25519 = { version = "2.2", features = ["pkcs8", "pem"] }
2626
ed25519-dalek = { version = "2.1" }
27-
frost-ed25519 = { version = "2.0.0-rc.0", features = ["serialization"] }
27+
frost-ed25519 = { version = "2.1", features = ["serialization"] }
2828
futures = "0.3"
2929
hex = "0.4.3"
3030
kupon = { git = "https://github.com/SundaeSwap-finance/kupon", rev = "ccc0b37" }
@@ -47,8 +47,8 @@ serde = "1"
4747
serde_json = "1"
4848
temp-env = "0.3"
4949
tokio = { version = "1", features = ["full"] }
50+
tokio-tungstenite = { version = "0.26", features = ["rustls-tls-webpki-roots" ]}
5051
tokio-util = { version = "0.7", features = ["full"] }
51-
tokio-websockets = { version = "0.11", features = ["client", "rustls-webpki-roots", "ring", "rand"] }
5252
tonic = "0.12"
5353
tracing = "0.1.40"
5454
tracing-opentelemetry = "0.28"

config.base.yaml

+40-6
Original file line numberDiff line numberDiff line change
@@ -185,12 +185,6 @@ binance:
185185
- token: BTC
186186
unit: USDT
187187
stream: btcusdt@ticker
188-
- token: EUR
189-
unit: USDT
190-
stream: eurusdt@ticker
191-
- token: PAXG
192-
unit: USDT
193-
stream: paxgusdt@ticker
194188
- token: POL
195189
unit: USDT
196190
stream: polusdt@ticker
@@ -231,6 +225,46 @@ coinbase:
231225
- token: USDT
232226
unit: USD
233227
product_id: USDT-USD
228+
crypto_com:
229+
tokens:
230+
- token: ADA
231+
unit: USD
232+
stream: ticker.ADA_USD
233+
- token: BTC
234+
unit: USD
235+
stream: ticker.BTC_USD
236+
- token: EUR
237+
unit: USDT
238+
stream: ticker.EUR_USDT
239+
- token: PAXG
240+
unit: USD
241+
stream: ticker.PAXG_USD
242+
- token: POL
243+
unit: USD
244+
stream: ticker.POL_USD
245+
- token: SOL
246+
unit: USD
247+
stream: ticker.SOL_USD
248+
- token: USDT
249+
unit: USD
250+
stream: ticker.USDT_USD
251+
kucoin:
252+
tokens:
253+
- token: ADA
254+
unit: USDT
255+
symbol: ADA-USDT
256+
- token: BTC
257+
unit: USDT
258+
symbol: BTC-USDT
259+
- token: PAXG
260+
unit: USDT
261+
symbol: PAXG-USDT
262+
- token: POL
263+
unit: USDT
264+
symbol: POL-USDT
265+
- token: SOL
266+
unit: USDT
267+
symbol: SOL-USDT
234268
maestro:
235269
tokens:
236270
- token: DJED

src/config.rs

+30
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ struct RawOracleConfig {
4040
pub binance: BinanceConfig,
4141
pub bybit: ByBitConfig,
4242
pub coinbase: CoinbaseConfig,
43+
pub crypto_com: CryptoComConfig,
4344
pub fxratesapi: FxRatesApiConfig,
45+
pub kucoin: KucoinConfig,
4446
pub maestro: MaestroConfig,
4547
pub okx: OkxConfig,
4648
pub sundaeswap: SundaeSwapConfig,
@@ -72,7 +74,9 @@ pub struct OracleConfig {
7274
pub bybit: ByBitConfig,
7375
pub binance: BinanceConfig,
7476
pub coinbase: CoinbaseConfig,
77+
pub crypto_com: CryptoComConfig,
7578
pub fxratesapi: FxRatesApiConfig,
79+
pub kucoin: KucoinConfig,
7680
pub maestro: MaestroConfig,
7781
pub okx: OkxConfig,
7882
pub sundaeswap: SundaeSwapConfig,
@@ -177,7 +181,9 @@ impl TryFrom<RawOracleConfig> for OracleConfig {
177181
binance: raw.binance,
178182
bybit: raw.bybit,
179183
coinbase: raw.coinbase,
184+
crypto_com: raw.crypto_com,
180185
fxratesapi: raw.fxratesapi,
186+
kucoin: raw.kucoin,
181187
maestro: raw.maestro,
182188
okx: raw.okx,
183189
sundaeswap: raw.sundaeswap,
@@ -305,13 +311,37 @@ pub struct CoinbaseTokenConfig {
305311
pub product_id: String,
306312
}
307313

314+
#[derive(Debug, Deserialize)]
315+
pub struct CryptoComConfig {
316+
pub tokens: Vec<CryptoComTokenConfig>,
317+
}
318+
319+
#[derive(Debug, Deserialize, Clone)]
320+
pub struct CryptoComTokenConfig {
321+
pub token: String,
322+
pub unit: String,
323+
pub stream: String,
324+
}
325+
308326
#[derive(Debug, Deserialize)]
309327
pub struct FxRatesApiConfig {
310328
pub cron: String,
311329
pub currencies: Vec<String>,
312330
pub base: String,
313331
}
314332

333+
#[derive(Debug, Deserialize)]
334+
pub struct KucoinConfig {
335+
pub tokens: Vec<KucoinTokenConfig>,
336+
}
337+
338+
#[derive(Debug, Deserialize, Clone)]
339+
pub struct KucoinTokenConfig {
340+
pub token: String,
341+
pub unit: String,
342+
pub symbol: String,
343+
}
344+
315345
#[derive(Debug, Deserialize)]
316346
pub struct MaestroConfig {
317347
pub tokens: Vec<MaestroTokenConfig>,

src/price_aggregator.rs

+4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use crate::{
2424
binance::BinanceSource,
2525
bybit::ByBitSource,
2626
coinbase::CoinbaseSource,
27+
crypto_com::CryptoComSource,
2728
fxratesapi::FxRatesApiSource,
29+
kucoin::KucoinSource,
2830
maestro::MaestroSource,
2931
minswap::MinswapSource,
3032
okx::OkxSource,
@@ -71,6 +73,8 @@ impl PriceAggregator {
7173
SourceAdapter::new(BinanceSource::new(&config), &config),
7274
SourceAdapter::new(ByBitSource::new(&config), &config),
7375
SourceAdapter::new(CoinbaseSource::new(&config), &config),
76+
SourceAdapter::new(CryptoComSource::new(&config), &config),
77+
SourceAdapter::new(KucoinSource::new(&config), &config),
7478
SourceAdapter::new(MinswapSource::new(&config)?, &config),
7579
SourceAdapter::new(OkxSource::new(&config)?, &config),
7680
SourceAdapter::new(SpectrumSource::new(&config)?, &config),

src/sources/binance.rs

+10-17
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use std::{collections::BTreeMap, str::FromStr, time::Duration};
22

33
use anyhow::{anyhow, Result};
4-
use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt};
4+
use futures::{future::BoxFuture, FutureExt, StreamExt};
55
use rust_decimal::Decimal;
66
use serde::Deserialize;
77
use tokio::time::timeout;
8-
use tokio_websockets::{ClientBuilder, Message};
8+
use tokio_tungstenite::connect_async;
99
use tracing::{trace, warn};
1010

1111
use crate::{
@@ -53,9 +53,8 @@ impl BinanceSource {
5353
.map(|k| k.as_str())
5454
.collect::<Vec<&str>>()
5555
.join("/");
56-
let url = format!("{BASE_URL}?streams={streams}");
57-
let uri = url.try_into()?;
58-
let (mut stream, _) = ClientBuilder::from_uri(uri).connect().await?;
56+
let uri = format!("{BASE_URL}?streams={streams}");
57+
let (mut stream, _) = connect_async(uri).await?;
5958
trace!("Connected to binance!");
6059

6160
let connection_timeout = Duration::from_secs(60);
@@ -67,18 +66,12 @@ impl BinanceSource {
6766
continue;
6867
}
6968
};
70-
if let Some(contents) = message.as_text() {
71-
if let Err(err) = self.process_binance_message(contents, sink) {
72-
warn!("Unexpected error updating binance data: {:?}", err);
73-
}
74-
} else if message.is_ping() {
75-
let data = message.into_payload();
76-
trace!("Ping received from binance: {:?}", data);
77-
if let Err(err) = stream.send(Message::pong(data)).await {
78-
warn!("Unexpected error replying to binance ping: {}", err);
79-
}
80-
} else {
81-
warn!("Unexpected response from binance: {:?}", message);
69+
if !message.is_text() {
70+
continue;
71+
}
72+
let contents = message.into_text()?;
73+
if let Err(err) = self.process_binance_message(&contents, sink) {
74+
warn!("Unexpected error updating binance data: {:?}", err);
8275
}
8376
}
8477

src/sources/bybit.rs

+43-48
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::str::FromStr;
22

3-
use anyhow::{anyhow, Result};
3+
use anyhow::{anyhow, bail, Result};
44
use dashmap::DashMap;
55
use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt};
66
use rust_decimal::Decimal;
@@ -9,7 +9,7 @@ use tokio::{
99
select,
1010
time::{sleep, timeout, Duration},
1111
};
12-
use tokio_websockets::{ClientBuilder, Message};
12+
use tokio_tungstenite::{connect_async, tungstenite::Message};
1313

1414
use crate::config::OracleConfig;
1515

@@ -68,8 +68,7 @@ impl ByBitSource {
6868
}
6969

7070
async fn query_impl(&self, sink: &PriceSink) -> Result<()> {
71-
let uri = URL.try_into()?;
72-
let (mut stream, _) = ClientBuilder::from_uri(uri).connect().await?;
71+
let (mut stream, _) = connect_async(URL).await?;
7372

7473
let subscribe_timeout = Duration::from_secs(60);
7574
timeout(
@@ -107,53 +106,49 @@ impl ByBitSource {
107106
let consumer = async move {
108107
while let Some(result) = stream.next().await {
109108
let message = result?;
110-
match message.as_text() {
111-
Some(content) => {
112-
let response: ByBitResponse = serde_json::from_str(content)?;
113-
let data = match response {
114-
ByBitResponse::StatusResponse { success, ret_msg } => {
115-
if !success {
116-
return Err(anyhow!("Error subscribing: {}", ret_msg));
117-
}
118-
continue;
119-
}
120-
ByBitResponse::TickerResponse { data } => data,
121-
};
122-
let Some(mut info) = self.stream_info.get_mut(&data.symbol) else {
123-
continue;
124-
};
125-
let Some(value) = data
126-
.mark_price
127-
.and_then(|x| Decimal::from_str(&x).ok())
128-
.or(info.last_value)
129-
else {
130-
continue;
131-
};
132-
info.last_value = Some(value);
133-
134-
let Some(volume) = data
135-
.volume_24h
136-
.and_then(|x| Decimal::from_str(&x).ok())
137-
.or(info.last_volume)
138-
else {
139-
continue;
140-
};
141-
info.last_volume = Some(volume);
142-
143-
let price_info = PriceInfo {
144-
token: info.token.clone(),
145-
unit: info.unit.clone(),
146-
value,
147-
reliability: volume,
148-
};
149-
sink.send(price_info)?;
150-
}
151-
None => {
152-
return Err(anyhow!("Unexpected response {:?}", message));
109+
if !message.is_text() {
110+
continue;
111+
}
112+
let response: ByBitResponse = serde_json::from_str(&message.into_text()?)?;
113+
let data = match response {
114+
ByBitResponse::StatusResponse { success, ret_msg } => {
115+
if !success {
116+
bail!("Error subscribing: {}", ret_msg);
117+
}
118+
continue;
153119
}
120+
ByBitResponse::TickerResponse { data } => data,
121+
};
122+
let Some(mut info) = self.stream_info.get_mut(&data.symbol) else {
123+
continue;
124+
};
125+
let Some(value) = data
126+
.mark_price
127+
.and_then(|x| Decimal::from_str(&x).ok())
128+
.or(info.last_value)
129+
else {
130+
continue;
131+
};
132+
info.last_value = Some(value);
133+
134+
let Some(volume) = data
135+
.volume_24h
136+
.and_then(|x| Decimal::from_str(&x).ok())
137+
.or(info.last_volume)
138+
else {
139+
continue;
140+
};
141+
info.last_volume = Some(volume);
142+
143+
let price_info = PriceInfo {
144+
token: info.token.clone(),
145+
unit: info.unit.clone(),
146+
value,
147+
reliability: volume,
154148
};
149+
sink.send(price_info)?;
155150
}
156-
Err(anyhow!("ByBit stream has closed"))
151+
bail!("ByBit stream has closed")
157152
};
158153

159154
select! {

src/sources/coinbase.rs

+5-9
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use std::{collections::BTreeMap, str::FromStr, time::Duration};
22

3-
use anyhow::{anyhow, Result};
3+
use anyhow::{anyhow, Context, Result};
44
use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt};
55
use rust_decimal::Decimal;
66
use serde::{Deserialize, Serialize};
77
use tokio::time::timeout;
8-
use tokio_websockets::{ClientBuilder, Message};
8+
use tokio_tungstenite::{connect_async, tungstenite::Message};
99
use tracing::{trace, warn};
1010

1111
use crate::config::{CoinbaseTokenConfig, OracleConfig};
@@ -46,10 +46,8 @@ impl CoinbaseSource {
4646
async fn query_impl(&self, sink: &PriceSink) -> Result<()> {
4747
trace!("Connecting to coinbase");
4848
let connection_timeout = Duration::from_secs(60);
49-
let uri = URL.try_into()?;
5049

51-
let (mut stream, _) =
52-
timeout(connection_timeout, ClientBuilder::from_uri(uri).connect()).await??;
50+
let (mut stream, _) = timeout(connection_timeout, connect_async(URL)).await??;
5351

5452
let request = CoinbaseRequest::Subscribe {
5553
product_ids: self.products.keys().cloned().collect(),
@@ -143,9 +141,7 @@ impl TryFrom<Message> for CoinbaseResponse {
143141
type Error = anyhow::Error;
144142

145143
fn try_from(value: Message) -> Result<Self> {
146-
match value.as_text() {
147-
Some(msg) => Ok(serde_json::from_str(msg)?),
148-
None => Err(anyhow::anyhow!("Unexpected response: {:?}", value)),
149-
}
144+
let text = value.into_text().context("unexpected response")?;
145+
Ok(serde_json::from_str(&text)?)
150146
}
151147
}

0 commit comments

Comments
 (0)