Skip to content
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
94 commits
Select commit Hold shift + click to select a range
18e5d4c
go/mysql: GTID event contains and returns lastCommitted and sequenceN…
shlomi-noach Jan 26, 2025
427349c
proto: VEventType has last_committed, sequence_number, must_save
shlomi-noach Jan 26, 2025
3242e14
adapt binlog_streamer
shlomi-noach Jan 26, 2025
78760b5
vstreamer reads and populates vevent.SequenceNumber, vevent.LastCommi…
shlomi-noach Jan 26, 2025
b5d19ce
benchmark CI job
shlomi-noach Jan 26, 2025
68506b6
VReplicationExperimentalFlagVPlayerParallel
shlomi-noach Jan 26, 2025
445571b
benchmark endtoend test
shlomi-noach Jan 26, 2025
37edb42
decorate events with EventGTID
shlomi-noach Jan 27, 2025
0d38020
decorate events with GTID
shlomi-noach Jan 27, 2025
d86850a
Enable writeset based binlog dependency tracking
mattlord Jan 28, 2025
b743481
worker-pos table
shlomi-noach Jan 28, 2025
715cda5
Merge branch 'parallel-vplayer' of github.com:planetscale/vitess into…
shlomi-noach Jan 28, 2025
6305ee1
make proto
shlomi-noach Feb 4, 2025
7b2e600
remove debug message (causes the benchmark to timeout)
shlomi-noach Feb 4, 2025
16d9f8d
vreplication controller has a dbClientGenerator; vplayer has initial …
shlomi-noach Feb 4, 2025
88fdaf5
initial code for parallel producer and parallel worker
shlomi-noach Feb 4, 2025
28e8716
store explicit GTID. not pos
shlomi-noach Feb 10, 2025
48c6f77
AppendGTIDSet
shlomi-noach Feb 10, 2025
8e9b661
read and write to vreplication_worker_pos
shlomi-noach Feb 10, 2025
fcf20f6
initial full implementation
shlomi-noach Feb 10, 2025
b923e15
fix query
shlomi-noach Feb 10, 2025
9caae85
check ctx.Done
shlomi-noach Feb 10, 2025
fd0a026
assign dbClientGen
shlomi-noach Feb 10, 2025
5fdc1ca
mark for completion only after throttling, or else cut-over is attemp…
shlomi-noach Feb 12, 2025
a89323f
include event's transaction timestamp
shlomi-noach Feb 12, 2025
1b6a828
read and write transaction timestamp
shlomi-noach Feb 12, 2025
24551c7
aggregateWorkersPos at the end of fetchAndApply goroutine
shlomi-noach Feb 12, 2025
b7dbd7a
more progress
shlomi-noach Feb 12, 2025
68d6d2a
sequentialize non-ROW events
shlomi-noach Feb 12, 2025
1a9c966
fix dbClient multithread use issue
shlomi-noach Feb 13, 2025
8f53bc0
better handle context timeout. No need to commit all
shlomi-noach Feb 16, 2025
a480d1d
only share first contiguous interval
shlomi-noach Feb 16, 2025
8aecc48
ensure rollback; no need to retry FIELD event as it's now sequentiali…
shlomi-noach Feb 16, 2025
9cd94e0
query to combine GTIDs from _vt.vreplication_worker_pos
shlomi-noach Feb 17, 2025
d704d70
GTIDSet: Empty() and InPlaceUnion() functions
shlomi-noach Feb 17, 2025
2373c6e
get rid of integer length
shlomi-noach Feb 17, 2025
e5eb846
get rid of worker's lastPos tracking. All tracking is in db
shlomi-noach Feb 17, 2025
a001729
sequentializing when rotating into a new binlog. Use MySQL query to a…
shlomi-noach Feb 17, 2025
b662663
use worker's commitFunc(). Send individual completed sequence numbers…
shlomi-noach Feb 17, 2025
710d3e0
clearer name
shlomi-noach Feb 17, 2025
af0ef08
faster bailout in Mysql56GTIDSet.Contains()
shlomi-noach Feb 17, 2025
3df5ee6
do not use InPlaceUnion()
shlomi-noach Feb 17, 2025
3a5781e
remove InPlaceUnion()
shlomi-noach Feb 17, 2025
9ebb1d5
reading completedSequenceNumbers in a goroutine, RWMutex protected. T…
shlomi-noach Feb 17, 2025
11b71dd
compute GTID aggregation query just once
shlomi-noach Feb 17, 2025
7497d2d
cleanup
shlomi-noach Feb 17, 2025
e4638aa
Throttler: reduce regexp/string allocations by pre-computing pascal case
shlomi-noach Feb 18, 2025
b0b0978
scope testing
shlomi-noach Feb 18, 2025
ab4c566
typo
shlomi-noach Feb 18, 2025
64dfb2e
reduce alloc/GC by not parsing worker GTID and using strings. Sending…
shlomi-noach Feb 18, 2025
b51f236
compute in-transation pos updates in-memory, and only apply at commit…
shlomi-noach Feb 18, 2025
708de8f
GenerateUpdateWorkerPos receives string arg
shlomi-noach Feb 18, 2025
2e9d393
Revert "remove InPlaceUnion()"
shlomi-noach Feb 18, 2025
5469e0d
worker appends pos in-place (small GTID evaluation optimization)
shlomi-noach Feb 18, 2025
77f4a3a
reintroduce AppendGTIDSetInPlace
shlomi-noach Feb 18, 2025
43c9e45
decorate VEvent with PinWorker
shlomi-noach Feb 18, 2025
29a2c5e
decorate VEvent with Skippable
shlomi-noach Feb 19, 2025
014fe16
remove unused function ReadVReplicationCombinedWorkersGTIDs
shlomi-noach Feb 19, 2025
632e265
Multiple improvements:
shlomi-noach Feb 19, 2025
a56023d
increase pos varchar length
shlomi-noach Feb 19, 2025
94948c7
increase varbinary length
shlomi-noach Feb 19, 2025
2d005f6
introduce, but not use yet, considerCommitWorkerEvent(). It doesn't s…
shlomi-noach Feb 19, 2025
89974db
add profiling; track vreplication lag
shlomi-noach Feb 19, 2025
1af732a
AddGTIDInPlace, AppendGTIDInPlace
shlomi-noach Feb 20, 2025
9e9f614
use AppendGTIDInPlace
shlomi-noach Feb 20, 2025
476ccd5
ParseMysql56GTID when we know the GTID is singular. Use AppendGTIDInP…
shlomi-noach Feb 20, 2025
ab673ad
remove VtLogStats use
shlomi-noach Feb 20, 2025
c072e25
preallocate relay log buffer (up to some limit)
shlomi-noach Feb 20, 2025
bcfa0b7
further improvements:
shlomi-noach Feb 20, 2025
4287377
simplify Mysql56GTID.GTIDSet()
shlomi-noach Feb 23, 2025
428b465
simplify logic: remove use of ConsiderCommitWorkerEvent, cap batched …
shlomi-noach Feb 23, 2025
58f1c99
increment assignSequence before assigning
shlomi-noach Feb 24, 2025
a9f8b83
increase maxIdleWorkerDuration
shlomi-noach Feb 24, 2025
5a998ab
increment assignSequence sooner
shlomi-noach Mar 12, 2025
faf9cdc
more benchmark knobs: yes/no to throttling online-dll; choosing workl…
shlomi-noach Mar 12, 2025
38446fc
resolved conflict
shlomi-noach Mar 12, 2025
065afa7
fix endtoend cluster/vtctld reference
shlomi-noach Mar 12, 2025
740d9ee
INSERT IGNORE rather than REPLACE vreplication_worker_pos, so as to p…
shlomi-noach Mar 17, 2025
98768c4
use time.Format
shlomi-noach Mar 17, 2025
9c8bc00
only compute time.Now and vs.eventGTID.String() once, instead of per …
shlomi-noach Mar 17, 2025
e58d3d1
respect Skippable flag, do not attempt to skip when Skippable=false
shlomi-noach Mar 17, 2025
b0a79e9
producer fixes:
shlomi-noach Mar 17, 2025
5c8a6cf
workers block until dependent transaction is committed
shlomi-noach Mar 19, 2025
5ea998b
reduce logging
shlomi-noach Mar 20, 2025
fcad0ba
Merge branch 'main' into parallel-vplayer
shlomi-noach Mar 20, 2025
db740e0
Multiple producer/worker improvements:
shlomi-noach Mar 24, 2025
db7f2a7
configurable 'useBatchCommits' in parallel applier
shlomi-noach Apr 6, 2025
011aa69
Merge branch 'main' into parallel-vplayer
shlomi-noach Apr 23, 2025
af832a0
resolved conflicts
shlomi-noach Aug 25, 2025
9ec2b42
resolved conflicts
shlomi-noach Oct 19, 2025
ab8eaf5
Merge remote-tracking branch 'origin/main' into parallel-vplayer
mattlord Dec 11, 2025
99b3dae
Fix linter warning
mattlord Dec 12, 2025
40b0947
Update workflow
mattlord Dec 12, 2025
9c55185
Small changes
mattlord Dec 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions .github/workflows/cluster_endtoend_onlineddl_vrepl_bench.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows"

