@@ -20,6 +20,7 @@ import (
20
20
"context"
21
21
_ "embed"
22
22
"flag"
23
+ "fmt"
23
24
"os"
24
25
"testing"
25
26
"time"
42
43
43
44
//go:embed schema.sql
44
45
SchemaSQL string
46
+
47
+ tables = []string {"t1" , "t2" , "t3" , "t4" , "t5" }
45
48
)
46
49
47
50
func TestMain (m * testing.M ) {
@@ -100,19 +103,26 @@ func TestQueriesWithRoutingRules(t *testing.T) {
100
103
}()
101
104
startQueries (t , ctx )
102
105
103
- workflow := "TestQueriesWithRoutingRules"
104
- mtw := cluster .NewMoveTables (t , clusterInstance , workflow , tks , sks , "t1" , nil )
105
- out , err := mtw .Create ()
106
- t .Logf ("movetables created: %s" , out )
107
- require .NoError (t , err )
106
+ mtws := make ([]* cluster.MoveTablesWorkflow , 0 , len (tables ))
107
+ for _ , table := range tables {
108
+ workflow := "TestQueriesWithRoutingRules_" + table
109
+ mtw := cluster .NewMoveTables (t , clusterInstance , workflow , tks , sks , table , nil )
110
+ mtws = append (mtws , mtw )
111
+ out , err := mtw .Create ()
112
+ t .Logf ("movetables created: %s" , out )
113
+ require .NoError (t , err )
114
+ }
108
115
109
- mtw .WaitForVreplCatchup (5 * time .Second )
110
- t .Logf ("movetables catchup phase completed" )
116
+ for i , mtw := range mtws {
117
+ mtw .WaitForVreplCatchup (5 * time .Second )
118
+ t .Logf ("%s catchup phase completed" , tables [i ])
119
+ }
111
120
112
- time .Sleep (2 * time .Second )
113
- out , err = mtw .SwitchReads ()
114
- t .Logf ("movetables switch reads: %v" , out )
115
- require .NoError (t , err )
121
+ for _ , mtw := range mtws {
122
+ out , err := mtw .SwitchReads ()
123
+ t .Logf ("switch reads: %v" , out )
124
+ require .NoError (t , err )
125
+ }
116
126
}
117
127
118
128
func startQueries (t * testing.T , ctx context.Context ) {
@@ -124,14 +134,16 @@ func startQueries(t *testing.T, ctx context.Context) {
124
134
conns = append (conns , conn )
125
135
}
126
136
127
- query := "select * from t1 "
128
- for _ , conn := range conns {
129
- go func (conn * mysql.Conn ) {
137
+ queryTemp := "select * from %s "
138
+ for idx , conn := range conns {
139
+ go func (i int , conn * mysql.Conn ) {
130
140
defer conn .Close ()
131
141
if _ , err := conn .ExecuteFetch ("use @replica" , 1000 , true ); err != nil {
132
142
t .Logf ("error in use @primary: (%d, %v)" , conn .ID (), err )
133
143
}
144
+ query := fmt .Sprintf (queryTemp , tables [i % len (tables )])
134
145
for {
146
+ // t.Logf("query: %s", query)
135
147
if ctx .Err () != nil {
136
148
return
137
149
}
@@ -146,6 +158,6 @@ func startQueries(t *testing.T, ctx context.Context) {
146
158
}
147
159
// time.Sleep(10 * time.Millisecond)
148
160
}
149
- }(conn )
161
+ }(idx , conn )
150
162
}
151
163
}
0 commit comments