Skip to content

Commit

Permalink
Refactor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
window9u committed Feb 17, 2025
1 parent 5d5da54 commit 3c29e62
Showing 1 changed file with 159 additions and 135 deletions.
294 changes: 159 additions & 135 deletions test/integration/event_webhook_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build integration

/*
* Copyright 2025 The Yorkie Authors. All rights reserved.
*
Expand Down Expand Up @@ -27,13 +29,12 @@ import (
"io"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"

"go.uber.org/atomic"

"github.com/stretchr/testify/assert"
"github.com/yorkie-team/yorkie/admin"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/client"
"github.com/yorkie-team/yorkie/pkg/document"
Expand All @@ -55,25 +56,16 @@ func verifySignature(signatureHeader, secret string, body []byte) error {
return nil
}

func newUserServer(t *testing.T, requestCounter *atomic.Int32, secretKey, docKey string) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCounter.Add(1)
signatureHeader := r.Header.Get("X-Signature-256")
if signatureHeader == "" {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
func newWebhookServer(t *testing.T, secretKey, docKey string) (*httptest.Server, *int32) {
var reqCnt int32

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&reqCnt, 1)
signatureHeader := r.Header.Get("X-Signature-256")
assert.NotZero(t, len(signatureHeader))
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "bad request", http.StatusBadRequest)
return
}

if err := verifySignature(signatureHeader, secretKey, body); err != nil {
http.Error(w, "forbidden", http.StatusForbidden)
return
}
assert.NoError(t, err)
assert.NoError(t, verifySignature(signatureHeader, secretKey, body))

req := &types.EventWebhookRequest{}
assert.NoError(t, gojson.Unmarshal(body, req))
Expand All @@ -82,186 +74,217 @@ func newUserServer(t *testing.T, requestCounter *atomic.Int32, secretKey, docKey

w.WriteHeader(http.StatusOK)
}))
t.Cleanup(func() { srv.Close() })

return srv, &reqCnt
}

// setupYorkieServer initializes the Yorkie server and admin client.
func setupYorkieServer(t *testing.T, webhookCacheTTL string) (*server.Yorkie, *admin.Client) {
// newYorkieServer initializes the Yorkie server and admin client.
func newYorkieServer(t *testing.T, webhookCacheTTL, projectCacheTTL string) *server.Yorkie {
conf := helper.TestConfig()
conf.Backend.EventWebhookCacheTTL = webhookCacheTTL
conf.Backend.ProjectCacheTTL = "0ms"
if webhookCacheTTL != "default" {
conf.Backend.EventWebhookCacheTTL = webhookCacheTTL
}
if projectCacheTTL != "default" {
conf.Backend.ProjectCacheTTL = projectCacheTTL
}
svr, err := server.New(conf)
assert.NoError(t, err)
assert.NoError(t, svr.Start())
t.Cleanup(func() { assert.NoError(t, svr.Shutdown(true)) })

adminCli := helper.CreateAdminCli(t, svr.RPCAddr())
t.Cleanup(func() { adminCli.Close() })
t.Cleanup(func() {
assert.NoError(t, svr.Shutdown(true))
})

return svr, adminCli
return svr
}

// createProjectAndClient creates a project and sets up a Yorkie client.
func createProjectAndClient(t *testing.T, ctx context.Context, adminCli *admin.Client, rpcAddr string) (*types.Project, *client.Client) {
project, err := adminCli.CreateProject(ctx, "event-webhook-test")
assert.NoError(t, err)

cli, err := client.Dial(rpcAddr, client.WithAPIKey(project.PublicKey))
func newActivatedClient(t *testing.T, ctx context.Context, addr, publicKey string) *client.Client {
cli, err := client.Dial(addr, client.WithAPIKey(publicKey))
assert.NoError(t, err)
assert.NoError(t, cli.Activate(ctx))
t.Cleanup(func() {
assert.NoError(t, cli.Deactivate(ctx))
assert.NoError(t, cli.Close())
})

return project, cli
return cli
}

// attachTestDocument attaches a new document with an initial root.
func attachTestDocument(t *testing.T, ctx context.Context, cli *client.Client) (*document.Document, string) {
docKey := helper.TestDocKey(t)
doc := document.New(docKey)
initialRoot := map[string]any{
func TestRegisterEventWebhook(t *testing.T) {
ctx := context.Background()

// Set up yorkie server
projectCacheTTL := 1 * time.Millisecond
svr := newYorkieServer(t, "0ms", projectCacheTTL.String())

// Set up project
adminCli := helper.CreateAdminCli(t, svr.RPCAddr())
defer func() { adminCli.Close() }()

project, err := adminCli.CreateProject(ctx, "register-event-webhook")
assert.NoError(t, err)

doc := document.New(helper.TestDocKey(t))
userServer, getReqCnt := newWebhookServer(t, project.SecretKey, doc.Key().String())

cli := newActivatedClient(t, ctx, svr.RPCAddr(), project.PublicKey)

assert.NoError(t, cli.Attach(ctx, doc, client.WithInitialRoot(map[string]any{
"counter": json.NewCounter(0, crdt.LongCnt),
}
assert.NoError(t, cli.Attach(ctx, doc, client.WithInitialRoot(initialRoot)))
return doc, docKey.String()
}
})))

// registerWebhook updates the project to register (or unregister) the webhook.
func registerWebhook(t *testing.T, ctx context.Context, adminCli *admin.Client, project *types.Project, webhookURL string, eventTypes []string) *types.Project {
project.EventWebhookURL = webhookURL
prj, err := adminCli.UpdateProject(ctx, project.ID.String(), &types.UpdatableProjectFields{
EventWebhookURL: &project.EventWebhookURL,
EventWebhookTypes: &eventTypes,
waitWebhookReceived := 10 * time.Millisecond

t.Run("register event webhook test", func(t *testing.T) {
// 01. Register event webhook
prj, err := adminCli.UpdateProject(ctx, project.ID.String(), &types.UpdatableProjectFields{
EventWebhookURL: &userServer.URL,
EventWebhookTypes: &[]string{string(types.DocRootChanged)},
})
assert.NoError(t, err)
assert.Equal(t, userServer.URL, prj.EventWebhookURL)
assert.Equal(t, string(types.DocRootChanged), prj.EventWebhookTypes[0])

// 02. Wait project cache expired
time.Sleep(projectCacheTTL)

// 03. Check webhook received
prev := atomic.LoadInt32(getReqCnt)
assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
root.GetCounter("counter").Increase(1)
return nil
}))
assert.NoError(t, cli.Sync(ctx))
time.Sleep(waitWebhookReceived)
assert.Equal(t, prev+1, atomic.LoadInt32(getReqCnt))
})
assert.NoError(t, err)
return prj
}

func TestRegisterEventWebhook(t *testing.T) {
ctx := context.Background()
svr, adminCli := setupYorkieServer(t, "0ms")
project, cli := createProjectAndClient(t, ctx, adminCli, svr.RPCAddr())

// 01. Attach a new document.
doc, docKey := attachTestDocument(t, ctx, cli)

// 02. Set up the user webhook server.
requestCnt := atomic.NewInt32(0)
userServer := newUserServer(t, requestCnt, project.SecretKey, docKey)
defer userServer.Close()

// 03. Register the event webhook.
prj := registerWebhook(t, ctx, adminCli, project, userServer.URL, []string{string(types.DocRootChanged)})
assert.Equal(t, userServer.URL, prj.EventWebhookURL)
assert.Equal(t, string(types.DocRootChanged), prj.EventWebhookTypes[0])

// 04. Test the DocRootChanged event is sent to the user server
prev := requestCnt.Load()
assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
root.GetCounter("counter").Increase(1)
return nil
}))
assert.NoError(t, cli.Sync(ctx))
assert.Equal(t, prev+1, requestCnt.Load())

// 05. Unregister the event webhook.
prj = registerWebhook(t, ctx, adminCli, project, userServer.URL, []string{})
assert.Equal(t, userServer.URL, prj.EventWebhookURL)
assert.Equal(t, 0, len(prj.EventWebhookTypes))

// 06. Test the DocRootChanged event is not sent to the user server
prev = requestCnt.Load()
assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
root.GetCounter("counter").Increase(1)
return nil
}))
assert.NoError(t, cli.Sync(ctx))
assert.Equal(t, prev, requestCnt.Load())
t.Run("unregister event webhook test", func(t *testing.T) {
// 01. Unregister event webhook
prj, err := adminCli.UpdateProject(ctx, project.ID.String(), &types.UpdatableProjectFields{
EventWebhookURL: &userServer.URL,
EventWebhookTypes: &[]string{},
})
assert.NoError(t, err)
assert.Equal(t, userServer.URL, prj.EventWebhookURL)
assert.Equal(t, 0, len(prj.EventWebhookTypes))

// 02. Wait project cache expired
time.Sleep(projectCacheTTL)

// 03. Check webhook doesn't trigger
prev := atomic.LoadInt32(getReqCnt)
assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
root.GetCounter("counter").Increase(1)
return nil
}))
assert.NoError(t, cli.Sync(ctx))

// 04. Wait webhook received
time.Sleep(waitWebhookReceived)
assert.Equal(t, prev, atomic.LoadInt32(getReqCnt))
})
}

func TestDocRootChangedEventWebhook(t *testing.T) {
ctx := context.Background()
svr, adminCli := setupYorkieServer(t, "0ms")
project, cli := createProjectAndClient(t, ctx, adminCli, svr.RPCAddr())

t.Run("DocRootChanged event test", func(t *testing.T) {
// 01. Attach a new document.
doc, docKey := attachTestDocument(t, ctx, cli)
svr := newYorkieServer(t, "0ms", "default")
adminCli := helper.CreateAdminCli(t, svr.RPCAddr())

// 02. Set up the user webhook server.
requestCnt := atomic.NewInt32(0)
userServer := newUserServer(t, requestCnt, project.SecretKey, docKey)
defer userServer.Close()
project, err := adminCli.CreateProject(ctx, "doc-root-changed-event-webhook")
assert.NoError(t, err)

// 03. Register the event webhook.
prj := registerWebhook(t, ctx, adminCli, project, userServer.URL, []string{string(types.DocRootChanged)})
assert.Equal(t, userServer.URL, prj.EventWebhookURL)
assert.Equal(t, string(types.DocRootChanged), prj.EventWebhookTypes[0])
doc := document.New(helper.TestDocKey(t))
userServer, getReqCnt := newWebhookServer(t, project.SecretKey, doc.Key().String())

project.EventWebhookURL = userServer.URL
_, err = adminCli.UpdateProject(ctx, project.ID.String(), &types.UpdatableProjectFields{
EventWebhookURL: &project.EventWebhookURL,
EventWebhookTypes: &[]string{string(types.DocRootChanged)},
})
assert.NoError(t, err)

// 04. Update only root.
prev := requestCnt.Load()
cli := newActivatedClient(t, ctx, svr.RPCAddr(), project.PublicKey)

assert.NoError(t, cli.Attach(ctx, doc, client.WithInitialRoot(map[string]any{
"counter": json.NewCounter(0, crdt.LongCnt),
})))

waitWebhookReceived := 10 * time.Millisecond
t.Run("root element changed test", func(t *testing.T) {
prev := atomic.LoadInt32(getReqCnt)
assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
root.GetCounter("counter").Increase(1)
return nil
}))
assert.NoError(t, cli.Sync(ctx))
assert.Equal(t, prev+1, requestCnt.Load())
time.Sleep(waitWebhookReceived)
assert.Equal(t, prev+1, atomic.LoadInt32(getReqCnt))
})

// 05. Update only presence.
prev = requestCnt.Load()
t.Run("presence changed test", func(t *testing.T) {
prev := atomic.LoadInt32(getReqCnt)
assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
p.Set("update", "2")
return nil
}))
assert.NoError(t, cli.Sync(ctx))
assert.Equal(t, prev, requestCnt.Load())
time.Sleep(waitWebhookReceived)
assert.Equal(t, prev, atomic.LoadInt32(getReqCnt))
})

// 06. Update both root and presence.
prev = requestCnt.Load()
t.Run("root element and presence changed test", func(t *testing.T) {
prev := atomic.LoadInt32(getReqCnt)
assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
p.Set("update", "3")
root.GetCounter("counter").Increase(1)
return nil
}))
assert.NoError(t, cli.Sync(ctx))
assert.Equal(t, prev+1, requestCnt.Load())
time.Sleep(waitWebhookReceived)
assert.Equal(t, prev+1, atomic.LoadInt32(getReqCnt))
})
}

func TestEventWebhookCache(t *testing.T) {
cacheTTL := 10 * time.Millisecond
ctx := context.Background()
svr, adminCli := setupYorkieServer(t, cacheTTL.String())
project, cli := createProjectAndClient(t, ctx, adminCli, svr.RPCAddr())

t.Run("throttling event test", func(t *testing.T) {
// 01. Attach a new document.
doc, docKey := attachTestDocument(t, ctx, cli)
webhookCacheTTL := 10 * time.Millisecond
svr := newYorkieServer(t, webhookCacheTTL.String(), "default")
adminCli := helper.CreateAdminCli(t, svr.RPCAddr())

// 02. Set up the user webhook server.
requestCnt := atomic.NewInt32(0)
userServer := newUserServer(t, requestCnt, project.SecretKey, docKey)
defer userServer.Close()
project, err := adminCli.CreateProject(ctx, "event-webhook-cache-webhook")
assert.NoError(t, err)

// 03. Register the event webhook.
prj := registerWebhook(t, ctx, adminCli, project, userServer.URL, []string{string(types.DocRootChanged)})
assert.Equal(t, userServer.URL, prj.EventWebhookURL)
assert.Equal(t, string(types.DocRootChanged), prj.EventWebhookTypes[0])
doc := document.New(helper.TestDocKey(t))
userServer, getReqCnt := newWebhookServer(t, project.SecretKey, doc.Key().String())
_, err = adminCli.UpdateProject(ctx, project.ID.String(), &types.UpdatableProjectFields{
EventWebhookURL: &userServer.URL,
EventWebhookTypes: &[]string{string(types.DocRootChanged)},
})
assert.NoError(t, err)

// 04. Test request throttled
prevCount := requestCnt.Load()
cli := newActivatedClient(t, ctx, svr.RPCAddr(), project.PublicKey)
assert.NoError(t, cli.Attach(ctx, doc, client.WithInitialRoot(map[string]any{
"counter": json.NewCounter(0, crdt.LongCnt),
})))

waitWebhookReceived := 20 * time.Millisecond
t.Run("throttling event test", func(t *testing.T) {
t.Skip("remove this after implement advanced event cache control")

expectedUpdates := 5
testDuration := cacheTTL * time.Duration(expectedUpdates)
interval := cacheTTL / 10
testDuration := webhookCacheTTL * time.Duration(expectedUpdates)
interval := webhookCacheTTL / 10

ticker := time.NewTicker(interval)
defer ticker.Stop()

timeCtx, cancel := context.WithTimeout(ctx, testDuration)
defer cancel()

prevCnt := atomic.LoadInt32(getReqCnt)
for {
select {
case <-ticker.C:
Expand All @@ -271,7 +294,8 @@ func TestEventWebhookCache(t *testing.T) {
}))
assert.NoError(t, cli.Sync(ctx))
case <-timeCtx.Done():
assert.Equal(t, prevCount+int32(expectedUpdates), requestCnt.Load(), "Throttling did not trigger expected number of updates")
time.Sleep(waitWebhookReceived)
assert.Equal(t, prevCnt+int32(expectedUpdates), atomic.LoadInt32(getReqCnt))
return
}
}
Expand Down

0 comments on commit 3c29e62

Please sign in to comment.