name: Cluster (onlineddl_vrepl_bench)
on: [push, pull_request]
concurrency:
group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (onlineddl_vrepl_bench)')
cancel-in-progress: true

permissions: read-all

env:
LAUNCHABLE_ORGANIZATION: "vitess"
LAUNCHABLE_WORKSPACE: "vitess-app"
GITHUB_PR_HEAD_SHA: "${{ github.event.pull_request.head.sha }}"

jobs:
build:
timeout-minutes: 60
name: Run endtoend tests on Cluster (onlineddl_vrepl_bench)
runs-on: gh-hosted-runners-16cores-1-24.04

steps:
- name: Skip CI
run: |
if [[ "${{contains( github.event.pull_request.labels.*.name, 'Skip CI')}}" == "true" ]]; then
echo "skipping CI due to the 'Skip CI' label"
exit 1
fi

- name: Check if workflow needs to be skipped
id: skip-workflow
run: |
skip='false'
if [[ "${{github.event.pull_request}}" == "" ]] && [[ "${{github.ref}}" != "refs/heads/main" ]] && [[ ! "${{github.ref}}" =~ ^refs/heads/release-[0-9]+\.[0-9]$ ]] && [[ ! "${{github.ref}}" =~ "refs/tags/.*" ]]; then
skip='true'
fi
echo Skip ${skip}
echo "skip-workflow=${skip}" >> $GITHUB_OUTPUT

