diff --git a/packages/blobstorage/.default.server.env b/packages/blobstorage/.default.server.env index 446955e696..7c469c7f07 100644 --- a/packages/blobstorage/.default.server.env +++ b/packages/blobstorage/.default.server.env @@ -1,4 +1,4 @@ -PORT=3282 +HTTP_PORT=3282 DATABASE_HOST=localhost DATABASE_PORT=27017 DATABASE_USER=root diff --git a/packages/blobstorage/api/api.go b/packages/blobstorage/api/api.go new file mode 100644 index 0000000000..595b64c104 --- /dev/null +++ b/packages/blobstorage/api/api.go @@ -0,0 +1,72 @@ +package api + +import ( + "context" + "fmt" + + nethttp "net/http" + + "github.com/labstack/echo/v4" + "github.com/taikoxyz/taiko-mono/packages/blobstorage/pkg/http" + "github.com/taikoxyz/taiko-mono/packages/blobstorage/pkg/repo" + "github.com/urfave/cli/v2" + "golang.org/x/exp/slog" +) + +type API struct { + srv *http.Server + port int +} + +func (a *API) InitFromCli(ctx context.Context, c *cli.Context) error { + cfg, err := NewConfigFromCliContext(c) + if err != nil { + return err + } + + return InitFromConfig(ctx, a, cfg) +} + +// InitFromConfig inits a new Server from a provided Config struct +func InitFromConfig(ctx context.Context, a *API, cfg *Config) (err error) { + db, err := cfg.OpenDBFunc() + if err != nil { + return err + } + + blobHashRepo, err := repo.NewBlobHashRepository(db) + if err != nil { + return err + } + + srv, err := http.NewServer(http.NewServerOpts{ + BlobHashRepo: blobHashRepo, + Echo: echo.New(), + }) + if err != nil { + return err + } + + a.srv = srv + + a.port = int(cfg.Port) + + return nil +} + +func (a *API) Start() error { + go func() { + if err := a.srv.Start(fmt.Sprintf(":%v", a.port)); err != nil && err != nethttp.ErrServerClosed { + slog.Error("error starting server", "error", err) + } + }() + + return nil +} + +func (a *API) Close(ctx context.Context) { +} + +func (a *API) Name() string { + return "server" +} diff --git a/packages/blobstorage/server/config.go b/packages/blobstorage/api/config.go similarity index 99% rename from packages/blobstorage/server/config.go rename to packages/blobstorage/api/config.go index b94726bba8..6c7cd95eb3 100644 --- a/packages/blobstorage/server/config.go +++ b/packages/blobstorage/api/config.go @@ -1,4 +1,4 @@ -package server +package api import ( "database/sql" diff --git a/packages/blobstorage/cmd/flags/api.go b/packages/blobstorage/cmd/flags/api.go new file mode 100644 index 0000000000..84b03e38e7 --- /dev/null +++ b/packages/blobstorage/cmd/flags/api.go @@ -0,0 +1,22 @@ +package flags + +import ( + "github.com/urfave/cli/v2" +) + +var ( + apiCategory = "API" +) + +var ( + Port = &cli.UintFlag{ + Name: "httpPort", + Usage: "Port to run server on", + Category: apiCategory, + EnvVars: []string{"HTTP_PORT"}, + } +) + +var APIFlags = MergeFlags(DatabaseFlags, CommonFlags, []cli.Flag{ + Port, +}) diff --git a/packages/blobstorage/cmd/flags/server.go b/packages/blobstorage/cmd/flags/server.go deleted file mode 100644 index e2d6b2ca43..0000000000 --- a/packages/blobstorage/cmd/flags/server.go +++ /dev/null @@ -1,22 +0,0 @@ -package flags - -import ( - "github.com/urfave/cli/v2" -) - -var ( - serverCategory = "SERVER" -) - -var ( - Port = &cli.UintFlag{ - Name: "port", - Usage: "Block ID to start indexing from", - Category: indexerCategory, - EnvVars: []string{"PORT"}, - } -) - -var ServerFlags = MergeFlags(DatabaseFlags, CommonFlags, []cli.Flag{ - Port, -}) diff --git a/packages/blobstorage/cmd/main.go b/packages/blobstorage/cmd/main.go index 5fbe2eb675..73f2e6c4eb 100644 --- a/packages/blobstorage/cmd/main.go +++ b/packages/blobstorage/cmd/main.go @@ -6,10 +6,10 @@ import ( "os" "github.com/joho/godotenv" + "github.com/taikoxyz/taiko-mono/packages/blobstorage/api" "github.com/taikoxyz/taiko-mono/packages/blobstorage/cmd/flags" "github.com/taikoxyz/taiko-mono/packages/blobstorage/cmd/utils" "github.com/taikoxyz/taiko-mono/packages/blobstorage/indexer" - "github.com/taikoxyz/taiko-mono/packages/blobstorage/server" "github.com/urfave/cli/v2" ) @@ -45,10 +45,10 @@ func main() { }, { Name: "server", - Flags: flags.ServerFlags, + Flags: flags.APIFlags, Usage: "Starts the server software", Description: "Taiko blobcatcher server software", - Action: utils.SubcommandAction(new(server.Server)), + Action: utils.SubcommandAction(new(api.API)), }, } diff --git a/packages/blobstorage/pkg/http/get_blob.go b/packages/blobstorage/pkg/http/get_blob.go new file mode 100644 index 0000000000..c28e99df4e --- /dev/null +++ b/packages/blobstorage/pkg/http/get_blob.go @@ -0,0 +1,76 @@ +package http + +import ( + "errors" + "net/http" + "strings" + + "github.com/cyberhorsey/webutils" + echo "github.com/labstack/echo/v4" + "gorm.io/gorm" +) + +type resp struct { + Data []blobData `bson:"data" json:"data"` +} + +type blobData struct { + Blob string `bson:"blob_hash" json:"blob_hash"` + KzgCommitment string `bson:"kzg_commitment" json:"kzg_commitment"` +} + +func (srv *Server) GetBlob(c echo.Context) error { + blobHashes := c.QueryParam("blobHash") + if blobHashes == "" { + return webutils.LogAndRenderErrors(c, http.StatusBadRequest, errors.New("empty blobHash queryparam")) + } + + data, err := srv.getBlobData(strings.Split(blobHashes, ",")) + if err != nil { + return webutils.LogAndRenderErrors(c, http.StatusBadRequest, err) + } + + response := resp{ + Data: make([]blobData, 0), + } + + // Convert data to the correct type + for _, d := range data { + response.Data = append(response.Data, blobData{ + Blob: d.Blob, + KzgCommitment: d.KzgCommitment, + }, + ) + } + + return c.JSON(http.StatusOK, response) +} + +// getBlobData retrieves blob data from MongoDB based on blobHashes. +func (srv *Server) getBlobData(blobHashes []string) ([]blobData, error) { + var results []blobData + + for _, blobHash := range blobHashes { + var result blobData + + bh, err := srv.blobHashRepo.FirstByBlobHash(blobHash) + + if err != nil { + if err == gorm.ErrRecordNotFound { + // Handle case where blob hash is not found + result.Blob = "NOT_FOUND" + result.KzgCommitment = "NOT_FOUND" + } else { + // Return error for other types of errors + return nil, err + } + } else { + result.Blob = bh.BlobHash + result.KzgCommitment = bh.KzgCommitment + + results = append(results, result) + } + } + + return results, nil +} diff --git a/packages/blobstorage/pkg/http/server.go b/packages/blobstorage/pkg/http/server.go new file mode 100644 index 0000000000..063d1aa855 --- /dev/null +++ b/packages/blobstorage/pkg/http/server.go @@ -0,0 +1,115 @@ +package http + +import ( + "context" + "math/big" + "net/http" + "os" + + "github.com/labstack/echo/v4/middleware" + "github.com/taikoxyz/taiko-mono/packages/blobstorage" + + echo "github.com/labstack/echo/v4" +) + +type ethClient interface { + BlockNumber(ctx context.Context) (uint64, error) + ChainID(ctx context.Context) (*big.Int, error) +} + +// @title Taiko Relayer API +// @version 1.0 +// @termsOfService http://swagger.io/terms/ + +// @contact.name API Support +// @contact.url https://community.taiko.xyz/ +// @contact.email info@taiko.xyz + +// @license.name MIT + +// @host relayer.katla.taiko.xyz +// Server represents an relayer http server instance. +type Server struct { + echo *echo.Echo + blobHashRepo blobstorage.BlobHashRepository +} + +type NewServerOpts struct { + Echo *echo.Echo + CorsOrigins []string + BlobHashRepo blobstorage.BlobHashRepository +} + +func NewServer(opts NewServerOpts) (*Server, error) { + srv := &Server{ + echo: opts.Echo, + blobHashRepo: opts.BlobHashRepo, + } + + corsOrigins := opts.CorsOrigins + if corsOrigins == nil { + corsOrigins = []string{"*"} + } + + srv.configureMiddleware(corsOrigins) + srv.configureRoutes() + + return srv, nil +} + +// Start starts the HTTP server +func (srv *Server) Start(address string) error { + return srv.echo.Start(address) +} + +// Shutdown shuts down the HTTP server +func (srv *Server) Shutdown(ctx context.Context) error { + return srv.echo.Shutdown(ctx) +} + +// ServeHTTP implements the `http.Handler` interface which serves HTTP requests +func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + srv.echo.ServeHTTP(w, r) +} + +// Health endpoints for probes +func (srv *Server) Health(c echo.Context) error { + return c.NoContent(http.StatusOK) +} + +func LogSkipper(c echo.Context) bool { + switch c.Request().URL.Path { + case "/healthz": + return true + case "/metrics": + return true + default: + return true + } +} + +func (srv *Server) configureMiddleware(corsOrigins []string) { + srv.echo.Use(middleware.RequestID()) + + srv.echo.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ + Skipper: LogSkipper, + Format: `{"time":"${time_rfc3339_nano}","level":"INFO","message":{"id":"${id}","remote_ip":"${remote_ip}",` + //nolint:lll + `"host":"${host}","method":"${method}","uri":"${uri}","user_agent":"${user_agent}",` + //nolint:lll + `"response_status":${status},"error":"${error}","latency":${latency},"latency_human":"${latency_human}",` + + `"bytes_in":${bytes_in},"bytes_out":${bytes_out}}}` + "\n", + Output: os.Stdout, + })) + + srv.echo.Use(middleware.CORSWithConfig(middleware.CORSConfig{ + AllowOrigins: corsOrigins, + AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept}, + AllowMethods: []string{http.MethodGet, http.MethodHead}, + })) +} + +func (srv *Server) configureRoutes() { + srv.echo.GET("/healthz", srv.Health) + srv.echo.GET("/", srv.Health) + + srv.echo.GET("/getBlob", srv.GetBlob) +} diff --git a/packages/blobstorage/server/server.go b/packages/blobstorage/server/server.go deleted file mode 100644 index a6ccd3a540..0000000000 --- a/packages/blobstorage/server/server.go +++ /dev/null @@ -1,135 +0,0 @@ -package server - -import ( - "context" - "encoding/json" - "fmt" - "log/slog" - "net/http" - "strings" - - "github.com/gorilla/mux" - blobstorage "github.com/taikoxyz/taiko-mono/packages/blobstorage" - "github.com/taikoxyz/taiko-mono/packages/blobstorage/pkg/repo" - "github.com/urfave/cli/v2" - "gorm.io/gorm" -) - -type resp struct { - Data []blobData `bson:"data" json:"data"` -} - -type blobData struct { - Blob string `bson:"blob_hash" json:"blob_hash"` - KzgCommitment string `bson:"kzg_commitment" json:"kzg_commitment"` -} - -type Server struct { - blobHashRepo blobstorage.BlobHashRepository - port int -} - -func (s *Server) InitFromCli(ctx context.Context, c *cli.Context) error { - cfg, err := NewConfigFromCliContext(c) - if err != nil { - return err - } - - return InitFromConfig(ctx, s, cfg) -} - -// InitFromConfig inits a new Server from a provided Config struct -func InitFromConfig(ctx context.Context, s *Server, cfg *Config) (err error) { - db, err := cfg.OpenDBFunc() - if err != nil { - return err - } - - blobHashRepo, err := repo.NewBlobHashRepository(db) - if err != nil { - return err - } - - s.blobHashRepo = blobHashRepo - - s.port = int(cfg.Port) - - return nil -} - -func (s *Server) Start() error { - slog.Info("Server started!") - - r := mux.NewRouter() - - // Handler functions - r.HandleFunc("/getBlob", s.getBlobHandler).Methods("GET") - - http.Handle("/", r) - return http.ListenAndServe(fmt.Sprintf(":%v", s.port), nil) -} - -func (s *Server) Close(ctx context.Context) { -} - -func (s *Server) getBlobHandler(w http.ResponseWriter, r *http.Request) { - blobHashes, ok := r.URL.Query()["blobHash"] - if !ok || len(blobHashes) == 0 { - http.Error(w, "Url Param 'blobHash' is missing", http.StatusBadRequest) - return - } - - data, err := s.getBlobData(strings.Split(blobHashes[0], ",")) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - - response := resp{ - Data: make([]blobData, 0), - } - - // Convert data to the correct type - for _, d := range data { - response.Data = append(response.Data, blobData{ - Blob: d.Blob, - KzgCommitment: d.KzgCommitment, - }, - ) - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) -} - -func (s *Server) Name() string { - return "server" -} - -// getBlobData retrieves blob data from MongoDB based on blobHashes. -func (s *Server) getBlobData(blobHashes []string) ([]blobData, error) { - var results []blobData - - for _, blobHash := range blobHashes { - var result blobData - - bh, err := s.blobHashRepo.FirstByBlobHash(blobHash) - - if err != nil { - if err == gorm.ErrRecordNotFound { - // Handle case where blob hash is not found - result.Blob = "NOT_FOUND" - result.KzgCommitment = "NOT_FOUND" - } else { - // Return error for other types of errors - return nil, err - } - } else { - result.Blob = bh.BlobHash - result.KzgCommitment = bh.KzgCommitment - - results = append(results, result) - } - } - - return results, nil -} diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index b7ee483651..c9e68743a4 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -272,48 +272,26 @@ func (i *Indexer) Start() error { i.wg.Add(1) - go func() { - if err := i.eventLoop(i.ctx, i.latestIndexedBlockNumber); err != nil { - slog.Error("error in event loop", "error", err) - } - }() + go i.eventLoop(i.ctx, i.latestIndexedBlockNumber) return nil } -func (i *Indexer) eventLoop(ctx context.Context, startBlockID uint64) error { - defer func() { - i.wg.Done() - }() +func (i *Indexer) eventLoop(ctx context.Context, startBlockID uint64) { + defer i.wg.Done() t := time.NewTicker(10 * time.Second) defer t.Stop() - var filtering bool = false - for { select { case <-ctx.Done(): slog.Info("event loop context done") - return nil + return case <-t.C: - func() { - defer func() { - filtering = false - }() - }() - - if filtering { - continue - } - - filtering = true - - slog.Info("event loop ticker") - - if err := i.withRetry(func() error { return i.filter(ctx) }); err != nil { - return err + if err := i.filter(ctx); err != nil { + slog.Error("error filtering", "error", err) } } }