@@ -34,7 +34,8 @@ import {
34
34
} from "../lib/api/streaming"
35
35
import { RelationPoint } from "../lib/layout"
36
36
import { RelationStats } from "../proto/gen/monitor_service"
37
- import { ChannelStatsSnapshot } from "./fragment_graph"
37
+ import { ChannelStatsDerived , ChannelStatsSnapshot } from "./fragment_graph"
38
+ import { GetBackPressureResponse } from "../proto/gen/monitor_service"
38
39
39
40
const SIDEBAR_WIDTH = "200px"
40
41
const INTERVAL_MS = 5000
@@ -96,8 +97,8 @@ export default function StreamingGraph() {
96
97
const relationDependency = relationDependencyCallback ( )
97
98
98
99
// Periodically fetch fragment-level back-pressure from Meta node
99
- const [ backPressureRate , setBackPressureRate ] =
100
- useState < Map < string , number > > ( )
100
+ const [ channelStats , setChannelStats ] =
101
+ useState < Map < string , ChannelStatsDerived > > ( )
101
102
const [ relationStats , setRelationStats ] = useState < {
102
103
[ key : number ] : RelationStats
103
104
} > ( )
@@ -108,20 +109,22 @@ export default function StreamingGraph() {
108
109
let initialSnapshot : ChannelStatsSnapshot | undefined
109
110
110
111
if ( resetEmbeddedBackPressures ) {
111
- setBackPressureRate ( undefined )
112
+ setChannelStats ( undefined )
112
113
toggleResetEmbeddedBackPressures ( )
113
114
}
114
115
115
116
function refresh ( ) {
116
117
api . get ( "/metrics/fragment/embedded_back_pressures" ) . then (
117
- ( response ) => {
118
- let snapshot = ChannelStatsSnapshot . fromResponse (
119
- response . channelStats
118
+ ( res ) => {
119
+ let response = GetBackPressureResponse . fromJSON ( res )
120
+ let snapshot = new ChannelStatsSnapshot (
121
+ new Map ( Object . entries ( response . channelStats ) ) ,
122
+ Date . now ( )
120
123
)
121
124
if ( ! initialSnapshot ) {
122
125
initialSnapshot = snapshot
123
126
} else {
124
- setBackPressureRate ( snapshot . getRate ( initialSnapshot ! ) )
127
+ setChannelStats ( snapshot . getRate ( initialSnapshot ) )
125
128
}
126
129
setRelationStats ( response . relationStats )
127
130
} ,
@@ -139,26 +142,26 @@ export default function StreamingGraph() {
139
142
} , [ toast , resetEmbeddedBackPressures ] )
140
143
141
144
// Convert fragment-level backpressure rate map to relation-level backpressure rate
142
- const backPressures : Map < string , number > | undefined = useMemo ( ( ) => {
145
+ const relationChannelStats : Map < string , ChannelStatsDerived > | undefined = useMemo ( ( ) => {
143
146
if ( ! fragmentVertexToRelationMap ) {
144
- return new Map < string , number > ( )
147
+ return new Map < string , ChannelStatsDerived > ( )
145
148
}
146
149
let inMap = fragmentVertexToRelationMap . inMap
147
150
let outMap = fragmentVertexToRelationMap . outMap
148
- if ( backPressureRate ) {
149
- let map = new Map < string , number > ( )
150
- for ( const [ key , value ] of backPressureRate ) {
151
+ if ( channelStats ) {
152
+ let map = new Map < string , ChannelStatsDerived > ( )
153
+ for ( const [ key , stats ] of channelStats ) {
151
154
const [ outputFragment , inputFragment ] = key . split ( "_" ) . map ( Number )
152
155
if ( outMap [ outputFragment ] && inMap [ inputFragment ] ) {
153
156
const outputRelation = outMap [ outputFragment ]
154
157
const inputRelation = inMap [ inputFragment ]
155
158
let key = `${ outputRelation } _${ inputRelation } `
156
- map . set ( key , value )
159
+ map . set ( key , stats )
157
160
}
158
161
}
159
162
return map
160
163
}
161
- } , [ backPressureRate , fragmentVertexToRelationMap ] )
164
+ } , [ channelStats , fragmentVertexToRelationMap ] )
162
165
163
166
const retVal = (
164
167
< Flex p = { 3 } height = "calc(100vh - 20px)" flexDirection = "column" >
@@ -214,7 +217,7 @@ export default function StreamingGraph() {
214
217
nodes = { relationDependency }
215
218
selectedId = { selectedId ?. toString ( ) }
216
219
setSelectedId = { ( id ) => setSelectedId ( parseInt ( id ) ) }
217
- backPressures = { backPressures }
220
+ channelStats = { relationChannelStats }
218
221
relationStats = { relationStats }
219
222
/>
220
223
) }
0 commit comments