Skip to content

Commit

Permalink
Corrections after PR's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
agourdel committed Jul 15, 2024
1 parent aeb03a8 commit 12cb42c
Show file tree
Hide file tree
Showing 97 changed files with 3,847 additions and 3,410 deletions.
6 changes: 3 additions & 3 deletions ee/webhooks/cmd/flag/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"strings"
"time"

component "github.com/formancehq/webhooks/internal/components/commons"
cache "github.com/formancehq/webhooks/internal/app/cache"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
Expand Down Expand Up @@ -57,8 +57,8 @@ func init() {
LoadEnv(viper.GetViper())
}

func LoadRunnerParams() component.RunnerParams {
stateParams := component.DefaultRunnerParams()
func LoadRunnerParams() cache.CacheParams {
stateParams := cache.DefaultCacheParams()
stateParams.MaxRetry = viper.GetInt(MaxRetry)
stateParams.MaxCall = viper.GetInt(MaxCall)
stateParams.TimeOut = viper.GetInt(TimeOut)
Expand Down
39 changes: 39 additions & 0 deletions ee/webhooks/cmd/fx-modules/collector-module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package fxmodules

import (
"context"

"github.com/formancehq/webhooks/internal/app/cache"
webhookcollector "github.com/formancehq/webhooks/internal/app/webhook_collector"
"github.com/formancehq/webhooks/internal/services/httpclient"
storage "github.com/formancehq/webhooks/internal/services/storage/postgres"
"go.uber.org/fx"
)


func InvokeCollector() fx.Option{

return fx.Invoke(func(lc fx.Lifecycle,
database *storage.PostgresStore,
cacheParams *cache.CacheParams,
client *httpclient.DefaultHttpClient,
) {

Collector := webhookcollector.NewCollector(*cacheParams, database, client)
Collector.Init()

lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
Collector.Run()
return nil
},
OnStop: func(ctx context.Context) error {
Collector.Stop()
return nil
},
})


})

}
25 changes: 0 additions & 25 deletions ee/webhooks/cmd/fx-modules/fx-modules.go

This file was deleted.

28 changes: 28 additions & 0 deletions ee/webhooks/cmd/fx-modules/server-module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package fxmodules

import (

"github.com/formancehq/stack/libs/go-libs/health"
"github.com/formancehq/webhooks/internal/app/webhook_server/api/router"
apiutils "github.com/formancehq/webhooks/internal/app/webhook_server/api/utils"
httpclient "github.com/formancehq/webhooks/internal/services/httpclient"
storage "github.com/formancehq/webhooks/internal/services/storage/postgres"
"github.com/formancehq/stack/libs/go-libs/httpserver"

"go.uber.org/fx"
)

func InvokeServer() fx.Option {
return fx.Invoke(
func(
lc fx.Lifecycle,
healthcontroller *health.HealthController,
database *storage.PostgresStore,
serverParams *apiutils.DefaultServerParams,
client *httpclient.DefaultHttpClient,
){
router := router.NewRouter(database, client, healthcontroller, serverParams.Auth, serverParams.Info)
lc.Append(httpserver.NewHook(router, httpserver.WithAddress(serverParams.Addr)))
})

}
85 changes: 85 additions & 0 deletions ee/webhooks/cmd/fx-modules/utils-modules.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package fxmodules

import (
"net/http"

"github.com/formancehq/stack/libs/go-libs/auth"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/webhooks/cmd/flag"
"github.com/formancehq/webhooks/internal/app/cache"
apiutils "github.com/formancehq/webhooks/internal/app/webhook_server/api/utils"
httpclient "github.com/formancehq/webhooks/internal/services/httpclient"
storage "github.com/formancehq/webhooks/internal/services/storage/postgres"
"github.com/spf13/viper"
"github.com/uptrace/bun"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.uber.org/fx"
)

var Tracer = otel.Tracer("webhook")

func ProvideHttpClient() fx.Option {

return fx.Provide(
func() *httpclient.DefaultHttpClient {

client := http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
}

defaultClient := httpclient.NewDefaultHttpClient(&client)

return &defaultClient

},
)

}

func ProvideCacheParams() fx.Option{

return fx.Provide(
func() *cache.CacheParams {
cacheParams := flag.LoadRunnerParams()
return &cacheParams
},
)

}


func ProvideDatabase() fx.Option {

return fx.Provide(
func(db *bun.DB) *storage.PostgresStore {
database := storage.NewPostgresStoreProvider(db)
return &database
},
)
}

func ProvideServerParams () fx.Option {

return fx.Provide(
func(auth auth.Auth, logger logging.Logger, serviceInfo apiutils.ServiceInfo) *apiutils.DefaultServerParams {
serverParams := apiutils.DefaultServerParams{}
serverParams.Addr = viper.GetString(flag.Listen)
serverParams.Auth = auth
serverParams.Info = serviceInfo
serverParams.Logger = logger

return &serverParams
},
)
}

func ProvideTopics() fx.Option{
return fx.Provide(
func() []string {
return viper.GetStringSlice(flag.KafkaTopics)

},
)
}

51 changes: 51 additions & 0 deletions ee/webhooks/cmd/fx-modules/worker-module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package fxmodules

import (
"context"
"fmt"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/formancehq/webhooks/internal/app/cache"
webhookworker "github.com/formancehq/webhooks/internal/app/webhook_worker"
"github.com/formancehq/webhooks/internal/services/httpclient"
storage "github.com/formancehq/webhooks/internal/services/storage/postgres"
"go.uber.org/fx"
)



func InvokeWorker() fx.Option {

return fx.Invoke(func(
lc fx.Lifecycle,
database *storage.PostgresStore,
runnerParams *cache.CacheParams,
client *httpclient.DefaultHttpClient,
r *message.Router,
subscriber message.Subscriber,
topics []string,
){

Worker := webhookworker.NewWorker(*runnerParams, database, client)
Worker.Init()
for _, topic := range topics {
r.AddNoPublisherHandler(fmt.Sprintf("messages-%s", topic), topic, subscriber, Worker.HandleMessage)

}

lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {

return nil
},
OnStop: func(ctx context.Context) error {

subscriber.Close()
r.Close()
Worker.Stop()
return nil
},
})
})

}
Loading

0 comments on commit 12cb42c

Please sign in to comment.