Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/test/utils/noleak.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func ensureNoLeaks() error {

func ensureNoGoroutines() error {
var ignored = []goleak.Option{
goleak.IgnoreTopFunction("internal/synctest.Run"),
goleak.IgnoreTopFunction("testing/synctest.testingSynctestTest"),
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"),
Expand Down
40 changes: 21 additions & 19 deletions go/vt/vtgate/executor_scatter_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package vtgate
import (
"net/http/httptest"
"testing"
"time"
"testing/synctest"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -53,29 +53,31 @@ func TestScatterStatsWithSingleScatterQuery(t *testing.T) {
}

func TestScatterStatsHttpWriting(t *testing.T) {
executor, _, _, _, ctx := createExecutorEnv(t)
session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@primary"})
synctest.Test(t, func(t *testing.T) {
executor, _, _, _, ctx := createExecutorEnv(t)
session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@primary"})

_, err := executorExecSession(ctx, executor, session, "select * from user", nil)
require.NoError(t, err)
_, err := executorExecSession(ctx, executor, session, "select * from user", nil)
require.NoError(t, err)

_, err = executorExecSession(ctx, executor, session, "select * from user where Id = 15", nil)
require.NoError(t, err)
_, err = executorExecSession(ctx, executor, session, "select * from user where Id = 15", nil)
require.NoError(t, err)

_, err = executorExecSession(ctx, executor, session, "select * from user where Id > 15", nil)
require.NoError(t, err)
_, err = executorExecSession(ctx, executor, session, "select * from user where Id > 15", nil)
require.NoError(t, err)

query4 := "select * from user as u1 join user as u2 on u1.Id = u2.Id"
_, err = executorExecSession(ctx, executor, session, query4, nil)
require.NoError(t, err)
query4 := "select * from user as u1 join user as u2 on u1.Id = u2.Id"
_, err = executorExecSession(ctx, executor, session, query4, nil)
require.NoError(t, err)

time.Sleep(500 * time.Millisecond)
synctest.Wait()

recorder := httptest.NewRecorder()
executor.WriteScatterStats(recorder)
recorder := httptest.NewRecorder()
executor.WriteScatterStats(recorder)

// Here we are checking that the template was executed correctly.
// If it wasn't, instead of html, we'll get an error message
require.Contains(t, recorder.Body.String(), "select * from `user` as u1 join `user` as u2 on u1.Id = u2.Id")
require.NoError(t, err)
// Here we are checking that the template was executed correctly.
// If it wasn't, instead of html, we'll get an error message
require.Contains(t, recorder.Body.String(), "select * from `user` as u1 join `user` as u2 on u1.Id = u2.Id")
require.NoError(t, err)
})
}
83 changes: 44 additions & 39 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"testing"
"testing/synctest"
"time"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -3484,46 +3485,50 @@ func TestSelectFromInformationSchema(t *testing.T) {
}

func TestStreamOrderByWithMultipleResults(t *testing.T) {
ctx := utils.LeakCheckContext(t)

// Special setup: Don't use createExecutorEnv.
cell := "aa"
hc := discovery.NewFakeHealthCheck(nil)
u := createSandbox(KsTestUnsharded)
s := createSandbox(KsTestSharded)
s.VSchema = executorVSchema
u.VSchema = unshardedVSchema
serv := newSandboxForCells(ctx, []string{cell})
resolver := newTestResolver(ctx, hc, serv, cell)
shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"}
count := 1
for _, shard := range shards {
sbc := hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
sbc.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col|weight_string(id)", "int32|int32|varchar"), fmt.Sprintf("%d|%d|NULL", count, count)),
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col|weight_string(id)", "int32|int32|varchar"), fmt.Sprintf("%d|%d|NULL", count+10, count)),
})
count++
}
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfigWithNormalizer(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig())
executor.SetQueryLogger(queryLogger)
defer executor.Close()
// some sleep for all goroutines to start
time.Sleep(100 * time.Millisecond)
before := runtime.NumGoroutine()
synctest.Test(t, func(t *testing.T) {
ctx := utils.LeakCheckContext(t)

query := "select id, col from user order by id"
gotResult, err := executorStream(ctx, executor, query)
require.NoError(t, err)

wantResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col", "int32|int32"),
"1|1", "2|2", "3|3", "4|4", "5|5", "6|6", "7|7", "8|8", "11|1", "12|2", "13|3", "14|4", "15|5", "16|6", "17|7", "18|8")
assert.Equal(t, fmt.Sprintf("%v", wantResult.Rows), fmt.Sprintf("%v", gotResult.Rows))
// some sleep to close all goroutines.
time.Sleep(100 * time.Millisecond)
assert.GreaterOrEqual(t, before, runtime.NumGoroutine(), "left open goroutines lingering")
// Special setup: Don't use createExecutorEnv.
cell := "aa"
hc := discovery.NewFakeHealthCheck(nil)
u := createSandbox(KsTestUnsharded)
s := createSandbox(KsTestSharded)
s.VSchema = executorVSchema
u.VSchema = unshardedVSchema
serv := newSandboxForCells(ctx, []string{cell})
resolver := newTestResolver(ctx, hc, serv, cell)
shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"}
count := 1
for _, shard := range shards {
sbc := hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
sbc.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col|weight_string(id)", "int32|int32|varchar"), fmt.Sprintf("%d|%d|NULL", count, count)),
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col|weight_string(id)", "int32|int32|varchar"), fmt.Sprintf("%d|%d|NULL", count+10, count)),
})
count++
}
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfigWithNormalizer(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig())
executor.SetQueryLogger(queryLogger)
defer executor.Close()

// some sleep for all goroutines to start
synctest.Wait()
before := runtime.NumGoroutine()

query := "select id, col from user order by id"
gotResult, err := executorStream(ctx, executor, query)
require.NoError(t, err)

wantResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col", "int32|int32"),
"1|1", "2|2", "3|3", "4|4", "5|5", "6|6", "7|7", "8|8", "11|1", "12|2", "13|3", "14|4", "15|5", "16|6", "17|7", "18|8")
assert.Equal(t, fmt.Sprintf("%v", wantResult.Rows), fmt.Sprintf("%v", gotResult.Rows))

// some sleep to close all goroutines.
synctest.Wait()
assert.GreaterOrEqual(t, before, runtime.NumGoroutine(), "left open goroutines lingering")
})
}

