Listening socket is not released #177

Open
serrufSe opened this issue Dec 14, 2020 · 2 comments

Comments


serrufSe commented Dec 14, 2020

Hello,
I've found that the listening socket might not be released, because scalapb.zio_grpc.Server.ServiceImpl releases the underlying io.grpc.Server with the io.grpc.Server#shutdown method.
Should scalapb.zio_grpc.Server.ServiceImpl#toManaged release it with io.grpc.Server#shutdownNow instead?
Is there a good reason not to do this?
Or would a pull request for more flexible behavior be more appropriate?
Steps to reproduce:

proto file

syntax = "proto3";

package examples;

message Request {
    string name = 1;
    bytes data = 2;
}

message Empty {}

service TransportService {

  rpc Transmit(stream Request) returns (Empty) ;
}

Server

package zio_grpc.examples

import java.util.concurrent.TimeUnit

import examples.greeter.{Empty, Request, ZioGreeter}
import io.grpc.{ServerBuilder, Status}
import scalapb.zio_grpc.ManagedServer
import zio._
import zio.duration._

import scala.concurrent.Await
import scala.concurrent.duration.Duration


class TransportService(queue: Queue[Request]) extends ZioGreeter.ZTransportService[ZEnv, Any] {
  override def transmit(request: stream.Stream[Status, Request]): ZIO[zio.ZEnv with Any, Status, Empty] =
    request.foreach(request => queue.offer(request)).as(Empty())
}


object ServerEx {

  val app = for {
    queue <- Queue.unbounded[Request]
    promise <- Promise.make[Nothing, Unit]
    // Complete the promise once the first request arrives; pass the unit value explicitly.
    _ <- (queue.take *> promise.succeed(())).repeat(Schedule.fixed(1.seconds)).fork
    _ <- ManagedServer.fromService(ServerBuilder
      .forPort(9000), new TransportService(queue)).use{_ =>
      promise.await}
  } yield ()

  def main(args: Array[String]): Unit = {
    val runtime = zio.Runtime.default

    Await.result(runtime.unsafeRunToFuture(app.exitCode), Duration.apply(1, TimeUnit.HOURS))

    println("zio app done")

    Thread.sleep(1000000)
  }
}

Client

package zio_grpc.examples

import examples.greeter.Request
import examples.greeter.ZioGreeter.TransportServiceClient
import io.grpc.ManagedChannelBuilder
import scalapb.zio_grpc.ZManagedChannel
import zio.URIO
import zio.stream.ZStream
import zio._

object ClientEx extends App {

  val clientStream = for {
    queue <- Queue.unbounded[Request]
    _ <- queue.offer(Request())
    _ <- TransportServiceClient.transmit(ZStream.fromQueue(queue))
  } yield ()


  val layer = TransportServiceClient.live(ZManagedChannel(ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext()))

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = clientStream.provideCustomLayer(layer).exitCode
}

After the server prints "zio app done", the socket still exists:

lsof -i :9000
COMMAND    PID  USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    476816 pavel   85u  IPv6 5566989      0t0  TCP localhost:9000->localhost:35922 (ESTABLISHED)
java    476892 pavel   81u  IPv6 5565078      0t0  TCP localhost:35922->localhost:9000 (ESTABLISHED)

The client application is not notified in any way about the server shutdown.
The socket is released and the client stops only after the entire server application is killed (kill -9 476816 in my example).
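
For illustration, here is a minimal sketch of a two-phase stop on a plain io.grpc.Server: request a graceful shutdown, wait for a grace period, and fall back to shutdownNow if calls such as the open client stream in the reproduction above are still in flight. ServerShutdown, stop, and graceSeconds are hypothetical names chosen for this example; this is not how zio-grpc currently releases the server.

```scala
import java.util.concurrent.TimeUnit
import io.grpc.Server

// Hypothetical helper, not part of zio-grpc or grpc-java.
object ServerShutdown {

  /** Tries a graceful shutdown first and forces it if the grace period expires.
    * Returns true if the server reported termination.
    */
  def stop(server: Server, graceSeconds: Long): Boolean = {
    // shutdown() rejects new calls but lets preexisting calls continue,
    // so a long-lived stream can keep the server from terminating.
    server.shutdown()
    if (server.awaitTermination(graceSeconds, TimeUnit.SECONDS)) true
    else {
      // shutdownNow() also rejects preexisting calls; termination is still
      // not instantaneous, so wait once more.
      server.shutdownNow()
      server.awaitTermination(graceSeconds, TimeUnit.SECONDS)
    }
  }
}
```

With a helper like this, the choice between the two io.grpc.Server methods becomes a timeout rather than an either/or decision, which is one possible shape for the "more flexible behavior" asked about above.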


ollyw commented Oct 7, 2021

Just to add to this: it seems counter-intuitive that shutdown on the underlying channel returns once the shutdown has started rather than once it has completed. In the async ZIO/effect world, it could be more useful to start the shutdown and then return the result of io.grpc.ManagedChannel.awaitTermination using a blocking effect. That way, normal idiomatic constructs like channel.shutdown.race(ZIO....) can be used. The downside is that ManagedChannel.shutdown doesn't force clients to disconnect, so shutdownNow could perhaps be used if the user passes a forceShutdownNow parameter to the ZChannel shutdown method.

Alternatively, ZChannel could expose awaitShutdown, which provides a thinner abstraction but is less idiomatic. Do you have an opinion, @thesamet? If either solution works for you, I could create an MR.
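
A rough sketch of the first option, assuming ZIO 1.x (as used in the reproduction) and written against io.grpc.ManagedChannel directly rather than ZChannel. ChannelShutdown, shutdownAndAwait, force, and timeoutSeconds are hypothetical names for this example and do not exist in zio-grpc.

```scala
import java.util.concurrent.TimeUnit
import io.grpc.ManagedChannel
import zio._
import zio.blocking.{Blocking, effectBlockingInterrupt}

// Hypothetical helper, not part of zio-grpc.
object ChannelShutdown {

  /** Starts the shutdown, then waits for termination on the blocking pool.
    * Succeeds with true if the channel terminated within the timeout.
    */
  def shutdownAndAwait(
      channel: ManagedChannel,
      force: Boolean,
      timeoutSeconds: Long
  ): ZIO[Blocking, Throwable, Boolean] =
    // shutdown() and shutdownNow() only initiate the shutdown and return immediately.
    ZIO.effect(if (force) channel.shutdownNow() else channel.shutdown()) *>
      // awaitTermination blocks the calling thread, so run it as an
      // interruptible blocking effect.
      effectBlockingInterrupt(channel.awaitTermination(timeoutSeconds, TimeUnit.SECONDS))
}
```

Because the wait is an ordinary effect, it can be composed with the usual combinators, for example raced against another effect or given a fallback that retries with force = true.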


thesamet commented Oct 7, 2021

In the ZIO/effect world of async, it could be more useful to start shutdown then return the result of io.grpc.ManagedChannel.awaitTermination using blocking effect.

I agree. I would be open to a PR that makes the shutdown effect more idiomatic. I am also OK with exposing the low-level methods, but they should be presented as such, so that only users who want more control will use them.
