Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of DatagramSocket #69

Closed
wants to merge 0 commits into from
Closed

Implementation of DatagramSocket #69

wants to merge 0 commits into from

Conversation

antoniojimeneznieto
Copy link
Collaborator

I am working on implementing a UringDatagramSocket class by extending the DatagramSocket trait, and I have encountered an issue with the read function. Ideally, I was looking for a function in the io_uring library similar to recvfrom, but it doesn't exist. The closest alternative I found is recvmsg. However, I am not certain if using recvmsg is the best solution for implementing the read method in UringDatagramSocket. I would like to seek advice and opinions on whether this is a good approach, or if there is a more suitable alternative.

In addition to the read method implementation, I have also implemented the write method using the sendto function. I would appreciate any feedback or suggestions for potential improvements in the write method as well :)

@armanbilge
Copy link
Owner

Awesome! Yes, recvmsg seems right: I believe that it is a more general version of recvfrom. There is a msghdr type in Scala Native, hopefully we can use that here.
https://github.com/scala-native/scala-native/blob/364885493c7be0507fd40e62ad484dde995c352b/posixlib/src/main/scala/scala/scalanative/posix/sys/socket.scala#L23-L31

def apply[F[_]](ring: Uring[F], fd: Int)(implicit
F: Async[F]
): Resource[F, UringDatagramSocket[F]] =
ResizableBuffer(8192).evalMap { buf =>
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maximum size of a datagram is 65535. So we can allocate a buffer of that size upfront (it doesn't need to be resizable). At least, that's how FS2 is doing it, maybe we can find an even better way.
https://github.com/typelevel/fs2/blob/5ac2a1148c1ecbdc4a3905a176ce3ac0fb369cc5/io/jvm/src/main/scala/fs2/io/net/AsynchronousDatagramSocketGroup.scala#L155

@antoniojimeneznieto
Copy link
Collaborator Author

I am struggling with the read/msg method, could I get some assistance please?

Comment on lines 35 to 36
private[this] def recvmsg(msg: Ptr[msghdr], flags: Int): F[Int] =
ring.call(io_uring_prep_recvmsg(_, fd, msg, flags))
Copy link
Owner

@armanbilge armanbilge Apr 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the msghdr struct is defined like this in Scala Native:

 type msghdr = CStruct7[
    Ptr[Byte], // msg_name
    socklen_t, // msg_namelen
    Ptr[uio.iovec], // msg_iov
    CInt, // msg_iovlen
    Ptr[Byte], // msg_control
    socklen_t, // msg_crontrollen
    CInt // msg_flags
  ]

https://github.com/scala-native/scala-native/blob/364885493c7be0507fd40e62ad484dde995c352b/posixlib/src/main/scala/scala/scalanative/posix/sys/socket.scala#L23-L31

which is equivalent to this C declaration:

struct iovec {                    /* Scatter/gather array items */
    void  *iov_base;              /* Starting address */
    size_t iov_len;               /* Number of bytes to transfer */
};

struct msghdr {
    void         *msg_name;       /* optional address */
    socklen_t     msg_namelen;    /* size of address */
    struct iovec *msg_iov;        /* scatter/gather array */
    size_t        msg_iovlen;     /* # elements in msg_iov */
    void         *msg_control;    /* ancillary data, see below */
    size_t        msg_controllen; /* ancillary data buffer len */
    int           msg_flags;      /* flags on received message */
};

https://linux.die.net/man/2/recvmsg

So I think what we want to do is:

  1. before calling io_uring_prep_recvmsg, we should setup msghdr with the pre-allocated buffer. I think we should provide it as the msg_iov.

  2. after io_uring_prep_recvmsg completes, we can read the datagram out of the buffer. The return value will tell us how many bytes were recieved. Additionally, we can retrieve the address out of msg_name using msg_namelen as the length.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have clarified all my doubts, thank you very much :)

buf <- buffer.get(defaultReadSize)

iov <- F.delay {
val iov = stackalloc[uio.iovec]()
Copy link
Owner

@armanbilge armanbilge Apr 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the trick with stack allocations, is that they only exist for the duration of the function in which they are created. So in this case, the iov allocation will actually go away as soon as the delay completes, because a delay(...) is actually a lambda function delay(() => ...).

I think we can allocate the iovec and msghdr structures at the same time as we allocate the read buffer when we create the DatagramSocket. Because we can only have one read in progress at a time, we will only need one and the we can re-use it.

Comment on lines 118 to 134
.evalMap { buf =>
(Mutex[F], Mutex[F]).mapN { (readMutex, writeMutex) =>
val iov = malloc(sizeof[uio.iovec]).asInstanceOf[Ptr[uio.iovec]]
val remoteAddr = malloc(sizeof[sockaddr_storage]).asInstanceOf[Ptr[sockaddr_storage]]
val msg = malloc(sizeof[msghdr]).asInstanceOf[Ptr[msghdr]]

new UringDatagramSocket(ring, fd, buf, 65535, readMutex, writeMutex, iov, msg, remoteAddr)
}
}
.flatMap { socket =>
val release: F[Unit] = F.delay {
free(socket.iov.asInstanceOf[Ptr[Byte]])
free(socket.msg.asInstanceOf[Ptr[Byte]])
free(socket.remoteAddr.asInstanceOf[Ptr[Byte]])
}
Resource.make(F.pure(socket))(_ => release)
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of Resource.eval(...).flatMap(Resource.make(pure(...))(...)) we should just write Resource.make(...)(...) 🙂

ResizableBuffer(65535).evalMap { buf =>
(Mutex[F], Mutex[F]).mapN(new UringDatagramSocket(ring, fd, buf, 65535, _, _))
}
ResizableBuffer(65535)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we go back to the resizable buffer? I think we can just allocate this with malloc along with the iov and msghdr.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I still have to work on the buffer

Comment on lines 120 to 122
val iov = malloc(sizeof[uio.iovec]).asInstanceOf[Ptr[uio.iovec]]
val remoteAddr = malloc(sizeof[sockaddr_storage]).asInstanceOf[Ptr[sockaddr_storage]]
val msg = malloc(sizeof[msghdr]).asInstanceOf[Ptr[msghdr]]
Copy link
Owner

@armanbilge armanbilge Apr 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

malloc may return null I think (due to out-of-memory) we should check for that.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's an example:

val ptr = malloc(size.toUInt)
if (ptr == null)
throw new RuntimeException(s"malloc: ${errno}")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be a good idea to make a more generic function in utils that allocates memory size given and check for null ?

Copy link
Owner

@armanbilge armanbilge Apr 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, definitely! It can return a Resource[F, Ptr[Byte]] that frees itself on release.

@armanbilge armanbilge linked an issue Apr 24, 2023 that may be closed by this pull request
@antoniojimeneznieto
Copy link
Collaborator Author

Is it worth considering the nonReizableBuffer class? Or is it better to just allocate a new buffer every time we need it with a malloc ?

@armanbilge
Copy link
Owner

Yes I agree, I don't think that it needs a dedicated class. We can just use a pointer and store the size (which in this case is a constant anyway :)

Copy link
Owner

@armanbilge armanbilge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 160 to 168
def getLocalAddress[F[_]](fd: Int)(implicit F: Sync[F]): F[SocketAddress[IpAddress]] =
F.delay {
SocketAddressHelpers.toSocketAddress { (addr, len) =>
if (getsockname(fd, addr, len) == -1)
Left(new IOException(s"getsockname: ${errno}"))
else
Either.unit
}
}.flatMap(_.liftTo)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is same as here right? Maybe we should move it to SocketAddressHelpers so we can use it in both places.

def getLocalAddress[F[_]](fd: Int)(implicit F: Sync[F]): F[SocketAddress[IpAddress]] =
F.delay {
SocketAddressHelpers.toSocketAddress { (addr, len) =>
if (getsockname(fd, addr, len) == -1)
Left(new IOException(s"getsockname: ${errno}"))
else
Either.unit
}
}.flatMap(_.liftTo)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks :) Yes!! I am going to work on the tests and I think it's an excellent idea to move this piece of code to the SocketAdressHelper 👍

Comment on lines 63 to 69
_ <- F.delay {
iov._1 = buffer
iov._2 = defaultReadSize.toULong
}

_ <- F.delay {
msg.msg_name = remoteAddr.asInstanceOf[Ptr[Byte]]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can combine these delay blocks, saves some performance.

Also defaultReadSize will always be 65535 (maximum size of a datagram) so we can hardcode this constant, something like this maybe:

object UringDatagramSocket {
  final val MaxDatagramSize = 65535

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that it might be interesting to make a mallocResources function that takes a list of sizes and returns Resource[F,List[Ptr[Byte]]] so we can combine all the delays block in the apply function. What do you think ?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was thinking about this as well! I was wondering if we can do:

mallocResource(sizeA + sizeB + sizeC).flatMap { ptr =>
  val ptrA = ptr
  val ptrB = ptr + sizeA
  val ptrC = ptr + sizeA + sizeB
  ...
}

Then we can avoid the List as well.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's brilliant!

Comment on lines 80 to 83
remoteAddress <- F.fromEither(
SocketAddressHelpers
.toSocketAddress(remoteAddr)
)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I think I made a mistake before. I think this does need to be suspended in a delay block, because even though the conversion is not a side-effect, remoteAddr is mutable memory and reading mutable memory is a side-effect.

sendto(ptr, slice.length, 0, addr, len).void
}
}
.unlessA(datagram.bytes.isEmpty)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some googling and actually I think it's possible to send an empty datagram. So we should make sure that we can handle that case.

@@ -33,4 +37,14 @@ private[uring] object util {
bytes
}

def mallocResource[F[_]: Sync](size: CSize): Resource[F, Ptr[Byte]] =
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@antoniojimeneznieto
Copy link
Collaborator Author

In the FS2 implementation we use a method to open a DatagramSocket. I was thinking about following that model and creating an UringDatagramSocketGroup class that would take care of opening the UringDatagramSocket.

Then UringNetwork could take a new argument (UringDatagramSocketGroup) and create an openDatagramSocket method based on the UringDatagramSocketGroup one. What do you think ?

@armanbilge
Copy link
Owner

I was thinking about following that model and creating an UringDatagramSocketGroup class that would take care of opening the UringDatagramSocket.

Yes, definitely, that makes sense to me!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Proof-of-concept Datagram support
3 participants