From 39f3c65f41d663548bab54a0d412856135e2054e Mon Sep 17 00:00:00 2001 From: "masatoshi.shimada" Date: Fri, 21 Jul 2017 20:14:28 +0900 Subject: [PATCH 1/2] apply ReuseAddress true / Refactoring tcp handling. --- .../embulk/output/fluentd/sender/Sender.scala | 85 ++++++++++++------- .../output/fluentd/sender/SenderFlow.scala | 6 -- 2 files changed, 52 insertions(+), 39 deletions(-) diff --git a/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala b/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala index 3e833c1..c126961 100644 --- a/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala +++ b/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala @@ -1,6 +1,10 @@ package org.embulk.output.fluentd.sender +import java.net.InetSocketAddress +import java.util.concurrent.TimeUnit + import akka._ +import akka.io.Inet.SO.ReuseAddress import akka.pattern.ask import akka.stream._ import akka.stream.scaladsl._ @@ -15,7 +19,7 @@ trait Sender { def close(): Unit val instance: SourceQueueWithComplete[Seq[Map[String, AnyRef]]] def apply(value: Seq[Map[String, AnyRef]]): Future[QueueOfferResult] - def tcpHandling(size: Int, byteString: ByteString): Future[Done] + def sendCommand(size: Int, byteString: ByteString): Future[Done] def waitForComplete(): Result } @@ -87,45 +91,60 @@ case class SenderImpl private[sender] (host: String, withThrottle .mapAsync(asyncSize) { case (size, byteString) => - tcpHandling(size, byteString) + sendCommand(size, byteString) } .to(Sink.ignore) .run() } - def sendCommand(byteString: ByteString): Future[Done] = - Source - .single(byteString) - .via(senderFlow.tcpConnectionFlow(host, port)) - .runWith(Sink.ignore) + def sendCommand(size: Int, byteString: ByteString): Future[Done] = { + val futureCommand = tcpOutgoing(size, byteString) + futureCommand.onComplete { + case Success(_) => + actorManager.supervisor ! Complete(size) + case Failure(e) => + actorManager.supervisor ! Failed(size) + instance.complete() + logger.error( + s"Sending fluentd retry count is over and will be terminate soon. Please check your fluentd environment.", + e) + sys.error("Sending fluentd was terminated cause of retry count over.") + } + futureCommand + } + + val connection: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = Tcp().outgoingConnection( + InetSocketAddress.createUnresolved(host, port), + None, + List(ReuseAddress(true)), + halfClose = true, + Duration(3, TimeUnit.MINUTES), + Duration(3, TimeUnit.MINUTES) + ) - def tcpHandling(size: Int, byteString: ByteString): Future[Done] = { - def _tcpHandling(size: Int, byteString: ByteString, c: Int)(retried: Boolean): Future[Done] = { - val futureCommand = sendCommand(byteString) - futureCommand.onComplete { - case Success(_) => - actorManager.supervisor ! Complete(size) - case Failure(e) if c > 0 => - logger.info( - s"Sending fluentd ${size.toString} records was failed. - will retry ${c - 1} more times ${retryDelayIntervalSecondDuration.toSeconds} seconds later.", - e) - actorManager.supervisor ! Retried(size) - akka.pattern.after(retryDelayIntervalSecondDuration, actorManager.system.scheduler)( - _tcpHandling(size, byteString, c - 1)(retried = true)) - case Failure(e) => - actorManager.supervisor ! Failed(size) - logger.error( - s"Sending fluentd retry count is over and will be terminate soon. Please check your fluentd environment.", - e) - sys.error("Sending fluentd was terminated cause of retry count over.") - instance.complete() + def tcpOutgoing(size: Int, byteString: ByteString): Future[Done] = { + val command = Source + .single(byteString) + .mapAsync(1) { v => + Source + .single(v) + .via(connection) + .toMat(Sink.ignore)(Keep.right) + .run() } - futureCommand - } - _tcpHandling(size, byteString, retryCount)(retried = false).recoverWith { - case _: Exception => - Future.successful(Done) - } + command + .recoverWithRetries( + retryCount, { + case v: Throwable => + logger.info( + s"Sending fluentd ${size.toString} records was failed. - will retry ${retryDelayIntervalSecondDuration.toSeconds} seconds later.", + v) + actorManager.supervisor ! Retried(size) + command.initialDelay(retryDelayIntervalSecondDuration) + } + ) + .toMat(Sink.ignore)(Keep.right) + .run() } } diff --git a/src/main/scala/org/embulk/output/fluentd/sender/SenderFlow.scala b/src/main/scala/org/embulk/output/fluentd/sender/SenderFlow.scala index 94e6d10..6c55b9e 100644 --- a/src/main/scala/org/embulk/output/fluentd/sender/SenderFlow.scala +++ b/src/main/scala/org/embulk/output/fluentd/sender/SenderFlow.scala @@ -11,8 +11,6 @@ import scala.concurrent.Future trait SenderFlow { val msgPackFlow: Flow[Seq[Seq[Map[String, AnyRef]]], (Int, ByteString), NotUsed] - def tcpConnectionFlow(host: String, port: Int)( - implicit s: ActorSystem): Flow[ByteString, ByteString, Future[OutgoingConnection]] } case class SenderFlowImpl private[sender] (tag: String, unixtime: Long, timeKeyOpt: Option[String]) @@ -29,8 +27,4 @@ case class SenderFlowImpl private[sender] (tag: String, unixtime: Long, timeKeyO } (packing.size, ByteString(MsgPack.pack(Seq(tag, packing)))) } - override def tcpConnectionFlow(host: String, port: Int)( - implicit s: ActorSystem): Flow[ByteString, ByteString, Future[OutgoingConnection]] = - Tcp().outgoingConnection(host, port) - } From c9239365092fb8628fd9de9fe06bfc6f0c81cf3e Mon Sep 17 00:00:00 2001 From: "masatoshi.shimada" Date: Fri, 21 Jul 2017 22:11:05 +0900 Subject: [PATCH 2/2] fix --- .../embulk/output/fluentd/sender/Sender.scala | 65 ++++++++----------- 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala b/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala index c126961..11c763d 100644 --- a/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala +++ b/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala @@ -97,54 +97,45 @@ case class SenderImpl private[sender] (host: String, .run() } - def sendCommand(size: Int, byteString: ByteString): Future[Done] = { - val futureCommand = tcpOutgoing(size, byteString) - futureCommand.onComplete { - case Success(_) => - actorManager.supervisor ! Complete(size) - case Failure(e) => - actorManager.supervisor ! Failed(size) - instance.complete() - logger.error( - s"Sending fluentd retry count is over and will be terminate soon. Please check your fluentd environment.", - e) - sys.error("Sending fluentd was terminated cause of retry count over.") - } - futureCommand - } - val connection: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = Tcp().outgoingConnection( InetSocketAddress.createUnresolved(host, port), None, - List(ReuseAddress(true)), + List(ReuseAddress(false)), halfClose = true, Duration(3, TimeUnit.MINUTES), Duration(3, TimeUnit.MINUTES) ) - def tcpOutgoing(size: Int, byteString: ByteString): Future[Done] = { + def sendCommand(size: Int, byteString: ByteString): Future[Done] = { val command = Source .single(byteString) - .mapAsync(1) { v => - Source - .single(v) - .via(connection) - .toMat(Sink.ignore)(Keep.right) - .run() + .via(connection) + def _sendCommand(size: Int, c: Int)(retried: Boolean): Future[Done] = { + val futureCommand = command.runWith(Sink.ignore) + futureCommand.onComplete { + case Success(_) => + actorManager.supervisor ! Complete(size) + case Failure(e) if c > 0 => + logger.info( + s"Sending fluentd ${size.toString} records was failed. - will retry ${c - 1} more times ${retryDelayIntervalSecondDuration.toSeconds} seconds later.", + e) + actorManager.supervisor ! Retried(size) + Thread.sleep(retryDelayIntervalSecondDuration.toSeconds) + _sendCommand(size, c - 1)(retried = true) + case Failure(e) => + actorManager.supervisor ! Failed(size) + logger.error( + s"Sending fluentd retry count is over and will be terminate soon. Please check your fluentd environment.", + e) + sys.error("Sending fluentd was terminated cause of retry count over.") + instance.complete() } - command - .recoverWithRetries( - retryCount, { - case v: Throwable => - logger.info( - s"Sending fluentd ${size.toString} records was failed. - will retry ${retryDelayIntervalSecondDuration.toSeconds} seconds later.", - v) - actorManager.supervisor ! Retried(size) - command.initialDelay(retryDelayIntervalSecondDuration) - } - ) - .toMat(Sink.ignore)(Keep.right) - .run() + futureCommand + } + _sendCommand(size, retryCount)(retried = false).recoverWith { + case _: Exception => + Future.successful(Done) + } } }