PR_DATA=$(curl -s\
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-H "Accept: application/vnd.github.v3+json" \
"https://api.github.com/repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}")
draft=$(echo "$PR_DATA" | jq .draft -r)
echo "is_draft=${draft}" >> $GITHUB_OUTPUT

- name: Check out code
if: steps.skip-workflow.outputs.skip-workflow == 'false'
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
persist-credentials: 'false'

- name: Check for changes in relevant files
if: steps.skip-workflow.outputs.skip-workflow == 'false'
uses: dorny/paths-filter@ebc4d7e9ebcb0b1eb21480bb8f43113e996ac77a # v3.0.1
id: changes
with:
token: ''
filters: |
end_to_end:
- 'test/config.json'
- 'go/**/*.go'
- 'go/vt/sidecardb/**/*.sql'
- 'go/test/endtoend/onlineddl/vrepl_suite/**'
- 'test.go'
- 'Makefile'
- 'build.env'
- 'go.sum'
- 'go.mod'
- 'proto/*.proto'
- 'tools/**'
- 'config/**'
- 'bootstrap.sh'
- '.github/workflows/cluster_endtoend_onlineddl_vrepl_bench.yml'
- 'go/test/endtoend/onlineddl/vrepl_suite/testdata'

- name: Set up Go
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2
with:
go-version-file: go.mod

