Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions go/mysql/binlog/rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,9 @@ func printTimestamp(v uint32) *bytes.Buffer {
}

t := time.Unix(int64(v), 0).UTC()
year, month, day := t.Date()
hour, minute, second := t.Clock()

result := &bytes.Buffer{}
fmt.Fprintf(result, "%04d-%02d-%02d %02d:%02d:%02d", year, int(month), day, hour, minute, second)
result.Write(t.AppendFormat(nil, sqltypes.TimestampFormat))
return result
}

Expand Down
9 changes: 9 additions & 0 deletions go/mysql/binlog/rbr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"

"github.com/stretchr/testify/assert"
)

func TestCellLengthAndData(t *testing.T) {
Expand Down Expand Up @@ -557,3 +559,10 @@ func TestCellLengthAndData(t *testing.T) {
}
}
}

func TestPrintTimestamp(t *testing.T) {
var timestamp uint32 = 1741794544

result := printTimestamp(timestamp).String()
assert.Equal(t, "2025-03-12 15:49:04", result)
}
2 changes: 1 addition & 1 deletion go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type BinlogEvent interface {
// GTID returns the GTID from the event, and if this event
// also serves as a BEGIN statement.
// This is only valid if IsGTID() returns true.
GTID(BinlogFormat) (replication.GTID, bool, error)
GTID(BinlogFormat) (gtid replication.GTID, hasBegin bool, lastCommitted int64, sequenceNumber int64, err error)
// Query returns a Query struct representing data from a QUERY_EVENT.
// This is only valid if IsQuery() returns true.
Query(BinlogFormat) (Query, error)
Expand Down
12 changes: 6 additions & 6 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func newFilePosBinlogEvent(buf []byte) *filePosBinlogEvent {
return &filePosBinlogEvent{binlogEvent: binlogEvent(buf)}
}

func (*filePosBinlogEvent) GTID(BinlogFormat) (replication.GTID, bool, error) {
return nil, false, nil
func (*filePosBinlogEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) {
return nil, false, 0, 0, nil
}

// IsSemiSyncAckRequested implements BinlogEvent.IsSemiSyncAckRequested().
Expand Down Expand Up @@ -224,8 +224,8 @@ func (ev filePosFakeEvent) Format() (BinlogFormat, error) {
return BinlogFormat{}, nil
}

func (ev filePosFakeEvent) GTID(BinlogFormat) (replication.GTID, bool, error) {
return nil, false, nil
func (ev filePosFakeEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) {
return nil, false, 0, 0, nil
}

func (ev filePosFakeEvent) Query(BinlogFormat) (Query, error) {
Expand Down Expand Up @@ -304,6 +304,6 @@ func (ev filePosGTIDEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte, e
return ev, nil, nil
}

func (ev filePosGTIDEvent) GTID(BinlogFormat) (replication.GTID, bool, error) {
return ev.gtid, false, nil
func (ev filePosGTIDEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) {
return ev.gtid, false, 0, 0, nil
}
4 changes: 2 additions & 2 deletions go/mysql/binlog_event_make_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestMariadDBGTIDEVent(t *testing.T) {
event, _, err := event.StripChecksum(f)
require.NoError(t, err, "StripChecksum failed: %v", err)

gtid, hasBegin, err := event.GTID(f)
gtid, hasBegin, _, _, err := event.GTID(f)
require.NoError(t, err, "NewMariaDBGTIDEvent().GTID() returned error: %v", err)
require.True(t, hasBegin, "NewMariaDBGTIDEvent() didn't store hasBegin properly.")

Expand All @@ -178,7 +178,7 @@ func TestMariadDBGTIDEVent(t *testing.T) {
event, _, err = event.StripChecksum(f)
require.NoError(t, err, "StripChecksum failed: %v", err)

gtid, hasBegin, err = event.GTID(f)
gtid, hasBegin, _, _, err = event.GTID(f)
require.NoError(t, err, "NewMariaDBGTIDEvent().GTID() returned error: %v", err)
require.False(t, hasBegin, "NewMariaDBGTIDEvent() didn't store hasBegin properly.")

Expand Down
7 changes: 4 additions & 3 deletions go/mysql/binlog_event_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,18 @@ func (ev mariadbBinlogEvent) IsGTID() bool {
// 8 sequence number
// 4 domain ID
// 1 flags2
func (ev mariadbBinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, error) {
func (ev mariadbBinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, int64, int64, error) {
const FLStandalone = 1

data := ev.Bytes()[f.HeaderLength:]
flags2 := data[8+4]

return replication.MariadbGTID{
gtid := replication.MariadbGTID{
Sequence: binary.LittleEndian.Uint64(data[:8]),
Domain: binary.LittleEndian.Uint32(data[8 : 8+4]),
Server: ev.ServerID(),
}, flags2&FLStandalone == 0, nil
}
return gtid, flags2&FLStandalone == 0, 0, 0, nil
}

// PreviousGTIDs implements BinlogEvent.PreviousGTIDs().
Expand Down
8 changes: 4 additions & 4 deletions go/mysql/binlog_event_mariadb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestMariadbNotBeginGTID(t *testing.T) {
require.NoError(t, err)

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbStandaloneGTIDEvent)}
_, got, err := input.GTID(f)
_, got, _, _, err := input.GTID(f)
require.NoError(t, err)
assert.False(t, got, "%#v", input)
}
Expand All @@ -70,7 +70,7 @@ func TestMariadbIsBeginGTID(t *testing.T) {
require.NoError(t, err)

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbBeginGTIDEvent)}
_, got, err := input.GTID(f)
_, got, _, _, err := input.GTID(f)
require.NoError(t, err)
assert.True(t, got, "%#v", input)
}
Expand All @@ -81,7 +81,7 @@ func TestMariadbStandaloneBinlogEventGTID(t *testing.T) {

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbStandaloneGTIDEvent)}
want := replication.MariadbGTID{Domain: 0, Server: 62344, Sequence: 9}
got, hasBegin, err := input.GTID(f)
got, hasBegin, _, _, err := input.GTID(f)
assert.NoError(t, err)
assert.False(t, hasBegin)
assert.Equal(t, want, got)
Expand All @@ -93,7 +93,7 @@ func TestMariadbBinlogEventGTID(t *testing.T) {

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbBeginGTIDEvent)}
want := replication.MariadbGTID{Domain: 0, Server: 62344, Sequence: 10}
got, hasBegin, err := input.GTID(f)
got, hasBegin, _, _, err := input.GTID(f)
assert.NoError(t, err)
assert.True(t, hasBegin)
assert.Equal(t, want, got)
Expand Down
23 changes: 19 additions & 4 deletions go/mysql/binlog_event_mysql56.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,27 @@ func (ev mysql56BinlogEvent) IsGTID() bool {
// 1 flags
// 16 SID (server UUID)
// 8 GNO (sequence number, signed int)
func (ev mysql56BinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, error) {
// 1 lt_type
// 8 last_committed
// 8 sequence_number
func (ev mysql56BinlogEvent) GTID(f BinlogFormat) (gtid replication.GTID, hasBegin bool, lastCommitted int64, sequenceNumber int64, err error) {
data := ev.Bytes()[f.HeaderLength:]
var sid replication.SID
copy(sid[:], data[1:1+16])
gno := int64(binary.LittleEndian.Uint64(data[1+16 : 1+16+8]))
return replication.Mysql56GTID{Server: sid, Sequence: gno}, false /* hasBegin */, nil
pos := 1
copy(sid[:], data[pos:pos+16])
pos += 16 // end of SID
gno := int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
pos += 8 // end of GNO
pos += 1 // end of lt_type
if len(data) >= pos+8 {
lastCommitted = int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
}
pos += 8 // end of last_committed
if len(data) >= pos+8 {
sequenceNumber = int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
}
// pos += 8 // end of sequence_number
return replication.Mysql56GTID{Server: sid, Sequence: gno}, false /* hasBegin */, lastCommitted, sequenceNumber, nil
}

// PreviousGTIDs implements BinlogEvent.PreviousGTIDs().
Expand Down
6 changes: 4 additions & 2 deletions go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
// Sample event data for MySQL 5.6.
var (
mysql56FormatEvent = NewMysql56BinlogEvent([]byte{0x78, 0x4e, 0x49, 0x55, 0xf, 0x64, 0x0, 0x0, 0x0, 0x74, 0x0, 0x0, 0x0, 0x78, 0x0, 0x0, 0x0, 0x1, 0x0, 0x4, 0x0, 0x35, 0x2e, 0x36, 0x2e, 0x32, 0x34, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x78, 0x4e, 0x49, 0x55, 0x13, 0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5c, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x19, 0x19, 0x0, 0x1, 0x18, 0x4a, 0xf, 0xca})
mysql56GTIDEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x21, 0x64, 0x0, 0x0, 0x0, 0x30, 0x0, 0x0, 0x0, 0xf5, 0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x48, 0x45, 0x82, 0x27})
mysql56GTIDEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x21, 0x64, 0x0, 0x0, 0x0, 0x30, 0x0, 0x0, 0x0, 0xf5, 0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0 /* lt_type: */, 0x0 /* last_committed: */, 0x7, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0 /* sequence_number: */, 0x9, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x48, 0x45, 0x82, 0x27})
mysql56QueryEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3})
mysql56SemiSyncNoAckQueryEvent = NewMysql56BinlogEvent([]byte{0xef, 0x00, 0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3})
mysql56SemiSyncAckQueryEvent = NewMysql56BinlogEvent([]byte{0xef, 0x01, 0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3})
Expand Down Expand Up @@ -90,10 +90,12 @@ func TestMysql56GTID(t *testing.T) {
Server: replication.SID{0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a},
Sequence: 4,
}
got, hasBegin, err := input.GTID(format)
got, hasBegin, lastCommitted, sequenceNumber, err := input.GTID(format)
require.NoError(t, err, "GTID() error: %v", err)
assert.False(t, hasBegin, "GTID() returned hasBegin")
assert.Equal(t, want, got, "GTID() = %#v, want %#v", got, want)
assert.Equal(t, int64(7), lastCommitted)
assert.Equal(t, int64(9), sequenceNumber)
}

