From 079d0279177d20b11e9be22c4385dee9e33239c7 Mon Sep 17 00:00:00 2001 From: Oliver Date: Sun, 21 May 2023 23:31:07 +1000 Subject: [PATCH] feat: multiple probe channels --- config.go | 4 +- config.json.example | 2 +- main.go | 68 ++++++--- ui/package.json | 4 + ui/src/App.jsx | 50 ++++-- ui/src/JSmpegPlayer.jsx | 6 +- ui/yarn.lock | 326 +++++++++++++++++++++++++++++++++++++++- 7 files changed, 411 insertions(+), 49 deletions(-) diff --git a/config.go b/config.go index f233822..9811148 100644 --- a/config.go +++ b/config.go @@ -25,8 +25,8 @@ type Router struct { } type Probe struct { - Enabled bool `json:"enabled"` - RouterDestination int `json:"router_destination"` + Enabled bool `json:"enabled"` + RouterDestinations []int `json:"router_destinations"` } type Salvo struct { diff --git a/config.json.example b/config.json.example index 183b8f2..816ed34 100644 --- a/config.json.example +++ b/config.json.example @@ -9,7 +9,7 @@ }, "probe": { "enabled": true, - "router_destination": 72 + "router_destinations": [71, 72] }, "salvos": [] } \ No newline at end of file diff --git a/main.go b/main.go index e3d1b41..e6baed1 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "os/signal" + "strconv" "sync" "syscall" @@ -21,7 +22,7 @@ import ( var ( router *nk.Router - ProbeHandler *ProbeSocketHandler + ProbeHandlers []*ProbeSocketHandler MatrixWSConnections = make(map[WebsocketConnection]uuid.UUID) MatrixWSConnectionsMutex sync.Mutex Upgrader = websocket.Upgrader{ @@ -90,21 +91,24 @@ func main() { } if Config.Probe.Enabled { - ProbeHandler = &ProbeSocketHandler{ - clients: make(map[*ProbeClient]bool), - register: make(chan *ProbeClient), - unregister: make(chan *ProbeClient), - broadcast: make(chan *[]byte), - upgrader: &websocket.Upgrader{ - ReadBufferSize: readBufferSize, - WriteBufferSize: writeBufferSize, - CheckOrigin: func(r *http.Request) bool { - return true + ProbeHandlers = make([]*ProbeSocketHandler, len(Config.Probe.RouterDestinations)) + for i := range Config.Probe.RouterDestinations { + ProbeHandlers[i] = &ProbeSocketHandler{ + clients: make(map[*ProbeClient]bool), + register: make(chan *ProbeClient), + unregister: make(chan *ProbeClient), + broadcast: make(chan *[]byte), + upgrader: &websocket.Upgrader{ + ReadBufferSize: readBufferSize, + WriteBufferSize: writeBufferSize, + CheckOrigin: func(r *http.Request) bool { + return true + }, }, - }, - } + } - go ProbeHandler.Run() + go ProbeHandlers[i].Run() + } } go router.Connect() @@ -146,8 +150,26 @@ func serveHTTP() { svc.GET("/v1/config", HandleConfig) if Config.Probe.Enabled { - svc.GET("/v1/ws/probe", ProbeHandler.ServeWS) - svc.POST("/v1/probe/stream", HandleProbeStream) + svc.GET("/v1/ws/probe/:id", func(ctx *gin.Context) { + id := ctx.Param("id") + index, err := strconv.Atoi(id) + if err != nil { + ctx.JSON(http.StatusBadRequest, gin.H{"code": http.StatusBadRequest, "message": "invalid probe id", "error": err.Error()}) + log.Printf("[probe-viewer] unable to handle stream: %s", err) + return + } + + if index > len(ProbeHandlers) || index < 0 { + ctx.JSON(http.StatusBadRequest, gin.H{"code": http.StatusBadRequest, "message": "invalid probe id"}) + log.Printf("[probe-viewer] unable to handle stream: %s", err) + return + } + + ProbeHandlers[index].ServeWS(ctx) + + }) + + svc.POST("/v1/probe/stream/:id", HandleProbeStream) } err := svc.Run(fmt.Sprintf(":%d", Config.Server.Port)) @@ -268,7 +290,15 @@ func HandleMatrixWS(c *gin.Context) { } func HandleProbeStream(c *gin.Context) { - log.Printf("IncomingStream connected: %s\n", c.RemoteIP()) + id := c.Param("id") + index, err := strconv.Atoi(id) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"code": http.StatusBadRequest, "message": "invalid probe id", "error": err.Error()}) + log.Printf("[probe] unable to handle stream: %s", err) + return + } + + log.Printf("stream for probe %d connected from %s", index, c.RemoteIP()) for { data, err := ioutil.ReadAll(io.LimitReader(c.Request.Body, 1024)) @@ -276,8 +306,8 @@ func HandleProbeStream(c *gin.Context) { break } - ProbeHandler.BroadcastData(&data) + ProbeHandlers[index].BroadcastData(&data) } - log.Printf("IncomingStream disconnected: %s\n", c.RemoteIP()) + log.Printf("stream for probe %d disconnected from %s", index, c.RemoteIP()) } diff --git a/ui/package.json b/ui/package.json index 8445755..66d26e7 100644 --- a/ui/package.json +++ b/ui/package.json @@ -27,6 +27,9 @@ "babel-loader": "^9.1.2", "carbon-components-react": "^8.24.0", "eslint": "^8.35.0", + "eslint-plugin-import": "^2.25.3", + "eslint-plugin-jsx-a11y": "^6.5.1", + "eslint-plugin-react-hooks": "^4.3.0", "html-webpack-plugin": "^5.5.0", "mini-css-extract-plugin": "^2.7.2", "react": "17.0.1", @@ -40,6 +43,7 @@ "devDependencies": { "@pmmmwh/react-refresh-webpack-plugin": "^0.5.10", "css-loader": "^6.7.3", + "eslint-config-airbnb": "^19.0.4", "eslint-plugin-react": "^7.32.2", "prettier": "1.17.0", "sass": "^1.58.3", diff --git a/ui/src/App.jsx b/ui/src/App.jsx index 6e9e0d6..f5d9884 100644 --- a/ui/src/App.jsx +++ b/ui/src/App.jsx @@ -21,7 +21,8 @@ import { PortInput, PortOutput, Maximize, - QueryQueue + QueryQueue, + ChooseItem } from '@carbon/icons-react'; import Header from './Header.jsx'; @@ -37,6 +38,7 @@ const App = function App() { ) const {matrix, loading: matrixLoading, error: matrixError, route} = useMatrix() + const [selectedProbe, setSelectedProbe] = useState(0) const [selectedDestination, setSelectedDestination] = useState(1) const [ProbeSOTRouting, setProbeSODRouting] = useState(false) const [probeFullscreen, setProbeFullscreen] = useState(false) @@ -44,25 +46,25 @@ const App = function App() { useEffect(() => { if (config?.probe.enabled && ProbeSOTRouting) { - route(config.probe.router_destination, matrix.destinations[selectedDestination - 1].source.id) + route(config.probe.router_destinations[selectedProbe], matrix.destinations[selectedDestination - 1].source.id) } - if (config?.probe.enabled && selectedDestination == config.probe.router_destination && ProbeSOTRouting) { + if (config?.probe.enabled && selectedDestination == config.probe.router_destinations[selectedProbe] && ProbeSOTRouting) { setProbeSODRouting(false) } }, [selectedDestination]); useEffect(() => { if (config?.probe.enabled && ProbeSOTRouting) { - if (matrix.destinations[selectedDestination - 1].source.id != matrix.destinations[config.probe.router_destination - 1].source.id) { - route(config.probe.router_destination, matrix.destinations[selectedDestination - 1].source.id) + if (matrix.destinations[selectedDestination - 1].source.id != matrix.destinations[config.probe.router_destinations[selectedProbe] - 1].source.id) { + route(config.probe.router_destinations[selectedProbe], matrix.destinations[selectedDestination - 1].source.id) } } }, [matrix]); useEffect(() => { if (config?.probe.enabled && ProbeSOTRouting) { - if (matrix.destinations[selectedDestination - 1].source.id != matrix.destinations[config.probe.router_destination - 1].source.id) { - route(config.probe.router_destination, matrix.destinations[selectedDestination - 1].source.id) + if (matrix.destinations[selectedDestination - 1].source.id != matrix.destinations[config.probe.router_destinations[selectedProbe] - 1].source.id) { + route(config.probe.router_destinations[selectedProbe], matrix.destinations[selectedDestination - 1].source.id) } } }, [ProbeSOTRouting]); @@ -83,12 +85,12 @@ const App = function App() { {config.probe.enabled && Probe: {matrix.destinations?.[config.probe.router_destination - 1]?.source?.label}} + modalHeading={<>Probe {selectedProbe + 1}: {matrix.destinations?.[config.probe.router_destinations[selectedProbe] - 1]?.source?.label}} passiveModal onRequestClose={()=> setProbeFullscreen(false)} className="fullscreenProbe" > - + } <> @@ -152,13 +153,31 @@ const App = function App() { { config.probe.enabled && <> -

Probe

+ { config.probe.router_destinations.map((dst, index) => ( + + ))} +
- +
Probe Follow: {ProbeSOTRouting ? matrix.destinations?.[selectedDestination - 1]?.label : "None"}
- Probe Source: {matrix.destinations?.[config.probe.router_destination - 1]?.source?.label} + Probe Source: {matrix.destinations?.[config.probe.router_destinations[selectedProbe] - 1]?.source?.label}