Skip to content

Commit

Permalink
feat: add flag push (#30)
Browse files Browse the repository at this point in the history
* add flag push

* renamed vars

* use timer instead of wait for channel

* added configs

* added Factory for stream, added stream test

* redo stream using channels, add stream test, fixed stream url config

* added flag config stream api test

* add flag config updater tests

* add stream client test, fix stream concurrency, prioritize ctx check

* fix logger, revert main

* lint

* make types and func non public

* lint

* fix default stream server url

* add config test

* fix stream api deadlock

* fix retry fallback start fail

* comments
  • Loading branch information
zhukaihan authored Nov 4, 2024
1 parent 7064c5d commit 62dad32
Show file tree
Hide file tree
Showing 21 changed files with 2,242 additions and 172 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ require (
github.com/amplitude/analytics-go v1.0.1
github.com/jarcoal/httpmock v1.3.1
github.com/joho/godotenv v1.5.1
github.com/r3labs/sse/v2 v2.10.0
github.com/stretchr/testify v1.9.0
gopkg.in/cenkalti/backoff.v1 v1.1.0
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,29 @@ github.com/maxatome/go-testdeep v1.12.0 h1:Ql7Go8Tg0C1D/uMMX59LAoYK7LffeJQ6X2T04
github.com/maxatome/go-testdeep v1.12.0/go.mod h1:lPZc/HAcJMP92l7yI6TRz1aZN5URwUBUAfUNvrclaNM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/local/assignment_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestToEvent(t *testing.T) {
},
},
"flag-key-2": {
Key: "control",
Key: "control",
Metadata: map[string]interface{}{
"default": true,
"default": true,
"segmentName": "All Other Users",
"flagVersion": float64(12),
},
Expand Down
14 changes: 11 additions & 3 deletions pkg/experiment/local/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/amplitude/analytics-go/amplitude"
"io/ioutil"
"net/http"
"net/url"
"reflect"
"sync"

"github.com/amplitude/analytics-go/amplitude"

"github.com/amplitude/experiment-go-server/internal/evaluation"

"github.com/amplitude/experiment-go-server/pkg/experiment"
Expand Down Expand Up @@ -59,9 +60,16 @@ func Initialize(apiKey string, config *Config) *Client {
var deploymentRunner *deploymentRunner
if config.CohortSyncConfig != nil {
cohortDownloadApi := newDirectCohortDownloadApi(config.CohortSyncConfig.ApiKey, config.CohortSyncConfig.SecretKey, config.CohortSyncConfig.MaxCohortSize, config.CohortSyncConfig.CohortServerUrl, config.Debug)
cohortLoader = newCohortLoader(cohortDownloadApi, cohortStorage)
cohortLoader = newCohortLoader(cohortDownloadApi, cohortStorage, config.Debug)
}
var flagStreamApi *flagConfigStreamApiV2
if config.StreamUpdates {
flagStreamApi = newFlagConfigStreamApiV2(apiKey, config.StreamServerUrl, config.StreamFlagConnTimeout)
}
deploymentRunner = newDeploymentRunner(config, newFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout), flagConfigStorage, cohortStorage, cohortLoader)
deploymentRunner = newDeploymentRunner(
config,
newFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout),
flagStreamApi, flagConfigStorage, cohortStorage, cohortLoader)
client = &Client{
log: log,
apiKey: apiKey,
Expand Down
157 changes: 157 additions & 0 deletions pkg/experiment/local/client_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package local

import (
"log"
"os"
"testing"

"github.com/amplitude/experiment-go-server/pkg/experiment"
"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
)

var streamClient *Client

func init() {
err := godotenv.Load()
if err != nil {
log.Printf("Error loading .env file: %v", err)
}
projectApiKey := os.Getenv("API_KEY")
secretKey := os.Getenv("SECRET_KEY")
cohortSyncConfig := CohortSyncConfig{
ApiKey: projectApiKey,
SecretKey: secretKey,
}
streamClient = Initialize("server-qz35UwzJ5akieoAdIgzM4m9MIiOLXLoz",
&Config{
StreamUpdates: true,
StreamServerUrl: "https://stream.lab.amplitude.com",
CohortSyncConfig: &cohortSyncConfig,
})
err = streamClient.Start()
if err != nil {
panic(err)
}
}

func TestMakeSureStreamEnabled(t *testing.T) {
assert.True(t, streamClient.config.StreamUpdates)
}

func TestStreamEvaluate(t *testing.T) {
user := &experiment.User{UserId: "test_user"}
result, err := streamClient.Evaluate(user, nil)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Payload != "payload" {
t.Fatalf("Unexpected variant %v", variant)
}
variant = result["sdk-ci-test"]
if variant.Key != "" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "" {
t.Fatalf("Unexpected variant %v", variant)
}
}

func TestStreamEvaluateV2AllFlags(t *testing.T) {
user := &experiment.User{UserId: "test_user"}
result, err := streamClient.EvaluateV2(user, nil)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Payload != "payload" {
t.Fatalf("Unexpected variant %v", variant)
}
variant = result["sdk-ci-test"]
if variant.Key != "off" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "" {
t.Fatalf("Unexpected variant %v", variant)
}
}

func TestStreamFlagMetadataLocalFlagKey(t *testing.T) {
md := streamClient.FlagMetadata("sdk-local-evaluation-ci-test")
if md["evaluationMode"] != "local" {
t.Fatalf("Unexpected metadata %v", md)
}
}

func TestStreamEvaluateV2Cohort(t *testing.T) {
targetedUser := &experiment.User{UserId: "12345"}
nonTargetedUser := &experiment.User{UserId: "not_targeted"}
flagKeys := []string{"sdk-local-evaluation-user-cohort-ci-test"}
result, err := streamClient.EvaluateV2(targetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-user-cohort-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
result, err = streamClient.EvaluateV2(nonTargetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant = result["sdk-local-evaluation-user-cohort-ci-test"]
if variant.Key != "off" {
t.Fatalf("Unexpected variant %v", variant)
}
}

func TestStreamEvaluateV2GroupCohort(t *testing.T) {
targetedUser := &experiment.User{
UserId: "12345",
DeviceId: "device_id",
Groups: map[string][]string{
"org id": {"1"},
}}
nonTargetedUser := &experiment.User{
UserId: "12345",
DeviceId: "device_id",
Groups: map[string][]string{
"org id": {"not_targeted"},
}}
flagKeys := []string{"sdk-local-evaluation-group-cohort-ci-test"}
result, err := streamClient.EvaluateV2(targetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-group-cohort-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
result, err = streamClient.EvaluateV2(nonTargetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant = result["sdk-local-evaluation-group-cohort-ci-test"]
if variant.Key != "off" {
t.Fatalf("Unexpected variant %v", variant)
}
}
5 changes: 3 additions & 2 deletions pkg/experiment/local/client_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package local

import (
"github.com/amplitude/experiment-go-server/pkg/experiment"
"github.com/joho/godotenv"
"log"
"os"
"testing"

"github.com/amplitude/experiment-go-server/pkg/experiment"
"github.com/joho/godotenv"
)

var client *Client
Expand Down
39 changes: 38 additions & 1 deletion pkg/experiment/local/cohort_loader.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package local

import (
"fmt"
"strings"
"sync"
"sync/atomic"

"github.com/amplitude/experiment-go-server/internal/logger"
)

type cohortLoader struct {
log *logger.Log
cohortDownloadApi cohortDownloadApi
cohortStorage cohortStorage
jobs sync.Map
executor *sync.Pool
lockJobs sync.Mutex
}

func newCohortLoader(cohortDownloadApi cohortDownloadApi, cohortStorage cohortStorage) *cohortLoader {
func newCohortLoader(cohortDownloadApi cohortDownloadApi, cohortStorage cohortStorage, debug bool) *cohortLoader {
return &cohortLoader{
cohortDownloadApi: cohortDownloadApi,
cohortStorage: cohortStorage,
Expand All @@ -22,6 +27,7 @@ func newCohortLoader(cohortDownloadApi cohortDownloadApi, cohortStorage cohortSt
return &CohortLoaderTask{}
},
},
log: logger.New(debug),
}
}

Expand Down Expand Up @@ -86,3 +92,34 @@ func (cl *cohortLoader) downloadCohort(cohortID string) (*Cohort, error) {
cohort := cl.cohortStorage.getCohort(cohortID)
return cl.cohortDownloadApi.getCohort(cohortID, cohort)
}

func (cl *cohortLoader) downloadCohorts(cohortIDs map[string]struct{}) {
var wg sync.WaitGroup
errorChan := make(chan error, len(cohortIDs))

for cohortID := range cohortIDs {
wg.Add(1)
go func(id string) {
defer wg.Done()
task := cl.loadCohort(id)
if err := task.wait(); err != nil {
errorChan <- fmt.Errorf("cohort %s: %v", id, err)
}
}(cohortID)
}

go func() {
wg.Wait()
close(errorChan)
}()

var errorMessages []string
for err := range errorChan {
errorMessages = append(errorMessages, err.Error())
cl.log.Error("Error downloading cohort: %v", err)
}

if len(errorMessages) > 0 {
cl.log.Error("One or more cohorts failed to download:\n%s", strings.Join(errorMessages, "\n"))
}
}
6 changes: 3 additions & 3 deletions pkg/experiment/local/cohort_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestLoadSuccess(t *testing.T) {
api := &MockCohortDownloadApi{}
storage := newInMemoryCohortStorage()
loader := newCohortLoader(api, storage)
loader := newCohortLoader(api, storage, true)

// Define mock behavior
api.On("getCohort", "a", mock.AnythingOfType("*local.Cohort")).Return(&Cohort{Id: "a", LastModified: 0, Size: 1, MemberIds: []string{"1"}, GroupType: userGroupType}, nil)
Expand Down Expand Up @@ -48,7 +48,7 @@ func TestLoadSuccess(t *testing.T) {
func TestFilterCohortsAlreadyComputed(t *testing.T) {
api := &MockCohortDownloadApi{}
storage := newInMemoryCohortStorage()
loader := newCohortLoader(api, storage)
loader := newCohortLoader(api, storage, true)

storage.putCohort(&Cohort{Id: "a", LastModified: 0, Size: 0, MemberIds: []string{}})
storage.putCohort(&Cohort{Id: "b", LastModified: 0, Size: 0, MemberIds: []string{}})
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestFilterCohortsAlreadyComputed(t *testing.T) {
func TestLoadDownloadFailureThrows(t *testing.T) {
api := &MockCohortDownloadApi{}
storage := newInMemoryCohortStorage()
loader := newCohortLoader(api, storage)
loader := newCohortLoader(api, storage, true)

// Define mock behavior
api.On("getCohort", "a", mock.AnythingOfType("*local.Cohort")).Return(&Cohort{Id: "a", LastModified: 0, Size: 1, MemberIds: []string{"1"}, GroupType: userGroupType}, nil)
Expand Down
Loading

0 comments on commit 62dad32

Please sign in to comment.