Skip to content

Commit

Permalink
feat(read): basic reader (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
loloxwg committed Sep 12, 2023
1 parent 7a8c8ff commit f74f1e6
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 4 deletions.
50 changes: 50 additions & 0 deletions go/reader/common_reader/delete_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package common_reader

import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/milvus-io/milvus-storage/go/file/fragment"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options/schema_option"
)

type DeleteReader struct {
recordReader array.RecordReader
schemaOptions *schema_option.SchemaOptions
deleteFragments fragment.DeleteFragmentVector
options *option.ReadOptions
}

func (d DeleteReader) Retain() {
//TODO implement me
panic("implement me")
}

func (d DeleteReader) Release() {
//TODO implement me
panic("implement me")
}

func (d DeleteReader) Schema() *arrow.Schema {
//TODO implement me
panic("implement me")
}

func (d DeleteReader) Next() bool {
//TODO implement me
panic("implement me")
}

func (d DeleteReader) Record() arrow.Record {
//TODO implement me
panic("implement me")
}

func (d DeleteReader) Err() error {
//TODO implement me
panic("implement me")
}

func NewDeleteReader(recordReader array.RecordReader, schemaOptions *schema_option.SchemaOptions, deleteFragments fragment.DeleteFragmentVector, options *option.ReadOptions) *DeleteReader {
return &DeleteReader{recordReader: recordReader, schemaOptions: schemaOptions, deleteFragments: deleteFragments, options: options}
}
26 changes: 26 additions & 0 deletions go/reader/common_reader/filter_reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common_reader

import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
)
Expand All @@ -11,6 +12,31 @@ type FilterReader struct {
currentFilteredBatchReader array.RecordReader
}

func (r *FilterReader) Retain() {
//TODO implement me
panic("implement me")
}

func (r *FilterReader) Release() {
//TODO implement me
panic("implement me")
}

func (r *FilterReader) Schema() *arrow.Schema {
//TODO implement me
panic("implement me")
}

func (r *FilterReader) Record() arrow.Record {
//TODO implement me
panic("implement me")
}

func (r *FilterReader) Err() error {
//TODO implement me
panic("implement me")
}

func MakeFilterReader(recordReader array.RecordReader, option *option.ReadOptions) *FilterReader {
return &FilterReader{
recordReader: recordReader,
Expand Down
20 changes: 20 additions & 0 deletions go/reader/common_reader/projection_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package common_reader

import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/milvus-io/milvus-storage/go/common/utils"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
)

type ProjectionReader struct {
array.RecordReader
reader array.RecordReader
options *option.ReadOptions
schema *arrow.Schema
}

func NewProjectionReader(reader array.RecordReader, options *option.ReadOptions, schema *arrow.Schema) array.RecordReader {
projectionSchema := utils.ProjectSchema(schema, options.Columns)
return &ProjectionReader{reader: reader, options: options, schema: projectionSchema}
}
30 changes: 30 additions & 0 deletions go/reader/record_reader/multi_files_sequential_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,36 @@ type MultiFilesSequentialReader struct {
options *option.ReadOptions
}

func (m MultiFilesSequentialReader) Retain() {
//TODO implement me
panic("implement me")
}

func (m MultiFilesSequentialReader) Release() {
//TODO implement me
panic("implement me")
}

func (m MultiFilesSequentialReader) Schema() *arrow.Schema {
//TODO implement me
panic("implement me")
}

func (m MultiFilesSequentialReader) Next() bool {
//TODO implement me
panic("implement me")
}

func (m MultiFilesSequentialReader) Record() arrow.Record {
//TODO implement me
panic("implement me")
}

func (m MultiFilesSequentialReader) Err() error {
//TODO implement me
panic("implement me")
}

func NewMultiFilesSequentialReader(fs fs.Fs, fragments fragment.FragmentVector, schema *arrow.Schema, options *option.ReadOptions) *MultiFilesSequentialReader {
files := make([]string, 0, len(fragments))
for _, f := range fragments {
Expand Down
14 changes: 10 additions & 4 deletions go/reader/record_reader/scan_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/milvus-io/milvus-storage/go/io/format"
"github.com/milvus-io/milvus-storage/go/io/format/parquet"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/reader/common_reader"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/schema"
"go.uber.org/zap"
Expand Down Expand Up @@ -113,12 +114,17 @@ func (r *ScanRecordReader) Record() arrow.Record {
}

func (r *ScanRecordReader) Err() error {
//TODO implement me
panic("implement me")
return r.err
}

func (r *ScanRecordReader) MakeInnerReader() array.RecordReader {
//TODO implement me
// reader := NewMultiFilesSequentialReader(r.fs, r.dataFragments, r.Schema(), r.options)
return nil
reader := NewMultiFilesSequentialReader(r.fs, r.dataFragments, r.Schema(), r.options)

filterReader := common_reader.MakeFilterReader(reader, r.options)

deleteReader := common_reader.NewDeleteReader(filterReader, r.schema.Options(), r.deleteFragments, r.options)

res := common_reader.NewProjectionReader(deleteReader, r.options, r.schema.Schema())
return res
}

0 comments on commit f74f1e6

Please sign in to comment.