Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Event Timing Control Package with Throttling and Debouncing Mechanism #1166

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
13 changes: 9 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (c *Client) runWatchLoop(
if !stream.Receive() {
return ErrInitializationNotReceived
}
if _, err := handleResponse(stream.Msg(), doc); err != nil {
if _, err := handleResponse(stream.Msg(), doc, c.id); err != nil {
return err
}
if err = stream.Err(); err != nil {
Expand All @@ -523,7 +523,7 @@ func (c *Client) runWatchLoop(
go func() {
for stream.Receive() {
pbResp := stream.Msg()
resp, err := handleResponse(pbResp, doc)
resp, err := handleResponse(pbResp, doc, c.id)
if err != nil {
rch <- WatchResponse{Err: err}
ctx.Done()
Expand Down Expand Up @@ -594,16 +594,17 @@ func (c *Client) runWatchLoop(
func handleResponse(
pbResp *api.WatchDocumentResponse,
doc *document.Document,
id *time.ActorID,
) (*WatchResponse, error) {
switch resp := pbResp.Body.(type) {
case *api.WatchDocumentResponse_Initialization_:
var clientIDs []string
for _, clientID := range resp.Initialization.ClientIds {
id, err := time.ActorIDFromHex(clientID)
cli, err := time.ActorIDFromHex(clientID)
if err != nil {
return nil, err
}
clientIDs = append(clientIDs, id.String())
clientIDs = append(clientIDs, cli.String())
}

doc.SetOnlineClients(clientIDs...)
Expand All @@ -619,6 +620,10 @@ func handleResponse(
return nil, err
}

if cli.Compare(id) == 0 {
return nil, nil
}

switch eventType {
case events.DocChangedEvent:
return &WatchResponse{Type: DocumentChanged}, nil
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require github.com/pierrec/lz4/v4 v4.1.15 // indirect
require (
github.com/pierrec/lz4/v4 v4.1.15 // indirect
golang.org/x/time v0.10.0 // indirect
)

require (
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,8 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4=
golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
70 changes: 70 additions & 0 deletions pkg/limit/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2025 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package limit provide event timing control components
package limit

import (
"context"
"fmt"
"sync/atomic"
"time"

"golang.org/x/time/rate"
)

// Limiter provides a combined throttling and debouncing mechanism
// that ensures eventual consistency.
type Limiter struct {
lim *rate.Limiter
debouncing int32 // 0 means false, 1 means true
}

// New creates a new instance with the specified throttle intervals.
func New(window time.Duration) *Limiter {
dt := &Limiter{
lim: rate.NewLimiter(rate.Every(window), 1),
debouncing: 0,
}
return dt
}

// Execute attempts to run the provided callback function immediately if the rate limiter allows it.
// If the rate limiter does not allow immediate execution, this function blocks until the next token
// is available and then runs the callback. If there is already a debouncing callback, Execute returns
// immediately. This mechanism ensures that the final callback is executed after the final event,
// providing eventual consistency.
func (l *Limiter) Execute(ctx context.Context, callback func() error) error {
if l.lim.Allow() {
return callback()
}

if !atomic.CompareAndSwapInt32(&l.debouncing, 0, 1) {
return nil
}

if err := l.lim.Wait(ctx); err != nil {
return fmt.Errorf("wait for limiter: %w", err)
}

if err := callback(); err != nil {
atomic.StoreInt32(&l.debouncing, 0)
return fmt.Errorf("callback: %w", err)
}
atomic.StoreInt32(&l.debouncing, 0)

return nil
}
217 changes: 217 additions & 0 deletions pkg/limit/limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Copyright 2025 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package limit provide event timing control components
package limit

import (
"context"
"errors"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestSynchronousExecute verifies that synchronous calls to the throttler
// execute the callback immediately without any delays.
func TestSynchronousExecute(t *testing.T) {
ctx := context.Background()
const throttleWindow = 10 * time.Millisecond

t.Run("Single call executes callback exactly once", func(t *testing.T) {
th := New(throttleWindow)
var callCount int32
callback := func() error {
atomic.AddInt32(&callCount, 1)
return nil
}

err := th.Execute(ctx, callback)
assert.NoError(t, err)
assert.Equal(t, int32(1), atomic.LoadInt32(&callCount))
})

t.Run("Ten consecutive calls execute callback each time", func(t *testing.T) {
th := New(throttleWindow)
const numExecute = 10
var callCount int32
callback := func() error {
atomic.AddInt32(&callCount, 1)
return nil
}

for range numExecute {
assert.NoError(t, th.Execute(ctx, callback))
}
assert.Equal(t, int32(numExecute), atomic.LoadInt32(&callCount))
})
}

// TestConcurrentExecute verifies that the throttler behaves as expected under concurrent invocations.
func TestConcurrentExecute(t *testing.T) {
ctx := context.Background()
const throttleWindow = 100 * time.Millisecond

// In this test, many concurrent calls are made. The throttler should execute one immediate call
// and then schedule a trailing call, resulting in exactly two executions.
t.Run("Concurrent calls result in one immediate and one trailing execution", func(t *testing.T) {
th := New(throttleWindow)
const numRoutines = 1000
var callCount int32
callback := func() error {
atomic.AddInt32(&callCount, 1)
return nil
}

var wg sync.WaitGroup
wg.Add(numRoutines)

for range numRoutines {
go func() {
assert.NoError(t, th.Execute(ctx, callback))
wg.Done()
}()
}

wg.Wait()
// Expect exactly one immediate and one trailing callback execution.
assert.Equal(t, int32(2), atomic.LoadInt32(&callCount))
})

// This test simulates a continuous stream of events.
// It triggers multiple concurrent calls at a regular interval and checks that throttling
// limits the total number of callback invocations to one per window plus one trailing call.
t.Run("Throttling over continuous event stream", func(t *testing.T) {
const (
numWindows = 10
eventPerWindow = 100
numRoutines = 10
)
totalDuration := throttleWindow * time.Duration(numWindows)
interval := throttleWindow / time.Duration(eventPerWindow)

th := New(throttleWindow)

ticker := time.NewTicker(interval)
defer ticker.Stop()

timeCtx, cancel := context.WithTimeout(ctx, totalDuration)
defer cancel()

var callCount int32
callback := func() error {
atomic.AddInt32(&callCount, 1)
return nil
}

// Continuously trigger events until the timeout.
for {
select {
case <-ticker.C:
// Each tick triggers multiple concurrent calls.
for range numRoutines {
go func() {
assert.NoError(t, th.Execute(ctx, callback))
}()
}
case <-timeCtx.Done():
// Allow any trailing call to execute.
time.Sleep(throttleWindow)
// Expect one execution per window plus one trailing call.
assert.Equal(t, int32(numWindows+1), atomic.LoadInt32(&callCount))
return
}
}
})
}

// TestCallbackErrorPropagation checks that errors returned by the callback
// are immediately propagated back to the caller.
func TestCallbackErrorPropagation(t *testing.T) {
ctx := context.Background()
const throttleWindow = 10 * time.Millisecond
expectedErr := errors.New("callback error")

t.Run("Immediate callback error is propagated", func(t *testing.T) {
th := New(throttleWindow)
callback := func() error {
return expectedErr
}
err := th.Execute(ctx, callback)
assert.ErrorIs(t, err, expectedErr)
})
}

// TestContextCancellation verifies the throttler's behavior when the context
// expires (deadline exceeded) or is canceled.
func TestContextCancellation(t *testing.T) {
const throttleWindow = 10 * time.Millisecond

// In this test the context deadline is shorter than the throttle window.
// The trailing call should fail with a deadline exceeded error.
t.Run("Trailing call fails due to context deadline exceeded", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), throttleWindow/2)
defer cancel()
th := New(throttleWindow)
var callCount int32
callback := func() error {
atomic.AddInt32(&callCount, 1)
return nil
}

// The first call executes immediately.
assert.NoError(t, th.Execute(ctx, callback))
assert.Equal(t, int32(1), atomic.LoadInt32(&callCount))
// The second call is delayed and should eventually time out.
err := th.Execute(ctx, callback)
assert.ErrorAs(t, err, &context.DeadlineExceeded)
// Ensure the callback was not executed a second time.
assert.Equal(t, int32(1), atomic.LoadInt32(&callCount))
})

// This test verifies that when the context is canceled,
// any debouncing (trailing) call does not execute.
t.Run("Trailing call fails due to context cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
th := New(throttleWindow)
var callCount int32
callback := func() error {
atomic.AddInt32(&callCount, 1)
return nil
}

// First call executes immediately.
assert.NoError(t, th.Execute(ctx, callback))
assert.Equal(t, int32(1), atomic.LoadInt32(&callCount))
// Launch a trailing call that will be affected by cancellation.
var wg sync.WaitGroup
wg.Add(1)

go func() {
err := th.Execute(ctx, callback)
assert.ErrorAs(t, err, &context.Canceled)
// Verify that the trailing call was not executed.
assert.Equal(t, int32(1), atomic.LoadInt32(&callCount))
wg.Done()
}()
// Cancel the context to cancel any debouncing trailing call.
cancel()
wg.Wait()
})
}
Loading
Loading