Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions go/mysql/binlog_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package mysql

import (
"encoding/binary"
"io"

"vitess.io/vitess/go/mysql/replication"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -52,7 +51,7 @@ func (c *Conn) parseComBinlogDump(data []byte) (logFile string, binlogPos uint32
return logFile, binlogPos, nil
}

func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint64, position replication.Position, err error) {
func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint64, position replication.Position, nonBlock bool, err error) {
// see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
pos := 1

Expand All @@ -62,31 +61,36 @@ func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint6

fileNameLen, pos, ok := readUint32(data, pos)
if !ok {
return logFile, logPos, position, readPacketErr
return logFile, logPos, position, nonBlock, readPacketErr
}
if pos+int(fileNameLen) > len(data) {
return logFile, logPos, position, nonBlock, readPacketErr
}
logFile = string(data[pos : pos+int(fileNameLen)])
pos += int(fileNameLen)

logPos, pos, ok = readUint64(data, pos)
if !ok {
return logFile, logPos, position, readPacketErr
return logFile, logPos, position, nonBlock, readPacketErr
}

dataSize, pos, ok := readUint32(data, pos)
if !ok {
return logFile, logPos, position, readPacketErr
return logFile, logPos, position, nonBlock, readPacketErr
}
if pos+int(dataSize) > len(data) {
return logFile, logPos, position, nonBlock, readPacketErr
}
if gtidBytes := data[pos : pos+int(dataSize)]; len(gtidBytes) != 0 {
gtid, err := replication.NewMysql56GTIDSetFromSIDBlock(gtidBytes)
if err != nil {
return logFile, logPos, position, vterrors.Wrapf(err, "error parsing GTID from BinlogDumpGTID packet")
return logFile, logPos, position, nonBlock, vterrors.Wrapf(err, "error parsing GTID from BinlogDumpGTID packet")
}
// ComBinlogDumpGTID is a MySQL specific protocol. The GTID flavor is necessarily MySQL 56
position = replication.Position{GTIDSet: gtid}
}
if flags2&BinlogDumpNonBlock != 0 {
return logFile, logPos, position, io.EOF
}

return logFile, logPos, position, nil
nonBlock = flags2&BinlogDumpNonBlock != 0

return logFile, logPos, position, nonBlock, nil
}
53 changes: 44 additions & 9 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ func (c *Conn) endWriterBuffering() error {
c.bufferedWriter = nil
}()

c.flushTimer.Stop()
if c.flushTimer != nil {
c.flushTimer.Stop()
}
return c.bufferedWriter.Flush()
}

Expand Down Expand Up @@ -596,6 +598,21 @@ func (c *Conn) ReadPacket() ([]byte, error) {
return result, err
}

func (c *Conn) WritePacket(data []byte) error {
return c.writePacket(data)
}

// ReadOnePacket reads a single packet from the underlying connection without
// reassembling multi-packet messages. This is useful for streaming raw packets.
// Returns nil, nil for a zero-length packet (which follows a max-size packet).
func (c *Conn) ReadOnePacket() ([]byte, error) {
result, err := c.readOnePacket()
if err != nil {
return nil, sqlerror.NewSQLErrorf(sqlerror.CRServerLost, sqlerror.SSUnknownSQLState, "%v", err)
}
return result, err
}

// writePacket writes a packet, possibly cutting it into multiple
// chunks. Note this is not very efficient, as the client probably
// has to build the []byte and that makes a memory copy.
Expand Down Expand Up @@ -879,6 +896,12 @@ func (c *Conn) writeErrorPacketFromError(err error) error {
return c.writeErrorPacket(sqlerror.ERUnknownError, sqlerror.SSUnknownSQLState, "unknown error: %v", err)
}

// WriteErrorPacketFromError is the exported version of writeErrorPacketFromError
// for use by external packages (e.g., vtgate's binlog dump handler).
func (c *Conn) WriteErrorPacketFromError(err error) error {
return c.writeErrorPacketFromError(err)
}

