diff --git a/go.mod b/go.mod index 7573301..ef6b829 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,8 @@ module github.com/linkedin/goavro/v2 go 1.12 -require github.com/golang/snappy v0.0.1 +require ( + github.com/dsnet/compress v0.0.1 + github.com/golang/snappy v0.0.1 + github.com/klauspost/compress v1.10.3 +) diff --git a/go.sum b/go.sum index 331e6a1..8d73c8c 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,11 @@ +github.com/dsnet/compress v0.0.1 h1:PlZu0n3Tuv04TzpfPbrnI0HW/YwodEXDS+oPKahKF0Q= +github.com/dsnet/compress v0.0.1/go.mod h1:Aw8dCMJ7RioblQeTqt88akK31OvO8Dhf5JflhBbQEHo= +github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/klauspost/compress v1.4.1 h1:8VMb5+0wMgdBykOV96DwNwKFQ+WTI4pzYURP99CcB9E= +github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/ulikunitz/xz v0.5.6/go.mod h1:2bypXElzHzzJZwzH67Y6wb67pO62Rzfn7BSiF4ABRW8= diff --git a/ocf.go b/ocf.go index 61f703a..254b364 100644 --- a/ocf.go +++ b/ocf.go @@ -28,6 +28,14 @@ const ( // CompressionSnappyLabel is used when OCF blocks are compressed using the // snappy algorithm. CompressionSnappyLabel = "snappy" + + // CompressionBzip2Label is used when OCF blocks are compressed using the + // bzip2 algorithm. + CompressionBzip2Label = "bzip2" + + // CompressionZstdLabel is used when OCF blocks are compressed using the + // Zstandard algorithm. 
+ CompressionZstdLabel = "zstandard" ) // compressionID are values used to specify compression algorithm used to compress @@ -38,6 +46,8 @@ const ( compressionNull compressionID = iota compressionDeflate compressionSnappy + compressionBzip2 + compressionZstd ) const ( @@ -81,6 +91,10 @@ func newOCFHeader(config OCFConfig) (*ocfHeader, error) { header.compressionID = compressionDeflate case CompressionSnappyLabel: header.compressionID = compressionSnappy + case CompressionBzip2Label: + header.compressionID = compressionBzip2 + case CompressionZstdLabel: + header.compressionID = compressionZstd default: return nil, fmt.Errorf("cannot create OCF header using unrecognized compression algorithm: %q", config.CompressionName) } @@ -153,6 +167,10 @@ func readOCFHeader(ior io.Reader) (*ocfHeader, error) { cID = compressionDeflate case CompressionSnappyLabel: cID = compressionSnappy + case CompressionBzip2Label: + cID = compressionBzip2 + case CompressionZstdLabel: + cID = compressionZstd default: return nil, fmt.Errorf("cannot read OCF header using unrecognized compression algorithm from avro.codec: %q", avroCodec) } @@ -197,6 +215,10 @@ func writeOCFHeader(header *ocfHeader, iow io.Writer) (err error) { avroCodec = CompressionDeflateLabel case compressionSnappy: avroCodec = CompressionSnappyLabel + case compressionBzip2: + avroCodec = CompressionBzip2Label + case compressionZstd: + avroCodec = CompressionZstdLabel default: return fmt.Errorf("should not get here: cannot write OCF header using unrecognized compression algorithm: %d", header.compressionID) } diff --git a/ocf_reader.go b/ocf_reader.go index fbcc696..7753c3b 100644 --- a/ocf_reader.go +++ b/ocf_reader.go @@ -19,7 +19,9 @@ import ( "io" "io/ioutil" + "github.com/dsnet/compress/bzip2" "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" ) // OCFReader structure is used to read Object Container Files (OCF). 
@@ -80,6 +82,10 @@ func (ocfr *OCFReader) CompressionName() string { return CompressionDeflateLabel case compressionSnappy: return CompressionSnappyLabel + case compressionBzip2: + return CompressionBzip2Label + case compressionZstd: + return CompressionZstdLabel default: return "should not get here: unrecognized compression algorithm" } @@ -220,6 +226,35 @@ func (ocfr *OCFReader) Scan() bool { } ocfr.block = decoded + case compressionBzip2: + rc, err := bzip2.NewReader(bytes.NewBuffer(ocfr.block), nil) + if err != nil { + ocfr.rerr = fmt.Errorf("bzip2 error: %s", err) + return false + } + ocfr.block, ocfr.rerr = ioutil.ReadAll(rc) + if ocfr.rerr != nil { + _ = rc.Close() + return false + } + if ocfr.rerr = rc.Close(); ocfr.rerr != nil { + return false + } + + case compressionZstd: + rc, err := zstd.NewReader(bytes.NewBuffer(ocfr.block)) + if err != nil { + ocfr.rerr = fmt.Errorf("zstd error: %s", err) + return false + } + ocfr.block, ocfr.rerr = ioutil.ReadAll(rc) + if cerr := rc.Close(); ocfr.rerr == nil { + ocfr.rerr = cerr + } + if ocfr.rerr != nil { + return false + } + default: ocfr.rerr = fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfr.header.compressionID) return false diff --git a/ocf_test.go b/ocf_test.go index 3d4d4bd..010e1e8 100644 --- a/ocf_test.go +++ b/ocf_test.go @@ -92,6 +92,10 @@ func TestOCFWriterCompressionSnappy(t *testing.T) { testOCFRoundTrip(t, CompressionSnappyLabel) } +func TestOCFWriterCompressionBzip2(t *testing.T) { + testOCFRoundTrip(t, CompressionBzip2Label) +} + func TestOCFWriterWithApplicationMetaData(t *testing.T) { testOCFRoundTripWithHeaders(t, CompressionNullLabel, map[string][]byte{"foo": []byte("BOING"), "goo": []byte("zoo")}) } diff --git a/ocf_writer.go b/ocf_writer.go index 820af5c..731a09d 100644 --- a/ocf_writer.go +++ b/ocf_writer.go @@ -20,7 +20,9 @@ import ( "io/ioutil" "os" + "github.com/dsnet/compress/bzip2" "github.com/golang/snappy" + 
"github.com/klauspost/compress/zstd" ) // OCFConfig is used to specify creation parameters for OCFWriter. @@ -212,6 +214,28 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error { block = compressed + case compressionBzip2: + bb := bytes.NewBuffer(make([]byte, 0, len(block))) + cw, _ := bzip2.NewWriter(bb, &bzip2.WriterConfig{Level: 9}) + if _, err := cw.Write(block); err != nil { + return err + } + if err := cw.Close(); err != nil { + return err + } + block = bb.Bytes() + + case compressionZstd: + bb := bytes.NewBuffer(make([]byte, 0, len(block))) + cw, _ := zstd.NewWriter(bb) + if _, err := cw.Write(block); err != nil { + return err + } + if err := cw.Close(); err != nil { + return err + } + block = bb.Bytes() + default: return fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfw.header.compressionID)