Skip to content

Commit f4af9d5

Browse files
authored
Merge pull request #53 from butaneprotocol/individual-price-feeds
Publish individual price feeds
2 parents eeec2a0 + 022d6c1 commit f4af9d5

21 files changed

+2047
-1072
lines changed

Diff for: Cargo.lock

+457-331
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: Cargo.toml

+11-9
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,22 @@ ed25519-dalek = { version = "2.1" }
2727
frost-ed25519 = { version = "2.1", features = ["serialization"] }
2828
futures = "0.3"
2929
hex = "0.4.3"
30+
itertools = "0.14"
3031
kupon = { git = "https://github.com/SundaeSwap-finance/kupon", rev = "ccc0b37" }
31-
minicbor = { version = "0.25", features = ["derive", "std"] }
32-
minicbor-io = { version = "0.20", features = ["async-io"] }
32+
minicbor = { version = "0.26", features = ["derive", "std"] }
33+
minicbor-io = { version = "0.21", features = ["async-io"] }
3334
num-bigint = "0.4"
3435
num-integer = "0.1"
3536
num-rational = "0.4"
3637
num-traits = "0.2"
37-
opentelemetry = "0.27"
38-
opentelemetry-otlp = { version = "0.27", features = ["gzip-tonic", "tls-webpki-roots"] }
39-
opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
40-
pallas-crypto = "0.32"
41-
pallas-primitives = "0.32"
38+
opentelemetry = "0.29"
39+
opentelemetry-otlp = { version = "0.29", features = ["grpc-tonic", "gzip-tonic", "tls-webpki-roots"] }
40+
opentelemetry_sdk = { version = "0.29", features = ["rt-tokio"] }
41+
pallas-crypto = { git = "https://github.com/txpipe/pallas", rev = "c3aad16" }
42+
pallas-primitives = { git = "https://github.com/txpipe/pallas", rev = "c3aad16" }
4243
rand = "0.8.5"
4344
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
44-
rust_decimal = "1.34.3"
45+
rust_decimal = "1"
4546
rustls = { version = "0.23", default-features = false, features = ["logging", "ring", "tls12"] }
4647
serde = "1"
4748
serde_json = "1"
@@ -51,8 +52,9 @@ tokio-tungstenite = { version = "0.26", features = ["rustls-tls-webpki-roots" ]}
5152
tokio-util = { version = "0.7", features = ["full"] }
5253
tonic = "0.12"
5354
tracing = "0.1.40"
54-
tracing-opentelemetry = "0.28"
55+
tracing-opentelemetry = "0.30"
5556
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
57+
urlencoding = "2"
5658
uuid = { version = "1", features = ["v4"] }
5759
x25519-dalek = { version = "2" }
5860

Diff for: config.base.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,9 @@ currencies:
177177
digits: 6
178178
- name: XAUt
179179
digits: 6
180+
feeds:
181+
currencies:
182+
- ADA
180183
binance:
181184
tokens:
182185
- token: ADA

Diff for: src/api.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::{
1616
config::OracleConfig,
1717
network::NodeId,
1818
price_aggregator::TokenPrice,
19-
signature_aggregator::{Payload, TimestampedPayloadEntry},
19+
signature_aggregator::{Payload, SyntheticPayloadEntry},
2020
};
2121

