Skip to content

Commit

Permalink
feat(machine): add WhenQueueEnds
Browse files Browse the repository at this point in the history
  • Loading branch information
pancsta committed Jun 27, 2024
1 parent 761ea1c commit 34a48bc
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 59 deletions.
4 changes: 2 additions & 2 deletions docs/manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -1054,9 +1054,9 @@ Foo
<-mach.WhenTick("DownloadingFile", 2, nil)
```
Almost all "when" methods return a share channel which closes when an event happens (or the optionally passed context is
All "when" methods return a channel, which closes when an event happens (or the optionally passed context gets
canceled). They are used to wait until a certain moment, when we know the execution can proceed. Using "when" methods
creates new channels and should be used with caution, possibly making use of the early disposal context. In the future,
creates new channels and should be used with caution, ideally making use of the early disposal context. In the future,
these channels will be reused and should scale way better.
"When" methods are:
Expand Down
52 changes: 26 additions & 26 deletions examples/nfa/nfa_test.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
// Based on https://en.wikipedia.org/wiki/Nondeterministic_finite_automaton
// TODO add Input0, Input1, End
//
//=== RUN TestRegexp
//=== RUN TestRegexp/test_101010_(OK)
//[state] +Start +StepX
//[state] +Input
//[extern:InputState] input: 1
//[state] +Input
//[extern:InputState] input: 0
//[state] +Input
//[extern:InputState] input: 1
//[state] +Step0 -StepX
//[state] +Input
//[extern:InputState] input: 0
//[state] +Step1 -Step0
//[state] +Input
//[extern:InputState] input: 1
//[state] +Step2 -Step1
//[state] +Input
//[extern:InputState] input: 0
//[state] +Step3 -Step2
//[state] -Start
//--- PASS: TestRegexp (0.00s)
// === RUN TestRegexp
// === RUN TestRegexp/test_101010_(OK)
// [state] +Start +StepX
// [state] +Input
// [extern:InputState] input: 1
// [state] +Input
// [extern:InputState] input: 0
// [state] +Input
// [extern:InputState] input: 1
// [state] +Step0 -StepX
// [state] +Input
// [extern:InputState] input: 0
// [state] +Step1 -Step0
// [state] +Input
// [extern:InputState] input: 1
// [state] +Step2 -Step1
// [state] +Input
// [extern:InputState] input: 0
// [state] +Step3 -Step2
// [state] -Start
// --- PASS: TestRegexp (0.00s)
// --- PASS: TestRegexp/test_101010_(OK) (0.00s)
//PASS
// PASS

package nfa

Expand Down Expand Up @@ -81,11 +82,10 @@ func (t *Regexp) StartState(e *am.Event) {
mach.Add1(StepX, nil)

// jump out of the queue
queueEnd := mach.WhenQueueEnds(nil)
go func() {
// TODO use mach.WhenQueueEnds()
<-mach.When1(StepX, nil)
// needed for the queue lock to release
time.Sleep(1 * time.Millisecond)
// wait for drained
<-queueEnd

for _, c := range t.input {
switch c {
Expand Down
98 changes: 88 additions & 10 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Machine struct {
queueLock sync.RWMutex
queueProcessing sync.Mutex
queueLen int
queueRunning bool

emitters []*emitter
once sync.Once
Expand All @@ -96,6 +97,8 @@ type Machine struct {
indexStateCtx indexStateCtx
indexEventCh indexEventCh
indexEventChLock sync.Mutex
indexWhenQueue []whenQueueBinding
indexWhenQueueLock sync.Mutex
handlerStart chan *handlerCall
handlerEnd chan bool
handlerPanic chan any
Expand All @@ -106,6 +109,7 @@ type Machine struct {
logArgs func(args A) map[string]string
currentHandler string
disposeHandlers []func()
timeLast T
}

// NewCommon creates a new Machine instance with all the common options set.
Expand Down Expand Up @@ -766,6 +770,68 @@ func (m *Machine) WhenTicksEq(
return m.WhenTime(S{state}, T{uint64(ticks)}, ctx)
}

// WhenQueueEnds returns a channel which closes when the queue ends. Optionally
// accepts a context to close the channel earlier.
func (m *Machine) WhenQueueEnds(ctx context.Context) <-chan struct{} {
ch := make(chan struct{})

if m.Disposed {
close(ch)

return ch
}

// locks
m.indexWhenQueueLock.Lock()
defer m.indexWhenQueueLock.Unlock()

// finish early
if !m.queueRunning {
close(ch)

return ch
}

// add the binding to an index of each state
binding := whenQueueBinding{
ch: ch,
}

// log
m.log(LogDecisions, "[when:queue] new wait")

// dispose with context
// TODO extract
if ctx != nil {
go func() {
select {
case <-ch:
return
case <-m.Ctx.Done():
return
case <-ctx.Done():
}
// GC only if needed
if m.Disposed {
return
}

// TODO track
closeSafe(ch)

m.indexWhenQueueLock.Lock()
defer m.indexWhenQueueLock.Unlock()

m.indexWhenQueue = slicesWithout(m.indexWhenQueue, binding)
}()
}

// insert the binding
m.indexWhenQueue = append(m.indexWhenQueue, binding)

return ch
}

// Time returns a list of logical clocks of specified states (or all the states
// if nil).
// states: optionally passing a list of states param guarantees a deterministic
Expand All @@ -774,6 +840,10 @@ func (m *Machine) Time(states S) T {
m.activeStatesLock.RLock()
defer m.activeStatesLock.RUnlock()

return m.time(states)
}

func (m *Machine) time(states S) T {
if states == nil {
states = m.StateNames
}
Expand Down Expand Up @@ -1448,7 +1518,9 @@ func (m *Machine) processQueue() Result {
var ret []Result

// execute the queue
m.queueRunning = false
for len(m.queue) > 0 {
m.queueRunning = true

if m.Disposed {
return Canceled
Expand Down Expand Up @@ -1480,6 +1552,8 @@ func (m *Machine) processQueue() Result {
// execute the transition
ret = append(ret, m.Transition.emitEvents())

m.timeLast = m.Transition.TAfter

// process flow methods
m.processWhenBindings()
m.processWhenTimeBindings()
Expand All @@ -1489,7 +1563,9 @@ func (m *Machine) processQueue() Result {
// release the atomic lock
m.Transition = nil
m.queueProcessing.Unlock()
m.emit(EventQueueEnd, nil, nil)
m.queueRunning = false
m.emit(EventQueueEnd, A{"T": m.timeLast}, nil)
m.processWhenQueueBindings()

if len(ret) == 0 {
return Canceled
Expand Down Expand Up @@ -1664,6 +1740,17 @@ func (m *Machine) processWhenTimeBindings() {
}
}

func (m *Machine) processWhenQueueBindings() {
m.indexWhenQueueLock.Lock()
toClose := slices.Clone(m.indexWhenQueue)
m.indexWhenQueue = nil
m.indexWhenQueueLock.Unlock()

for _, binding := range toClose {
closeSafe(binding.ch)
}
}

// SetLogArgs accepts a function which decides which mutation arguments to log.
// See NewArgsMapper or create your own manually.
func (m *Machine) SetLogArgs(matcher func(args A) map[string]string) {
Expand Down Expand Up @@ -2314,15 +2401,6 @@ func (m *Machine) CanRemove(states S) bool {
panic("CanRemove not implemented; github.com/pancsta/asyncmachine-go/pulls")
}

// WhenQueueEnds writes every time the queue ends with the times from after
// the last mutation. The channel will close with ctx or Machine.Ctx.
// TODO WhenQueueEnds
// Planned.
func (m *Machine) WhenQueueEnds(ctx context.Context) <-chan T {
panic(
"WhenQueueEnds not implemented; github.com/pancsta/asyncmachine-go/pulls")
}

// Clocks returns the map of specified cloks or all clocks if states is nil.
func (m *Machine) Clocks(states S) Clocks {
if states == nil {
Expand Down
40 changes: 40 additions & 0 deletions pkg/machine/machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1320,3 +1320,43 @@ func TestInspect(t *testing.T) {
`
assertString(t, m, expected, names)
}

