From 3fa855404e9dabfcbc80ec78de3eee80c04165be Mon Sep 17 00:00:00 2001 From: tbs60 Date: Tue, 22 Oct 2024 16:58:46 +0800 Subject: [PATCH] feat:avoid unnecessary task start, issue: #308 --- .../bk_dist/booster/command/command.go | 1 + .../bk_dist/booster/command/process.go | 3 +- .../booster/bk_dist/booster/pkg/booster.go | 4 + src/backend/booster/bk_dist/common/env/env.go | 1 + .../booster/bk_dist/common/types/booster.go | 3 +- .../bk_dist/common/websocket/session.go | 27 ++- .../bk_dist/controller/pkg/api/v1/handler.go | 20 +- .../controller/pkg/manager/local/executor.go | 4 +- .../controller/pkg/manager/local/mgr.go | 49 ++++ .../controller/pkg/manager/remote/mgr.go | 28 ++- .../bk_dist/controller/pkg/types/error.go | 1 + .../bk_dist/controller/pkg/types/manager.go | 217 ++++++++++++++++++ 12 files changed, 342 insertions(+), 16 deletions(-) diff --git a/src/backend/booster/bk_dist/booster/command/command.go b/src/backend/booster/bk_dist/booster/command/command.go index 67972c82e..89af640b6 100644 --- a/src/backend/booster/bk_dist/booster/command/command.go +++ b/src/backend/booster/bk_dist/booster/command/command.go @@ -111,6 +111,7 @@ const ( FlagDynamicPort = "dynamic_port" FlagWorkerOfferSlot = "worker_offer_slot" FlagCleanTmpFilesDayAgo = "clean_tmp_files_day_ago" + FlagIgnoreHttpStatus = "ignore_http_status" EnvBuildIDOld = "TURBO_PLAN_BUILD_ID" EnvBuildID = "TBS_BUILD_ID" diff --git a/src/backend/booster/bk_dist/booster/command/process.go b/src/backend/booster/bk_dist/booster/command/process.go index c4375a30b..c26987937 100644 --- a/src/backend/booster/bk_dist/booster/command/process.go +++ b/src/backend/booster/bk_dist/booster/command/process.go @@ -316,6 +316,7 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) { IdleKeepSecs: c.Int(FlagIdleKeepSecs), CleanTmpFilesDayAgo: cleanTmpFilesDayAgo, SearchToolchain: c.Bool(FlagSearchToolchain), + IgnoreHttpStatus: c.Bool(FlagIgnoreHttpStatus), }, Transport: dcType.BoosterTransport{ @@ -323,7 +324,7 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) { ServerHost: ServerHost, Timeout: 5 * time.Second, HeartBeatTick: 5 * time.Second, - InspectTaskTick: 100 * time.Millisecond, + InspectTaskTick: 1000 * time.Millisecond, TaskPreparingTimeout: time.Duration(waitResourceSeconds) * time.Second, PrintTaskInfoEveryTime: 5, CommitSuicideCheckTick: 5 * time.Second, diff --git a/src/backend/booster/bk_dist/booster/pkg/booster.go b/src/backend/booster/bk_dist/booster/pkg/booster.go index d6492e98c..ddeae022a 100644 --- a/src/backend/booster/bk_dist/booster/pkg/booster.go +++ b/src/backend/booster/bk_dist/booster/pkg/booster.go @@ -364,6 +364,10 @@ func (b *Booster) getWorkersEnv() map[string]string { requiredEnv[env.KeyExecutorSearchToolchain] = envValueTrue } + if b.config.Works.IgnoreHttpStatus { + requiredEnv[env.KeyExecutorIgnoreHttpStatus] = envValueTrue + } + resultEnv := make(map[string]string, 10) for k, v := range requiredEnv { resultEnv[env.GetEnvKey(k)] = v diff --git a/src/backend/booster/bk_dist/common/env/env.go b/src/backend/booster/bk_dist/common/env/env.go index 1c633565a..26931748e 100644 --- a/src/backend/booster/bk_dist/common/env/env.go +++ b/src/backend/booster/bk_dist/common/env/env.go @@ -75,6 +75,7 @@ const ( KeyExecutorUELibLocalCPUWeight = "UE_LIB_LOCAL_CPU_WEIGHT" KeyExecutorUELinkLocalCPUWeight = "UE_LINK_LOCAL_CPU_WEIGHT" KeyExecutorUEShaderLocalCPUWeight = "UE_SHADER_LOCAL_CPU_WEIGHT" + KeyExecutorIgnoreHttpStatus = "IGNORE_HTTP_STATUS" KeyUserDefinedLogLevel = "USER_DEFINED_LOG_LEVEL" KeyUserDefinedExecutorLogLevel = "USER_DEFINED_EXECUTOR_LOG_LEVEL" diff --git a/src/backend/booster/bk_dist/common/types/booster.go b/src/backend/booster/bk_dist/common/types/booster.go index d69e94f74..bb0ead47f 100644 --- a/src/backend/booster/bk_dist/common/types/booster.go +++ b/src/backend/booster/bk_dist/common/types/booster.go @@ -133,7 +133,8 @@ type BoosterWorks struct { EnableLink bool EnableLib bool - SearchToolchain bool + SearchToolchain bool + IgnoreHttpStatus bool } // BoosterTransport describe the transport data to controller diff --git a/src/backend/booster/bk_dist/common/websocket/session.go b/src/backend/booster/bk_dist/common/websocket/session.go index 7d45923fb..8d22f5662 100644 --- a/src/backend/booster/bk_dist/common/websocket/session.go +++ b/src/backend/booster/bk_dist/common/websocket/session.go @@ -13,7 +13,6 @@ import ( "context" "fmt" "net" - "net/http" "strconv" "strings" "sync" @@ -69,10 +68,11 @@ type Session struct { req *restful.Request - ip string // 远端的ip - port int32 // 远端的port - conn net.Conn - valid bool // 连接是否可用 + ip string // 远端的ip + port int32 // 远端的port + conn net.Conn + connKey string + valid bool // 连接是否可用 // 收到数据后的回调函数 callback WebSocketFunc @@ -98,7 +98,7 @@ type Session struct { type WebSocketFunc func(r *restful.Request, id MessageID, data []byte, s *Session) error // server端创建session,需要指定http处理函数 -func NewServerSession(w http.ResponseWriter, r *restful.Request, callback WebSocketFunc) *Session { +func NewServerSession(w *restful.Response, r *restful.Request, callback WebSocketFunc) *Session { conn, _, _, err := ws.UpgradeHTTP(r.Request, w) if err != nil || conn == nil { blog.Errorf("[session] UpgradeHTTP failed with error:%v", err) @@ -127,6 +127,7 @@ func NewServerSession(w http.ResponseWriter, r *restful.Request, callback WebSoc ip: ip, port: int32(port), conn: conn, + connKey: fmt.Sprintf("%s_%d", remoteaddr, time.Now().Nanosecond()), sendNotifyChan: sendNotifyChan, sendQueue: sendQueue, waitMap: make(map[MessageID]*Message), @@ -141,6 +142,10 @@ func NewServerSession(w http.ResponseWriter, r *restful.Request, callback WebSoc return s } +func (s *Session) GetConnKey() string { + return s.connKey +} + // client端创建session,需要指定目标server的ip和端口 func NewClientSession(ip string, port int32, url string, callback WebSocketFunc) *Session { ctx, cancel := context.WithCancel(context.Background()) @@ -454,7 +459,6 @@ func (s *Session) serverReceive(wg *sync.WaitGroup) { } } -// func (s *Session) notifyAndWait(msg *Message) *MessageResult { blog.Debugf("[session] notify send and wait for response now...") @@ -567,10 +571,11 @@ func (s *Session) check(wg *sync.WaitGroup) { wg.Done() for { select { - case <-s.ctx.Done(): - blog.Infof("[session] session check canceled by context") - s.clean(ErrorContextCanceled) - return + // 在用户环境上发现s.ctx.Done()异常触发的,不清楚原因,先屏蔽 + // case <-s.ctx.Done(): + // blog.Infof("[session] session check canceled by context") + // s.clean(ErrorContextCanceled) + // return case err := <-s.errorChan: blog.Warnf("[session] session check found error:%v", err) s.clean(err) diff --git a/src/backend/booster/bk_dist/controller/pkg/api/v1/handler.go b/src/backend/booster/bk_dist/controller/pkg/api/v1/handler.go index 4df2f1241..c214f77ff 100644 --- a/src/backend/booster/bk_dist/controller/pkg/api/v1/handler.go +++ b/src/backend/booster/bk_dist/controller/pkg/api/v1/handler.go @@ -25,6 +25,15 @@ import ( "github.com/emicklei/go-restful" ) +var ( + gHttpConnCache *types.HttpConnCache +) + +func init() { + gHttpConnCache = types.NewHttpConnCache() + go gHttpConnCache.Check() +} + func available(_ *restful.Request, resp *restful.Response) { api.ReturnRest(&api.RestResponse{Resp: resp, Data: &AvailableResp{Pid: os.Getpid()}}) } @@ -477,6 +486,9 @@ func executeLocalTask(req *restful.Request, resp *restful.Response) { return } + // TODO : 需要关注http的连接状态 + config.InitHttpConnStatus(req, gHttpConnCache, nil, 0) + result, err := defaultManager.ExecuteLocalTask(workID, config) if err != nil { // blog.Errorf("api: executeLocalTask execute local task failed, work: %s, err: %v", workID, err) @@ -504,7 +516,10 @@ func executeLocalTask(req *restful.Request, resp *restful.Response) { r.Write2Resp(&api.RestResponse{Resp: resp}) } -func callbackOfLocalExecute(req *restful.Request, id websocket.MessageID, data []byte, s *websocket.Session) error { +func callbackOfLocalExecute(req *restful.Request, + id websocket.MessageID, + data []byte, + s *websocket.Session) error { workID := req.PathParameter(pathParamWorkID) r := &LocalTaskExecuteResp{} @@ -526,6 +541,9 @@ func callbackOfLocalExecute(req *restful.Request, id websocket.MessageID, data [ return ret.Err } + // TODO : 需要关注http的连接状态 + config.InitHttpConnStatus(req, gHttpConnCache, s, 20) + result, err := defaultManager.ExecuteLocalTask(workID, config) if err != nil { // blog.Errorf("api: executeLocalTask execute local task failed, work: %s, err: %v", workID, err) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/local/executor.go b/src/backend/booster/bk_dist/controller/pkg/manager/local/executor.go index 4721b7b86..5fc862916 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/local/executor.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/local/executor.go @@ -361,7 +361,7 @@ func (e *executor) tryExecuteLocalTask() *types.LocalTaskExecuteResult { defer e.mgr.work.Basic().UpdateJobStats(e.stats) dcSDK.StatsTimeNow(&e.stats.LocalWorkEnterTime) - gotLock := true + gotLock := false defer func() { if gotLock { dcSDK.StatsTimeNow(&e.stats.LocalWorkLeaveTime) @@ -392,6 +392,8 @@ func (e *executor) tryExecuteLocalTask() *types.LocalTaskExecuteResult { gotLock = false blog.Infof("executor: not got lock to execute local-task from pid(%d) with weight %d", e.req.Pid, locallockweight) return nil + } else { + gotLock = true } return e.realExecuteLocalTask(locallockweight) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go index 427618717..401115e16 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go @@ -103,6 +103,23 @@ func (m *Mgr) GetPumpCache() (*analyser.FileCache, *analyser.RootCache) { return m.pumpFileCache, m.pumpRootCache } +func checkHttpConn(req *types.LocalTaskExecuteRequest) (*types.LocalTaskExecuteResult, error) { + if !types.IsHttpConnStatusOk(req.HttpConnCache, req.HttpConnKey) { + blog.Errorf("local: httpconncache exit execute pid(%d) command:[%s] for http connection[%s] error", + req.Pid, strings.Join(req.Commands, " "), req.HttpConnKey) + return &types.LocalTaskExecuteResult{ + Result: &dcSDK.LocalTaskResult{ + ExitCode: -1, + Message: types.ErrLocalHttpConnDisconnected.Error(), + Stdout: nil, + Stderr: nil, + }, + }, types.ErrLocalHttpConnDisconnected + } + + return nil, nil +} + // ExecuteTask 若是task command本身运行失败, 不作为execute失败, 将结果放在result中返回即可 // 只有筹备执行的过程中失败, 才作为execute失败 func (m *Mgr) ExecuteTask( @@ -123,6 +140,11 @@ func (m *Mgr) ExecuteTask( defer e.executeFinalTask() defer e.handleRecord() + ret, err := checkHttpConn(req) + if err != nil { + return ret, err + } + // 该work被置为degraded || 该executor被置为degraded, 则直接走本地执行 if m.work.Basic().Settings().Degraded || e.degrade() { blog.Warnf("local: execute task for work(%s) from pid(%d) degrade to local with degraded", @@ -150,6 +172,11 @@ func (m *Mgr) ExecuteTask( // 优化没有远程资源转本地的逻辑; 如果没有远程资源,则先获取本地锁,然后转本地执行 // 如果没有本地锁,则先等待,后面有远程资源时,则直接远程,无需全部阻塞在本地执行 for { + ret, err := checkHttpConn(req) + if err != nil { + return ret, err + } + // 先检查是否有远程资源 if !m.work.Resource().HasAvailableWorkers() { // check whether this task need remote worker, @@ -183,6 +210,11 @@ func (m *Mgr) ExecuteTask( m.work.Basic().Info().IncPrepared() m.work.Remote().IncRemoteJobs() + ret, err = checkHttpConn(req) + if err != nil { + return ret, err + } + c, err := e.executePreTask() if err != nil { m.work.Basic().Info().DecPrepared() @@ -199,9 +231,16 @@ func (m *Mgr) ExecuteTask( Sandbox: e.sandbox, IOTimeout: e.ioTimeout, BanWorkerList: []*protocol.Host{}, + HttpConnCache: req.HttpConnCache, + HttpConnKey: req.HttpConnKey, } for i := 0; i < m.getTryTimes(e); i++ { + ret, err = checkHttpConn(req) + if err != nil { + return ret, err + } + req.Stats.RemoteTryTimes = i + 1 r, err = m.work.Remote().ExecuteTask(remoteReq) if err != nil { @@ -231,6 +270,11 @@ func (m *Mgr) ExecuteTask( m.work.Basic().Info().DecPrepared() m.work.Remote().DecRemoteJobs() if err != nil { + ret, err = checkHttpConn(req) + if err != nil { + return ret, err + } + if err == types.ErrSendFileFailed { blog.Infof("local: retry remote-task failed from work(%s) for (%d) times from pid(%d)"+ " with send file error, retryOnRemoteFail now", @@ -253,6 +297,11 @@ func (m *Mgr) ExecuteTask( blog.Warnf("local: execute post-task for work(%s) from pid(%d) failed: %v", m.work.ID(), req.Pid, err) req.Stats.RemoteErrorMessage = err.Error() + ret, err = checkHttpConn(req) + if err != nil { + return ret, err + } + lr, err := m.retryOnRemoteFail(req, globalWork, e) if err == nil && lr != nil { return lr, err diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 2c51bca25..a03533d8b 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -506,6 +506,16 @@ func (m *Mgr) workerCheck(ctx context.Context) { } } +func checkHttpConn(req *types.RemoteTaskExecuteRequest) (*types.RemoteTaskExecuteResult, error) { + if !types.IsHttpConnStatusOk(req.HttpConnCache, req.HttpConnKey) { + blog.Errorf("remote: httpconncache exit execute pid(%d) for http connection[%s] error", + req.Pid, req.HttpConnKey) + return nil, types.ErrLocalHttpConnDisconnected + } + + return nil, nil +} + // ExecuteTask run the task in remote worker and ensure the dependent files func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTaskExecuteResult, error) { if m.TotalSlots() <= 0 { @@ -550,8 +560,13 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas m.DecRemoteJobs() }() + ret, err := checkHttpConn(req) + if err != nil { + return ret, err + } + // 1. send toolchain if required 2. adjust exe remote path for req - err := m.sendToolchain(handler, req) + err = m.sendToolchain(handler, req) if err != nil { blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+ "ensure tool chain failed: %v, going to disable host(%s)", @@ -565,6 +580,12 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) { return nil, fmt.Errorf("remote: no need to send files for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) } + + ret, err = checkHttpConn(req) + if err != nil { + return ret, err + } + remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req)) if err != nil { req.BanWorkerList = append(req.BanWorkerList, req.Server) @@ -588,6 +609,11 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas blog.Infof("remote: try to real execute remote task for work(%s) from pid(%d) with timeout(%d) after send files", m.work.ID(), req.Pid, req.IOTimeout) + ret, err = checkHttpConn(req) + if err != nil { + return ret, err + } + var result *dcSDK.BKDistResult if m.conf.LongTCP { if !req.Req.CustomSave { diff --git a/src/backend/booster/bk_dist/controller/pkg/types/error.go b/src/backend/booster/bk_dist/controller/pkg/types/error.go index f0ea3f405..52d8f7a61 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/error.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/error.go @@ -40,4 +40,5 @@ var ( ErrFileLock = fmt.Errorf("lock file failed") ErrWorkIDEmpty = fmt.Errorf("work id is empty") ErrWorkNotRead = fmt.Errorf("work is not ready") + ErrLocalHttpConnDisconnected = fmt.Errorf("local http connection disconnected") ) diff --git a/src/backend/booster/bk_dist/controller/pkg/types/manager.go b/src/backend/booster/bk_dist/controller/pkg/types/manager.go index f37cb0c37..5e21c7c04 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/manager.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/manager.go @@ -10,15 +10,23 @@ package types import ( + "fmt" "os" "os/user" + "strconv" + "strings" + "sync" "time" + "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/env" dcProtocol "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol" dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk" dcSyscall "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/syscall" + "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/websocket" + "github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/codec" v2 "github.com/TencentBlueKing/bk-turbo/src/backend/booster/server/pkg/api/v2" + "github.com/emicklei/go-restful" ) // WorkRegisterConfig describe the config of registering work @@ -191,6 +199,9 @@ type RemoteTaskExecuteRequest struct { Sandbox *dcSyscall.Sandbox IOTimeout int BanWorkerList []*dcProtocol.Host + + HttpConnKey string + HttpConnCache *HttpConnCache } // RemoteTaskExecuteResult describe the remote task execution result @@ -212,6 +223,102 @@ type RemoteTaskSendFileResult struct { Result *dcSDK.BKSendFileResult } +// ++++++++++++++++ TODO : 增加http连接缓存 +func getHttpNetKey(req *restful.Request) string { + return fmt.Sprintf("%s_%d", req.Request.RemoteAddr, time.Now().Nanosecond()) +} + +type connStatus int + +const ( + Connected connStatus = iota + Disconnected + Unknown = 99 +) + +type httpConn struct { + status connStatus + delayCleanSecs int + ErrorStartTime time.Time +} + +type HttpConnCache struct { + cache map[string]*httpConn + mutex sync.RWMutex +} + +func NewHttpConnCache() *HttpConnCache { + return &HttpConnCache{ + cache: make(map[string]*httpConn), + } +} + +func (c *HttpConnCache) InsertOnce(key string, delayCleanSecs int) bool { + c.mutex.Lock() + defer c.mutex.Unlock() + + _, ok := c.cache[key] + if !ok { + c.cache[key] = &httpConn{status: Connected, delayCleanSecs: delayCleanSecs} + return true + } else { + return false + } +} + +func (c *HttpConnCache) IsConnStatusOk(key string) bool { + c.mutex.RLock() + defer c.mutex.RUnlock() + + v, ok := c.cache[key] + if !ok { + return false + } + + return v.status == Connected +} + +func (c *HttpConnCache) OnConnStatusError(key string) { + c.mutex.Lock() + defer c.mutex.Unlock() + + v, ok := c.cache[key] + if ok { + if v.delayCleanSecs == 0 { + delete(c.cache, key) + } else { + v.status = Disconnected + v.ErrorStartTime = time.Now() + } + } +} + +func (c *HttpConnCache) Check() { + blog.Infof("types: httpconncache: start go routine to check http conn status") + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.mutex.Lock() + for k, v := range c.cache { + if v.status != Connected { + old := v.ErrorStartTime.Add(time.Duration(v.delayCleanSecs) * time.Second) + if old.Before(time.Now()) { + delete(c.cache, k) + blog.Infof("types: httpconncache: deleted %s from cache after wait", k) + } + } + } + c.mutex.Unlock() + } + } +} + +// ---------------- + // LocalTaskExecuteRequest describe the local task execution param type LocalTaskExecuteRequest struct { Pid int @@ -220,6 +327,116 @@ type LocalTaskExecuteRequest struct { Commands []string Environments []string Stats *dcSDK.ControllerJobStats + + // 该请求的执行是否依赖http连接状态,默认依赖,即ignoreHttpStatus为false; + // 如果依赖连接状态,当连接断开时,该请求直接返回失败 + ignoreHttpStatus bool + checkedHttpStatus bool + HttpConnKey string + HttpConnCache *HttpConnCache +} + +// 这个正常只有一个协程调用,无需加锁 +func (r *LocalTaskExecuteRequest) InitHttpConnStatus(req *restful.Request, + cache *HttpConnCache, + s *websocket.Session, + delaysecs int) error { + shouldcheck := r.shouldCheckHttpStatus() + if shouldcheck && req != nil && cache != nil { + key := "" + if s == nil { + key = getHttpNetKey(req) + } else { + key = s.GetConnKey() + } + + // TODO : 检查该连接的检查协程是否已启动 + firstInsert := cache.InsertOnce(key, delaysecs) + if firstInsert { + blog.Infof("types: httpconncache: inserted %s to cache", key) + + // 确保协程启动 + var wg = sync.WaitGroup{} + wg.Add(1) + if s == nil { + go func(wg *sync.WaitGroup) { + wg.Done() + ctx := req.Request.Context() + select { + case <-ctx.Done(): + err := ctx.Err() + cache.OnConnStatusError(key) + blog.Infof("types: httpconncache: found %s disconnected with error:%v", key, err) + } + }(&wg) + } else { + go func(wg *sync.WaitGroup) { + wg.Done() + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if !s.IsValid() { + cache.OnConnStatusError(key) + blog.Infof("types: httpconncache: found websocket %s disconnected", key) + return + } + } + } + }(&wg) + } + wg.Wait() + } + + r.HttpConnKey = key + r.HttpConnCache = cache + return nil + } else { + r.HttpConnKey = "" + r.HttpConnCache = nil + return nil + } +} + +func IsHttpConnStatusOk(cache *HttpConnCache, key string) bool { + if cache == nil || key == "" { + return true + } + + return cache.IsConnStatusOk(key) +} + +func (r *LocalTaskExecuteRequest) shouldCheckHttpStatus() bool { + if r.checkedHttpStatus { + return !r.ignoreHttpStatus + } + + // check env.KeyExecutorIgnoreHttpStatus + for _, e := range r.Environments { + if strings.HasPrefix(e, env.GetEnvKey(env.KeyExecutorIgnoreHttpStatus)) { + for i := 0; i < len(e); i++ { + if e[i] == '=' { + b, err1 := strconv.ParseBool(e[i+1:]) + if err1 == nil { + r.ignoreHttpStatus = b + r.checkedHttpStatus = true + return !r.ignoreHttpStatus + } else { + r.ignoreHttpStatus = false + r.checkedHttpStatus = true + return !r.ignoreHttpStatus + } + } + } + break + } + } + + r.ignoreHttpStatus = false + r.checkedHttpStatus = true + + return !r.ignoreHttpStatus } // LocalTaskExecuteResult describe the local task execution result