Skip to content

Commit

Permalink
feat(blobstorage): add health check, change regular mux for echo, fil…
Browse files Browse the repository at this point in the history
…ter changes (#16449)
  • Loading branch information
cyberhorsey authored Mar 15, 2024
1 parent ecbf797 commit ee1233d
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 190 deletions.
2 changes: 1 addition & 1 deletion packages/blobstorage/.default.server.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PORT=3282
HTTP_PORT=3282
DATABASE_HOST=localhost
DATABASE_PORT=27017
DATABASE_USER=root
Expand Down
72 changes: 72 additions & 0 deletions packages/blobstorage/api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package api

import (
"context"
"fmt"

nethttp "net/http"

"github.com/labstack/echo/v4"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/pkg/http"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/pkg/repo"
"github.com/urfave/cli/v2"
"golang.org/x/exp/slog"
)

type API struct {
srv *http.Server
port int
}

func (a *API) InitFromCli(ctx context.Context, c *cli.Context) error {
cfg, err := NewConfigFromCliContext(c)
if err != nil {
return err
}

return InitFromConfig(ctx, a, cfg)
}

// InitFromConfig inits a new Server from a provided Config struct
func InitFromConfig(ctx context.Context, a *API, cfg *Config) (err error) {
db, err := cfg.OpenDBFunc()
if err != nil {
return err
}

blobHashRepo, err := repo.NewBlobHashRepository(db)
if err != nil {
return err
}

srv, err := http.NewServer(http.NewServerOpts{
BlobHashRepo: blobHashRepo,
Echo: echo.New(),
})
if err != nil {
return err
}

a.srv = srv

a.port = int(cfg.Port)

return nil
}

func (a *API) Start() error {
go func() {
if err := a.srv.Start(fmt.Sprintf(":%v", a.port)); err != nil && err != nethttp.ErrServerClosed {
slog.Error("error starting server", "error", err)
}
}()

return nil
}

func (a *API) Close(ctx context.Context) {
}

func (a *API) Name() string {
return "server"
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server
package api

import (
"database/sql"
Expand Down
22 changes: 22 additions & 0 deletions packages/blobstorage/cmd/flags/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package flags

import (
"github.com/urfave/cli/v2"
)

var (
apiCategory = "API"
)

var (
Port = &cli.UintFlag{
Name: "httpPort",
Usage: "Port to run server on",
Category: apiCategory,
EnvVars: []string{"HTTP_PORT"},
}
)

var APIFlags = MergeFlags(DatabaseFlags, CommonFlags, []cli.Flag{
Port,
})
22 changes: 0 additions & 22 deletions packages/blobstorage/cmd/flags/server.go

This file was deleted.

6 changes: 3 additions & 3 deletions packages/blobstorage/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"os"

"github.com/joho/godotenv"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/api"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/cmd/flags"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/cmd/utils"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/indexer"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/server"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -45,10 +45,10 @@ func main() {
},
{
Name: "server",
Flags: flags.ServerFlags,
Flags: flags.APIFlags,
Usage: "Starts the server software",
Description: "Taiko blobcatcher server software",
Action: utils.SubcommandAction(new(server.Server)),
Action: utils.SubcommandAction(new(api.API)),
},
}

Expand Down
76 changes: 76 additions & 0 deletions packages/blobstorage/pkg/http/get_blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package http

import (
"errors"
"net/http"
"strings"

"github.com/cyberhorsey/webutils"
echo "github.com/labstack/echo/v4"
"gorm.io/gorm"
)

type resp struct {
Data []blobData `bson:"data" json:"data"`
}

type blobData struct {
Blob string `bson:"blob_hash" json:"blob_hash"`
KzgCommitment string `bson:"kzg_commitment" json:"kzg_commitment"`
}

func (srv *Server) GetBlob(c echo.Context) error {
blobHashes := c.QueryParam("blobHash")
if blobHashes == "" {
return webutils.LogAndRenderErrors(c, http.StatusBadRequest, errors.New("empty blobHash queryparam"))
}

data, err := srv.getBlobData(strings.Split(blobHashes, ","))
if err != nil {
return webutils.LogAndRenderErrors(c, http.StatusBadRequest, err)
}

response := resp{
Data: make([]blobData, 0),
}

// Convert data to the correct type
for _, d := range data {
response.Data = append(response.Data, blobData{
Blob: d.Blob,
KzgCommitment: d.KzgCommitment,
},
)
}

return c.JSON(http.StatusOK, response)
}

// getBlobData retrieves blob data from MongoDB based on blobHashes.
func (srv *Server) getBlobData(blobHashes []string) ([]blobData, error) {
var results []blobData

for _, blobHash := range blobHashes {
var result blobData

bh, err := srv.blobHashRepo.FirstByBlobHash(blobHash)

if err != nil {
if err == gorm.ErrRecordNotFound {
// Handle case where blob hash is not found
result.Blob = "NOT_FOUND"
result.KzgCommitment = "NOT_FOUND"
} else {
// Return error for other types of errors
return nil, err
}
} else {
result.Blob = bh.BlobHash
result.KzgCommitment = bh.KzgCommitment

results = append(results, result)
}
}

return results, nil
}
115 changes: 115 additions & 0 deletions packages/blobstorage/pkg/http/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package http

import (
"context"
"math/big"
"net/http"
"os"

"github.com/labstack/echo/v4/middleware"
"github.com/taikoxyz/taiko-mono/packages/blobstorage"

echo "github.com/labstack/echo/v4"
)

type ethClient interface {
BlockNumber(ctx context.Context) (uint64, error)
ChainID(ctx context.Context) (*big.Int, error)
}

// @title Taiko Relayer API
// @version 1.0
// @termsOfService http://swagger.io/terms/

// @contact.name API Support
// @contact.url https://community.taiko.xyz/
// @contact.email [email protected]

// @license.name MIT

// @host relayer.katla.taiko.xyz
// Server represents an relayer http server instance.
type Server struct {
echo *echo.Echo
blobHashRepo blobstorage.BlobHashRepository
}

type NewServerOpts struct {
Echo *echo.Echo
CorsOrigins []string
BlobHashRepo blobstorage.BlobHashRepository
}

func NewServer(opts NewServerOpts) (*Server, error) {
srv := &Server{
echo: opts.Echo,
blobHashRepo: opts.BlobHashRepo,
}

corsOrigins := opts.CorsOrigins
if corsOrigins == nil {
corsOrigins = []string{"*"}
}

srv.configureMiddleware(corsOrigins)
srv.configureRoutes()

return srv, nil
}

// Start starts the HTTP server
func (srv *Server) Start(address string) error {
return srv.echo.Start(address)
}

// Shutdown shuts down the HTTP server
func (srv *Server) Shutdown(ctx context.Context) error {
return srv.echo.Shutdown(ctx)
}

// ServeHTTP implements the `http.Handler` interface which serves HTTP requests
func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
srv.echo.ServeHTTP(w, r)
}

