diff --git a/client/client.go b/client/client.go index e36ba8e3c..121294da2 100644 --- a/client/client.go +++ b/client/client.go @@ -510,7 +510,7 @@ func (c *Client) runWatchLoop( if !stream.Receive() { return ErrInitializationNotReceived } - if _, err := handleResponse(stream.Msg(), doc); err != nil { + if _, err := handleResponse(stream.Msg(), doc, c.id); err != nil { return err } if err = stream.Err(); err != nil { @@ -523,7 +523,7 @@ func (c *Client) runWatchLoop( go func() { for stream.Receive() { pbResp := stream.Msg() - resp, err := handleResponse(pbResp, doc) + resp, err := handleResponse(pbResp, doc, c.id) if err != nil { rch <- WatchResponse{Err: err} ctx.Done() @@ -594,16 +594,17 @@ func (c *Client) runWatchLoop( func handleResponse( pbResp *api.WatchDocumentResponse, doc *document.Document, + id *time.ActorID, ) (*WatchResponse, error) { switch resp := pbResp.Body.(type) { case *api.WatchDocumentResponse_Initialization_: var clientIDs []string for _, clientID := range resp.Initialization.ClientIds { - id, err := time.ActorIDFromHex(clientID) + cli, err := time.ActorIDFromHex(clientID) if err != nil { return nil, err } - clientIDs = append(clientIDs, id.String()) + clientIDs = append(clientIDs, cli.String()) } doc.SetOnlineClients(clientIDs...) @@ -619,6 +620,10 @@ func handleResponse( return nil, err } + if cli.Compare(id) == 0 { + return nil, nil + } + switch eventType { case events.DocChangedEvent: return &WatchResponse{Type: DocumentChanged}, nil diff --git a/go.mod b/go.mod index 3f99000a4..496bde4d6 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,10 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) -require github.com/pierrec/lz4/v4 v4.1.15 // indirect +require ( + github.com/pierrec/lz4/v4 v4.1.15 // indirect + golang.org/x/time v0.10.0 // indirect +) require ( github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 3bd98928d..14665358d 100644 --- a/go.sum +++ b/go.sum @@ -575,6 +575,8 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4= +golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/pkg/limit/limiter.go b/pkg/limit/limiter.go new file mode 100644 index 000000000..e7720e434 --- /dev/null +++ b/pkg/limit/limiter.go @@ -0,0 +1,70 @@ +/* + * Copyright 2025 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package limit provide event timing control components +package limit + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + "golang.org/x/time/rate" +) + +// Limiter provides a combined throttling and debouncing mechanism +// that ensures eventual consistency. +type Limiter struct { + lim *rate.Limiter + debouncing int32 // 0 means false, 1 means true +} + +// New creates a new instance with the specified throttle intervals. +func New(window time.Duration) *Limiter { + dt := &Limiter{ + lim: rate.NewLimiter(rate.Every(window), 1), + debouncing: 0, + } + return dt +} + +// Execute attempts to run the provided callback function immediately if the rate limiter allows it. +// If the rate limiter does not allow immediate execution, this function blocks until the next token +// is available and then runs the callback. If there is already a debouncing callback, Execute returns +// immediately. This mechanism ensures that the final callback is executed after the final event, +// providing eventual consistency. +func (l *Limiter) Execute(ctx context.Context, callback func() error) error { + if l.lim.Allow() { + return callback() + } + + if !atomic.CompareAndSwapInt32(&l.debouncing, 0, 1) { + return nil + } + + if err := l.lim.Wait(ctx); err != nil { + return fmt.Errorf("wait for limiter: %w", err) + } + + if err := callback(); err != nil { + atomic.StoreInt32(&l.debouncing, 0) + return fmt.Errorf("callback: %w", err) + } + atomic.StoreInt32(&l.debouncing, 0) + + return nil +} diff --git a/pkg/limit/limiter_test.go b/pkg/limit/limiter_test.go new file mode 100644 index 000000000..2d08213b7 --- /dev/null +++ b/pkg/limit/limiter_test.go @@ -0,0 +1,217 @@ +/* + * Copyright 2025 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package limit provide event timing control components +package limit + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// TestSynchronousExecute verifies that synchronous calls to the throttler +// execute the callback immediately without any delays. +func TestSynchronousExecute(t *testing.T) { + ctx := context.Background() + const throttleWindow = 10 * time.Millisecond + + t.Run("Single call executes callback exactly once", func(t *testing.T) { + th := New(throttleWindow) + var callCount int32 + callback := func() error { + atomic.AddInt32(&callCount, 1) + return nil + } + + err := th.Execute(ctx, callback) + assert.NoError(t, err) + assert.Equal(t, int32(1), atomic.LoadInt32(&callCount)) + }) + + t.Run("Ten consecutive calls execute callback each time", func(t *testing.T) { + th := New(throttleWindow) + const numExecute = 10 + var callCount int32 + callback := func() error { + atomic.AddInt32(&callCount, 1) + return nil + } + + for range numExecute { + assert.NoError(t, th.Execute(ctx, callback)) + } + assert.Equal(t, int32(numExecute), atomic.LoadInt32(&callCount)) + }) +} + +// TestConcurrentExecute verifies that the throttler behaves as expected under concurrent invocations. +func TestConcurrentExecute(t *testing.T) { + ctx := context.Background() + const throttleWindow = 100 * time.Millisecond + + // In this test, many concurrent calls are made. The throttler should execute one immediate call + // and then schedule a trailing call, resulting in exactly two executions. + t.Run("Concurrent calls result in one immediate and one trailing execution", func(t *testing.T) { + th := New(throttleWindow) + const numRoutines = 1000 + var callCount int32 + callback := func() error { + atomic.AddInt32(&callCount, 1) + return nil + } + + var wg sync.WaitGroup + wg.Add(numRoutines) + + for range numRoutines { + go func() { + assert.NoError(t, th.Execute(ctx, callback)) + wg.Done() + }() + } + + wg.Wait() + // Expect exactly one immediate and one trailing callback execution. + assert.Equal(t, int32(2), atomic.LoadInt32(&callCount)) + }) + + // This test simulates a continuous stream of events. + // It triggers multiple concurrent calls at a regular interval and checks that throttling + // limits the total number of callback invocations to one per window plus one trailing call. + t.Run("Throttling over continuous event stream", func(t *testing.T) { + const ( + numWindows = 10 + eventPerWindow = 100 + numRoutines = 10 + ) + totalDuration := throttleWindow * time.Duration(numWindows) + interval := throttleWindow / time.Duration(eventPerWindow) + + th := New(throttleWindow) + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + timeCtx, cancel := context.WithTimeout(ctx, totalDuration) + defer cancel() + + var callCount int32 + callback := func() error { + atomic.AddInt32(&callCount, 1) + return nil + } + + // Continuously trigger events until the timeout. + for { + select { + case <-ticker.C: + // Each tick triggers multiple concurrent calls. + for range numRoutines { + go func() { + assert.NoError(t, th.Execute(ctx, callback)) + }() + } + case <-timeCtx.Done(): + // Allow any trailing call to execute. + time.Sleep(throttleWindow) + // Expect one execution per window plus one trailing call. + assert.Equal(t, int32(numWindows+1), atomic.LoadInt32(&callCount)) + return + } + } + }) +} + +// TestCallbackErrorPropagation checks that errors returned by the callback +// are immediately propagated back to the caller. +func TestCallbackErrorPropagation(t *testing.T) { + ctx := context.Background() + const throttleWindow = 10 * time.Millisecond + expectedErr := errors.New("callback error") + + t.Run("Immediate callback error is propagated", func(t *testing.T) { + th := New(throttleWindow) + callback := func() error { + return expectedErr + } + err := th.Execute(ctx, callback) + assert.ErrorIs(t, err, expectedErr) + }) +} + +// TestContextCancellation verifies the throttler's behavior when the context +// expires (deadline exceeded) or is canceled. +func TestContextCancellation(t *testing.T) { + const throttleWindow = 10 * time.Millisecond + + // In this test the context deadline is shorter than the throttle window. + // The trailing call should fail with a deadline exceeded error. + t.Run("Trailing call fails due to context deadline exceeded", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), throttleWindow/2) + defer cancel() + th := New(throttleWindow) + var callCount int32 + callback := func() error { + atomic.AddInt32(&callCount, 1) + return nil + } + + // The first call executes immediately. + assert.NoError(t, th.Execute(ctx, callback)) + assert.Equal(t, int32(1), atomic.LoadInt32(&callCount)) + // The second call is delayed and should eventually time out. + err := th.Execute(ctx, callback) + assert.ErrorAs(t, err, &context.DeadlineExceeded) + // Ensure the callback was not executed a second time. + assert.Equal(t, int32(1), atomic.LoadInt32(&callCount)) + }) + + // This test verifies that when the context is canceled, + // any debouncing (trailing) call does not execute. + t.Run("Trailing call fails due to context cancellation", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + th := New(throttleWindow) + var callCount int32 + callback := func() error { + atomic.AddInt32(&callCount, 1) + return nil + } + + // First call executes immediately. + assert.NoError(t, th.Execute(ctx, callback)) + assert.Equal(t, int32(1), atomic.LoadInt32(&callCount)) + // Launch a trailing call that will be affected by cancellation. + var wg sync.WaitGroup + wg.Add(1) + + go func() { + err := th.Execute(ctx, callback) + assert.ErrorAs(t, err, &context.Canceled) + // Verify that the trailing call was not executed. + assert.Equal(t, int32(1), atomic.LoadInt32(&callCount)) + wg.Done() + }() + // Cancel the context to cancel any debouncing trailing call. + cancel() + wg.Wait() + }) +} diff --git a/server/backend/pubsub/publisher.go b/server/backend/pubsub/publisher.go index 233c7039f..78c439b58 100644 --- a/server/backend/pubsub/publisher.go +++ b/server/backend/pubsub/publisher.go @@ -18,14 +18,16 @@ package pubsub import ( + "context" "strconv" gosync "sync" "sync/atomic" - time "time" + "time" "go.uber.org/zap" "github.com/yorkie-team/yorkie/api/types/events" + "github.com/yorkie-team/yorkie/pkg/limit" "github.com/yorkie-team/yorkie/server/logging" ) @@ -38,7 +40,7 @@ func (c *loggerID) next() string { return "p" + strconv.Itoa(int(next)) } -// BatchPublisher is a publisher that publishes events in batch. +// BatchPublisher is a batchPublisher that publishes events in batch. type BatchPublisher struct { logger *zap.SugaredLogger mutex gosync.Mutex @@ -68,8 +70,6 @@ func (bp *BatchPublisher) Publish(event events.DocEvent) { bp.mutex.Lock() defer bp.mutex.Unlock() - // TODO(hackerwins): If DocumentChangedEvent is already in the batch, we don't - // need to add it again. bp.events = append(bp.events, event) } @@ -126,7 +126,51 @@ func (bp *BatchPublisher) publish() { } } -// Close stops the batch publisher +// Close stops the batch batchPublisher func (bp *BatchPublisher) Close() { close(bp.closeChan) } + +// LimitPublisher is a batchPublisher that publishes events with limit. +type LimitPublisher struct { + logger *zap.SugaredLogger + limits map[events.DocEventType]*limit.Limiter + + window time.Duration + subs *Subscriptions +} + +// NewLimitPublisher creates a new NewLimitPublisher instance. +func NewLimitPublisher(subs *Subscriptions, window time.Duration) *LimitPublisher { + lp := &LimitPublisher{ + logger: logging.New(id.next()), + limits: map[events.DocEventType]*limit.Limiter{ + events.DocChangedEvent: limit.New(window), + events.DocWatchedEvent: limit.New(window), + events.DocUnwatchedEvent: limit.New(window), + }, + window: window, + subs: subs, + } + + return lp +} + +// Publish adds the given event to the batch. If the batch is full, it publishes +// the batch. +func (lp *LimitPublisher) Publish(event events.DocEvent) { + _ = lp.limits[event.Type].Execute(context.Background(), func() error { + for _, sub := range lp.subs.Values() { + if ok := sub.Publish(event); !ok { + lp.logger.Infof( + "Publish(%s,%s) to %s timeout or closed", + event.Type, + event.Publisher, + sub.Subscriber(), + ) + } + } + + return nil + }) +} diff --git a/server/backend/pubsub/pubsub.go b/server/backend/pubsub/pubsub.go index f9a1b5e9c..a2d28dc71 100644 --- a/server/backend/pubsub/pubsub.go +++ b/server/backend/pubsub/pubsub.go @@ -32,13 +32,16 @@ import ( const ( // publishTimeout is the timeout for publishing an event. publishTimeout = 100 * gotime.Millisecond + batchWindow = 100 * gotime.Millisecond + limitWindow = 100 * gotime.Millisecond ) // Subscriptions is a map of Subscriptions. type Subscriptions struct { - docKey types.DocRefKey - internalMap *cmap.Map[string, *Subscription] - publisher *BatchPublisher + docKey types.DocRefKey + internalMap *cmap.Map[string, *Subscription] + batchPublisher *BatchPublisher + limitPublisher *LimitPublisher } func newSubscriptions(docKey types.DocRefKey) *Subscriptions { @@ -46,7 +49,8 @@ func newSubscriptions(docKey types.DocRefKey) *Subscriptions { docKey: docKey, internalMap: cmap.New[string, *Subscription](), } - s.publisher = NewBatchPublisher(s, 100*gotime.Millisecond) + s.batchPublisher = NewBatchPublisher(s, batchWindow) + s.limitPublisher = NewLimitPublisher(s, limitWindow) return s } @@ -62,7 +66,16 @@ func (s *Subscriptions) Values() []*Subscription { // Publish publishes the given event. func (s *Subscriptions) Publish(event events.DocEvent) { - s.publisher.Publish(event) + switch event.Type { + case events.DocBroadcastEvent: + s.batchPublisher.Publish(event) + case events.DocChangedEvent: + s.limitPublisher.Publish(event) + case events.DocWatchedEvent: + s.limitPublisher.Publish(event) + case events.DocUnwatchedEvent: + s.limitPublisher.Publish(event) + } } // Delete deletes the subscription of the given id. @@ -82,7 +95,7 @@ func (s *Subscriptions) Len() int { // Close closes the subscriptions. func (s *Subscriptions) Close() { - s.publisher.Close() + s.batchPublisher.Close() } // PubSub is the memory implementation of PubSub, used for single server.