Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/sql/colexec/left/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (leftJoin *LeftJoin) Call(proc *process.Process) (vm.CallResult, error) {
}
} else {
ctr.rbat.CleanOnlyData()
for i := range ctr.rbat.Vecs {
ctr.rbat.Vecs[i].ResetWithSameType()
}
for i, rp := range leftJoin.Result {
if rp.Rel == 0 {
ctr.rbat.Vecs[i].SetSorted(ctr.inbat.Vecs[rp.Pos].GetSorted())
Expand Down Expand Up @@ -175,8 +178,7 @@ func (ctr *container) emptyProbe(ap *LeftJoin, proc *process.Process, result *vm
return err
}
} else {
ctr.rbat.Vecs[i].SetClass(vector.CONSTANT)
ctr.rbat.Vecs[i].SetLength(ctr.inbat.RowCount())
vector.SetConstNull(ctr.rbat.Vecs[i], ctr.inbat.RowCount(), proc.Mp())
}
}
ctr.rbat.AddRowCount(ap.ctr.inbat.RowCount())
Expand Down
120 changes: 117 additions & 3 deletions pkg/sql/colexec/left/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,114 @@ func TestJoin(t *testing.T) {
}
}

func TestLeftJoinResetAfterEmptyProbe(t *testing.T) {
tc := newTestCase(t, []bool{true}, []types.Type{types.T_int32.ToType()}, []colexec.ResultPos{
colexec.NewResultPos(0, 0),
colexec.NewResultPos(1, 0),
}, [][]*plan.Expr{
{
newExpr(0, types.T_int32.ToType()),
},
{
newExpr(0, types.T_int32.ToType()),
},
})

resetChildrenWithBatch(tc.arg, colexec.MakeMockBatchs())
resetHashBuildChildrenWithBatch(tc.barg, batch.EmptyBatch)
err := tc.arg.Prepare(tc.proc)
require.NoError(t, err)
err = tc.barg.Prepare(tc.proc)
require.NoError(t, err)

res, err := vm.Exec(tc.barg, tc.proc)
require.NoError(t, err)
require.Nil(t, res.Batch)
res, err = vm.Exec(tc.arg, tc.proc)
require.NoError(t, err)
require.True(t, res.Batch.Vecs[1].IsConst())

tc.arg.Reset(tc.proc, false, nil)
tc.barg.Reset(tc.proc, false, nil)

resetChildrenWithBatch(tc.arg, colexec.MakeMockBatchs())
resetHashBuildChildrenWithBatch(tc.barg, colexec.MakeMockBatchs())
tc.proc.GetMessageBoard().Reset()
err = tc.arg.Prepare(tc.proc)
require.NoError(t, err)
err = tc.barg.Prepare(tc.proc)
require.NoError(t, err)

res, err = vm.Exec(tc.barg, tc.proc)
require.NoError(t, err)
require.Nil(t, res.Batch)
res, err = vm.Exec(tc.arg, tc.proc)
require.NoError(t, err)
require.False(t, res.Batch.Vecs[1].IsConst())
require.Equal(t, 2, res.Batch.Vecs[1].Length())

tc.arg.Reset(tc.proc, false, nil)
tc.barg.Reset(tc.proc, false, nil)
tc.arg.Free(tc.proc, false, nil)
tc.barg.Free(tc.proc, false, nil)
tc.proc.Free()
require.Equal(t, int64(0), tc.proc.Mp().CurrNB())
}

func TestLeftJoinConstNullAfterNonEmptyProbe(t *testing.T) {
tc := newTestCase(t, []bool{true}, []types.Type{types.T_int32.ToType()}, []colexec.ResultPos{
colexec.NewResultPos(0, 0),
colexec.NewResultPos(1, 0),
}, [][]*plan.Expr{
{
newExpr(0, types.T_int32.ToType()),
},
{
newExpr(0, types.T_int32.ToType()),
},
})

resetChildrenWithBatch(tc.arg, colexec.MakeMockBatchs())
resetHashBuildChildrenWithBatch(tc.barg, colexec.MakeMockBatchs())
err := tc.arg.Prepare(tc.proc)
require.NoError(t, err)
err = tc.barg.Prepare(tc.proc)
require.NoError(t, err)

res, err := vm.Exec(tc.barg, tc.proc)
require.NoError(t, err)
require.Nil(t, res.Batch)
res, err = vm.Exec(tc.arg, tc.proc)
require.NoError(t, err)
require.False(t, res.Batch.Vecs[1].IsConst())

tc.arg.Reset(tc.proc, false, nil)
tc.barg.Reset(tc.proc, false, nil)

resetChildrenWithBatch(tc.arg, colexec.MakeMockBatchs())
resetHashBuildChildrenWithBatch(tc.barg, batch.EmptyBatch)
tc.proc.GetMessageBoard().Reset()
err = tc.arg.Prepare(tc.proc)
require.NoError(t, err)
err = tc.barg.Prepare(tc.proc)
require.NoError(t, err)

res, err = vm.Exec(tc.barg, tc.proc)
require.NoError(t, err)
require.Nil(t, res.Batch)
res, err = vm.Exec(tc.arg, tc.proc)
require.NoError(t, err)
require.True(t, res.Batch.Vecs[1].IsConstNull())
require.Equal(t, 2, res.Batch.Vecs[1].Length())

tc.arg.Reset(tc.proc, false, nil)
tc.barg.Reset(tc.proc, false, nil)
tc.arg.Free(tc.proc, false, nil)
tc.barg.Free(tc.proc, false, nil)
tc.proc.Free()
require.Equal(t, int64(0), tc.proc.Mp().CurrNB())
}

