@@ -1156,6 +1156,213 @@ mod tests {
11561156 }
11571157 }
11581158
1159+ // Test-reproducer of the mass-outage+stuck-ingest bug.
1160+ // Link: https://github.com/quickwit-oss/quickwit/issues/6531.
1161+ // It asserts that ingestion recovers after the outage.
1162+ //
1163+ // Scenario: an ingester crashed and restarted having lost its WAL, but the metastore
1164+ // still lists its shard as `Open`. The ingester rejoins (`Ready` state) and, on startup,
1165+ // reports the shards it still holds via `AdviseResetShardsRequest` - an empty inventory,
1166+ // since its WAL is gone. The control plane plans an indexing task for that `Open` shard,
1167+ // which the restarted ingester can never serve.
1168+ // Ingester fail was it does not have access to lost shard anymore, but control plane
1169+ // cannot recover properly and just retries. The system ends up in unrecoverable state.
1170+ //
1171+ // TODO: the fix adds mandatory `shards_snapshot_timestamp` field to `AdviseResetShardsRequest`,
1172+ // set it to `2_000` on the test request below (must exceed the shard's `update_timestamp`
1173+ // of 1_000 for the shard to be treated as lost).
1174+ #[ tokio:: test]
1175+ async fn test_ingest_recovers_after_mass_outage ( ) {
1176+ use quickwit_actors:: { Observe , Universe } ;
1177+ use quickwit_config:: { ClusterConfig , INGEST_V2_SOURCE_ID } ;
1178+ use quickwit_ingest:: IngesterPool ;
1179+ use quickwit_metastore:: ListIndexesMetadataResponseExt ;
1180+ use quickwit_proto:: control_plane:: AdviseResetShardsRequest ;
1181+ use quickwit_proto:: metastore:: {
1182+ ListIndexesMetadataRequest , ListIndexesMetadataResponse , ListShardsRequest ,
1183+ ListShardsResponse , ListShardsSubresponse , MetastoreServiceClient , MockMetastoreService ,
1184+ } ;
1185+ use quickwit_proto:: types:: Position ;
1186+
1187+ use crate :: control_plane:: { ControlPlane , ControlPlaneObservableState } ;
1188+
1189+ // run actors on a mock clock so `universe.sleep` fast-forwards the control loop timers
1190+ let universe = Universe :: with_accelerated_time ( ) ;
1191+
1192+ // control plane node
1193+ let node_id = NodeId :: from_str ( "test-control-plane" ) ;
1194+
1195+ let mut mock_indexer = MockIndexingService :: new ( ) ;
1196+ mock_indexer
1197+ . expect_apply_indexing_plan ( )
1198+ . returning ( |_| Ok ( ApplyIndexingPlanResponse { } ) ) ;
1199+
1200+ // test indexer
1201+ let indexer = IndexerNodeInfo {
1202+ node_id : NodeId :: from_str ( "test-indexer" ) ,
1203+ generation_id : 0 ,
1204+ client : IndexingServiceClient :: from_mock ( mock_indexer) ,
1205+ indexing_tasks : Vec :: new ( ) ,
1206+ indexing_capacity : CpuCapacity :: from_cpu_millis ( 4_000 ) ,
1207+ ingester_status : IngesterStatus :: Ready ,
1208+ } ;
1209+
1210+ // register the indexer so the scheduler has a node to assign indexing tasks to
1211+ let indexer_pool = IndexerPool :: default ( ) ;
1212+ indexer_pool. insert ( indexer. node_id . clone ( ) , indexer) ;
1213+
1214+ // no ingesters needed, test starts already at indexing phase
1215+ let ingester_pool = IngesterPool :: default ( ) ;
1216+
1217+ let mut mock_metastore = MockMetastoreService :: new ( ) ;
1218+
1219+ // one index with a single, enabled ingest-v2 source
1220+ let mut index = IndexMetadata :: for_test ( "test-index-0" , "ram:///test-index-0" ) ;
1221+ let mut source = SourceConfig :: ingest_v2 ( ) ;
1222+ source. enabled = true ;
1223+ index. add_source ( source) . unwrap ( ) ;
1224+
1225+ // on startup the control plane lists all indexes
1226+ let index_clone = index. clone ( ) ;
1227+ mock_metastore. expect_list_indexes_metadata ( ) . return_once (
1228+ move |request : ListIndexesMetadataRequest | {
1229+ assert_eq ! ( request, ListIndexesMetadataRequest :: all( ) ) ;
1230+ Ok ( ListIndexesMetadataResponse :: for_test ( vec ! [ index_clone] ) )
1231+ } ,
1232+ ) ;
1233+
1234+ // phantom shard with `Open` state is in the metastore but its data is gone,
1235+ // led by the restarted ingester
1236+ let mut shard = Shard {
1237+ index_uid : Some ( index. index_uid . clone ( ) ) ,
1238+ source_id : INGEST_V2_SOURCE_ID . to_string ( ) ,
1239+ shard_id : Some ( ShardId :: from ( 17 ) ) , // just random ID
1240+ leader_id : "test-ingester" . to_string ( ) ,
1241+ publish_position_inclusive : Some ( Position :: offset ( 1_000u64 ) ) ,
1242+ update_timestamp : 1_000 ,
1243+ ..Default :: default ( )
1244+ } ;
1245+ shard. set_shard_state ( ShardState :: Open ) ;
1246+ let index_uid = index. index_uid . clone ( ) ;
1247+
1248+ // register shard in metastore
1249+ mock_metastore
1250+ . expect_list_shards ( )
1251+ . return_once ( move |_request : ListShardsRequest | {
1252+ Ok ( ListShardsResponse {
1253+ subresponses : vec ! [ ListShardsSubresponse {
1254+ index_uid: Some ( index_uid) ,
1255+ source_id: INGEST_V2_SOURCE_ID . to_string( ) ,
1256+ shards: vec![ shard] ,
1257+ } ] ,
1258+ } )
1259+ } ) ;
1260+
1261+ // spawn real control plane wired to the mocks above and empty cluster-change stream
1262+ let ( control_plane_mailbox, _handle, mut readiness_rx) = ControlPlane :: spawn (
1263+ & universe,
1264+ ClusterConfig :: for_test ( ) ,
1265+ node_id,
1266+ quickwit_cluster:: ClusterChangeStreamFactoryForTest :: default ( ) ,
1267+ indexer_pool,
1268+ ingester_pool,
1269+ MetastoreServiceClient :: from_mock ( mock_metastore) ,
1270+ ) ;
1271+
1272+ // wait until control plane loads metastore and is ready, first plan is now exists
1273+ tokio:: time:: timeout (
1274+ Duration :: from_secs ( 5 ) ,
1275+ readiness_rx. wait_for ( |ready| * ready) ,
1276+ )
1277+ . await
1278+ . unwrap ( )
1279+ . unwrap ( ) ;
1280+
1281+ // check if any task is still indexing shard 17
1282+ let plan_targets_phantom_shard = |mailbox : & quickwit_actors:: Mailbox < ControlPlane > | {
1283+ let mailbox = mailbox. clone ( ) ;
1284+ async move {
1285+ // `Observe` returns snapshot of the control plane's internal state
1286+ let obs: ControlPlaneObservableState = mailbox. ask ( Observe ) . await . unwrap ( ) ;
1287+ obs. indexing_scheduler
1288+ . last_applied_physical_plan
1289+ . unwrap ( )
1290+ . indexing_tasks_per_indexer ( )
1291+ . get ( "test-indexer" )
1292+ . cloned ( )
1293+ . unwrap_or_default ( )
1294+ . iter ( )
1295+ . any ( |task| task. shard_ids . contains ( & ShardId :: from ( 17 ) ) )
1296+ }
1297+ } ;
1298+
1299+ // precondition: right after startup control plane already planned an indexing task
1300+ // for open phantom shard (this is the setup of the broken situation)
1301+ assert ! (
1302+ plan_targets_phantom_shard( & control_plane_mailbox) . await ,
1303+ "control plane should plan a task for the open phantom shard"
1304+ ) ;
1305+
1306+ // simulate the restarted ingester's startup handshake: it tells the control plane
1307+ // which shards it still has in its WAL; having lost its WAL, it reports none.
1308+ // Currently this is a no-op for `Open` shards, so the phantom shard is left untouched.
1309+ // TODO: add `shards_snapshot_timestamp: 2_000` here once fix applied
1310+ control_plane_mailbox
1311+ . ask ( AdviseResetShardsRequest {
1312+ ingester_id : "test-ingester" . to_string ( ) ,
1313+ shard_ids : Vec :: new ( ) ,
1314+ } )
1315+ . await
1316+ . unwrap ( )
1317+ . unwrap ( ) ;
1318+
1319+ // wait for ingestion to recover, the plan to stop targeting the lost phantom shard;
1320+ // give the control loop several ticks to rebuild
1321+ let phantom_shard_id = ShardId :: from ( 17 ) ;
1322+ let max_ticks = 10 ;
1323+ let mut recovered = false ;
1324+
1325+ let mut last_planned_shard_ids: Vec < ShardId > = Vec :: new ( ) ;
1326+
1327+ for _ in 0 ..max_ticks {
1328+ // fast-forward one second of control-loop time
1329+ universe. sleep ( Duration :: from_secs ( 1 ) ) . await ;
1330+
1331+ // snapshot the control plane and extract the shard IDs the plan assigns to our indexer
1332+ let obs: ControlPlaneObservableState =
1333+ control_plane_mailbox. ask ( Observe ) . await . unwrap ( ) ;
1334+ last_planned_shard_ids = obs
1335+ . indexing_scheduler
1336+ . last_applied_physical_plan
1337+ . unwrap ( )
1338+ . indexing_tasks_per_indexer ( )
1339+ . get ( "test-indexer" )
1340+ . cloned ( )
1341+ . unwrap_or_default ( )
1342+ . iter ( )
1343+ . flat_map ( |task| task. shard_ids . clone ( ) )
1344+ . collect ( ) ;
1345+
1346+ // (!) recovery: the plan no longer references the lost shard
1347+ if !last_planned_shard_ids. contains ( & phantom_shard_id) {
1348+ recovered = true ;
1349+ break ;
1350+ }
1351+ }
1352+
1353+ assert ! (
1354+ recovered,
1355+ "Ingestion never recovers after a mass outage, restarted ingester `test-ingester`\n \
1356+ reported an empty WAL inventory in AdviseResetShardsRequest, but after {max_ticks} ticks\n \
1357+ indexing plan still targets the lost shard {phantom_shard_id:?}.\n \
1358+ Planned shard ids for `test-indexer`: {last_planned_shard_ids:?}.\n \
1359+ Leader no longer holds lost shard, so every ingest attempt will fails and ingestion is stuck.\n \
1360+ Expected: the plan should drop the shard once the ingester reports it lost."
1361+ ) ;
1362+
1363+ universe. assert_quit ( ) . await ;
1364+ }
1365+
11591366 #[ test]
11601367 fn test_select_available_indexers_returns_only_ready_when_available ( ) {
11611368 let indexer_pool = IndexerPool :: default ( ) ;
@@ -1374,7 +1581,7 @@ mod tests {
13741581 }
13751582
13761583 // Applies a plan to a single indexer whose apply RPC hangs forever, then reports whether the
1377- // apply task finished within `observe` — i.e. whether a timeout cancelled it.
1584+ // apply task finished within `observe` - i.e. whether a timeout cancelled it.
13781585 async fn hanging_apply_is_cancelled_within ( status : IngesterStatus , observe : Duration ) -> bool {
13791586 let indexer_pool = IndexerPool :: default ( ) ;
13801587 let indexer = hanging_indexer_node_info ( status) ;
0 commit comments