Skip to content

Commit

Permalink
Prevent duplicate DocChangedEvents in the batch
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Mar 4, 2025
1 parent c7797cf commit 3c429b0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
8 changes: 8 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,10 @@ func handleResponse(
return &WatchResponse{Type: DocumentChanged}, nil
case events.DocWatchedEvent:
doc.AddOnlineClient(cli.String())

// NOTE(hackerwins): If the presence does not exist, it means that
// PushPull is not received before watching. In that case, the 'watched'
// event is ignored here, and it will be triggered by PushPull.
if doc.Presence(cli.String()) == nil {
return nil, nil
}
Expand All @@ -637,6 +641,10 @@ func handleResponse(
case events.DocUnwatchedEvent:
p := doc.Presence(cli.String())
doc.RemoveOnlineClient(cli.String())

// NOTE(hackerwins): If the presence does not exist, it means that
// PushPull is already received before unwatching. In that case, the
// 'unwatched' event is ignored here, and it was triggered by PushPull.
if p == nil {
return nil, nil
}
Expand Down
31 changes: 22 additions & 9 deletions server/backend/pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ func (c *loggerID) next() string {

// BatchPublisher is a publisher that publishes events in batch.
type BatchPublisher struct {
logger *zap.SugaredLogger
mutex gosync.Mutex
events []events.DocEvent
logger *zap.SugaredLogger
mutex gosync.Mutex
events []events.DocEvent
docChangedCountMap map[string]int

window time.Duration
closeChan chan struct{}
Expand All @@ -52,10 +53,11 @@ type BatchPublisher struct {
// NewBatchPublisher creates a new BatchPublisher instance.
func NewBatchPublisher(subs *Subscriptions, window time.Duration) *BatchPublisher {
bp := &BatchPublisher{
logger: logging.New(id.next()),
window: window,
closeChan: make(chan struct{}),
subs: subs,
logger: logging.New(id.next()),
docChangedCountMap: make(map[string]int),
window: window,
closeChan: make(chan struct{}),
subs: subs,
}

go bp.processLoop()
Expand All @@ -68,8 +70,18 @@ func (bp *BatchPublisher) Publish(event events.DocEvent) {
bp.mutex.Lock()
defer bp.mutex.Unlock()

// TODO(hackerwins): If DocumentChangedEvent is already in the batch, we don't
// need to add it again.
// NOTE(hackerwins): If the queue contains multiple DocChangedEvents from
// the same publisher, only the two events are processed.
// This occurs when a client attaches/detaches a document since the order
// of Changed and Watch/Unwatch events is not guaranteed.
if event.Type == events.DocChangedEvent {
count, exists := bp.docChangedCountMap[event.Publisher.String()]
if exists && count > 1 {
return
}
bp.docChangedCountMap[event.Publisher.String()] = count + 1
}

bp.events = append(bp.events, event)
}

Expand Down Expand Up @@ -97,6 +109,7 @@ func (bp *BatchPublisher) publish() {

events := bp.events
bp.events = nil
bp.docChangedCountMap = make(map[string]int)

bp.mutex.Unlock()

Expand Down

0 comments on commit 3c429b0

Please sign in to comment.