// writeEOFPacket writes an EOF packet, through the buffer, and
// doesn't flush (as it is used as part of a query result).
func (c *Conn) writeEOFPacket(flags uint16, warnings uint16) error {
Expand Down Expand Up @@ -977,11 +1000,11 @@ func (c *Conn) handleComRegisterReplica(handler Handler, data []byte) (kontinue
return false
}
if err := handler.ComRegisterReplica(c, replicaHost, replicaPort, replicaUser, replicaPassword); err != nil {
c.writeErrorPacketFromError(err)
c.WriteErrorPacketFromError(err)
return false
}
if err := c.writeOKPacket(&PacketOK{}); err != nil {
c.writeErrorPacketFromError(err)
c.WriteErrorPacketFromError(err)
}
return true
}
Expand All @@ -1000,11 +1023,17 @@ func (c *Conn) handleComBinlogDump(handler Handler, data []byte) (kontinue bool)

logfile, binlogPos, err := c.parseComBinlogDump(data)
if err != nil {
log.Errorf("conn %v: parseComBinlogDumpGTID failed: %v", c.ID(), err)
log.Errorf("conn %v: parseComBinlogDump failed: %v", c.ID(), err)
if writeErr := c.WriteErrorPacketFromError(err); writeErr != nil {
log.Errorf("conn %v: failed to write error packet: %v", c.ID(), writeErr)
}
return false
}
if err := handler.ComBinlogDump(c, logfile, binlogPos); err != nil {
log.Error(err.Error())
log.Errorf("conn %v: ComBinlogDump failed: %v", c.ID(), err)
if writeErr := c.WriteErrorPacketFromError(err); writeErr != nil {
log.Errorf("conn %v: failed to write error packet: %v", c.ID(), writeErr)
}
return false
}
return kontinue
Expand All @@ -1022,13 +1051,19 @@ func (c *Conn) handleComBinlogDumpGTID(handler Handler, data []byte) (kontinue b
}
}()

