diff --git a/pkg/manager/router/router.go b/pkg/manager/router/router.go index e97c3d93..c37b4253 100644 --- a/pkg/manager/router/router.go +++ b/pkg/manager/router/router.go @@ -139,7 +139,9 @@ func (b *backendWrapper) String() string { // connWrapper wraps RedirectableConn. type connWrapper struct { RedirectableConn - phase connPhase + // Reference to the target backend if it's redirecting, otherwise nil. + redirectingBackend *backendWrapper // Last redirect start time of this connection. lastRedirect time.Time + phase connPhase } diff --git a/pkg/manager/router/router_score.go b/pkg/manager/router/router_score.go index 802fd174..b3325064 100644 --- a/pkg/manager/router/router_score.go +++ b/pkg/manager/router/router_score.go @@ -190,6 +190,7 @@ func (router *ScoreBasedRouter) RedirectConnections() error { connWrapper.phase = phaseRedirectNotify // we dont care the results _ = connWrapper.Redirect(backend) + connWrapper.redirectingBackend = backend } } } @@ -263,6 +264,7 @@ func (router *ScoreBasedRouter) onRedirectFinished(from, to string, conn Redirec router.adjustBackendList(toBe, true) connWrapper.phase = phaseRedirectFail } + connWrapper.redirectingBackend = nil addMigrateMetrics(from, to, succeed, connWrapper.lastRedirect) } @@ -270,11 +272,17 @@ func (router *ScoreBasedRouter) onRedirectFinished(from, to string, conn Redirec func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) error { router.Lock() defer router.Unlock() - // OnConnClosed is always called after processing ongoing redirect events, - // so the addr passed in is the right backend. be := router.ensureBackend(addr, true) - be.Value.connScore-- - router.removeConn(be, router.getConnWrapper(conn)) + connWrapper := router.getConnWrapper(conn) + redirectingBackend := connWrapper.Value.redirectingBackend + // If this connection is redirecting, decrease the score of the target backend. + if redirectingBackend != nil { + redirectingBackend.connScore-- + connWrapper.Value.redirectingBackend = nil + } else { + be.Value.connScore-- + } + router.removeConn(be, connWrapper) return nil } @@ -375,6 +383,7 @@ func (router *ScoreBasedRouter) rebalance(maxNum int) { conn.phase = phaseRedirectNotify conn.lastRedirect = curTime conn.Redirect(idlestBackend) + conn.redirectingBackend = idlestBackend } } diff --git a/pkg/manager/router/router_test.go b/pkg/manager/router/router_test.go index 59faa67a..bde4e055 100644 --- a/pkg/manager/router/router_test.go +++ b/pkg/manager/router/router_test.go @@ -900,3 +900,23 @@ func TestBackendHealthy(t *testing.T) { require.False(t, conn.to.Healthy()) tester.redirectFinish(1, false) } + +func TestCloseRedirectingConns(t *testing.T) { + // Make the connection redirect. + tester := newRouterTester(t) + tester.addBackends(1) + tester.addConnections(1) + require.Equal(t, 1, tester.getBackendByIndex(0).connScore) + tester.killBackends(1) + tester.addBackends(1) + tester.rebalance(1) + require.Equal(t, 0, tester.getBackendByIndex(0).connScore) + require.Equal(t, 1, tester.getBackendByIndex(1).connScore) + // Close the connection. + tester.updateBackendStatusByAddr(tester.getBackendByIndex(0).Addr(), StatusHealthy) + tester.closeConnections(1, true) + require.Equal(t, 0, tester.getBackendByIndex(0).connScore) + require.Equal(t, 0, tester.getBackendByIndex(1).connScore) + require.Equal(t, 0, tester.getBackendByIndex(0).connList.Len()) + require.Equal(t, 0, tester.getBackendByIndex(1).connList.Len()) +}