Skip to content

Commit

Permalink
implement multiple files reader (#55)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Sep 12, 2023
1 parent a5bba27 commit 6f78647
Showing 1 changed file with 47 additions and 13 deletions.
60 changes: 47 additions & 13 deletions go/reader/record_reader/multi_files_sequential_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"sync/atomic"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"github.com/milvus-io/milvus-storage/go/common/arrow_util"
"github.com/milvus-io/milvus-storage/go/file/fragment"
"github.com/milvus-io/milvus-storage/go/io/format/parquet"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
)
Expand All @@ -16,41 +18,73 @@ type MultiFilesSequentialReader struct {
files []string
nextPos int
options *option.ReadOptions
currReader *parquet.FileReader
currReader array.RecordReader
err error
ref int64
}

func (m MultiFilesSequentialReader) Retain() {
func (m *MultiFilesSequentialReader) Retain() {
atomic.AddInt64(&m.ref, 1)
}

func (m MultiFilesSequentialReader) Release() {
func (m *MultiFilesSequentialReader) Release() {
if atomic.AddInt64(&m.ref, -1) == 0 {
if m.currReader != nil {
m.currReader.Close()
m.currReader.Release()
m.currReader = nil
}
}
}

func (m MultiFilesSequentialReader) Schema() *arrow.Schema {
func (m *MultiFilesSequentialReader) Schema() *arrow.Schema {
return m.schema
}

func (m MultiFilesSequentialReader) Next() bool {
//TODO implement me
panic("implement me")
func (m *MultiFilesSequentialReader) Next() bool {
for true {
if m.currReader == nil {
if m.nextPos >= len(m.files) {
return false
}

m.nextReader()
if m.err != nil {
return false
}
m.nextPos++
}
if m.currReader.Next() {
return true
}
if m.currReader != nil {
m.currReader.Release()
m.currReader = nil
}
}
return false
}

func (m MultiFilesSequentialReader) Record() arrow.Record {
//TODO implement me
panic("implement me")
func (m *MultiFilesSequentialReader) Record() arrow.Record {
if m.currReader != nil {
return m.currReader.Record()
}
return nil
}

func (m MultiFilesSequentialReader) Err() error {
func (m *MultiFilesSequentialReader) Err() error {
return m.err
}

func (m *MultiFilesSequentialReader) nextReader() {
var fileReader *pqarrow.FileReader
fileReader, m.err = arrow_util.MakeArrowFileReader(m.fs, m.files[m.nextPos])
if m.err != nil {
return
}
m.currReader, m.err = arrow_util.MakeArrowRecordReader(fileReader, m.options)
return
}

func NewMultiFilesSequentialReader(fs fs.Fs, fragments fragment.FragmentVector, schema *arrow.Schema, options *option.ReadOptions) *MultiFilesSequentialReader {
files := make([]string, 0, len(fragments))
for _, f := range fragments {
Expand Down

0 comments on commit 6f78647

Please sign in to comment.