Skip to content

Commit

Permalink
feat: Split up feeder, forwarder to independent bin applications (#32)
Browse files Browse the repository at this point in the history
Signed-off-by: sergeyWh1te <[email protected]>
  • Loading branch information
sergeyWh1te authored Oct 8, 2024
1 parent 1ecd683 commit 534ef25
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 20 deletions.
4 changes: 4 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 08.10.2024
1. Split up feeder, forwarder to independent bin applications
2. Added mechanism for using UniqueKey for collecting quorum

## 04.10.2024
1. Removed Forta integration.
2. Fixed issues with Telegram markdown formatting.
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ COPY . .

RUN apk add git=2.45.2-r0

RUN go build -ldflags="-X github.com/lidofinance/finding-forwarder/internal/connectors/metrics.Commit=$(git rev-parse HEAD)" -o ./bin/worker ./cmd/worker
RUN go build -ldflags="-X github.com/lidofinance/finding-forwarder/internal/connectors/metrics.Commit=$(git rev-parse HEAD)" -o ./bin/feeder ./cmd/feeder
RUN go build -ldflags="-X github.com/lidofinance/finding-forwarder/internal/connectors/metrics.Commit=$(git rev-parse HEAD)" -o ./bin/forwarder ./cmd/forwarder

# Run stage
FROM alpine:3.20
Expand All @@ -17,4 +18,3 @@ RUN apk add --no-cache ca-certificates
COPY --from=builder /go/src/app/bin .

USER nobody
CMD ["/app/main"]
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
### GO tools
# Makefile
generate-docker:
docker build -t monitoring/ff -f Dockerfile .
docker build -t ff-monitoring -f Dockerfile .
.PHONY: generate-docker

.PHONY: tools
Expand Down
73 changes: 73 additions & 0 deletions cmd/feeder/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/go-chi/chi/v5"
"github.com/nats-io/nats.go/jetstream"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"

"github.com/lidofinance/finding-forwarder/internal/app/feeder"
"github.com/lidofinance/finding-forwarder/internal/app/server"
"github.com/lidofinance/finding-forwarder/internal/connectors/logger"
"github.com/lidofinance/finding-forwarder/internal/connectors/metrics"
nc "github.com/lidofinance/finding-forwarder/internal/connectors/nats"
"github.com/lidofinance/finding-forwarder/internal/env"
)

func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
defer stop()
g, gCtx := errgroup.WithContext(ctx)

cfg, envErr := env.Read("")
if envErr != nil {
fmt.Println("Read env error:", envErr.Error())
return
}

log := logger.New(&cfg.AppConfig)

natsClient, natsErr := nc.New(&cfg.AppConfig, log)
if natsErr != nil {
log.Error(fmt.Sprintf(`Could not connect to nats error: %v`, natsErr))
return
}
defer natsClient.Close()
log.Info("Nats connected")

js, jetStreamErr := jetstream.New(natsClient)
if jetStreamErr != nil {
fmt.Println("Could not connect to jetStream error:", jetStreamErr.Error())
return
}
log.Info("Nats jetStream connected")

r := chi.NewRouter()
metricsStore := metrics.New(prometheus.NewRegistry(), cfg.AppConfig.MetricsPrefix, cfg.AppConfig.Name, cfg.AppConfig.Env)

services := server.NewServices(&cfg.AppConfig, metricsStore)
app := server.New(&cfg.AppConfig, log, metricsStore, js, natsClient)

app.Metrics.BuildInfo.Inc()

feederWrk := feeder.New(log, services.ChainSrv, js, metricsStore, cfg.AppConfig.BlockTopic)
feederWrk.Run(gCtx, g)

app.RegisterWorkerRoutes(r)
app.RegisterInfraRoutes(r)
app.RunHTTPServer(gCtx, g, cfg.AppConfig.Port, r)

log.Info(fmt.Sprintf(`Started %s feeder`, cfg.AppConfig.Name))

if err := g.Wait(); err != nil {
log.Error(err.Error())
}

fmt.Println(`Main done`)
}
20 changes: 8 additions & 12 deletions cmd/worker/main.go → cmd/forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"

