Skip to content

Conversation

anil-db
Copy link
Contributor

@anil-db anil-db commented Aug 20, 2025

Summary

at startup writer checks the last written file and if it is corrupted it sets a flag to skip to next file but current write file id in ledger is not update and file is not created.

when reader hit this corrupted file it roll over to next file to read and updates readable file id. at this point reader file id can be greater than writer file id if reader was done with last file. from seek_to_next_record in reader

                    // From that, we can determine that when we've hit a bad read error, that if our
                    // file ID is greater than the writer's file ID, we're now essentially
                    // synchronized.
                    let (reader_file_id, writer_file_id) =
                        self.ledger.get_current_reader_writer_file_id();
                    if reader_file_id > writer_file_id {
                        break;
                    }

at this point both reader, writer and ledger is initialized and expectation would be that any read should block until writer writes new file and data.
but any read at this point keep increasing next file id to read and loop over to current file which was already read. this cause panic at various places.

                  if reader_file_id == writer_file_id {
                        debug!(
                            data_file_path = data_file_path.to_string_lossy().as_ref(),
                            "Data file does not yet exist. Waiting for writer to create."
                        );
                        self.ledger.wait_for_writer().await;
                    } else {
                        self.ledger.increment_acked_reader_file_id();
                    }

fix is to wait for writer to create file if reader_file id is current or next writer file id. writer might end up creating anyone of them based on skip flag.

sequence of logs may also explain what happens when we hit this issue.
reader hit bad file and rollover to next file.
a subsequent read goes through this loop and keep incrementing reader file id until wraps around and opens same file again for reading. this creates condition where new record id is lower than previous record id and gives impression of huge records being skipped also total buffer in legder will also be decreased and may hit less than 0 causing panic.

2025-08-19T00:43:14.400544Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_common::internal_event::events_received: Events received. count=2 byte_size=3670
2025-08-19T00:43:14.400560Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::reader: Read record. record_id=12897819 record_events=2 record_bytes=3928 data_file_id=225
2025-08-19T00:43:14.400569Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_common::internal_event::events_received: Events received. count=2 byte_size=3669
2025-08-19T00:43:14.400584Z DEBUG sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::reader: Marking data file for deletion. data_file_path="/home/anil.gupta/vector-databricks/sink_buffer/buffer/v2/vector_sink/buffer-data-225.dat" first_record_id=12855483 last_record_id=12897820 record_count=45 event_count=42338 bytes_read=89363944
2025-08-19T00:43:14.400622Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented unacknowledged reader file ID. unacked_reader_file_id_offset=1
2025-08-19T00:43:14.400631Z DEBUG sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::reader: Rolling to next data file.
2025-08-19T00:43:14.400662Z ERROR sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::internal_events: Error encountered during buffer read. error=failed to deserialize encoded record from buffer: record length was zero error_code="deser_failed" error_type="reader_failed" stage="processing" internal_log_rate_limit=true
2025-08-19T00:43:14.400804Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=226
2025-08-19T00:43:14.400852Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=227
2025-08-19T00:43:14.400887Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=228
2025-08-19T00:43:14.400924Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=229
2025-08-19T00:43:14.400957Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=230
.......
.......

2025-08-19T00:43:17.002666Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=65530
2025-08-19T00:43:17.002711Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=65531
2025-08-19T00:43:17.002758Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=65532
2025-08-19T00:43:17.002802Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=65533
2025-08-19T00:43:17.002848Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=65534
2025-08-19T00:43:17.002891Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=0
2025-08-19T00:43:17.002938Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=1
2025-08-19T00:43:17.002981Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=2
......
2025-08-19T00:43:17.013068Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=222
2025-08-19T00:43:17.013130Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=223
2025-08-19T00:43:17.013179Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=224
2025-08-19T00:43:17.013227Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::ledger: Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement. unacked_reader_file_id_offset=0 acked_reader_file_id_offset=225
2025-08-19T00:43:17.013302Z DEBUG sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::reader: Opened data file for reading. data_file_path="/home/anil.gupta/vector-databricks/sink_buffer/buffer/v2/vector_sink/buffer-data-225.dat"
2025-08-19T00:43:17.014536Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::reader: Read record. record_id=12855483 record_events=43 record_bytes=102568 data_file_id=225
2025-08-19T00:43:17.014595Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_common::internal_event::events_received: Events received. count=43 byte_size=97879
2025-08-19T00:43:17.014913Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::reader: Read record. record_id=12855526 record_events=2 record_bytes=4120 data_file_id=225
2025-08-19T00:43:17.014936Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_common::internal_event::events_received: Events received. count=2 byte_size=3856
2025-08-19T00:43:17.014971Z TRACE sink{component_kind="sink" component_id=vector_sink component_type=vector}: vector_buffers::variants::disk_v2::reader: Read record. record_id=12855528 record_events=2 record_bytes=3928 data_file_id=225

Vector configuration

How did you test this PR?

added unit test which reproduce panic if fix is removed.

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Notes

  • Please read our Vector contributor resources.
  • Do not hesitate to use @vectordotdev/vector to reach out to us regarding this PR.
  • Some CI checks run only after we manually approve them.
    • We recommend adding a pre-push hook, please see this template.
    • Alternatively, we recommend running the following locally before pushing to the remote branch:
      • cargo fmt --all
      • cargo clippy --workspace --all-targets -- -D warnings
      • cargo nextest run --workspace (alternatively, you can run cargo test --all)
  • After a review is requested, please avoid force pushes to help us review incrementally.
    • Feel free to push as many commits as you want. They will be squashed into one before merging.
    • For example, you can run git merge origin master and git push.
  • If this PR introduces changes Vector dependencies (modifies Cargo.lock), please
    run cargo vdev build licenses to regenerate the license inventory and commit the changes (if any). More details here.

@anil-db anil-db requested a review from a team as a code owner August 20, 2025 01:36
@anil-db anil-db changed the title fix panic in disk buffer when dealing with corrupted file fix(disk buffer): fix panic in disk buffer when dealing with corrupted file Aug 20, 2025
@anil-db anil-db force-pushed the disk-buffer-panic branch from ec0516a to e403542 Compare August 20, 2025 01:53
@anil-db anil-db changed the title fix(disk buffer): fix panic in disk buffer when dealing with corrupted file fix(buffers): fix panic in disk buffer when dealing with corrupted file Aug 20, 2025
@pront pront added the domain: buffers Anything related to Vector's memory/disk buffers label Aug 20, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
domain: buffers Anything related to Vector's memory/disk buffers
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants