Skip to content

Commit

Permalink
fix: Coherence with close state
Browse files Browse the repository at this point in the history
  • Loading branch information
Hansanto committed Oct 20, 2024
1 parent b6b5b87 commit 0702d2c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ actual fun TcpServer(port: Int): TcpServer = KtorTcpServer(port)
class KtorTcpServer(
private val port: Int = 0,
private val acceptDelayMillis: Int = 0,
private val dispatcher: CoroutineDispatcher = Dispatchers.IO
dispatcher: CoroutineDispatcher = Dispatchers.IO
) : TcpServer {

private val selectorManager = SelectorManager(dispatcher)
private val serverScope = CoroutineScope(SupervisorJob() + dispatcher)
private var serverSocket: ServerSocket? = null

override fun listen(block: (socket: TcpSocket) -> Unit) {
require(serverSocket == null) { "Server is already listening" }
require(serverScope.isActive) { "Server is closed and cannot be restarted" }
require(serverSocket == null) { "Server is already started" }

serverScope.launch(start = CoroutineStart.UNDISPATCHED) {
serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port)
Expand Down Expand Up @@ -66,13 +67,12 @@ class KtorTcpServer(
}

override suspend fun address(): Address {
requireNotNull(serverSocket) {
"Server is not listening, please call listen() before calling address()"
}
require(serverScope.isActive) { "Server is closed" }
requireNotNull(serverSocket) { "Server is not listening, please call listen() before calling address()" }

return withTimeout(1000) {
var address: Address
while (true) {
var address: Address? = null
while (address == null) {
val serverSocket = requireNotNull(serverSocket) {
"close() was called during a call to address()"
}
Expand All @@ -81,8 +81,7 @@ class KtorTcpServer(
address = (serverSocket.localAddress as InetSocketAddress).let {
Address(it.hostname, it.port)
}
break
} catch (e: Exception) {
} catch (_: Exception) {
delay(50)
continue
}
Expand All @@ -92,11 +91,14 @@ class KtorTcpServer(
}

override fun close() {
if (serverSocket != null) {
serverSocket?.let { serverSocket ->
// Cancel the server scope only if the server was started before
// If the server was not started, the scope keeps being active and server can be started for the first time
// Otherwise, the scope is cancelled to prevent the server from being started again
serverScope.cancel()
selectorManager.close()
serverSocket!!.close()
serverSocket = null
serverSocket.close()
this.serverSocket = null
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import node.buffer.Buffer
import node.events.Event
import node.net.AddressInfo
import node.net.createServer
import node.test.it
import okio.IOException
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
Expand Down Expand Up @@ -51,9 +52,11 @@ internal class NodeTcpServer(private val port: Int) : TcpServer {

private var server: WrappedServer? = null
private var address: Address? = null
private var isClosed = false

override fun listen(block: (socket: TcpSocket) -> Unit) {
require(server == null) { "Server is already listening" }
require(!isClosed) { "Server is closed and cannot be restarted" }
require(server == null) { "Server is already started" }

server = createServer { netSocket ->
block(NodeTcpSocket(netSocket))
Expand All @@ -63,11 +66,14 @@ internal class NodeTcpServer(private val port: Int) : TcpServer {
}

override suspend fun address(): Address {
requireNotNull(server) { "Server is not listening, please call listen() before calling address()" }
require(!isClosed) { "Server is closed" }
val server = requireNotNull(this.server) { "Server is not listening, please call listen() before calling address()" }

val server = server!!
return address ?: suspendCoroutine { cont ->
server.once(Event.LISTENING) {
val server = requireNotNull(this.server) {
"close() was called during a call to address()"
}
val address = server.address().unsafeCast<AddressInfo>()
this.address = Address(address.address, address.port.toInt()).also {
cont.resume(it)
Expand All @@ -77,10 +83,13 @@ internal class NodeTcpServer(private val port: Int) : TcpServer {
}

override fun close() {
server?.let {
it.close()
server = null
address = null
if(isClosed) return

server?.let { server ->
server.close()
this.server = null
// Is closed only if the server was started before
isClosed = true
}
}
}
Expand Down

0 comments on commit 0702d2c

Please sign in to comment.