Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Flags:
--backup-storage-compress if set, the backup files will be compressed. (default true)
--backup-storage-number-blocks int if backup-storage-compress is true, backup-storage-number-blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2)
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--binlog-dump-authorized-users string List of users authorized to execute binlog dump operations, or '%' to allow all users.
--binlog-in-memory-decompressor-max-size uint This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode. (default 134217728)
--binlog-player-protocol string the protocol to download binlogs from a vttablet (default "grpc")
--buffer-drain-concurrency int Maximum number of requests retried simultaneously. More concurrency will increase the load on the PRIMARY vttablet when draining the buffer. (default 1)
Expand Down Expand Up @@ -111,6 +112,7 @@ Flags:
--disk-write-interval duration how often to write to the disk to check whether it is stalled (default 5s)
--disk-write-timeout duration if writes exceed this duration, the disk is considered stalled (default 30s)
--emit-stats If set, emit stats to push-based monitoring and stats backends
--enable-binlog-dump Allow users to perform binlog dump operations for CDC/replication
--enable-buffer Enable buffering (stalling) of primary traffic during failovers.
--enable-buffer-dry-run Detect and log failover events, but do not actually buffer requests.
--enable-consolidator This option enables the query consolidator. (default true)
Expand Down
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Flags:
--balancer-keyspaces strings Comma-separated list of keyspaces for which to use the balancer (optional). If empty, applies to all keyspaces.
--balancer-vtgate-cells strings Comma-separated list of cells that contain vttablets. For 'prefer-cell' mode, this is required. For 'random' mode, this is optional and filters tablets to those cells.
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--binlog-dump-authorized-users string List of users authorized to execute binlog dump operations, or '%' to allow all users.
--buffer-drain-concurrency int Maximum number of requests retried simultaneously. More concurrency will increase the load on the PRIMARY vttablet when draining the buffer. (default 1)
--buffer-keyspace-shards string If not empty, limit buffering to these entries (comma separated). Entry format: keyspace or keyspace/shard. Requires --enable_buffer=true.
--buffer-max-failover-duration duration Stop buffering completely if a failover takes longer than this duration. (default 20s)
Expand All @@ -57,6 +58,7 @@ Flags:
--discovery-low-replication-lag duration Threshold below which replication lag is considered low enough to be healthy. (default 30s)
--emit-stats If set, emit stats to push-based monitoring and stats backends
--enable-balancer (DEPRECATED: use --vtgate-balancer-mode instead) Enable the tablet balancer to evenly spread query load for a given tablet type
--enable-binlog-dump Allow users to perform binlog dump operations for CDC/replication
--enable-buffer Enable buffering (stalling) of primary traffic during failovers.
--enable-buffer-dry-run Detect and log failover events, but do not actually buffer requests.
--enable-direct-ddl Allow users to submit direct DDL statements (default true)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgateclienttest.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Usage:
Flags:
--alsologtostderr log to standard error as well as files
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--binlog-dump-authorized-users string List of users authorized to execute binlog dump operations, or '%' to allow all users.
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
--config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)
Expand Down
24 changes: 14 additions & 10 deletions go/mysql/binlog_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package mysql

import (
"encoding/binary"
"io"

"vitess.io/vitess/go/mysql/replication"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -52,7 +51,7 @@ func (c *Conn) parseComBinlogDump(data []byte) (logFile string, binlogPos uint32
return logFile, binlogPos, nil
}

func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint64, position replication.Position, err error) {
func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint64, position replication.Position, nonBlock bool, err error) {
// see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
pos := 1

Expand All @@ -62,31 +61,36 @@ func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint6

fileNameLen, pos, ok := readUint32(data, pos)
if !ok {
return logFile, logPos, position, readPacketErr
return logFile, logPos, position, nonBlock, readPacketErr
}
if pos+int(fileNameLen) > len(data) {
return logFile, logPos, position, nonBlock, readPacketErr
}
logFile = string(data[pos : pos+int(fileNameLen)])
pos += int(fileNameLen)

logPos, pos, ok = readUint64(data, pos)
if !ok {
return logFile, logPos, position, readPacketErr
return logFile, logPos, position, nonBlock, readPacketErr
}

dataSize, pos, ok := readUint32(data, pos)
if !ok {
return logFile, logPos, position, readPacketErr
return logFile, logPos, position, nonBlock, readPacketErr
}
if pos+int(dataSize) > len(data) {
return logFile, logPos, position, nonBlock, readPacketErr
}
if gtidBytes := data[pos : pos+int(dataSize)]; len(gtidBytes) != 0 {
gtid, err := replication.NewMysql56GTIDSetFromSIDBlock(gtidBytes)
if err != nil {
return logFile, logPos, position, vterrors.Wrapf(err, "error parsing GTID from BinlogDumpGTID packet")
return logFile, logPos, position, nonBlock, vterrors.Wrapf(err, "error parsing GTID from BinlogDumpGTID packet")
}
// ComBinlogDumpGTID is a MySQL specific protocol. The GTID flavor is necessarily MySQL 56
position = replication.Position{GTIDSet: gtid}
}
if flags2&BinlogDumpNonBlock != 0 {
return logFile, logPos, position, io.EOF
}

return logFile, logPos, position, nil
nonBlock = flags2&BinlogDumpNonBlock != 0

return logFile, logPos, position, nonBlock, nil
}
53 changes: 44 additions & 9 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ func (c *Conn) endWriterBuffering() error {
c.bufferedWriter = nil
}()

