diff --git a/ocf_writer.go b/ocf_writer.go index 32ec042..b28e740 100644 --- a/ocf_writer.go +++ b/ocf_writer.go @@ -228,6 +228,71 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error { return err } + +// AppendBinary appends one or more binary data items to an OCF file in a block. +func (ocfw *OCFWriter) AppendBinary(data [][]byte) error { + // Chunk data so no block has more than MaxBlockCount items. + for int64(len(data)) > MaxBlockCount { + if err := ocfw.appendBinaryDataIntoBlock(data[:MaxBlockCount]); err != nil { + return err + } + data = data[MaxBlockCount:] + } + return ocfw.appendBinaryDataIntoBlock(data) +} + +func (ocfw *OCFWriter) appendBinaryDataIntoBlock(data [][]byte) error { + var block []byte // working buffer for encoding data values + var err error + + // Encode and concatenate each data item into the block + for _, datum := range data { + block = append(block, datum...) + } + + switch ocfw.header.compressionID { + case compressionNull: + // no-op + + case compressionDeflate: + // compress into new bytes buffer. + bb := bytes.NewBuffer(make([]byte, 0, len(block))) + + cw, _ := flate.NewWriter(bb, flate.DefaultCompression) + // writing bytes to cw will compress bytes and send to bb. + if _, err := cw.Write(block); err != nil { + return err + } + if err := cw.Close(); err != nil { + return err + } + block = bb.Bytes() + + case compressionSnappy: + compressed := snappy.Encode(nil, block) + + // OCF requires snappy to have CRC32 checksum after each snappy block + compressed = append(compressed, 0, 0, 0, 0) // expand slice by 4 bytes so checksum will fit + binary.BigEndian.PutUint32(compressed[len(compressed)-4:], crc32.ChecksumIEEE(block)) // checksum of decompressed block + + block = compressed + + default: + return fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfw.header.compressionID) + + } + + // create file data block + buf := make([]byte, 0, len(block)+ocfBlockConst) // pre-allocate block bytes + buf, _ = longBinaryFromNative(buf, len(data)) // block count (number of data items) + buf, _ = longBinaryFromNative(buf, len(block)) // block size (number of bytes in block) + buf = append(buf, block...) // serialized objects + buf = append(buf, ocfw.header.syncMarker[:]...) // sync marker + + _, err = ocfw.iow.Write(buf) + return err +} + // Codec returns the codec used by OCFWriter. This function provided because // upstream may be appending to existing OCF which uses a different schema than // requested during instantiation.