Skip to content

Commit

Permalink
add gin handler
Browse files Browse the repository at this point in the history
  • Loading branch information
duoertai committed Mar 14, 2024
1 parent 4323231 commit 88c24e5
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 29 deletions.
2 changes: 1 addition & 1 deletion cmd/server/bootstrap/xcherry.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func StartXCherryServer(rootCtx context.Context, cfg *config.Config, services ma
var apiServer api.Server
if services[ApiServiceName] {
apiServer = api.NewDefaultAPIServerWithGin(
rootCtx, *cfg, processStore, logger.WithTags(tag.Service(ApiServiceName)))
rootCtx, *cfg, processStore, visibilityStore, logger.WithTags(tag.Service(ApiServiceName)))
err = apiServer.Start()
if err != nil {
logger.Fatal("Failed to start api server", tag.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/lib/pq v1.2.0
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.25.7
github.com/xcherryio/apis v0.0.3-0.20240312083615-629130811a5b
github.com/xcherryio/apis v0.0.3-0.20240313171434-ae652fc3c70f
github.com/xcherryio/sdk-go v0.0.0-20240115163029-e21cc0710e61
go.uber.org/multierr v1.10.0
go.uber.org/zap v1.26.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4d
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/xcherryio/apis v0.0.3-0.20240312083615-629130811a5b h1:hWg63/NjVmFulP5zeW5I+8CCS9zhyMvyBFlZvs8Uc7E=
github.com/xcherryio/apis v0.0.3-0.20240312083615-629130811a5b/go.mod h1:7peiYpRUjmq0rl/8F0MmvFH8Vp7Y8Dq5OpRgpH0cMJU=
github.com/xcherryio/apis v0.0.3-0.20240313171434-ae652fc3c70f h1:csBDKtifwAIRXaHpw3xiUqNDdS0As8OSrflQPr0bTm8=
github.com/xcherryio/apis v0.0.3-0.20240313171434-ae652fc3c70f/go.mod h1:7peiYpRUjmq0rl/8F0MmvFH8Vp7Y8Dq5OpRgpH0cMJU=
github.com/xcherryio/sdk-go v0.0.0-20240115163029-e21cc0710e61 h1:6Xr3S342Di2QuvagFb4uG1AkA8lQLWfED1ynZvnu3V0=
github.com/xcherryio/sdk-go v0.0.0-20240115163029-e21cc0710e61/go.mod h1:Ouc00E061VNVYemKbVQCxB3LSOgIkxV81h//1O1ODws=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
Expand Down
9 changes: 6 additions & 3 deletions persistence/data_models/pagination_token_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ func NewPaginationToken(lastProcessExecutionId string, lastStartTime int64) *Pag
}

Check warning on line 17 in persistence/data_models/pagination_token_json.go

View check run for this annotation

Codecov / codecov/patch

persistence/data_models/pagination_token_json.go#L13-L17

Added lines #L13 - L17 were not covered by tests
}

func (pt *PaginationToken) String() string {
b, _ := json.Marshal(pt)
return string(b)
func (pt *PaginationToken) String() (string, error) {
b, err := json.Marshal(pt)
if err != nil {
return "", err
}
return string(b), nil

Check warning on line 25 in persistence/data_models/pagination_token_json.go

View check run for this annotation

Codecov / codecov/patch

persistence/data_models/pagination_token_json.go#L20-L25

Added lines #L20 - L25 were not covered by tests
}

func ParsePaginationTokenFromString(token string) (*PaginationToken, error) {
Expand Down
14 changes: 10 additions & 4 deletions persistence/visibility/visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,17 @@ func (p sqlVisibilityStoreImpl) ListProcessExecutions(
}
}

Check warning on line 177 in persistence/visibility/visibility_store.go

View check run for this annotation

Codecov / codecov/patch

persistence/visibility/visibility_store.go#L166-L177

Added lines #L166 - L177 were not covered by tests

nextPaginationToken := data_models.NewPaginationToken(
processExecutionRows[len(processExecutionRows)-1].ProcessExecutionId.String(),
processExecutionRows[len(processExecutionRows)-1].StartTime.Unix(),
)
nextPaginationTokenString, err := nextPaginationToken.String()
if err != nil {
return nil, err
}

Check warning on line 186 in persistence/visibility/visibility_store.go

View check run for this annotation

Codecov / codecov/patch

persistence/visibility/visibility_store.go#L179-L186

Added lines #L179 - L186 were not covered by tests

return &xcapi.ListProcessExecutionsResponse{
ProcessExecutions: processExecutionListInfo,
NextPageToken: ptr.Any(data_models.NewPaginationToken(
processExecutionRows[len(processExecutionRows)-1].ProcessExecutionId.String(),
processExecutionRows[len(processExecutionRows)-1].StartTime.Unix(),
).String()),
NextPageToken: ptr.Any(nextPaginationTokenString),
}, nil

Check warning on line 191 in persistence/visibility/visibility_store.go

View check run for this annotation

Codecov / codecov/patch

persistence/visibility/visibility_store.go#L188-L191

Added lines #L188 - L191 were not covered by tests
}
10 changes: 8 additions & 2 deletions service/api/default_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const PathDescribeProcessExecution = "/api/v1/xcherry/service/process-execution/
const PathStopProcessExecution = "/api/v1/xcherry/service/process-execution/stop"
const PathPublishToLocalQueue = "/api/v1/xcherry/service/process-execution/publish-to-local-queue"
const PathProcessExecutionRpc = "/api/v1/xcherry/service/process-execution/rpc"
const PathListProcessExecutions = "/api/v1/xcherry/service/process-execution/list"

type defaultSever struct {
rootCtx context.Context
Expand All @@ -30,11 +31,15 @@ type defaultSever struct {
}

func NewDefaultAPIServerWithGin(
rootCtx context.Context, cfg config.Config, store persistence.ProcessStore, logger log.Logger,
rootCtx context.Context,
cfg config.Config,
processStore persistence.ProcessStore,
visibilityStore persistence.VisibilityStore,
logger log.Logger,
) Server {
engine := gin.Default()

handler := newGinHandler(cfg, store, logger)
handler := newGinHandler(cfg, processStore, visibilityStore, logger)

engine.GET("/", func(c *gin.Context) {
c.String(http.StatusOK, "Hello from xCherry server!")
Expand All @@ -44,6 +49,7 @@ func NewDefaultAPIServerWithGin(
engine.POST(PathStopProcessExecution, handler.StopProcess)
engine.POST(PathPublishToLocalQueue, handler.PublishToLocalQueue)
engine.POST(PathProcessExecutionRpc, handler.Rpc)
engine.POST(PathListProcessExecutions, handler.ListProcessExecutions)

svrCfg := cfg.ApiService.HttpServer
httpServer := &http.Server{
Expand Down
33 changes: 31 additions & 2 deletions service/api/gin_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ type ginHandler struct {
svc Service
}

func newGinHandler(cfg config.Config, store persistence.ProcessStore, logger log.Logger) *ginHandler {
svc := NewServiceImpl(cfg, store, logger)
func newGinHandler(
cfg config.Config,
processStore persistence.ProcessStore,
visibilityStore persistence.VisibilityStore,
logger log.Logger,
) *ginHandler {
svc := NewServiceImpl(cfg, processStore, visibilityStore, logger)
return &ginHandler{
config: cfg,
logger: logger,
Expand Down Expand Up @@ -154,6 +159,30 @@ func (h *ginHandler) Rpc(c *gin.Context) {
c.JSON(http.StatusOK, resp)
}

func (h *ginHandler) ListProcessExecutions(c *gin.Context) {
var req xcapi.ListProcessExecutionsRequest
if err := c.ShouldBindJSON(&req); err != nil {
invalidRequestSchema(c)
return
}

Check warning on line 167 in service/api/gin_handler.go

View check run for this annotation

Codecov / codecov/patch

service/api/gin_handler.go#L162-L167

Added lines #L162 - L167 were not covered by tests

var resp *xcapi.ListProcessExecutionsResponse
var errResp *ErrorWithStatus
h.logger.Debug("received ListProcessExecutions API request", tag.Value(h.toJson(req)))
defer func() {
h.logger.Debug("responded ListProcessExecutions API request", tag.Value(h.toJson(resp)), tag.Value(h.toJson(errResp)))
}()

Check warning on line 174 in service/api/gin_handler.go

View check run for this annotation

Codecov / codecov/patch

service/api/gin_handler.go#L169-L174

Added lines #L169 - L174 were not covered by tests

//resp, errResp = h.svc.ListProcessExecutions(c.Request.Context(), req)

if errResp != nil {
c.JSON(errResp.StatusCode, errResp.Error)
return
}

Check warning on line 181 in service/api/gin_handler.go

View check run for this annotation

Codecov / codecov/patch

service/api/gin_handler.go#L178-L181

Added lines #L178 - L181 were not covered by tests

c.JSON(http.StatusOK, resp)

Check warning on line 183 in service/api/gin_handler.go

View check run for this annotation

Codecov / codecov/patch

service/api/gin_handler.go#L183

Added line #L183 was not covered by tests
}

func (h *ginHandler) toJson(req any) string {
str, err := json.Marshal(req)
if err != nil {
Expand Down
57 changes: 43 additions & 14 deletions service/api/service_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,23 @@ import (
)

type serviceImpl struct {
cfg config.Config
store persistence.ProcessStore
logger log.Logger
cfg config.Config
processStore persistence.ProcessStore
visibilityStore persistence.VisibilityStore
logger log.Logger
}

func NewServiceImpl(cfg config.Config, store persistence.ProcessStore, logger log.Logger) Service {
func NewServiceImpl(
cfg config.Config,
processStore persistence.ProcessStore,
visibilityStore persistence.VisibilityStore,
logger log.Logger,
) Service {
return &serviceImpl{
cfg: cfg,
store: store,
logger: logger,
cfg: cfg,
processStore: processStore,
visibilityStore: visibilityStore,
logger: logger,
}
}

Expand All @@ -51,7 +58,7 @@ func (s serviceImpl) StartProcess(
storeReq.TimeoutTimeUnixSeconds = time.Now().Unix() + int64(timeoutUnixSeconds)
}

resp, perr := s.store.StartProcess(ctx, storeReq)
resp, perr := s.processStore.StartProcess(ctx, storeReq)
if perr != nil {
return nil, s.handleUnknownError(perr)
}
Expand Down Expand Up @@ -94,7 +101,7 @@ func (s serviceImpl) StartProcess(
func (s serviceImpl) StopProcess(
ctx context.Context, request xcapi.ProcessExecutionStopRequest,
) *ErrorWithStatus {
resp, err := s.store.StopProcess(ctx, data_models.StopProcessRequest{
resp, err := s.processStore.StopProcess(ctx, data_models.StopProcessRequest{
Namespace: request.GetNamespace(),
ProcessId: request.GetProcessId(),
ProcessStopType: request.GetStopType(),
Expand All @@ -113,7 +120,7 @@ func (s serviceImpl) StopProcess(
func (s serviceImpl) DescribeLatestProcess(
ctx context.Context, request xcapi.ProcessExecutionDescribeRequest,
) (response *xcapi.ProcessExecutionDescribeResponse, retErr *ErrorWithStatus) {
resp, perr := s.store.DescribeLatestProcess(ctx, data_models.DescribeLatestProcessRequest{
resp, perr := s.processStore.DescribeLatestProcess(ctx, data_models.DescribeLatestProcessRequest{
Namespace: request.Namespace,
ProcessId: request.ProcessId,
})
Expand All @@ -129,7 +136,7 @@ func (s serviceImpl) DescribeLatestProcess(
func (s serviceImpl) PublishToLocalQueue(
ctx context.Context, request xcapi.PublishToLocalQueueRequest,
) *ErrorWithStatus {
resp, err := s.store.PublishToLocalQueue(ctx, data_models.PublishToLocalQueueRequest{
resp, err := s.processStore.PublishToLocalQueue(ctx, data_models.PublishToLocalQueueRequest{

Check warning on line 139 in service/api/service_impl.go

View check run for this annotation

Codecov / codecov/patch

service/api/service_impl.go#L139

Added line #L139 was not covered by tests
Namespace: request.GetNamespace(),
ProcessId: request.GetProcessId(),
Messages: request.GetMessages(),
Expand Down Expand Up @@ -161,7 +168,7 @@ func (s serviceImpl) PublishToLocalQueue(
func (s serviceImpl) Rpc(
ctx context.Context, request xcapi.ProcessExecutionRpcRequest,
) (response *xcapi.ProcessExecutionRpcResponse, retErr *ErrorWithStatus) {
latestPrcExe, err := s.store.GetLatestProcessExecution(ctx, data_models.GetLatestProcessExecutionRequest{
latestPrcExe, err := s.processStore.GetLatestProcessExecution(ctx, data_models.GetLatestProcessExecutionRequest{

Check warning on line 171 in service/api/service_impl.go

View check run for this annotation

Codecov / codecov/patch

service/api/service_impl.go#L171

Added line #L171 was not covered by tests
Namespace: request.GetNamespace(),
ProcessId: request.GetProcessId(),
})
Expand All @@ -184,7 +191,7 @@ func (s serviceImpl) Rpc(

appDatabaseReadResponse := xcapi.AppDatabaseReadResponse{}
if latestPrcExe.AppDatabaseConfig != nil {
appDatabaseReadResp, err := s.store.ReadAppDatabase(ctx, data_models.AppDatabaseReadRequest{
appDatabaseReadResp, err := s.processStore.ReadAppDatabase(ctx, data_models.AppDatabaseReadRequest{

Check warning on line 194 in service/api/service_impl.go

View check run for this annotation

Codecov / codecov/patch

service/api/service_impl.go#L194

Added line #L194 was not covered by tests
AppDatabaseConfig: *latestPrcExe.AppDatabaseConfig,
Request: request.GetAppDatabaseReadRequest(),
})
Expand Down Expand Up @@ -228,7 +235,7 @@ func (s serviceImpl) Rpc(
http.StatusBadRequest, err.Error())
}

updateResp, err := s.store.UpdateProcessExecutionForRpc(ctx, data_models.UpdateProcessExecutionForRpcRequest{
updateResp, err := s.processStore.UpdateProcessExecutionForRpc(ctx, data_models.UpdateProcessExecutionForRpcRequest{

Check warning on line 238 in service/api/service_impl.go

View check run for this annotation

Codecov / codecov/patch

service/api/service_impl.go#L238

Added line #L238 was not covered by tests
Namespace: request.Namespace,
ProcessId: request.ProcessId,
ProcessType: latestPrcExe.ProcessType,
Expand Down Expand Up @@ -272,6 +279,28 @@ func (s serviceImpl) Rpc(
}, nil
}

func (s serviceImpl) ListProcessExecutions(ctx context.Context, request xcapi.ListProcessExecutionsRequest,
) (response *xcapi.ListProcessExecutionsResponse, retErr *ErrorWithStatus) {
if request.Namespace == "" {
return nil, NewErrorWithStatus(http.StatusBadRequest, "namespace is required")
}
if request.PageSize <= 0 {
return nil, NewErrorWithStatus(http.StatusBadRequest, "page size should be positive")
}
if !request.HasStartTimeFilter() {
return nil, NewErrorWithStatus(http.StatusBadRequest, "start time filter is required")
}
if !request.StartTimeFilter.HasEarliestTime() || !request.StartTimeFilter.HasLatestTime() {
return nil, NewErrorWithStatus(http.StatusBadRequest, "both earliest and latest time are required for start time filter")
}

Check warning on line 295 in service/api/service_impl.go

View check run for this annotation

Codecov / codecov/patch

service/api/service_impl.go#L283-L295

Added lines #L283 - L295 were not covered by tests

resp, err := s.visibilityStore.ListProcessExecutions(ctx, request)
if err != nil {
return nil, NewErrorWithStatus(http.StatusInternalServerError, err.Error())
}
return resp, nil

Check warning on line 301 in service/api/service_impl.go

View check run for this annotation

Codecov / codecov/patch

service/api/service_impl.go#L297-L301

Added lines #L297 - L301 were not covered by tests
}

func (s serviceImpl) notifyRemoteImmediateTaskAsync(_ context.Context, req xcapi.NotifyImmediateTasksRequest) {
// execute in the background as best effort
go func() {
Expand Down

0 comments on commit 88c24e5

Please sign in to comment.