Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions ocf_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,71 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error {
return err
}


// AppendBinary appends one or more binary data items to an OCF file in a block.
func (ocfw *OCFWriter) AppendBinary(data [][]byte) error {
// Chunk data so no block has more than MaxBlockCount items.
for int64(len(data)) > MaxBlockCount {
if err := ocfw.appendBinaryDataIntoBlock(data[:MaxBlockCount]); err != nil {
return err
}
data = data[MaxBlockCount:]
}
return ocfw.appendBinaryDataIntoBlock(data)
}

func (ocfw *OCFWriter) appendBinaryDataIntoBlock(data [][]byte) error {
var block []byte // working buffer for encoding data values
var err error

// Encode and concatenate each data item into the block
for _, datum := range data {
block = append(block, datum...)
}

switch ocfw.header.compressionID {
case compressionNull:
// no-op

case compressionDeflate:
// compress into new bytes buffer.
bb := bytes.NewBuffer(make([]byte, 0, len(block)))

cw, _ := flate.NewWriter(bb, flate.DefaultCompression)
// writing bytes to cw will compress bytes and send to bb.
if _, err := cw.Write(block); err != nil {
return err
}
if err := cw.Close(); err != nil {
return err
}
block = bb.Bytes()

case compressionSnappy:
compressed := snappy.Encode(nil, block)

// OCF requires snappy to have CRC32 checksum after each snappy block
compressed = append(compressed, 0, 0, 0, 0) // expand slice by 4 bytes so checksum will fit
binary.BigEndian.PutUint32(compressed[len(compressed)-4:], crc32.ChecksumIEEE(block)) // checksum of decompressed block

block = compressed

default:
return fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfw.header.compressionID)

}

// create file data block
buf := make([]byte, 0, len(block)+ocfBlockConst) // pre-allocate block bytes
buf, _ = longBinaryFromNative(buf, len(data)) // block count (number of data items)
buf, _ = longBinaryFromNative(buf, len(block)) // block size (number of bytes in block)
buf = append(buf, block...) // serialized objects
buf = append(buf, ocfw.header.syncMarker[:]...) // sync marker

_, err = ocfw.iow.Write(buf)
return err
}

// Codec returns the codec used by OCFWriter. This function provided because
// upstream may be appending to existing OCF which uses a different schema than
// requested during instantiation.
Expand Down