Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): Upgrade dependencies #11

Merged
merged 7 commits into from
Oct 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ public final class com/apollographql/mockserver/TextMessage : com/apollographql/
public final fun getText ()Ljava/lang/String;
}

public final class com/apollographql/mockserver/VersionKt {
public static final field VERSION Ljava/lang/String;
}

public abstract interface class com/apollographql/mockserver/WebSocketBody {
public abstract fun close ()V
public abstract fun enqueueMessage (Lcom/apollographql/mockserver/WebSocketMessage;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// - Show manifest properties: true
// - Show declarations: true

// Library unique name: <apollo-kotlin-mockserver:mockserver>
// Library unique name: <com.apollographql.mockserver:apollo-mockserver>
abstract interface com.apollographql.mockserver/MockRequestBase { // com.apollographql.mockserver/MockRequestBase|null[0]
abstract val headers // com.apollographql.mockserver/MockRequestBase.headers|{}headers[0]
abstract fun <get-headers>(): kotlin.collections/Map<kotlin/String, kotlin/String> // com.apollographql.mockserver/MockRequestBase.headers.<get-headers>|<get-headers>(){}[0]
Expand Down Expand Up @@ -137,6 +137,8 @@ final class com.apollographql.mockserver/WebsocketMockRequest : com.apollographq
final val version // com.apollographql.mockserver/WebsocketMockRequest.version|{}version[0]
final fun <get-version>(): kotlin/String // com.apollographql.mockserver/WebsocketMockRequest.version.<get-version>|<get-version>(){}[0]
}
final const val com.apollographql.mockserver/VERSION // com.apollographql.mockserver/VERSION|{}VERSION[0]
final fun <get-VERSION>(): kotlin/String // com.apollographql.mockserver/VERSION.<get-VERSION>|<get-VERSION>(){}[0]
final fun (com.apollographql.mockserver/MockServer).com.apollographql.mockserver/assertNoRequest() // com.apollographql.mockserver/assertNoRequest|[email protected](){}[0]
final fun (com.apollographql.mockserver/MockServer).com.apollographql.mockserver/enqueue(kotlin/String = ..., kotlin/Long = ..., kotlin/Int = ...) // com.apollographql.mockserver/enqueue|[email protected](kotlin.String;kotlin.Long;kotlin.Int){}[0]
final fun (com.apollographql.mockserver/MockServer).com.apollographql.mockserver/enqueueError(kotlin/Int) // com.apollographql.mockserver/enqueueError|[email protected](kotlin.Int){}[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@ interface TcpSocket: Closeable {
override fun close()
}

/**
* A server that handles [TcpSocket]
* [TcpServer] is not thread safe and its method must be called from the same coroutine.
*/
interface TcpServer : Closeable {
/**
* Starts listening and calls [block] when on incoming connections
* Starts a background coroutine that calls [block] on incoming connections.
* [listen] can only be called once.
*/
fun listen(block: (socket: TcpSocket) -> Unit)

Expand All @@ -41,7 +46,7 @@ interface TcpServer : Closeable {

/**
* Closes the server.
*
* [close] is idempotent.
*/
override fun close()
}
Expand All @@ -51,4 +56,4 @@ class Address(
val port: Int
)

expect fun TcpServer(port: Int): TcpServer
expect fun TcpServer(port: Int): TcpServer
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,63 @@ package com.apollographql.mockserver

import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.ServerSocket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.readAvailable
import io.ktor.utils.io.writeFully
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import okio.IOException
import io.ktor.network.sockets.Socket as WrappedSocket

actual fun TcpServer(port: Int): TcpServer = KtorTcpServer(port)

class KtorTcpServer(port: Int = 0, private val acceptDelayMillis: Int = 0, dispatcher: CoroutineDispatcher = Dispatchers.IO) : TcpServer {
class KtorTcpServer(
private val port: Int = 0,
private val acceptDelayMillis: Int = 0,
dispatcher: CoroutineDispatcher = Dispatchers.IO
) : TcpServer {

private val selectorManager = SelectorManager(dispatcher)
private val scope = CoroutineScope(SupervisorJob() + dispatcher)
private val serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port)
private val serverScope = CoroutineScope(SupervisorJob() + dispatcher)
private var serverSocket: ServerSocket? = null

override fun listen(block: (socket: TcpSocket) -> Unit) {
require(serverScope.isActive) { "Server is closed and cannot be restarted" }
require(serverSocket == null) { "Server is already started" }

override fun close() {
scope.cancel()
selectorManager.close()
serverSocket.close()
}
serverScope.launch(start = CoroutineStart.UNDISPATCHED) {
serverSocket = aSocket(selectorManager).tcp().bind("127.0.0.1", port)

override fun listen(block: (socket: TcpSocket) -> Unit) {
scope.launch {
while (true) {
while (isActive) {
if (acceptDelayMillis > 0) {
delay(acceptDelayMillis.toLong())
}
val socket: WrappedSocket = try {
serverSocket.accept()
serverSocket!!.accept()
} catch (t: Throwable) {
if (t is CancellationException) { throw t }
if (t is CancellationException) {
throw t
}
delay(1000)
continue
}

val ktorSocket = KtorTcpSocket(socket)
block(ktorSocket)

val ktorSocket = KtorTcpSocket(socket).apply(block)
launch {
ktorSocket.loop()
}
Expand All @@ -59,22 +67,40 @@ class KtorTcpServer(port: Int = 0, private val acceptDelayMillis: Int = 0, dispa
}

override suspend fun address(): Address {
require(serverScope.isActive) { "Server is closed" }
requireNotNull(serverSocket) { "Server is not listening, please call listen() before calling address()" }

return withTimeout(1000) {
var address: Address
while(true) {
var address: Address? = null
while (address == null) {
val serverSocket = requireNotNull(serverSocket) {
"close() was called during a call to address()"
}

try {
address = (serverSocket.localAddress as InetSocketAddress).let {
Address(it.hostname, it.port)
}
break
} catch (e: Exception) {
} catch (_: Exception) {
delay(50)
continue
}
}
address
}
}

override fun close() {
serverSocket?.let { serverSocket ->
// Cancel the server scope only if the server was started before
// If the server was not started, the scope keeps being active and server can be started for the first time
// Otherwise, the scope is cancelled to prevent the server from being started again
serverScope.cancel()
selectorManager.close()
serverSocket.close()
this.serverSocket = null
}
}
}

internal class KtorTcpSocket(private val socket: WrappedSocket) : TcpSocket {
Expand Down Expand Up @@ -127,4 +153,4 @@ internal class KtorTcpSocket(private val socket: WrappedSocket) : TcpSocket {
override fun send(data: ByteArray) {
writeQueue.trySend(data)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import node.buffer.Buffer
import node.events.Event
import node.net.AddressInfo
import node.net.createServer
import node.test.it
import okio.IOException
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
Expand All @@ -14,6 +15,7 @@ import node.net.Socket as WrappedSocket

internal class NodeTcpSocket(private val netSocket: WrappedSocket) : TcpSocket {
private val readQueue = Channel<ByteArray>(Channel.UNLIMITED)

init {
netSocket.on(Event.DATA) { chunk ->
val bytes = when (chunk) {
Expand Down Expand Up @@ -47,38 +49,48 @@ internal class NodeTcpSocket(private val netSocket: WrappedSocket) : TcpSocket {
}

internal class NodeTcpServer(private val port: Int) : TcpServer {

private var server: WrappedServer? = null
private var address: Address? = null

private var isClosed = false

override fun listen(block: (socket: TcpSocket) -> Unit) {
require(!isClosed) { "Server is closed and cannot be restarted" }
require(server == null) { "Server is already started" }

server = createServer { netSocket ->
block(NodeTcpSocket(netSocket))
}.apply {
listen(port)
}

server!!.listen(port)
}

override suspend fun address(): Address {
check(server != null) {
"You need to call start() before calling port()"
}
require(!isClosed) { "Server is closed" }
val server = requireNotNull(this.server) { "Server is not listening, please call listen() before calling address()" }

return address ?: suspendCoroutine { cont ->
server!!.on(Event.LISTENING) {
val address = server!!.address().unsafeCast<AddressInfo>()

this.address = Address(address.address, address.port.toInt())
cont.resume(this.address!!)
server.once(Event.LISTENING) {
val server = requireNotNull(this.server) {
"close() was called during a call to address()"
}
val address = server.address().unsafeCast<AddressInfo>()
this.address = Address(address.address, address.port.toInt()).also {
cont.resume(it)
}
}
}
}

override fun close() {
check(server != null) {
"server is not started"
if(isClosed) return

server?.let { server ->
server.close()
this.server = null
// Is closed only if the server was started before
isClosed = true
}
server!!.close()
}
}

Expand Down
12 changes: 6 additions & 6 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[versions]
kotlin-plugin = "2.0.0-RC2"
okio = "3.9.0"
ktor = "2.3.7"
atomicfu = "0.23.1"
kotlinx-coroutines = "1.8.0"
okhttp = "4.11.0"
kotlin-plugin = "2.0.21"
okio = "3.9.1"
ktor = "3.0.0"
atomicfu = "0.25.0"
kotlinx-coroutines = "1.9.0"
okhttp = "4.12.0"

[libraries]
kotlin-node = "org.jetbrains.kotlin-wrappers:kotlin-node:18.16.12-pre.634"
Expand Down
Loading