@@ -139,7 +139,7 @@ async fn fetch_bridge_data(
139139 }
140140}
141141
142- pub async fn run ( store : Arc < State > , pythnet_ws_endpoint : String ) -> Result < ( ) > {
142+ pub async fn run ( store : Arc < State > , pythnet_ws_endpoint : String ) -> Result < ! > {
143143 let client = PubsubClient :: new ( pythnet_ws_endpoint. as_ref ( ) ) . await ?;
144144
145145 let config = RpcProgramAccountsConfig {
@@ -160,59 +160,54 @@ pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<()> {
160160 . program_subscribe ( & system_program:: id ( ) , Some ( config) )
161161 . await ?;
162162
163- loop {
164- match notif. next ( ) . await {
165- Some ( update) => {
166- let account: Account = match update. value . account . decode ( ) {
167- Some ( account) => account,
168- None => {
169- tracing:: error!( ?update, "Failed to decode account from update." ) ;
170- continue ;
171- }
172- } ;
173-
174- let accumulator_messages = AccumulatorMessages :: try_from_slice ( & account. data ) ;
175- match accumulator_messages {
176- Ok ( accumulator_messages) => {
177- let ( candidate, _) = Pubkey :: find_program_address (
178- & [
179- b"AccumulatorState" ,
180- & accumulator_messages. ring_index ( ) . to_be_bytes ( ) ,
181- ] ,
182- & system_program:: id ( ) ,
183- ) ;
184-
185- if candidate. to_string ( ) == update. value . pubkey {
186- let store = store. clone ( ) ;
187- tokio:: spawn ( async move {
188- if let Err ( err) = Aggregates :: store_update (
189- & * store,
190- Update :: AccumulatorMessages ( accumulator_messages) ,
191- )
192- . await
193- {
194- tracing:: error!( error = ?err, "Failed to store accumulator messages." ) ;
195- }
196- } ) ;
197- } else {
198- tracing:: error!(
199- ?candidate,
200- ?update. value. pubkey,
201- "Failed to verify message public keys." ,
202- ) ;
203- }
204- }
163+ while let Some ( update) = notif. next ( ) . await {
164+ let account: Account = match update. value . account . decode ( ) {
165+ Some ( account) => account,
166+ None => {
167+ tracing:: error!( ?update, "Failed to decode account from update." ) ;
168+ continue ;
169+ }
170+ } ;
171+
172+ let accumulator_messages = AccumulatorMessages :: try_from_slice ( & account. data ) ;
173+ match accumulator_messages {
174+ Ok ( accumulator_messages) => {
175+ let ( candidate, _) = Pubkey :: find_program_address (
176+ & [
177+ b"AccumulatorState" ,
178+ & accumulator_messages. ring_index ( ) . to_be_bytes ( ) ,
179+ ] ,
180+ & system_program:: id ( ) ,
181+ ) ;
205182
206- Err ( err) => {
207- tracing:: error!( error = ?err, "Failed to parse AccumulatorMessages." ) ;
208- }
209- } ;
183+ if candidate. to_string ( ) == update. value . pubkey {
184+ let store = store. clone ( ) ;
185+ tokio:: spawn ( async move {
186+ if let Err ( err) = Aggregates :: store_update (
187+ & * store,
188+ Update :: AccumulatorMessages ( accumulator_messages) ,
189+ )
190+ . await
191+ {
192+ tracing:: error!( error = ?err, "Failed to store accumulator messages." ) ;
193+ }
194+ } ) ;
195+ } else {
196+ tracing:: error!(
197+ ?candidate,
198+ ?update. value. pubkey,
199+ "Failed to verify message public keys." ,
200+ ) ;
201+ }
210202 }
211- None => {
212- return Err ( anyhow ! ( "Pythnet network listener terminated" ) ) ;
203+
204+ Err ( err) => {
205+ tracing:: error!( error = ?err, "Failed to parse AccumulatorMessages." ) ;
213206 }
214- }
207+ } ;
215208 }
209+
210+ Err ( anyhow ! ( "Pythnet network listener connection terminated" ) )
216211}
217212
218213/// Fetch existing GuardianSet accounts from Wormhole.
0 commit comments