diff --git a/pkg/machine/machine.go b/pkg/machine/machine.go index 8bcaf88..ba0952e 100644 --- a/pkg/machine/machine.go +++ b/pkg/machine/machine.go @@ -16,6 +16,7 @@ import ( "slices" "strings" "sync" + "sync/atomic" "time" ) @@ -53,7 +54,7 @@ type Machine struct { // Tracers are optional tracers for telemetry integrations. Tracers []Tracer // If true, the machine has been Disposed and is no-op. Read-only. - Disposed bool + Disposed atomic.Bool // ParentID is the ID of the parent machine (if any). ParentID string // Tags are optional tags for telemetry integrations etc. @@ -285,7 +286,7 @@ func (m *Machine) Dispose() { // handler go func() { - if m.Disposed { + if m.Disposed.Load() { return } m.queueProcessing.Lock() @@ -301,7 +302,10 @@ func (m *Machine) DisposeForce() { } func (m *Machine) dispose(force bool) { - m.Disposed = true + if !m.Disposed.CompareAndSwap(false, true) { + // already disposed + return + } for i := range m.Tracers { m.Tracers[i].MachineDispose(m.ID) @@ -359,8 +363,12 @@ func (m *Machine) dispose(force bool) { closeSafe(m.handlerEnd) closeSafe(m.handlerPanic) closeSafe(m.handlerStart) - m.handlerTimer.Stop() - m.handlerTimer = nil + + if m.handlerTimer != nil { + m.handlerTimer.Stop() + m.handlerTimer = nil + } + m.clock = nil // release the queue lock @@ -377,7 +385,7 @@ func (m *Machine) dispose(force bool) { m.disposeHandlers = nil // the end - close(m.whenDisposed) + closeSafe(m.whenDisposed) } // disposeEmitter detaches the emitter from the machine and disposes it. @@ -404,7 +412,7 @@ func (m *Machine) WhenErr(ctx context.Context) <-chan struct{} { // TODO re-use channels with the same state set and context func (m *Machine) When(states S, ctx context.Context) <-chan struct{} { ch := make(chan struct{}) - if m.Disposed { + if m.Disposed.Load() { close(ch) return ch } @@ -464,7 +472,7 @@ func (m *Machine) When1(state string, ctx context.Context) <-chan struct{} { // listening on 2 WhenNot() channels within the same `select` to GC the 2nd one. func (m *Machine) WhenNot(states S, ctx context.Context) <-chan struct{} { ch := make(chan struct{}) - if m.Disposed { + if m.Disposed.Load() { close(ch) return ch } @@ -527,7 +535,7 @@ func (m *Machine) WhenArgs( ) <-chan struct{} { ch := make(chan struct{}) - if m.Disposed { + if m.Disposed.Load() { close(ch) return ch @@ -578,7 +586,7 @@ func (m *Machine) WhenTime( ) <-chan struct{} { ch := make(chan struct{}) valid := len(states) == len(times) - if m.Disposed || !valid { + if m.Disposed.Load() || !valid { if !valid { m.log(LogDecisions, "[when:time] times for all passed stated required") } @@ -660,7 +668,7 @@ func (m *Machine) WhenTicksEq( func (m *Machine) WhenQueueEnds(ctx context.Context) <-chan struct{} { ch := make(chan struct{}) - if m.Disposed { + if m.Disposed.Load() { close(ch) return ch @@ -697,7 +705,7 @@ func (m *Machine) WhenQueueEnds(ctx context.Context) <-chan struct{} { case <-ctx.Done(): } // GC only if needed - if m.Disposed { + if m.Disposed.Load() { return } @@ -769,7 +777,7 @@ func (m *Machine) TimeSum(states S) uint64 { // transition (Executed, Queued, Canceled). // Like every mutation method, it will resolve relations and trigger handlers. func (m *Machine) Add(states S, args A) Result { - if m.Disposed || m.queueLen >= m.QueueLimit { + if m.Disposed.Load() || m.queueLen >= m.QueueLimit { return Canceled } m.queueMutation(MutationAdd, states, args) @@ -786,7 +794,7 @@ func (m *Machine) Add1(state string, args A) Result { // error. // Like every mutation method, it will resolve relations and trigger handlers. func (m *Machine) AddErr(err error) Result { - if m.Disposed { + if m.Disposed.Load() { return Canceled } // TODO test .Err @@ -811,7 +819,7 @@ func (m *Machine) IsErr() bool { // the transition (Executed, Queued, Canceled). // Like every mutation method, it will resolve relations and trigger handlers. func (m *Machine) Remove(states S, args A) Result { - if m.Disposed || m.queueLen >= m.QueueLimit { + if m.Disposed.Load() || m.queueLen >= m.QueueLimit { return Canceled } @@ -846,7 +854,7 @@ func (m *Machine) Remove1(state string, args A) Result { // the transition (Executed, Queued, Canceled). // Like every mutation method, it will resolve relations and trigger handlers. func (m *Machine) Set(states S, args A) Result { - if m.Disposed || m.queueLen >= m.QueueLimit { + if m.Disposed.Load() || m.queueLen >= m.QueueLimit { return Canceled } m.queueMutation(MutationSet, states, args) @@ -1125,7 +1133,7 @@ func (m *Machine) BindHandlers(handlers any) error { // The Tracer API is a better way to event feeds. func (m *Machine) OnEvent(events []string, ctx context.Context) chan *Event { ch := make(chan *Event, 50) - if m.Disposed { + if m.Disposed.Load() { ch := make(chan *Event) close(ch) return ch @@ -1154,7 +1162,7 @@ func (m *Machine) OnEvent(events []string, ctx context.Context) chan *Event { } // GC only if needed - if m.Disposed { + if m.Disposed.Load() { return } @@ -1384,7 +1392,7 @@ func (m *Machine) setActiveStates(calledStates S, targetStates S, // machine. func (m *Machine) processQueue() Result { // empty queue - if len(m.queue) == 0 || m.Disposed { + if len(m.queue) == 0 || m.Disposed.Load() { return Canceled } @@ -1412,7 +1420,7 @@ func (m *Machine) processQueue() Result { for len(m.queue) > 0 { m.queueRunning = true - if m.Disposed { + if m.Disposed.Load() { return Canceled } if len(m.queue) == 0 { @@ -1706,6 +1714,8 @@ func (m *Machine) log(level LogLevel, msg string, args ...any) { // transition m.logEntriesLock.Lock() defer m.logEntriesLock.Unlock() + + // TODO include log level for am-dbg filtering m.logEntries = append(m.logEntries, out) } } @@ -1784,7 +1794,7 @@ func (m *Machine) processEmitters(e *Event) (Result, bool) { handlerCalled := false for _, emitter = range m.emitters { // disposed - if m.Disposed { + if m.Disposed.Load() { break } // internal event @@ -1880,7 +1890,7 @@ func (m *Machine) handlerLoop() { // catch panics and fwd defer func() { r := recover() - if r != nil && !m.Disposed { + if r != nil && !m.Disposed.Load() { m.handlerPanic <- r } }() @@ -1907,7 +1917,7 @@ func (m *Machine) handlerLoop() { ret = callRet[0].Interface().(bool) } - if call.timeout || m.Disposed { + if call.timeout || m.Disposed.Load() { continue } diff --git a/pkg/machine/misc.go b/pkg/machine/misc.go index 65e8fe0..5dd9253 100644 --- a/pkg/machine/misc.go +++ b/pkg/machine/misc.go @@ -812,7 +812,7 @@ func diposeWithCtx[T comparable]( case <-ctx.Done(): } // GC only if needed - if mach.Disposed { + if mach.Disposed.Load() { return } diff --git a/tools/cmd/am-dbg/main.go b/tools/cmd/am-dbg/main.go index bcaf749..4b32982 100644 --- a/tools/cmd/am-dbg/main.go +++ b/tools/cmd/am-dbg/main.go @@ -12,8 +12,8 @@ import ( am "github.com/pancsta/asyncmachine-go/pkg/machine" "github.com/pancsta/asyncmachine-go/pkg/telemetry" "github.com/pancsta/asyncmachine-go/tools/debugger" - ss "github.com/pancsta/asyncmachine-go/tools/debugger/states" "github.com/pancsta/asyncmachine-go/tools/debugger/server" + ss "github.com/pancsta/asyncmachine-go/tools/debugger/states" "github.com/spf13/cobra" ) diff --git a/tools/debugger/debugger.go b/tools/debugger/debugger.go index 8012777..b58ac77 100644 --- a/tools/debugger/debugger.go +++ b/tools/debugger/debugger.go @@ -33,7 +33,7 @@ const ( // TODO customize playInterval = 500 * time.Millisecond // TODO add param --max-clients - maxClients = 150 + maxClients = 500 timeFormat = "15:04:05.000000000" fastJumpAmount = 50 arrowThrottleMs = 200 @@ -243,14 +243,17 @@ func (d *Debugger) updateFiltersBar() { // tx filters for _, item := range filters { + + // checked if item.active { text += f(" [::b]%s[::-]", cview.Escape("[X]")) } else { text += f(" [ ]") } + // focused if d.focusedFilter == item.id && focused { - text += f("[%s][::b]%s[::-]", colorActive, item.label) + text += f("[%s][::bu]%s[::-]", colorActive, item.label) } else if !focused { text += f("[%s]%s", colorHighlight2, item.label) } else { @@ -483,11 +486,6 @@ func (d *Debugger) updateTimelines() { d.timelineSteps.SetFilledColor(tcell.ColorRed) } - // inactive steps bar when no next tx - if nextTx == nil { - d.timelineSteps.SetTitleColor(tcell.ColorGrey) - d.timelineSteps.SetBorderColor(tcell.ColorGrey) - } stepsCount := 0 onLastTx := c.CursorTx >= txCount if !onLastTx { @@ -544,7 +542,7 @@ func (d *Debugger) updateSidebar(immediate bool) { // TODO sometimes scrolls for no reason func (d *Debugger) doUpdateSidebar() { - if d.Mach.Disposed { + if d.Mach.Disposed.Load() { return } diff --git a/tools/debugger/handlers.go b/tools/debugger/handlers.go index f096ffd..73fa36e 100644 --- a/tools/debugger/handlers.go +++ b/tools/debugger/handlers.go @@ -187,7 +187,7 @@ func (d *Debugger) PausedState(_ *am.Event) { } func (d *Debugger) TailModeState(_ *am.Event) { - d.C.CursorTx = d.filterTxCursor(d.C, len(d.C.MsgTxs), true) + d.C.CursorTx = d.filterTxCursor(d.C, len(d.C.MsgTxs), false) // needed bc tail mode if carried over via SelectingClient d.updateTxBars() d.updateTimelines() @@ -527,7 +527,8 @@ func (d *Debugger) ClientMsgState(e *am.Event) { d.updateSidebar(false) // UI updates for the selected client if updateTailMode { - d.C.CursorTx = d.filterTxCursor(d.C, len(d.C.MsgTxs), true) + // force the latest tx + d.C.CursorTx = d.filterTxCursor(d.C, len(d.C.MsgTxs), false) d.updateViews(false) } @@ -580,6 +581,7 @@ func (d *Debugger) RemoveClientState(e *am.Event) { } else { d.buildSidebar(-1) } + d.draw() } @@ -837,6 +839,7 @@ func (d *Debugger) ToggleFilterState(_ *am.Event) { // TODO split the state <-d.Mach.WhenQueueEnds(stateCtx) if stateCtx.Err() != nil { + d.Mach.Remove1(ss.ToggleFilter, nil) return // expired } @@ -844,18 +847,22 @@ func (d *Debugger) ToggleFilterState(_ *am.Event) { d.filterClientTxs() } - // rebuild the whole log to reflect the UI changes - err := d.rebuildLog(stateCtx, len(d.C.MsgTxs)-1) - if err != nil { - d.Mach.AddErr(err) - } - d.updateLog(false) + if d.C != nil { - if stateCtx.Err() != nil { - return // expired + // rebuild the whole log to reflect the UI changes + err := d.rebuildLog(stateCtx, len(d.C.MsgTxs)-1) + if err != nil { + d.Mach.AddErr(err) + } + d.updateLog(false) + + if stateCtx.Err() != nil { + return // expired + } + + d.C.CursorTx = d.filterTxCursor(d.C, d.C.CursorTx, false) } - d.C.CursorTx = d.filterTxCursor(d.C, d.C.CursorTx, false) // queue this removal after filter states, so we can depend on WhenNot d.Mach.Remove1(ss.ToggleFilter, nil)