func TestMysql56DecodeTransactionPayload(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/endtoend/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestRowReplicationWithRealDatabase(t *testing.T) {
switch {
case be.IsGTID():
// We expect one of these at least.
gtid, hasBegin, err := be.GTID(f)
gtid, hasBegin, _, _, err := be.GTID(f)
if err != nil {
t.Fatalf("GTID event is broken: %v", err)
}
Expand Down
16 changes: 16 additions & 0 deletions go/mysql/replication/filepos_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (gtid FilePosGTID) Flavor() string {
return FilePosFlavorID
}

func (gtid FilePosGTID) Empty() bool {
return gtid.File == ""
}

// SequenceDomain implements GTID.SequenceDomain().
func (gtid FilePosGTID) SequenceDomain() any {
return nil
Expand Down Expand Up @@ -137,6 +141,11 @@ func (gtid FilePosGTID) AddGTID(other GTID) GTIDSet {
return filePosOther
}

// AddGTIDInPlace implements GTIDSet.AddGTIDInPlace().
func (gtid FilePosGTID) AddGTIDInPlace(other GTID) GTIDSet {
return gtid.AddGTID(other)
}

// Union implements GTIDSet.Union().
func (gtid FilePosGTID) Union(other GTIDSet) GTIDSet {
filePosOther, ok := other.(FilePosGTID)
Expand All @@ -147,6 +156,13 @@ func (gtid FilePosGTID) Union(other GTIDSet) GTIDSet {
return filePosOther
}

// Union implements GTIDSet.Union().
func (gtid FilePosGTID) UnionInPlace(other GTIDSet) GTIDSet {
gtid = gtid.Union(other).(FilePosGTID)

return gtid
}

// Last returns last filePosition
// For filePos based GTID we have only one position
// here we will just return the current filePos
Expand Down
9 changes: 9 additions & 0 deletions go/mysql/replication/gtid_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type GTIDSet interface {
// registered in the transactionSetParsers map.
Flavor() string

// Empty returns true when the GTID has no entries
Empty() bool

// ContainsGTID returns true if the set contains the specified transaction.
ContainsGTID(GTID) bool

Expand All @@ -47,9 +50,15 @@ type GTIDSet interface {
// AddGTID returns a new GTIDSet that is expanded to contain the given GTID.
AddGTID(GTID) GTIDSet

// AddGTID returns a new GTIDSet that is expanded to contain the given GTID, modifying the receiver GTID set
AddGTIDInPlace(GTID) GTIDSet

// Union returns a union of the receiver GTIDSet and the supplied GTIDSet.
Union(GTIDSet) GTIDSet

// UnionInPlace returns a union of the receiver GTIDSet and the supplied GTIDSet, modifying the receiver GTID set
UnionInPlace(GTIDSet) GTIDSet

// Union returns a union of the receiver GTIDSet and the supplied GTIDSet.
Last() string
}
Expand Down
11 changes: 7 additions & 4 deletions go/mysql/replication/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,22 @@ type fakeGTID struct {
func (f fakeGTID) String() string { return f.value }
func (f fakeGTID) Last() string { panic("not implemented") }
func (f fakeGTID) Flavor() string { return f.flavor }
func (f fakeGTID) Empty() bool { return false }
func (fakeGTID) SourceServer() any { return int(1) }
func (fakeGTID) SequenceNumber() any { return int(1) }
func (fakeGTID) SequenceDomain() any { return int(1) }
func (f fakeGTID) GTIDSet() GTIDSet { return nil }

func (fakeGTID) ContainsGTID(GTID) bool { return false }
func (fakeGTID) Contains(GTIDSet) bool { return false }
func (f fakeGTID) Union(GTIDSet) GTIDSet { return f }
func (fakeGTID) ContainsGTID(GTID) bool { return false }
func (fakeGTID) Contains(GTIDSet) bool { return false }
func (f fakeGTID) Union(GTIDSet) GTIDSet { return f }
func (f fakeGTID) UnionInPlace(GTIDSet) GTIDSet { return f }
func (f fakeGTID) Equal(other GTIDSet) bool {
otherFake, ok := other.(fakeGTID)
if !ok {
return false
}
return f == otherFake
}
func (fakeGTID) AddGTID(GTID) GTIDSet { return nil }
func (fakeGTID) AddGTID(GTID) GTIDSet { return nil }
func (fakeGTID) AddGTIDInPlace(GTID) GTIDSet { return nil }
24 changes: 24 additions & 0 deletions go/mysql/replication/mariadb_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func (gtidSet MariadbGTIDSet) Flavor() string {
return MariadbFlavorID
}

func (gtidSet MariadbGTIDSet) Empty() bool {
return len(gtidSet) == 0
}

// ContainsGTID implements GTIDSet.ContainsGTID().
func (gtidSet MariadbGTIDSet) ContainsGTID(other GTID) bool {
if other == nil {
Expand Down Expand Up @@ -216,6 +220,19 @@ func (gtidSet MariadbGTIDSet) AddGTID(other GTID) GTIDSet {
return newSet
}

// AddGTID implements GTIDSet.AddGTID().
func (gtidSet MariadbGTIDSet) AddGTIDInPlace(other GTID) GTIDSet {
if other == nil {
return gtidSet
}
mdbOther, ok := other.(MariadbGTID)
if !ok {
return gtidSet
}
gtidSet.addGTID(mdbOther)
return gtidSet
}

// Union implements GTIDSet.Union(). This is a pure method, and does not mutate the receiver.
func (gtidSet MariadbGTIDSet) Union(other GTIDSet) GTIDSet {
if gtidSet == nil && other != nil {
Expand All @@ -237,6 +254,13 @@ func (gtidSet MariadbGTIDSet) Union(other GTIDSet) GTIDSet {
return newSet
}

// Union implements GTIDSet.Union().
func (gtid MariadbGTIDSet) UnionInPlace(other GTIDSet) GTIDSet {
gtid = gtid.Union(other).(MariadbGTIDSet)

return gtid
}

// Last returns the last gtid
func (gtidSet MariadbGTIDSet) Last() string {
// Sort domains so the string format is deterministic.
Expand Down
11 changes: 8 additions & 3 deletions go/mysql/replication/mysql56_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ var (
ErrExpectMysql56Flavor = vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "expected MySQL GTID position but found a different or invalid format.")
)

func ParseMysql56GTID(s string) (GTID, error) {
return parseMysql56GTID(s)
}

// parseMysql56GTID is registered as a GTID parser.
func parseMysql56GTID(s string) (GTID, error) {
// Split into parts.
Expand Down Expand Up @@ -94,8 +98,7 @@ func ParseSID(s string) (sid SID, err error) {
type Mysql56GTID struct {
// Server is the SID of the server that originally committed the transaction.
Server SID
// Sequence is the sequence number of the transaction within a given Server's
// scope.
// Sequence is the sequence number of the transaction within a given Server's scope.
Sequence int64
}

Expand Down Expand Up @@ -126,7 +129,9 @@ func (gtid Mysql56GTID) SequenceNumber() any {

// GTIDSet implements GTID.GTIDSet().
func (gtid Mysql56GTID) GTIDSet() GTIDSet {
return Mysql56GTIDSet{}.AddGTID(gtid)
return Mysql56GTIDSet{
gtid.Server: []interval{{start: gtid.Sequence, end: gtid.Sequence}},
}
}

func init() {
Expand Down
Loading
Loading