1515 ParsedPriceUpdate ,
1616 PriceIdInput ,
1717 PriceUpdate ,
18+ RpcPriceIdentifier ,
1819 } ,
1920 ApiState ,
2021 } ,
@@ -56,13 +57,21 @@ pub struct StreamPriceUpdatesQueryParams {
5657 #[ param( example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" ) ]
5758 ids : Vec < PriceIdInput > ,
5859
59- /// If true, include the parsed price update in the `parsed` field of each returned feed.
60+ /// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `hex`.
6061 #[ serde( default ) ]
6162 encoding : EncodingType ,
6263
63- /// If true, include the parsed price update in the `parsed` field of each returned feed.
64+ /// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `true`.
6465 #[ serde( default = "default_true" ) ]
6566 parsed : bool ,
67+
68+ /// If true, allows unordered price updates to be included in the stream.
69+ #[ serde( default ) ]
70+ allow_unordered : bool ,
71+
72+ /// If true, only include benchmark prices that are the initial price updates at a given timestamp (i.e., prevPubTime != pubTime).
73+ #[ serde( default ) ]
74+ benchmarks_only : bool ,
6675}
6776
6877fn default_true ( ) -> bool {
@@ -105,10 +114,15 @@ pub async fn price_stream_sse_handler(
105114 price_ids_clone,
106115 params. encoding ,
107116 params. parsed ,
117+ params. benchmarks_only ,
118+ params. allow_unordered ,
108119 )
109120 . await
110121 {
111- Ok ( price_update) => Ok ( Event :: default ( ) . json_data ( price_update) . unwrap ( ) ) ,
122+ Ok ( Some ( update) ) => Ok ( Event :: default ( )
123+ . json_data ( update)
124+ . unwrap_or_else ( |e| error_event ( e) ) ) ,
125+ Ok ( None ) => Ok ( Event :: default ( ) . comment ( "No update available" ) ) ,
112126 Err ( e) => Ok ( error_event ( e) ) ,
113127 }
114128 }
@@ -126,18 +140,64 @@ async fn handle_aggregation_event(
126140 mut price_ids : Vec < PriceIdentifier > ,
127141 encoding : EncodingType ,
128142 parsed : bool ,
129- ) -> Result < PriceUpdate > {
143+ benchmarks_only : bool ,
144+ allow_unordered : bool ,
145+ ) -> Result < Option < PriceUpdate > > {
146+ // Handle out-of-order events
147+ if let AggregationEvent :: OutOfOrder { .. } = event {
148+ if !allow_unordered {
149+ return Ok ( None ) ;
150+ }
151+ }
152+
130153 // We check for available price feed ids to ensure that the price feed ids provided exists since price feeds can be removed.
131154 let available_price_feed_ids = crate :: aggregate:: get_price_feed_ids ( & * state. state ) . await ;
132155
133156 price_ids. retain ( |price_feed_id| available_price_feed_ids. contains ( price_feed_id) ) ;
134157
135- let price_feeds_with_update_data = crate :: aggregate:: get_price_feeds_with_update_data (
158+ let mut price_feeds_with_update_data = crate :: aggregate:: get_price_feeds_with_update_data (
136159 & * state. state ,
137160 & price_ids,
138161 RequestTime :: AtSlot ( event. slot ( ) ) ,
139162 )
140163 . await ?;
164+
165+ let mut parsed_price_updates: Vec < ParsedPriceUpdate > = price_feeds_with_update_data
166+ . price_feeds
167+ . into_iter ( )
168+ . map ( |price_feed| price_feed. into ( ) )
169+ . collect ( ) ;
170+
171+
172+ if benchmarks_only {
173+ // Remove those with metadata.prev_publish_time != price.publish_time from parsed_price_updates
174+ parsed_price_updates. retain ( |price_feed| {
175+ price_feed
176+ . metadata
177+ . prev_publish_time
178+ . map_or ( false , |prev_time| {
179+ prev_time != price_feed. price . publish_time
180+ } )
181+ } ) ;
182+ // Retain price id in price_ids that are in parsed_price_updates
183+ price_ids. retain ( |price_id| {
184+ parsed_price_updates
185+ . iter ( )
186+ . any ( |price_feed| price_feed. id == RpcPriceIdentifier :: from ( * price_id) )
187+ } ) ;
188+ price_feeds_with_update_data = crate :: aggregate:: get_price_feeds_with_update_data (
189+ & * state. state ,
190+ & price_ids,
191+ RequestTime :: AtSlot ( event. slot ( ) ) ,
192+ )
193+ . await ?;
194+ }
195+
196+ // Check if price_ids is empty after filtering and return None if it is
197+ if price_ids. is_empty ( ) {
198+ return Ok ( None ) ;
199+ }
200+
141201 let price_update_data = price_feeds_with_update_data. update_data ;
142202 let encoded_data: Vec < String > = price_update_data
143203 . into_iter ( )
@@ -147,23 +207,15 @@ async fn handle_aggregation_event(
147207 encoding,
148208 data : encoded_data,
149209 } ;
150- let parsed_price_updates: Option < Vec < ParsedPriceUpdate > > = if parsed {
151- Some (
152- price_feeds_with_update_data
153- . price_feeds
154- . into_iter ( )
155- . map ( |price_feed| price_feed. into ( ) )
156- . collect ( ) ,
157- )
158- } else {
159- None
160- } ;
161210
162-
163- Ok ( PriceUpdate {
211+ Ok ( Some ( PriceUpdate {
164212 binary : binary_price_update,
165- parsed : parsed_price_updates,
166- } )
213+ parsed : if parsed {
214+ Some ( parsed_price_updates)
215+ } else {
216+ None
217+ } ,
218+ } ) )
167219}
168220
169221fn error_event < E : std:: fmt:: Debug > ( e : E ) -> Event {
0 commit comments