Skip to content

Commit

Permalink
refac(machine): refac to directional channs
Browse files Browse the repository at this point in the history
  • Loading branch information
pancsta committed Jul 10, 2024
1 parent ab1ee58 commit 04c4ed3
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
28 changes: 15 additions & 13 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (m *Machine) disposeEmitter(emitter *emitter) {
// Exception state.
//
// ctx: optional context defaults to the machine's context.
func (m *Machine) WhenErr(ctx context.Context) chan struct{} {
func (m *Machine) WhenErr(ctx context.Context) <-chan struct{} {
// handle with a shared channel with broadcast via close
return m.When([]string{"Exception"}, ctx)
}
Expand All @@ -409,7 +409,7 @@ func (m *Machine) WhenErr(ctx context.Context) chan struct{} {
// ctx: optional context that will close the channel when done. Useful when
// listening on 2 When() channels within the same `select` to GC the 2nd one.
// TODO re-use channels with the same state set and context
func (m *Machine) When(states S, ctx context.Context) chan struct{} {
func (m *Machine) When(states S, ctx context.Context) <-chan struct{} {
ch := make(chan struct{})
if m.Disposed {
close(ch)
Expand Down Expand Up @@ -489,7 +489,7 @@ func (m *Machine) When(states S, ctx context.Context) chan struct{} {

// When1 is an alias to When() for a single state.
// See When.
func (m *Machine) When1(state string, ctx context.Context) chan struct{} {
func (m *Machine) When1(state string, ctx context.Context) <-chan struct{} {
return m.When(S{state}, ctx)
}

Expand All @@ -498,7 +498,7 @@ func (m *Machine) When1(state string, ctx context.Context) chan struct{} {
//
// ctx: optional context that will close the channel when done. Useful when
// listening on 2 WhenNot() channels within the same `select` to GC the 2nd one.
func (m *Machine) WhenNot(states S, ctx context.Context) chan struct{} {
func (m *Machine) WhenNot(states S, ctx context.Context) <-chan struct{} {
ch := make(chan struct{})
if m.Disposed {
close(ch)
Expand Down Expand Up @@ -577,7 +577,7 @@ func (m *Machine) WhenNot(states S, ctx context.Context) chan struct{} {

// WhenNot1 is an alias to WhenNot() for a single state.
// See WhenNot.
func (m *Machine) WhenNot1(state string, ctx context.Context) chan struct{} {
func (m *Machine) WhenNot1(state string, ctx context.Context) <-chan struct{} {
return m.WhenNot(S{state}, ctx)
}

Expand All @@ -588,7 +588,7 @@ func (m *Machine) WhenNot1(state string, ctx context.Context) chan struct{} {
// TODO better val comparisons
func (m *Machine) WhenArgs(
state string, args A, ctx context.Context,
) chan struct{} {
) <-chan struct{} {
ch := make(chan struct{})

if m.Disposed {
Expand Down Expand Up @@ -664,7 +664,7 @@ func (m *Machine) WhenArgs(
// state.
func (m *Machine) WhenTime(
states S, times T, ctx context.Context,
) chan struct{} {
) <-chan struct{} {
ch := make(chan struct{})
valid := len(states) == len(times)
if m.Disposed || !valid {
Expand Down Expand Up @@ -760,20 +760,19 @@ func (m *Machine) WhenTime(
// WhenTicks waits N tick for a single state. Uses WhenTime underneath.
func (m *Machine) WhenTicks(
state string, ticks int, ctx context.Context,
) chan struct{} {
) <-chan struct{} {
return m.WhenTime(S{state}, T{uint64(ticks) + m.Clock(state)}, ctx)
}

// WhenTicksEq waits till ticks for a single state equal the given value.
// Uses WhenTime underneath.
func (m *Machine) WhenTicksEq(
state string, ticks int, ctx context.Context,
) chan struct{} {
) <-chan struct{} {
return m.WhenTime(S{state}, T{uint64(ticks)}, ctx)
}

// WhenQueueEnds returns a channel which closes when the queue ends. Optionally
// accepts a context to close the channel earlier.
// WhenQueueEnds closes every time the queue ends, or the optional ctx expires.
func (m *Machine) WhenQueueEnds(ctx context.Context) <-chan struct{} {
ch := make(chan struct{})

Expand Down Expand Up @@ -1743,6 +1742,7 @@ func (m *Machine) processWhenTimeBindings() {
}
}
m.activeStatesLock.Unlock()

// notify outside the critical zone
for ch := range toClose {
closeSafe(toClose[ch])
Expand All @@ -1751,11 +1751,11 @@ func (m *Machine) processWhenTimeBindings() {

func (m *Machine) processWhenQueueBindings() {
m.indexWhenQueueLock.Lock()
toClose := slices.Clone(m.indexWhenQueue)
toPush := slices.Clone(m.indexWhenQueue)
m.indexWhenQueue = nil
m.indexWhenQueueLock.Unlock()

for _, binding := range toClose {
for _, binding := range toPush {
closeSafe(binding.ch)
}
}
Expand Down Expand Up @@ -2055,9 +2055,11 @@ func (m *Machine) processWhenArgs(e *Event) {
if !compareArgs(e.Args, binding.args) {
continue
}

m.log(LogDecisions, "[when:args] match for %s", e.Name)
// args match - dispose and close outside the mutex
chToClose = append(chToClose, binding.ch)

// GC
if len(m.indexWhenArgs[e.Name]) == 1 {
delete(m.indexWhenArgs, e.Name)
Expand Down
2 changes: 1 addition & 1 deletion pkg/x/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func RaceCtx[T *any](ctx context.Context, ch chan T) (T, error) {
// TODO test case, solve locking by passing the event to the submachine
func NestedState(
e *am.Event, strIDField string, machGetter func(id string) *am.Machine,
) (am.Result, chan struct{}, error) {
) (am.Result, <-chan struct{}, error) {
// validate
if e.Mutation().Type != am.MutationAdd {
return am.Canceled, nil, fmt.Errorf(
Expand Down

0 comments on commit 04c4ed3

Please sign in to comment.