- name: Set up python
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1

- name: Tune the OS
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
run: |
# Limit local port range to not use ports that overlap with server side
# ports that we listen on.
sudo sysctl -w net.ipv4.ip_local_port_range="22768 65535"
# Increase the asynchronous non-blocking I/O. More information at https://dev.mysql.com/doc/refman/5.7/en/innodb-parameters.html#sysvar_innodb_use_native_aio
echo "fs.aio-max-nr = 1048576" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p /etc/sysctl.conf

- name: Get dependencies
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
timeout-minutes: 10
run: |

# Get key to latest MySQL repo
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys A8D3785C
# Setup MySQL 8.0
wget -c https://dev.mysql.com/get/mysql-apt-config_0.8.33-1_all.deb
echo mysql-apt-config mysql-apt-config/select-server select mysql-8.0 | sudo debconf-set-selections
sudo DEBIAN_FRONTEND="noninteractive" dpkg -i mysql-apt-config*
sudo apt-get -qq update

# We have to install this old version of libaio1 in case we end up testing with MySQL 5.7. See also:
# https://bugs.launchpad.net/ubuntu/+source/libaio/+bug/2067501
curl -L -O http://mirrors.kernel.org/ubuntu/pool/main/liba/libaio/libaio1_0.3.112-13build1_amd64.deb
sudo dpkg -i libaio1_0.3.112-13build1_amd64.deb
# libtinfo5 is also needed for older MySQL 5.7 builds.
curl -L -O http://mirrors.kernel.org/ubuntu/pool/universe/n/ncurses/libtinfo5_6.3-2ubuntu0.1_amd64.deb
sudo dpkg -i libtinfo5_6.3-2ubuntu0.1_amd64.deb

# Install everything else we need, and configure
sudo apt-get -qq install -y mysql-server mysql-shell mysql-client make unzip g++ etcd-client etcd-server curl git wget eatmydata xz-utils libncurses6

sudo service mysql stop
sudo service etcd stop
sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
go mod download

# install JUnit report formatter
go install github.com/vitessio/go-junit-report@HEAD

- name: Setup launchable dependencies
if: steps.skip-workflow.outputs.is_draft == 'false' && steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main'
run: |
# Get Launchable CLI installed. If you can, make it a part of the builder image to speed things up
pip3 install --user launchable~=1.0 > /dev/null

# verify that launchable setup is all correct.
launchable verify || true

# Tell Launchable about the build you are producing and testing
launchable record build --name "$GITHUB_RUN_ID" --no-commit-collection --source .

- name: Run cluster endtoend test
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
timeout-minutes: 45
run: |
# We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file
# which musn't be more than 107 characters long.
export VTDATAROOT="/tmp/"
source build.env

set -exo pipefail

cat <<-EOF>>./config/mycnf/mysql8026.cnf
binlog-transaction-compression=ON
EOF

cat <<-EOF>>./config/mycnf/mysql8026.cnf
binlog-row-value-options=PARTIAL_JSON
EOF

# run the tests however you normally do, then produce a JUnit XML file
eatmydata -- go run test.go -docker=false -follow -shard onlineddl_vrepl_bench | tee -a output.txt | go-junit-report -set-exit-code > report.xml

- name: Print test output and Record test result in launchable if PR is not a draft
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always()
run: |
if [[ "${{steps.skip-workflow.outputs.is_draft}}" == "false" ]]; then
# send recorded tests to launchable
launchable record tests --build "$GITHUB_RUN_ID" go-test . || true
fi

# print test output
cat output.txt

- name: Test Summary
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always()
uses: test-summary/action@31493c76ec9e7aa675f1585d3ed6f1da69269a86 # v2.4
with:
paths: "report.xml"
show: "fail"
6 changes: 6 additions & 0 deletions config/mycnf/mysql8026.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,9 @@ super-read-only
# Replication parameters to ensure reparents are fast.
replica_net_timeout = 8


