Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions apps/hip-3-pusher/config/config.sample.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
stale_price_threshold_seconds = 5
prometheus_port = 9090

[hyperliquid]
hyperliquid_ws_urls = ["wss://api.hyperliquid-testnet.xyz/ws"]
market_name = "pyth"
asset_context_symbols = ["BTC"]
use_testnet = false
oracle_pusher_key_path = "/path/to/oracle_pusher_key.txt"
publish_interval = 3.0
publish_timeout = 5.0
enable_publish = false

[multisig]
enable_multisig = false

[kms]
enable_kms = false
aws_kms_key_id_path = "/path/to/aws_kms_key_id.txt"

[lazer]
lazer_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"]
lazer_api_key = "lazer_api_key"
feed_ids = [1, 8] # BTC, USDT

[hermes]
hermes_urls = ["wss://hermes.pyth.network/ws"]
feed_ids = [
"e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", # BTC
"2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b" # USDT
]

[price.oracle]
BTC = [
{ source_type = "single", source = { source_name = "hl_oracle", source_id = "BTC" } },
{ source_type = "pair", base_source = { source_name = "lazer", source_id = 1, exponent = -8 }, quote_source = { source_name = "lazer", source_id = 8, exponent = -8 } },
{ source_type = "pair", base_source = { source_name = "hermes", source_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", exponent = -8 }, quote_source = { source_name = "hermes", source_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", exponent = -8 } },
]

[price.external]
BTC = [{ source_type = "single", source = { source_name = "hl_mark", source_id = "BTC" } }]
PYTH = [{ source_type = "constant", value = "0.10" }]
FOGO = [{ source_type = "constant", value = "0.01" }]
35 changes: 0 additions & 35 deletions apps/hip-3-pusher/config/config.toml

This file was deleted.

