Skip to content

Commit

Permalink
Merge pull request vitessio#3939 from tinyspeck/add-timeout-mysql-query
Browse files Browse the repository at this point in the history
Adds timeout to mysql queries.
  • Loading branch information
demmer authored May 18, 2018
2 parents 3df4aa1 + 008ac81 commit 5718af7
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 8 deletions.
20 changes: 17 additions & 3 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (

mysqlConnReadTimeout = flag.Duration("mysql_server_read_timeout", 0, "connection read timeout")
mysqlConnWriteTimeout = flag.Duration("mysql_server_write_timeout", 0, "connection write timeout")
mysqlQueryTimeout = flag.Duration("mysql_server_query_timeout", 0, "mysql query timeout")

busyConnections int32
)
Expand All @@ -77,7 +78,14 @@ func (vh *vtgateHandler) NewConnection(c *mysql.Conn) {

func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) {
// Rollback if there is an ongoing transaction. Ignore error.
ctx := context.Background()
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
defer cancel()
} else {
ctx = context.Background()
}
session, _ := c.ClientData.(*vtgatepb.Session)
if session != nil {
if session.InTransaction {
Expand All @@ -88,8 +96,14 @@ func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) {
}

func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
// FIXME(alainjobart): Add some kind of timeout to the context.
ctx := context.Background()
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
defer cancel()
} else {
ctx = context.Background()
}

// Fill in the ImmediateCallerID with the UserData returned by
// the AuthServer plugin for that user. If nothing was
Expand Down
21 changes: 17 additions & 4 deletions go/vt/vtqueryserver/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (

mysqlConnReadTimeout = flag.Duration("mysql_server_read_timeout", 0, "connection read timeout")
mysqlConnWriteTimeout = flag.Duration("mysql_server_write_timeout", 0, "connection write timeout")
mysqlQueryTimeout = flag.Duration("mysql_server_query_timeout", 0, "mysql query timeout")
)

// proxyHandler implements the Listener interface.
Expand All @@ -71,17 +72,29 @@ func (mh *proxyHandler) NewConnection(c *mysql.Conn) {

func (mh *proxyHandler) ConnectionClosed(c *mysql.Conn) {
// Rollback if there is an ongoing transaction. Ignore error.
ctx := context.Background()
var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
defer cancel()
} else {
ctx = context.Background()
}
session, _ := c.ClientData.(*mysqlproxy.ProxySession)
if session != nil && session.TransactionID != 0 {
_ = mh.mp.Rollback(ctx, session)
}
}

func (mh *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
// FIXME(alainjobart): Add some kind of timeout to the context.
ctx := context.Background()

var ctx context.Context
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
ctx, cancel = context.WithTimeout(context.Background(), *mysqlQueryTimeout)
defer cancel()
} else {
ctx = context.Background()
}
// Fill in the ImmediateCallerID with the UserData returned by
// the AuthServer plugin for that user. If nothing was
// returned, use the User. This lets the plugin map a MySQL
Expand Down
15 changes: 14 additions & 1 deletion test/mysql_server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def test_mysql_connector(self):
fd.write("""{
"table_groups": [
{
"table_names_or_prefixes": ["vt_insert_test"],
"table_names_or_prefixes": ["vt_insert_test", "dual"],
"readers": ["vtgate client 1"],
"writers": ["vtgate client 1"],
"admins": ["vtgate client 1"]
Expand Down Expand Up @@ -143,6 +143,7 @@ def test_mysql_connector(self):
# start vtgate
utils.VtGate(mysql_server=True).start(
extra_args=['-mysql_auth_server_impl', 'static',
'-mysql_server_query_timeout', '1s',
'-mysql_auth_server_static_file', mysql_auth_server_static])
# We use gethostbyname('localhost') so we don't presume
# of the IP format (travis is only IP v4, really).
Expand Down Expand Up @@ -189,6 +190,18 @@ def test_mysql_connector(self):

conn.close()

# 'vtgate client' this query should timeout
conn = MySQLdb.Connect(**params)
try:
cursor = conn.cursor()
cursor.execute('SELECT SLEEP(5)', {})
self.fail('Execute went through')
except MySQLdb.OperationalError, e:
s = str(e)
# 1317 is DeadlineExceeded error code
self.assertIn('1317', s)
conn.close()

# 'vtgate client 2' is not authorized to access vt_insert_test
params['user'] = 'testuser2'
params['passwd'] = 'testpassword2'
Expand Down

0 comments on commit 5718af7

Please sign in to comment.