diff --git a/apps/hip-3-pusher/config/config.sample.toml b/apps/hip-3-pusher/config/config.sample.toml new file mode 100644 index 0000000000..f9b9f45bf4 --- /dev/null +++ b/apps/hip-3-pusher/config/config.sample.toml @@ -0,0 +1,43 @@ +stale_price_threshold_seconds = 5 +prometheus_port = 9090 + +[hyperliquid] +hyperliquid_ws_urls = ["wss://api.hyperliquid-testnet.xyz/ws"] +market_name = "pyth" +asset_context_symbols = ["BTC"] +use_testnet = false +oracle_pusher_key_path = "/path/to/oracle_pusher_key.txt" +publish_interval = 3.0 +publish_timeout = 5.0 +enable_publish = false + +[multisig] +enable_multisig = false + +[kms] +enable_kms = false +aws_kms_key_id_path = "/path/to/aws_kms_key_id.txt" + +[lazer] +lazer_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"] +lazer_api_key = "lazer_api_key" +feed_ids = [1, 8] # BTC, USDT + +[hermes] +hermes_urls = ["wss://hermes.pyth.network/ws"] +feed_ids = [ + "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", # BTC + "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b" # USDT +] + +[price.oracle] +BTC = [ + { source_type = "single", source = { source_name = "hl_oracle", source_id = "BTC" } }, + { source_type = "pair", base_source = { source_name = "lazer", source_id = 1, exponent = -8 }, quote_source = { source_name = "lazer", source_id = 8, exponent = -8 } }, + { source_type = "pair", base_source = { source_name = "hermes", source_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", exponent = -8 }, quote_source = { source_name = "hermes", source_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", exponent = -8 } }, +] + +[price.external] +BTC = [{ source_type = "single", source = { source_name = "hl_mark", source_id = "BTC" } }] +PYTH = [{ source_type = "constant", value = "0.10" }] +FOGO = [{ source_type = "constant", value = "0.01" }] diff --git a/apps/hip-3-pusher/config/config.toml b/apps/hip-3-pusher/config/config.toml deleted file mode 100644 index 20bd0e554b..0000000000 --- a/apps/hip-3-pusher/config/config.toml +++ /dev/null @@ -1,35 +0,0 @@ -stale_price_threshold_seconds = 5 -prometheus_port = 9090 - -[hyperliquid] -hyperliquid_ws_urls = ["wss://api.hyperliquid-testnet.xyz/ws"] -market_name = "" -market_symbol = "BTC" -use_testnet = false -oracle_pusher_key_path = "/path/to/oracle_pusher_key.txt" -publish_interval = 3.0 -publish_timeout = 5.0 -enable_publish = false - -[multisig] -enable_multisig = false -multisig_address = "0x0000000000000000000000000000000000000005" - -[kms] -enable_kms = false -aws_kms_key_id_path = "/path/to/aws_kms_key_id.txt" - -[lazer] -lazer_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"] -lazer_api_key = "lazer_api_key" -base_feed_id = 1 # BTC -base_feed_exponent = -8 -quote_feed_id = 8 # USDT -quote_feed_exponent = -8 - -[hermes] -hermes_urls = ["wss://hermes.pyth.network/ws"] -base_feed_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" # BTC -base_feed_exponent = -8 -quote_feed_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b" # USDT -quote_feed_exponent = -8 diff --git a/apps/hip-3-pusher/pyproject.toml b/apps/hip-3-pusher/pyproject.toml index 715b53c917..1802a322f0 100644 --- a/apps/hip-3-pusher/pyproject.toml +++ b/apps/hip-3-pusher/pyproject.toml @@ -1,12 +1,13 @@ [project] name = "hip-3-pusher" -version = "0.1.7" +version = "0.2.0" description = "Hyperliquid HIP-3 market oracle pusher" readme = "README.md" requires-python = "==3.13.*" dependencies = [ "boto3~=1.40.38", "cryptography~=46.0.1", + "httpx~=0.28.1", "hyperliquid-python-sdk~=0.19.0", "loguru~=0.7.3", "opentelemetry-exporter-prometheus~=0.58b0", diff --git a/apps/hip-3-pusher/src/pusher/config.py b/apps/hip-3-pusher/src/pusher/config.py index 9c7a2e600c..c57ae88fcf 100644 --- a/apps/hip-3-pusher/src/pusher/config.py +++ b/apps/hip-3-pusher/src/pusher/config.py @@ -1,6 +1,7 @@ from hyperliquid.utils.constants import MAINNET_API_URL, TESTNET_API_URL from pydantic import BaseModel, FilePath, model_validator from typing import Optional +from typing import Literal STALE_TIMEOUT_SECONDS = 5 @@ -18,25 +19,19 @@ class MultisigConfig(BaseModel): class LazerConfig(BaseModel): lazer_urls: list[str] lazer_api_key: str - base_feed_id: int - base_feed_exponent: int - quote_feed_id: int - quote_feed_exponent: int + feed_ids: list[int] class HermesConfig(BaseModel): hermes_urls: list[str] - base_feed_id: str - base_feed_exponent: int - quote_feed_id: str - quote_feed_exponent: int + feed_ids: list[str] class HyperliquidConfig(BaseModel): hyperliquid_ws_urls: list[str] push_urls: Optional[list[str]] = None market_name: str - market_symbol: str + asset_context_symbols: list[str] use_testnet: bool oracle_pusher_key_path: Optional[FilePath] = None publish_interval: float @@ -50,6 +45,51 @@ def set_default_urls(self): return self +class SedaFeedConfig(BaseModel): + exec_program_id: str + exec_inputs: str + + +class SedaConfig(BaseModel): + url: str + api_key_path: Optional[FilePath] = None + poll_interval: float + poll_failure_interval: float + poll_timeout: float + feeds: dict[str, SedaFeedConfig] + + +class PriceSource(BaseModel): + source_name: str + source_id: str | int + exponent: Optional[int] = None + + +class SingleSourceConfig(BaseModel): + source_type: Literal["single"] + source: PriceSource + + +class PairSourceConfig(BaseModel): + source_type: Literal["pair"] + base_source: PriceSource + quote_source: PriceSource + + +class ConstantSourceConfig(BaseModel): + source_type: Literal["constant"] + value: str + + +PriceSourceConfig = SingleSourceConfig | PairSourceConfig | ConstantSourceConfig + + +class PriceConfig(BaseModel): + oracle: dict[str, list[PriceSourceConfig]] = {} + mark: dict[str, list[PriceSourceConfig]] = {} + external: dict[str, list[PriceSourceConfig]] = {} + + class Config(BaseModel): stale_price_threshold_seconds: int prometheus_port: int @@ -57,4 +97,6 @@ class Config(BaseModel): kms: KMSConfig lazer: LazerConfig hermes: HermesConfig + seda: SedaConfig multisig: MultisigConfig + price: PriceConfig diff --git a/apps/hip-3-pusher/src/pusher/hermes_listener.py b/apps/hip-3-pusher/src/pusher/hermes_listener.py index cac0768e1c..ab25a35c50 100644 --- a/apps/hip-3-pusher/src/pusher/hermes_listener.py +++ b/apps/hip-3-pusher/src/pusher/hermes_listener.py @@ -7,23 +7,22 @@ from pusher.config import Config, STALE_TIMEOUT_SECONDS from pusher.exception import StaleConnectionError -from pusher.price_state import PriceState, PriceUpdate +from pusher.price_state import PriceSourceState, PriceUpdate class HermesListener: """ Subscribe to Hermes price updates for needed feeds. """ - def __init__(self, config: Config, price_state: PriceState): + def __init__(self, config: Config, hermes_state: PriceSourceState): self.hermes_urls = config.hermes.hermes_urls - self.base_feed_id = config.hermes.base_feed_id - self.quote_feed_id = config.hermes.quote_feed_id - self.price_state = price_state + self.feed_ids = config.hermes.feed_ids + self.hermes_state = hermes_state def get_subscribe_request(self): return { "type": "subscribe", - "ids": [self.base_feed_id, self.quote_feed_id], + "ids": self.feed_ids, "verbose": False, "binary": True, "allow_out_of_order": False, @@ -81,9 +80,6 @@ def parse_hermes_message(self, data): publish_time = price_object["publish_time"] logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time) now = time.time() - if id == self.base_feed_id: - self.price_state.hermes_base_price = PriceUpdate(price, now) - if id == self.quote_feed_id: - self.price_state.hermes_quote_price = PriceUpdate(price, now) + self.hermes_state.put(id, PriceUpdate(price, now)) except Exception as e: logger.error("parse_hermes_message error: {}", e) diff --git a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py index d0544c9168..916da2e177 100644 --- a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py +++ b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py @@ -7,7 +7,7 @@ from pusher.config import Config, STALE_TIMEOUT_SECONDS from pusher.exception import StaleConnectionError -from pusher.price_state import PriceState, PriceUpdate +from pusher.price_state import PriceSourceState, PriceUpdate # This will be in config, but note here. # Other RPC providers exist but so far we've seen their support is incomplete. @@ -20,10 +20,11 @@ class HyperliquidListener: Subscribe to any relevant Hyperliquid websocket streams See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket """ - def __init__(self, config: Config, price_state: PriceState): + def __init__(self, config: Config, hl_oracle_state: PriceSourceState, hl_mark_state: PriceSourceState): self.hyperliquid_ws_urls = config.hyperliquid.hyperliquid_ws_urls - self.market_symbol = config.hyperliquid.market_symbol - self.price_state = price_state + self.asset_context_symbols = config.hyperliquid.asset_context_symbols + self.hl_oracle_state = hl_oracle_state + self.hl_mark_state = hl_mark_state def get_subscribe_request(self, asset): return { @@ -44,9 +45,10 @@ async def subscribe_single(self, url): async def subscribe_single_inner(self, url): async with websockets.connect(url) as ws: - subscribe_request = self.get_subscribe_request(self.market_symbol) - await ws.send(json.dumps(subscribe_request)) - logger.info("Sent subscribe request to {}", url) + for symbol in self.asset_context_symbols: + subscribe_request = self.get_subscribe_request(symbol) + await ws.send(json.dumps(subscribe_request)) + logger.info("Sent subscribe request for symbol: {} to {}", symbol, url) # listen for updates while True: @@ -76,10 +78,10 @@ async def subscribe_single_inner(self, url): def parse_hyperliquid_ws_message(self, message): try: ctx = message["data"]["ctx"] + symbol = message["data"]["coin"] now = time.time() - self.price_state.hl_oracle_price = PriceUpdate(ctx["oraclePx"], now) - self.price_state.hl_mark_price = PriceUpdate(ctx["markPx"], now) - logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price, - self.price_state.hl_mark_price) + self.hl_oracle_state.put(symbol, PriceUpdate(ctx["oraclePx"], now)) + self.hl_mark_state.put(symbol, PriceUpdate(ctx["markPx"], now)) + logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", ctx["oraclePx"], ctx["markPx"]) except Exception as e: logger.error("parse_hyperliquid_ws_message error: message: {} e: {}", message, e) diff --git a/apps/hip-3-pusher/src/pusher/lazer_listener.py b/apps/hip-3-pusher/src/pusher/lazer_listener.py index 4bca7b2e45..74e0623270 100644 --- a/apps/hip-3-pusher/src/pusher/lazer_listener.py +++ b/apps/hip-3-pusher/src/pusher/lazer_listener.py @@ -7,25 +7,24 @@ from pusher.config import Config, STALE_TIMEOUT_SECONDS from pusher.exception import StaleConnectionError -from pusher.price_state import PriceState, PriceUpdate +from pusher.price_state import PriceSourceState, PriceUpdate class LazerListener: """ Subscribe to Lazer price updates for needed feeds. """ - def __init__(self, config: Config, price_state: PriceState): + def __init__(self, config: Config, lazer_state: PriceSourceState): self.lazer_urls = config.lazer.lazer_urls self.api_key = config.lazer.lazer_api_key - self.base_feed_id = config.lazer.base_feed_id - self.quote_feed_id = config.lazer.quote_feed_id - self.price_state = price_state + self.feed_ids = config.lazer.feed_ids + self.lazer_state = lazer_state def get_subscribe_request(self, subscription_id: int): return { "type": "subscribe", "subscriptionId": subscription_id, - "priceFeedIds": [self.base_feed_id, self.quote_feed_id], + "priceFeedIds": self.feed_ids, "properties": ["price"], "formats": [], "deliveryFormat": "json", @@ -54,7 +53,7 @@ async def subscribe_single_inner(self, router_url): subscribe_request = self.get_subscribe_request(1) await ws.send(json.dumps(subscribe_request)) - logger.info("Sent Lazer subscribe request to {}", router_url) + logger.info("Sent Lazer subscribe request to {} feed_ids {}", router_url, self.feed_ids) # listen for updates while True: @@ -89,9 +88,7 @@ def parse_lazer_message(self, data): price = feed_update.get("price", None) if feed_id is None or price is None: continue - if feed_id == self.base_feed_id: - self.price_state.lazer_base_price = PriceUpdate(price, now) - if feed_id == self.quote_feed_id: - self.price_state.lazer_quote_price = PriceUpdate(price, now) + else: + self.lazer_state.put(feed_id, PriceUpdate(price, now)) except Exception as e: logger.error("parse_lazer_message error: {}", e) diff --git a/apps/hip-3-pusher/src/pusher/main.py b/apps/hip-3-pusher/src/pusher/main.py index 558053c8c1..006d4b7edb 100644 --- a/apps/hip-3-pusher/src/pusher/main.py +++ b/apps/hip-3-pusher/src/pusher/main.py @@ -9,6 +9,7 @@ from pusher.hyperliquid_listener import HyperliquidListener from pusher.lazer_listener import LazerListener from pusher.hermes_listener import HermesListener +from pusher.seda_listener import SedaListener from pusher.price_state import PriceState from pusher.publisher import Publisher from pusher.metrics import Metrics @@ -45,15 +46,17 @@ async def main(): metrics = Metrics(config) publisher = Publisher(config, price_state, metrics) - hyperliquid_listener = HyperliquidListener(config, price_state) - lazer_listener = LazerListener(config, price_state) - hermes_listener = HermesListener(config, price_state) + hyperliquid_listener = HyperliquidListener(config, price_state.hl_oracle_state, price_state.hl_mark_state) + lazer_listener = LazerListener(config, price_state.lazer_state) + hermes_listener = HermesListener(config, price_state.hermes_state) + seda_listener = SedaListener(config, price_state.seda_state) await asyncio.gather( publisher.run(), hyperliquid_listener.subscribe_all(), lazer_listener.subscribe_all(), hermes_listener.subscribe_all(), + seda_listener.run(), ) logger.info("Exiting hip-3-pusher..") diff --git a/apps/hip-3-pusher/src/pusher/price_state.py b/apps/hip-3-pusher/src/pusher/price_state.py index 764ff42015..33ee51e4c5 100644 --- a/apps/hip-3-pusher/src/pusher/price_state.py +++ b/apps/hip-3-pusher/src/pusher/price_state.py @@ -1,83 +1,117 @@ +from dataclasses import dataclass from loguru import logger import time -from pusher.config import Config +from pusher.config import Config, PriceSource, PriceSourceConfig, ConstantSourceConfig, SingleSourceConfig, \ + PairSourceConfig DEFAULT_STALE_PRICE_THRESHOLD_SECONDS = 5 +@dataclass class PriceUpdate: - def __init__(self, price, timestamp): - self.price = price - self.timestamp = timestamp - - def __str__(self): - return f"PriceUpdate(price={self.price}, timestamp={self.timestamp})" + price: float | str + timestamp: float def time_diff(self, now): return now - self.timestamp +class PriceSourceState: + def __init__(self, name: str): + self.name = name + self.state: dict[str, PriceUpdate] = {} + + def __repr__(self): + return f"PriceSourceState(name={self.name} state={self.state})" + + def get(self, symbol: str) -> PriceUpdate | None: + return self.state.get(symbol) + + def put(self, symbol: str, value: PriceUpdate): + self.state[symbol] = value + + class PriceState: + HL_ORACLE = "hl_oracle" + HL_MARK = "hl_mark" + LAZER = "lazer" + HERMES = "hermes" + SEDA = "seda" + """ Maintain latest prices seen across listeners and publisher. """ def __init__(self, config: Config): self.stale_price_threshold_seconds = config.stale_price_threshold_seconds + self.price_config = config.price + + self.hl_oracle_state = PriceSourceState(self.HL_ORACLE) + self.hl_mark_state = PriceSourceState(self.HL_MARK) + self.lazer_state = PriceSourceState(self.LAZER) + self.hermes_state = PriceSourceState(self.HERMES) + self.seda_state = PriceSourceState(self.SEDA) + + self.all_states = { + self.HL_ORACLE: self.hl_oracle_state, + self.HL_MARK: self.hl_mark_state, + self.LAZER: self.lazer_state, + self.HERMES: self.hermes_state, + self.SEDA: self.seda_state, + } + + def get_all_prices(self, market_name): + logger.debug("get_all_prices state: {}", self.all_states) + + return ( + self.get_prices(self.price_config.oracle, market_name), + self.get_prices(self.price_config.mark, market_name), + self.get_prices(self.price_config.external, market_name) + ) + + def get_prices(self, symbol_configs: dict[str, list[PriceSourceConfig]], market_name: str): + pxs = {} + for symbol in symbol_configs: + for source_config in symbol_configs[symbol]: + # find first valid price in the waterfall + px = self.get_price(source_config) + if px is not None: + pxs[f"{market_name}:{symbol}"] = px + break + return pxs + + def get_price(self, price_source_config: PriceSourceConfig): + if isinstance(price_source_config, ConstantSourceConfig): + return price_source_config.value + elif isinstance(price_source_config, SingleSourceConfig): + return self.get_price_from_single_source(price_source_config.source) + elif isinstance(price_source_config, PairSourceConfig): + return self.get_price_from_pair_source(price_source_config.base_source, price_source_config.quote_source) + else: + raise ValueError - self.hl_oracle_price: PriceUpdate | None = None - self.hl_mark_price: PriceUpdate | None = None - - self.lazer_base_price: PriceUpdate | None = None - self.lazer_base_exponent = config.lazer.base_feed_exponent - self.lazer_quote_price: PriceUpdate | None = None - self.lazer_quote_exponent = config.lazer.quote_feed_exponent - - self.hermes_base_price: PriceUpdate | None = None - self.hermes_base_exponent = config.hermes.base_feed_exponent - self.hermes_quote_price: PriceUpdate | None = None - self.hermes_quote_exponent = config.hermes.quote_feed_exponent - - def get_current_oracle_price(self): + def get_price_from_single_source(self, source: PriceSource): now = time.time() - if self.hl_oracle_price: - time_diff = self.hl_oracle_price.time_diff(now) - if time_diff < self.stale_price_threshold_seconds: - return self.hl_oracle_price.price - else: - logger.error("Hyperliquid oracle price stale by {} seconds", time_diff) + update: PriceUpdate | None = self.all_states.get(source.source_name, {}).get(source.source_id) + if update is None: + logger.warning("source {} id {} is missing", source.source_name, source.source_id) + return None + time_diff = update.time_diff(now) + if time_diff >= self.stale_price_threshold_seconds: + logger.warning("source {} id {} is stale by {} seconds", source.source_name, source.source_id, time_diff) + return None + # valid price found + if source.exponent is not None: + return float(update.price) / (10.0 ** -source.exponent) else: - logger.error("Hyperliquid oracle price not received yet") - - # fall back to Lazer - if self.lazer_base_price and self.lazer_quote_price: - max_time_diff = max(self.lazer_base_price.time_diff(now), self.lazer_quote_price.time_diff(now)) - if max_time_diff < self.stale_price_threshold_seconds: - return self.get_lazer_price() - else: - logger.error("Lazer price stale by {} seconds", max_time_diff) - else: - logger.error("Lazer base/quote prices not received yet") - - # fall back to Hermes - if self.hermes_base_price and self.hermes_quote_price: - max_time_diff = max(self.hermes_base_price.time_diff(now), self.hermes_quote_price.time_diff(now)) - if max_time_diff < self.stale_price_threshold_seconds: - return self.get_hermes_price() - else: - logger.error("Hermes price stale by {} seconds", max_time_diff) - else: - logger.error("Hermes base/quote prices not received yet") - - logger.error("All prices missing or stale!") - return None + return update.price - def get_hermes_price(self): - base_price = float(self.hermes_base_price.price) / (10.0 ** -self.hermes_base_exponent) - quote_price = float(self.hermes_quote_price.price) / (10.0 ** -self.hermes_quote_exponent) - return str(round(base_price / quote_price, 2)) + def get_price_from_pair_source(self, base_source: PriceSource, quote_source: PriceSource): + base_price = self.get_price_from_single_source(base_source) + if base_price is None: + return None + quote_price = self.get_price_from_single_source(quote_source) + if quote_price is None: + return None - def get_lazer_price(self): - base_price = float(self.lazer_base_price.price) / (10.0 ** -self.lazer_base_exponent) - quote_price = float(self.lazer_quote_price.price) / (10.0 ** -self.lazer_quote_exponent) - return str(round(base_price / quote_price, 2)) + return str(round(float(base_price) / float(quote_price), 2)) diff --git a/apps/hip-3-pusher/src/pusher/publisher.py b/apps/hip-3-pusher/src/pusher/publisher.py index a1bb600d0d..36324e1540 100644 --- a/apps/hip-3-pusher/src/pusher/publisher.py +++ b/apps/hip-3-pusher/src/pusher/publisher.py @@ -53,7 +53,6 @@ def __init__(self, config: Config, price_state: PriceState, metrics: Metrics): self.multisig_address = config.multisig.multisig_address self.market_name = config.hyperliquid.market_name - self.market_symbol = config.hyperliquid.market_symbol self.enable_publish = config.hyperliquid.enable_publish self.price_state = price_state @@ -70,20 +69,16 @@ async def run(self): logger.exception("Publisher.publish() exception: {}", repr(e)) def publish(self): - oracle_pxs = {} - oracle_px = self.price_state.get_current_oracle_price() - if not oracle_px: - logger.error("No valid oracle price available") - self.metrics.no_oracle_price_counter.add(1, self.metrics_labels) - return - else: - logger.debug("Current oracle price: {}", oracle_px) - oracle_pxs[f"{self.market_name}:{self.market_symbol}"] = oracle_px + oracle_pxs, mark_pxs, external_perp_pxs = self.price_state.get_all_prices(self.market_name) + logger.debug("oracle_pxs: {}", oracle_pxs) + logger.debug("mark_pxs: {}", mark_pxs) + logger.debug("external_perp_pxs: {}", external_perp_pxs) - mark_pxs = [] - external_perp_pxs = {} - if self.price_state.hl_mark_price: - external_perp_pxs[f"{self.market_name}:{self.market_symbol}"] = self.price_state.hl_mark_price.price + if not oracle_pxs: + logger.error("No valid oracle prices available") + self.metrics.no_oracle_price_counter.add(1, self.metrics_labels) + # markPxs is a list of dicts of length 0-2, and so can be empty + mark_pxs = [mark_pxs] if mark_pxs else [] # TODO: "Each update can change oraclePx and markPx by at most 1%." # TODO: "The markPx cannot be updated such that open interest would be 10x the open interest cap." diff --git a/apps/hip-3-pusher/src/pusher/seda_listener.py b/apps/hip-3-pusher/src/pusher/seda_listener.py new file mode 100644 index 0000000000..5bfdfbebcf --- /dev/null +++ b/apps/hip-3-pusher/src/pusher/seda_listener.py @@ -0,0 +1,70 @@ +import asyncio +import datetime +import httpx +import json +from loguru import logger +from pathlib import Path + +from pusher.config import Config, SedaFeedConfig +from pusher.price_state import PriceSourceState, PriceUpdate + + +class SedaListener: + SOURCE_NAME = "seda" + + """ + Subscribe to SEDA price updates for needed feeds. + """ + def __init__(self, config: Config, seda_state: PriceSourceState): + self.url = config.seda.url + self.api_key = Path(config.seda.api_key_path).read_text().strip() + self.feeds = config.seda.feeds + self.poll_interval = config.seda.poll_interval + self.poll_failure_interval = config.seda.poll_failure_interval + self.poll_timeout = config.seda.poll_timeout + self.seda_state = seda_state + + async def run(self): + await asyncio.gather(*[self._run_single(feed_name, self.feeds[feed_name]) for feed_name in self.feeds]) + + async def _run_single(self, feed_name: str, feed_config: SedaFeedConfig) -> None: + headers = { + "Accept": "application/json", + "Authorization": f"Bearer {self.api_key}", + } + params = { + "execProgramId": feed_config.exec_program_id, + "execInputs": feed_config.exec_inputs, + "encoding": "utf8", + } + + async with httpx.AsyncClient(timeout=self.poll_timeout) as client: + while True: + result = await self._poll(client, headers, params) + if result["ok"]: + self._parse_seda_message(feed_name, result["json"]) + else: + logger.error("SEDA poll request for {} failed: {}", feed_name, result) + + await asyncio.sleep(self.poll_interval if result.get("ok") else self.poll_failure_interval) + + async def _poll(self, + client: httpx.AsyncClient, + headers: dict[str, str], + params: dict[str, str], + ) -> dict: + try: + resp = await client.get(self.url, headers=headers, params=params) + resp.raise_for_status() + return {"ok": True, "status": resp.status_code, "json": resp.json()} + except httpx.HTTPStatusError as e: + return {"ok": False, "status": e.response.status_code, "error": str(e)} + except Exception as e: + return {"ok": False, "status": None, "error": str(e)} + + def _parse_seda_message(self, feed_name, message): + result = json.loads(message["data"]["result"]) + price = result["composite_rate"] + timestamp = datetime.datetime.fromisoformat(result["timestamp"]).timestamp() + logger.debug("Parsed SEDA update for feed: {} price: {} timestamp: {}", feed_name, price, timestamp) + self.seda_state.put(feed_name, PriceUpdate(price, timestamp)) diff --git a/apps/hip-3-pusher/tests/test_price_state.py b/apps/hip-3-pusher/tests/test_price_state.py index 21ac0ad034..52e023119a 100644 --- a/apps/hip-3-pusher/tests/test_price_state.py +++ b/apps/hip-3-pusher/tests/test_price_state.py @@ -1,70 +1,102 @@ import time -from pusher.config import Config, LazerConfig, HermesConfig +from pusher.config import Config, LazerConfig, HermesConfig, PriceConfig, PriceSource, SingleSourceConfig, \ + PairSourceConfig, HyperliquidConfig from pusher.price_state import PriceState, PriceUpdate +DEX = "pyth" +SYMBOL = "BTC" + def get_config(): config: Config = Config.model_construct() config.stale_price_threshold_seconds = 5 + config.hyperliquid = HyperliquidConfig.model_construct() + config.hyperliquid.asset_context_symbols = [SYMBOL] config.lazer = LazerConfig.model_construct() - config.lazer.base_feed_exponent = -8 - config.lazer.quote_feed_exponent = -8 + config.lazer.feed_ids = [1, 8] config.hermes = HermesConfig.model_construct() - config.hermes.base_feed_exponent = -8 - config.hermes.quote_feed_exponent = -8 + config.hermes.feed_ids = ["e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b"] + config.price = PriceConfig( + oracle={ + SYMBOL: [ + SingleSourceConfig(source_type="single", source=PriceSource(source_name="hl_oracle", source_id="BTC", exponent=None)), + PairSourceConfig(source_type="pair", + base_source=PriceSource(source_name="lazer", source_id=1, exponent=-8), + quote_source=PriceSource(source_name="lazer", source_id=8, exponent=-8)), + PairSourceConfig(source_type="pair", + base_source=PriceSource(source_name="hermes", source_id="e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", exponent=-8), + quote_source=PriceSource(source_name="hermes", source_id="2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", exponent=-8)) + ] + }, + mark={}, + external={} + ) return config def test_good_hl_price(): + """ + Pass through fresh HL oracle price. + """ config = get_config() price_state = PriceState(config) now = time.time() - price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds / 2.0) - - oracle_px = price_state.get_current_oracle_price() - assert oracle_px == price_state.hl_oracle_price.price - assert oracle_px == "110000.0" + price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds / 2.0)) + oracle_px, _, _ = price_state.get_all_prices(DEX) + assert oracle_px == {f"{DEX}:{SYMBOL}": "110000.0"} def test_fallback_lazer(): + """ + HL oracle price is stale, so fall back to fresh Lazer price. + """ config = get_config() price_state = PriceState(config) now = time.time() - price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds / 2.0) - price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0) + price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.lazer_state.put(1, PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds / 2.0)) + price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0)) - oracle_px = price_state.get_current_oracle_price() - assert oracle_px == price_state.get_lazer_price() - assert oracle_px == "111616.16" + oracle_px, _, _ = price_state.get_all_prices(DEX) + assert oracle_px == {f"{DEX}:{SYMBOL}": "111616.16"} def test_fallback_hermes(): + """ + HL oracle price and Lazer prices are stale, so fall back to fresh Hermes price. + """ config = get_config() price_state = PriceState(config) now = time.time() - price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0) - price_state.hermes_base_price = PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds / 2.0) - price_state.hermes_quote_price = PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0) + price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.lazer_state.put(1, PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0)) + price_state.hermes_state.put("e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", + PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds / 2.0)) + price_state.hermes_state.put("2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", + PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0)) - oracle_px = price_state.get_current_oracle_price() - assert oracle_px == price_state.get_hermes_price() - assert oracle_px == "113265.31" + oracle_px, _, _ = price_state.get_all_prices(DEX) + assert oracle_px == {f"{DEX}:{SYMBOL}": "113265.31"} def test_all_fail(): + """ + All prices are stale, so return nothing. + """ config = get_config() price_state = PriceState(config) now = time.time() - price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.hermes_base_price = PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds - 1.0) - price_state.hermes_quote_price = PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds - 1.0) - assert price_state.get_current_oracle_price() is None + price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.lazer_state.put(1, PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.hermes_state.put("e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", + PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds - 1.0)) + price_state.hermes_state.put("2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", + PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds - 1.0)) + + oracle_px, _, _ = price_state.get_all_prices(DEX) + assert oracle_px == {} diff --git a/apps/hip-3-pusher/uv.lock b/apps/hip-3-pusher/uv.lock index e00ae1b177..3a4cdc3219 100644 --- a/apps/hip-3-pusher/uv.lock +++ b/apps/hip-3-pusher/uv.lock @@ -11,6 +11,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/78/b6/6307fbef88d9b5ee7421e68d78a9f162e0da4900bc5f5793f6d3d0e34fb8/annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53", size = 13643, upload-time = "2024-05-20T21:33:24.1Z" }, ] +[[package]] +name = "anyio" +version = "4.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "idna" }, + { name = "sniffio" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c6/78/7d432127c41b50bccba979505f272c16cbcadcc33645d5fa3a738110ae75/anyio-4.11.0.tar.gz", hash = "sha256:82a8d0b81e318cc5ce71a5f1f8b5c4e63619620b63141ef8c995fa0db95a57c4", size = 219094, upload-time = "2025-09-23T09:19:12.58Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/15/b3/9b1a8074496371342ec1e796a96f99c82c945a339cd81a8e73de28b4cf9e/anyio-4.11.0-py3-none-any.whl", hash = "sha256:0287e96f4d26d4149305414d4e3bc32f0dcd0862365a4bddea19d7a1ec38c4fc", size = 109097, upload-time = "2025-09-23T09:19:10.601Z" }, +] + [[package]] name = "bitarray" version = "3.7.1" @@ -318,6 +331,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bf/4d/257cdc01ada430b8e84b9f2385c2553f33218f5b47da9adf0a616308d4b7/eth_utils-5.3.1-py3-none-any.whl", hash = "sha256:1f5476d8f29588d25b8ae4987e1ffdfae6d4c09026e476c4aad13b32dda3ead0", size = 102529, upload-time = "2025-08-27T16:37:15.449Z" }, ] +[[package]] +name = "h11" +version = "0.16.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250, upload-time = "2025-04-24T03:35:25.427Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" }, +] + [[package]] name = "hexbytes" version = "1.3.1" @@ -329,11 +351,12 @@ wheels = [ [[package]] name = "hip-3-pusher" -version = "0.1.7" +version = "0.2.0" source = { editable = "." } dependencies = [ { name = "boto3" }, { name = "cryptography" }, + { name = "httpx" }, { name = "hyperliquid-python-sdk" }, { name = "loguru" }, { name = "opentelemetry-exporter-prometheus" }, @@ -352,6 +375,7 @@ dev = [ requires-dist = [ { name = "boto3", specifier = "~=1.40.38" }, { name = "cryptography", specifier = "~=46.0.1" }, + { name = "httpx", specifier = "~=0.28.1" }, { name = "hyperliquid-python-sdk", specifier = "~=0.19.0" }, { name = "loguru", specifier = "~=0.7.3" }, { name = "opentelemetry-exporter-prometheus", specifier = "~=0.58b0" }, @@ -364,6 +388,34 @@ requires-dist = [ [package.metadata.requires-dev] dev = [{ name = "pytest", specifier = "~=8.4.2" }] +[[package]] +name = "httpcore" +version = "1.0.9" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "h11" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" }, +] + +[[package]] +name = "httpx" +version = "0.28.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "certifi" }, + { name = "httpcore" }, + { name = "idna" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, +] + [[package]] name = "hyperliquid-python-sdk" version = "0.19.0" @@ -732,6 +784,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" }, ] +[[package]] +name = "sniffio" +version = "1.3.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a2/87/a6771e1546d97e7e041b6ae58d80074f81b7d5121207425c964ddf5cfdbd/sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc", size = 20372, upload-time = "2024-02-25T23:20:04.057Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235, upload-time = "2024-02-25T23:20:01.196Z" }, +] + [[package]] name = "tenacity" version = "9.1.2"