1
1
use std:: {
2
2
io:: { self , SeekFrom } ,
3
+ ops:: Add ,
3
4
sync:: atomic:: { AtomicU32 , Ordering } ,
4
5
} ;
5
6
@@ -8,6 +9,7 @@ use memmap2::MmapMut;
8
9
use tokio:: {
9
10
fs:: OpenOptions ,
10
11
io:: { AsyncSeekExt , AsyncWriteExt } ,
12
+ time:: { timeout, Duration } ,
11
13
} ;
12
14
use tracing:: Instrument ;
13
15
use vector_common:: byte_size_of:: ByteSizeOf ;
@@ -817,3 +819,148 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() {
817
819
} )
818
820
. await ;
819
821
}
822
+
823
+ #[ tokio:: test]
824
+ async fn writer_and_reader_handle_when_last_record_has_scrambled_archive_data ( ) {
825
+ let assertion_registry = install_tracing_helpers ( ) ;
826
+ let fut = with_temp_dir ( |dir| {
827
+ let data_dir = dir. to_path_buf ( ) ;
828
+
829
+ async move {
830
+ let marked_for_skip = assertion_registry
831
+ . build ( )
832
+ . with_name ( "mark_for_skip" )
833
+ . with_parent_name ( "writer_detects_when_last_record_has_scrambled_archive_data" )
834
+ . was_entered ( )
835
+ . finalize ( ) ;
836
+
837
+ // Create a regular buffer, no customizations required.
838
+ let ( mut writer, mut reader, ledger) = create_default_buffer_v2 ( data_dir. clone ( ) ) . await ;
839
+ let starting_writer_file_id = ledger. get_current_writer_file_id ( ) ;
840
+ let expected_final_writer_file_id = ledger. get_next_writer_file_id ( ) ;
841
+ let expected_final_write_data_file = ledger. get_next_writer_data_file_path ( ) ;
842
+ assert_file_does_not_exist_async ! ( & expected_final_write_data_file) ;
843
+
844
+ // Write a `SizedRecord` record that we can scramble. Since it will be the last record
845
+ // in the data file, the writer should detect this error when the buffer is recreated,
846
+ // even though it doesn't actually _emit_ anything we can observe when creating the
847
+ // buffer... but it should trigger a call to `reset`, which we _can_ observe with
848
+ // tracing assertions.
849
+ let bytes_written_1 = writer
850
+ . write_record ( SizedRecord :: new ( 64 ) )
851
+ . await
852
+ . expect ( "write should not fail" ) ;
853
+ let bytes_written_2 = writer
854
+ . write_record ( SizedRecord :: new ( 68 ) )
855
+ . await
856
+ . expect ( "write should not fail" ) ;
857
+ writer. flush ( ) . await . expect ( "flush should not fail" ) ;
858
+ writer. close ( ) ;
859
+ let expected_data_file_len = bytes_written_1. add ( bytes_written_2) as u64 ;
860
+
861
+ let first_read = reader
862
+ . next ( )
863
+ . await
864
+ . expect ( "should not fail to read record" )
865
+ . expect ( "should contain first record" ) ;
866
+ assert_eq ! ( SizedRecord :: new( 64 ) , first_read) ;
867
+ acknowledge ( first_read) . await ;
868
+
869
+ let second_read = reader
870
+ . next ( )
871
+ . await
872
+ . expect ( "should not fail to read record" )
873
+ . expect ( "should contain first record" ) ;
874
+ assert_eq ! ( SizedRecord :: new( 68 ) , second_read) ;
875
+ acknowledge ( second_read) . await ;
876
+
877
+ let third_read = reader. next ( ) . await . expect ( "should not fail to read record" ) ;
878
+ assert ! ( third_read. is_none( ) ) ;
879
+
880
+ ledger. flush ( ) . expect ( "should not fail to flush ledger" ) ;
881
+
882
+ // Grab the current writer data file path, and then drop the writer/reader. Once the
883
+ // buffer is closed, we'll purposefully scramble the archived data -- but not the length
884
+ // delimiter -- which should trigger `rkyv` to throw an error when we check the data.
885
+ let data_file_path = ledger. get_current_writer_data_file_path ( ) ;
886
+ drop ( writer) ;
887
+ drop ( reader) ;
888
+ drop ( ledger) ;
889
+
890
+ // We should not have seen a call to `mark_for_skip` yet.
891
+ assert ! ( !marked_for_skip. try_assert( ) ) ;
892
+
893
+ // Open the file and set the last eight bytes of the record to something clearly
894
+ // wrong/invalid, which should end up messing with the relative pointer stuff in the
895
+ // archive.
896
+ let mut data_file = OpenOptions :: new ( )
897
+ . write ( true )
898
+ . open ( & data_file_path)
899
+ . await
900
+ . expect ( "open should not fail" ) ;
901
+
902
+ // Just to make sure the data file matches our expected state before futzing with it.
903
+ let metadata = data_file
904
+ . metadata ( )
905
+ . await
906
+ . expect ( "metadata should not fail" ) ;
907
+ assert_eq ! ( expected_data_file_len, metadata. len( ) ) ;
908
+
909
+ let target_pos = expected_data_file_len - 8 ;
910
+ let pos = data_file
911
+ . seek ( SeekFrom :: Start ( target_pos) )
912
+ . await
913
+ . expect ( "seek should not fail" ) ;
914
+ assert_eq ! ( target_pos, pos) ;
915
+ data_file
916
+ . write_all ( & [ 0xd , 0xe , 0xa , 0xd , 0xb , 0xe , 0xe , 0xf ] )
917
+ . await
918
+ . expect ( "write should not fail" ) ;
919
+ data_file. flush ( ) . await . expect ( "flush should not fail" ) ;
920
+ data_file. sync_all ( ) . await . expect ( "sync should not fail" ) ;
921
+ drop ( data_file) ;
922
+
923
+ // Now reopen the buffer, which should trigger a `Writer::mark_for_skip` call which
924
+ // instructs the writer to skip to the next data file, although this doesn't happen
925
+ // until the first write is attempted.
926
+ let ( mut writer, mut reader, ledger) =
927
+ create_default_buffer_v2 :: < _ , SizedRecord > ( data_dir) . await ;
928
+ marked_for_skip. assert ( ) ;
929
+ // When writer see last record as corrupted set flag to skip to next file but reader moves to next file id and wait for writer to create it.
930
+ assert_reader_writer_v2_file_positions ! (
931
+ ledger,
932
+ expected_final_writer_file_id,
933
+ starting_writer_file_id
934
+ ) ;
935
+ assert_file_does_not_exist_async ! ( & expected_final_write_data_file) ;
936
+
937
+ // At this point reader is waiting for writer to create next data file, so we can test that reader.next() times out.
938
+ let result = timeout ( Duration :: from_millis ( 100 ) , reader. next ( ) ) . await ;
939
+ assert ! ( result. is_err( ) , "expected reader.next() to time out" ) ;
940
+
941
+ // Do a simple write to ensure it opens the next data file.
942
+ let _bytes_written = writer
943
+ . write_record ( SizedRecord :: new ( 72 ) )
944
+ . await
945
+ . expect ( "write should not fail" ) ;
946
+ writer. flush ( ) . await . expect ( "flush should not fail" ) ;
947
+ assert_reader_writer_v2_file_positions ! (
948
+ ledger,
949
+ expected_final_writer_file_id,
950
+ expected_final_writer_file_id
951
+ ) ;
952
+ assert_file_exists_async ! ( & expected_final_write_data_file) ;
953
+
954
+ let read = reader
955
+ . next ( )
956
+ . await
957
+ . expect ( "should not fail to read record" )
958
+ . expect ( "should contain first record" ) ;
959
+ assert_eq ! ( SizedRecord :: new( 72 ) , read) ;
960
+ acknowledge ( read) . await ;
961
+ }
962
+ } ) ;
963
+
964
+ let parent = trace_span ! ( "writer_detects_when_last_record_has_scrambled_archive_data" ) ;
965
+ fut. instrument ( parent. or_current ( ) ) . await ;
966
+ }
0 commit comments