diff --git a/client/client.go b/client/client.go index e36ba8e3c..e18db027f 100644 --- a/client/client.go +++ b/client/client.go @@ -624,6 +624,10 @@ func handleResponse( return &WatchResponse{Type: DocumentChanged}, nil case events.DocWatchedEvent: doc.AddOnlineClient(cli.String()) + + // NOTE(hackerwins): If the presence does not exist, it means that + // PushPull is not received before watching. In that case, the 'watched' + // event is ignored here, and it will be triggered by PushPull. if doc.Presence(cli.String()) == nil { return nil, nil } @@ -637,6 +641,10 @@ func handleResponse( case events.DocUnwatchedEvent: p := doc.Presence(cli.String()) doc.RemoveOnlineClient(cli.String()) + + // NOTE(hackerwins): If the presence does not exist, it means that + // PushPull is already received before unwatching. In that case, the + // 'unwatched' event is ignored here, and it was triggered by PushPull. if p == nil { return nil, nil } diff --git a/server/backend/pubsub/publisher.go b/server/backend/pubsub/publisher.go index 233c7039f..21b82889b 100644 --- a/server/backend/pubsub/publisher.go +++ b/server/backend/pubsub/publisher.go @@ -40,9 +40,10 @@ func (c *loggerID) next() string { // BatchPublisher is a publisher that publishes events in batch. type BatchPublisher struct { - logger *zap.SugaredLogger - mutex gosync.Mutex - events []events.DocEvent + logger *zap.SugaredLogger + mutex gosync.Mutex + events []events.DocEvent + docChangedCountMap map[string]int window time.Duration closeChan chan struct{} @@ -52,10 +53,11 @@ type BatchPublisher struct { // NewBatchPublisher creates a new BatchPublisher instance. func NewBatchPublisher(subs *Subscriptions, window time.Duration) *BatchPublisher { bp := &BatchPublisher{ - logger: logging.New(id.next()), - window: window, - closeChan: make(chan struct{}), - subs: subs, + logger: logging.New(id.next()), + docChangedCountMap: make(map[string]int), + window: window, + closeChan: make(chan struct{}), + subs: subs, } go bp.processLoop() @@ -68,8 +70,18 @@ func (bp *BatchPublisher) Publish(event events.DocEvent) { bp.mutex.Lock() defer bp.mutex.Unlock() - // TODO(hackerwins): If DocumentChangedEvent is already in the batch, we don't - // need to add it again. + // NOTE(hackerwins): If the queue contains multiple DocChangedEvents from + // the same publisher, only the two events are processed. + // This occurs when a client attaches/detaches a document since the order + // of Changed and Watch/Unwatch events is not guaranteed. + if event.Type == events.DocChangedEvent { + count, exists := bp.docChangedCountMap[event.Publisher.String()] + if exists && count > 1 { + return + } + bp.docChangedCountMap[event.Publisher.String()] = count + 1 + } + bp.events = append(bp.events, event) } @@ -97,6 +109,7 @@ func (bp *BatchPublisher) publish() { events := bp.events bp.events = nil + bp.docChangedCountMap = make(map[string]int) bp.mutex.Unlock()