Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .changeset/shiny-friends-unite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
"@effect/platform-node": minor
"@effect/platform": minor
---

Add UDP Socket support with platform-agnostic interface and Node.js implementation

Introduces comprehensive UDP socket functionality following the existing Socket API patterns:

- Platform-agnostic UdpSocket interface in @effect/platform
- Node.js implementation using dgram module in @effect/platform-node
- Support for sending/receiving datagrams with sender address information
- Proper resource management with Effect's acquireRelease pattern
- Comprehensive error handling for bind, send, and receive operations
- Full test coverage including edge cases and cleanup scenarios

UDP sockets are message-oriented and connectionless, unlike TCP's stream-oriented approach. Each datagram includes complete
sender information, enabling flexible communication patterns for real-time applications, game networking, and distributed
systems.
194 changes: 194 additions & 0 deletions packages/platform-bun/src/BunUdpSocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/**
* @since 1.0.0
*/
import * as UdpSocket from "@effect/platform/UdpSocket"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import type * as Scope from "effect/Scope"

/**
* @since 1.0.0
* @category tags
*/
export interface BunUdpSocket {
readonly _: unique symbol
}

/**
* @since 1.0.0
* @category tags
*/
export const BunUdpSocket: Context.Tag<BunUdpSocket, any> = Context.GenericTag(
"@effect/platform-bun/BunUdpSocket/BunUdpSocket"
)

/**
* Creates a UDP socket using Bun's networking APIs.
*
* Bun provides Node.js-compatible networking APIs, so this implementation
* leverages Bun's dgram module for UDP socket operations.
*
* @example
* ```ts
* import { BunUdpSocket } from "@effect/platform-bun"
* import { Effect, Queue } from "effect"
*
* const program = Effect.gen(function*() {
* // Create server socket
* const server = yield* BunUdpSocket.make({ _tag: "UdpAddress", hostname: "0.0.0.0", port: 8080 })
*
* // Create client socket
* const client = yield* BunUdpSocket.make({ _tag: "UdpAddress", hostname: "0.0.0.0", port: 0 })
* const clientAddress = client.address
*
* // Set up message handling on server
* const messages = yield* Queue.unbounded()
* const serverFiber = yield* Effect.fork(
* server.run((message) => messages.offer(message))
* )
*
* // Send message from client to server
* const testMessage = new TextEncoder().encode("Hello Server!")
* yield* client.send(testMessage, { _tag: "UdpAddress", hostname: "127.0.0.1", port: 8080 })
*
* // Receive message on server
* const received = yield* messages.take
* console.log(`Server received: ${new TextDecoder().decode(received.data)}`)
* console.log(`From client: ${received.remoteAddress.hostname}:${received.remoteAddress.port}`)
*
* yield* Effect.fiberInterrupt(serverFiber)
* })
*
* Effect.runPromise(Effect.scoped(program))
* ```
*
* @param address - UDP address configuration for the socket
* @returns Effect that creates a UDP socket with proper resource management
*
* @since 1.0.0
* @category constructors
*/
export const make = (
address?: Partial<UdpSocket.UdpAddress>
): Effect.Effect<UdpSocket.UdpSocket, UdpSocket.UdpSocketError, Scope.Scope> =>
Effect.gen(function*() {
// Bun has Node.js compatibility for dgram
const dgram = yield* Effect.promise(() => import("node:dgram"))

const socket = yield* Effect.acquireRelease(
Effect.async<any, UdpSocket.UdpSocketError>((resume) => {
const socket = dgram.createSocket("udp4")
const port = address?.port ?? 0
const hostname = address?.hostname ?? "0.0.0.0"

socket.on("error", (error) => {
resume(Effect.fail(new UdpSocket.UdpSocketGenericError({ reason: "Bind", cause: error })))
})

socket.bind(port, hostname, () => {
socket.removeAllListeners("error")
resume(Effect.succeed(socket))
})
}),
(socket) =>
Effect.sync(() => {
socket.removeAllListeners()
try {
socket.close()
} catch (error) {
// Expected for double-close, ignore silently
if (error instanceof Error && error.message === "Not running") {
return
}
// For unexpected errors during automatic cleanup, we silently ignore them
// to prevent scope cleanup failures. Users can call socket.close() explicitly
// if they need to handle close errors.
}
})
)

let isClosed = false

// Get the actual bound address
const actualAddress = socket.address()
const boundAddress: UdpSocket.UdpAddress = {
_tag: "UdpAddress",
hostname: actualAddress.address,
port: actualAddress.port
}

return {
[UdpSocket.TypeId]: UdpSocket.TypeId,
address: boundAddress,
close: Effect.async<void, UdpSocket.UdpSocketError>((resume) => {
isClosed = true
try {
socket.close(() => {
resume(Effect.void)
})
} catch (error) {
// Surface close errors to users who call close() explicitly
resume(Effect.fail(new UdpSocket.UdpSocketGenericError({ reason: "Close", cause: error })))
}
}),
send: (data: Uint8Array, address: UdpSocket.UdpAddress) =>
Effect.async<void, UdpSocket.UdpSocketError>((resume) => {
if (isClosed) {
resume(Effect.fail(new UdpSocket.UdpSocketGenericError({ reason: "Send", cause: "Socket is closed" })))
return
}
socket.send(data, address.port, address.hostname, (error: any) => {
if (error) {
resume(Effect.fail(new UdpSocket.UdpSocketGenericError({ reason: "Send", cause: error })))
} else {
resume(Effect.void)
}
})
}),
run: <_, E = never, R = never>(handler: (_: UdpSocket.UdpMessage) => Effect.Effect<_, E, R> | void) =>
Effect.async<void, UdpSocket.UdpSocketError | E>((resume) => {
function onMessage(msg: Buffer, rinfo: any) {
const message: UdpSocket.UdpMessage = {
data: new Uint8Array(msg),
remoteAddress: {
_tag: "UdpAddress",
hostname: rinfo.address,
port: rinfo.port
}
}
const result = handler(message)
if (Effect.isEffect(result)) {
// Improved error handling: don't fail the entire server on individual message errors
Effect.runPromise(result as Effect.Effect<_, E, never>).catch((error) => {
// Log individual message errors but don't fail the server
console.error("Error handling UDP message:", error)
})
}
}

function onError(error: Error) {
resume(Effect.fail(new UdpSocket.UdpSocketGenericError({ reason: "Receive", cause: error })))
}

socket.on("message", onMessage)
socket.on("error", onError)

return Effect.sync(() => {
socket.off("message", onMessage)
socket.off("error", onError)
})
})
}
})

/**
* Creates a Layer that provides a UDP socket bound to the specified address.
*
* @since 1.0.0
* @category layers
*/
export const layer = (
address?: Partial<UdpSocket.UdpAddress>
): Layer.Layer<UdpSocket.UdpSocket, UdpSocket.UdpSocketError> =>
Layer.scoped(UdpSocket.UdpSocket, make(address))
5 changes: 5 additions & 0 deletions packages/platform-bun/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ export * as BunStream from "./BunStream.js"
*/
export * as BunTerminal from "./BunTerminal.js"

/**
* @since 1.0.0
*/
export * as BunUdpSocket from "./BunUdpSocket.js"

/**
* @since 1.0.0
*/
Expand Down
Loading