/*
func BenchmarkJoin(b *testing.B) {
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -293,14 +401,20 @@ func newTestCase(t *testing.T, flgs []bool, ts []types.Type, rp []colexec.Result
}

func resetChildren(arg *LeftJoin) {
bat := colexec.MakeMockBatchs()
resetChildrenWithBatch(arg, colexec.MakeMockBatchs())
}

func resetHashBuildChildren(arg *hashbuild.HashBuild) {
resetHashBuildChildrenWithBatch(arg, colexec.MakeMockBatchs())
}

func resetChildrenWithBatch(arg *LeftJoin, bat *batch.Batch) {
op := colexec.NewMockOperator().WithBatchs([]*batch.Batch{bat})
arg.Children = nil
arg.AppendChild(op)
}

func resetHashBuildChildren(arg *hashbuild.HashBuild) {
bat := colexec.MakeMockBatchs()
func resetHashBuildChildrenWithBatch(arg *hashbuild.HashBuild, bat *batch.Batch) {
op := colexec.NewMockOperator().WithBatchs([]*batch.Batch{bat})
arg.Children = nil
arg.AppendChild(op)
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/colexec/loopjoin/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (loopJoin *LoopJoin) Call(proc *process.Process) (vm.CallResult, error) {
}
} else {
ctr.rbat.CleanOnlyData()
for i := range ctr.rbat.Vecs {
ctr.rbat.Vecs[i].ResetWithSameType()
}
for i, rp := range loopJoin.Result {
if rp.Rel == 0 {
ctr.rbat.Vecs[i].SetSorted(ctr.inbat.Vecs[rp.Pos].GetSorted())
Expand Down Expand Up @@ -163,8 +166,7 @@ func (ctr *container) emptyProbe(ap *LoopJoin, proc *process.Process, result *vm
}
} else {
if ap.JoinType == LoopLeft || ap.JoinType == LoopSingle {
ctr.rbat.Vecs[i].SetClass(vector.CONSTANT)
ctr.rbat.Vecs[i].SetLength(ctr.inbat.RowCount())
vector.SetConstNull(ctr.rbat.Vecs[i], ctr.inbat.RowCount(), proc.Mp())
} else if ap.JoinType == LoopMark {
err := vector.SetConstFixed(ctr.rbat.Vecs[i], false, ctr.inbat.RowCount(), proc.Mp())
if err != nil {
Expand Down
114 changes: 111 additions & 3 deletions pkg/sql/colexec/loopjoin/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,108 @@ func TestJoin(t *testing.T) {
}
}

func TestLoopMarkResetAfterEmptyProbe(t *testing.T) {
tc := newTestCase(t, []bool{false}, []types.Type{types.T_int32.ToType()}, []colexec.ResultPos{
colexec.NewResultPos(0, 0),
colexec.NewResultPos(1, 0),
})
tc.arg.JoinType = LoopMark

resetChildrenWithBatch(tc.arg, colexec.MakeMockBatchs())
resetHashBuildChildrenWithBatch(tc.barg, batch.EmptyBatch)
err := tc.arg.Prepare(tc.proc)
require.NoError(t, err)
err = tc.barg.Prepare(tc.proc)
require.NoError(t, err)

res, err := vm.Exec(tc.barg, tc.proc)
require.NoError(t, err)
require.Nil(t, res.Batch)
res, err = vm.Exec(tc.arg, tc.proc)
require.NoError(t, err)
require.True(t, res.Batch.Vecs[1].IsConst())

tc.arg.Reset(tc.proc, false, nil)
tc.barg.Reset(tc.proc, false, nil)

resetChildrenWithBatch(tc.arg, colexec.MakeMockBatchs())
resetHashBuildChildrenWithBatch(tc.barg, colexec.MakeMockBatchs())
tc.proc.GetMessageBoard().Reset()
err = tc.arg.Prepare(tc.proc)
require.NoError(t, err)
err = tc.barg.Prepare(tc.proc)
require.NoError(t, err)

res, err = vm.Exec(tc.barg, tc.proc)
require.NoError(t, err)
require.Nil(t, res.Batch)
res, err = vm.Exec(tc.arg, tc.proc)
require.NoError(t, err)
require.False(t, res.Batch.Vecs[1].IsConst())

expectedInput := colexec.MakeMockBatchs().Vecs[0]
expectedMark := testutil.MakeBoolVector([]bool{true, true}, nil)
require.Equal(t, expectedInput.GetType().Oid, res.Batch.Vecs[0].GetType().Oid)
require.Equal(t, bytes.Compare(expectedInput.UnsafeGetRawData(), res.Batch.Vecs[0].UnsafeGetRawData()), 0)
require.Equal(t, expectedMark.GetType().Oid, res.Batch.Vecs[1].GetType().Oid)
require.Equal(t, bytes.Compare(expectedMark.UnsafeGetRawData(), res.Batch.Vecs[1].UnsafeGetRawData()), 0)

tc.arg.Reset(tc.proc, false, nil)
tc.barg.Reset(tc.proc, false, nil)
tc.arg.Free(tc.proc, false, nil)
tc.barg.Free(tc.proc, false, nil)
tc.proc.Free()
require.Equal(t, int64(0), tc.proc.Mp().CurrNB())
}

func TestLoopLeftConstNullAfterNonEmptyProbe(t *testing.T) {
tc := newTestCase(t, []bool{false}, []types.Type{types.T_int32.ToType()}, []colexec.ResultPos{
colexec.NewResultPos(0, 0),
colexec.NewResultPos(1, 0),
})
tc.arg.JoinType = LoopLeft

resetChildrenWithBatch(tc.arg, colexec.MakeMockBatchs())
resetHashBuildChildrenWithBatch(tc.barg, colexec.MakeMockBatchs())
err := tc.arg.Prepare(tc.proc)
require.NoError(t, err)
err = tc.barg.Prepare(tc.proc)
require.NoError(t, err)

res, err := vm.Exec(tc.barg, tc.proc)
require.NoError(t, err)
require.Nil(t, res.Batch)
res, err = vm.Exec(tc.arg, tc.proc)
require.NoError(t, err)
require.False(t, res.Batch.Vecs[1].IsConst())

tc.arg.Reset(tc.proc, false, nil)
tc.barg.Reset(tc.proc, false, nil)

resetChildrenWithBatch(tc.arg, colexec.MakeMockBatchs())
resetHashBuildChildrenWithBatch(tc.barg, batch.EmptyBatch)
tc.proc.GetMessageBoard().Reset()
err = tc.arg.Prepare(tc.proc)
require.NoError(t, err)
err = tc.barg.Prepare(tc.proc)
require.NoError(t, err)

res, err = vm.Exec(tc.barg, tc.proc)
require.NoError(t, err)
require.Nil(t, res.Batch)
res, err = vm.Exec(tc.arg, tc.proc)
require.NoError(t, err)
require.True(t, res.Batch.Vecs[1].IsConstNull())
require.Equal(t, 2, res.Batch.Vecs[1].Length())

tc.arg.Reset(tc.proc, false, nil)
tc.barg.Reset(tc.proc, false, nil)
tc.arg.Free(tc.proc, false, nil)
tc.barg.Free(tc.proc, false, nil)
tc.proc.Free()
require.Equal(t, int64(0), tc.proc.Mp().CurrNB())
}

/*
func BenchmarkJoin(b *testing.B) {
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -240,14 +342,20 @@ func newTestCase(t *testing.T, flgs []bool, ts []types.Type, rp []colexec.Result
}

func resetChildren(arg *LoopJoin) {
bat := colexec.MakeMockBatchs()
resetChildrenWithBatch(arg, colexec.MakeMockBatchs())
}

func resetHashBuildChildren(arg *hashbuild.HashBuild) {
resetHashBuildChildrenWithBatch(arg, colexec.MakeMockBatchs())
}

func resetChildrenWithBatch(arg *LoopJoin, bat *batch.Batch) {
op := colexec.NewMockOperator().WithBatchs([]*batch.Batch{bat})
arg.Children = nil
arg.AppendChild(op)
}

func resetHashBuildChildren(arg *hashbuild.HashBuild) {
bat := colexec.MakeMockBatchs()
func resetHashBuildChildrenWithBatch(arg *hashbuild.HashBuild, bat *batch.Batch) {
op := colexec.NewMockOperator().WithBatchs([]*batch.Batch{bat})
arg.Children = nil
arg.AppendChild(op)
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/colexec/single/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func (singleJoin *SingleJoin) Call(proc *process.Process) (vm.CallResult, error)
}
} else {
ctr.rbat.CleanOnlyData()
for i := range ctr.rbat.Vecs {
ctr.rbat.Vecs[i].ResetWithSameType()
}
}
for i, rp := range singleJoin.Result {
if rp.Rel == 0 {
Expand Down Expand Up @@ -157,8 +160,7 @@ func (singleJoin *SingleJoin) build(analyzer process.Analyzer, proc *process.Pro
func (ctr *container) emptyProbe(bat *batch.Batch, ap *SingleJoin, result *vm.CallResult) error {
for i, rp := range ap.Result {
if rp.Rel != 0 {
ctr.rbat.Vecs[i].SetClass(vector.CONSTANT)
ctr.rbat.Vecs[i].SetLength(bat.RowCount())
vector.SetConstNull(ctr.rbat.Vecs[i], bat.RowCount(), nil)
}
}
ctr.rbat.AddRowCount(bat.RowCount())
Expand Down
Loading
Loading