func TestStreamOrderByLimitWithMultipleResults(t *testing.T) {
Expand Down
25 changes: 14 additions & 11 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sort"
"strings"
"testing"
"testing/synctest"
"time"
"unsafe"

Expand Down Expand Up @@ -1578,17 +1579,19 @@ func TestExecutorUnrecognized(t *testing.T) {
}

func TestExecutorDeniedErrorNoBuffer(t *testing.T) {
executor, sbc1, _, _, ctx := createExecutorEnv(t)
sbc1.EphemeralShardErr = errors.New("enforce denied tables")

vschemaWaitTimeout = 500 * time.Millisecond

session := econtext.NewAutocommitSession(&vtgatepb.Session{TargetString: "@primary"})
startExec := time.Now()
_, err := executorExecSession(ctx, executor, session, "select * from user", nil)
require.NoError(t, err, "enforce denied tables not buffered")
endExec := time.Now()
require.GreaterOrEqual(t, endExec.Sub(startExec).Milliseconds(), int64(500))
synctest.Test(t, func(t *testing.T) {
executor, sbc1, _, _, ctx := createExecutorEnv(t)
sbc1.EphemeralShardErr = errors.New("enforce denied tables")

vschemaWaitTimeout = 500 * time.Millisecond

session := econtext.NewAutocommitSession(&vtgatepb.Session{TargetString: "@primary"})
startExec := time.Now()
_, err := executorExecSession(ctx, executor, session, "select * from user", nil)
require.NoError(t, err, "enforce denied tables not buffered")
endExec := time.Now()
require.GreaterOrEqual(t, endExec.Sub(startExec).Milliseconds(), int64(500))
})
}

// TestVSchemaStats makes sure the building and displaying of the
Expand Down
54 changes: 29 additions & 25 deletions go/vt/vtgate/plugin_mysql_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"strings"
"syscall"
"testing"
"time"
"testing/synctest"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -298,35 +298,39 @@ func TestInitTLSConfigWithServerCA(t *testing.T) {
}

func testInitTLSConfig(t *testing.T, serverCA bool) {
// Create the certs.
ctx := utils.LeakCheckContext(t)

root := t.TempDir()
tlstest.CreateCA(root)
tlstest.CreateCRL(root, tlstest.CA)
tlstest.CreateSignedCert(root, tlstest.CA, "01", "server", "server.example.com")
synctest.Test(t, func(t *testing.T) {
// Create the certs.
ctx := utils.LeakCheckContext(t)

root := t.TempDir()
tlstest.CreateCA(root)
tlstest.CreateCRL(root, tlstest.CA)
tlstest.CreateSignedCert(root, tlstest.CA, "01", "server", "server.example.com")

serverCACert := ""
if serverCA {
serverCACert = path.Join(root, "ca-cert.pem")
}

serverCACert := ""
if serverCA {
serverCACert = path.Join(root, "ca-cert.pem")
}
srv := &mysqlServer{tcpListener: &mysql.Listener{}}
if err := initTLSConfig(ctx, srv, path.Join(root, "server-cert.pem"), path.Join(root, "server-key.pem"), path.Join(root, "ca-cert.pem"), path.Join(root, "ca-crl.pem"), serverCACert, true, tls.VersionTLS12); err != nil {
t.Fatalf("init tls config failure due to: +%v", err)
}

srv := &mysqlServer{tcpListener: &mysql.Listener{}}
if err := initTLSConfig(ctx, srv, path.Join(root, "server-cert.pem"), path.Join(root, "server-key.pem"), path.Join(root, "ca-cert.pem"), path.Join(root, "ca-crl.pem"), serverCACert, true, tls.VersionTLS12); err != nil {
t.Fatalf("init tls config failure due to: +%v", err)
}
serverConfig := srv.tcpListener.TLSConfig.Load()
if serverConfig == nil {
t.Fatalf("init tls config shouldn't create nil server config")
}

serverConfig := srv.tcpListener.TLSConfig.Load()
if serverConfig == nil {
t.Fatalf("init tls config shouldn't create nil server config")
}
srv.sigChan <- syscall.SIGHUP

srv.sigChan <- syscall.SIGHUP
time.Sleep(100 * time.Millisecond) // wait for signal handler
// wait for signal handler
synctest.Wait()

if srv.tcpListener.TLSConfig.Load() == serverConfig {
t.Fatalf("init tls config should have been recreated after SIGHUP")
}
if srv.tcpListener.TLSConfig.Load() == serverConfig {
t.Fatalf("init tls config should have been recreated after SIGHUP")
}
})
}

// TestKillMethods test the mysql plugin for kill method calls.
Expand Down
Loading
Loading