c.flushTimer.Stop()
if c.flushTimer != nil {
c.flushTimer.Stop()
}
return c.bufferedWriter.Flush()
}

Expand Down Expand Up @@ -596,6 +598,21 @@ func (c *Conn) ReadPacket() ([]byte, error) {
return result, err
}

func (c *Conn) WritePacket(data []byte) error {
return c.writePacket(data)
}

// ReadOnePacket reads a single packet from the underlying connection without
// reassembling multi-packet messages. This is useful for streaming raw packets.
// Returns nil, nil for a zero-length packet (which follows a max-size packet).
func (c *Conn) ReadOnePacket() ([]byte, error) {
result, err := c.readOnePacket()
if err != nil {
return nil, sqlerror.NewSQLErrorf(sqlerror.CRServerLost, sqlerror.SSUnknownSQLState, "%v", err)
}
return result, err
}

// writePacket writes a packet, possibly cutting it into multiple
// chunks. Note this is not very efficient, as the client probably
// has to build the []byte and that makes a memory copy.
Expand Down Expand Up @@ -876,6 +893,12 @@ func (c *Conn) writeErrorPacketFromError(err error) error {
return c.writeErrorPacket(sqlerror.ERUnknownError, sqlerror.SSUnknownSQLState, "unknown error: %v", err)
}

// WriteErrorPacketFromError is the exported version of writeErrorPacketFromError
// for use by external packages (e.g., vtgate's binlog dump handler).
func (c *Conn) WriteErrorPacketFromError(err error) error {
return c.writeErrorPacketFromError(err)
}

// writeEOFPacket writes an EOF packet, through the buffer, and
// doesn't flush (as it is used as part of a query result).
func (c *Conn) writeEOFPacket(flags uint16, warnings uint16) error {
Expand Down Expand Up @@ -974,11 +997,11 @@ func (c *Conn) handleComRegisterReplica(handler Handler, data []byte) (kontinue
return false
}
if err := handler.ComRegisterReplica(c, replicaHost, replicaPort, replicaUser, replicaPassword); err != nil {
c.writeErrorPacketFromError(err)
c.WriteErrorPacketFromError(err)
return false
}
if err := c.writeOKPacket(&PacketOK{}); err != nil {
c.writeErrorPacketFromError(err)
c.WriteErrorPacketFromError(err)
}
return true
}
Expand All @@ -997,11 +1020,17 @@ func (c *Conn) handleComBinlogDump(handler Handler, data []byte) (kontinue bool)

logfile, binlogPos, err := c.parseComBinlogDump(data)
if err != nil {
log.Errorf("conn %v: parseComBinlogDumpGTID failed: %v", c.ID(), err)
log.Errorf("conn %v: parseComBinlogDump failed: %v", c.ID(), err)
if writeErr := c.WriteErrorPacketFromError(err); writeErr != nil {
log.Errorf("conn %v: failed to write error packet: %v", c.ID(), writeErr)
}
return false
}
if err := handler.ComBinlogDump(c, logfile, binlogPos); err != nil {
log.Error(err.Error())
log.Errorf("conn %v: ComBinlogDump failed: %v", c.ID(), err)
if writeErr := c.WriteErrorPacketFromError(err); writeErr != nil {
log.Errorf("conn %v: failed to write error packet: %v", c.ID(), writeErr)
}
return false
}
return kontinue
Expand All @@ -1019,13 +1048,19 @@ func (c *Conn) handleComBinlogDumpGTID(handler Handler, data []byte) (kontinue b
}
}()

