From 76a530f9eabf056ea4788aca1db2a27a3f768165 Mon Sep 17 00:00:00 2001 From: pojol Date: Sun, 31 Jul 2022 23:02:55 +0800 Subject: [PATCH] feat : parallel node --- behavior/behavior.go | 1 + bot/bot.go | 540 ++++++++++++++++++++++++++++--------------- bot/bot_test.go | 27 ++- factory/batch.go | 12 +- factory/factory.go | 1 - utils/switch.go | 19 +- utils/switch_test.go | 31 +++ 7 files changed, 423 insertions(+), 208 deletions(-) create mode 100644 utils/switch_test.go diff --git a/behavior/behavior.go b/behavior/behavior.go index 4b7bb1c..4d75862 100644 --- a/behavior/behavior.go +++ b/behavior/behavior.go @@ -12,6 +12,7 @@ const ( WAIT = "WaitNode" LOOP = "LoopNode" ASSERT = "AssertNode" + PARALLEL = "ParallelNode" ) type Tree struct { diff --git a/bot/bot.go b/bot/bot.go index 6bd5d6b..98e7763 100644 --- a/bot/bot.go +++ b/bot/bot.go @@ -6,6 +6,7 @@ import ( "math/rand" "strconv" "sync" + "sync/atomic" "time" "github.com/pojol/gobot/behavior" @@ -19,20 +20,53 @@ type ErrInfo struct { Err error } +type RunMode int + +const ( + Debug RunMode = 1 + iota + Batch +) + +type Thread struct { + num int + preid string + err error +} + +type Transaction struct { + cur *behavior.Tree + parent *behavior.Tree + thread *Thread +} + type Bot struct { id string name string + mode RunMode preloadErr string tree *behavior.Tree - prev *behavior.Tree cur *behavior.Tree + threadnum int32 + threadDoneNum int32 + + threadChan chan *Transaction + waitChan chan *Transaction + waitLst []*Transaction + next *utils.Switch + + threadLst []*Thread + threadDone chan interface{} + sync.Mutex bs *botState + donech chan<- string + errch chan<- ErrInfo + runtimeErr string } @@ -113,11 +147,15 @@ func (b *Bot) GetPrevNodeID() string { func NewWithBehaviorTree(path string, bt *behavior.Tree, name string, idx int32, globalScript []string) *Bot { bot := &Bot{ - id: strconv.Itoa(int(idx)), - tree: bt, - cur: bt, - bs: luaPool.Get(), - name: name, + id: strconv.Itoa(int(idx)), + tree: bt, + cur: bt, + bs: luaPool.Get(), + name: name, + threadChan: make(chan *Transaction, 1), + waitChan: make(chan *Transaction, 1), + threadDone: make(chan interface{}), + next: utils.NewSwitch(), } rand.Seed(time.Now().UnixNano()) @@ -149,80 +187,191 @@ func NewWithBehaviorTree(path string, bt *behavior.Tree, name string, idx int32, return bot } -func (b *Bot) run_selector(nod *behavior.Tree, next bool) (bool, error) { +func (b *Bot) fillThreadInfo(t *Thread) { + + b.Lock() + + var f bool - if next { - for k := range nod.Children { - ok, err := b.run_nod(nod.Children[k], true) + for k, v := range b.threadLst { + if v.num == t.num { + b.threadLst[k] = t + f = true + break + } + } + + if !f { + b.threadLst = append(b.threadLst, t) + } + + fmt.Println("thread info ===>") + for _, v := range b.threadLst { + fmt.Println("\tthread"+strconv.Itoa(v.num), v.preid, v.err) + } + + b.Unlock() + +} + +func (b *Bot) do(t *Transaction) bool { + var ok bool + + switch t.cur.Ty { + case behavior.SELETE: + ok = b.strategySelete(t) + case behavior.SEQUENCE: + ok = b.strategySequence(t) + case behavior.CONDITION: + ok = b.strategyCondition(t) + case behavior.WAIT: + ok = b.strategyWait(t) + case behavior.LOOP: + ok = b.strategyLoop(t) + case behavior.ASSERT: + ok = b.strategyAssert(t) + case behavior.PARALLEL: + ok = b.strategyParallel(t) + case behavior.ROOT: + ok = true + default: + ok = b.strategyScript(t) + } + + return ok +} - if err != nil { - return false, err +func (b *Bot) strategySelete(t *Transaction) bool { + + batch := func() bool { + for k := range t.cur.Children { + + tr := &Transaction{ + cur: t.cur.Children[k], + parent: t.cur, + thread: &Thread{ + num: t.thread.num, + preid: t.cur.ID, + }, + } + ok := b.do(tr) + b.fillThreadInfo(tr.thread) + + if tr.thread.err != nil { + return false } if ok { break } + } + return true } - return true, nil -} + step := func() bool { -func (b *Bot) run_assert(nod *behavior.Tree, next bool) (bool, error) { + tr := &Transaction{ + cur: t.parent.Next(), + parent: t.parent, + thread: &Thread{ + num: t.thread.num, + preid: t.cur.ID, + }, + } + b.waitChan <- tr - err := DoString(b.bs.L, nod.Code) - if err != nil { - return false, err + return true } - err = b.bs.L.CallByParam(lua.P{ - Fn: b.bs.L.GetGlobal("execute"), - NRet: 1, - Protect: true, - }) - if err != nil { - return false, err + if b.mode == Batch { + return batch() } - v := b.bs.L.Get(-1) - b.bs.L.Pop(1) + return step() +} - if lua.LVAsBool(v) { - if next { - err = b.run_children(nod, nod.Children) - if err != nil { - return false, err - } +func (b *Bot) strategySequence(t *Transaction) bool { + + for k := range t.cur.Children { + + tr := &Transaction{ + cur: t.cur.Children[k], + thread: &Thread{ + num: t.thread.num, + preid: t.cur.ID, + }, } - return true, nil + ok := b.do(tr) + b.fillThreadInfo(tr.thread) + + if tr.thread.err != nil { + return false + } + + if !ok { + break + } } - return false, fmt.Errorf("node %v assert failed", nod.ID) + return true } -func (b *Bot) run_sequence(nod *behavior.Tree, next bool) (bool, error) { - if next { - for k := range nod.Children { - ok, err := b.run_nod(nod.Children[k], true) +func (b *Bot) strategyParallel(t *Transaction) bool { - if err != nil { - return false, err - } + for k := range t.cur.Children { - if !ok { - break - } + tr := &Transaction{ + cur: t.cur.Children[k], + thread: &Thread{ + num: int(atomic.AddInt32(&b.threadnum, 1)), + preid: t.cur.ID, + }, + } + b.fillThreadInfo(tr.thread) + + b.threadChan <- tr + + } + + return true +} + +func (b *Bot) strategyWait(t *Transaction) bool { + + time.Sleep(time.Millisecond * time.Duration(t.cur.Wait)) + + if b.mode == Batch && len(t.cur.Children) != 0 { + + tr := &Transaction{ + cur: t.cur.Children[0], + thread: &Thread{ + num: t.thread.num, + preid: t.cur.ID, + }, } + b.fillThreadInfo(tr.thread) + b.do(t) + } - return true, nil + return true } -func (b *Bot) run_condition(nod *behavior.Tree, next bool) (bool, error) { +func (b *Bot) strategyCondition(t *Transaction) bool { - err := DoString(b.bs.L, nod.Code) + tr := &Transaction{ + thread: &Thread{ + num: t.thread.num, + preid: t.cur.ID, + }, + } + defer b.fillThreadInfo(tr.thread) + + err := DoString(b.bs.L, t.cur.Code) if err != nil { - return false, err + tr.thread.err = err + return false } err = b.bs.L.CallByParam(lua.P{ @@ -231,163 +380,173 @@ func (b *Bot) run_condition(nod *behavior.Tree, next bool) (bool, error) { Protect: true, }) if err != nil { - return false, err + t.thread.err = err + return false } v := b.bs.L.Get(-1) b.bs.L.Pop(1) if lua.LVAsBool(v) { - if next { - err = b.run_children(nod, nod.Children) - if err != nil { - return false, err - } + if b.mode == Batch && len(t.cur.Children) != 0 { + + tr.cur = t.cur.Children[0] + b.do(tr) } - return true, nil } - return false, nil + return true } -func (b *Bot) run_wait(nod *behavior.Tree, next bool) (bool, error) { - time.Sleep(time.Millisecond * time.Duration(nod.Wait)) +func (b *Bot) strategyLoop(t *Transaction) bool { - if next { - err := b.run_children(nod, nod.Children) - if err != nil { - return false, err + for i := 0; i < int(t.cur.Loop); i++ { + if b.mode == Batch && len(t.cur.Children) != 0 { + tr := &Transaction{ + cur: t.cur.Children[0], + thread: &Thread{ + num: t.thread.num, + preid: t.cur.ID, + }, + } + b.fillThreadInfo(tr.thread) + b.do(tr) } } - return true, nil + return true } -func (b *Bot) run_loop(nod *behavior.Tree, next bool) (bool, error) { +func (b *Bot) strategyScript(t *Transaction) bool { - var err error + tr := &Transaction{ + thread: &Thread{ + num: t.thread.num, + preid: t.cur.ID, + }, + } + defer b.fillThreadInfo(tr.thread) - if nod.Loop == 0 { - for { - if next { - err = b.run_children(nod, nod.Children) - if err != nil { - goto ext - } - } - time.Sleep(time.Millisecond) - } - } else { + tr.thread.err = DoString(b.bs.L, t.cur.Code) + if tr.thread.err != nil { + return false + } - if next { - for i := 0; i < int(nod.Loop); i++ { - err = b.run_children(nod, nod.Children) - if err != nil { - goto ext - } - } - } + tr.thread.err = b.bs.L.CallByParam(lua.P{ + Fn: b.bs.L.GetGlobal("execute"), + NRet: 1, + Protect: true, + }) + if tr.thread.err != nil { + return false } -ext: - return true, err + b.bs.L.Pop(1) + + if b.mode == Batch && len(t.cur.Children) != 0 { + tr.cur = t.cur.Children[0] + b.do(tr) + } + + return true } -func (b *Bot) run_script(nod *behavior.Tree, next bool) (bool, error) { +func (b *Bot) strategyAssert(t *Transaction) bool { - err := DoString(b.bs.L, nod.Code) - if err != nil { - return false, err + tr := &Transaction{ + thread: &Thread{ + num: t.thread.num, + preid: t.cur.ID, + }, } - err = b.bs.L.CallByParam(lua.P{ + tr.thread.err = DoString(b.bs.L, t.cur.Code) + if tr.thread.err != nil { + return false + } + + tr.thread.err = b.bs.L.CallByParam(lua.P{ Fn: b.bs.L.GetGlobal("execute"), NRet: 1, Protect: true, }) - if err != nil { - return false, err + if tr.thread.err != nil { + return false } - //v := b.bs.L.Get(-1) + v := b.bs.L.Get(-1) b.bs.L.Pop(1) - // mergo.MergeWithOverwrite(&b.metadata, t) + if lua.LVAsBool(v) { - if next { - err = b.run_children(nod, nod.Children) - if err != nil { - return false, err + if len(t.cur.Children) != 0 { + tr.cur = t.cur.Children[0] + b.fillThreadInfo(tr.thread) + b.do(tr) } + + return true } - return true, nil + return false } -func (b *Bot) run_nod(nod *behavior.Tree, next bool) (bool, error) { +func (b *Bot) loop() { - var ok bool - var err error + for { - switch nod.Ty { - case behavior.SELETE: - ok, err = b.run_selector(nod, next) - case behavior.SEQUENCE: - ok, err = b.run_sequence(nod, next) - case behavior.CONDITION: - ok, err = b.run_condition(nod, next) - case behavior.WAIT: - ok, _ = b.run_wait(nod, next) - case behavior.LOOP: - ok, err = b.run_loop(nod, next) - case behavior.ASSERT: - ok, err = b.run_assert(nod, next) - case behavior.ROOT: - ok = true - default: - ok, err = b.run_script(nod, next) - } + select { + case t := <-b.threadChan: + b.do(t) + if t.thread.err != nil { + b.errch <- ErrInfo{ID: b.id, Err: t.thread.err} + goto ext + } - return ok, err -} + b.threadDoneNum++ -func (b *Bot) run_children(parent *behavior.Tree, children []*behavior.Tree) error { - var err error + if atomic.LoadInt32(&b.threadnum) == b.threadDoneNum { + b.donech <- b.id + goto ext + } + case w := <-b.waitChan: + b.waitLst = append(b.waitLst, w) + case <-b.next.Done(): + if b.next.HasOpend() { - for k := range children { - _, err = b.run_nod(children[k], true) - if err != nil { - break + for _, v := range b.waitLst { + b.threadChan <- v + } + + b.waitLst = b.waitLst[:0] + } + b.next.Close() } + } - return err +ext: + // cleanup + b.close() } -func (b *Bot) Run(doneCh chan string, errch chan ErrInfo) { +func (b *Bot) Run(doneCh chan<- string, errch chan<- ErrInfo, mode RunMode) { - go func() { + b.donech = doneCh + b.errch = errch + b.mode = mode - defer func() { - if err := recover(); err != nil { - errch <- ErrInfo{ - ID: b.id, - Err: err.(error), - } - } - }() + tn := int(atomic.AddInt32(&b.threadnum, 1)) - err := b.run_children(b.tree, b.tree.Children) - if err != nil { - errch <- ErrInfo{ - ID: b.id, - Err: err, - } - return - } + go b.loop() - doneCh <- b.id - }() + if len(b.tree.Children) != 0 { + b.threadChan <- &Transaction{ + cur: b.cur.Children[0], + parent: b.cur, + thread: &Thread{num: tn, preid: b.tree.ID}, + } + } } @@ -399,19 +558,24 @@ func (b *Bot) RunByBlock() error { } }() - err := b.run_children(b.tree, b.tree.Children) - if err != nil { - fmt.Println("run block err", err) - } + donech := make(chan string) + errch := make(chan ErrInfo) + + b.Run(donech, errch, Batch) - return err + select { + case <-donech: + return nil + case e := <-errch: + return e.Err + } } func (b *Bot) GetReport() []script.Report { return b.bs.httpMod.GetReport() } -func (b *Bot) Close() { +func (b *Bot) close() { b.bs.L.DoString(` meta = {} `) @@ -428,44 +592,46 @@ const ( ) func (b *Bot) RunStep() State { - if b.cur == nil { - return SEnd - } - b.Lock() - defer b.Unlock() + /* + if b.cur == nil { + return SEnd + } - f, err := b.run_nod(b.cur, false) - if err != nil { - b.runtimeErr = err.Error() - return SBreak - } - // step 中使用了sleep之后,会有多个goroutine执行接下来的程序 - // fmt.Println(goid.Get()) + b.Lock() + defer b.Unlock() - if b.cur.Parent != nil { - if b.cur.Parent.Ty == behavior.SELETE && f { - b.cur.Parent.Step = len(b.cur.Parent.Children) + f, err := b.run_nod(b.cur, false) + if err != nil { + b.runtimeErr = err.Error() + return SBreak } - } + // step 中使用了sleep之后,会有多个goroutine执行接下来的程序 + // fmt.Println(goid.Get()) - if f && b.cur.Step < len(b.cur.Children) { - // down - nextidx := b.cur.Step - b.cur.Step++ - next := b.cur.Children[nextidx] - b.prev = b.cur - b.cur = next - } else { - // right if b.cur.Parent != nil { - b.prev = b.cur - b.cur = b.cur.Parent.Next() - if b.cur == nil { - return SEnd + if b.cur.Parent.Ty == behavior.SELETE && f { + b.cur.Parent.Step = len(b.cur.Parent.Children) } } - } + if f && b.cur.Step < len(b.cur.Children) { + // down + nextidx := b.cur.Step + b.cur.Step++ + next := b.cur.Children[nextidx] + b.prev = b.cur + b.cur = next + } else { + // right + if b.cur.Parent != nil { + b.prev = b.cur + b.cur = b.cur.Parent.Next() + if b.cur == nil { + return SEnd + } + } + } + */ return SSucc } diff --git a/bot/bot_test.go b/bot/bot_test.go index 030ef8a..b181e98 100644 --- a/bot/bot_test.go +++ b/bot/bot_test.go @@ -251,7 +251,7 @@ func TestLoad(t *testing.T) { assert.Equal(t, err, nil) bot = NewWithBehaviorTree("../script/", tree, "test", 1, []string{}) - defer bot.Close() + defer bot.close() for i := 0; i < 20; i++ { bot.RunStep() @@ -261,6 +261,29 @@ func TestLoad(t *testing.T) { t.Fail() } +func TestRuning(t *testing.T) { + var tree *behavior.Tree + var bot *Bot + + tree, err := behavior.New([]byte(compose)) + assert.Equal(t, err, nil) + + bot = NewWithBehaviorTree("../script/", tree, "test", 1, []string{}) + + donech := make(chan string) + errch := make(chan ErrInfo) + bot.Run(donech, errch, Batch) + + select { + case <-donech: + fmt.Println("running succ") + return + case e := <-errch: + fmt.Println("running", e.Err) + t.Fail() + } +} + func TestPool(t *testing.T) { var tree *behavior.Tree var bot *Bot @@ -269,7 +292,7 @@ func TestPool(t *testing.T) { assert.Equal(t, err, nil) bot = NewWithBehaviorTree("../script/", tree, "test", 1, []string{}) - defer bot.Close() + defer bot.close() err = bot.RunByBlock() diff --git a/factory/batch.go b/factory/batch.go index a21891c..06824e4 100644 --- a/factory/batch.go +++ b/factory/batch.go @@ -113,8 +113,6 @@ func (b *Batch) pop(id string) { b.bwg.Done() atomic.AddInt32(&b.CurNum, 1) - b.bots[id].Close() - if atomic.LoadInt32(&b.CurNum) >= b.TotalNum { b.done <- 1 } @@ -131,9 +129,9 @@ func (b *Batch) loop() { for { select { - case bot := <-b.pipeline: - b.push(bot) - bot.Run(b.botDoneCh, b.botErrCh) + case botptr := <-b.pipeline: + b.push(botptr) + botptr.Run(b.botDoneCh, b.botErrCh, bot.Batch) case id := <-b.botDoneCh: if _, ok := b.bots[id]; ok { b.pushReport(b.rep, b.bots[id]) @@ -261,9 +259,9 @@ func (b *Batch) record() { b.rep.Dura = duration if b.rep.ErrNum != 0 { - b.colorer.Printf("robot : %d req count : %d duration : %s qps : %d errors : %v\n", b.rep.BotNum, b.rep.ReqNum, duration, qps, utils.Red(b.rep.ErrNum)) + b.colorer.Printf("robot : %d match to %d APIs req count : %d duration : %s qps : %d errors : %v\n", b.rep.BotNum, len(b.rep.UrlMap), b.rep.ReqNum, duration, qps, utils.Red(b.rep.ErrNum)) } else { - fmt.Printf("robot : %d req count : %d duration : %s qps : %d errors : %d\n", b.rep.BotNum, b.rep.ReqNum, duration, qps, b.rep.ErrNum) + fmt.Printf("robot : %d match to %d APIs req count : %d duration : %s qps : %d errors : %d\n", b.rep.BotNum, len(b.rep.UrlMap), b.rep.ReqNum, duration, qps, b.rep.ErrNum) } } diff --git a/factory/factory.go b/factory/factory.go index ebf9484..b697d22 100644 --- a/factory/factory.go +++ b/factory/factory.go @@ -197,7 +197,6 @@ func (f *Factory) FindBot(botid string) *bot.Bot { func (f *Factory) RmvBot(botid string) { if _, ok := f.debugBots[botid]; ok { - f.debugBots[botid].Close() delete(f.debugBots, botid) } diff --git a/utils/switch.go b/utils/switch.go index bb0fba8..0961ead 100644 --- a/utils/switch.go +++ b/utils/switch.go @@ -3,7 +3,6 @@ package utils // from https://github.com/grpc/grpc-go/blob/master/internal/grpcsync/event.go import ( - "sync" "sync/atomic" ) @@ -11,18 +10,16 @@ import ( type Switch struct { opened int32 c chan struct{} - o sync.Once } // Open 开启 -func (s *Switch) Open() bool { - ret := false - s.o.Do(func() { - atomic.StoreInt32(&s.opened, 1) - close(s.c) - ret = true - }) - return ret +func (s *Switch) Open() { + atomic.StoreInt32(&s.opened, 1) + s.c <- struct{}{} +} + +func (s *Switch) Close() { + atomic.StoreInt32(&s.opened, 0) } // Done 事件触发 @@ -37,5 +34,5 @@ func (s *Switch) HasOpend() bool { // NewSwitch 返回一个新的开关事件 func NewSwitch() *Switch { - return &Switch{c: make(chan struct{})} + return &Switch{c: make(chan struct{}, 1)} } diff --git a/utils/switch_test.go b/utils/switch_test.go new file mode 100644 index 0000000..feaa900 --- /dev/null +++ b/utils/switch_test.go @@ -0,0 +1,31 @@ +package utils + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestSwitch(t *testing.T) { + + s := NewSwitch() + tick := int32(0) + + go func() { + for { + <-s.Done() + atomic.AddInt32(&tick, 1) + } + }() + + s.Open() + assert.Equal(t, s.HasOpend(), true) + s.Close() + assert.Equal(t, s.HasOpend(), false) + + time.Sleep(time.Millisecond * 100) + + assert.Equal(t, tick, int32(2)) +}