36
36
import java .nio .file .Paths ;
37
37
import java .time .Instant ;
38
38
import java .util .EnumSet ;
39
+ import java .util .HashMap ;
39
40
import java .util .List ;
40
41
import java .util .Map ;
41
42
import java .util .Objects ;
@@ -231,13 +232,14 @@ public CacheNodeAssignment getCacheNodeAssignment() {
231
232
return assignment ;
232
233
}
233
234
234
- private boolean validateS3vsLocalDownLoad () throws Exception {
235
+ private boolean validateS3vsLocalDownLoad () {
235
236
// check if the number of files in S3 matches the local directory
236
237
Map <String , Long > filesWithSizeInS3 = blobStore .listFilesWithSize (snapshotMetadata .snapshotId );
237
238
238
- Map <String , Long > localFiles ;
239
+ Map <String , Long > localFilesSizeMap ;
240
+ Map <String , Long > mismatchFilesSizeMap = new HashMap <String , Long >();
239
241
try (Stream <Path > fileList = Files .list (dataDirectory )) {
240
- localFiles =
242
+ localFilesSizeMap =
241
243
fileList
242
244
.filter (Files ::isRegularFile )
243
245
.collect (
@@ -249,11 +251,12 @@ private boolean validateS3vsLocalDownLoad() throws Exception {
249
251
throw new RuntimeException (
250
252
String .format ("Error reading local files in directory %s" , dataDirectory ), e );
251
253
}
252
- if (localFiles .size () != filesWithSizeInS3 .size ()) {
254
+ if (localFilesSizeMap .size () != filesWithSizeInS3 .size ()) {
253
255
LOG .error (
254
- String .format (
255
- "Mismatch in number of files in S3 (%s) and local directory (%s) for snapshot %s" ,
256
- filesWithSizeInS3 .size (), localFiles .size (), snapshotMetadata .toString ()));
256
+ "Mismatch in number of files in S3 ({}) and local directory ({}) for snapshot {}" ,
257
+ filesWithSizeInS3 .size (),
258
+ localFilesSizeMap .size (),
259
+ snapshotMetadata .toString ());
257
260
return false ;
258
261
}
259
262
@@ -262,18 +265,26 @@ private boolean validateS3vsLocalDownLoad() throws Exception {
262
265
long s3Size = entry .getValue ();
263
266
String fileName = Paths .get (s3Path ).getFileName ().toString ();
264
267
265
- if (!localFiles .containsKey (fileName ) || !localFiles .get (fileName ).equals (s3Size )) {
266
- LOG .error (
267
- String .format (
268
- "Mismatch for file %s in S3 and local directory of size %s for snapshot %s" ,
269
- s3Path , s3Size , snapshotMetadata .toString ()));
270
- return false ;
268
+ if (!localFilesSizeMap .containsKey (fileName )
269
+ || !localFilesSizeMap .get (fileName ).equals (s3Size )) {
270
+ mismatchFilesSizeMap .put (fileName , s3Size );
271
271
}
272
272
}
273
- LOG .info (
274
- "No file mismatch found for snapshot id '{}' and local directory '{}'" ,
275
- snapshotMetadata .snapshotId ,
276
- dataDirectory .toString ());
273
+ if (!mismatchFilesSizeMap .isEmpty ()) {
274
+ String mismatchFilesAndSize =
275
+ mismatchFilesSizeMap .entrySet ().stream ()
276
+ .map (
277
+ e ->
278
+ String .format (
279
+ "%s (S3Size: %s LocalSize: %s)" ,
280
+ e .getKey (), e .getValue (), localFilesSizeMap .get (e .getKey ())))
281
+ .collect (Collectors .joining (", " ));
282
+ LOG .error (
283
+ "Mismatch in file sizes between S3 and local directory for snapshot {}. Mismatch files: {}" ,
284
+ snapshotMetadata .toString (),
285
+ mismatchFilesAndSize );
286
+ return false ;
287
+ }
277
288
return true ;
278
289
}
279
290
@@ -320,7 +331,7 @@ public void downloadChunkData() {
320
331
String .format (
321
332
"Mismatch in number or size of files in S3 and local directory for snapshot %s" ,
322
333
snapshotMetadata );
323
- throw new IOException (errorString );
334
+ throw new RuntimeException (errorString );
324
335
}
325
336
326
337
// check if lucene index is valid and not corrupted
@@ -330,10 +341,6 @@ public void downloadChunkData() {
330
341
String .format (
331
342
"Lucene index is not clean. Found issues for snapshot: %s." , snapshotMetadata ));
332
343
}
333
- LOG .info (
334
- "Lucene index is clean for snapshot id '{}' and local directory '{}'" ,
335
- snapshotMetadata .snapshotId ,
336
- dataDirectory .toString ());
337
344
338
345
// check if schema file exists
339
346
Path schemaPath = Path .of (dataDirectory .toString (), ReadWriteChunk .SCHEMA_FILE_NAME );
@@ -375,16 +382,7 @@ public void downloadChunkData() {
375
382
// disregarding any errors
376
383
setAssignmentState (
377
384
getCacheNodeAssignment (), Metadata .CacheNodeAssignment .CacheNodeAssignmentState .EVICT );
378
- LOG .error (
379
- "Error handling chunk assignment for assignment: {}, snapshot id: {}, snapshot size: {}, replicaId: {}, replicaSet: {}, cacheNodeId: {}" ,
380
- "Error handling chunk assignment for assignment: {}, snapshot id: {}, snapshot size: {}, replicaId: {}, replicaSet: {}, cacheNodeId: {}" ,
381
- assignment .assignmentId ,
382
- assignment .snapshotId ,
383
- assignment .snapshotSize ,
384
- assignment .replicaId ,
385
- assignment .replicaSet ,
386
- assignment .cacheNodeId ,
387
- e );
385
+ LOG .error ("Error handling chunk assignment" , e );
388
386
assignmentTimer .stop (chunkAssignmentTimerFailure );
389
387
} finally {
390
388
chunkAssignmentLock .unlock ();
0 commit comments