From 13e99efc546bc76004efe68641ea52ae55493251 Mon Sep 17 00:00:00 2001 From: Oliver Date: Sun, 12 Mar 2023 23:30:15 +1100 Subject: [PATCH] integrated probe --- .github/workflows/release.yaml | 4 +- config.go | 5 +- main.go | 75 +++++++++++++---- probe.go | 144 +++++++++++++++++++++++++++++++++ ui/package.json | 1 + ui/src/App.jsx | 14 ++-- ui/src/JSmpegPlayer.jsx | 34 ++++++++ ui/src/index.scss | 29 +++++++ ui/src/useMatrix.js | 2 +- ui/webpack.config.js | 4 +- ui/yarn.lock | 5 ++ 11 files changed, 284 insertions(+), 33 deletions(-) create mode 100644 probe.go create mode 100644 ui/src/JSmpegPlayer.jsx diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 0fa4d98..73fcdb6 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -9,9 +9,9 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Auth GHCR - uses: docker/login-action@v1 + uses: docker/login-action@v2 with: registry: ghcr.io username: ${{ github.actor }} diff --git a/config.go b/config.go index 1950aac..f1d9e03 100644 --- a/config.go +++ b/config.go @@ -21,9 +21,8 @@ type Router struct { } type Probe struct { - Enabled bool `json:"enabled"` - RouterDestination int `json:"router_destination"` - EmbedURL string `json:"embed_url"` + Enabled bool `json:"enabled"` + RouterDestination int `json:"router_destination"` } var Config Configuration diff --git a/main.go b/main.go index 0b8d816..5260ec6 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,8 @@ package main import ( "fmt" + "io" + "io/ioutil" "log" "net/http" "os" @@ -19,9 +21,10 @@ import ( var ( router *nk.Router - WebsocketConnections = make(map[WebsocketConnection]uuid.UUID) - WebsocketConnectionsMutex sync.Mutex - Upgrader = websocket.Upgrader{ + ProbeHandler *ProbeSocketHandler + MatrixWSConnections = make(map[WebsocketConnection]uuid.UUID) + MatrixWSConnectionsMutex sync.Mutex + Upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { return true }, @@ -38,11 +41,11 @@ func main() { router.SetOnUpdate(func(d *nk.Destination) { log.Printf("Received update for %s, current source %s", d.Label, d.Source.Label) - WebsocketConnectionsMutex.Lock() - for conn := range WebsocketConnections { + MatrixWSConnectionsMutex.Lock() + for conn := range MatrixWSConnections { conn.Socket.WriteJSON(d) } - WebsocketConnectionsMutex.Unlock() + MatrixWSConnectionsMutex.Unlock() }) data, err := os.ReadFile("labels.lbl") @@ -52,6 +55,24 @@ func main() { log.Printf("unable to load DashBoard labels.lbl") } + if Config.Probe.Enabled { + ProbeHandler = &ProbeSocketHandler{ + clients: make(map[*ProbeClient]bool), + register: make(chan *ProbeClient), + unregister: make(chan *ProbeClient), + broadcast: make(chan *[]byte), + upgrader: &websocket.Upgrader{ + ReadBufferSize: readBufferSize, + WriteBufferSize: writeBufferSize, + CheckOrigin: func(r *http.Request) bool { + return true + }, + }, + } + + go ProbeHandler.Run() + } + go router.Connect() go serveHTTP() @@ -82,13 +103,18 @@ func serveHTTP() { }) svc.Static("/dist", "/dist") - svc.GET("/v1/ws", HandleWS) + svc.GET("/v1/ws/matrix", HandleMatrixWS) svc.GET("/v1/matrix", HandleMatrix) svc.POST("/v1/matrix/:dst/:src", HandleRouteRequest) svc.GET("/v1/config", HandleConfig) + if Config.Probe.Enabled { + svc.GET("/v1/ws/probe", ProbeHandler.ServeWS) + svc.POST("/v1/probe/stream", HandleProbeStream) + } + err := svc.Run(fmt.Sprintf(":%d", Config.Server.Port)) if err != nil { log.Fatalln("unable to start http server", err) @@ -140,7 +166,7 @@ func HandleConfig(c *gin.Context) { c.JSON(http.StatusOK, Config) } -func HandleWS(c *gin.Context) { +func HandleMatrixWS(c *gin.Context) { w := c.Writer r := c.Request @@ -157,9 +183,9 @@ func HandleWS(c *gin.Context) { Mux: new(sync.Mutex), } - WebsocketConnectionsMutex.Lock() - WebsocketConnections[connection] = uuid.Must(uuid.NewRandom()) - WebsocketConnectionsMutex.Unlock() + MatrixWSConnectionsMutex.Lock() + MatrixWSConnections[connection] = uuid.Must(uuid.NewRandom()) + MatrixWSConnectionsMutex.Unlock() defer connection.Socket.Close() for { @@ -172,19 +198,34 @@ func HandleWS(c *gin.Context) { websocket.CloseNoStatusReceived, websocket.CloseGoingAway, ) { - WebsocketConnectionsMutex.Lock() - delete(WebsocketConnections, connection) - WebsocketConnectionsMutex.Unlock() + MatrixWSConnectionsMutex.Lock() + delete(MatrixWSConnections, connection) + MatrixWSConnectionsMutex.Unlock() return } } - WebsocketConnectionsMutex.Lock() - delete(WebsocketConnections, connection) - WebsocketConnectionsMutex.Unlock() + MatrixWSConnectionsMutex.Lock() + delete(MatrixWSConnections, connection) + MatrixWSConnectionsMutex.Unlock() break } } } + +func HandleProbeStream(c *gin.Context) { + log.Printf("IncomingStream connected: %s\n", c.RemoteIP()) + + for { + data, err := ioutil.ReadAll(io.LimitReader(c.Request.Body, 1024)) + if err != nil || len(data) == 0 { + break + } + + ProbeHandler.BroadcastData(&data) + } + + log.Printf("IncomingStream disconnected: %s\n", c.RemoteIP()) +} diff --git a/probe.go b/probe.go new file mode 100644 index 0000000..16732c7 --- /dev/null +++ b/probe.go @@ -0,0 +1,144 @@ +package main + +// The vast majority of this is modified from jsmpeg-stream-go by Chanishk +// Their package does not have a license and I was unable to contact them to ask if I could use it +// If anyone has a problem with the code being used here please let me know and I'll do my best to handle it +// Go check out Chanshik's stuff because it's pretty cool +// https://github.com/chanshik/jsmpeg-stream-go +// https://github.com/chanshik/ + +import ( + "log" + "net/http" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" +) + +type ProbeClient struct { + ws *websocket.Conn + sendChan chan *[]byte + + unregisterChan chan *ProbeClient +} + +var ( + readBufferSize int = 8192 + writeBufferSize int = 8192 +) + +func NewProbeClient(ws *websocket.Conn, unregisterChan chan *ProbeClient) *ProbeClient { + client := &ProbeClient{ + ws: ws, + sendChan: make(chan *[]byte, 512), + unregisterChan: unregisterChan, + } + + return client +} + +func (c *ProbeClient) Close() { + log.Println("Closing client's send channel") + close(c.sendChan) +} + +func (c *ProbeClient) ReadHandler() { + defer func() { + c.unregisterChan <- c + }() + + for { + msgType, msg, err := c.ws.ReadMessage() + if err != nil { + break + } + + if msgType == websocket.CloseMessage { + break + } + + log.Println("Received from client: " + string(msg)) + } +} + +func (c *ProbeClient) WriteHandler() { + defer func() { + c.unregisterChan <- c + }() + + for { + select { + case data, ok := <-c.sendChan: + if !ok { + log.Println("Client send failed") + c.ws.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + c.ws.WriteMessage(websocket.BinaryMessage, *data) + } + } +} + +func (c *ProbeClient) Run() { + go c.ReadHandler() + go c.WriteHandler() +} + +type ProbeSocketHandler struct { + clients map[*ProbeClient]bool // *client -> is connected (true/false) + register chan *ProbeClient + unregister chan *ProbeClient + broadcast chan *[]byte + + upgrader *websocket.Upgrader +} + +func (h *ProbeSocketHandler) Run() { + for { + select { + case client := <-h.register: + h.clients[client] = true + log.Printf("New client registered. Total: %d\n", len(h.clients)) + + case client := <-h.unregister: + _, ok := h.clients[client] + if ok { + delete(h.clients, client) + } + log.Printf("Client unregistered. Total: %d\n", len(h.clients)) + + case data := <-h.broadcast: + h.BroadcastData(data) + } + } +} + +func (h *ProbeSocketHandler) ServeWS(c *gin.Context) { + if c.Request.Method != "GET" { + c.JSON(http.StatusMethodNotAllowed, gin.H{"code": http.StatusMethodNotAllowed, "error": "Method Not Allowed"}) + return + } + + w := c.Writer + r := c.Request + + ws, err := h.upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + + log.Println("New client connected") + client := NewProbeClient(ws, h.unregister) + + h.register <- client + + go client.Run() +} + +func (h *ProbeSocketHandler) BroadcastData(data *[]byte) { + for client := range h.clients { + client.sendChan <- data + } +} diff --git a/ui/package.json b/ui/package.json index b1fd526..01ff5fd 100644 --- a/ui/package.json +++ b/ui/package.json @@ -21,6 +21,7 @@ "@carbon/colors": "^11.13.0", "@carbon/icons-react": "^11.17.0", "@carbon/react": "1.1.0", + "@cycjimmy/jsmpeg-player": "^6.0.5", "axios": "^1.3.4", "axios-hooks": "^4.0.0", "babel-loader": "^9.1.2", diff --git a/ui/src/App.jsx b/ui/src/App.jsx index 3200d01..d9611df 100644 --- a/ui/src/App.jsx +++ b/ui/src/App.jsx @@ -12,6 +12,8 @@ import Header from './Header.jsx'; import useMatrix from './useMatrix' +import JSmpegPlayer from './JSmpegPlayer.jsx' + const route = (dst, src) => { console.log("routing", src, "to", dst) axios.post(`/v1/matrix/${dst}/${src}`) @@ -66,7 +68,6 @@ const App = function App() { <>
@@ -80,13 +81,9 @@ const App = function App() { modalHeading={<>Probe: {matrix.destinations?.[config.probe.router_destination - 1]?.source.label}} passiveModal onRequestClose={()=> setProbeFullscreen(false)} - isFullWidth + className="fullscreenProbe" > -
- {probeFullscreen &&