-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathSender.scala
141 lines (128 loc) · 5.12 KB
/
Sender.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package org.embulk.output.fluentd.sender
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
import akka._
import akka.io.Inet.SO.ReuseAddress
import akka.pattern.ask
import akka.stream._
import akka.stream.scaladsl._
import akka.util.{ByteString, Timeout}
import org.slf4j.Logger
import scala.concurrent._
import scala.concurrent.duration._
import scala.util._
trait Sender {
def close(): Unit
val instance: SourceQueueWithComplete[Seq[Map[String, AnyRef]]]
def apply(value: Seq[Map[String, AnyRef]]): Future[QueueOfferResult]
def sendCommand(size: Int, byteString: ByteString): Future[Done]
def waitForComplete(): Result
}
case class SenderImpl private[sender] (host: String,
port: Int,
groupedSize: Int,
asyncSize: Int,
senderFlow: SenderFlow,
actorManager: ActorManager,
asyncSizeRequestPerSecond: Int = 0,
retryCount: Int = 0,
retryDelayIntervalSecond: Int = 10)(implicit logger: Logger)
extends Sender {
import actorManager._
system.scheduler.schedule(0.seconds, 30.seconds, supervisor, LogStatus(logger))
val retryDelayIntervalSecondDuration: FiniteDuration = retryDelayIntervalSecond.seconds
def apply(value: Seq[Map[String, AnyRef]]): Future[QueueOfferResult] = {
actorManager.supervisor ! Record(value.size)
instance.offer(value)
}
def close(): Unit = {
implicit val timeout = Timeout(5.seconds)
val f: Future[ClosedStatus] = (actorManager.supervisor ? Close).mapTo[ClosedStatus]
val result = Await.result(f, Duration.Inf)
if (!result.alreadyClosed) {
logger.debug("wait for closing.")
// wait for akka-stream termination.
instance.complete()
val result = waitForComplete()
Await.result(actorManager.terminate(), Duration.Inf)
actorManager.system.terminate()
logger.info(
s"Completed RecordCount:${result.record} completedCount:${result.complete} retriedRecordCount:${result.retried}")
}
}
def waitForComplete(): Result = {
logger.debug("wait for complete.")
var result: Option[Result] = None
implicit val timeout = Timeout(5.seconds)
while (result.isEmpty) {
(actorManager.supervisor ? GetStatus).onComplete {
case Success(Result(recordCount, complete, failed, retried)) =>
logger.debug(s"current status ${Result(recordCount, complete, failed, retried)}")
if (recordCount == (complete + failed)) {
result = Some(Result(recordCount, complete, failed, retried))
}
case Success(Stop(recordCount, complete, failed, retried)) =>
result = Some(Result(recordCount, complete, failed, retried))
case _ =>
sys.error("fail of wait complete.")
}
Thread.sleep(1000)
}
result.get
}
val instance: SourceQueueWithComplete[Seq[Map[String, AnyRef]]] = {
val base = Source
.queue(Int.MaxValue, OverflowStrategy.backpressure)
.grouped(groupedSize)
.via(senderFlow.msgPackFlow)
val withThrottle = if (asyncSizeRequestPerSecond > 0) {
base.throttle(asyncSize, asyncSizeRequestPerSecond.seconds, 0, ThrottleMode.Shaping)
} else base
withThrottle
.mapAsync(asyncSize) {
case (size, byteString) =>
sendCommand(size, byteString)
}
.to(Sink.ignore)
.run()
}
val connection: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = Tcp().outgoingConnection(
InetSocketAddress.createUnresolved(host, port),
None,
List(ReuseAddress(false)),
halfClose = true,
Duration(3, TimeUnit.MINUTES),
Duration(3, TimeUnit.MINUTES)
)
def sendCommand(size: Int, byteString: ByteString): Future[Done] = {
val command = Source
.single(byteString)
.via(connection)
def _sendCommand(size: Int, c: Int)(retried: Boolean): Future[Done] = {
val futureCommand = command.runWith(Sink.ignore)
futureCommand.onComplete {
case Success(_) =>
actorManager.supervisor ! Complete(size)
case Failure(e) if c > 0 =>
logger.info(
s"Sending fluentd ${size.toString} records was failed. - will retry ${c - 1} more times ${retryDelayIntervalSecondDuration.toSeconds} seconds later.",
e)
actorManager.supervisor ! Retried(size)
Thread.sleep(retryDelayIntervalSecondDuration.toSeconds)
_sendCommand(size, c - 1)(retried = true)
case Failure(e) =>
actorManager.supervisor ! Failed(size)
logger.error(
s"Sending fluentd retry count is over and will be terminate soon. Please check your fluentd environment.",
e)
sys.error("Sending fluentd was terminated cause of retry count over.")
instance.complete()
}
futureCommand
}
_sendCommand(size, retryCount)(retried = false).recoverWith {
case _: Exception =>
Future.successful(Done)
}
}
}