Skip to content

Commit

Permalink
[Go] add helper for unit test, add some test for space delete, and fi…
Browse files Browse the repository at this point in the history
…lter test
  • Loading branch information
loloxwg committed Oct 1, 2023
1 parent ed34eb3 commit 60e127b
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 62 deletions.
5 changes: 1 addition & 4 deletions go/storage/manifest/reader_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ type ManifestReaderWriter struct {
}

func findAllManifest(fs fs.Fs, path string) ([]fs.FileEntry, error) {
log.Debug("find all manifest", log.String("path", path))
files, err := fs.List(path)
for _, file := range files {
log.Debug("find all manifest", log.String("file", file.Path))
}
log.Debug("list all manifest:", log.Any("files", files))
if err != nil {
return nil, err
}
Expand Down
265 changes: 207 additions & 58 deletions go/storage/space_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type SpaceTestSuite struct {
suite.Suite
}

func (suite *SpaceTestSuite) TestSpaceReadWrite() {
func createSchema() *schema.Schema {
pkField := arrow.Field{
Name: "pk_field",
Type: arrow.DataType(&arrow.Int64Type{}),
Expand All @@ -35,7 +35,12 @@ func (suite *SpaceTestSuite) TestSpaceReadWrite() {
Type: arrow.DataType(&arrow.FixedSizeBinaryType{ByteWidth: 10}),
Nullable: false,
}
fields := []arrow.Field{pkField, vsField, vecField}
columnField := arrow.Field{
Name: "column_field",
Type: arrow.DataType(&arrow.Int64Type{}),
Nullable: false,
}
fields := []arrow.Field{pkField, vsField, vecField, columnField}

as := arrow.NewSchema(fields, nil)
schemaOptions := &schema.SchemaOptions{
Expand All @@ -45,9 +50,10 @@ func (suite *SpaceTestSuite) TestSpaceReadWrite() {
}

sc := schema.NewSchema(as, schemaOptions)
err := sc.Validate()
suite.NoError(err)
return sc
}

func recordReader() array.RecordReader {
pkBuilder := array.NewInt64Builder(memory.DefaultAllocator)
pkBuilder.AppendValues([]int64{1, 2, 3}, nil)
pkArr := pkBuilder.NewArray()
Expand All @@ -64,21 +70,63 @@ func (suite *SpaceTestSuite) TestSpaceReadWrite() {
}, nil)
vecArr := vecBuilder.NewArray()

arrs := []arrow.Array{pkArr, vsArr, vecArr}
columnBuilder := array.NewInt64Builder(memory.DefaultAllocator)
columnBuilder.AppendValues([]int64{1, 2, 3}, nil)
columnArr := columnBuilder.NewArray()

arrs := []arrow.Array{pkArr, vsArr, vecArr, columnArr}

rec := array.NewRecord(createSchema().Schema(), arrs, 3)
recReader, err := array.NewRecordReader(createSchema().Schema(), []arrow.Record{rec})
if err != nil {
panic(err)
}
return recReader
}

// deleteRecordReader builds the record reader handed to space.Delete by the
// tests. It yields a single record with one row whose pk_field and vs_field
// are both 1, using a schema that contains only those two Int64 columns.
//
// It panics if the reader cannot be constructed, which is acceptable for a
// test-only helper.
func deleteRecordReader() array.RecordReader {
	pkField := arrow.Field{
		Name:     "pk_field",
		Type:     arrow.DataType(&arrow.Int64Type{}),
		Nullable: false,
	}
	vsField := arrow.Field{
		Name:     "vs_field",
		Type:     arrow.DataType(&arrow.Int64Type{}),
		Nullable: false,
	}

	deleteArrowSchema := arrow.NewSchema([]arrow.Field{pkField, vsField}, nil)

	// NOTE: the stray `array.NewRecord(as, arrs, 3)` / `array.NewRecordReader(as, ...)`
	// statements that used to sit here referenced identifiers that do not exist
	// in this function and clashed with the declarations below; they are gone.

	deletePkBuilder := array.NewInt64Builder(memory.DefaultAllocator)
	deletePkBuilder.AppendValues([]int64{1}, nil)
	deletePkArr := deletePkBuilder.NewArray()

	deleteVsBuilder := array.NewInt64Builder(memory.DefaultAllocator)
	deleteVsBuilder.AppendValues([]int64{1}, nil)
	deleteVsArr := deleteVsBuilder.NewArray()

	// One row per column, so the record is created with a row count of 1.
	deleteArray := []arrow.Array{deletePkArr, deleteVsArr}
	rec := array.NewRecord(deleteArrowSchema, deleteArray, 1)
	recReader, err := array.NewRecordReader(deleteArrowSchema, []arrow.Record{rec})
	if err != nil {
		panic(err)
	}
	return recReader
}

func (suite *SpaceTestSuite) TestSpaceReadWrite() {
sc := createSchema()
err := sc.Validate()
suite.NoError(err)

opts := options.NewSpaceOptionBuilder().SetSchema(sc).SetVersion(0).Build()

space, err := storage.Open("file:///"+suite.T().TempDir(), opts)
suite.NoError(err)

writeOpt := &options.WriteOptions{MaxRecordPerFile: 1000}
err = space.Write(recReader, writeOpt)
err = space.Write(recordReader(), writeOpt)
suite.NoError(err)

f := filter.NewConstantFilter(filter.Equal, "pk_field", int64(1))
Expand All @@ -99,58 +147,10 @@ func (suite *SpaceTestSuite) TestSpaceReadWrite() {
}

func (suite *SpaceTestSuite) TestSpaceReadWriteConcurrency() {
pkField := arrow.Field{
Name: "pk_field",
Type: arrow.DataType(&arrow.Int64Type{}),
Nullable: false,
}
vsField := arrow.Field{
Name: "vs_field",
Type: arrow.DataType(&arrow.Int64Type{}),
Nullable: false,
}
vecField := arrow.Field{
Name: "vec_field",
Type: arrow.DataType(&arrow.FixedSizeBinaryType{ByteWidth: 10}),
Nullable: false,
}
fields := []arrow.Field{pkField, vsField, vecField}

as := arrow.NewSchema(fields, nil)
schemaOptions := &schema.SchemaOptions{
PrimaryColumn: "pk_field",
VersionColumn: "vs_field",
VectorColumn: "vec_field",
}

sc := schema.NewSchema(as, schemaOptions)
sc := createSchema()
err := sc.Validate()
suite.NoError(err)

pkBuilder := array.NewInt64Builder(memory.DefaultAllocator)
pkBuilder.AppendValues([]int64{1, 2, 3}, nil)
pkArr := pkBuilder.NewArray()

vsBuilder := array.NewInt64Builder(memory.DefaultAllocator)
vsBuilder.AppendValues([]int64{1, 2, 3}, nil)
vsArr := vsBuilder.NewArray()

vecBuilder := array.NewFixedSizeBinaryBuilder(memory.DefaultAllocator, &arrow.FixedSizeBinaryType{ByteWidth: 10})
vecBuilder.AppendValues([][]byte{
{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
}, nil)
vecArr := vecBuilder.NewArray()

arrs := []arrow.Array{pkArr, vsArr, vecArr}

rec := array.NewRecord(as, arrs, 3)
recReader, err := array.NewRecordReader(as, []arrow.Record{rec})
if err != nil {
panic(err)
}

opts := options.NewSpaceOptionBuilder().SetSchema(sc).SetVersion(0).Build()

space, err := storage.Open("file:///"+suite.T().TempDir(), opts)
Expand All @@ -162,7 +162,7 @@ func (suite *SpaceTestSuite) TestSpaceReadWriteConcurrency() {
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
err = space.Write(recReader, writeOpt)
err = space.Write(recordReader(), writeOpt)
suite.NoError(err)
wg.Done()
}()
Expand All @@ -171,6 +171,155 @@ func (suite *SpaceTestSuite) TestSpaceReadWriteConcurrency() {
wg.Wait()
}

// TestSpaceDelete opens a space in a fresh temporary directory and checks that
// Delete accepts the reader produced by deleteRecordReader without returning
// an error. Nothing is written to the space beforehand.
func (suite *SpaceTestSuite) TestSpaceDelete() {
	sc := createSchema()
	// Setup failures are fatal: continuing with an invalid schema or a space
	// that failed to open would only obscure the real problem.
	suite.Require().NoError(sc.Validate())

	opts := options.NewSpaceOptionBuilder().SetSchema(sc).SetVersion(0).Build()

	space, err := storage.Open("file:///"+suite.T().TempDir(), opts)
	suite.Require().NoError(err)

	err = space.Delete(deleteRecordReader())
	suite.NoError(err)
}

// TestSpaceReadWithFilter writes the three rows produced by recordReader
// (pk_field values 1, 2 and 3) into a fresh space and then reads pk_field back
// under a series of constant filters, checking the surviving keys for each
// comparison operator.
func (suite *SpaceTestSuite) TestSpaceReadWithFilter() {
	sc := createSchema()
	// Setup failures are fatal: the filter checks below are meaningless if the
	// space could not be opened or populated.
	suite.Require().NoError(sc.Validate())

	opts := options.NewSpaceOptionBuilder().SetSchema(sc).SetVersion(0).Build()

	space, err := storage.Open("file:///"+suite.T().TempDir(), opts)
	suite.Require().NoError(err)

	writeOpt := &options.WriteOptions{MaxRecordPerFile: 1000}
	err = space.Write(recordReader(), writeOpt)
	suite.Require().NoError(err)

	// f is the filter under test. readPKs captures the variable, so each
	// reassignment below changes what the next readPKs call applies.
	f := filter.NewConstantFilter(filter.Equal, "pk_field", int64(1))

	// readPKs reads only pk_field with the current filter f applied and
	// returns every key found across all records of the result.
	readPKs := func() []int64 {
		readOpt := options.NewReadOptions()
		readOpt.AddFilter(f)
		readOpt.AddColumn("pk_field")

		readReader, err := space.Read(readOpt)
		suite.Require().NoError(err)

		// Start from an empty, non-nil slice so an empty result compares
		// against []int64{} exactly as the expectations are written.
		pks := []int64{}
		for readReader.Next() {
			rec := readReader.Record()
			col, ok := rec.Columns()[0].(*array.Int64)
			suite.Require().True(ok, "pk_field column should be *array.Int64")
			pks = append(pks, col.Int64Values()...)
		}
		return pks
	}

	// pk_field == 1
	suite.ElementsMatch([]int64{1}, readPKs())

	// pk_field > 1
	f = filter.NewConstantFilter(filter.GreaterThan, "pk_field", int64(1))
	suite.ElementsMatch([]int64{2, 3}, readPKs())

	// pk_field != 1
	f = filter.NewConstantFilter(filter.NotEqual, "pk_field", int64(1))
	suite.ElementsMatch([]int64{2, 3}, readPKs())

	// pk_field < 1 (no rows qualify)
	f = filter.NewConstantFilter(filter.LessThan, "pk_field", int64(1))
	suite.ElementsMatch([]int64{}, readPKs())

	// pk_field <= 1
	f = filter.NewConstantFilter(filter.LessThanOrEqual, "pk_field", int64(1))
	suite.ElementsMatch([]int64{1}, readPKs())

	// pk_field >= 1
	f = filter.NewConstantFilter(filter.GreaterThanOrEqual, "pk_field", int64(1))
	suite.ElementsMatch([]int64{1, 2, 3}, readPKs())

	// pk_field > 2
	f = filter.NewConstantFilter(filter.GreaterThan, "pk_field", int64(2))
	suite.ElementsMatch([]int64{3}, readPKs())
}

func TestSpaceTestSuite(t *testing.T) {
suite.Run(t, new(SpaceTestSuite))
}

0 comments on commit 60e127b

Please sign in to comment.