diff --git a/.gitignore b/.gitignore index 8e8f5b54..4a4ab393 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ yarn.lock .DS_Store go-peer/go-peer **/.idea +nim-peer/nim_peer +nim-peer/local.* diff --git a/README.md b/README.md index 939a4b66..78eae9ae 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ Some of the cool and cutting-edge [transport protocols](https://connectivity.lib | [`node-js-peer`](./node-js-peer/) | Node.js Chat Peer in TypeScript | ✅ | ✅ | ✅ | ✅ | ✅ | | [`go-peer`](./go-peer/) | Chat peer implemented in Go | ✅ | ❌ | ✅ | ✅ | ✅ | | [`rust-peer`](./rust-peer/) | Chat peer implemented in Rust | ❌ | ❌ | ✅ | ✅ | ✅ | +| [`nim-peer`](./nim-peer/) | Chat peer implemented in Nim | ❌ | ❌ | ❌ | ❌ | ✅ | ✅ - Protocol supported ❌ - Protocol not supported @@ -97,3 +98,15 @@ cargo run -- --help cd go-peer go run . ``` + +## Getting started: Nim +``` +cd nim-peer +nimble build + +# Wait for connections in tcp/9093 +./nim_peer + +# Connect to another node (e.g. in localhost tcp/9092) +./nim_peer --connect /ip4/127.0.0.1/tcp/9092/p2p/12D3KooSomePeerId +``` diff --git a/nim-peer/config.nims b/nim-peer/config.nims new file mode 100644 index 00000000..d57a74e2 --- /dev/null +++ b/nim-peer/config.nims @@ -0,0 +1,8 @@ +# begin Nimble config (version 2) +when withDir(thisDir(), system.fileExists("nimble.paths")): + include "nimble.paths" +--define: + "chronicles_sinks=textblocks[dynamic]" +--define: + "chronicles_log_level=DEBUG" +# end Nimble config diff --git a/nim-peer/nim_peer.nimble b/nim-peer/nim_peer.nimble new file mode 100644 index 00000000..667aca8d --- /dev/null +++ b/nim-peer/nim_peer.nimble @@ -0,0 +1,13 @@ +# Package + +version = "0.1.0" +author = "Status Research & Development GmbH" +description = "universal-connectivity nim peer" +license = "MIT" +srcDir = "src" +bin = @["nim_peer"] + + +# Dependencies + +requires "nim >= 2.2.0", "nimwave", "chronos", "chronicles", "libp2p", "illwill", "cligen", "stew" diff --git a/nim-peer/src/file_exchange.nim b/nim-peer/src/file_exchange.nim new file mode 100644 index 00000000..824de18d --- /dev/null +++ b/nim-peer/src/file_exchange.nim @@ -0,0 +1,33 @@ +import os +import libp2p, chronos, chronicles, stew/byteutils + +const + MaxFileSize: int = 1024 # 1KiB + MaxFileIdSize: int = 1024 # 1KiB + FileExchangeCodec*: string = "/universal-connectivity-file/1" + +type FileExchange* = ref object of LPProtocol + +proc new*(T: typedesc[FileExchange]): T = + proc handle(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} = + try: + let fileId = string.fromBytes(await conn.readLp(MaxFileIdSize)) + # filename is /tmp/{fileid} + let filename = getTempDir().joinPath(fileId) + if filename.fileExists: + let fileContent = cast[seq[byte]](readFile(filename)) + await conn.writeLp(fileContent) + except CancelledError as e: + raise e + except CatchableError as e: + error "Exception in handler", error = e.msg + finally: + await conn.close() + + return T.new(codecs = @[FileExchangeCodec], handler = handle) + +proc requestFile*( + p: FileExchange, conn: Connection, fileId: string +): Future[seq[byte]] {.async.} = + await conn.writeLp(cast[seq[byte]](fileId)) + await conn.readLp(MaxFileSize) diff --git a/nim-peer/src/nim_peer.nim b/nim-peer/src/nim_peer.nim new file mode 100644 index 00000000..fcad4cd9 --- /dev/null +++ b/nim-peer/src/nim_peer.nim @@ -0,0 +1,241 @@ +{.push raises: [Exception].} + +import tables, deques, strutils, os, streams + +import libp2p, chronos, cligen, chronicles +from libp2p/protocols/pubsub/rpc/message import Message + +from illwave as iw import nil, `[]`, `[]=`, `==`, width, height +from terminal import nil + +import ./ui/root +import ./utils +import ./file_exchange + +const + KeyFile: string = "local.key" + PeerIdFile: string = "local.peerid" + MaxKeyLen: int = 4096 + ListenPort: int = 9093 + +proc cleanup() {.noconv: (raises: []).} = + try: + iw.deinit() + except: + discard + try: + terminal.resetAttributes() + terminal.showCursor() + # Clear screen and move cursor to top-left + stdout.write("\e[2J\e[H") # ANSI escape: clear screen & home + stdout.flushFile() + quit(130) # SIGINT conventional exit code + except IOError as exc: + echo "Unexpected error: " & exc.msg + quit(1) + +proc readKeyFile( + filename: string +): PrivateKey {.raises: [OSError, IOError, ResultError[crypto.CryptoError]].} = + let size = getFileSize(filename) + + if size == 0: + raise newException(OSError, "Empty key file") + + var buf: seq[byte] + buf.setLen(size) + + var fs = openFileStream(filename, fmRead) + defer: + fs.close() + + discard fs.readData(buf[0].addr, size.int) + PrivateKey.init(buf).tryGet() + +proc writeKeyFile( + filename: string, key: PrivateKey +) {.raises: [OSError, IOError, ResultError[crypto.CryptoError]].} = + var fs = openFileStream(filename, fmWrite) + defer: + fs.close() + + let buf = key.getBytes().tryGet() + fs.writeData(buf[0].addr, buf.len) + +proc loadOrCreateKey(rng: var HmacDrbgContext): PrivateKey = + if fileExists(KeyFile): + try: + return readKeyFile(KeyFile) + except: + discard # overwrite file + try: + let k = PrivateKey.random(rng).tryGet() + writeKeyFile(KeyFile, k) + k + except: + echo "Could not create new key" + quit(1) + +proc start( + addrs: Opt[MultiAddress], headless: bool, room: string, port: int +) {.async: (raises: [CancelledError]).} = + # Handle Ctrl+C + setControlCHook(cleanup) + + var rng = newRng() + + let switch = + try: + SwitchBuilder + .new() + .withRng(rng) + .withTcpTransport() + .withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/" & $port).tryGet()]) + .withYamux() + .withNoise() + .withPrivateKey(loadOrCreateKey(rng[])) + .build() + except LPError as exc: + echo "Could not start switch: " & $exc.msg + quit(1) + except Exception as exc: + echo "Could not start switch: " & $exc.msg + quit(1) + + try: + writeFile(PeerIdFile, $switch.peerInfo.peerId) + except IOError as exc: + error "Could not write PeerId to file", description = exc.msg + + let (gossip, fileExchange) = + try: + (GossipSub.init(switch = switch, triggerSelf = true), FileExchange.new()) + except InitializationError as exc: + echo "Could not initialize gossipsub: " & $exc.msg + quit(1) + + try: + switch.mount(gossip) + switch.mount(fileExchange) + await switch.start() + except LPError as exc: + echo "Could start switch: " & $exc.msg + + info "Started switch", peerId = $switch.peerInfo.peerId + + let + recvQ = newAsyncQueue[string]() + peerQ = newAsyncQueue[(PeerId, PeerEventKind)]() + systemQ = newAsyncQueue[string]() + + # if --connect was specified, connect to peer + if addrs.isSome(): + try: + discard await switch.connect(addrs.get()) + except Exception as exc: + error "Connection error", description = exc.msg + + # wait so that gossipsub can form mesh + await sleepAsync(3.seconds) + + # topic handlers + # chat and file handlers actually need to be validators instead of regular handlers + # validators allow us to get information about which peer sent a message + let onChatMsg = proc( + topic: string, msg: Message + ): Future[ValidationResult] {.async, gcsafe.} = + let strMsg = cast[string](msg.data) + await recvQ.put(shortPeerId(msg.fromPeer) & ": " & strMsg) + await systemQ.put("Received message") + await systemQ.put(" Source: " & $msg.fromPeer) + await systemQ.put(" Topic: " & $topic) + await systemQ.put(" Seqno: " & $seqnoToUint64(msg.seqno)) + await systemQ.put(" ") # empty line + return ValidationResult.Accept + + # when a new file is announced, download it + let onNewFile = proc( + topic: string, msg: Message + ): Future[ValidationResult] {.async, gcsafe.} = + let fileId = sanitizeFileId(cast[string](msg.data)) + # this will only work if we're connected to `fromPeer` (since we don't have kad-dht) + let conn = await switch.dial(msg.fromPeer, FileExchangeCodec) + let filePath = getTempDir() / fileId + let fileContents = await fileExchange.requestFile(conn, fileId) + writeFile(filePath, fileContents) + await conn.close() + # Save file in /tmp/fileId + await systemQ.put("Downloaded file to " & filePath) + await systemQ.put(" ") # empty line + return ValidationResult.Accept + + # when a new peer is announced + let onNewPeer = proc(topic: string, data: seq[byte]) {.async, gcsafe.} = + let peerId = PeerId.init(data).valueOr: + error "Could not parse PeerId from data", data = $data + return + await peerQ.put((peerId, PeerEventKind.Joined)) + + # register validators and handlers + + # receive chat messages + gossip.subscribe(room, nil) + gossip.addValidator(room, onChatMsg) + + # receive files offerings + gossip.subscribe(ChatFileTopic, nil) + gossip.addValidator(ChatFileTopic, onNewFile) + + # receive newly connected peers through gossipsub + gossip.subscribe(PeerDiscoveryTopic, onNewPeer) + + let onPeerJoined = proc( + peer: PeerId, peerEvent: PeerEvent + ) {.gcsafe, async: (raises: [CancelledError]).} = + await peerQ.put((peer, PeerEventKind.Joined)) + + let onPeerLeft = proc( + peer: PeerId, peerEvent: PeerEvent + ) {.gcsafe, async: (raises: [CancelledError]).} = + await peerQ.put((peer, PeerEventKind.Left)) + + # receive newly connected peers through direct connections + switch.addPeerEventHandler(onPeerJoined, PeerEventKind.Joined) + switch.addPeerEventHandler(onPeerLeft, PeerEventKind.Left) + + # add already connected peers + for peerId in switch.peerStore[AddressBook].book.keys: + await peerQ.put((peerId, PeerEventKind.Joined)) + + if headless: + runForever() + else: + try: + await runUI(gossip, room, recvQ, peerQ, systemQ, switch.peerInfo.peerId) + except Exception as exc: + error "Unexpected error", description = exc.msg + finally: + if switch != nil: + await switch.stop() + try: + cleanup() + except: + discard + +proc cli(connect = "", room = ChatTopic, port = ListenPort, headless = false) = + var addrs = Opt.none(MultiAddress) + if connect.len > 0: + addrs = Opt.some(MultiAddress.init(connect).get()) + try: + waitFor start(addrs, headless, room, port) + except CancelledError: + echo "Operation cancelled" + +when isMainModule: + dispatch cli, + help = { + "connect": "full multiaddress (with /p2p/ peerId) of the node to connect to", + "room": "Room name", + "port": "TCP listen port", + "headless": "No UI, can only receive messages", + } diff --git a/nim-peer/src/ui/context.nim b/nim-peer/src/ui/context.nim new file mode 100644 index 00000000..45062e48 --- /dev/null +++ b/nim-peer/src/ui/context.nim @@ -0,0 +1,4 @@ +type State* = object + inputBuffer*: string + +include nimwave/prelude diff --git a/nim-peer/src/ui/root.nim b/nim-peer/src/ui/root.nim new file mode 100644 index 00000000..2a21ed68 --- /dev/null +++ b/nim-peer/src/ui/root.nim @@ -0,0 +1,196 @@ +import chronos, chronicles, deques, strutils, os +from illwave as iw import nil, `[]`, `[]=`, `==`, width, height +from nimwave as nw import nil +from terminal import nil +import libp2p + +import ./scrollingtextbox +import ./context +import ../utils + +const + InputPanelHeight: int = 3 + ScrollSpeed: int = 2 + +type InputPanel = ref object of nw.Node + +method render(node: InputPanel, ctx: var nw.Context[State]) = + ctx = nw.slice(ctx, 0, 0, iw.width(ctx.tb), InputPanelHeight) + render( + nw.Box( + border: nw.Border.Single, + direction: nw.Direction.Vertical, + children: nw.seq("> " & ctx.data.inputBuffer), + ), + ctx, + ) + +proc resizePanels( + chatPanel: ScrollingTextBox, + peersPanel: ScrollingTextBox, + systemPanel: ScrollingTextBox, + newWidth: int, + newHeight: int, +) = + let + peersPanelWidth = (newWidth / 4).int + topHeight = (newHeight / 2).int + chatPanel.resize(newWidth - peersPanelWidth, topHeight) + peersPanel.resize(peersPanelWidth, topHeight) + systemPanel.resize(newWidth, newHeight - topHeight - InputPanelHeight) + +proc runUI*( + gossip: GossipSub, + room: string, + recvQ: AsyncQueue[string], + peerQ: AsyncQueue[(PeerId, PeerEventKind)], + systemQ: AsyncQueue[string], + myPeerId: PeerId, +) {.async: (raises: [Exception]).} = + var + ctx = nw.initContext[State]() + prevTb: iw.TerminalBuffer + mouse: iw.MouseInfo + key: iw.Key + terminal.enableTrueColors() + terminal.hideCursor() + try: + iw.init() + except: + return + + ctx.tb = iw.initTerminalBuffer(terminal.terminalWidth(), terminal.terminalHeight()) + + # TODO: publish my peerid in peerid topic + let + peersPanelWidth = (iw.width(ctx.tb) / 4).int + topHeight = (iw.height(ctx.tb) / 2).int + chatPanel = ScrollingTextBox.new( + title = "Chat", width = iw.width(ctx.tb) - peersPanelWidth, height = topHeight + ) + peersPanel = ScrollingTextBox.new( + title = "Peers", + width = peersPanelWidth, + height = topHeight, + text = @[shortPeerId(myPeerId) & " (You)"], + ) + systemPanel = ScrollingTextBox.new( + title = "System", + width = iw.width(ctx.tb), + height = iw.height(ctx.tb) - topHeight - InputPanelHeight, + ) + + # Send chronicle logs to systemPanel + defaultChroniclesStream.output.writer = proc( + logLevel: LogLevel, msg: LogOutputStr + ) {.gcsafe.} = + for line in msg.replace("\t", " ").splitLines(): + systemPanel.push(line) + + ctx.data.inputBuffer = "" + let focusAreas = @[chatPanel, peersPanel, systemPanel] + var focusIndex = 0 + var focusedPanel: ScrollingTextBox + + while true: + focusedPanel = focusAreas[focusIndex] + focusedPanel.border = nw.Border.Double + key = iw.getKey(mouse) + if key == iw.Key.Mouse: + case mouse.scrollDir + of iw.ScrollDirection.sdUp: + focusedPanel.scrollUp(ScrollSpeed) + of iw.ScrollDirection.sdDown: + focusedPanel.scrollDown(ScrollSpeed) + else: + discard + elif key == iw.Key.Tab: + # unfocus previous panel + focusedPanel.border = nw.Border.Single + focusIndex += 1 + if focusIndex >= focusAreas.len: + focusIndex = 0 # wrap around + elif key in {iw.Key.Space .. iw.Key.Tilde}: + ctx.data.inputBuffer.add(cast[char](key.ord)) + elif key == iw.Key.Backspace and ctx.data.inputBuffer.len > 0: + ctx.data.inputBuffer.setLen(ctx.data.inputBuffer.len - 1) + elif key == iw.Key.Enter: + # handle /file command to send/publish files + if ctx.data.inputBuffer.startsWith("/file"): + let parts = ctx.data.inputBuffer.split(" ") + if parts.len < 2: + systemPanel.push("Invalid /file command, missing file name") + else: + for path in parts[1 ..^ 1]: + if not fileExists(path): + systemPanel.push("Unable to find file '" & path & "', skipping") + continue + let fileId = path.splitFile().name + # copy file to /tmp/{filename} + copyFile(path, getTempDir().joinPath(fileId)) + # publish /tmp/{filename} + try: + discard await gossip.publish(ChatFileTopic, cast[seq[byte]](@(fileId))) + systemPanel.push("Offering file " & fileId) + except Exception as exc: + systemPanel.push("Unable to offer file: " & exc.msg) + else: + try: + discard await gossip.publish(room, cast[seq[byte]](@(ctx.data.inputBuffer))) + chatPanel.push("You: " & ctx.data.inputBuffer) # show message in ui + systemPanel.push("Sent chat message") + except Exception as exc: + systemPanel.push("Unable to send chat message: " & exc.msg) + ctx.data.inputBuffer = "" # clear input buffer + elif key != iw.Key.None: + discard + + # update peer list if there's a new peer from peerQ + if not peerQ.empty(): + let (newPeer, eventKind) = await peerQ.get() + + if eventKind == PeerEventKind.Joined and + not peersPanel.text.contains(shortPeerId(newPeer)): + systemPanel.push("Adding peer " & shortPeerId(newPeer)) + peersPanel.push(shortPeerId(newPeer)) + + if eventKind == PeerEventKind.Left and + peersPanel.text.contains(shortPeerId(newPeer)): + systemPanel.push("Removing peer " & shortPeerId(newPeer)) + peersPanel.remove(shortPeerId(newPeer)) + + # update messages if there's a new message from recvQ + if not recvQ.empty(): + let msg = await recvQ.get() + chatPanel.push(msg) # show message in ui + + # update messages if there's a new message from recvQ + if not systemQ.empty(): + let msg = await systemQ.get() + if msg.len > 0: + systemPanel.push(msg) # show message in ui + + renderRoot( + nw.Box( + direction: nw.Direction.Vertical, + children: nw.seq( + nw.Box( + direction: nw.Direction.Horizontal, children: nw.seq(chatPanel, peersPanel) + ), + systemPanel, + InputPanel(), + ), + ), + ctx, + ) + + # render + iw.display(ctx.tb, prevTb) + prevTb = ctx.tb + ctx.tb = iw.initTerminalBuffer(terminal.terminalWidth(), terminal.terminalHeight()) + if iw.width(prevTb) != iw.width(ctx.tb) or iw.height(prevTb) != iw.height(ctx.tb): + resizePanels( + chatPanel, peersPanel, systemPanel, iw.width(ctx.tb), iw.height(ctx.tb) + ) + + await sleepAsync(5.milliseconds) diff --git a/nim-peer/src/ui/scrollingtextbox.nim b/nim-peer/src/ui/scrollingtextbox.nim new file mode 100644 index 00000000..77c77db1 --- /dev/null +++ b/nim-peer/src/ui/scrollingtextbox.nim @@ -0,0 +1,112 @@ +import unicode +from nimwave as nw import nil + +import ./context + +type ScrollingTextBox* = ref object of nw.Node + title*: string + text*: seq[string] + width*: int + height*: int + startingLine: int + border*: nw.Border + +proc new*( + T: typedesc[ScrollingTextBox], + title: string = "", + width: int = 3, + height: int = 3, + text: seq[string] = @[], +): T = + # width and height cannot be less than 3 (2 for borders + 1 for content) + let height = max(height, 3) + let width = max(width, 3) + # height and width - 2 to account for size of box lines (top and botton) + ScrollingTextBox( + title: title, + width: width - 2, + height: height - 2, + text: text, + startingLine: 0, + border: nw.Border.Single, + ) + +proc resize*(node: ScrollingTextBox, width: int, height: int) = + let height = max(height, 3) + let width = max(width, 3) + node.width = width - 2 + node.height = height - 2 + +proc formatText(node: ScrollingTextBox): seq[string] = + result = @[] + result.add(node.title.alignLeft(node.width)) + # empty line after title + result.add(" ".alignLeft(node.width)) + for i in node.startingLine ..< max(node.startingLine + node.height - 2, 0): + if i < node.text.len: + result.add(node.text[i].alignLeft(node.width)) + else: + result.add(" ".alignLeft(node.width)) + +proc scrollUp*(node: ScrollingTextBox, speed: int) = + node.startingLine = max(node.startingLine - speed, 0) + +proc scrollDown*(node: ScrollingTextBox, speed: int) = + let lastStartingLine = max(0, node.text.len - node.height + 2) + node.startingLine = min(node.startingLine + speed, lastStartingLine) + +proc tail(node: ScrollingTextBox) = + ## focuses window in lowest frame + node.startingLine = max(0, node.text.len - node.height + 2) + +proc isAnsiEscapeSequence(s: string, idx: int): bool = + ## Check if the substring starting at `idx` is an ANSI escape sequence + if idx < 0 or idx + 2 >= s.len: # Need at least 3 characters for "\e[" + return false + if s[idx] == '\e' and s[idx + 1] == '[': # Must start with "\e[" + var i = idx + 2 + while i < s.len and (s[i] in '0' .. '9' or s[i] == ';' or s[i] == 'm'): + i.inc + return s[i - 1] == 'm' # Ends with 'm' + return false + +proc chunkString(s: string, chunkSize: int): seq[string] = + var result: seq[string] = @[] + var i = 0 + + while i < s.len: + var endIdx = min(i + chunkSize - 1, s.len - 1) + + # Avoid splitting escape sequences + while endIdx > i and isAnsiEscapeSequence(s, endIdx): + dec endIdx + + result.add(s[i .. endIdx]) + i = endIdx + 1 + + return result + +proc push*(node: ScrollingTextBox, newLine: string) = + if newLine.len == 0 or node.width <= 0: + return + for chunk in chunkString(newLine, node.width): + node.text.add(chunk) + node.tail() + +proc remove*(node: ScrollingTextBox, lineToRemove: string) = + let idx = node.text.find(lineToRemove) + if idx >= 0: + node.text.delete(idx) + if idx <= node.startingLine: + node.scrollUp(1) + +method render(node: ScrollingTextBox, ctx: var nw.Context[State]) = + ctx = nw.slice(ctx, 0, 0, node.width + 2, node.height + 2) + render( + nw.Box( + border: node.border, + direction: nw.Direction.Vertical, + children: nw.seq(node.formatText()), + ), + ctx, + ) diff --git a/nim-peer/src/utils.nim b/nim-peer/src/utils.nim new file mode 100644 index 00000000..938c94f3 --- /dev/null +++ b/nim-peer/src/utils.nim @@ -0,0 +1,47 @@ +import strutils + +import libp2p + +const + ChatTopic*: string = "universal-connectivity" + ChatFileTopic*: string = "universal-connectivity-file" + PeerDiscoveryTopic*: string = "universal-connectivity-browser-peer-discovery" + +const SanitizationRules = [ + ({'\0' .. '\31'}, ' '), # Control chars -> space + ({'"'}, '\''), # Double quote -> single quote + ({'/', '\\', ':', '|'}, '-'), # Slash, backslash, colon, pipe -> dash + ({'*', '?', '<', '>'}, '_'), # Asterisk, question, angle brackets -> underscore +] + +proc shortPeerId*(peerId: PeerId): string {.raises: [ValueError].} = + let strPeerId = $peerId + if strPeerId.len < 7: + raise newException(ValueError, "PeerId too short") + strPeerId[^7 ..^ 1] + +proc sanitizeFileId*(fileId: string): string = + ## Sanitize a filename for Windows, macOS, and Linux + result = fileId + for (chars, replacement) in SanitizationRules: + for ch in chars: + result = result.multiReplace(($ch, $replacement)) + result = result.strip() + # Avoid reserved Windows filenames (CON, PRN, AUX, NUL, COM1..COM9, LPT1..LPT9) + var reserved = @["CON", "PRN", "AUX", "NUL"] + for i in 1 .. 9: + reserved.add("COM" & $i) + reserved.add("LPT" & $i) + if result.toUpperAscii() in reserved: + result = "_" & result + # Avoid empty filenames + if result.len == 0: + result = "_" + +proc seqnoToUint64*(bytes: seq[byte]): uint64 = + if bytes.len != 8: + return 0 + var seqno: uint64 = 0 + for i in 0 ..< 8: + seqno = seqno or (uint64(bytes[i]) shl (8 * (7 - i))) + seqno