@@ -753,25 +753,30 @@ func (a *Applier) initiateStreaming() error {
753753 a .mysqlContext .MarkRowCopyStartTime ()
754754 a .logger .Debugf ("mysql.applier: nats subscribe" )
755755 _ , err := a .natsConn .Subscribe (fmt .Sprintf ("%s_full" , a .subject ), func (m * gonats.Msg ) {
756- a .logger .Debugf ("mysql.applier: recv a msg. copyRowsQueue: %v" , len (a .copyRowsQueue ))
757- // TODO possible optimization: if the queue has a vacant before extractor timeout, ack
758- // the msg to avoid extractor resending.
759- if cap ( a . copyRowsQueue ) - len ( a . copyRowsQueue ) < 1 {
760- a .logger . Debugf ( "applier. full. discarding entries" )
761- a . mysqlContext . Stage = models . StageSlaveWaitingForWorkersToProcessQueue
762- } else {
763- dumpData := & DumpEntry {}
764- if err := Decode ( m . Data , dumpData ); err != nil {
765- a . onError ( TaskStateDead , err )
766- }
767- atomic . AddInt64 ( & a . nDumpEntry , 1 )
768- a . copyRowsQueue <- dumpData
756+ a .logger .Debugf ("mysql.applier: full. recv a msg. copyRowsQueue: %v" , len (a .copyRowsQueue ))
757+
758+ dumpData := & DumpEntry {}
759+ if err := Decode ( m . Data , dumpData ); err != nil {
760+ a .onError ( TaskStateDead , err )
761+ }
762+
763+ timer := time . NewTimer ( DefaultConnectWait / 2 )
764+ atomic . AddInt64 ( & a . nDumpEntry , 1 ) // this must be increased before enqueuing
765+ select {
766+ case a . copyRowsQueue <- dumpData :
767+ a . logger . Debugf ( "mysql.applier: full. enqueue" )
768+ timer . Stop ()
769769 a .mysqlContext .Stage = models .StageSlaveWaitingForWorkersToProcessQueue
770770 if err := a .natsConn .Publish (m .Reply , nil ); err != nil {
771771 a .onError (TaskStateDead , err )
772772 }
773- a .logger .Debugf ("mysql.applier: after publish nats reply" )
773+ a .logger .Debugf ("mysql.applier. full. after publish nats reply" )
774774 atomic .AddInt64 (& a .mysqlContext .RowsEstimate , dumpData .TotalCount )
775+ case <- timer .C :
776+ atomic .AddInt64 (& a .nDumpEntry , - 1 )
777+
778+ a .logger .Debugf ("mysql.applier. full. discarding entries" )
779+ a .mysqlContext .Stage = models .StageSlaveWaitingForWorkersToProcessQueue
775780 }
776781 })
777782 /*if err := sub.SetPendingLimits(a.mysqlContext.MsgsLimit, a.mysqlContext.BytesLimit); err != nil {
@@ -813,24 +818,36 @@ func (a *Applier) initiateStreaming() error {
813818 a .onError (TaskStateDead , err )
814819 }
815820
816- a .logger .Debugf ("applier. incr. recv. nEntries: %v, len(applyDataEntryQueue): %v" ,
817- len (binlogEntries .Entries ), len (a .applyDataEntryQueue ))
818- if cap (a .applyDataEntryQueue )- len (a .applyDataEntryQueue ) < len (binlogEntries .Entries ) {
821+ nEntries := len (binlogEntries .Entries )
822+
823+ handled := false
824+ for i := 0 ; ! handled && (i < DefaultConnectWaitSecond / 2 ); i ++ {
825+ vacancy := cap (a .applyDataEntryQueue )- len (a .applyDataEntryQueue )
826+ a .logger .Debugf ("applier. incr. nEntries: %v, vacancy: %v" , nEntries , vacancy )
827+ if vacancy < nEntries {
828+ a .logger .Debugf ("applier. incr. wait 1s for applyDataEntryQueue" )
829+ time .Sleep (1 * time .Second ) // It will wait an second at the end, but seems no hurt.
830+ } else {
831+ a .logger .Debugf ("applier. incr. applyDataEntryQueue enqueue" )
832+ for _ , binlogEntry := range binlogEntries .Entries {
833+ a .applyDataEntryQueue <- binlogEntry
834+ a .currentCoordinates .RetrievedGtidSet = binlogEntry .Coordinates .GetGtidForThisTx ()
835+ atomic .AddInt64 (& a .mysqlContext .DeltaEstimate , 1 )
836+ }
837+ a .mysqlContext .Stage = models .StageWaitingForMasterToSendEvent
838+
839+ if err := a .natsConn .Publish (m .Reply , nil ); err != nil {
840+ a .onError (TaskStateDead , err )
841+ }
842+ a .logger .Debugf ("applier. incr. ack-recv. nEntries: %v" , nEntries )
843+
844+ handled = true
845+ }
846+ }
847+ if ! handled {
819848 // discard these entries
820849 a .logger .Debugf ("applier. incr. discarding entries" )
821850 a .mysqlContext .Stage = models .StageWaitingForMasterToSendEvent
822- } else {
823- for _ , binlogEntry := range binlogEntries .Entries {
824- a .applyDataEntryQueue <- binlogEntry
825- a .currentCoordinates .RetrievedGtidSet = binlogEntry .Coordinates .GetGtidForThisTx ()
826- atomic .AddInt64 (& a .mysqlContext .DeltaEstimate , 1 )
827- }
828- a .mysqlContext .Stage = models .StageWaitingForMasterToSendEvent
829-
830- if err := a .natsConn .Publish (m .Reply , nil ); err != nil {
831- a .onError (TaskStateDead , err )
832- }
833- a .logger .Debugf ("applier. incr. ack-recv. nEntries: %v" , len (binlogEntries .Entries ))
834851 }
835852 })
836853 if err != nil {
0 commit comments