// Health endpoints for probes
func (srv *Server) Health(c echo.Context) error {
return c.NoContent(http.StatusOK)
}

func LogSkipper(c echo.Context) bool {
switch c.Request().URL.Path {
case "/healthz":
return true
case "/metrics":
return true
default:
return true
}
}

func (srv *Server) configureMiddleware(corsOrigins []string) {
srv.echo.Use(middleware.RequestID())

srv.echo.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
Skipper: LogSkipper,
Format: `{"time":"${time_rfc3339_nano}","level":"INFO","message":{"id":"${id}","remote_ip":"${remote_ip}",` + //nolint:lll
`"host":"${host}","method":"${method}","uri":"${uri}","user_agent":"${user_agent}",` + //nolint:lll
`"response_status":${status},"error":"${error}","latency":${latency},"latency_human":"${latency_human}",` +
`"bytes_in":${bytes_in},"bytes_out":${bytes_out}}}` + "\n",
Output: os.Stdout,
}))

srv.echo.Use(middleware.CORSWithConfig(middleware.CORSConfig{
AllowOrigins: corsOrigins,
AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept},
AllowMethods: []string{http.MethodGet, http.MethodHead},
}))
}

func (srv *Server) configureRoutes() {
srv.echo.GET("/healthz", srv.Health)
srv.echo.GET("/", srv.Health)

srv.echo.GET("/getBlob", srv.GetBlob)
}
Loading

0 comments on commit ee1233d

Please sign in to comment.