Skip to content

Commit

Permalink
Use etcd to speed up DDL process
Browse files Browse the repository at this point in the history
Add the mock owner-manager and schema-syncer and enable using etcd to speed up DDL process by default.
  • Loading branch information
zimulala authored and shenli committed Jun 4, 2017
1 parent 7a872e4 commit 62450b9
Show file tree
Hide file tree
Showing 26 changed files with 575 additions and 523 deletions.
17 changes: 4 additions & 13 deletions ddl/bg_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func (d *ddl) handleBgJobQueue() error {

job := &model.Job{}
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := d.checkOwner(t, bgJobFlag)
if err != nil {
return errors.Trace(filterError(err, errNotOwner))
if !d.isOwner(bgJobFlag) {
return nil
}

var err error
t := meta.NewMeta(txn)
// Get the first background job and run it.
job, err = d.getFirstBgJob(t)
if err != nil {
Expand All @@ -52,15 +52,6 @@ func (d *ddl) handleBgJobQueue() error {
} else {
err = d.updateBgJob(t, job)
}
if err != nil {
return errors.Trace(err)
}

if ChangeOwnerInNewWay {
return nil
}
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetBgJobOwner(owner)
return errors.Trace(err)
})
if err != nil {
Expand Down
55 changes: 31 additions & 24 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ type DDL interface {
Stop() error
// RegisterEventCh registers event channel for ddl.
RegisterEventCh(chan<- *Event)
// SchemaVersionSyncer gets the schema version syncer.
SchemaVersionSyncer() *schemaVersionSyncer
// SchemaSyncer gets the schema syncer.
SchemaSyncer() SchemaSyncer
}

// Event is an event that a ddl operation happened.
Expand Down Expand Up @@ -159,12 +159,12 @@ func (e *Event) String() string {
type ddl struct {
m sync.RWMutex

infoHandle *infoschema.Handle
hook Callback
hookMu sync.RWMutex
store kv.Storage
// worker is used for electing the owner.
worker *worker
infoHandle *infoschema.Handle
hook Callback
hookMu sync.RWMutex
store kv.Storage
ownerManager OwnerManager
schemaSyncer SchemaSyncer
// lease is the schema lease duration.
lease time.Duration
uuid string
Expand Down Expand Up @@ -228,15 +228,17 @@ func newDDL(ctx goctx.Context, etcdCli *clientv3.Client, store kv.Storage,

id := uuid.NewV4().String()
ctx, cancelFunc := goctx.WithCancel(ctx)
worker := &worker{
schemaVersionSyncer: &schemaVersionSyncer{
etcdCli: etcdCli,
selfSchemaVerPath: fmt.Sprintf("%s/%s", ddlAllSchemaVersions, id),
},
ddlID: id,
cancel: cancelFunc,
var manager OwnerManager
var syncer SchemaSyncer
// If etcdCli is nil, it's the local store, so use the mockOwnerManager and mockSchemaSyncer.
// It's always used for testing.
if etcdCli == nil {
manager = NewMockOwnerManager(id, cancelFunc)
syncer = NewMockSchemaSyncer()
} else {
manager = NewOwnerManager(etcdCli, id, cancelFunc)
syncer = NewSchemaSyncer(etcdCli, id)
}

d := &ddl{
infoHandle: infoHandle,
hook: hook,
Expand All @@ -246,7 +248,8 @@ func newDDL(ctx goctx.Context, etcdCli *clientv3.Client, store kv.Storage,
ddlJobCh: make(chan struct{}, 1),
ddlJobDoneCh: make(chan struct{}, 1),
bgJobCh: make(chan struct{}, 1),
worker: worker,
ownerManager: manager,
schemaSyncer: syncer,
}

d.start(ctx)
Expand Down Expand Up @@ -300,9 +303,7 @@ func (d *ddl) Stop() error {

func (d *ddl) start(ctx goctx.Context) {
d.quitCh = make(chan struct{})
if ChangeOwnerInNewWay {
d.campaignOwners(ctx)
}
d.ownerManager.CampaignOwners(ctx, &d.wait)

d.wait.Add(2)
go d.onBackgroundWorker()
Expand All @@ -320,7 +321,11 @@ func (d *ddl) close() {
}

close(d.quitCh)
d.worker.cancel()
err := d.schemaSyncer.RemoveSelfVersionPath()
if err != nil {
log.Errorf("[ddl] remove self version path failed %v", err)
}
d.ownerManager.Cancel()

d.wait.Wait()
log.Infof("close DDL:%s", d.uuid)
Expand Down Expand Up @@ -379,8 +384,9 @@ func (d *ddl) genGlobalID() (int64, error) {
return globalID, errors.Trace(err)
}

func (d *ddl) SchemaVersionSyncer() *schemaVersionSyncer {
return d.worker.schemaVersionSyncer
// SchemaSyncer implements DDL.SchemaSyncer interface.
func (d *ddl) SchemaSyncer() SchemaSyncer {
return d.schemaSyncer
}

func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
Expand All @@ -403,7 +409,8 @@ func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
jobID := job.ID
// For a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
// For every state change, we will wait at least 2 * lease time, so here the ticker check is 10 * lease.
ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second))
// But we use etcd to speed up, normally it takes less than 1s now, so we use 3s as the max value.
ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 3*time.Second))
startTime := time.Now()
jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Inc()
defer func() {
Expand Down
152 changes: 26 additions & 126 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func (d *ddl) onDDLWorker() {
}

// We use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time. If lease is 0, we will use default 10s.
checkTime := chooseLeaseTime(2*d.lease, 10*time.Second)
// every 2 * lease time. If lease is 0, we will use default 1s.
// But we use etcd to speed up, normally it takes less than 1s now, so we use 1s as the max value.
checkTime := chooseLeaseTime(2*d.lease, 1*time.Second)

ticker := time.NewTicker(checkTime)
defer ticker.Stop()
Expand Down Expand Up @@ -68,93 +69,15 @@ func asyncNotify(ch chan struct{}) {
}
}

// maxOwnerTimeout is the largest owner timeout that will ever be used,
// expressed in nanoseconds.
const maxOwnerTimeout int64 = int64(20 * time.Minute)

// Lower bounds, in nanoseconds, for the owner timeout of each job type.
// They are declared as variables rather than constants because tests
// need to change them.
var (
	minDDLOwnerTimeout = int64(4 * time.Second)
	minBgOwnerTimeout  = int64(20 * time.Second)
)

// getCheckOwnerTimeout returns the timeout, in nanoseconds, used when checking
// whether the owner of the given job type has gone too long without updating
// its status.
//
// The base value is 4 * lease, capped at maxOwnerTimeout and raised to the
// per-job-type minimum (minDDLOwnerTimeout or minBgOwnerTimeout) when it
// falls below it.
func (d *ddl) getCheckOwnerTimeout(flag JobType) int64 {
	// Other servers get 2 * lease to update the schema, and the owner
	// refreshes its status every 2 * lease, so the timeout check uses
	// 4 * lease.
	timeout := int64(4 * d.lease)

	switch {
	case timeout > maxOwnerTimeout:
		return maxOwnerTimeout
	case flag == ddlJobFlag && timeout < minDDLOwnerTimeout:
		// The lease may be shorter than one second; checking the owner that
		// often is unnecessary, so fall back to the DDL minimum.
		return minDDLOwnerTimeout
	case flag == bgJobFlag && timeout < minBgOwnerTimeout:
		// Background jobs are processed serially, so the timeout is extended
		// to make sure a batch of rows is processed before it expires.
		return minBgOwnerTimeout
	default:
		return timeout
	}
}

func (d *ddl) isOwner(flag JobType) error {
func (d *ddl) isOwner(flag JobType) bool {
if flag == ddlJobFlag {
if d.worker.isOwner() {
return nil
}
log.Infof("[ddl] not %s job owner, self id %s", flag, d.uuid)
return errNotOwner
}
if d.worker.isBgOwner() {
return nil
}
log.Infof("[ddl] not %s job owner, self id %s", flag, d.uuid)
return errNotOwner
}

func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
if ChangeOwnerInNewWay {
return nil, errors.Trace(d.isOwner(flag))
}
owner, err := d.getJobOwner(t, flag)
if err != nil {
return nil, errors.Trace(err)
}
if owner == nil {
owner = &model.Owner{}
// try to set owner
owner.OwnerID = d.uuid
}

now := time.Now().UnixNano()
maxTimeout := d.getCheckOwnerTimeout(flag)
sub := now - owner.LastUpdateTS
if owner.OwnerID == d.uuid || sub > maxTimeout {
owner.OwnerID = d.uuid
owner.LastUpdateTS = now
// update status.
switch flag {
case ddlJobFlag:
err = t.SetDDLJobOwner(owner)
case bgJobFlag:
err = t.SetBgJobOwner(owner)
}
if err != nil {
return nil, errors.Trace(err)
}
log.Debugf("[ddl] become %s job owner, owner is %s sub %vs", flag, owner, sub/1e9)
}

if owner.OwnerID != d.uuid {
log.Debugf("[ddl] not %s job owner, self id %s owner is %s", flag, d.uuid, owner.OwnerID)
return nil, errors.Trace(errNotOwner)
isOwner := d.ownerManager.IsOwner()
log.Debugf("[ddl] it's the %s job owner %v, self id %s", flag, isOwner, d.uuid)
return isOwner
}

return owner, nil
isOwner := d.ownerManager.IsBgOwner()
log.Debugf("[ddl] it's the %s job owner %v, self id %s", flag, isOwner, d.uuid)
return isOwner
}

func (d *ddl) getJobOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
Expand Down Expand Up @@ -258,6 +181,7 @@ func (j JobType) String() string {
}

func (d *ddl) handleDDLJobQueue() error {
once := true
for {
if d.isClosed() {
return nil
Expand All @@ -267,15 +191,13 @@ func (d *ddl) handleDDLJobQueue() error {
var job *model.Job
var schemaVer int64
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := d.checkOwner(t, ddlJobFlag)
if terror.ErrorEqual(err, errNotOwner) {
// We are not owner, return and retry checking later.
// We are not owner, return and retry checking later.
if !d.isOwner(ddlJobFlag) {
return nil
} else if err != nil {
return errors.Trace(err)
}

var err error
t := meta.NewMeta(txn)
// We become the owner. Get the first job and run it.
job, err = d.getFirstDDLJob(t)
if job == nil || err != nil {
Expand All @@ -288,13 +210,16 @@ func (d *ddl) handleDDLJobQueue() error {
// let other servers update the schema.
// So here we must check the elapsed time from last update, if < 2 * lease, we must
// wait again.
// TODO: Check all versions to handle this.
elapsed := time.Duration(int64(txn.StartTS()) - job.LastUpdateTS)
if elapsed > 0 && elapsed < waitTime {
if once && elapsed > 0 && elapsed < waitTime {
log.Warnf("[ddl] the elapsed time from last update is %s < %s, wait again", elapsed, waitTime)
waitTime -= elapsed
time.Sleep(time.Millisecond)
return nil
}
}
once = false

d.hookMu.Lock()
d.hook.OnJobRunBefore(job)
Expand All @@ -309,15 +234,6 @@ func (d *ddl) handleDDLJobQueue() error {
} else {
err = d.updateDDLJob(t, job, txn.StartTS())
}
if err != nil {
return errors.Trace(err)
}

if ChangeOwnerInNewWay {
return nil
}
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetDDLJobOwner(owner)
return errors.Trace(err)
})
if err != nil {
Expand All @@ -335,14 +251,7 @@ func (d *ddl) handleDDLJobQueue() error {
// If the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
if job.State == model.JobRunning || job.State == model.JobDone {
switch job.Type {
case model.ActionCreateSchema, model.ActionDropSchema, model.ActionCreateTable,
model.ActionTruncateTable, model.ActionDropTable:
// Do not need to wait for those DDL, because those DDL do not need to modify data,
// So there is no data inconsistent issue.
default:
d.waitSchemaChanged(waitTime, schemaVer)
}
d.waitSchemaChanged(waitTime, schemaVer)
}
if job.IsFinished() {
d.startBgJob(job.Type)
Expand All @@ -351,12 +260,11 @@ func (d *ddl) handleDDLJobQueue() error {
}
}

// chooseLeaseTime picks the interval to use for a lease-derived timer.
//
// t is the lease-derived duration; max is both the upper bound and the
// fallback value. max is returned when t is zero, negative, or greater than
// max; otherwise t is returned unchanged.
//
// Non-positive values of t are never passed through: callers hand the result
// to time.NewTicker, which panics on a non-positive interval.
func chooseLeaseTime(t, max time.Duration) time.Duration {
	if t <= 0 || t > max {
		return max
	}
	return t
}

// runDDLJob runs a DDL job. It returns the current schema version in this transaction.
Expand Down Expand Up @@ -439,14 +347,6 @@ func (d *ddl) waitSchemaChanged(waitTime time.Duration, latestSchemaVersion int6
return
}

if !ChangeOwnerInNewWay {
select {
case <-time.After(waitTime):
case <-d.quitCh:
}
return
}

// TODO: Do we need to wait for a while?
if latestSchemaVersion == 0 {
log.Infof("[ddl] schema version doesn't change")
Expand All @@ -455,12 +355,12 @@ func (d *ddl) waitSchemaChanged(waitTime time.Duration, latestSchemaVersion int6
// TODO: Make ctx exits when the d is close.
ctx, cancelFunc := goctx.WithTimeout(goctx.Background(), waitTime)
defer cancelFunc()
err := d.worker.updateGlobalVersion(ctx, latestSchemaVersion)
err := d.schemaSyncer.OwnerUpdateGlobalVersion(ctx, latestSchemaVersion)
if err != nil {
log.Infof("[ddl] update latest schema version %d failed %v", latestSchemaVersion, err)
}

err = d.worker.checkAllVersions(ctx, latestSchemaVersion)
err = d.schemaSyncer.OwnerCheckAllVersions(ctx, latestSchemaVersion)
if err != nil {
log.Infof("[ddl] wait latest schema version %d to deadline %v", latestSchemaVersion, err)
}
Expand Down
Loading

0 comments on commit 62450b9

Please sign in to comment.