adding record nil support (#34)
* adding record nil support

* update the compat files

* align tests

* fixup tests

* move to v4 upload

---------

Co-authored-by: Thomas Jungblut <[email protected]>
thomasjungblut and tjungblu authored Oct 29, 2024
1 parent 242f332 commit d5fa566
Showing 31 changed files with 721 additions and 81 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/simpledb-test.yml
@@ -15,7 +15,7 @@ jobs:
          go-version: 1.22
      - name: Linearization Test
        run: make linear-simpledb
      - uses: actions/upload-artifact@v2
      - uses: actions/upload-artifact@v4
        if: always()
        with:
          path: simpledb/porcupine/*.html
47 changes: 47 additions & 0 deletions kaitai/recordio_v3.ksy
@@ -0,0 +1,47 @@
meta:
  id: recordio_v3
  endian: le
  imports:
    - vlq_base128_le
seq:
  - id: file_header
    type: file_header
  - id: record
    type: record
    repeat: eos
types:
  file_header:
    doc: |
      the recordio header format, used to determine the version the file was written with
      and whether the records are compressed.
    seq:
      - id: version
        type: u4
        doc: The version of the recordio format used in this file.
      - id: compression_type
        type: u4
        enum: compression
        doc: The compression algorithm used. 0 means no compression, 1 means Snappy, 2 means Gzip.
  record:
    doc: |
      recordio records form an "infinite" stream of magic-number-separated and length-encoded byte arrays.
    seq:
      - id: magic
        contents: [0x91, 0x8D, 0x4C]
      - id: record_nil
        type: u1
        doc: 1 means the record is nil, 0 otherwise
      - id: uncompressed_payload_len
        type: vlq_base128_le
      - id: compressed_payload_len
        type: vlq_base128_le
      - id: payload
        size: record_size
    instances:
      record_size:
        value: 'compressed_payload_len.value == 0 ? uncompressed_payload_len.value : compressed_payload_len.value'
        doc: The size is either the compressed or uncompressed length.
enums:
  compression:
    0: none
    1: snappy
    2: gzip
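
The spec above pins down the v3 wire format: a varint-encoded magic number, a one-byte nil flag, and two varint payload lengths. As a minimal sketch, the header of a nil record can be decoded by hand like this (the three magic bytes are the LEB128 encoding of 0x130691, matching the contents above; the snippet is illustrative, not part of the commit):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	// v3 header of a nil record: magic (LEB128 of 0x130691),
	// nil flag = 1, then two varint payload lengths (both 0).
	header := []byte{0x91, 0x8D, 0x4C, 0x01, 0x00, 0x00}
	r := bytes.NewReader(header)

	magic, _ := binary.ReadUvarint(r)
	nilFlag, _ := r.ReadByte()
	uncompressedLen, _ := binary.ReadUvarint(r)
	compressedLen, _ := binary.ReadUvarint(r)

	fmt.Printf("magic=%#x nil=%v lens=%d/%d\n", magic, nilFlag == 1, uncompressedLen, compressedLen)
	// prints: magic=0x130691 nil=true lens=0/0
}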
29 changes: 28 additions & 1 deletion memstore/memstore_test.go
@@ -3,6 +3,7 @@ package memstore
import (
"errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thomasjungblut/go-sstables/sstables"
"os"
"testing"
@@ -246,7 +247,7 @@ func TestMemStoreFlushWithTombStonesInclusive(t *testing.T) {

	val, err := reader.Get([]byte("akey"))
	assert.Nil(t, err)
	assert.Equal(t, []byte{}, val)
	assert.Nil(t, val)

	val, err = reader.Get([]byte("bkey"))
	assert.Nil(t, err)
@@ -257,6 +258,32 @@ func TestMemStoreFlushWithTombStonesInclusive(t *testing.T) {
	assert.Equal(t, sstables.NotFound, err)
}

func TestMemStoreTombstoneBehavior(t *testing.T) {
	m := newMemStoreTest()
	require.NoError(t, m.Upsert([]byte("akey"), []byte("aval")))
	require.NoError(t, m.Tombstone([]byte("akey")))
	require.NoError(t, m.Tombstone([]byte("bkey")))

	tmpDir, err := os.MkdirTemp("", "memstore_flush")
	require.Nil(t, err)
	defer func() { require.Nil(t, os.RemoveAll(tmpDir)) }()

	require.Nil(t, m.FlushWithTombstones(sstables.WriteBasePath(tmpDir)))
	reader, err := sstables.NewSSTableReader(
		sstables.ReadBasePath(tmpDir),
		sstables.ReadWithKeyComparator(m.comparator))
	require.Nil(t, err)
	defer closeReader(t, reader)

	val, err := reader.Get([]byte("akey"))
	require.Nil(t, err)
	require.Nil(t, val)

	val, err = reader.Get([]byte("bkey"))
	require.Nil(t, err)
	require.Nil(t, val)
}

func TestMemStoreSStableIteratorUpsertOnly(t *testing.T) {
	m := newMemStoreTest()
	assert.Nil(t, m.Upsert([]byte("akey"), []byte("aval")))
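
Worth noting for callers: as the assertion swap above shows, a flushed tombstone now comes back from Get as a true nil slice rather than an empty one. In Go the two are only distinguishable by a nil check, not by length; a quick illustration:

package main

import "fmt"

func main() {
	var tombstoned []byte // what Get returns for a tombstoned key after this change
	empty := []byte{}     // what it returned before: a non-nil empty slice

	fmt.Println(tombstoned == nil, empty == nil) // true false
	fmt.Println(len(tombstoned), len(empty))     // 0 0: length alone can't tell them apart
}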
31 changes: 29 additions & 2 deletions recordio/common_reader.go
Original file line number Diff line number Diff line change
@@ -41,8 +41,8 @@ func readFileHeaderFromBuffer(buffer []byte) (*Header, error) {
}

func readRecordHeaderV1(buffer []byte) (uint64, uint64, error) {
	if len(buffer) != RecordHeaderSizeBytes {
		return 0, 0, fmt.Errorf("record header buffer size mismatch, expected %d but was %d", RecordHeaderSizeBytes, len(buffer))
	if len(buffer) != RecordHeaderSizeBytesV1V2 {
		return 0, 0, fmt.Errorf("record header buffer size mismatch, expected %d but was %d", RecordHeaderSizeBytesV1V2, len(buffer))
	}

	magicNumber := binary.LittleEndian.Uint32(buffer[0:4])
@@ -77,6 +77,33 @@ func readRecordHeaderV2(r io.ByteReader) (uint64, uint64, error) {
	return payloadSizeUncompressed, payloadSizeCompressed, nil
}

func readRecordHeaderV3(r io.ByteReader) (uint64, uint64, bool, error) {
	magicNumber, err := binary.ReadUvarint(r)
	if err != nil {
		return 0, 0, false, err
	}
	if magicNumber != MagicNumberSeparatorLong {
		return 0, 0, false, MagicNumberMismatchErr
	}

	recordNil, err := r.ReadByte()
	if err != nil {
		return 0, 0, false, err
	}

	payloadSizeUncompressed, err := binary.ReadUvarint(r)
	if err != nil {
		return 0, 0, false, err
	}

	payloadSizeCompressed, err := binary.ReadUvarint(r)
	if err != nil {
		return 0, 0, false, err
	}

	return payloadSizeUncompressed, payloadSizeCompressed, recordNil == 1, nil
}

func allocateRecordBuffer(header *Header, payloadSizeUncompressed uint64, payloadSizeCompressed uint64) (uint64, []byte) {
	expectedBytesRead := payloadSizeUncompressed
	if header.compressor != nil {
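
For symmetry with readRecordHeaderV3 above, here is a hypothetical write-side counterpart, not the repository's actual writer, just a sketch producing the byte layout the reader expects (the magic constant 0x130691 is assumed from the kaitai contents bytes):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// writeRecordHeaderV3 is a hypothetical inverse of readRecordHeaderV3;
// the real writer lives elsewhere in the recordio package.
func writeRecordHeaderV3(w *bytes.Buffer, uncompressed, compressed uint64, recordNil bool) {
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(buf[:], 0x130691) // assumed value of MagicNumberSeparatorLong
	w.Write(buf[:n])
	flag := byte(0)
	if recordNil {
		flag = 1
	}
	w.WriteByte(flag)
	n = binary.PutUvarint(buf[:], uncompressed)
	w.Write(buf[:n])
	n = binary.PutUvarint(buf[:], compressed)
	w.Write(buf[:n])
}

func main() {
	var b bytes.Buffer
	writeRecordHeaderV3(&b, 0, 0, true)
	fmt.Printf("% x\n", b.Bytes()) // 91 8d 4c 01 00 00
}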
120 changes: 112 additions & 8 deletions recordio/file_reader.go
Original file line number Diff line number Diff line change
@@ -60,9 +60,11 @@ func (r *FileReader) ReadNext() ([]byte, error) {

	if r.header.fileVersion == Version1 {
		return readNextV1(r)
	} else if r.header.fileVersion == Version2 {
		return readNextV2(r)
	} else {
		start := r.reader.Count()
		payloadSizeUncompressed, payloadSizeCompressed, err := readRecordHeaderV2(r.reader)
		payloadSizeUncompressed, payloadSizeCompressed, recordNil, err := readRecordHeaderV3(r.reader)
		if err != nil {
			// due to the use of blocked writes in DirectIO, we need to test whether the remainder of the file contains only zeros.
			// This would indicate a properly written file and the actual end - and not a malformed record.
@@ -84,6 +86,11 @@
			return nil, fmt.Errorf("error while parsing record header of '%s': %w", r.file.Name(), err)
		}

		if recordNil {
			r.currentOffset = r.currentOffset + (r.reader.Count() - start)
			return nil, nil
		}

		expectedBytesRead, pooledRecordBuffer := allocateRecordBufferPooled(r.bufferPool, r.header, payloadSizeUncompressed, payloadSizeCompressed)
		defer r.bufferPool.Put(pooledRecordBuffer)

@@ -127,9 +134,11 @@ func (r *FileReader) SkipNext() error {

	if r.header.fileVersion == Version1 {
		return SkipNextV1(r)
	} else if r.header.fileVersion == Version2 {
		return SkipNextV2(r)
	} else {
		start := r.reader.Count()
		payloadSizeUncompressed, payloadSizeCompressed, err := readRecordHeaderV2(r.reader)
		payloadSizeUncompressed, payloadSizeCompressed, _, err := readRecordHeaderV3(r.reader)
		if err != nil {
			return fmt.Errorf("error while reading record header of '%s': %w", r.file.Name(), err)
		}
@@ -159,16 +168,16 @@

// SkipNextV1 is the legacy support path for the non-vint compressed V1 format
func SkipNextV1(r *FileReader) error {
	headerBuf := r.bufferPool.Get(RecordHeaderSizeBytes)
	headerBuf := r.bufferPool.Get(RecordHeaderSizeBytesV1V2)
	defer r.bufferPool.Put(headerBuf)

	numRead, err := io.ReadFull(r.reader, headerBuf)
	if err != nil {
		return fmt.Errorf("error while reading record header of '%s': %w", r.file.Name(), err)
	}

	if numRead != RecordHeaderSizeBytes {
		return fmt.Errorf("not enough bytes in the record header found, expected %d but were %d", RecordHeaderSizeBytes, numRead)
	if numRead != RecordHeaderSizeBytesV1V2 {
		return fmt.Errorf("not enough bytes in the record header found, expected %d but were %d", RecordHeaderSizeBytesV1V2, numRead)
	}

	r.currentOffset = r.currentOffset + uint64(numRead)
@@ -199,6 +208,34 @@ func SkipNextV1(r *FileReader) error {
	return nil
}

func SkipNextV2(r *FileReader) error {
	start := r.reader.Count()
	payloadSizeUncompressed, payloadSizeCompressed, err := readRecordHeaderV2(r.reader)
	if err != nil {
		return fmt.Errorf("error while reading record header of '%s': %w", r.file.Name(), err)
	}

	expectedBytesSkipped := payloadSizeUncompressed
	if r.header.compressor != nil {
		expectedBytesSkipped = payloadSizeCompressed
	}

	// here we have to add the header to the offset too, otherwise we would not seek far enough
	expectedOffset := int64(r.currentOffset + expectedBytesSkipped + (r.reader.Count() - start))
	newOffset, err := r.file.Seek(expectedOffset, 0)
	if err != nil {
		return fmt.Errorf("error while seeking to offset %d in '%s': %w", expectedOffset, r.file.Name(), err)
	}

	if newOffset != expectedOffset {
		return fmt.Errorf("seeking in '%s' did not return expected offset %d, it was %d", r.file.Name(), expectedOffset, newOffset)
	}

	r.reader.Reset(r.file)
	r.currentOffset = uint64(newOffset)
	return nil
}

func (r *FileReader) Close() error {
	r.closed = true
	r.open = false
@@ -207,16 +244,16 @@ func (r *FileReader) Close() error {

// legacy support path for non-vint compressed V1
func readNextV1(r *FileReader) ([]byte, error) {
	headerBuf := r.bufferPool.Get(RecordHeaderSizeBytes)
	headerBuf := r.bufferPool.Get(RecordHeaderSizeBytesV1V2)
	defer r.bufferPool.Put(headerBuf)

	numRead, err := io.ReadFull(r.reader, headerBuf)
	if err != nil {
		return nil, fmt.Errorf("error while reading record header of '%s': %w", r.file.Name(), err)
	}

	if numRead != RecordHeaderSizeBytes {
		return nil, fmt.Errorf("not enough bytes in the record header of '%s' found, expected %d but were %d", r.file.Name(), RecordHeaderSizeBytes, numRead)
	if numRead != RecordHeaderSizeBytesV1V2 {
		return nil, fmt.Errorf("not enough bytes in the record header of '%s' found, expected %d but were %d", r.file.Name(), RecordHeaderSizeBytesV1V2, numRead)
	}

	r.currentOffset = r.currentOffset + uint64(numRead)
@@ -246,6 +283,73 @@ func readNextV1(r *FileReader) ([]byte, error) {
	return recordBuffer, nil
}

func readNextV2(r *FileReader) ([]byte, error) {
	start := r.reader.Count()
	payloadSizeUncompressed, payloadSizeCompressed, err := readRecordHeaderV2(r.reader)
	if err != nil {
		// due to the use of blocked writes in DirectIO, we need to test whether the remainder of the file contains only zeros.
		// This would indicate a properly written file and the actual end - and not a malformed record.
		if errors.Is(err, MagicNumberMismatchErr) {
			remainder, err := io.ReadAll(r.reader)
			if err != nil {
				return nil, fmt.Errorf("error while parsing record header seeking for file end of '%s': %w", r.file.Name(), err)
			}
			for _, b := range remainder {
				if b != 0 {
					return nil, fmt.Errorf("error while parsing record header for zeros towards the file end of '%s': %w", r.file.Name(), MagicNumberMismatchErr)
				}
			}

			// nothing but zeros has been read so far, so that must've been the valid end of the file.
			return nil, io.EOF
		}

		return nil, fmt.Errorf("error while parsing record header of '%s': %w", r.file.Name(), err)
	}

	expectedBytesRead, pooledRecordBuffer := allocateRecordBufferPooled(r.bufferPool, r.header, payloadSizeUncompressed, payloadSizeCompressed)
	defer r.bufferPool.Put(pooledRecordBuffer)

	numRead, err := io.ReadFull(r.reader, pooledRecordBuffer)
	if err != nil {
		return nil, fmt.Errorf("error while reading into record buffer of '%s': %w", r.file.Name(), err)
	}

	if uint64(numRead) != expectedBytesRead {
		return nil, fmt.Errorf("not enough bytes in the record of '%s' found, expected %d but were %d", r.file.Name(), expectedBytesRead, numRead)
	}

	var returnSlice []byte
	if r.header.compressor != nil {
		pooledDecompressionBuffer := r.bufferPool.Get(int(payloadSizeUncompressed))
		defer r.bufferPool.Put(pooledDecompressionBuffer)

		decompressedRecord, err := r.header.compressor.DecompressWithBuf(pooledRecordBuffer, pooledDecompressionBuffer)
		if err != nil {
			return nil, err
		}
		if decompressedRecord == nil {
			returnSlice = nil
		} else {
			// we do a defensive copy here so as not to leak the pooled slice
			returnSlice = make([]byte, len(decompressedRecord))
			copy(returnSlice, decompressedRecord)
		}
	} else {
		if pooledRecordBuffer == nil {
			returnSlice = nil
		} else {
			// we do a defensive copy here so as not to leak the pooled slice
			returnSlice = make([]byte, len(pooledRecordBuffer))
			copy(returnSlice, pooledRecordBuffer)
		}
	}

	// why not just r.currentOffset = r.reader.Count()? We could've skipped something in between, which would make the counts inconsistent.
	r.currentOffset = r.currentOffset + (r.reader.Count() - start)
	return returnSlice, nil
}

// TODO(thomas): we have to add an option pattern here as well

// NewFileReaderWithPath creates a new recordio file reader that can read RecordIO files at the given path.
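
Reader callers observe the new behavior through ReadNext: a nil record now comes back as (nil, nil), while a clean end of file is still signaled with io.EOF. A minimal read loop, assuming the reader is opened via Open() (as suggested by the open flag reset in Close above):

package main

import (
	"errors"
	"fmt"
	"io"
	"log"

	"github.com/thomasjungblut/go-sstables/recordio"
)

func main() {
	reader, err := recordio.NewFileReaderWithPath("records.rio") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	if err := reader.Open(); err != nil {
		log.Fatal(err)
	}
	defer reader.Close()

	for {
		record, err := reader.ReadNext()
		if errors.Is(err, io.EOF) {
			break // clean end of file
		}
		if err != nil {
			log.Fatal(err)
		}
		if record == nil {
			fmt.Println("nil record") // round-trips as of this change
			continue
		}
		fmt.Printf("record with %d bytes\n", len(record))
	}
}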
13 changes: 12 additions & 1 deletion recordio/file_reader_generator_test.go
@@ -16,7 +16,7 @@ func TestGenerateTestFiles(t *testing.T) {
		return
	}

	prefix := "test_files/v2_compat/"
	prefix := "test_files/v3_compat/"
	writeUncompressedSingleRecord(t, prefix+"recordio_UncompressedSingleRecord")
	writeUncompressedMultiRecordAscending(t, prefix+"recordio_UncompressedWriterMultiRecord_asc")
	writeCompressedMultiRecordAscending(t, prefix+"recordio_SnappyWriterMultiRecord_asc")
@@ -26,11 +26,22 @@
	writeCompressedSingleRecord(t, prefix+"recordio_UncompressedSingleRecord_comp2", CompressionTypeSnappy)
	writeCompressedSingleRecordAugmented(t, prefix+"recordio_UncompressedSingleRecord_comp300", 300) //unknown compression type
	writeUncompressedSingleRecordAugmentedMagicNumber(t, prefix+"recordio_UncompressedSingleRecord_mnm")
	writeUncompressedNilAndEmptyRecords(t, prefix+"recordio_UncompressedNilAndEmptyRecord")

	writeDirectIOUncompressedSingleRecord(t, prefix+"recordio_UncompressedSingleRecord_directio")
	writeDirectIOUncompressedSingleRecordRandomTrailer(t, prefix+"recordio_UncompressedSingleRecord_directio_trailer")
}

func writeUncompressedNilAndEmptyRecords(t *testing.T, path string) {
	writer, err := newUncompressedOpenedWriterAtPath(path)
	defer closeFileWriter(t, writer)
	assert.Nil(t, err)
	_, err = writer.Write(nil)
	assert.Nil(t, err)
	_, err = writer.Write([]byte{})
	assert.Nil(t, err)
}

func writeDirectIOUncompressedSingleRecord(t *testing.T, path string) {
	_ = os.Remove(path)
	w, err := NewFileWriter(Path(path), BufferSizeBytes(4096), DirectIO())
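
The new compat file pins down the on-disk difference between a nil and an empty record: per the v3 spec earlier in this commit, the two headers written above should differ only in the flag byte. A sketch of the assumed layouts:

package main

import "fmt"

func main() {
	// assumed v3 headers for the two records written by the test above
	nilRecord := []byte{0x91, 0x8D, 0x4C, 0x01, 0x00, 0x00}   // nil flag = 1
	emptyRecord := []byte{0x91, 0x8D, 0x4C, 0x00, 0x00, 0x00} // nil flag = 0, both lengths 0

	fmt.Println(nilRecord[3], emptyRecord[3]) // 1 0: only the flag byte differs
}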