diff --git a/arduino/bitmapstream.ino b/arduino/bitmapstream.ino index eb5d281..334d405 100644 --- a/arduino/bitmapstream.ino +++ b/arduino/bitmapstream.ino @@ -1,34 +1,26 @@ #include #include #include -#include +#include +#include #define PANEL_WIDTH 64 #define PANEL_HEIGHT 32 #define PANELS_NUMBER 2 #define E_PIN_DEFAULT 18 -const char* websockets_connection_string = "ws://rgb.mun.sh/sub"; //Enter server adress -using namespace websockets; +WiFiManagerParameter brightness("brightness", "Display Brightness", "128", 40); +WiFiManagerParameter serverAddress("serverAddress", "Server Address", "rgb.mun.sh", 40); +WiFiManagerParameter serverPath("serverPath", "Server Path", "/sub", 40); +WiFiManagerParameter serverPort("serverPort", "Server Port", "80", 40); +WiFiManagerParameter serverReconnectInterval("serverReconnectInterval", "Server Reconnect Delay", "5000", 40); MatrixPanel_I2S_DMA* dma_display; -WebsocketsClient client; +WebSocketsClient webSocket; -uint16_t* uint16_data; - -void onMessageCallback(WebsocketsMessage message) { - uint16_data = (uint16_t *) message.c_str(); -} - -void onEventsCallback(WebsocketsEvent event, String data) { - if(event == WebsocketsEvent::ConnectionOpened) { - Serial.println("Connnection Opened"); - } else if(event == WebsocketsEvent::ConnectionClosed) { - Serial.println("Connnection Closed"); - delay(2000); - ESP.restart(); - } -} +uLongf destLen = 8192; +Bytef destBuffer[8192]; +Bytef resultBuffer[8192]; void setup() { Serial.begin(115200); @@ -50,50 +42,85 @@ void setup() { dma_display->clearScreen(); dma_display->setTextSize(1); dma_display->setTextWrap(true); - dma_display->setCursor(0,0); + dma_display->setCursor(0, 0); dma_display->print("Connecting to WIFI..."); dma_display->setCursor(16, 16); dma_display->print("SSID: TrainSignAP"); WiFiManager wm; + wm.addParameter(&brightness); + wm.addParameter(&serverAddress); + wm.addParameter(&serverPath); + wm.addParameter(&serverPort); + wm.addParameter(&serverReconnectInterval); + bool connected; connected = wm.autoConnect("TrainSignAP"); - if(!connected) { - Serial.println("Failed to connect"); - dma_display->clearScreen(); - dma_display->setTextSize(1); // size 1 == 8 pixels high - dma_display->setTextWrap(true); // Don't wrap at end of line - will do ourselves - dma_display->setCursor(0,0); - dma_display->print("Wifi failed :,("); - delay(2000); - ESP.restart(); + if (!connected) { + Serial.println("Failed to connect"); + dma_display->clearScreen(); + dma_display->setTextSize(1); // size 1 == 8 pixels high + dma_display->setTextWrap(true); // Don't wrap at end of line - will do ourselves + dma_display->setCursor(0, 0); + dma_display->print("Wifi failed :,("); + delay(2000); + ESP.restart(); } - // run callback when messages are received - client.onMessage(onMessageCallback); - - // run callback when events are occuring - client.onEvent(onEventsCallback); + Serial.println("WiFi Connected"); - // Connect to server - connected = client.connect(websockets_connection_string); - if(!connected) { - Serial.println("Connnection Closed"); - dma_display->clearScreen(); - dma_display->setTextSize(1); - dma_display->setTextWrap(true); - dma_display->setCursor(0,0); - dma_display->print("Stream failed :,("); + String brightnessValue = brightness.getValue(); + int brightnessInt = brightnessValue.toInt(); + if (brightnessInt < 0) brightnessInt = 0; + if (brightnessInt > 255) brightnessInt = 255; + dma_display->setBrightness8(brightnessInt); - delay(2000); - ESP.restart(); + String portValue = serverPort.getValue(); + int port = portValue.toInt(); + String serverReconnectIntervalValue = serverReconnectInterval.getValue(); + int serverReconnect = serverReconnectIntervalValue.toInt(); + + webSocket.begin(serverAddress.getValue(), port, serverPath.getValue()); + webSocket.onEvent(webSocketEvent); + webSocket.setReconnectInterval(serverReconnect); +} + +void webSocketEvent(WStype_t type, uint8_t* payload, size_t length) { + // Decompress the payload + switch (type) { + case WStype_DISCONNECTED: + Serial.println("Disconnected!"); + memset(resultBuffer, 0, 8192); + + dma_display->clearScreen(); + dma_display->setTextSize(1); + dma_display->setTextWrap(true); + dma_display->setCursor(0, 0); + dma_display->print("Reconnecting..."); + break; + case WStype_CONNECTED: + Serial.printf("Connected to url: %s\n", payload); + break; + case WStype_BIN: { + int result = uncompress(destBuffer, &destLen, payload, length); + if (result != Z_OK) { + Serial.println("Decompression failed!"); + } + + // apply result delta to last frame + for (int i = 0; i < 8192; i++) { + resultBuffer[i] = resultBuffer[i] + destBuffer[i]; + } + + break; + } } } void loop() { - client.poll(); - if (uint16_data != nullptr) { - dma_display->drawRGBBitmap(0, 0, uint16_data, 128, 32); + webSocket.loop(); + if (webSocket.isConnected()) { + dma_display->drawRGBBitmap(0, 0, (uint16_t*) resultBuffer, 128, 32); } } diff --git a/arduino/bitmapstreamudp.ino b/arduino/bitmapstreamudp.ino new file mode 100644 index 0000000..b9deb94 --- /dev/null +++ b/arduino/bitmapstreamudp.ino @@ -0,0 +1,96 @@ +#include +#include +// #include +#include +#include +#include + +#define PANEL_WIDTH 64 +#define PANEL_HEIGHT 32 +#define PANELS_NUMBER 2 +#define E_PIN_DEFAULT 18 + +const char* ssid = "xxx"; +const char* password = "xxx"; +const int udpPort = 8080; + +MatrixPanel_I2S_DMA* dma_display; +AsyncUDP udp; + +byte buffers[4][1024]; // Adjust this according to your needs +bool received[4] = {false}; // Flag to track received packets +uint16_t assembledData[4096]; +int expectedSequence = 0; + +void setup() { + Serial.begin(115200); + + HUB75_I2S_CFG mxconfig; + mxconfig.mx_height = PANEL_HEIGHT; // we have 64 pix heigh panels + mxconfig.chain_length = PANELS_NUMBER; // we have 2 panels chained + mxconfig.gpio.e = E_PIN_DEFAULT; // we MUST assign pin e to some free pin on a board to drive 64 pix height panels with 1/32 scan + mxconfig.clkphase = false; + + dma_display = new MatrixPanel_I2S_DMA(mxconfig); + + // Allocate memory and start DMA display + if (not dma_display->begin()) + Serial.println("****** !KABOOM! I2S memory allocation failed ***********"); + + dma_display->setBrightness8(128); // range is 0-255, 0 - 0%, 255 - 100% + + dma_display->clearScreen(); + dma_display->setTextSize(1); + dma_display->setTextWrap(true); + dma_display->setCursor(0, 0); + dma_display->print("Connecting to WIFI..."); + dma_display->setCursor(16, 16); + + WiFi.mode(WIFI_STA); + WiFi.begin(ssid, password); + + while (WiFi.status() != WL_CONNECTED) { + delay(1000); + } + + Serial.println("WiFi Connected"); + + if (udp.connect(IPAddress(0,0,0,0), udpPort)) { + Serial.println("Server Connected"); + udp.onPacket([](AsyncUDPPacket packet) { + int sequence = packet.data()[0]; // Assuming first byte is sequence number + memcpy(buffers[sequence], packet.data() + 1, packet.length() - 1); + received[sequence] = true; + }); + udp.print("hi"); + } else { + Serial.println("Failed to connect"); + dma_display->clearScreen(); + dma_display->setTextSize(1); // size 1 == 8 pixels high + dma_display->setTextWrap(true); // Don't wrap at end of line - will do ourselves + dma_display->setCursor(0, 0); + dma_display->print("Wifi failed :,("); + delay(2000); + ESP.restart(); + } +} + + +void loop() { + bool allReceived = true; + for (int i = 0; i < 4; i++) { + if (!received[i]) { + allReceived = false; + break; + } + } + + if (allReceived) { + for (int i = 0; i < 4; i++) { + memcpy(assembledData + (1024 * i), buffers[i], 1024); + received[i] = false; + } + + dma_display->drawRGBBitmap(0, 0, assembledData, 128, 32); + } +} diff --git a/bun.lockb b/bun.lockb index 833d0fe..6e75064 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/package.json b/package.json index 7ac1901..46b6b2e 100644 --- a/package.json +++ b/package.json @@ -15,11 +15,13 @@ "bun-serve-express": "^1.0.4", "express": "^4.18.2", "gsap": "^3.11.5", + "pako": "^2.1.0", "pixi.js-legacy": "^7.2.1", "vite": "^5.0.12" }, "devDependencies": { "@types/bun": "^1.0.3", - "@types/express": "^4.17.21" + "@types/express": "^4.17.21", + "@types/pako": "^2.0.3" } } \ No newline at end of file diff --git a/src/server.ts b/src/server.ts index ebf7840..98f9836 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,6 +1,7 @@ import { ServerWebSocket, WebSocketServeOptions } from 'bun'; import bunExpress from 'bun-serve-express'; import express from 'express'; +import pako from 'pako'; // import { webSocketServer } from './ws/websocketServer'; process.env.TZ = 'Europe/London'; @@ -8,27 +9,21 @@ process.env.TZ = 'Europe/London'; type WebSocketData = { url: URL; id: string; - ready: boolean; + firstSend: boolean; }; const randomId = () => { return Math.random().toString(36).substring(2, 7); }; +let lastFrame: Buffer | null = null; let latestFrame: Buffer | null = null; const app = bunExpress({ websocket: { open(ws) { ws.data.id = randomId(); - ws.data.ready = false; - - setTimeout( - () => { - ws.data.ready = true; - }, - process.env.SEND_DELAY ? parseInt(process.env.SEND_DELAY) : 3000 - ); + ws.data.firstSend = false; if (ws.data.url.pathname == '/pub') { pubSockets.push(ws); @@ -37,7 +32,6 @@ const app = bunExpress({ if (ws.data.url.pathname == '/sub') { subSockets.push(ws); - console.log('new subscriber', ws.data.id); } }, @@ -78,8 +72,22 @@ const sendInterval = process.env.SEND_INTERVAL ? parseInt(process.env.SEND_INTER setInterval(() => { // Echo data back to sub sockets if (latestFrame == null) return; + + // find delta between last frame and now + const deltaFrame = Buffer.alloc(latestFrame.length); + for (let i = 0; i < latestFrame.length; i++) { + deltaFrame[i] = latestFrame[i] - (lastFrame ? lastFrame[i] : 0); + } + + lastFrame = latestFrame; + for (const sub of subSockets) { - if (sub.data.ready) sub.send(latestFrame); + if (!sub.data.firstSend) { + sub.data.firstSend = true; + sub.send(pako.deflate(latestFrame)); + } else { + sub.send(pako.deflate(deltaFrame)); + } } }, sendInterval); @@ -195,7 +203,13 @@ app.get('/api/flights/arrivals', async (req, res) => { date: new Date(fl.scheduled_time) }, origin: fl.airport_name, - message: fl.message ? fl.message.replace('Expected', 'exp').replace('Landed', 'lnd').replace('Now ', '') : null, + message: fl.message + ? fl.message + .replace('Expected', 'exp') + .replace('Landed', 'lnd') + .replace('Now ', '') + .replace(/Diverted.*/, 'Diverted') + : null, status: fl.status })) .filter((fl: any) => { diff --git a/udpserver/go.mod b/udpserver/go.mod new file mode 100644 index 0000000..dcbeefc --- /dev/null +++ b/udpserver/go.mod @@ -0,0 +1,8 @@ +module udpserver + +go 1.21.6 + +require ( + github.com/gorilla/websocket v1.5.1 // indirect + golang.org/x/net v0.17.0 // indirect +) diff --git a/udpserver/go.sum b/udpserver/go.sum new file mode 100644 index 0000000..272772f --- /dev/null +++ b/udpserver/go.sum @@ -0,0 +1,4 @@ +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= diff --git a/udpserver/main.go b/udpserver/main.go new file mode 100644 index 0000000..d7b578b --- /dev/null +++ b/udpserver/main.go @@ -0,0 +1,128 @@ +package main + +import ( + "fmt" + "log" + "net" + "net/url" + "os" + "os/signal" + "time" + + "github.com/gorilla/websocket" +) + +func getEnv(key, fallback string) string { + value := os.Getenv(key) + if len(value) == 0 { + return fallback + } + return value +} + +func main() { + port := getEnv("UDP_PORT", "8080") + + // ws setup + u := url.URL{Scheme: "ws", Host: "rgb.mun.sh", Path: "/sub"} + log.Printf("connecting to %s", u.String()) + + c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + log.Fatal("dial:", err) + } + defer c.Close() + + // UDP setup + udpAddr, err := net.ResolveUDPAddr("udp", ":"+port) // Change the UDP address and port accordingly + if err != nil { + log.Fatal("UDP resolve address:", err) + } + + conn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + log.Fatal("UDP listen:", err) + } + defer conn.Close() + + done := make(chan struct{}) + + clients := make(map[string]*net.UDPAddr) + + // Read from websocket and write to UDP + go func() { + for { + _, message, err := c.ReadMessage() + if err != nil { + log.Println("read:", err) + return + } + // log.Printf("recv: %s", message) + + // Send data to all clients + for _, client := range clients { + // chunk message into 4 parts, with a sequence number + chunk0 := append([]byte{0}, message[:len(message)/4]...) + chunk1 := append([]byte{1}, message[len(message)/4:len(message)/2]...) + chunk2 := append([]byte{2}, message[len(message)/2:len(message)/4*3]...) + chunk3 := append([]byte{3}, message[len(message)/4*3:]...) + + // Send data to UDP connection + for _, chunk := range [][]byte{chunk0, chunk1, chunk2, chunk3} { + _, err := conn.WriteToUDP(chunk, client) + if err != nil { + fmt.Printf("Error sending data to %s: %v\n", client, err) + // delete client from clients map + delete(clients, client.String()) + } + } + } + } + }() + + // Handle udp clients + go func() { + defer close(done) + buf := make([]byte, 1024) + + for { + // Read from UDP connection + n, clientAddr, err := conn.ReadFromUDP(buf) + if err != nil { + fmt.Println("Error reading from UDP connection:", err) + continue + } + + // Print received data + fmt.Printf("Received from %s: %s\n", clientAddr, buf[:n]) + + // Add client to clients map + clients[clientAddr.String()] = clientAddr + } + }() + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + for { + select { + case <-done: + return + case <-interrupt: + log.Println("interrupt") + + // Cleanly close the connection by sending a close message and then + // waiting (with timeout) for the server to close the connection. + err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + log.Println("write close:", err) + return + } + select { + case <-done: + case <-time.After(time.Second): + } + return + } + } +} \ No newline at end of file