diff --git a/benchmark_test.go b/benchmark_test.go index cd7c02a7..2f716adc 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -85,3 +85,62 @@ func BenchmarkNativeFromTextualUsingV2(b *testing.B) { _ = nativeFromTextUsingV2(b, codec, textData) } } + +func benchWriteSimilarSizeRecords(b *testing.B, reuseBuffers, deflate bool) { + const schema = `{ + "namespace": "my.namespace.com", + "type": "record", + "name": "indentity", + "fields": [ + { "name": "Name", "type": "string"}, + { "name": "Picture", "type": "bytes"} + ] + }` + + record := map[string]interface{}{ + "Name": "MyName", + "Picture": make([]byte, 2048), + } + + var records []map[string]interface{} + for i := 0; i < 1000; i++ { + records = append(records, record) + } + + var compressionName string + if deflate { + compressionName = CompressionDeflateLabel + } else { + compressionName = CompressionNullLabel + } + + w, _ := NewOCFWriter(OCFConfig{ + W: ioutil.Discard, + Schema: schema, + ReuseBuffers: reuseBuffers, + CompressionName: compressionName, + }) + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + for j := 0; j < 10; j++ { + w.Append(records) + } + } +} + +func BenchmarkWriteSimilarSizeRecords(b *testing.B) { + b.Run("ReuseBuffers=false Deflate=false", func(b *testing.B) { + benchWriteSimilarSizeRecords(b, false, false) + }) + b.Run("ReuseBuffers=true Deflate=false", func(b *testing.B) { + benchWriteSimilarSizeRecords(b, true, false) + }) + b.Run("ReuseBuffers=false Deflate=true", func(b *testing.B) { + benchWriteSimilarSizeRecords(b, false, true) + }) + b.Run("ReuseBuffers=true Deflate=true", func(b *testing.B) { + benchWriteSimilarSizeRecords(b, true, true) + }) +} diff --git a/ocf_writer.go b/ocf_writer.go index 820af5c4..876bb57a 100644 --- a/ocf_writer.go +++ b/ocf_writer.go @@ -56,13 +56,21 @@ type OCFConfig struct { //the OCF file. When appending to an existing OCF, this field //is ignored MetaData map[string][]byte + + // Advanced option that allows to reuse internal buffers when + // appending records. May save memory allocations and may improve + // performances. Do benchmarks before activate this option. + ReuseBuffers bool } // OCFWriter is used to create a new or append to an existing Avro Object // Container File (OCF). type OCFWriter struct { - header *ocfHeader - iow io.Writer + header *ocfHeader + iow io.Writer + reuseBuffers bool + expandedBlock []byte + deflateBlock []byte } // NewOCFWriter returns a new OCFWriter instance that may be used for appending @@ -70,7 +78,7 @@ type OCFWriter struct { // new OCF file. func NewOCFWriter(config OCFConfig) (*OCFWriter, error) { var err error - ocf := &OCFWriter{iow: config.W} + ocf := &OCFWriter{iow: config.W, reuseBuffers: config.ReuseBuffers} switch config.W.(type) { case nil: @@ -178,6 +186,10 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error { var block []byte // working buffer for encoding data values var err error + if ocfw.reuseBuffers { + block = ocfw.expandedBlock + } + // Encode and concatenate each data item into the block for _, datum := range data { if block, err = ocfw.header.codec.BinaryFromNative(block, datum); err != nil { @@ -185,13 +197,29 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error { } } + if ocfw.reuseBuffers { + ocfw.expandedBlock = block[:0] + } + + var block2 []byte switch ocfw.header.compressionID { case compressionNull: // no-op + block2 = block case compressionDeflate: // compress into new bytes buffer. - bb := bytes.NewBuffer(make([]byte, 0, len(block))) + var deflateBlock []byte + if ocfw.reuseBuffers { + deflateBlock = ocfw.deflateBlock + if cap(deflateBlock) < len(block) { + deflateBlock = make([]byte, 0, len(block)) + } + } else { + deflateBlock = make([]byte, 0, len(block)) + } + + bb := bytes.NewBuffer(deflateBlock) cw, _ := flate.NewWriter(bb, flate.DefaultCompression) // writing bytes to cw will compress bytes and send to bb. @@ -201,7 +229,11 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error { if err := cw.Close(); err != nil { return err } - block = bb.Bytes() + block2 = bb.Bytes() + + if ocfw.reuseBuffers { + ocfw.deflateBlock = deflateBlock[:0] + } case compressionSnappy: compressed := snappy.Encode(nil, block) @@ -210,7 +242,7 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error { compressed = append(compressed, 0, 0, 0, 0) // expand slice by 4 bytes so checksum will fit binary.BigEndian.PutUint32(compressed[len(compressed)-4:], crc32.ChecksumIEEE(block)) // checksum of decompressed block - block = compressed + block2 = compressed default: return fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfw.header.compressionID) @@ -218,11 +250,11 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error { } // create file data block - buf := make([]byte, 0, len(block)+ocfBlockConst) // pre-allocate block bytes - buf, _ = longBinaryFromNative(buf, len(data)) // block count (number of data items) - buf, _ = longBinaryFromNative(buf, len(block)) // block size (number of bytes in block) - buf = append(buf, block...) // serialized objects - buf = append(buf, ocfw.header.syncMarker[:]...) // sync marker + buf := make([]byte, 0, len(block2)+ocfBlockConst) // pre-allocate block bytes + buf, _ = longBinaryFromNative(buf, len(data)) // block count (number of data items) + buf, _ = longBinaryFromNative(buf, len(block2)) // block size (number of bytes in block) + buf = append(buf, block2...) // serialized objects + buf = append(buf, ocfw.header.syncMarker[:]...) // sync marker _, err = ocfw.iow.Write(buf) return err