Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@ module github.com/linkedin/goavro/v2

go 1.12

require github.com/golang/snappy v0.0.1
require (
github.com/dsnet/compress v0.0.1
github.com/golang/snappy v0.0.1
github.com/klauspost/compress v1.10.3
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
github.com/dsnet/compress v0.0.1 h1:PlZu0n3Tuv04TzpfPbrnI0HW/YwodEXDS+oPKahKF0Q=
github.com/dsnet/compress v0.0.1/go.mod h1:Aw8dCMJ7RioblQeTqt88akK31OvO8Dhf5JflhBbQEHo=
github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/klauspost/compress v1.4.1 h1:8VMb5+0wMgdBykOV96DwNwKFQ+WTI4pzYURP99CcB9E=
github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/ulikunitz/xz v0.5.6/go.mod h1:2bypXElzHzzJZwzH67Y6wb67pO62Rzfn7BSiF4ABRW8=
22 changes: 22 additions & 0 deletions ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ const (
// CompressionSnappyLabel is used when OCF blocks are compressed using the
// snappy algorithm.
CompressionSnappyLabel = "snappy"

// CompressionBzip2Label is used when OCF blocks are compressed using the
// bzip2 algorithm.
CompressionBzip2Label = "bzip2"

// CompressionZstdLabel is used when OCF blocks are compressed using the
// ZStandart algorithm.
CompressionZstdLabel = "zstandard"
)

// compressionID are values used to specify compression algorithm used to compress
Expand All @@ -38,6 +46,8 @@ const (
compressionNull compressionID = iota
compressionDeflate
compressionSnappy
compressionBzip2
compressionZstd
)

const (
Expand Down Expand Up @@ -81,6 +91,10 @@ func newOCFHeader(config OCFConfig) (*ocfHeader, error) {
header.compressionID = compressionDeflate
case CompressionSnappyLabel:
header.compressionID = compressionSnappy
case CompressionBzip2Label:
header.compressionID = compressionBzip2
case CompressionZstdLabel:
header.compressionID = compressionZstd
default:
return nil, fmt.Errorf("cannot create OCF header using unrecognized compression algorithm: %q", config.CompressionName)
}
Expand Down Expand Up @@ -153,6 +167,10 @@ func readOCFHeader(ior io.Reader) (*ocfHeader, error) {
cID = compressionDeflate
case CompressionSnappyLabel:
cID = compressionSnappy
case CompressionBzip2Label:
cID = compressionBzip2
case CompressionZstdLabel:
cID = compressionZstd
default:
return nil, fmt.Errorf("cannot read OCF header using unrecognized compression algorithm from avro.codec: %q", avroCodec)
}
Expand Down Expand Up @@ -197,6 +215,10 @@ func writeOCFHeader(header *ocfHeader, iow io.Writer) (err error) {
avroCodec = CompressionDeflateLabel
case compressionSnappy:
avroCodec = CompressionSnappyLabel
case compressionBzip2:
avroCodec = CompressionBzip2Label
case compressionZstd:
avroCodec = CompressionZstdLabel
default:
return fmt.Errorf("should not get here: cannot write OCF header using unrecognized compression algorithm: %d", header.compressionID)
}
Expand Down
33 changes: 33 additions & 0 deletions ocf_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"io"
"io/ioutil"

"github.com/dsnet/compress/bzip2"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
)

// OCFReader structure is used to read Object Container Files (OCF).
Expand Down Expand Up @@ -80,6 +82,10 @@ func (ocfr *OCFReader) CompressionName() string {
return CompressionDeflateLabel
case compressionSnappy:
return CompressionSnappyLabel
case compressionBzip2:
return CompressionBzip2Label
case compressionZstd:
return CompressionZstdLabel
default:
return "should not get here: unrecognized compression algorithm"
}
Expand Down Expand Up @@ -220,6 +226,33 @@ func (ocfr *OCFReader) Scan() bool {
}
ocfr.block = decoded

case compressionBzip2:
rc, err := bzip2.NewReader(bytes.NewBuffer(ocfr.block), nil)
if err != nil {
ocfr.rerr = fmt.Errorf("bzip2 error: %s", err)
return false
}
ocfr.block, ocfr.rerr = ioutil.ReadAll(rc)
if ocfr.rerr != nil {
_ = rc.Close()
return false
}
if ocfr.rerr = rc.Close(); ocfr.rerr != nil {
return false
}

case compressionZstd:
rc, err := zstd.NewReader(bytes.NewBuffer(ocfr.block))
if err != nil {
ocfr.rerr = fmt.Errorf("zstd error: %s", err)
return false
}
ocfr.block, ocfr.rerr = ioutil.ReadAll(rc)
rc.Close()
if ocfr.rerr != nil {
return false
}

default:
ocfr.rerr = fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfr.header.compressionID)
return false
Expand Down
4 changes: 4 additions & 0 deletions ocf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func TestOCFWriterCompressionSnappy(t *testing.T) {
testOCFRoundTrip(t, CompressionSnappyLabel)
}

func TestOCFWriterCompressionBzip2(t *testing.T) {
testOCFRoundTrip(t, CompressionBzip2Label)
}

func TestOCFWriterWithApplicationMetaData(t *testing.T) {
testOCFRoundTripWithHeaders(t, CompressionNullLabel, map[string][]byte{"foo": []byte("BOING"), "goo": []byte("zoo")})
}
24 changes: 24 additions & 0 deletions ocf_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"io/ioutil"
"os"

"github.com/dsnet/compress/bzip2"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
)

// OCFConfig is used to specify creation parameters for OCFWriter.
Expand Down Expand Up @@ -212,6 +214,28 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error {

block = compressed

case compressionBzip2:
bb := bytes.NewBuffer(make([]byte, 0, len(block)))
cw, _ := bzip2.NewWriter(bb, &bzip2.WriterConfig{Level: 9})
if _, err := cw.Write(block); err != nil {
return err
}
if err := cw.Close(); err != nil {
return err
}
block = bb.Bytes()

case compressionZstd:
bb := bytes.NewBuffer(make([]byte, 0, len(block)))
cw, _ := zstd.NewWriter(bb)
if _, err := cw.Write(block); err != nil {
return err
}
if err := cw.Close(); err != nil {
return err
}
block = bb.Bytes()

default:
return fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfw.header.compressionID)

Expand Down