Skip to content

Commit

Permalink
fix logger, revert main
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Oct 5, 2024
1 parent 2e93085 commit fc4230b
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 50 deletions.
53 changes: 18 additions & 35 deletions cmd/xpmt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,24 @@ import (
)

func main() {
// connTimeout := 2 * time.Second
// api := local.NewFlagConfigStreamApiV2("server-tUTqR62DZefq7c73zMpbIr1M5VDtwY8T", "https://skylab-stream.stag2.amplitude.com", connTimeout)
// cohortStorage := local.NewInMemoryCohortStorage()
// flagConfigStorage := local.NewInMemoryFlagConfigStorage()
// dr := local.NewDeploymentRunner(
// local.DefaultConfig,
// local.NewFlagConfigApiV2("server-tUTqR62DZefq7c73zMpbIr1M5VDtwY8T", "https://skylab-api.staging.amplitude.com", connTimeout),
// api,
// flagConfigStorage, cohortStorage, nil)
// dr.Start()

// for {
// // fmt.Printf("%v+\n", time.Now())
// fmt.Println(flagConfigStorage.GetFlagConfigs())
// time.Sleep(500 * time.Millisecond)
// }

// if len(os.Args) < 2 {
// fmt.Printf("error: command required\n")
// fmt.Printf("Available commands:\n" +
// " fetch\n" +
// " flags\n" +
// " evaluate\n")
// return
// }
// switch os.Args[1] {
// case "fetch":
// fetch()
// case "flags":
// flags()
// case "evaluate":
// evaluate()
// default:
// fmt.Printf("error: unknown sub-command '%v'", os.Args[1])
// }
if len(os.Args) < 2 {
fmt.Printf("error: command required\n")
fmt.Printf("Available commands:\n" +
" fetch\n" +
" flags\n" +
" evaluate\n")
return
}
switch os.Args[1] {
case "fetch":
fetch()
case "flags":
flags()
case "evaluate":
evaluate()
default:
fmt.Printf("error: unknown sub-command '%v'", os.Args[1])
}
}

