Skip to content

Commit

Permalink
Add functions to create arrow reader (#54)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Sep 12, 2023
1 parent f74f1e6 commit a5bba27
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 18 deletions.
63 changes: 63 additions & 0 deletions go/common/arrow_util/arrow_util.go
Original file line number Diff line number Diff line change
@@ -1 +1,64 @@
package arrow_util

import (
"context"

"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/parquet/file"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
)

func MakeArrowFileReader(fs fs.Fs, filePath string) (*pqarrow.FileReader, error) {
f, err := fs.OpenFile(filePath)
if err != nil {
return nil, err
}
parquetReader, err := file.NewParquetReader(f, nil)
if err != nil {
return nil, err
}
return pqarrow.NewFileReader(parquetReader, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
}

func MakeArrowRecordReader(reader *pqarrow.FileReader, opts *option.ReadOptions) (array.RecordReader, error) {
var rowGroupsIndices []int
var columnIndices []int
metadata := reader.ParquetReader().MetaData()
for _, c := range opts.Columns {
columnIndices = append(columnIndices, metadata.Schema.ColumnIndexByName(c))
}
for _, f := range opts.Filters {
columnIndices = append(columnIndices, metadata.Schema.ColumnIndexByName(f.GetColumnName()))
}

for i := 0; i < int(metadata.NumRows); i++ {
rg := metadata.RowGroup(i)
var canIgnored bool
for _, filter := range opts.Filters {
columnIndex := rg.Schema.ColumnIndexByName(filter.GetColumnName())
columnChunk, err := rg.ColumnChunk(columnIndex)
if err != nil {
return nil, err
}
columnStats, err := columnChunk.Statistics()
if err != nil {
return nil, err
}
if columnStats == nil || !columnStats.HasMinMax() {
continue
}
if filter.CheckStatistics(columnStats) {
canIgnored = true
break
}
}
if !canIgnored {
rowGroupsIndices = append(rowGroupsIndices, i)
}
}

return reader.GetRecordReader(context.TODO(), columnIndices, rowGroupsIndices)
}
38 changes: 20 additions & 18 deletions go/reader/record_reader/multi_files_sequential_reader.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,40 @@
package record_reader

import (
"sync/atomic"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/milvus-io/milvus-storage/go/file/fragment"
"github.com/milvus-io/milvus-storage/go/io/format"
"github.com/milvus-io/milvus-storage/go/io/format/parquet"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
)

type MultiFilesSequentialReader struct {
fs fs.Fs
schema *arrow.Schema
files []string
nextPos int
currReader array.RecordReader
holdingFileReader format.Reader
err error
options *option.ReadOptions
fs fs.Fs
schema *arrow.Schema
files []string
nextPos int
options *option.ReadOptions
currReader *parquet.FileReader
err error
ref int64
}

func (m MultiFilesSequentialReader) Retain() {
//TODO implement me
panic("implement me")
atomic.AddInt64(&m.ref, 1)
}

func (m MultiFilesSequentialReader) Release() {
//TODO implement me
panic("implement me")
if atomic.AddInt64(&m.ref, -1) == 0 {
if m.currReader != nil {
m.currReader.Close()
}
}
}

func (m MultiFilesSequentialReader) Schema() *arrow.Schema {
//TODO implement me
panic("implement me")
return m.schema
}

func (m MultiFilesSequentialReader) Next() bool {
Expand All @@ -46,8 +48,7 @@ func (m MultiFilesSequentialReader) Record() arrow.Record {
}

func (m MultiFilesSequentialReader) Err() error {
//TODO implement me
panic("implement me")
return m.err
}

func NewMultiFilesSequentialReader(fs fs.Fs, fragments fragment.FragmentVector, schema *arrow.Schema, options *option.ReadOptions) *MultiFilesSequentialReader {
Expand All @@ -62,5 +63,6 @@ func NewMultiFilesSequentialReader(fs fs.Fs, fragments fragment.FragmentVector,
options: options,
files: files,
nextPos: 0,
ref: 1,
}
}

0 comments on commit a5bba27

Please sign in to comment.