binlog_transaction_dependency_tracking=WRITESET
slave_preserve_commit_order=ON
slave_parallel_type=LOGICAL_CLOCK
transaction_write_set_extraction=XXHASH64

2 changes: 1 addition & 1 deletion go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type BinlogEvent interface {
// GTID returns the GTID from the event, and if this event
// also serves as a BEGIN statement.
// This is only valid if IsGTID() returns true.
GTID(BinlogFormat) (replication.GTID, bool, error)
GTID(BinlogFormat) (gtid replication.GTID, hasBegin bool, lastCommitted int64, sequenceNumber int64, err error)
// Query returns a Query struct representing data from a QUERY_EVENT.
// This is only valid if IsQuery() returns true.
Query(BinlogFormat) (Query, error)
Expand Down
12 changes: 6 additions & 6 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func newFilePosBinlogEvent(buf []byte) *filePosBinlogEvent {
return &filePosBinlogEvent{binlogEvent: binlogEvent(buf)}
}

func (*filePosBinlogEvent) GTID(BinlogFormat) (replication.GTID, bool, error) {
return nil, false, nil
func (*filePosBinlogEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) {
return nil, false, 0, 0, nil
}

// IsSemiSyncAckRequested implements BinlogEvent.IsSemiSyncAckRequested().
Expand Down Expand Up @@ -224,8 +224,8 @@ func (ev filePosFakeEvent) Format() (BinlogFormat, error) {
return BinlogFormat{}, nil
}

func (ev filePosFakeEvent) GTID(BinlogFormat) (replication.GTID, bool, error) {
return nil, false, nil
func (ev filePosFakeEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) {
return nil, false, 0, 0, nil
}

func (ev filePosFakeEvent) Query(BinlogFormat) (Query, error) {
Expand Down Expand Up @@ -304,6 +304,6 @@ func (ev filePosGTIDEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte, e
return ev, nil, nil
}

func (ev filePosGTIDEvent) GTID(BinlogFormat) (replication.GTID, bool, error) {
return ev.gtid, false, nil
func (ev filePosGTIDEvent) GTID(BinlogFormat) (replication.GTID, bool, int64, int64, error) {
return ev.gtid, false, 0, 0, nil
}
4 changes: 2 additions & 2 deletions go/mysql/binlog_event_make_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestMariadDBGTIDEVent(t *testing.T) {
event, _, err := event.StripChecksum(f)
require.NoError(t, err, "StripChecksum failed: %v", err)

gtid, hasBegin, err := event.GTID(f)
gtid, hasBegin, _, _, err := event.GTID(f)
require.NoError(t, err, "NewMariaDBGTIDEvent().GTID() returned error: %v", err)
require.True(t, hasBegin, "NewMariaDBGTIDEvent() didn't store hasBegin properly.")

Expand All @@ -178,7 +178,7 @@ func TestMariadDBGTIDEVent(t *testing.T) {
event, _, err = event.StripChecksum(f)
require.NoError(t, err, "StripChecksum failed: %v", err)

gtid, hasBegin, err = event.GTID(f)
gtid, hasBegin, _, _, err = event.GTID(f)
require.NoError(t, err, "NewMariaDBGTIDEvent().GTID() returned error: %v", err)
require.False(t, hasBegin, "NewMariaDBGTIDEvent() didn't store hasBegin properly.")

Expand Down
7 changes: 4 additions & 3 deletions go/mysql/binlog_event_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,18 @@ func (ev mariadbBinlogEvent) IsGTID() bool {
// 8 sequence number
// 4 domain ID
// 1 flags2
func (ev mariadbBinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, error) {
func (ev mariadbBinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, int64, int64, error) {
const FLStandalone = 1

data := ev.Bytes()[f.HeaderLength:]
flags2 := data[8+4]

return replication.MariadbGTID{
gtid := replication.MariadbGTID{
Sequence: binary.LittleEndian.Uint64(data[:8]),
Domain: binary.LittleEndian.Uint32(data[8 : 8+4]),
Server: ev.ServerID(),
}, flags2&FLStandalone == 0, nil
}
return gtid, flags2&FLStandalone == 0, 0, 0, nil
}

// PreviousGTIDs implements BinlogEvent.PreviousGTIDs().
Expand Down
8 changes: 4 additions & 4 deletions go/mysql/binlog_event_mariadb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestMariadbNotBeginGTID(t *testing.T) {

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbStandaloneGTIDEvent)}
want := false
if _, got, err := input.GTID(f); got != want {
if _, got, _, _, err := input.GTID(f); got != want {
t.Errorf("%#v.GTID() = %v (%v), want %v", input, got, err, want)
}
}
Expand All @@ -88,7 +88,7 @@ func TestMariadbIsBeginGTID(t *testing.T) {

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbBeginGTIDEvent)}
want := true
if _, got, err := input.GTID(f); got != want {
if _, got, _, _, err := input.GTID(f); got != want {
t.Errorf("%#v.IsBeginGTID() = %v (%v), want %v", input, got, err, want)
}
}
Expand All @@ -102,7 +102,7 @@ func TestMariadbStandaloneBinlogEventGTID(t *testing.T) {

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbStandaloneGTIDEvent)}
want := replication.MariadbGTID{Domain: 0, Server: 62344, Sequence: 9}
got, hasBegin, err := input.GTID(f)
got, hasBegin, _, _, err := input.GTID(f)
assert.NoError(t, err, "unexpected error: %v", err)
assert.False(t, hasBegin, "unexpected hasBegin")
assert.True(t, reflect.DeepEqual(got, want), "%#v.GTID() = %#v, want %#v", input, got, want)
Expand All @@ -118,7 +118,7 @@ func TestMariadbBinlogEventGTID(t *testing.T) {

input := mariadbBinlogEvent{binlogEvent: binlogEvent(mariadbBeginGTIDEvent)}
want := replication.MariadbGTID{Domain: 0, Server: 62344, Sequence: 10}
got, hasBegin, err := input.GTID(f)
got, hasBegin, _, _, err := input.GTID(f)
assert.NoError(t, err, "unexpected error: %v", err)
assert.True(t, hasBegin, "unexpected !hasBegin")
assert.True(t, reflect.DeepEqual(got, want), "%#v.GTID() = %#v, want %#v", input, got, want)
Expand Down
23 changes: 19 additions & 4 deletions go/mysql/binlog_event_mysql56.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,27 @@ func (ev mysql56BinlogEvent) IsGTID() bool {
// 1 flags
// 16 SID (server UUID)
// 8 GNO (sequence number, signed int)
func (ev mysql56BinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, error) {
// 1 lt_type
// 8 last_committed
// 8 sequence_number
func (ev mysql56BinlogEvent) GTID(f BinlogFormat) (gtid replication.GTID, hasBegin bool, lastCommitted int64, sequenceNumber int64, err error) {
data := ev.Bytes()[f.HeaderLength:]
var sid replication.SID
copy(sid[:], data[1:1+16])
gno := int64(binary.LittleEndian.Uint64(data[1+16 : 1+16+8]))
return replication.Mysql56GTID{Server: sid, Sequence: gno}, false /* hasBegin */, nil
pos := 1
copy(sid[:], data[pos:pos+16])
pos += 16 // end of SID
gno := int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
pos += 8 // end of GNO
pos += 1 // end of lt_type
if len(data) >= pos+8 {
lastCommitted = int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
}
pos += 8 // end of last_committed
if len(data) >= pos+8 {
sequenceNumber = int64(binary.LittleEndian.Uint64(data[pos : pos+8]))
}
// pos += 8 // end of sequence_number
return replication.Mysql56GTID{Server: sid, Sequence: gno}, false /* hasBegin */, lastCommitted, sequenceNumber, nil
}

// PreviousGTIDs implements BinlogEvent.PreviousGTIDs().
Expand Down
6 changes: 4 additions & 2 deletions go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
// Sample event data for MySQL 5.6.
var (
mysql56FormatEvent = NewMysql56BinlogEvent([]byte{0x78, 0x4e, 0x49, 0x55, 0xf, 0x64, 0x0, 0x0, 0x0, 0x74, 0x0, 0x0, 0x0, 0x78, 0x0, 0x0, 0x0, 0x1, 0x0, 0x4, 0x0, 0x35, 0x2e, 0x36, 0x2e, 0x32, 0x34, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x78, 0x4e, 0x49, 0x55, 0x13, 0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5c, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x19, 0x19, 0x0, 0x1, 0x18, 0x4a, 0xf, 0xca})
mysql56GTIDEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x21, 0x64, 0x0, 0x0, 0x0, 0x30, 0x0, 0x0, 0x0, 0xf5, 0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x48, 0x45, 0x82, 0x27})
mysql56GTIDEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x21, 0x64, 0x0, 0x0, 0x0, 0x30, 0x0, 0x0, 0x0, 0xf5, 0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0 /* lt_type: */, 0x0 /* last_committed: */, 0x7, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0 /* sequence_number: */, 0x9, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x48, 0x45, 0x82, 0x27})
mysql56QueryEvent = NewMysql56BinlogEvent([]byte{0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3})
mysql56SemiSyncNoAckQueryEvent = NewMysql56BinlogEvent([]byte{0xef, 0x00, 0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3})
mysql56SemiSyncAckQueryEvent = NewMysql56BinlogEvent([]byte{0xef, 0x01, 0xff, 0x4e, 0x49, 0x55, 0x2, 0x64, 0x0, 0x0, 0x0, 0x77, 0x0, 0x0, 0x0, 0xdb, 0x3, 0x0, 0x0, 0x0, 0x0, 0x3d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x21, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x20, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x3, 0x73, 0x74, 0x64, 0x4, 0x8, 0x0, 0x8, 0x0, 0x21, 0x0, 0xc, 0x1, 0x74, 0x65, 0x73, 0x74, 0x0, 0x74, 0x65, 0x73, 0x74, 0x0, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x20, 0x69, 0x6e, 0x74, 0x6f, 0x20, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x20, 0x28, 0x6d, 0x73, 0x67, 0x29, 0x20, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x20, 0x28, 0x27, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x27, 0x29, 0x92, 0x12, 0x79, 0xc3})
Expand Down Expand Up @@ -90,10 +90,12 @@ func TestMysql56GTID(t *testing.T) {
Server: replication.SID{0x43, 0x91, 0x92, 0xbd, 0xf3, 0x7c, 0x11, 0xe4, 0xbb, 0xeb, 0x2, 0x42, 0xac, 0x11, 0x3, 0x5a},
Sequence: 4,
}
got, hasBegin, err := input.GTID(format)
got, hasBegin, lastCommitted, sequenceNumber, err := input.GTID(format)
require.NoError(t, err, "GTID() error: %v", err)
assert.False(t, hasBegin, "GTID() returned hasBegin")
assert.Equal(t, want, got, "GTID() = %#v, want %#v", got, want)
assert.Equal(t, int64(7), lastCommitted)
assert.Equal(t, int64(9), sequenceNumber)
}

func TestMysql56DecodeTransactionPayload(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/endtoend/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestRowReplicationWithRealDatabase(t *testing.T) {
switch {
case be.IsGTID():
// We expect one of these at least.
gtid, hasBegin, err := be.GTID(f)
gtid, hasBegin, _, _, err := be.GTID(f)
if err != nil {
t.Fatalf("GTID event is broken: %v", err)
}
Expand Down
3 changes: 1 addition & 2 deletions go/mysql/replication/mysql56_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ func ParseSID(s string) (sid SID, err error) {
type Mysql56GTID struct {
// Server is the SID of the server that originally committed the transaction.
Server SID
// Sequence is the sequence number of the transaction within a given Server's
// scope.
// Sequence is the sequence number of the transaction within a given Server's scope.
Sequence int64
}

Expand Down
Loading
Loading