func fetch() {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/amplitude/analytics-go v1.0.1
github.com/jarcoal/httpmock v1.3.1
github.com/joho/godotenv v1.5.1
github.com/r3labs/sse/v2 v2.10.0 // indirect
github.com/r3labs/sse/v2 v2.10.0
github.com/stretchr/testify v1.9.0
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0
)
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tmaxmax/go-sse v0.8.0 h1:pPpTgyyi1r7vG2o6icebnpGEh3ebcnBXqDWkb7aTofs=
github.com/tmaxmax/go-sse v0.8.0/go.mod h1:HLoxqxdH+7oSUItjtnpxjzJedfr/+Rrm/dNWBcTxJFM=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down
2 changes: 1 addition & 1 deletion pkg/experiment/local/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Initialize(apiKey string, config *Config) *Client {
var deploymentRunner *deploymentRunner
if config.CohortSyncConfig != nil {
cohortDownloadApi := newDirectCohortDownloadApi(config.CohortSyncConfig.ApiKey, config.CohortSyncConfig.SecretKey, config.CohortSyncConfig.MaxCohortSize, config.CohortSyncConfig.CohortServerUrl, config.Debug)
cohortLoader = newCohortLoader(cohortDownloadApi, cohortStorage)
cohortLoader = newCohortLoader(cohortDownloadApi, cohortStorage, config.Debug)
}
var flagStreamApi *flagConfigStreamApiV2
if config.StreamUpdates {
Expand Down
11 changes: 8 additions & 3 deletions pkg/experiment/local/cohort_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@ package local

import (
"fmt"
"strings"
"sync"
"sync/atomic"

"github.com/amplitude/experiment-go-server/internal/logger"
)

type cohortLoader struct {
log *logger.Log
cohortDownloadApi cohortDownloadApi
cohortStorage cohortStorage
jobs sync.Map
executor *sync.Pool
lockJobs sync.Mutex
}

func newCohortLoader(cohortDownloadApi cohortDownloadApi, cohortStorage cohortStorage) *cohortLoader {
func newCohortLoader(cohortDownloadApi cohortDownloadApi, cohortStorage cohortStorage, debug bool) *cohortLoader {
return &cohortLoader{
cohortDownloadApi: cohortDownloadApi,
cohortStorage: cohortStorage,
Expand All @@ -23,6 +27,7 @@ func newCohortLoader(cohortDownloadApi cohortDownloadApi, cohortStorage cohortSt
return &CohortLoaderTask{}
},
},
log: logger.New(debug),
}
}

Expand Down Expand Up @@ -111,10 +116,10 @@ func (cl *cohortLoader) downloadCohorts(cohortIDs map[string]struct{}) {
var errorMessages []string
for err := range errorChan {
errorMessages = append(errorMessages, err.Error())
// dr.log.Error("Error downloading cohort: %v", err)
cl.log.Error("Error downloading cohort: %v", err)
}

if len(errorMessages) > 0 {
// dr.log.Error("One or more cohorts failed to download:\n%s", strings.Join(errorMessages, "\n"))
cl.log.Error("One or more cohorts failed to download:\n%s", strings.Join(errorMessages, "\n"))
}
}
6 changes: 3 additions & 3 deletions pkg/experiment/local/cohort_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestLoadSuccess(t *testing.T) {
api := &MockCohortDownloadApi{}
storage := newInMemoryCohortStorage()
loader := newCohortLoader(api, storage)
loader := newCohortLoader(api, storage, true)

// Define mock behavior
api.On("getCohort", "a", mock.AnythingOfType("*local.Cohort")).Return(&Cohort{Id: "a", LastModified: 0, Size: 1, MemberIds: []string{"1"}, GroupType: userGroupType}, nil)
Expand Down Expand Up @@ -48,7 +48,7 @@ func TestLoadSuccess(t *testing.T) {
func TestFilterCohortsAlreadyComputed(t *testing.T) {
api := &MockCohortDownloadApi{}
storage := newInMemoryCohortStorage()
loader := newCohortLoader(api, storage)
loader := newCohortLoader(api, storage, true)

storage.putCohort(&Cohort{Id: "a", LastModified: 0, Size: 0, MemberIds: []string{}})
storage.putCohort(&Cohort{Id: "b", LastModified: 0, Size: 0, MemberIds: []string{}})
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestFilterCohortsAlreadyComputed(t *testing.T) {
func TestLoadDownloadFailureThrows(t *testing.T) {
api := &MockCohortDownloadApi{}
storage := newInMemoryCohortStorage()
loader := newCohortLoader(api, storage)
loader := newCohortLoader(api, storage, true)

// Define mock behavior
api.On("getCohort", "a", mock.AnythingOfType("*local.Cohort")).Return(&Cohort{Id: "a", LastModified: 0, Size: 1, MemberIds: []string{"1"}, GroupType: userGroupType}, nil)
Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/local/deployment_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestStartThrowsIfFirstFlagConfigLoadFails(t *testing.T) {
cohortDownloadAPI := &mockCohortDownloadApi{}
flagConfigStorage := newInMemoryFlagConfigStorage()
cohortStorage := newInMemoryCohortStorage()
cohortLoader := newCohortLoader(cohortDownloadAPI, cohortStorage)
cohortLoader := newCohortLoader(cohortDownloadAPI, cohortStorage, true)

runner := newDeploymentRunner(
&Config{},
Expand All @@ -46,7 +46,7 @@ func TestStartSucceedsEvenIfFirstCohortLoadFails(t *testing.T) {
}}
flagConfigStorage := newInMemoryFlagConfigStorage()
cohortStorage := newInMemoryCohortStorage()
cohortLoader := newCohortLoader(cohortDownloadAPI, cohortStorage)
cohortLoader := newCohortLoader(cohortDownloadAPI, cohortStorage, true)

runner := newDeploymentRunner(
DefaultConfig,
Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/local/flag_config_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func createTestPollerObjs() (mockFlagConfigApi, flagConfigStorage, cohortStorage
cohortDownloadAPI := &mockCohortDownloadApi{}
flagConfigStorage := newInMemoryFlagConfigStorage()
cohortStorage := newInMemoryCohortStorage()
cohortLoader := newCohortLoader(cohortDownloadAPI, cohortStorage)
cohortLoader := newCohortLoader(cohortDownloadAPI, cohortStorage, true)
return api, flagConfigStorage, cohortStorage, cohortLoader
}

Expand Down Expand Up @@ -128,7 +128,7 @@ func createTestStreamerObjs() (mockFlagConfigStreamApi, flagConfigStorage, cohor
cohortDownloadAPI := &mockCohortDownloadApi{}
flagConfigStorage := newInMemoryFlagConfigStorage()
cohortStorage := newInMemoryCohortStorage()
cohortLoader := newCohortLoader(cohortDownloadAPI, cohortStorage)
cohortLoader := newCohortLoader(cohortDownloadAPI, cohortStorage, true)
return api, flagConfigStorage, cohortStorage, cohortLoader
}

Expand Down

0 comments on commit fc4230b

Please sign in to comment.