Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/app/app_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func New(version, commit, date string) (*App, error) {
app.supervisor = supervisor.New(ctx)
app.installSupervisorErrorHandler()
// Route PTY messages through the app-level pump.
app.center.SetMsgSink(app.enqueueExternalMsg)
app.center.SetMsgSinkTry(app.tryEnqueueExternalMsg)
app.sidebarTerminal.SetMsgSink(app.enqueueExternalMsg)
app.center.SetInstanceID(app.instanceID)
app.sidebarTerminal.SetInstanceID(app.instanceID)
Expand Down
23 changes: 19 additions & 4 deletions internal/app/app_msgpump.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/andyrewlee/amux/internal/perf"
"github.com/andyrewlee/amux/internal/safego"
"github.com/andyrewlee/amux/internal/ui/center"
"github.com/andyrewlee/amux/internal/ui/common"
)

func (a *App) SetMsgSender(send func(tea.Msg)) {
Expand Down Expand Up @@ -39,14 +40,23 @@ func (a *App) SetMsgSender(send func(tea.Msg)) {
}

func (a *App) enqueueExternalMsg(msg tea.Msg) {
_ = a.tryEnqueueExternalMsg(msg)
}

func (a *App) tryEnqueueExternalMsg(msg tea.Msg) bool {
if msg == nil {
return
return false
}
if isCriticalExternalMsg(msg) {
_, nonEvicting := msg.(common.NonEvictingCriticalExternalMsg)
select {
case a.externalCritical <- msg:
return
return true
default:
if nonEvicting {
perf.Count("external_msg_drop_critical", 1)
return false
}
// Critical channel full - try to drop a non-critical message to make room
select {
case <-a.externalMsgs:
Expand All @@ -55,17 +65,19 @@ func (a *App) enqueueExternalMsg(msg tea.Msg) {
}
select {
case a.externalCritical <- msg:
return
return true
default:
perf.Count("external_msg_drop_critical", 1)
return
return false
}
}
}
select {
case a.externalMsgs <- msg:
return true
default:
perf.Count("external_msg_drop", 1)
return false
}
}

Expand Down Expand Up @@ -132,6 +144,9 @@ func (a *App) installSupervisorErrorHandler() {
}

func isCriticalExternalMsg(msg tea.Msg) bool {
if _, ok := msg.(common.CriticalExternalMsg); ok {
return true
}
switch msg.(type) {
case messages.Error, messages.SidebarPTYStopped, center.PTYStopped:
return true
Expand Down
73 changes: 73 additions & 0 deletions internal/app/app_msgpump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,28 @@ import (
"time"

tea "charm.land/bubbletea/v2"

"github.com/andyrewlee/amux/internal/ui/common"
)

type testMsg string

type criticalTestMsg struct{}

func (criticalTestMsg) MarkCriticalExternalMsg() {}

var _ common.CriticalExternalMsg = criticalTestMsg{}

type nonEvictingCriticalTestMsg struct{}

func (nonEvictingCriticalTestMsg) MarkCriticalExternalMsg() {}
func (nonEvictingCriticalTestMsg) MarkNonEvictingCriticalExternalMsg() {}

var (
_ common.CriticalExternalMsg = nonEvictingCriticalTestMsg{}
_ common.NonEvictingCriticalExternalMsg = nonEvictingCriticalTestMsg{}
)

func TestEnqueueExternalMsgDropsWhenFull(t *testing.T) {
a := &App{externalMsgs: make(chan tea.Msg, 1)}

Expand Down Expand Up @@ -53,6 +71,61 @@ func TestEnqueueExternalMsgDropsWhenFull(t *testing.T) {
close(a.externalMsgs)
}

func TestEnqueueExternalMsgRoutesCriticalInterfaceToCriticalQueue(t *testing.T) {
a := &App{
externalMsgs: make(chan tea.Msg, 1),
externalCritical: make(chan tea.Msg, 1),
}

msg := criticalTestMsg{}
a.enqueueExternalMsg(msg)

if got := len(a.externalCritical); got != 1 {
t.Fatalf("expected critical queue length 1, got %d", got)
}
if got := len(a.externalMsgs); got != 0 {
t.Fatalf("expected normal queue length 0, got %d", got)
}
}

func TestNonEvictingCriticalInterfaceImpliesCriticalRouting(t *testing.T) {
var msg any = nonEvictingCriticalTestMsg{}
if _, ok := msg.(common.NonEvictingCriticalExternalMsg); !ok {
t.Fatal("expected test message to implement NonEvictingCriticalExternalMsg")
}
if _, ok := msg.(common.CriticalExternalMsg); !ok {
t.Fatal("expected NonEvictingCriticalExternalMsg to imply CriticalExternalMsg")
}
}

func TestEnqueueExternalMsg_NonEvictingCriticalDoesNotDropNormalQueue(t *testing.T) {
a := &App{
externalMsgs: make(chan tea.Msg, 1),
externalCritical: make(chan tea.Msg, 1),
}

a.externalMsgs <- testMsg("normal")
a.externalCritical <- criticalTestMsg{}

a.enqueueExternalMsg(nonEvictingCriticalTestMsg{})

if got := len(a.externalMsgs); got != 1 {
t.Fatalf("expected normal queue length to remain 1, got %d", got)
}
select {
case msg := <-a.externalMsgs:
got, ok := msg.(testMsg)
if !ok {
t.Fatalf("expected normal queue message type %T, got %T", testMsg("normal"), msg)
}
if got != testMsg("normal") {
t.Fatalf("expected normal queue message %q, got %q", testMsg("normal"), got)
}
default:
t.Fatal("expected normal queue message to remain present")
}
}

func readMsg(t *testing.T, ch <-chan tea.Msg) tea.Msg {
t.Helper()
select {
Expand Down
85 changes: 67 additions & 18 deletions internal/ui/center/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,24 @@ import (
// Model is the Bubbletea model for the center pane
type Model struct {
// State
workspace *data.Workspace
workspaceIDCached string
workspaceIDRepo string
workspaceIDRoot string
tabsByWorkspace map[string][]*Tab // tabs per workspace ID
activeTabByWorkspace map[string]int // active tab index per workspace
focused bool
canFocusRight bool
tabsRevision uint64
agentManager *appPty.AgentManager
msgSink func(tea.Msg)
tabEvents chan tabEvent
tabActorReady uint32
tabActorHeartbeat int64
flushLoadSampleAt time.Time
cachedBusyTabCount int
workspace *data.Workspace
workspaceIDCached string
workspaceIDRepo string
workspaceIDRoot string
tabsByWorkspace map[string][]*Tab // tabs per workspace ID
activeTabByWorkspace map[string]int // active tab index per workspace
focused bool
canFocusRight bool
tabsRevision uint64
agentManager *appPty.AgentManager
msgSink func(tea.Msg)
msgSinkTry func(tea.Msg) bool
tabEvents chan tabEvent
tabActorReady uint32
tabActorHeartbeat int64
tabActorRedrawPending uint32
flushLoadSampleAt time.Time
cachedBusyTabCount int

// Layout
width int
Expand Down Expand Up @@ -167,20 +169,67 @@ func (m *Model) terminalMetrics() TerminalMetrics {
}

func (m *Model) isTabActorReady() bool {
return atomic.LoadUint32(&m.tabActorReady) == 1
if atomic.LoadUint32(&m.tabActorReady) == 0 {
return false
}
lastBeat := atomic.LoadInt64(&m.tabActorHeartbeat)
if lastBeat == 0 {
return false
}
if time.Since(time.Unix(0, lastBeat)) > tabActorStallTimeout {
atomic.StoreUint32(&m.tabActorReady, 0)
return false
}
return true
}

func (m *Model) setTabActorReady() {
atomic.StoreInt64(&m.tabActorHeartbeat, time.Now().UnixNano())
atomic.StoreUint32(&m.tabActorReady, 1)
}

func (m *Model) noteTabActorHeartbeat() {
atomic.StoreInt64(&m.tabActorHeartbeat, time.Now().UnixNano())
observedAt := time.Now().UnixNano()
for {
prev := atomic.LoadInt64(&m.tabActorHeartbeat)
if observedAt <= prev {
observedAt = prev + 1
}
if atomic.CompareAndSwapInt64(&m.tabActorHeartbeat, prev, observedAt) {
break
}
}
if atomic.LoadUint32(&m.tabActorReady) == 0 {
atomic.StoreUint32(&m.tabActorReady, 1)
}
}

func (m *Model) requestTabActorRedraw() {
if m == nil {
return
}
if m.msgSinkTry != nil {
if !atomic.CompareAndSwapUint32(&m.tabActorRedrawPending, 0, 1) {
return
}
if m.msgSinkTry(tabActorRedraw{}) {
return
}
atomic.StoreUint32(&m.tabActorRedrawPending, 0)
return
}
if m.msgSink != nil {
m.msgSink(tabActorRedraw{})
}
}

func (m *Model) clearTabActorRedrawPending() {
if m == nil {
return
}
atomic.StoreUint32(&m.tabActorRedrawPending, 0)
}

func (m *Model) setWorkspace(ws *data.Workspace) {
m.workspace = ws
m.workspaceIDCached = ""
Expand Down
Loading
Loading