adding record nil support
tjungblu committed Oct 29, 2024
1 parent 242f332 commit 1d38800
Showing 12 changed files with 337 additions and 28 deletions.
29 changes: 28 additions & 1 deletion memstore/memstore_test.go
@@ -3,6 +3,7 @@ package memstore
import (
"errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thomasjungblut/go-sstables/sstables"
"os"
"testing"
@@ -246,7 +247,7 @@ func TestMemStoreFlushWithTombStonesInclusive(t *testing.T) {

val, err := reader.Get([]byte("akey"))
assert.Nil(t, err)
assert.Equal(t, []byte{}, val)
assert.Nil(t, val)

val, err = reader.Get([]byte("bkey"))
assert.Nil(t, err)
@@ -257,6 +258,32 @@ func TestMemStoreFlushWithTombStonesInclusive(t *testing.T) {
assert.Equal(t, sstables.NotFound, err)
}

func TestMemStoreTombstoneBehavior(t *testing.T) {
m := newMemStoreTest()
require.NoError(t, m.Upsert([]byte("akey"), []byte("aval")))
require.NoError(t, m.Tombstone([]byte("akey")))
require.NoError(t, m.Tombstone([]byte("bkey")))

tmpDir, err := os.MkdirTemp("", "memstore_flush")
require.Nil(t, err)
defer func() { require.Nil(t, os.RemoveAll(tmpDir)) }()

require.Nil(t, m.FlushWithTombstones(sstables.WriteBasePath(tmpDir)))
reader, err := sstables.NewSSTableReader(
sstables.ReadBasePath(tmpDir),
sstables.ReadWithKeyComparator(m.comparator))
require.Nil(t, err)
defer closeReader(t, reader)

val, err := reader.Get([]byte("akey"))
require.Nil(t, err)
require.Nil(t, val)

val, err = reader.Get([]byte("bkey"))
require.Nil(t, err)
require.Nil(t, val)
}

func TestMemStoreSStableIteratorUpsertOnly(t *testing.T) {
m := newMemStoreTest()
assert.Nil(t, m.Upsert([]byte("akey"), []byte("aval")))
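
The substantive change in these tests: a key that was tombstoned and flushed via FlushWithTombstones now reads back from the sstable as a true nil instead of a zero-length slice. A minimal sketch of the contrast, assuming reader is an sstables.SSTableReader opened as in the tests above:

```go
val, err := reader.Get([]byte("akey")) // "akey" was tombstoned before the flush
// before this commit the delete marker surfaced as an empty slice:
//   assert.Equal(t, []byte{}, val)
// with nil-record support it round-trips as a true nil:
//   require.Nil(t, val)
_ = err // sstables.NotFound remains reserved for keys that were never written
```
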
27 changes: 27 additions & 0 deletions recordio/common_reader.go
@@ -77,6 +77,33 @@ func readRecordHeaderV2(r io.ByteReader) (uint64, uint64, error) {
return payloadSizeUncompressed, payloadSizeCompressed, nil
}

func readRecordHeaderV3(r io.ByteReader) (uint64, uint64, bool, error) {
magicNumber, err := binary.ReadUvarint(r)
if err != nil {
return 0, 0, false, err
}
if magicNumber != MagicNumberSeparatorLong {
return 0, 0, false, MagicNumberMismatchErr
}

recordNil, err := r.ReadByte()
if err != nil {
return 0, 0, false, err
}

payloadSizeUncompressed, err := binary.ReadUvarint(r)
if err != nil {
return 0, 0, false, err
}

payloadSizeCompressed, err := binary.ReadUvarint(r)
if err != nil {
return 0, 0, false, err
}

return payloadSizeUncompressed, payloadSizeCompressed, recordNil == 1, nil
}

func allocateRecordBuffer(header *Header, payloadSizeUncompressed uint64, payloadSizeCompressed uint64) (uint64, []byte) {
expectedBytesRead := payloadSizeUncompressed
if header.compressor != nil {
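
For orientation, a self-contained sketch of the V3 wire layout that readRecordHeaderV3 parses: a uvarint magic number, a single nil-flag byte, then two uvarint payload sizes. The magic constant below is a placeholder, not the package's real MagicNumberSeparatorLong:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

const magic = 0x1337 // placeholder for recordio.MagicNumberSeparatorLong

func main() {
	// encode a header for a nil record: magic, nil flag = 1, both sizes 0
	buf := make([]byte, 3*binary.MaxVarintLen64+1)
	off := binary.PutUvarint(buf, magic)
	buf[off] = 1 // record is nil
	off++
	off += binary.PutUvarint(buf[off:], 0) // uncompressed payload size
	off += binary.PutUvarint(buf[off:], 0) // compressed payload size

	// decode it back the same way readRecordHeaderV3 does
	r := bytes.NewReader(buf[:off])
	m, _ := binary.ReadUvarint(r)
	nilFlag, _ := r.ReadByte()
	uncompressed, _ := binary.ReadUvarint(r)
	compressed, _ := binary.ReadUvarint(r)
	fmt.Println(m == magic, nilFlag == 1, uncompressed, compressed) // true true 0 0
}
```
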
108 changes: 106 additions & 2 deletions recordio/file_reader.go
@@ -60,9 +60,11 @@ func (r *FileReader) ReadNext() ([]byte, error) {

if r.header.fileVersion == Version1 {
return readNextV1(r)
} else if r.header.fileVersion == Version2 {
return readNextV2(r)
} else {
start := r.reader.Count()
payloadSizeUncompressed, payloadSizeCompressed, err := readRecordHeaderV2(r.reader)
payloadSizeUncompressed, payloadSizeCompressed, recordNil, err := readRecordHeaderV3(r.reader)
if err != nil {
// due to the use of blocked writes in DirectIO, we need to test whether the remainder of the file contains only zeros.
// This would indicate a properly written file and the actual end - and not a malformed record.
@@ -84,6 +86,11 @@
return nil, fmt.Errorf("error while parsing record header of '%s': %w", r.file.Name(), err)
}

if recordNil {
r.currentOffset = r.currentOffset + (r.reader.Count() - start)
return nil, nil
}

expectedBytesRead, pooledRecordBuffer := allocateRecordBufferPooled(r.bufferPool, r.header, payloadSizeUncompressed, payloadSizeCompressed)
defer r.bufferPool.Put(pooledRecordBuffer)

@@ -127,9 +134,11 @@ func (r *FileReader) SkipNext() error {

if r.header.fileVersion == Version1 {
return SkipNextV1(r)
} else if r.header.fileVersion == Version2 {
return SkipNextV2(r)
} else {
start := r.reader.Count()
payloadSizeUncompressed, payloadSizeCompressed, err := readRecordHeaderV2(r.reader)
payloadSizeUncompressed, payloadSizeCompressed, _, err := readRecordHeaderV3(r.reader)
if err != nil {
return fmt.Errorf("error while reading record header of '%s': %w", r.file.Name(), err)
}
@@ -199,6 +208,34 @@ func SkipNextV1(r *FileReader) error {
return nil
}

func SkipNextV2(r *FileReader) error {
start := r.reader.Count()
payloadSizeUncompressed, payloadSizeCompressed, err := readRecordHeaderV2(r.reader)
if err != nil {
return fmt.Errorf("error while reading record header of '%s': %w", r.file.Name(), err)
}

expectedBytesSkipped := payloadSizeUncompressed
if r.header.compressor != nil {
expectedBytesSkipped = payloadSizeCompressed
}

// here we have to add the header to the offset too, otherwise we will not seek far enough
expectedOffset := int64(r.currentOffset + expectedBytesSkipped + (r.reader.Count() - start))
newOffset, err := r.file.Seek(expectedOffset, 0)
if err != nil {
return fmt.Errorf("error while seeking to offset %d in '%s': %w", expectedOffset, r.file.Name(), err)
}

if newOffset != expectedOffset {
return fmt.Errorf("seeking in '%s' did not return expected offset %d, it was %d", r.file.Name(), expectedOffset, newOffset)
}

r.reader.Reset(r.file)
r.currentOffset = uint64(newOffset)
return nil
}

func (r *FileReader) Close() error {
r.closed = true
r.open = false
@@ -246,6 +283,73 @@ func readNextV1(r *FileReader) ([]byte, error) {
return recordBuffer, nil
}

func readNextV2(r *FileReader) ([]byte, error) {
start := r.reader.Count()
payloadSizeUncompressed, payloadSizeCompressed, err := readRecordHeaderV2(r.reader)
if err != nil {
// due to the use of blocked writes in DirectIO, we need to test whether the remainder of the file contains only zeros.
// This would indicate a properly written file and the actual end - and not a malformed record.
if errors.Is(err, MagicNumberMismatchErr) {
remainder, err := io.ReadAll(r.reader)
if err != nil {
return nil, fmt.Errorf("error while parsing record header seeking for file end of '%s': %w", r.file.Name(), err)
}
for _, b := range remainder {
if b != 0 {
return nil, fmt.Errorf("error while parsing record header for zeros towards the file end of '%s': %w", r.file.Name(), MagicNumberMismatchErr)
}
}

// no other bytes than zeros have been read so far, that must've been the valid end of the file.
return nil, io.EOF
}

return nil, fmt.Errorf("error while parsing record header of '%s': %w", r.file.Name(), err)
}

expectedBytesRead, pooledRecordBuffer := allocateRecordBufferPooled(r.bufferPool, r.header, payloadSizeUncompressed, payloadSizeCompressed)
defer r.bufferPool.Put(pooledRecordBuffer)

numRead, err := io.ReadFull(r.reader, pooledRecordBuffer)
if err != nil {
return nil, fmt.Errorf("error while reading into record buffer of '%s': %w", r.file.Name(), err)
}

if uint64(numRead) != expectedBytesRead {
return nil, fmt.Errorf("not enough bytes in the record of '%s' found, expected %d but were %d", r.file.Name(), expectedBytesRead, numRead)
}

var returnSlice []byte
if r.header.compressor != nil {
pooledDecompressionBuffer := r.bufferPool.Get(int(payloadSizeUncompressed))
defer r.bufferPool.Put(pooledDecompressionBuffer)

decompressedRecord, err := r.header.compressor.DecompressWithBuf(pooledRecordBuffer, pooledDecompressionBuffer)
if err != nil {
return nil, err
}
if decompressedRecord == nil {
returnSlice = nil
} else {
// we do a defensive copy here not to leak the pooled slice
returnSlice = make([]byte, len(decompressedRecord))
copy(returnSlice, decompressedRecord)
}
} else {
if pooledRecordBuffer == nil {
returnSlice = nil
} else {
// we do a defensive copy here not to leak the pooled slice
returnSlice = make([]byte, len(pooledRecordBuffer))
copy(returnSlice, pooledRecordBuffer)
}
}

// why not just r.currentOffset = r.reader.count? we could've skipped something in between which makes the counts inconsistent
r.currentOffset = r.currentOffset + (r.reader.Count() - start)
return returnSlice, nil
}

// TODO(thomas): we have to add an option pattern here as well

// NewFileReaderWithPath creates a new recordio file reader that can read RecordIO files at the given path.
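
One consequence for callers worth spelling out: ReadNext now has three distinct outcomes, so the end of the file can only be detected through the error, since (nil, nil) is a legal result for a stored nil record. A consumption sketch, assuming rdr is an opened *FileReader from this package:

```go
func dumpAll(rdr *recordio.FileReader) error {
	for {
		record, err := rdr.ReadNext()
		if errors.Is(err, io.EOF) {
			return nil // the actual end of the file
		}
		if err != nil {
			return fmt.Errorf("reading failed: %w", err)
		}
		if record == nil {
			fmt.Println("nil record") // written as Write(nil); valid as of this commit
			continue
		}
		fmt.Printf("record with %d bytes\n", len(record))
	}
}
```
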
4 changes: 2 additions & 2 deletions recordio/file_reader_test.go
@@ -101,12 +101,12 @@ func TestReaderHappyPathSkipAllMultiRecord(t *testing.T) {

func TestReaderVersionMismatchV0(t *testing.T) {
reader := newTestReader("test_files/v2_compat/recordio_UncompressedSingleRecord_v0", t)
expectErrorStringOnOpen(t, reader, "version mismatch, expected a value from 1 to 2 but was 0")
expectErrorStringOnOpen(t, reader, "version mismatch, expected a value from 1 to 3 but was 0")
}

func TestReaderVersionMismatchV256(t *testing.T) {
reader := newTestReader("test_files/v2_compat/recordio_UncompressedSingleRecord_v256", t)
expectErrorStringOnOpen(t, reader, "version mismatch, expected a value from 1 to 2 but was 256")
expectErrorStringOnOpen(t, reader, "version mismatch, expected a value from 1 to 3 but was 256")
}

func TestReaderCompressionGzipHeader(t *testing.T) {
38 changes: 35 additions & 3 deletions recordio/file_writer.go
@@ -15,6 +15,7 @@ import (
// The file header has a 32 bit version number and a 32 bit compression type enum according to the table above.
// Each record written in the file follows the following format (sequentially):
// - MagicNumber (encoding/binary/Uvarint) to separate records from each other.
// - single byte set to 1 if the record is supposed to be nil. Otherwise, 0.
// - Uncompressed data payload size (encoding/binary/Uvarint).
// - Compressed data payload size (encoding/binary/Uvarint), or 0 if the data is not compressed.
// - Payload as plain bytes, possibly compressed
@@ -55,7 +56,7 @@ func (w *FileWriter) Open() error {

w.currentOffset = uint64(offset)
w.open = true
w.recordHeaderCache = make([]byte, RecordHeaderV2MaxSizeBytes)
w.recordHeaderCache = make([]byte, RecordHeaderV3MaxSizeBytes)
w.bufferPool = new(pool.BufferPool)

// we flush early to get a valid file with header written, this is important in crash scenarios
@@ -88,7 +89,7 @@ func fileHeaderAsByteSlice(compressionType uint32) []byte {
}

// for legacy reference still around, main paths unused - mostly for tests writing old versions
//noinspection GoUnusedFunction
// noinspection GoUnusedFunction
func writeRecordHeaderV1(writer *FileWriter, payloadSizeUncompressed uint64, payloadSizeCompressed uint64) (int, error) {
// 4 byte magic number, 8 byte uncompressed size, 8 bytes for compressed size = 20 bytes
bytes := make([]byte, RecordHeaderSizeBytes)
@@ -110,6 +111,8 @@ func fillRecordHeaderV2(bytes []byte, payloadSizeUncompressed uint64, payloadSiz
return bytes[:off]
}

// for legacy reference still around, main paths unused - mostly for tests writing old versions
// noinspection GoUnusedFunction
func writeRecordHeaderV2(writer *FileWriter, payloadSizeUncompressed uint64, payloadSizeCompressed uint64) (int, error) {
header := fillRecordHeaderV2(writer.recordHeaderCache, payloadSizeUncompressed, payloadSizeCompressed)
written, err := writer.bufWriter.Write(header)
@@ -120,6 +123,30 @@ func writeRecordHeaderV2(writer *FileWriter, payloadSizeUncompressed uint64, pay
return written, nil
}

func fillRecordHeaderV3(bytes []byte, payloadSizeUncompressed uint64, payloadSizeCompressed uint64, recordNil bool) []byte {
off := binary.PutUvarint(bytes, MagicNumberSeparatorLong)
if recordNil {
bytes[off] = 1
} else {
bytes[off] = 0
}
off += 1
off += binary.PutUvarint(bytes[off:], payloadSizeUncompressed)
off += binary.PutUvarint(bytes[off:], payloadSizeCompressed)

return bytes[:off]
}

func writeRecordHeaderV3(writer *FileWriter, payloadSizeUncompressed uint64, payloadSizeCompressed uint64, recordNil bool) (int, error) {
header := fillRecordHeaderV3(writer.recordHeaderCache, payloadSizeUncompressed, payloadSizeCompressed, recordNil)
written, err := writer.bufWriter.Write(header)
if err != nil {
return 0, err
}

return written, nil
}

// Write appends a record of bytes, returns the current offset this item was written to
func (w *FileWriter) Write(record []byte) (uint64, error) {
if !w.open || w.closed {
@@ -143,11 +170,16 @@ func (w *FileWriter) Write(record []byte) (uint64, error) {
}

prevOffset := w.currentOffset
headerBytesWritten, err := writeRecordHeaderV2(w, uncompressedSize, compressedSize)
headerBytesWritten, err := writeRecordHeaderV3(w, uncompressedSize, compressedSize, record == nil)
if err != nil {
return 0, fmt.Errorf("failed to write record header in file at '%s' failed with %w", w.file.Name(), err)
}

if record == nil {
w.currentOffset = prevOffset + uint64(headerBytesWritten)
return prevOffset, nil
}

recordBytesWritten, err := w.bufWriter.Write(recordToWrite)
if err != nil {
return 0, fmt.Errorf("failed to write record in file at '%s' failed with %w", w.file.Name(), err)
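
The early return for record == nil means a nil record costs exactly one record header on disk; compression and payload writing are skipped entirely. A usage sketch, assuming w is an opened *FileWriter from this package:

```go
sizeBefore := w.Size()
offset, err := w.Write(nil)
if err != nil {
	return err
}
// the nil record starts where the writer previously ended, and the file
// grew only by the header: magic number, nil flag and two zero sizes
fmt.Println(offset == sizeBefore, w.Size()-sizeBefore)
```
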
22 changes: 11 additions & 11 deletions recordio/file_writer_test.go
@@ -31,7 +31,7 @@ func TestWriterWriteNil(t *testing.T) {

buf, err := reader.ReadNext()
require.Nil(t, err)
assert.Equal(t, []byte{}, buf)
require.Nil(t, buf)
readNextExpectEOF(t, reader)
}

@@ -40,10 +40,10 @@ func TestSingleWriteSize(t *testing.T) {
defer removeFileWriterFile(t, writer)

size := writer.Size()
assert.Equal(t, uint64(0x1a), size)
assert.Equal(t, uint64(0x1b), size)
stat, err := os.Stat(writer.file.Name())
require.Nil(t, err)
assert.Equal(t, int64(0x1a), stat.Size())
assert.Equal(t, int64(0x1b), stat.Size())
assert.Equal(t, size, uint64(stat.Size()))
}

@@ -53,28 +53,28 @@ func TestWriterMultiRecordWriteOffsetCheck(t *testing.T) {

offset, err := writer.Write(randomRecordOfSize(5))
assert.Equal(t, uint64(FileHeaderSizeBytes), offset)
assert.Equal(t, uint64(0x12), writer.Size())
assert.Equal(t, uint64(0x13), writer.Size())
require.Nil(t, err)

offset, err = writer.Write(randomRecordOfSize(10))
assert.Equal(t, uint64(0x12), offset)
assert.Equal(t, uint64(0x21), writer.Size())
assert.Equal(t, uint64(0x13), offset)
assert.Equal(t, uint64(0x23), writer.Size())
require.Nil(t, err)

offset, err = writer.Write(randomRecordOfSize(25))
assert.Equal(t, uint64(0x21), offset)
assert.Equal(t, uint64(0x3f), writer.Size())
assert.Equal(t, uint64(0x23), offset)
assert.Equal(t, uint64(0x42), writer.Size())
require.Nil(t, err)

assert.Equal(t, uint64(0x3f), writer.currentOffset)
assert.Equal(t, uint64(0x3f), writer.Size())
assert.Equal(t, uint64(0x42), writer.currentOffset)
assert.Equal(t, uint64(0x42), writer.Size())

err = writer.Close()
require.Nil(t, err)

stat, err := os.Stat(writer.file.Name())
require.Nil(t, err)
assert.Equal(t, int64(63), stat.Size())
assert.Equal(t, int64(66), stat.Size())

reader := newReaderOnTopOfWriter(t, writer)
defer closeFileReader(t, reader)
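
All of the adjusted constants in this file follow from a single fact: the V3 record header carries one extra byte (the nil flag) over V2, so every record grows by exactly one byte. A worked check against the multi-record test above, with the file header size and the 3-byte magic uvarint inferred from these very offsets:

```go
const fileHeader = 8              // inferred: the first offset equals FileHeaderSizeBytes
const recHeaderV3 = 3 + 1 + 1 + 1 // magic uvarint + nil flag + two size uvarints (small payloads)

fmt.Println(fileHeader + recHeaderV3 + 5)             // 19 = 0x13, after the 5-byte record
fmt.Println(fileHeader + 2*recHeaderV3 + 5 + 10)      // 35 = 0x23, after the 10-byte record
fmt.Println(fileHeader + 3*recHeaderV3 + 5 + 10 + 25) // 66 = 0x42, matches stat.Size()
```
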