@@ -30,7 +30,9 @@ use crate::config::*;
30
30
31
31
use codec:: { Decode , Encode } ;
32
32
use futures:: { channel:: oneshot, prelude:: * , stream:: FuturesUnordered , FutureExt } ;
33
- use prometheus_endpoint:: { register, Counter , PrometheusError , Registry , U64 } ;
33
+ use prometheus_endpoint:: {
34
+ register, Counter , Histogram , HistogramOpts , PrometheusError , Registry , U64 ,
35
+ } ;
34
36
use sc_network:: {
35
37
config:: { NonReservedPeerMode , SetConfig } ,
36
38
error, multiaddr,
@@ -89,6 +91,8 @@ const LOG_TARGET: &str = "statement-gossip";
89
91
struct Metrics {
90
92
propagated_statements : Counter < U64 > ,
91
93
known_statements_received : Counter < U64 > ,
94
+ skipped_oversized_statements : Counter < U64 > ,
95
+ propagated_statements_chunks : Histogram ,
92
96
}
93
97
94
98
impl Metrics {
@@ -108,6 +112,22 @@ impl Metrics {
108
112
) ?,
109
113
r,
110
114
) ?,
115
+ skipped_oversized_statements : register (
116
+ Counter :: new (
117
+ "substrate_sync_skipped_oversized_statements" ,
118
+ "Number of oversized statements that were skipped to be gossiped" ,
119
+ ) ?,
120
+ r,
121
+ ) ?,
122
+ propagated_statements_chunks : register (
123
+ Histogram :: with_opts (
124
+ HistogramOpts :: new (
125
+ "substrate_sync_propagated_statements_chunks" ,
126
+ "Distribution of chunk sizes when propagating statements" ,
127
+ ) ,
128
+ ) ?,
129
+ r,
130
+ ) ?,
111
131
} )
112
132
}
113
133
}
@@ -139,7 +159,7 @@ impl StatementHandlerPrototype {
139
159
let ( config, notification_service) = Net :: notification_config (
140
160
protocol_name. clone ( ) . into ( ) ,
141
161
Vec :: new ( ) ,
142
- MAX_STATEMENT_SIZE ,
162
+ MAX_STATEMENT_NOTIFICATION_SIZE ,
143
163
None ,
144
164
SetConfig {
145
165
in_peers : 0 ,
@@ -485,9 +505,56 @@ where
485
505
486
506
propagated_statements += to_send. len ( ) ;
487
507
488
- if !to_send. is_empty ( ) {
489
- log:: trace!( target: LOG_TARGET , "Sending {} statements to {}" , to_send. len( ) , who) ;
490
- self . notification_service . send_sync_notification ( who, to_send. encode ( ) ) ;
508
+ let mut offset = 0 ;
509
+ while offset < to_send. len ( ) {
510
+ // Try to send as many statements as possible in one notification
511
+ let chunk_size = to_send. len ( ) - offset;
512
+ let chunk_end = offset + chunk_size;
513
+ let mut current_end = chunk_end;
514
+
515
+ loop {
516
+ let chunk = & to_send[ offset..current_end] ;
517
+ let encoded = chunk. encode ( ) ;
518
+
519
+ // If chunk fits, send it
520
+ if encoded. len ( ) <= MAX_STATEMENT_NOTIFICATION_SIZE as usize {
521
+ log:: trace!(
522
+ target: LOG_TARGET ,
523
+ "Sending {} statements ({} KB) to {}" ,
524
+ chunk. len( ) ,
525
+ encoded. len( ) / 1024 ,
526
+ who
527
+ ) ;
528
+ self . notification_service . send_sync_notification ( who, encoded) ;
529
+ offset = current_end;
530
+ if let Some ( ref metrics) = self . metrics {
531
+ metrics. propagated_statements_chunks . observe ( chunk. len ( ) as f64 ) ;
532
+ }
533
+ break ;
534
+ }
535
+
536
+ // Size exceeded - split the chunk
537
+ let split_factor =
538
+ ( encoded. len ( ) / MAX_STATEMENT_NOTIFICATION_SIZE as usize ) + 1 ;
539
+ let new_chunk_size = ( current_end - offset) / split_factor;
540
+
541
+ // Single statement is too large
542
+ if new_chunk_size == 0 {
543
+ log:: warn!(
544
+ target: LOG_TARGET ,
545
+ "Statement too large ({} KB), skipping" ,
546
+ encoded. len( ) / 1024
547
+ ) ;
548
+ if let Some ( ref metrics) = self . metrics {
549
+ metrics. skipped_oversized_statements . inc ( ) ;
550
+ }
551
+ offset = current_end;
552
+ break ;
553
+ }
554
+
555
+ // Reduce chunk size and try again
556
+ current_end = offset + new_chunk_size;
557
+ }
491
558
}
492
559
}
493
560
0 commit comments