"github.com/lidofinance/finding-forwarder/internal/app/feeder"
"github.com/lidofinance/finding-forwarder/internal/app/forwarder"
"github.com/lidofinance/finding-forwarder/internal/app/server"
"github.com/lidofinance/finding-forwarder/internal/app/worker"
"github.com/lidofinance/finding-forwarder/internal/connectors/logger"
"github.com/lidofinance/finding-forwarder/internal/connectors/metrics"
nc "github.com/lidofinance/finding-forwarder/internal/connectors/nats"
Expand Down Expand Up @@ -101,15 +100,15 @@ func main() {

const LruSize = 125
cache := expirable.NewLRU[string, uint](LruSize, nil, time.Minute*10)
protocolWorker := worker.NewFindingWorker(
protocolWorker := forwarder.NewFindingWorker(
log, metricsStore, cache,
rds, natStream,
protocolNatsSubject, cfg.AppConfig.QuorumSize,
worker.WithFindingConsumer(services.OnChainAlertsTelegram, `OnChainAlerts_Telegram_Consumer`, registry.OnChainAlerts, Telegram),
worker.WithFindingConsumer(services.OnChainUpdatesTelegram, `OnChainUpdates_Telegram_Consumer`, registry.OnChainUpdates, Telegram),
worker.WithFindingConsumer(services.ErrorsTelegram, `Protocol_Errors_Telegram_Consumer`, registry.OnChainErrors, Telegram),
worker.WithFindingConsumer(services.Discord, `Protocol_Discord_Consumer`, registry.FallBackAlerts, Discord),
worker.WithFindingConsumer(services.OpsGenie, `Protocol_OpGenie_Consumer`, registry.OnChainAlerts, OpsGenie),
forwarder.WithFindingConsumer(services.OnChainAlertsTelegram, `OnChainAlerts_Telegram_Consumer`, registry.OnChainAlerts, Telegram),
forwarder.WithFindingConsumer(services.OnChainUpdatesTelegram, `OnChainUpdates_Telegram_Consumer`, registry.OnChainUpdates, Telegram),
forwarder.WithFindingConsumer(services.ErrorsTelegram, `Protocol_Errors_Telegram_Consumer`, registry.OnChainErrors, Telegram),
forwarder.WithFindingConsumer(services.Discord, `Protocol_Discord_Consumer`, registry.FallBackAlerts, Discord),
forwarder.WithFindingConsumer(services.OpsGenie, `Protocol_OpGenie_Consumer`, registry.OnChainAlerts, OpsGenie),
)

// Listen findings from Nats
Expand All @@ -118,13 +117,10 @@ func main() {
return
}

feederWrk := feeder.New(log, services.ChainSrv, js, metricsStore, cfg.AppConfig.BlockTopic)
feederWrk.Run(gCtx, g)

app.RegisterInfraRoutes(r)
app.RunHTTPServer(gCtx, g, cfg.AppConfig.Port, r)

log.Info(fmt.Sprintf(`Started %s worker`, cfg.AppConfig.Name))
log.Info(fmt.Sprintf(`Started %s forwarder`, cfg.AppConfig.Name))

if err := g.Wait(); err != nil {
log.Error(err.Error())
Expand Down
40 changes: 36 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ services:
- ./nats/jetstream:/data/jetstream
- ./nats/nats.conf:/etc/nats/nats.conf

forwarder-worker:
image: monitoring/ff
container_name: finding-forwarder-worker
forwarder:
image: ff-monitoring
container_name: forwarder
build: ./
restart: always
command:
- ./worker
- ./forwarder
env_file:
- .env
environment:
Expand All @@ -58,6 +58,38 @@ services:
- redis
- nats

feeder:
image: ff-monitoring
container_name: feeder
build: ./
restart: always
command:
- ./feeder
env_file:
- .env
environment:
- READ_ENV_FROM_SHELL=true
- ENV=${ENV}
- APP_NAME=${APP_NAME}
- PORT=${PORT}
- LOG_FORMAT=${LOG_FORMAT}
- LOG_LEVEL=${LOG_LEVEL}
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
- TELEGRAM_ERRORS_CHAT_ID=${TELEGRAM_ERRORS_CHAT_ID}
- TELEGRAM_UPDATES_CHAT_ID=${TELEGRAM_UPDATES_CHAT_ID}
- TELEGRAM_ALERTS_CHAT_ID=${TELEGRAM_ALERTS_CHAT_ID}
- OPSGENIE_API_KEY=${OPSGENIE_API_KEY}
- DISCORD_WEBHOOK_URL=${DISCORD_WEBHOOK_URL}
- NATS_DEFAULT_URL=${NATS_DEFAULT_URL}
- BOT_CONTAINERS=${BOT_CONTAINERS}
- REDIS_ADDRESS=${REDIS_ADDRESS}
- QUORUM_SIZE=${QUORUM_SIZE}
ports:
- "8082:8080"
depends_on:
- redis
- nats

#service-ethereum-steth-v2:
# container_name: ethereum-steth-v2
# logging: *default-logging
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package worker
package forwarder

import (
"bytes"
Expand Down

0 comments on commit 534ef25

Please sign in to comment.