@@ -845,11 +845,11 @@ impl Handler<PruneShardsRequest> for ControlPlane {
845845 ) ,
846846 interval,
847847 ) ;
848- if let CooldownStatus :: Ready = status {
849- if let Err ( metastore_error) = self . metastore . prune_shards ( request) . await {
850- return convert_metastore_error ( metastore_error ) ;
851- } ;
852- }
848+ if let CooldownStatus :: Ready = status
849+ && let Err ( metastore_error) = self . metastore . prune_shards ( request) . await
850+ {
851+ return convert_metastore_error ( metastore_error ) ;
852+ } ;
853853 // Return ok regardless of whether the call was successful or debounced
854854 Ok ( Ok ( EmptyResponse { } ) )
855855 }
@@ -951,27 +951,25 @@ impl ControlPlaneEventSubscriber {
951951#[ async_trait]
952952impl EventSubscriber < LocalShardsUpdate > for ControlPlaneEventSubscriber {
953953 async fn handle_event ( & mut self , local_shards_update : LocalShardsUpdate ) {
954- if let Some ( control_plane_mailbox) = self . 0 . upgrade ( ) {
955- if let Err ( error) = control_plane_mailbox
954+ if let Some ( control_plane_mailbox) = self . 0 . upgrade ( )
955+ && let Err ( error) = control_plane_mailbox
956956 . send_message ( local_shards_update)
957957 . await
958- {
959- error ! ( error=%error, "failed to forward local shards update to control plane" ) ;
960- }
958+ {
959+ error ! ( error=%error, "failed to forward local shards update to control plane" ) ;
961960 }
962961 }
963962}
964963
965964#[ async_trait]
966965impl EventSubscriber < ShardPositionsUpdate > for ControlPlaneEventSubscriber {
967966 async fn handle_event ( & mut self , shard_positions_update : ShardPositionsUpdate ) {
968- if let Some ( control_plane_mailbox) = self . 0 . upgrade ( ) {
969- if let Err ( error) = control_plane_mailbox
967+ if let Some ( control_plane_mailbox) = self . 0 . upgrade ( )
968+ && let Err ( error) = control_plane_mailbox
970969 . send_message ( shard_positions_update)
971970 . await
972- {
973- error ! ( error=%error, "failed to forward shard positions update to control plane" ) ;
974- }
971+ {
972+ error ! ( error=%error, "failed to forward shard positions update to control plane" ) ;
975973 }
976974 }
977975}
@@ -1096,17 +1094,17 @@ async fn watcher_indexers(
10961094 } ;
10971095 match cluster_change {
10981096 ClusterChange :: Add ( node) => {
1099- if node. enabled_services ( ) . contains ( & QuickwitService :: Indexer ) {
1100- if let Err ( error) = mailbox. send_message ( IndexerJoined ( node) ) . await {
1101- error ! ( error=%error , "failed to forward `IndexerJoined` event to control plane" ) ;
1102- }
1097+ if node. enabled_services ( ) . contains ( & QuickwitService :: Indexer )
1098+ && let Err ( error) = mailbox. send_message ( IndexerJoined ( node) ) . await
1099+ {
1100+ error ! ( error=%error , "failed to forward `IndexerJoined` event to control plane" ) ;
11031101 }
11041102 }
11051103 ClusterChange :: Remove ( node) => {
1106- if node. enabled_services ( ) . contains ( & QuickwitService :: Indexer ) {
1107- if let Err ( error) = mailbox. send_message ( IndexerLeft ( node) ) . await {
1108- error ! ( error=%error , "failed to forward `IndexerLeft` event to control plane" ) ;
1109- }
1104+ if node. enabled_services ( ) . contains ( & QuickwitService :: Indexer )
1105+ && let Err ( error) = mailbox. send_message ( IndexerLeft ( node) ) . await
1106+ {
1107+ error ! ( error=%error , "failed to forward `IndexerLeft` event to control plane" ) ;
11101108 }
11111109 }
11121110 ClusterChange :: Update ( _) => {
0 commit comments