2222
#[derive(Clone, Serialize)]
@@ -105,7 +105,7 @@ async fn report_all_prices(State(state): State<APIState>) -> impl IntoResponse {
105105

106106
#[allow(clippy::large_enum_variant)]
107107
pub enum Response {
108-
Ok(TimestampedPayloadEntry),
108+
Ok(SyntheticPayloadEntry),
109109
NotFound,
110110
}
111111
impl IntoResponse for Response {
@@ -125,9 +125,9 @@ async fn report_payload(
125125
) -> impl IntoResponse {
126126
let payload = state.payload_source.borrow().clone();
127127
match payload
128-
.entries
128+
.synthetics
129129
.iter()
130-
.find(|&p| p.entry.synthetic == synthetic)
130+
.find(|&p| p.synthetic == synthetic)
131131
{
132132
Some(entry) => Response::Ok(entry.clone()),
133133
None => Response::NotFound,

Diff for: src/config.rs

+16
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@ struct RawOracleConfig {
3232
pub use_persisted_prices: bool,
3333
pub max_synthetic_divergence: Decimal,
3434
pub publish_url: Option<String>,
35+
pub publish_feed_base_url: Option<String>,
3536
pub logs: RawLogConfig,
3637
pub frost_address: Option<String>,
3738
pub keygen: KeygenConfig,
3839
pub synthetics: Vec<SyntheticConfig>,
3940
pub currencies: Vec<CurrencyConfig>,
41+
pub feeds: FeedConfig,
4042
pub binance: BinanceConfig,
4143
pub bybit: ByBitConfig,
4244
pub coinbase: CoinbaseConfig,
@@ -66,11 +68,13 @@ pub struct OracleConfig {
6668
pub use_persisted_prices: bool,
6769
pub max_synthetic_divergence: Decimal,
6870
pub publish_url: Option<String>,
71+
pub publish_feed_base_url: Option<String>,
6972
pub logs: LogConfig,
7073
pub frost_address: Option<String>,
7174
pub keygen: KeygenConfig,
7275
pub synthetics: Vec<SyntheticConfig>,
7376
pub currencies: Vec<CurrencyConfig>,
77+
pub feeds: FeedConfig,
7478
pub bybit: ByBitConfig,
7579
pub binance: BinanceConfig,
7680
pub coinbase: CoinbaseConfig,
@@ -157,6 +161,11 @@ impl TryFrom<RawOracleConfig> for OracleConfig {
157161
uptrace_dsn: raw.logs.uptrace_dsn,
158162
};
159163

164+
let publish_feed_base_url = raw.publish_feed_base_url.or_else(|| {
165+
let base_url = raw.publish_url.as_ref()?.strip_suffix("/oraclePrices")?;
166+
Some(format!("{base_url}/oracles"))
167+
});
168+
160169
Ok(Self {
161170
id,
162171
label,
@@ -173,11 +182,13 @@ impl TryFrom<RawOracleConfig> for OracleConfig {
173182
use_persisted_prices: raw.use_persisted_prices,
174183
max_synthetic_divergence: raw.max_synthetic_divergence,
175184
publish_url: raw.publish_url,
185+
publish_feed_base_url,
176186
logs,
177187
frost_address: raw.frost_address,
178188
keygen: raw.keygen,
179189
synthetics: raw.synthetics,
180190
currencies: raw.currencies,
191+
feeds: raw.feeds,
181192
binance: raw.binance,
182193
bybit: raw.bybit,
183194
coinbase: raw.coinbase,
@@ -275,6 +286,11 @@ pub struct CurrencyConfig {
275286
pub digits: u32,
276287
}
277288

289+
#[derive(Debug, Deserialize)]
290+
pub struct FeedConfig {
291+
pub currencies: Vec<String>,
292+
}
293+
278294
#[derive(Debug, Deserialize)]
279295
pub struct BinanceConfig {
280296
pub tokens: Vec<BinanceTokenConfig>,

Diff for: src/instrumentation.rs

+19-17
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,18 @@ use tracing_subscriber::{
1414
use crate::{config::LogConfig, network::NodeId};
1515

1616
pub enum OtelGuard {
17-
Enabled(metrics::SdkMeterProvider),
17+
Enabled(trace::SdkTracerProvider, metrics::SdkMeterProvider),
1818
Disabled,
1919
}
2020
impl Drop for OtelGuard {
2121
fn drop(&mut self) {
22-
if let Self::Enabled(meter_provider) = &self {
22+
if let Self::Enabled(tracer_provider, meter_provider) = &self {
2323
if let Err(err) = meter_provider.shutdown() {
2424
eprintln!("{:?}", err);
2525
}
26-
global::shutdown_tracer_provider();
26+
if let Err(err) = tracer_provider.shutdown() {
27+
eprintln!("{:?}", err);
28+
}
2729
}
2830
}
2931
}
@@ -57,7 +59,7 @@ where
5759
let metrics_layer = tracing_opentelemetry::MetricsLayer::new(meter_provider.clone());
5860

5961
Dispatch::new(subscriber.with(metrics_layer).with(tracer_layer)).init();
60-
Ok(OtelGuard::Enabled(meter_provider))
62+
Ok(OtelGuard::Enabled(tracer_provider, meter_provider))
6163
}
6264
None => {
6365
Dispatch::new(subscriber).init();
@@ -77,13 +79,15 @@ fn init_providers(
7779
name: &str,
7880
endpoint: &str,
7981
uptrace_dsn: Option<&String>,
80-
) -> Result<(trace::TracerProvider, metrics::SdkMeterProvider)> {
81-
let resource = Resource::default().merge(&Resource::new([
82-
KeyValue::new("service.name", name.to_string()),
83-
KeyValue::new("service.namespace", "oracles"),
84-
KeyValue::new("service.instance.id", id.to_string()),
85-
KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
86-
]));
82+
) -> Result<(trace::SdkTracerProvider, metrics::SdkMeterProvider)> {
83+
let resource = Resource::builder()
84+
.with_attributes([
85+
KeyValue::new("service.name", name.to_string()),
86+
KeyValue::new("service.namespace", "oracles"),
87+
KeyValue::new("service.instance.id", id.to_string()),
88+
KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
89+
])
90+
.build();
8791

8892
let exporter_provider = ExporterProvider::new(endpoint, uptrace_dsn)?;
8993

@@ -96,9 +100,9 @@ fn init_providers(
96100
fn init_tracer_provider(
97101
resource: Resource,
98102
exporter_provider: &ExporterProvider,
99-
) -> Result<trace::TracerProvider> {
103+
) -> Result<trace::SdkTracerProvider> {
100104
let exporter = exporter_provider.span()?;
101-
let processor = trace::BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio)
105+
let processor = trace::BatchSpanProcessor::builder(exporter)
102106
.with_batch_config(
103107
opentelemetry_sdk::trace::BatchConfigBuilder::default()
104108
.with_max_queue_size(30000)
@@ -108,7 +112,7 @@ fn init_tracer_provider(
108112
)
109113
.build();
110114

111-
let provider = trace::TracerProvider::builder()
115+
let provider = trace::SdkTracerProvider::builder()
112116
.with_span_processor(processor)
113117
.with_resource(resource)
114118
.build();
@@ -123,10 +127,8 @@ fn init_meter_provider(
123127
exporter_provider: &ExporterProvider,
124128
) -> Result<metrics::SdkMeterProvider> {
125129
let exporter = exporter_provider.metric()?;
126-
let reader =
127-
metrics::PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
128130
let provider = metrics::SdkMeterProvider::builder()
129-
.with_reader(reader)
131+
.with_periodic_exporter(exporter)
130132
.with_resource(resource)
131133
.build();
132134

Diff for: src/main.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use oracles::{
1010
instrumentation,
1111
network::Network,
1212
price_aggregator::PriceAggregator,
13+
price_feed::PriceData,
1314
publisher::Publisher,
1415
raft::{Raft, RaftClient, RaftLeader},
1516
signature_aggregator::SignatureAggregator,
@@ -45,7 +46,10 @@ struct Node {
4546
impl Node {
4647
pub fn new(config: Arc<OracleConfig>) -> Result<Self> {
4748
let (leader_sink, leader_source) = watch::channel(RaftLeader::Unknown);
48-
let (price_feed_sink, price_feed_source) = watch::channel(vec![]);
49+
let (price_sink, price_source) = watch::channel(PriceData {
50+
synthetics: vec![],
51+
generics: vec![],
52+
});
4953
let (price_audit_sink, price_audit_source) = watch::channel(vec![]);
5054
let (raft_client, raft_source) = RaftClient::new();
5155

@@ -63,7 +67,7 @@ impl Node {
6367
&config,
6468
&mut network,
6569
leader_sink,
66-
price_feed_source.clone(),
70+
price_source.clone(),
6771
raft_source,
6872
);
6973

@@ -72,15 +76,15 @@ impl Node {
7276
&config,
7377
&mut network,
7478
raft_client,
75-
price_feed_source,
79+
price_source,
7680
leader_source,
7781
)?
7882
} else {
79-
SignatureAggregator::single(&config, raft_client, price_feed_source, leader_source)?
83+
SignatureAggregator::single(&config, raft_client, price_source, leader_source)?
8084
};
8185

8286
let price_aggregator = PriceAggregator::new(
83-
price_feed_sink,
87+
price_sink,
8488
price_audit_sink,
8589
payload_source.clone(),
8690
config.clone(),

0 commit comments

Comments
 (0)