3 changes: 2 additions & 1 deletion apps/hip-3-pusher/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
[project]
name = "hip-3-pusher"
version = "0.1.7"
version = "0.2.0"
description = "Hyperliquid HIP-3 market oracle pusher"
readme = "README.md"
requires-python = "==3.13.*"
dependencies = [
"boto3~=1.40.38",
"cryptography~=46.0.1",
"httpx~=0.28.1",
"hyperliquid-python-sdk~=0.19.0",
"loguru~=0.7.3",
"opentelemetry-exporter-prometheus~=0.58b0",
Expand Down
60 changes: 51 additions & 9 deletions apps/hip-3-pusher/src/pusher/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from hyperliquid.utils.constants import MAINNET_API_URL, TESTNET_API_URL
from pydantic import BaseModel, FilePath, model_validator
from typing import Optional
from typing import Literal

STALE_TIMEOUT_SECONDS = 5

Expand All @@ -18,25 +19,19 @@ class MultisigConfig(BaseModel):
class LazerConfig(BaseModel):
lazer_urls: list[str]
lazer_api_key: str
base_feed_id: int
base_feed_exponent: int
quote_feed_id: int
quote_feed_exponent: int
feed_ids: list[int]


class HermesConfig(BaseModel):
hermes_urls: list[str]
base_feed_id: str
base_feed_exponent: int
quote_feed_id: str
quote_feed_exponent: int
feed_ids: list[str]


class HyperliquidConfig(BaseModel):
hyperliquid_ws_urls: list[str]
push_urls: Optional[list[str]] = None
market_name: str
market_symbol: str
asset_context_symbols: list[str]
use_testnet: bool
oracle_pusher_key_path: Optional[FilePath] = None
publish_interval: float
Expand All @@ -50,11 +45,58 @@ def set_default_urls(self):
return self


class SedaFeedConfig(BaseModel):
exec_program_id: str
exec_inputs: str


class SedaConfig(BaseModel):
url: str
api_key_path: Optional[FilePath] = None
poll_interval: float
poll_failure_interval: float
poll_timeout: float
feeds: dict[str, SedaFeedConfig]


class PriceSource(BaseModel):
source_name: str
source_id: str | int
exponent: Optional[int] = None


class SingleSourceConfig(BaseModel):
source_type: Literal["single"]
source: PriceSource


class PairSourceConfig(BaseModel):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess generally we would leave "combining feeds" up to SEDA? Or would we use this PairSourceConfig for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set this up specifcally for the case (as in our internal publishers) where we need to quote in e.g. USDT. For more complicated transformations yes, hopefully SEDA can abstract it out.

source_type: Literal["pair"]
base_source: PriceSource
quote_source: PriceSource


class ConstantSourceConfig(BaseModel):
source_type: Literal["constant"]
value: str


PriceSourceConfig = SingleSourceConfig | PairSourceConfig | ConstantSourceConfig


class PriceConfig(BaseModel):
oracle: dict[str, list[PriceSourceConfig]] = {}
mark: dict[str, list[PriceSourceConfig]] = {}
external: dict[str, list[PriceSourceConfig]] = {}


class Config(BaseModel):
stale_price_threshold_seconds: int
prometheus_port: int
hyperliquid: HyperliquidConfig
kms: KMSConfig
lazer: LazerConfig
hermes: HermesConfig
seda: SedaConfig
multisig: MultisigConfig
price: PriceConfig
16 changes: 6 additions & 10 deletions apps/hip-3-pusher/src/pusher/hermes_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,22 @@

from pusher.config import Config, STALE_TIMEOUT_SECONDS
from pusher.exception import StaleConnectionError
from pusher.price_state import PriceState, PriceUpdate
from pusher.price_state import PriceSourceState, PriceUpdate


class HermesListener:
"""
Subscribe to Hermes price updates for needed feeds.
"""
def __init__(self, config: Config, price_state: PriceState):
def __init__(self, config: Config, hermes_state: PriceSourceState):
self.hermes_urls = config.hermes.hermes_urls
self.base_feed_id = config.hermes.base_feed_id
self.quote_feed_id = config.hermes.quote_feed_id
self.price_state = price_state
self.feed_ids = config.hermes.feed_ids
self.hermes_state = hermes_state

def get_subscribe_request(self):
return {
"type": "subscribe",
"ids": [self.base_feed_id, self.quote_feed_id],
"ids": self.feed_ids,
"verbose": False,
"binary": True,
"allow_out_of_order": False,
Expand Down Expand Up @@ -81,9 +80,6 @@ def parse_hermes_message(self, data):
publish_time = price_object["publish_time"]
logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time)
now = time.time()
if id == self.base_feed_id:
self.price_state.hermes_base_price = PriceUpdate(price, now)
if id == self.quote_feed_id:
self.price_state.hermes_quote_price = PriceUpdate(price, now)
self.hermes_state.put(id, PriceUpdate(price, now))
except Exception as e:
logger.error("parse_hermes_message error: {}", e)
24 changes: 13 additions & 11 deletions apps/hip-3-pusher/src/pusher/hyperliquid_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from pusher.config import Config, STALE_TIMEOUT_SECONDS
from pusher.exception import StaleConnectionError
from pusher.price_state import PriceState, PriceUpdate
from pusher.price_state import PriceSourceState, PriceUpdate

# This will be in config, but note here.
# Other RPC providers exist but so far we've seen their support is incomplete.
Expand All @@ -20,10 +20,11 @@ class HyperliquidListener:
Subscribe to any relevant Hyperliquid websocket streams
See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket
"""
def __init__(self, config: Config, price_state: PriceState):
def __init__(self, config: Config, hl_oracle_state: PriceSourceState, hl_mark_state: PriceSourceState):
self.hyperliquid_ws_urls = config.hyperliquid.hyperliquid_ws_urls
self.market_symbol = config.hyperliquid.market_symbol
self.price_state = price_state
self.asset_context_symbols = config.hyperliquid.asset_context_symbols
self.hl_oracle_state = hl_oracle_state
self.hl_mark_state = hl_mark_state

def get_subscribe_request(self, asset):
return {
Expand All @@ -44,9 +45,10 @@ async def subscribe_single(self, url):

async def subscribe_single_inner(self, url):
async with websockets.connect(url) as ws:
subscribe_request = self.get_subscribe_request(self.market_symbol)
await ws.send(json.dumps(subscribe_request))
logger.info("Sent subscribe request to {}", url)
for symbol in self.asset_context_symbols:
subscribe_request = self.get_subscribe_request(symbol)
await ws.send(json.dumps(subscribe_request))
logger.info("Sent subscribe request for symbol: {} to {}", symbol, url)

# listen for updates
while True:
Expand Down Expand Up @@ -76,10 +78,10 @@ async def subscribe_single_inner(self, url):
def parse_hyperliquid_ws_message(self, message):
try:
ctx = message["data"]["ctx"]
symbol = message["data"]["coin"]
now = time.time()
self.price_state.hl_oracle_price = PriceUpdate(ctx["oraclePx"], now)
self.price_state.hl_mark_price = PriceUpdate(ctx["markPx"], now)
logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price,
self.price_state.hl_mark_price)
self.hl_oracle_state.put(symbol, PriceUpdate(ctx["oraclePx"], now))
self.hl_mark_state.put(symbol, PriceUpdate(ctx["markPx"], now))
logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", ctx["oraclePx"], ctx["markPx"])
except Exception as e:
logger.error("parse_hyperliquid_ws_message error: message: {} e: {}", message, e)
19 changes: 8 additions & 11 deletions apps/hip-3-pusher/src/pusher/lazer_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,24 @@

from pusher.config import Config, STALE_TIMEOUT_SECONDS
from pusher.exception import StaleConnectionError
from pusher.price_state import PriceState, PriceUpdate
from pusher.price_state import PriceSourceState, PriceUpdate


class LazerListener:
"""
Subscribe to Lazer price updates for needed feeds.
"""
def __init__(self, config: Config, price_state: PriceState):
def __init__(self, config: Config, lazer_state: PriceSourceState):
self.lazer_urls = config.lazer.lazer_urls
self.api_key = config.lazer.lazer_api_key
self.base_feed_id = config.lazer.base_feed_id
self.quote_feed_id = config.lazer.quote_feed_id
self.price_state = price_state
self.feed_ids = config.lazer.feed_ids
self.lazer_state = lazer_state

def get_subscribe_request(self, subscription_id: int):
return {
"type": "subscribe",
"subscriptionId": subscription_id,
"priceFeedIds": [self.base_feed_id, self.quote_feed_id],
"priceFeedIds": self.feed_ids,
"properties": ["price"],
"formats": [],
"deliveryFormat": "json",
Expand Down Expand Up @@ -54,7 +53,7 @@ async def subscribe_single_inner(self, router_url):
subscribe_request = self.get_subscribe_request(1)

await ws.send(json.dumps(subscribe_request))
logger.info("Sent Lazer subscribe request to {}", router_url)
logger.info("Sent Lazer subscribe request to {} feed_ids {}", router_url, self.feed_ids)

# listen for updates
while True:
Expand Down Expand Up @@ -89,9 +88,7 @@ def parse_lazer_message(self, data):
price = feed_update.get("price", None)
if feed_id is None or price is None:
continue
if feed_id == self.base_feed_id:
self.price_state.lazer_base_price = PriceUpdate(price, now)
if feed_id == self.quote_feed_id:
self.price_state.lazer_quote_price = PriceUpdate(price, now)
else:
self.lazer_state.put(feed_id, PriceUpdate(price, now))
except Exception as e:
logger.error("parse_lazer_message error: {}", e)
9 changes: 6 additions & 3 deletions apps/hip-3-pusher/src/pusher/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pusher.hyperliquid_listener import HyperliquidListener
from pusher.lazer_listener import LazerListener
from pusher.hermes_listener import HermesListener
from pusher.seda_listener import SedaListener
from pusher.price_state import PriceState
from pusher.publisher import Publisher
from pusher.metrics import Metrics
Expand Down Expand Up @@ -45,15 +46,17 @@ async def main():
metrics = Metrics(config)

publisher = Publisher(config, price_state, metrics)
hyperliquid_listener = HyperliquidListener(config, price_state)
lazer_listener = LazerListener(config, price_state)
hermes_listener = HermesListener(config, price_state)
hyperliquid_listener = HyperliquidListener(config, price_state.hl_oracle_state, price_state.hl_mark_state)
lazer_listener = LazerListener(config, price_state.lazer_state)
hermes_listener = HermesListener(config, price_state.hermes_state)
seda_listener = SedaListener(config, price_state.seda_state)

await asyncio.gather(
publisher.run(),
hyperliquid_listener.subscribe_all(),
lazer_listener.subscribe_all(),
hermes_listener.subscribe_all(),
seda_listener.run(),
)
logger.info("Exiting hip-3-pusher..")

Expand Down
Loading