diff --git a/docs/manual.md b/docs/manual.md index 99b04d1..b391865 100644 --- a/docs/manual.md +++ b/docs/manual.md @@ -1054,9 +1054,9 @@ Foo <-mach.WhenTick("DownloadingFile", 2, nil) ``` -Almost all "when" methods return a share channel which closes when an event happens (or the optionally passed context is +All "when" methods return a channel, which closes when an event happens (or the optionally passed context gets canceled). They are used to wait until a certain moment, when we know the execution can proceed. Using "when" methods -creates new channels and should be used with caution, possibly making use of the early disposal context. In the future, +creates new channels and should be used with caution, ideally making use of the early disposal context. In the future, these channels will be reused and should scale way better. "When" methods are: diff --git a/examples/nfa/nfa_test.go b/examples/nfa/nfa_test.go index c7c6648..bc77856 100644 --- a/examples/nfa/nfa_test.go +++ b/examples/nfa/nfa_test.go @@ -1,28 +1,29 @@ // Based on https://en.wikipedia.org/wiki/Nondeterministic_finite_automaton +// TODO add Input0, Input1, End // -//=== RUN TestRegexp -//=== RUN TestRegexp/test_101010_(OK) -//[state] +Start +StepX -//[state] +Input -//[extern:InputState] input: 1 -//[state] +Input -//[extern:InputState] input: 0 -//[state] +Input -//[extern:InputState] input: 1 -//[state] +Step0 -StepX -//[state] +Input -//[extern:InputState] input: 0 -//[state] +Step1 -Step0 -//[state] +Input -//[extern:InputState] input: 1 -//[state] +Step2 -Step1 -//[state] +Input -//[extern:InputState] input: 0 -//[state] +Step3 -Step2 -//[state] -Start -//--- PASS: TestRegexp (0.00s) +// === RUN TestRegexp +// === RUN TestRegexp/test_101010_(OK) +// [state] +Start +StepX +// [state] +Input +// [extern:InputState] input: 1 +// [state] +Input +// [extern:InputState] input: 0 +// [state] +Input +// [extern:InputState] input: 1 +// [state] +Step0 -StepX +// [state] +Input +// [extern:InputState] input: 0 +// [state] +Step1 -Step0 +// [state] +Input +// [extern:InputState] input: 1 +// [state] +Step2 -Step1 +// [state] +Input +// [extern:InputState] input: 0 +// [state] +Step3 -Step2 +// [state] -Start +// --- PASS: TestRegexp (0.00s) // --- PASS: TestRegexp/test_101010_(OK) (0.00s) -//PASS +// PASS package nfa @@ -81,11 +82,10 @@ func (t *Regexp) StartState(e *am.Event) { mach.Add1(StepX, nil) // jump out of the queue + queueEnd := mach.WhenQueueEnds(nil) go func() { - // TODO use mach.WhenQueueEnds() - <-mach.When1(StepX, nil) - // needed for the queue lock to release - time.Sleep(1 * time.Millisecond) + // wait for drained + <-queueEnd for _, c := range t.input { switch c { diff --git a/pkg/machine/machine.go b/pkg/machine/machine.go index 45d0231..7b31fd6 100644 --- a/pkg/machine/machine.go +++ b/pkg/machine/machine.go @@ -79,6 +79,7 @@ type Machine struct { queueLock sync.RWMutex queueProcessing sync.Mutex queueLen int + queueRunning bool emitters []*emitter once sync.Once @@ -96,6 +97,8 @@ type Machine struct { indexStateCtx indexStateCtx indexEventCh indexEventCh indexEventChLock sync.Mutex + indexWhenQueue []whenQueueBinding + indexWhenQueueLock sync.Mutex handlerStart chan *handlerCall handlerEnd chan bool handlerPanic chan any @@ -106,6 +109,7 @@ type Machine struct { logArgs func(args A) map[string]string currentHandler string disposeHandlers []func() + timeLast T } // NewCommon creates a new Machine instance with all the common options set. @@ -766,6 +770,68 @@ func (m *Machine) WhenTicksEq( return m.WhenTime(S{state}, T{uint64(ticks)}, ctx) } +// WhenQueueEnds returns a channel which closes when the queue ends. Optionally +// accepts a context to close the channel earlier. +func (m *Machine) WhenQueueEnds(ctx context.Context) <-chan struct{} { + ch := make(chan struct{}) + + if m.Disposed { + close(ch) + + return ch + } + + // locks + m.indexWhenQueueLock.Lock() + defer m.indexWhenQueueLock.Unlock() + + // finish early + if !m.queueRunning { + close(ch) + + return ch + } + + // add the binding to an index of each state + binding := whenQueueBinding{ + ch: ch, + } + + // log + m.log(LogDecisions, "[when:queue] new wait") + + // dispose with context + // TODO extract + if ctx != nil { + go func() { + select { + case <-ch: + return + case <-m.Ctx.Done(): + return + case <-ctx.Done(): + } + // GC only if needed + if m.Disposed { + return + } + + // TODO track + closeSafe(ch) + + m.indexWhenQueueLock.Lock() + defer m.indexWhenQueueLock.Unlock() + + m.indexWhenQueue = slicesWithout(m.indexWhenQueue, binding) + }() + } + + // insert the binding + m.indexWhenQueue = append(m.indexWhenQueue, binding) + + return ch +} + // Time returns a list of logical clocks of specified states (or all the states // if nil). // states: optionally passing a list of states param guarantees a deterministic @@ -774,6 +840,10 @@ func (m *Machine) Time(states S) T { m.activeStatesLock.RLock() defer m.activeStatesLock.RUnlock() + return m.time(states) +} + +func (m *Machine) time(states S) T { if states == nil { states = m.StateNames } @@ -1448,7 +1518,9 @@ func (m *Machine) processQueue() Result { var ret []Result // execute the queue + m.queueRunning = false for len(m.queue) > 0 { + m.queueRunning = true if m.Disposed { return Canceled @@ -1480,6 +1552,8 @@ func (m *Machine) processQueue() Result { // execute the transition ret = append(ret, m.Transition.emitEvents()) + m.timeLast = m.Transition.TAfter + // process flow methods m.processWhenBindings() m.processWhenTimeBindings() @@ -1489,7 +1563,9 @@ func (m *Machine) processQueue() Result { // release the atomic lock m.Transition = nil m.queueProcessing.Unlock() - m.emit(EventQueueEnd, nil, nil) + m.queueRunning = false + m.emit(EventQueueEnd, A{"T": m.timeLast}, nil) + m.processWhenQueueBindings() if len(ret) == 0 { return Canceled @@ -1664,6 +1740,17 @@ func (m *Machine) processWhenTimeBindings() { } } +func (m *Machine) processWhenQueueBindings() { + m.indexWhenQueueLock.Lock() + toClose := slices.Clone(m.indexWhenQueue) + m.indexWhenQueue = nil + m.indexWhenQueueLock.Unlock() + + for _, binding := range toClose { + closeSafe(binding.ch) + } +} + // SetLogArgs accepts a function which decides which mutation arguments to log. // See NewArgsMapper or create your own manually. func (m *Machine) SetLogArgs(matcher func(args A) map[string]string) { @@ -2314,15 +2401,6 @@ func (m *Machine) CanRemove(states S) bool { panic("CanRemove not implemented; github.com/pancsta/asyncmachine-go/pulls") } -// WhenQueueEnds writes every time the queue ends with the times from after -// the last mutation. The channel will close with ctx or Machine.Ctx. -// TODO WhenQueueEnds -// Planned. -func (m *Machine) WhenQueueEnds(ctx context.Context) <-chan T { - panic( - "WhenQueueEnds not implemented; github.com/pancsta/asyncmachine-go/pulls") -} - // Clocks returns the map of specified cloks or all clocks if states is nil. func (m *Machine) Clocks(states S) Clocks { if states == nil { diff --git a/pkg/machine/machine_test.go b/pkg/machine/machine_test.go index 1ba9ee8..10ef963 100644 --- a/pkg/machine/machine_test.go +++ b/pkg/machine/machine_test.go @@ -1320,3 +1320,43 @@ func TestInspect(t *testing.T) { ` assertString(t, m, expected, names) } + +// TestWhenQueueEnds +type TestWhenQueueEndsHandlers struct { + *ExceptionHandler +} + +func (h *TestWhenQueueEndsHandlers) AState(e *Event) { + close(e.Args["readyMut"].(chan struct{})) + <-e.Args["readyGo"].(chan struct{}) + e.Machine.Add1("B", nil) +} + +func TestWhenQueueEnds(t *testing.T) { + // init + m := NewNoRels(t, nil) + defer m.Dispose() + // order + err := m.VerifyStates(S{"A", "B", "C", "D", "Exception"}) + if err != nil { + t.Fatal(err) + } + // bind handlers + err = m.BindHandlers(&TestWhenQueueEndsHandlers{}) + assert.NoError(t, err) + + // run the test + readyGo := make(chan struct{}) + readyMut := make(chan struct{}) + var queueEnds <-chan struct{} + go func() { + <-readyMut + assert.True(t, m.DuringTransition(), + "Machine should be during a transition") + queueEnds = m.WhenQueueEnds(context.TODO()) + close(readyGo) + }() + m.Add1("A", A{"readyMut": readyMut, "readyGo": readyGo}) + // confirm the queue wait is closed + <-queueEnds +} diff --git a/pkg/machine/misc.go b/pkg/machine/misc.go index 48ea8f0..24b07d0 100644 --- a/pkg/machine/misc.go +++ b/pkg/machine/misc.go @@ -39,9 +39,9 @@ type State struct { // Struct is a map of state names to state definitions. type Struct = map[string]State -/////////////// -///// options -/////////////// +// ///////////// +// /// options +// ///////////// // Opts struct is used to configure a new Machine. type Opts struct { @@ -107,9 +107,9 @@ func OptsWithParentTracers(opts *Opts, parent *Machine) *Opts { return opts } -/////////////// -///// enums -/////////////// +// ///////////// +// /// enums +// ///////////// // Result enum is the result of a state Transition type Result int @@ -300,9 +300,9 @@ func (r Relation) String() string { return "" } -/////////////// -///// logging -/////////////// +// ///////////// +// /// logging +// ///////////// // Logger is a logging function for the machine. type Logger func(level LogLevel, msg string, args ...any) @@ -384,9 +384,9 @@ type Tracer interface { Inheritable() bool } -/////////////// -///// events, when, emitters -/////////////// +// ///////////// +// /// events, when, emitters +// ///////////// // Event struct represents a single event of a Mutation withing a Transition. // One event can have 0-n handlers. @@ -453,6 +453,10 @@ type whenArgsBinding struct { args A } +type whenQueueBinding struct { + ch chan struct{} +} + type stateIsActive map[string]bool // emitter represents a single event consumer, synchronized by channels. @@ -467,9 +471,9 @@ func (e *emitter) dispose() { e.methods = nil } -/////////////// -///// exception support -/////////////// +// ///////////// +// /// exception support +// ///////////// // Exception is the Exception state name const Exception = "Exception" @@ -511,9 +515,9 @@ func (eh *ExceptionHandler) ExceptionState(e *Event) { } } -/////////////// -///// pub utils -/////////////// +// ///////////// +// /// pub utils +// ///////////// // DiffStates returns the states that are in states1 but not in states2. func DiffStates(states1 S, states2 S) S { @@ -597,9 +601,9 @@ func SMerge(states ...S) S { return slicesUniq(s) } -/////////////// -///// utils -/////////////// +// ///////////// +// /// utils +// ///////////// // j joins state names into a single string func j(states []string) string { diff --git a/pkg/machine/transition.go b/pkg/machine/transition.go index 52f41d8..0a777ea 100644 --- a/pkg/machine/transition.go +++ b/pkg/machine/transition.go @@ -41,6 +41,7 @@ type Transition struct { // clocks of the states from after the transition // TODO timeAfter, produce Clocks via ClockAfter(), add index diffs ClocksAfter Clocks + TAfter T // State names with "enter" handlers to execute Enters S // State names with "exit" handlers to executed @@ -388,6 +389,7 @@ func (t *Transition) emitEvents() Result { clocks[state] = m.clock[state] } t.ClocksAfter = clocks + t.TAfter = m.time(nil) // AUTO STATES if result == Canceled {