Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/app/app_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func New(version, commit, date string) (*App, error) {
app.supervisor = supervisor.New(ctx)
app.installSupervisorErrorHandler()
// Route PTY messages through the app-level pump.
app.center.SetMsgSink(app.enqueueExternalMsg)
app.center.SetMsgSinkTry(app.tryEnqueueExternalMsg)
app.sidebarTerminal.SetMsgSink(app.enqueueExternalMsg)
app.center.SetInstanceID(app.instanceID)
app.sidebarTerminal.SetInstanceID(app.instanceID)
Expand Down
23 changes: 19 additions & 4 deletions internal/app/app_msgpump.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/andyrewlee/amux/internal/perf"
"github.com/andyrewlee/amux/internal/safego"
"github.com/andyrewlee/amux/internal/ui/center"
"github.com/andyrewlee/amux/internal/ui/common"
)

func (a *App) SetMsgSender(send func(tea.Msg)) {
Expand Down Expand Up @@ -39,14 +40,23 @@ func (a *App) SetMsgSender(send func(tea.Msg)) {
}

func (a *App) enqueueExternalMsg(msg tea.Msg) {
_ = a.tryEnqueueExternalMsg(msg)
}

func (a *App) tryEnqueueExternalMsg(msg tea.Msg) bool {
if msg == nil {
return
return false
}
if isCriticalExternalMsg(msg) {
_, nonEvicting := msg.(common.NonEvictingCriticalExternalMsg)
select {
case a.externalCritical <- msg:
return
return true
default:
if nonEvicting {
perf.Count("external_msg_drop_critical", 1)
return false
}
// Critical channel full - try to drop a non-critical message to make room
select {
case <-a.externalMsgs:
Expand All @@ -55,17 +65,19 @@ func (a *App) enqueueExternalMsg(msg tea.Msg) {
}
select {
case a.externalCritical <- msg:
return
return true
default:
perf.Count("external_msg_drop_critical", 1)
return
return false
}
}
}
select {
case a.externalMsgs <- msg:
return true
default:
perf.Count("external_msg_drop", 1)
return false
}
}

Expand Down Expand Up @@ -132,6 +144,9 @@ func (a *App) installSupervisorErrorHandler() {
}

func isCriticalExternalMsg(msg tea.Msg) bool {
if _, ok := msg.(common.CriticalExternalMsg); ok {
return true
}
switch msg.(type) {
case messages.Error, messages.SidebarPTYStopped, center.PTYStopped:
return true
Expand Down
73 changes: 73 additions & 0 deletions internal/app/app_msgpump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,28 @@ import (
"time"

tea "charm.land/bubbletea/v2"

"github.com/andyrewlee/amux/internal/ui/common"
)

type testMsg string

type criticalTestMsg struct{}

func (criticalTestMsg) MarkCriticalExternalMsg() {}

var _ common.CriticalExternalMsg = criticalTestMsg{}

type nonEvictingCriticalTestMsg struct{}

func (nonEvictingCriticalTestMsg) MarkCriticalExternalMsg() {}
func (nonEvictingCriticalTestMsg) MarkNonEvictingCriticalExternalMsg() {}

var (
_ common.CriticalExternalMsg = nonEvictingCriticalTestMsg{}
_ common.NonEvictingCriticalExternalMsg = nonEvictingCriticalTestMsg{}
)

func TestEnqueueExternalMsgDropsWhenFull(t *testing.T) {
a := &App{externalMsgs: make(chan tea.Msg, 1)}

Expand Down Expand Up @@ -53,6 +71,61 @@ func TestEnqueueExternalMsgDropsWhenFull(t *testing.T) {
close(a.externalMsgs)
}

func TestEnqueueExternalMsgRoutesCriticalInterfaceToCriticalQueue(t *testing.T) {
a := &App{
externalMsgs: make(chan tea.Msg, 1),
externalCritical: make(chan tea.Msg, 1),
}

msg := criticalTestMsg{}
a.enqueueExternalMsg(msg)

if got := len(a.externalCritical); got != 1 {
t.Fatalf("expected critical queue length 1, got %d", got)
}
if got := len(a.externalMsgs); got != 0 {
t.Fatalf("expected normal queue length 0, got %d", got)
}
}

func TestNonEvictingCriticalInterfaceImpliesCriticalRouting(t *testing.T) {
var msg any = nonEvictingCriticalTestMsg{}
if _, ok := msg.(common.NonEvictingCriticalExternalMsg); !ok {
t.Fatal("expected test message to implement NonEvictingCriticalExternalMsg")
}
if _, ok := msg.(common.CriticalExternalMsg); !ok {
t.Fatal("expected NonEvictingCriticalExternalMsg to imply CriticalExternalMsg")
}
}

func TestEnqueueExternalMsg_NonEvictingCriticalDoesNotDropNormalQueue(t *testing.T) {
a := &App{
externalMsgs: make(chan tea.Msg, 1),
externalCritical: make(chan tea.Msg, 1),
}

a.externalMsgs <- testMsg("normal")
a.externalCritical <- criticalTestMsg{}

a.enqueueExternalMsg(nonEvictingCriticalTestMsg{})

if got := len(a.externalMsgs); got != 1 {
t.Fatalf("expected normal queue length to remain 1, got %d", got)
}
select {
case msg := <-a.externalMsgs:
got, ok := msg.(testMsg)
if !ok {
t.Fatalf("expected normal queue message type %T, got %T", testMsg("normal"), msg)
}
if got != testMsg("normal") {
t.Fatalf("expected normal queue message %q, got %q", testMsg("normal"), got)
}
default:
t.Fatal("expected normal queue message to remain present")
}
}

func readMsg(t *testing.T, ch <-chan tea.Msg) tea.Msg {
t.Helper()
select {
Expand Down
85 changes: 67 additions & 18 deletions internal/ui/center/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,24 @@ import (
// Model is the Bubbletea model for the center pane
type Model struct {
// State
workspace *data.Workspace
workspaceIDCached string
workspaceIDRepo string
workspaceIDRoot string
tabsByWorkspace map[string][]*Tab // tabs per workspace ID
activeTabByWorkspace map[string]int // active tab index per workspace
focused bool
canFocusRight bool
tabsRevision uint64
agentManager *appPty.AgentManager
msgSink func(tea.Msg)
tabEvents chan tabEvent
tabActorReady uint32
tabActorHeartbeat int64
flushLoadSampleAt time.Time
cachedBusyTabCount int
workspace *data.Workspace
workspaceIDCached string
workspaceIDRepo string
workspaceIDRoot string
tabsByWorkspace map[string][]*Tab // tabs per workspace ID
activeTabByWorkspace map[string]int // active tab index per workspace
focused bool
canFocusRight bool
tabsRevision uint64
agentManager *appPty.AgentManager
msgSink func(tea.Msg)
msgSinkTry func(tea.Msg) bool
tabEvents chan tabEvent
tabActorReady uint32
tabActorHeartbeat int64
tabActorRedrawPending uint32
flushLoadSampleAt time.Time
cachedBusyTabCount int

// Layout
width int
Expand Down Expand Up @@ -167,20 +169,67 @@ func (m *Model) terminalMetrics() TerminalMetrics {
}

func (m *Model) isTabActorReady() bool {
return atomic.LoadUint32(&m.tabActorReady) == 1
if atomic.LoadUint32(&m.tabActorReady) == 0 {
return false
}
lastBeat := atomic.LoadInt64(&m.tabActorHeartbeat)
if lastBeat == 0 {
return false
}
if time.Since(time.Unix(0, lastBeat)) > tabActorStallTimeout {
atomic.StoreUint32(&m.tabActorReady, 0)
return false
}
return true
}

func (m *Model) setTabActorReady() {
atomic.StoreInt64(&m.tabActorHeartbeat, time.Now().UnixNano())
atomic.StoreUint32(&m.tabActorReady, 1)
}

func (m *Model) noteTabActorHeartbeat() {
atomic.StoreInt64(&m.tabActorHeartbeat, time.Now().UnixNano())
observedAt := time.Now().UnixNano()
for {
prev := atomic.LoadInt64(&m.tabActorHeartbeat)
if observedAt <= prev {
observedAt = prev + 1
}
if atomic.CompareAndSwapInt64(&m.tabActorHeartbeat, prev, observedAt) {
break
}
}
if atomic.LoadUint32(&m.tabActorReady) == 0 {
atomic.StoreUint32(&m.tabActorReady, 1)
}
}

func (m *Model) requestTabActorRedraw() {
if m == nil {
return
}
if m.msgSinkTry != nil {
if !atomic.CompareAndSwapUint32(&m.tabActorRedrawPending, 0, 1) {
return
}
if m.msgSinkTry(tabActorRedraw{}) {
return
}
atomic.StoreUint32(&m.tabActorRedrawPending, 0)
return
}
if m.msgSink != nil {
m.msgSink(tabActorRedraw{})
}
}

func (m *Model) clearTabActorRedrawPending() {
if m == nil {
return
}
atomic.StoreUint32(&m.tabActorRedrawPending, 0)
}

func (m *Model) setWorkspace(ws *data.Workspace) {
m.workspace = ws
m.workspaceIDCached = ""
Expand Down
Loading
Loading