From 534ef25a3b9288e8a00ddccd879d4c14261bb1a4 Mon Sep 17 00:00:00 2001 From: Sergey Wh1te <102730938+sergeyWh1te@users.noreply.github.com> Date: Tue, 8 Oct 2024 16:31:55 +0300 Subject: [PATCH] feat: Split up feeder, forwarder to independent bin applications (#32) Signed-off-by: sergeyWh1te --- Changelog.md | 4 + Dockerfile | 4 +- Makefile | 2 +- cmd/feeder/main.go | 73 +++++++++++++++++++ cmd/{worker => forwarder}/main.go | 20 ++--- docker-compose.yml | 40 +++++++++- .../worker.go => forwarder/forwarder.go} | 2 +- 7 files changed, 125 insertions(+), 20 deletions(-) create mode 100644 cmd/feeder/main.go rename cmd/{worker => forwarder}/main.go (77%) rename internal/app/{worker/worker.go => forwarder/forwarder.go} (99%) diff --git a/Changelog.md b/Changelog.md index b8285f2..cd2249e 100644 --- a/Changelog.md +++ b/Changelog.md @@ -1,3 +1,7 @@ +## 08.10.2024 +1. Split up feeder, forwarder to independent bin applications +2. Added mechanism for using UniqueKey for collecting quorum + ## 04.10.2024 1. Removed Forta integration. 2. Fixed issues with Telegram markdown formatting. diff --git a/Dockerfile b/Dockerfile index da7e160..4503426 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,8 @@ COPY . . RUN apk add git=2.45.2-r0 -RUN go build -ldflags="-X github.com/lidofinance/finding-forwarder/internal/connectors/metrics.Commit=$(git rev-parse HEAD)" -o ./bin/worker ./cmd/worker +RUN go build -ldflags="-X github.com/lidofinance/finding-forwarder/internal/connectors/metrics.Commit=$(git rev-parse HEAD)" -o ./bin/feeder ./cmd/feeder +RUN go build -ldflags="-X github.com/lidofinance/finding-forwarder/internal/connectors/metrics.Commit=$(git rev-parse HEAD)" -o ./bin/forwarder ./cmd/forwarder # Run stage FROM alpine:3.20 @@ -17,4 +18,3 @@ RUN apk add --no-cache ca-certificates COPY --from=builder /go/src/app/bin . USER nobody -CMD ["/app/main"] diff --git a/Makefile b/Makefile index 3a29e70..1e690d1 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ ### GO tools # Makefile generate-docker: - docker build -t monitoring/ff -f Dockerfile . + docker build -t ff-monitoring -f Dockerfile . .PHONY: generate-docker .PHONY: tools diff --git a/cmd/feeder/main.go b/cmd/feeder/main.go new file mode 100644 index 0000000..0998318 --- /dev/null +++ b/cmd/feeder/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/go-chi/chi/v5" + "github.com/nats-io/nats.go/jetstream" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/errgroup" + + "github.com/lidofinance/finding-forwarder/internal/app/feeder" + "github.com/lidofinance/finding-forwarder/internal/app/server" + "github.com/lidofinance/finding-forwarder/internal/connectors/logger" + "github.com/lidofinance/finding-forwarder/internal/connectors/metrics" + nc "github.com/lidofinance/finding-forwarder/internal/connectors/nats" + "github.com/lidofinance/finding-forwarder/internal/env" +) + +func main() { + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + defer stop() + g, gCtx := errgroup.WithContext(ctx) + + cfg, envErr := env.Read("") + if envErr != nil { + fmt.Println("Read env error:", envErr.Error()) + return + } + + log := logger.New(&cfg.AppConfig) + + natsClient, natsErr := nc.New(&cfg.AppConfig, log) + if natsErr != nil { + log.Error(fmt.Sprintf(`Could not connect to nats error: %v`, natsErr)) + return + } + defer natsClient.Close() + log.Info("Nats connected") + + js, jetStreamErr := jetstream.New(natsClient) + if jetStreamErr != nil { + fmt.Println("Could not connect to jetStream error:", jetStreamErr.Error()) + return + } + log.Info("Nats jetStream connected") + + r := chi.NewRouter() + metricsStore := metrics.New(prometheus.NewRegistry(), cfg.AppConfig.MetricsPrefix, cfg.AppConfig.Name, cfg.AppConfig.Env) + + services := server.NewServices(&cfg.AppConfig, metricsStore) + app := server.New(&cfg.AppConfig, log, metricsStore, js, natsClient) + + app.Metrics.BuildInfo.Inc() + + feederWrk := feeder.New(log, services.ChainSrv, js, metricsStore, cfg.AppConfig.BlockTopic) + feederWrk.Run(gCtx, g) + + app.RegisterWorkerRoutes(r) + app.RegisterInfraRoutes(r) + app.RunHTTPServer(gCtx, g, cfg.AppConfig.Port, r) + + log.Info(fmt.Sprintf(`Started %s feeder`, cfg.AppConfig.Name)) + + if err := g.Wait(); err != nil { + log.Error(err.Error()) + } + + fmt.Println(`Main done`) +} diff --git a/cmd/worker/main.go b/cmd/forwarder/main.go similarity index 77% rename from cmd/worker/main.go rename to cmd/forwarder/main.go index 95c6485..fdafcd5 100644 --- a/cmd/worker/main.go +++ b/cmd/forwarder/main.go @@ -16,9 +16,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" - "github.com/lidofinance/finding-forwarder/internal/app/feeder" + "github.com/lidofinance/finding-forwarder/internal/app/forwarder" "github.com/lidofinance/finding-forwarder/internal/app/server" - "github.com/lidofinance/finding-forwarder/internal/app/worker" "github.com/lidofinance/finding-forwarder/internal/connectors/logger" "github.com/lidofinance/finding-forwarder/internal/connectors/metrics" nc "github.com/lidofinance/finding-forwarder/internal/connectors/nats" @@ -101,15 +100,15 @@ func main() { const LruSize = 125 cache := expirable.NewLRU[string, uint](LruSize, nil, time.Minute*10) - protocolWorker := worker.NewFindingWorker( + protocolWorker := forwarder.NewFindingWorker( log, metricsStore, cache, rds, natStream, protocolNatsSubject, cfg.AppConfig.QuorumSize, - worker.WithFindingConsumer(services.OnChainAlertsTelegram, `OnChainAlerts_Telegram_Consumer`, registry.OnChainAlerts, Telegram), - worker.WithFindingConsumer(services.OnChainUpdatesTelegram, `OnChainUpdates_Telegram_Consumer`, registry.OnChainUpdates, Telegram), - worker.WithFindingConsumer(services.ErrorsTelegram, `Protocol_Errors_Telegram_Consumer`, registry.OnChainErrors, Telegram), - worker.WithFindingConsumer(services.Discord, `Protocol_Discord_Consumer`, registry.FallBackAlerts, Discord), - worker.WithFindingConsumer(services.OpsGenie, `Protocol_OpGenie_Consumer`, registry.OnChainAlerts, OpsGenie), + forwarder.WithFindingConsumer(services.OnChainAlertsTelegram, `OnChainAlerts_Telegram_Consumer`, registry.OnChainAlerts, Telegram), + forwarder.WithFindingConsumer(services.OnChainUpdatesTelegram, `OnChainUpdates_Telegram_Consumer`, registry.OnChainUpdates, Telegram), + forwarder.WithFindingConsumer(services.ErrorsTelegram, `Protocol_Errors_Telegram_Consumer`, registry.OnChainErrors, Telegram), + forwarder.WithFindingConsumer(services.Discord, `Protocol_Discord_Consumer`, registry.FallBackAlerts, Discord), + forwarder.WithFindingConsumer(services.OpsGenie, `Protocol_OpGenie_Consumer`, registry.OnChainAlerts, OpsGenie), ) // Listen findings from Nats @@ -118,13 +117,10 @@ func main() { return } - feederWrk := feeder.New(log, services.ChainSrv, js, metricsStore, cfg.AppConfig.BlockTopic) - feederWrk.Run(gCtx, g) - app.RegisterInfraRoutes(r) app.RunHTTPServer(gCtx, g, cfg.AppConfig.Port, r) - log.Info(fmt.Sprintf(`Started %s worker`, cfg.AppConfig.Name)) + log.Info(fmt.Sprintf(`Started %s forwarder`, cfg.AppConfig.Name)) if err := g.Wait(); err != nil { log.Error(err.Error()) diff --git a/docker-compose.yml b/docker-compose.yml index f04bb62..daa48e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -26,13 +26,13 @@ services: - ./nats/jetstream:/data/jetstream - ./nats/nats.conf:/etc/nats/nats.conf - forwarder-worker: - image: monitoring/ff - container_name: finding-forwarder-worker + forwarder: + image: ff-monitoring + container_name: forwarder build: ./ restart: always command: - - ./worker + - ./forwarder env_file: - .env environment: @@ -58,6 +58,38 @@ services: - redis - nats + feeder: + image: ff-monitoring + container_name: feeder + build: ./ + restart: always + command: + - ./feeder + env_file: + - .env + environment: + - READ_ENV_FROM_SHELL=true + - ENV=${ENV} + - APP_NAME=${APP_NAME} + - PORT=${PORT} + - LOG_FORMAT=${LOG_FORMAT} + - LOG_LEVEL=${LOG_LEVEL} + - TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN} + - TELEGRAM_ERRORS_CHAT_ID=${TELEGRAM_ERRORS_CHAT_ID} + - TELEGRAM_UPDATES_CHAT_ID=${TELEGRAM_UPDATES_CHAT_ID} + - TELEGRAM_ALERTS_CHAT_ID=${TELEGRAM_ALERTS_CHAT_ID} + - OPSGENIE_API_KEY=${OPSGENIE_API_KEY} + - DISCORD_WEBHOOK_URL=${DISCORD_WEBHOOK_URL} + - NATS_DEFAULT_URL=${NATS_DEFAULT_URL} + - BOT_CONTAINERS=${BOT_CONTAINERS} + - REDIS_ADDRESS=${REDIS_ADDRESS} + - QUORUM_SIZE=${QUORUM_SIZE} + ports: + - "8082:8080" + depends_on: + - redis + - nats + #service-ethereum-steth-v2: # container_name: ethereum-steth-v2 # logging: *default-logging diff --git a/internal/app/worker/worker.go b/internal/app/forwarder/forwarder.go similarity index 99% rename from internal/app/worker/worker.go rename to internal/app/forwarder/forwarder.go index 698714c..4633fa2 100644 --- a/internal/app/worker/worker.go +++ b/internal/app/forwarder/forwarder.go @@ -1,4 +1,4 @@ -package worker +package forwarder import ( "bytes"