// TestWhenQueueEnds
type TestWhenQueueEndsHandlers struct {
*ExceptionHandler
}

func (h *TestWhenQueueEndsHandlers) AState(e *Event) {
close(e.Args["readyMut"].(chan struct{}))
<-e.Args["readyGo"].(chan struct{})
e.Machine.Add1("B", nil)
}

func TestWhenQueueEnds(t *testing.T) {
// init
m := NewNoRels(t, nil)
defer m.Dispose()
// order
err := m.VerifyStates(S{"A", "B", "C", "D", "Exception"})
if err != nil {
t.Fatal(err)
}
// bind handlers
err = m.BindHandlers(&TestWhenQueueEndsHandlers{})
assert.NoError(t, err)

// run the test
readyGo := make(chan struct{})
readyMut := make(chan struct{})
var queueEnds <-chan struct{}
go func() {
<-readyMut
assert.True(t, m.DuringTransition(),
"Machine should be during a transition")
queueEnds = m.WhenQueueEnds(context.TODO())
close(readyGo)
}()
m.Add1("A", A{"readyMut": readyMut, "readyGo": readyGo})
// confirm the queue wait is closed
<-queueEnds
}
46 changes: 25 additions & 21 deletions pkg/machine/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ type State struct {
// Struct is a map of state names to state definitions.
type Struct = map[string]State

///////////////
///// options
///////////////
// /////////////
// /// options
// /////////////

// Opts struct is used to configure a new Machine.
type Opts struct {
Expand Down Expand Up @@ -107,9 +107,9 @@ func OptsWithParentTracers(opts *Opts, parent *Machine) *Opts {
return opts
}

///////////////
///// enums
///////////////
// /////////////
// /// enums
// /////////////

// Result enum is the result of a state Transition
type Result int
Expand Down Expand Up @@ -300,9 +300,9 @@ func (r Relation) String() string {
return ""
}

///////////////
///// logging
///////////////
// /////////////
// /// logging
// /////////////

// Logger is a logging function for the machine.
type Logger func(level LogLevel, msg string, args ...any)
Expand Down Expand Up @@ -384,9 +384,9 @@ type Tracer interface {
Inheritable() bool
}

///////////////
///// events, when, emitters
///////////////
// /////////////
// /// events, when, emitters
// /////////////

// Event struct represents a single event of a Mutation withing a Transition.
// One event can have 0-n handlers.
Expand Down Expand Up @@ -453,6 +453,10 @@ type whenArgsBinding struct {
args A
}

type whenQueueBinding struct {
ch chan struct{}
}

type stateIsActive map[string]bool

// emitter represents a single event consumer, synchronized by channels.
Expand All @@ -467,9 +471,9 @@ func (e *emitter) dispose() {
e.methods = nil
}

///////////////
///// exception support
///////////////
// /////////////
// /// exception support
// /////////////

// Exception is the Exception state name
const Exception = "Exception"
Expand Down Expand Up @@ -511,9 +515,9 @@ func (eh *ExceptionHandler) ExceptionState(e *Event) {
}
}

///////////////
///// pub utils
///////////////
// /////////////
// /// pub utils
// /////////////

// DiffStates returns the states that are in states1 but not in states2.
func DiffStates(states1 S, states2 S) S {
Expand Down Expand Up @@ -597,9 +601,9 @@ func SMerge(states ...S) S {
return slicesUniq(s)
}

///////////////
///// utils
///////////////
// /////////////
// /// utils
// /////////////

// j joins state names into a single string
func j(states []string) string {
Expand Down
Loading

0 comments on commit 34a48bc

Please sign in to comment.