Skip to content

Commit

Permalink
feat: added environment
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Jun 12, 2023
1 parent cf59917 commit 0a50b21
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 74 deletions.
14 changes: 14 additions & 0 deletions interfaces/grpc_client_model_environment_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package interfaces

import (
"github.com/crawlab-team/crawlab-db/mongo"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)

type GrpcClientModelEnvironmentService interface {
ModelBaseService
GetEnvironmentById(id primitive.ObjectID) (s Environment, err error)
GetEnvironment(query bson.M, opts *mongo.FindOptions) (s Environment, err error)
GetEnvironmentList(query bson.M, opts *mongo.FindOptions) (res []Environment, err error)
}
9 changes: 9 additions & 0 deletions interfaces/model_environment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package interfaces

type Environment interface {
Model
GetKey() (key string)
SetKey(key string)
GetValue() (value string)
SetValue(value string)
}
2 changes: 2 additions & 0 deletions interfaces/task_handler_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type TaskHandlerService interface {
GetModelTaskService() (modelTaskSvc GrpcClientModelTaskService)
// GetModelTaskStatService get model task stat service
GetModelTaskStatService() (modelTaskStatSvc GrpcClientModelTaskStatService)
// GetModelEnvironmentService get model environment service
GetModelEnvironmentService() (modelEnvironmentSvc GrpcClientModelEnvironmentService)
// GetNodeConfigService get node config service
GetNodeConfigService() (cfgSvc NodeConfigService)
// GetCurrentNode get node of the handler
Expand Down
77 changes: 77 additions & 0 deletions models/client/model_environment_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package client

import (
"github.com/crawlab-team/crawlab-core/errors"
"github.com/crawlab-team/crawlab-core/interfaces"
"github.com/crawlab-team/crawlab-db/mongo"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)

type EnvironmentServiceDelegate struct {
interfaces.GrpcClientModelBaseService
}

func (svc *EnvironmentServiceDelegate) GetEnvironmentById(id primitive.ObjectID) (e interfaces.Environment, err error) {
res, err := svc.GetById(id)
if err != nil {
return nil, err
}
s, ok := res.(interfaces.Environment)
if !ok {
return nil, errors.ErrorModelInvalidType
}
return s, nil
}

func (svc *EnvironmentServiceDelegate) GetEnvironment(query bson.M, opts *mongo.FindOptions) (e interfaces.Environment, err error) {
res, err := svc.Get(query, opts)
if err != nil {
return nil, err
}
s, ok := res.(interfaces.Environment)
if !ok {
return nil, errors.ErrorModelInvalidType
}
return s, nil
}

func (svc *EnvironmentServiceDelegate) GetEnvironmentList(query bson.M, opts *mongo.FindOptions) (res []interfaces.Environment, err error) {
list, err := svc.GetList(query, opts)
if err != nil {
return nil, err
}
for _, item := range list.GetModels() {
s, ok := item.(interfaces.Environment)
if !ok {
return nil, errors.ErrorModelInvalidType
}
res = append(res, s)
}
return res, nil
}

func NewEnvironmentServiceDelegate(opts ...ModelBaseServiceDelegateOption) (svc2 interfaces.GrpcClientModelEnvironmentService, err error) {
// apply options
opts = append(opts, WithBaseServiceModelId(interfaces.ModelIdEnvironment))

// base service
baseSvc, err := NewBaseServiceDelegate(opts...)
if err != nil {
return nil, err
}

// service
svc := &EnvironmentServiceDelegate{baseSvc}

return svc, nil
}

func ProvideEnvironmentServiceDelegate(path string, opts ...ModelBaseServiceDelegateOption) func() (svc interfaces.GrpcClientModelEnvironmentService, err error) {
if path != "" {
opts = append(opts, WithBaseServiceConfigPath(path))
}
return func() (svc interfaces.GrpcClientModelEnvironmentService, err error) {
return NewEnvironmentServiceDelegate(opts...)
}
}
16 changes: 16 additions & 0 deletions models/models/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@ func (e *Environment) SetId(id primitive.ObjectID) {
e.Id = id
}

func (e *Environment) GetKey() (key string) {
return e.Key
}

func (e *Environment) SetKey(key string) {
e.Key = key
}

func (e *Environment) GetValue() (value string) {
return e.Value
}

func (e *Environment) SetValue(value string) {
e.Value = value
}

type EnvironmentList []Environment

func (l *EnvironmentList) GetModels() (res []interfaces.Model) {
Expand Down
82 changes: 16 additions & 66 deletions task/handler/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,33 +304,13 @@ func (r *Runner) startHealthCheck() {
}

func (r *Runner) configureEnv() {
// TODO: refactor
//envs := r.s.Envs
//if r.s.Type == constants.Configurable {
// // 数据库配置
// envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_HOST", Value: viper.GetString("mongo.host")})
// envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_PORT", Value: viper.GetString("mongo.port")})
// envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_DB", Value: viper.GetString("mongo.db")})
// envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_USERNAME", Value: viper.GetString("mongo.username")})
// envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_PASSWORD", Value: viper.GetString("mongo.password")})
// envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_AUTHSOURCE", Value: viper.GetString("mongo.authSource")})
//
// // 设置配置
// for envName, envValue := range r.s.Config.Settings {
// envs = append(envs, model.Env{Name: "CRAWLAB_SETTING_" + envName, Value: envValue})
// }
//}

// 默认把Node.js的全局node_modules加入环境变量
//envPath := os.Getenv("PATH")
//nodePath := "/usr/lib/node_modules"
//if !strings.Contains(envPath, nodePath) {
// _ = os.Setenv("PATH", nodePath+":"+envPath)
//}
//_ = os.Setenv("NODE_PATH", nodePath)

// default results collection
//col := utils.GetSpiderCol(r.s.Col, r.s.Name)
envPath := os.Getenv("PATH")
nodePath := "/usr/lib/node_modules"
if !strings.Contains(envPath, nodePath) {
_ = os.Setenv("PATH", nodePath+":"+envPath)
}
_ = os.Setenv("NODE_PATH", nodePath)

// default envs
r.cmd.Env = append(os.Environ(), "CRAWLAB_TASK_ID="+r.tid.Hex())
Expand All @@ -342,46 +322,16 @@ func (r *Runner) configureEnv() {
} else {
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_AUTH_KEY="+constants.DefaultGrpcAuthKey)
}
//r.cmd.Env = append(r.cmd.Env, "CRAWLAB_COLLECTION="+col)
//r.cmd.Env = append(r.cmd.Env, "CRAWLAB_MONGO_HOST="+viper.GetString("mongo.host"))
//r.cmd.Env = append(r.cmd.Env, "CRAWLAB_MONGO_PORT="+viper.GetString("mongo.port"))
//if viper.GetString("mongo.db") != "" {
// r.cmd.Env = append(r.cmd.Env, "CRAWLAB_MONGO_DB="+viper.GetString("mongo.db"))
//}
//if viper.GetString("mongo.username") != "" {
// r.cmd.Env = append(r.cmd.Env, "CRAWLAB_MONGO_USERNAME="+viper.GetString("mongo.username"))
//}
//if viper.GetString("mongo.password") != "" {
// r.cmd.Env = append(r.cmd.Env, "CRAWLAB_MONGO_PASSWORD="+viper.GetString("mongo.password"))
//}
//if viper.GetString("mongo.authSource") != "" {
// r.cmd.Env = append(r.cmd.Env, "CRAWLAB_MONGO_AUTHSOURCE="+viper.GetString("mongo.authSource"))
//}
//r.cmd.Env = append(r.cmd.Env, "PYTHONUNBUFFERED=0")
//r.cmd.Env = append(r.cmd.Env, "PYTHONIOENCODING=utf-8")
//r.cmd.Env = append(r.cmd.Env, "TZ=Asia/Shanghai")
//r.cmd.Env = append(r.cmd.Env, "CRAWLAB_DEDUP_FIELD="+r.s.DedupField)
//r.cmd.Env = append(r.cmd.Env, "CRAWLAB_DEDUP_METHOD="+r.s.DedupMethod)
//if r.s.IsDedup {
// r.cmd.Env = append(r.cmd.Env, "CRAWLAB_IS_DEDUP=1")
//} else {
// r.cmd.Env = append(r.cmd.Env, "CRAWLAB_IS_DEDUP=0")
//}

// TODO: implement task environment variables
//for _, env := range r.s.Envs {
// r.cmd.Env = append(r.cmd.Env, env.Name+"="+env.Value)
//}

// TODO: implement global environment variables
//variables, err := models.MustGetRootService().GetVariableList(nil, nil)
//if err != nil {
// return err
//}
//for _, variable := range variables {
// r.cmd.Env = append(r.cmd.Env, variable.Key+"="+variable.Value)
//}
//return nil

// global environment variables
envs, err := r.svc.GetModelEnvironmentService().GetEnvironmentList(nil, nil)
if err != nil {
trace.PrintError(err)
return
}
for _, env := range envs {
r.cmd.Env = append(r.cmd.Env, env.GetKey()+"="+env.GetValue())
}
}

// wait for process to finish and send task signal (constants.TaskSignal)
Expand Down
26 changes: 18 additions & 8 deletions task/handler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
type Service struct {
// dependencies
interfaces.TaskBaseService
cfgSvc interfaces.NodeConfigService
modelSvc service.ModelService
clientModelSvc interfaces.GrpcClientModelService
clientModelNodeSvc interfaces.GrpcClientModelNodeService
clientModelSpiderSvc interfaces.GrpcClientModelSpiderService
clientModelTaskSvc interfaces.GrpcClientModelTaskService
clientModelTaskStatSvc interfaces.GrpcClientModelTaskStatService
c interfaces.GrpcClient // grpc client
cfgSvc interfaces.NodeConfigService
modelSvc service.ModelService
clientModelSvc interfaces.GrpcClientModelService
clientModelNodeSvc interfaces.GrpcClientModelNodeService
clientModelSpiderSvc interfaces.GrpcClientModelSpiderService
clientModelTaskSvc interfaces.GrpcClientModelTaskService
clientModelTaskStatSvc interfaces.GrpcClientModelTaskStatService
clientModelEnvironmentSvc interfaces.GrpcClientModelEnvironmentService
c interfaces.GrpcClient // grpc client

// settings
//maxRunners int
Expand Down Expand Up @@ -212,6 +213,10 @@ func (svc *Service) GetModelTaskStatService() (modelTaskSvc interfaces.GrpcClien
return svc.clientModelTaskStatSvc
}

func (svc *Service) GetModelEnvironmentService() (modelTaskSvc interfaces.GrpcClientModelEnvironmentService) {
return svc.clientModelEnvironmentSvc
}

func (svc *Service) GetNodeConfigService() (cfgSvc interfaces.NodeConfigService) {
return svc.cfgSvc
}
Expand Down Expand Up @@ -453,6 +458,9 @@ func NewTaskHandlerService(opts ...Option) (svc2 interfaces.TaskHandlerService,
if err := c.Provide(client.ProvideTaskStatServiceDelegate(svc.GetConfigPath())); err != nil {
return nil, trace.TraceError(err)
}
if err := c.Provide(client.ProvideEnvironmentServiceDelegate(svc.GetConfigPath())); err != nil {
return nil, trace.TraceError(err)
}
if err := c.Provide(client2.ProvideGetClient(svc.GetConfigPath())); err != nil {
return nil, trace.TraceError(err)
}
Expand All @@ -464,6 +472,7 @@ func NewTaskHandlerService(opts ...Option) (svc2 interfaces.TaskHandlerService,
clientModelSpiderSvc interfaces.GrpcClientModelSpiderService,
clientModelTaskSvc interfaces.GrpcClientModelTaskService,
clientModelTaskStatSvc interfaces.GrpcClientModelTaskStatService,
clientModelEnvironmentSvc interfaces.GrpcClientModelEnvironmentService,
c interfaces.GrpcClient,
) {
svc.cfgSvc = cfgSvc
Expand All @@ -473,6 +482,7 @@ func NewTaskHandlerService(opts ...Option) (svc2 interfaces.TaskHandlerService,
svc.clientModelSpiderSvc = clientModelSpiderSvc
svc.clientModelTaskSvc = clientModelTaskSvc
svc.clientModelTaskStatSvc = clientModelTaskStatSvc
svc.clientModelEnvironmentSvc = clientModelEnvironmentSvc
svc.c = c
}); err != nil {
return nil, trace.TraceError(err)
Expand Down

0 comments on commit 0a50b21

Please sign in to comment.