logFile, logPos, position, err := c.parseComBinlogDumpGTID(data)
logFile, logPos, position, nonBlock, err := c.parseComBinlogDumpGTID(data)
if err != nil {
log.Errorf("conn %v: parseComBinlogDumpGTID failed: %v", c.ID(), err)
if writeErr := c.WriteErrorPacketFromError(err); writeErr != nil {
log.Errorf("conn %v: failed to write error packet: %v", c.ID(), writeErr)
}
return false
}
if err := handler.ComBinlogDumpGTID(c, logFile, logPos, position.GTIDSet); err != nil {
log.Error(err.Error())
if err := handler.ComBinlogDumpGTID(c, logFile, logPos, position.GTIDSet, nonBlock); err != nil {
log.Errorf("conn %v: ComBinlogDumpGTID failed: %v", c.ID(), err)
if writeErr := c.WriteErrorPacketFromError(err); writeErr != nil {
log.Errorf("conn %v: failed to write error packet: %v", c.ID(), writeErr)
}
return false
}
return kontinue
Expand All @@ -1039,7 +1074,7 @@ func (c *Conn) handleComResetConnection(handler Handler) {
c.PrepareData = make(map[uint32]*PrepareData)
err := c.writeOKPacket(&PacketOK{})
if err != nil {
c.writeErrorPacketFromError(err)
c.WriteErrorPacketFromError(err)
}
}

Expand Down
26 changes: 25 additions & 1 deletion go/mysql/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,30 @@ func TestConnectionErrorWhileWritingComStmtExecute(t *testing.T) {
require.False(t, res, "we should beak the connection in case of error writing error packet")
}

func TestParseComBinlogDumpGTID(t *testing.T) {
sConn := newConn(testConn{}, DefaultFlushDelay, 0)

// Test packet structure (COM_BINLOG_DUMP_GTID):
// - 1 byte: command (0x1e)
// - 2 bytes: flags (0x0001 = NON_BLOCK)
// - 4 bytes: server_id (0)
// - 4 bytes: filename_len (24)
// - 24 bytes: filename ("vt_0000000100-bin.000001")
// - 8 bytes: log_pos (4)
// - 4 bytes: gtid_data_len (48)
// - 48 bytes: SID block for GTID "24bcf1e2-01e0-11ee-8c9c-0242ac120002:1-8"
input, err := hex.DecodeString("1e0100000000001800000076745f303030303030303130302d62696e2e303030303031040000000000000030000000010000000000000024bcf1e201e011ee8c9c0242ac120002010000000000000001000000000000000900000000000000")
require.NoError(t, err)

logFile, logPos, position, nonBlock, err := sConn.parseComBinlogDumpGTID(input)
require.NoError(t, err)

require.Equal(t, "vt_0000000100-bin.000001", logFile)
require.Equal(t, uint64(4), logPos)
require.True(t, nonBlock)
require.Equal(t, "24bcf1e2-01e0-11ee-8c9c-0242ac120002:1-8", position.String())
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randSeq(n int) string {
Expand Down Expand Up @@ -1151,7 +1175,7 @@ func (t testRun) ComBinlogDump(c *Conn, logFile string, binlogPos uint32) error
panic("implement me")
}

func (t testRun) ComBinlogDumpGTID(c *Conn, logFile string, logPos uint64, gtidSet replication.GTIDSet) error {
func (t testRun) ComBinlogDumpGTID(c *Conn, logFile string, logPos uint64, gtidSet replication.GTIDSet, nonBlock bool) error {
panic("implement me")
}

Expand Down
2 changes: 1 addition & 1 deletion go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func (db *DB) ComBinlogDump(c *mysql.Conn, logFile string, binlogPos uint32) err
}

// ComBinlogDumpGTID is part of the mysql.Handler interface.
func (db *DB) ComBinlogDumpGTID(c *mysql.Conn, logFile string, logPos uint64, gtidSet replication.GTIDSet) error {
func (db *DB) ComBinlogDumpGTID(c *mysql.Conn, logFile string, logPos uint64, gtidSet replication.GTIDSet, nonBlock bool) error {
return nil
}

Expand Down
32 changes: 24 additions & 8 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,16 @@ type flavor interface {
// startSQLThreadCommand returns the command to start the replica's SQL thread only.
startSQLThreadCommand() string

// sendBinlogDumpCommand sends the packet required to start
// dumping binlogs from the specified location.
sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error
// sendBinlogDumpCommand sends the COM_BINLOG_DUMP packet to start
// dumping binlogs from the specified file and position.
// This is the original file/position-based protocol.
sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, binlogPos uint32) error

// sendBinlogDumpGTIDCommand sends the COM_BINLOG_DUMP_GTID packet to start
// dumping binlogs from the specified GTID position.
// If nonBlock is true, the server will return EOF when it reaches the end
// of the binlog instead of blocking and waiting for new events.
sendBinlogDumpGTIDCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position, nonBlock bool) error

// readBinlogEvent reads the next BinlogEvent from the connection.
readBinlogEvent(c *Conn) (BinlogEvent, error)
Expand Down Expand Up @@ -348,15 +355,24 @@ func (c *Conn) StartSQLThreadCommand() string {
return c.flavor.startSQLThreadCommand()
}

// SendBinlogDumpCommand sends the flavor-specific version of
// the COM_BINLOG_DUMP command to start dumping raw binlog
// SendBinlogDumpCommand sends the COM_BINLOG_DUMP command to start
// dumping raw binlog events over a server connection, starting at
// a given file and position. This is the original file/position-based protocol.
func (c *Conn) SendBinlogDumpCommand(serverID uint32, binlogFilename string, binlogPos uint32) error {
return c.flavor.sendBinlogDumpCommand(c, serverID, binlogFilename, binlogPos)
}

// SendBinlogDumpGTIDCommand sends the flavor-specific version of
// the COM_BINLOG_DUMP_GTID command to start dumping raw binlog
// events over a server connection, starting at a given GTID.
func (c *Conn) SendBinlogDumpCommand(serverID uint32, binlogFilename string, startPos replication.Position) error {
return c.flavor.sendBinlogDumpCommand(c, serverID, binlogFilename, startPos)
// If nonBlock is true, the server will return EOF when it reaches the end
// of the binlog instead of blocking and waiting for new events.
func (c *Conn) SendBinlogDumpGTIDCommand(serverID uint32, binlogFilename string, startPos replication.Position, nonBlock bool) error {
return c.flavor.sendBinlogDumpGTIDCommand(c, serverID, binlogFilename, startPos, nonBlock)
}

// ReadBinlogEvent reads the next BinlogEvent. This must be used
// in conjunction with SendBinlogDumpCommand.
// in conjunction with SendBinlogDumpCommand or SendBinlogDumpGTIDCommand.
func (c *Conn) ReadBinlogEvent() (BinlogEvent, error) {
return c.flavor.readBinlogEvent(c)
}
Expand Down
9 changes: 8 additions & 1 deletion go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,14 @@ func (flv *filePosFlavor) startSQLThreadCommand() string {
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (flv *filePosFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error {
func (flv *filePosFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, binlogPos uint32) error {
flv.file = binlogFilename
return c.WriteComBinlogDump(serverID, binlogFilename, uint64(binlogPos), 0)
}

// sendBinlogDumpGTIDCommand is part of the Flavor interface.
// Note: nonBlock is not supported for file position based replication as it uses COM_BINLOG_DUMP.
func (flv *filePosFlavor) sendBinlogDumpGTIDCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position, nonBlock bool) error {
rpos, ok := startPos.GTIDSet.(replication.FilePosGTID)
if !ok {
return fmt.Errorf("startPos.GTIDSet is wrong type - expected filePosGTID, got: %#v", startPos.GTIDSet)
Expand Down
8 changes: 7 additions & 1 deletion go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,13 @@ func (mariadbFlavor) startSQLThreadCommand() string {
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (mariadbFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error {
func (mariadbFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, binlogPos uint32) error {
return c.WriteComBinlogDump(serverID, binlogFilename, uint64(binlogPos), 0)
}

// sendBinlogDumpGTIDCommand is part of the Flavor interface.
// Note: nonBlock is not supported for MariaDB as it uses a different protocol (COM_BINLOG_DUMP).
func (mariadbFlavor) sendBinlogDumpGTIDCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position, nonBlock bool) error {
// Tell the server that we understand GTIDs by setting
// mariadb_slave_capability to MARIA_SLAVE_CAPABILITY_GTID = 4 (MariaDB >= 10.0.1).
if _, err := c.ExecuteFetch("SET @mariadb_slave_capability=4", 0, false); err != nil {
Expand Down
Loading
Loading