-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathMatchSimulation.scala
68 lines (59 loc) · 2.1 KB
/
MatchSimulation.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package ru.tinkoff.gatling.kafka.examples
import io.gatling.core.Predef._
import io.gatling.core.feeder.Feeder
import io.gatling.core.structure.ScenarioBuilder
import org.apache.kafka.clients.producer.ProducerConfig
import ru.tinkoff.gatling.kafka.Predef._
import ru.tinkoff.gatling.kafka.protocol.KafkaProtocol
import ru.tinkoff.gatling.kafka.request.KafkaProtocolMessage
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.DurationInt
/** Example Gatling simulation showing two ways to configure message matching for a Kafka
  * request-reply protocol: `matchByValue` and `matchByMessage` with a custom extractor.
  *
  * Only [[kafkaProtocolMatchByMessage]] is passed to `setUp`; [[kafkaProtocolMatchByValue]] is
  * declared as an alternative configuration and is not used by the scenario below.
  */
class MatchSimulation extends Simulation {

  /** Settings shared by both protocols: producer with `acks = 1`, producer and consumer both
    * pointed at `localhost:9092`, and a timeout of 5 seconds.
    *
    * Declared as a `def` so each protocol is built from its own fresh chain rather than from a
    * shared builder instance.
    */
  private def requestReplyBase =
    kafka.requestReply
      .producerSettings(
        Map(
          ProducerConfig.ACKS_CONFIG              -> "1",
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
        ),
      )
      .consumeSettings(
        Map(
          "bootstrap.servers" -> "localhost:9092",
        ),
      )
      .timeout(5.seconds)

  /** Request-reply protocol configured for match by message value. */
  val kafkaProtocolMatchByValue: KafkaProtocol = requestReplyBase.matchByValue

  /** Produces the bytes used to match messages when the protocol is configured with
    * `matchByMessage`.
    *
    * This sample ignores `message` and returns the same constant on every call.
    *
    * @param message the Kafka message to extract a match value from (unused in this example)
    * @return the UTF-8 encoded bytes of the literal `"Custom Message"`
    */
  def matchByOwnVal(message: KafkaProtocolMessage): Array[Byte] = {
    // do something with the message and extract the values you are interested in
    // method is called:
    // - for each message which will be sent out
    // - for each message which has been received
    // The charset is explicit because the no-arg getBytes uses the JVM's platform default.
    "Custom Message".getBytes(java.nio.charset.StandardCharsets.UTF_8) // just returning something
  }

  /** Request-reply protocol that matches messages through [[matchByOwnVal]]. */
  val kafkaProtocolMatchByMessage: KafkaProtocol = requestReplyBase.matchByMessage(matchByOwnVal)

  /** Counter backing [[feeder]]; starts at 0 and is incremented for every record produced. */
  val c = new AtomicInteger(0)

  /** Endless feeder whose records carry the next counter value (1, 2, 3, ...) under `kekey`. */
  val feeder: Feeder[Int] = Iterator.continually(Map("kekey" -> c.incrementAndGet()))

  /** Scenario that takes a `kekey` from the feeder and performs one request-reply exchange:
    * key `#{kekey}` with a fixed JSON payload, using `test.t` as both request and reply topic.
    */
  val scn: ScenarioBuilder = scenario("Basic")
    .feed(feeder)
    .exec(
      kafka("ReqRep").requestReply
        .requestTopic("test.t")
        .replyTopic("test.t")
        .send[String, String]("#{kekey}", """{ "m": "dkf" }"""),
    )

  // Inject a single user at once, use the custom-matcher protocol, and cap the run at 120 seconds.
  setUp(scn.inject(atOnceUsers(1))).protocols(kafkaProtocolMatchByMessage).maxDuration(120.seconds)
}