logFile, logPos, position, err := c.parseComBinlogDumpGTID(data)
logFile, logPos, position, nonBlock, err := c.parseComBinlogDumpGTID(data)
if err != nil {
log.Errorf("conn %v: parseComBinlogDumpGTID failed: %v", c.ID(), err)
if writeErr := c.WriteErrorPacketFromError(err); writeErr != nil {
log.Errorf("conn %v: failed to write error packet: %v", c.ID(), writeErr)
}
return false
}
if err := handler.ComBinlogDumpGTID(c, logFile, logPos, position.GTIDSet); err != nil {
log.Error(err.Error())
if err := handler.ComBinlogDumpGTID(c, logFile, logPos, position.GTIDSet, nonBlock); err != nil {
log.Errorf("conn %v: ComBinlogDumpGTID failed: %v", c.ID(), err)
if writeErr := c.WriteErrorPacketFromError(err); writeErr != nil {
log.Errorf("conn %v: failed to write error packet: %v", c.ID(), writeErr)
}
return false
}
return kontinue
Expand All @@ -1042,7 +1077,7 @@ func (c *Conn) handleComResetConnection(handler Handler) {
c.PrepareData = make(map[uint32]*PrepareData)
err := c.writeOKPacket(&PacketOK{})
if err != nil {
c.writeErrorPacketFromError(err)
c.WriteErrorPacketFromError(err)
}
}

Expand Down
26 changes: 25 additions & 1 deletion go/mysql/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,30 @@ func TestConnectionErrorWhileWritingComStmtExecute(t *testing.T) {
require.False(t, res, "we should beak the connection in case of error writing error packet")
}

func TestParseComBinlogDumpGTID(t *testing.T) {
sConn := newConn(testConn{}, DefaultFlushDelay, 0)

// Test packet structure (COM_BINLOG_DUMP_GTID):
// - 1 byte: command (0x1e)
// - 2 bytes: flags (0x0001 = NON_BLOCK)
// - 4 bytes: server_id (0)
// - 4 bytes: filename_len (24)
// - 24 bytes: filename ("vt_0000000100-bin.000001")
// - 8 bytes: log_pos (4)
// - 4 bytes: gtid_data_len (48)
// - 48 bytes: SID block for GTID "24bcf1e2-01e0-11ee-8c9c-0242ac120002:1-8"
input, err := hex.DecodeString("1e0100000000001800000076745f303030303030303130302d62696e2e303030303031040000000000000030000000010000000000000024bcf1e201e011ee8c9c0242ac120002010000000000000001000000000000000900000000000000")
require.NoError(t, err)

logFile, logPos, position, nonBlock, err := sConn.parseComBinlogDumpGTID(input)
require.NoError(t, err)

require.Equal(t, "vt_0000000100-bin.000001", logFile)
require.Equal(t, uint64(4), logPos)
require.True(t, nonBlock)
require.Equal(t, "24bcf1e2-01e0-11ee-8c9c-0242ac120002:1-8", position.String())
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randSeq(n int) string {
Expand Down Expand Up @@ -1150,7 +1174,7 @@ func (t testRun) ComBinlogDump(c *Conn, logFile string, binlogPos uint32) error
panic("implement me")
}

func (t testRun) ComBinlogDumpGTID(c *Conn, logFile string, logPos uint64, gtidSet replication.GTIDSet) error {
func (t testRun) ComBinlogDumpGTID(c *Conn, logFile string, logPos uint64, gtidSet replication.GTIDSet, nonBlock bool) error {
panic("implement me")
}

Expand Down
2 changes: 1 addition & 1 deletion go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (db *DB) ComBinlogDump(c *mysql.Conn, logFile string, binlogPos uint32) err
}

// ComBinlogDumpGTID is part of the mysql.Handler interface.
func (db *DB) ComBinlogDumpGTID(c *mysql.Conn, logFile string, logPos uint64, gtidSet replication.GTIDSet) error {
func (db *DB) ComBinlogDumpGTID(c *mysql.Conn, logFile string, logPos uint64, gtidSet replication.GTIDSet, nonBlock bool) error {
return nil
}

Expand Down
32 changes: 24 additions & 8 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,16 @@ type flavor interface {
// startSQLThreadCommand returns the command to start the replica's SQL thread only.
startSQLThreadCommand() string

// sendBinlogDumpCommand sends the packet required to start
// dumping binlogs from the specified location.
sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error
// sendBinlogDumpCommand sends the COM_BINLOG_DUMP packet to start
// dumping binlogs from the specified file and position.
// This is the original file/position-based protocol.
sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, binlogPos uint32) error

// sendBinlogDumpGTIDCommand sends the COM_BINLOG_DUMP_GTID packet to start
// dumping binlogs from the specified GTID position.
// If nonBlock is true, the server will return EOF when it reaches the end
// of the binlog instead of blocking and waiting for new events.
sendBinlogDumpGTIDCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position, nonBlock bool) error

// readBinlogEvent reads the next BinlogEvent from the connection.
readBinlogEvent(c *Conn) (BinlogEvent, error)
Expand Down Expand Up @@ -348,15 +355,24 @@ func (c *Conn) StartSQLThreadCommand() string {
return c.flavor.startSQLThreadCommand()
}

// SendBinlogDumpCommand sends the flavor-specific version of
// the COM_BINLOG_DUMP command to start dumping raw binlog
// SendBinlogDumpCommand sends the COM_BINLOG_DUMP command to start
// dumping raw binlog events over a server connection, starting at
// a given file and position. This is the original file/position-based protocol.
func (c *Conn) SendBinlogDumpCommand(serverID uint32, binlogFilename string, binlogPos uint32) error {
return c.flavor.sendBinlogDumpCommand(c, serverID, binlogFilename, binlogPos)
}

// SendBinlogDumpGTIDCommand sends the flavor-specific version of
// the COM_BINLOG_DUMP_GTID command to start dumping raw binlog
// events over a server connection, starting at a given GTID.
func (c *Conn) SendBinlogDumpCommand(serverID uint32, binlogFilename string, startPos replication.Position) error {
return c.flavor.sendBinlogDumpCommand(c, serverID, binlogFilename, startPos)
// If nonBlock is true, the server will return EOF when it reaches the end
// of the binlog instead of blocking and waiting for new events.
func (c *Conn) SendBinlogDumpGTIDCommand(serverID uint32, binlogFilename string, startPos replication.Position, nonBlock bool) error {
return c.flavor.sendBinlogDumpGTIDCommand(c, serverID, binlogFilename, startPos, nonBlock)
}

// ReadBinlogEvent reads the next BinlogEvent. This must be used
// in conjunction with SendBinlogDumpCommand.
// in conjunction with SendBinlogDumpCommand or SendBinlogDumpGTIDCommand.
func (c *Conn) ReadBinlogEvent() (BinlogEvent, error) {
return c.flavor.readBinlogEvent(c)
}
Expand Down
9 changes: 8 additions & 1 deletion go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,14 @@ func (flv *filePosFlavor) startSQLThreadCommand() string {
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (flv *filePosFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error {
func (flv *filePosFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, binlogPos uint32) error {
flv.file = binlogFilename
return c.WriteComBinlogDump(serverID, binlogFilename, uint64(binlogPos), 0)
}

// sendBinlogDumpGTIDCommand is part of the Flavor interface.
// Note: nonBlock is not supported for file position based replication as it uses COM_BINLOG_DUMP.
func (flv *filePosFlavor) sendBinlogDumpGTIDCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position, nonBlock bool) error {
rpos, ok := startPos.GTIDSet.(replication.FilePosGTID)
if !ok {
return fmt.Errorf("startPos.GTIDSet is wrong type - expected filePosGTID, got: %#v", startPos.GTIDSet)
Expand Down
8 changes: 7 additions & 1 deletion go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ func (mariadbFlavor) startSQLThreadCommand() string {
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (mariadbFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error {
func (mariadbFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, binlogPos uint32) error {
return c.WriteComBinlogDump(serverID, binlogFilename, uint64(binlogPos), 0)
}

// sendBinlogDumpGTIDCommand is part of the Flavor interface.
// Note: nonBlock is not supported for MariaDB as it uses a different protocol (COM_BINLOG_DUMP).
func (mariadbFlavor) sendBinlogDumpGTIDCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position, nonBlock bool) error {
// Tell the server that we understand GTIDs by setting
// mariadb_slave_capability to MARIA_SLAVE_CAPABILITY_GTID = 4 (MariaDB >= 10.0.1).
if _, err := c.ExecuteFetch("SET @mariadb_slave_capability=4", 0, false); err != nil {
Expand Down
10 changes: 9 additions & 1 deletion go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,12 @@ func (mysqlFlavor) resetReplicationParametersCommands(c *Conn) []string {
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error {
func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, binlogPos uint32) error {
return c.WriteComBinlogDump(serverID, binlogFilename, uint64(binlogPos), 0)
}

// sendBinlogDumpGTIDCommand is part of the Flavor interface.
func (mysqlFlavor) sendBinlogDumpGTIDCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position, nonBlock bool) error {
gtidSet, ok := startPos.GTIDSet.(replication.Mysql56GTIDSet)
if !ok {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet)
Expand All @@ -230,6 +235,9 @@ func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilenam
sidBlock = gtidSet.SIDBlock()
}
var flags2 uint16
if nonBlock {
flags2 |= BinlogDumpNonBlock
}
if binlogFilename != "" {
flags2 |= BinlogThroughPosition
}
Expand Down
Loading