Skip to content

Commit e6c2434

Browse files
authored
Merge pull request #3 from Amerousful/v2.0
Release 2.0
2 parents 8835417 + 18c63f8 commit e6c2434

23 files changed

+946
-19
lines changed

README.md

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Add to your `pom.xml`
1111
<dependency>
1212
<groupId>io.github.amerousful</groupId>
1313
<artifactId>gatling-kafka</artifactId>
14-
<version>1.1</version>
14+
<version>2.0</version>
1515
</dependency>
1616
```
1717

@@ -20,7 +20,7 @@ Add to your `pom.xml`
2020
Add to your `build.sbt`
2121

2222
```scala
23-
libraryDependencies += "io.github.amerousful" % "gatling-kafka" % "1.1"
23+
libraryDependencies += "io.github.amerousful" % "gatling-kafka" % "2.0"
2424
```
2525

2626
Import:
@@ -70,6 +70,26 @@ Request and reply:
7070
.key("#{id} #{key}")
7171
.check(jsonPath("$.m").is("#{payload}_1"))
7272
```
73+
***
74+
Scenario:
75+
```scala
76+
scenario("Kafka Scenario")
77+
.exec(kafkaFireAndForget)
78+
```
79+
80+
Inject:
81+
82+
```scala
83+
setUp(
84+
scn.inject(
85+
constantUsersPerSec(2) during(10 seconds)
86+
).protocols(kafkaProtocol)
87+
)
88+
```
89+
90+
91+
### [Java example](src/test/scala/KafkaJavaExample.java)
92+
### [Kotlin example](src/test/scala/KafkaKotlinExample.kt)
7393

7494
***
7595

@@ -119,7 +139,7 @@ object CustomMatcher extends KafkaMatcher {
119139
.messageMatcher(CustomMatcher)
120140

121141
```
122-
\
142+
123143
### Chain for build a request:
124144
```text
125145
send ->

build.sbt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
name := "gatling-kafka"
22

3-
version := "1.1"
3+
version := "2.0"
44

5-
scalaVersion := "2.13.8"
5+
scalaVersion := "2.13.12"
6+
7+
val gatlingVersion = "3.9.5"
68

79
libraryDependencies ++= Seq(
8-
"io.gatling" % "gatling-core" % "3.10.3" % "provided",
10+
"io.gatling" % "gatling-core" % gatlingVersion % "provided",
11+
"io.gatling" % "gatling-core-java" % gatlingVersion % "provided",
12+
"com.github.spotbugs" % "spotbugs-annotations" % "4.7.3",
913

1014
"com.typesafe.akka" %% "akka-stream-kafka" % "4.0.2",
1115
"com.typesafe.akka" %% "akka-protobuf-v3" % "2.6.20",

changelog.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
Changelog 2.0 (09-02-2024)
2+
----------------------------
3+
* Implement support for both Java and Kotlin
4+
* Downgrade Gatling to version '3.9.5'. The backward compatibility is broken for ActionBuilder in version 3.10.x
5+
* Add support for regular expression (regex) check
6+
* Add support for header check
7+
* Fix initialization issue with tracker poll consumer for Send request
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
2+
package io.github.amerousful.kafka.javaapi;
3+
4+
import edu.umd.cs.findbugs.annotations.NonNull;
5+
import io.github.amerousful.kafka.request.KafkaDslBuilderBase;
6+
import io.gatling.commons.validation.Validation;
7+
import io.gatling.core.session.Session;
8+
import scala.Function1;
9+
10+
public class Kafka {
11+
private final KafkaDslBuilderBase wrapped;
12+
13+
public Kafka(Function1<Session, Validation<String>> name) {
14+
wrapped = new KafkaDslBuilderBase(name);
15+
}
16+
17+
@NonNull
18+
public KafkaSendActionBuilder.Topic send() {
19+
return new KafkaSendActionBuilder.Topic(wrapped.send());
20+
}
21+
22+
@NonNull
23+
public RequestReplyActionBuilder.Topic requestReply() {
24+
return new RequestReplyActionBuilder.Topic(wrapped.requestReply());
25+
}
26+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.github.amerousful.kafka.javaapi;
2+
3+
public final class KafkaBroker {
4+
5+
private final io.github.amerousful.kafka.protocol.KafkaBroker wrapped;
6+
7+
KafkaBroker(io.github.amerousful.kafka.protocol.KafkaBroker wrapped) {
8+
this.wrapped = wrapped;
9+
}
10+
11+
public io.github.amerousful.kafka.protocol.KafkaBroker asScala() {
12+
return wrapped;
13+
}
14+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package io.github.amerousful.kafka.javaapi;
2+
3+
import edu.umd.cs.findbugs.annotations.NonNull;
4+
import io.gatling.core.check.Check;
5+
import io.gatling.core.check.CheckMaterializer;
6+
import io.gatling.javaapi.core.CheckBuilder;
7+
import io.gatling.javaapi.core.Session;
8+
import io.github.amerousful.kafka.javaapi.internal.KafkaCheckType;
9+
import org.apache.kafka.clients.consumer.ConsumerRecord;
10+
11+
import java.util.function.Function;
12+
13+
import static io.gatling.javaapi.core.internal.Converters.toScalaFunction;
14+
import static io.gatling.javaapi.core.internal.Expressions.*;
15+
import static io.gatling.javaapi.core.internal.Expressions.toExpression;
16+
17+
public final class KafkaDsl {
18+
private KafkaDsl() {
19+
}
20+
21+
public static KafkaProtocolBuilder kafka =
22+
new KafkaProtocolBuilder(io.github.amerousful.kafka.Predef.kafka(io.gatling.core.Predef.configuration()));
23+
24+
@NonNull
25+
public static KafkaBroker KafkaBroker(@NonNull String host, int port) {
26+
return new KafkaBroker(new io.github.amerousful.kafka.protocol.KafkaBroker(host, port));
27+
}
28+
29+
@NonNull
30+
public static Kafka kafka(@NonNull String name) {
31+
return new Kafka(toStringExpression(name));
32+
}
33+
34+
@NonNull
35+
public static Kafka kafka(@NonNull Function<Session, String> name) {
36+
return new Kafka(javaFunctionToExpression(name));
37+
}
38+
39+
@NonNull
40+
public static CheckBuilder.MultipleFind<String> header(@NonNull CharSequence name) {
41+
return new CheckBuilder.MultipleFind.Default<>(
42+
io.github.amerousful.kafka.Predef.header(toStaticValueExpression(name)),
43+
KafkaCheckType.Header,
44+
String.class,
45+
null);
46+
}
47+
48+
@NonNull
49+
public static CheckBuilder.MultipleFind<String> header(@NonNull String name) {
50+
return new CheckBuilder.MultipleFind.Default<>(
51+
io.github.amerousful.kafka.Predef.header(toExpression(name, CharSequence.class)),
52+
KafkaCheckType.Header,
53+
String.class,
54+
null);
55+
}
56+
57+
@NonNull
58+
public static CheckBuilder.MultipleFind<String> header(
59+
@NonNull Function<Session, CharSequence> name) {
60+
return new CheckBuilder.MultipleFind.Default<>(
61+
io.github.amerousful.kafka.Predef.header(javaFunctionToExpression(name)),
62+
KafkaCheckType.Header,
63+
String.class,
64+
null);
65+
}
66+
67+
@SuppressWarnings("rawtypes")
68+
public static CheckBuilder simpleCheck(Function<ConsumerRecord<String, String>, Boolean> f) {
69+
return new CheckBuilder() {
70+
@Override
71+
public io.gatling.core.check.CheckBuilder<?, ?> asScala() {
72+
return new io.gatling.core.check.CheckBuilder() {
73+
@Override
74+
public Check<?> build(CheckMaterializer materializer) {
75+
return io.github.amerousful.kafka.Predef.simpleCheck(toScalaFunction(f::apply));
76+
}
77+
};
78+
}
79+
80+
@Override
81+
public CheckType type() {
82+
return KafkaCheckType.Simple;
83+
}
84+
};
85+
}
86+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
2+
package io.github.amerousful.kafka.javaapi;
3+
4+
import edu.umd.cs.findbugs.annotations.NonNull;
5+
import org.apache.kafka.clients.consumer.ConsumerRecord;
6+
import org.apache.kafka.clients.producer.ProducerRecord;
7+
8+
9+
public interface KafkaMessageMatcher {
10+
11+
@NonNull
12+
String requestMatchId(@NonNull ProducerRecord<String, String> msg);
13+
14+
@NonNull
15+
String responseMatchId(@NonNull ConsumerRecord<String, String> msg);
16+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package io.github.amerousful.kafka.javaapi;
2+
3+
import edu.umd.cs.findbugs.annotations.NonNull;
4+
import io.gatling.core.protocol.Protocol;
5+
import io.gatling.javaapi.core.ProtocolBuilder;
6+
import io.github.amerousful.kafka.protocol.javaapi.KafkaMessageMatchers;
7+
import scala.Enumeration;
8+
9+
import java.time.Duration;
10+
import java.util.Arrays;
11+
import java.util.List;
12+
import java.util.stream.Collectors;
13+
14+
import static io.gatling.javaapi.core.internal.Converters.toScalaDuration;
15+
import static io.gatling.javaapi.core.internal.Converters.toScalaSeq;
16+
17+
public final class KafkaProtocolBuilder implements ProtocolBuilder {
18+
19+
private final io.github.amerousful.kafka.protocol.KafkaProtocolBuilder wrapped;
20+
21+
KafkaProtocolBuilder(io.github.amerousful.kafka.protocol.KafkaProtocolBuilder wrapped) {
22+
this.wrapped = wrapped;
23+
}
24+
25+
@NonNull
26+
public KafkaProtocolBuilder broker(KafkaBroker broker) {
27+
return new KafkaProtocolBuilder(wrapped.broker(broker.asScala()));
28+
}
29+
30+
@NonNull
31+
public KafkaProtocolBuilder brokers(KafkaBroker... brokers) {
32+
List<io.github.amerousful.kafka.protocol.KafkaBroker> listOfBrokersAsScala =
33+
Arrays.stream(brokers).map(KafkaBroker::asScala).collect(Collectors.toList());
34+
return new KafkaProtocolBuilder(wrapped.brokers(toScalaSeq(listOfBrokersAsScala)));
35+
}
36+
37+
@NonNull
38+
public KafkaProtocolBuilder acks(@NonNull String acks) {
39+
return new KafkaProtocolBuilder(wrapped.acks(acks));
40+
}
41+
42+
@NonNull
43+
public KafkaProtocolBuilder producerKeySerializer(@NonNull String serializer) {
44+
return new KafkaProtocolBuilder(wrapped.producerKeySerializer(serializer));
45+
}
46+
47+
@NonNull
48+
public KafkaProtocolBuilder producerValueSerializer(@NonNull String serializer) {
49+
return new KafkaProtocolBuilder(wrapped.producerValueSerializer(serializer));
50+
}
51+
52+
@NonNull
53+
public KafkaProtocolBuilder consumerKeyDeserializer(@NonNull String deserializer) {
54+
return new KafkaProtocolBuilder(wrapped.consumerKeyDeserializer(deserializer));
55+
}
56+
57+
@NonNull
58+
public KafkaProtocolBuilder consumerValueDeserializer(@NonNull String deserializer) {
59+
return new KafkaProtocolBuilder(wrapped.consumerValueDeserializer(deserializer));
60+
}
61+
62+
@NonNull
63+
public KafkaProtocolBuilder consumerIdenticalDeserializer(@NonNull String deserializer) {
64+
return consumerKeyDeserializer(deserializer)
65+
.consumerValueDeserializer(deserializer);
66+
}
67+
68+
@NonNull
69+
public KafkaProtocolBuilder producerIdenticalSerializer(@NonNull String serializer) {
70+
return new KafkaProtocolBuilder(wrapped.producerIdenticalSerializer(serializer));
71+
}
72+
73+
@NonNull
74+
public KafkaProtocolBuilder addProducerProperty(@NonNull String key, @NonNull String value) {
75+
return new KafkaProtocolBuilder(wrapped.addProducerProperty(key, value));
76+
}
77+
78+
@NonNull
79+
public KafkaProtocolBuilder addConsumerProperty(@NonNull String key, @NonNull String value) {
80+
return new KafkaProtocolBuilder(wrapped.addConsumerProperty(key, value));
81+
}
82+
83+
@NonNull
84+
public KafkaProtocolBuilder credentials(@NonNull String username, @NonNull String password, boolean sslEnabled, @NonNull Enumeration.Value saslMechanism) {
85+
return new KafkaProtocolBuilder(wrapped.credentials(username, password, sslEnabled, saslMechanism));
86+
}
87+
88+
@NonNull
89+
public KafkaProtocolBuilder replyTimeout(long timeout) {
90+
return replyTimeout(Duration.ofSeconds(timeout));
91+
}
92+
93+
@NonNull
94+
public KafkaProtocolBuilder replyTimeout(@NonNull Duration timeout) {
95+
return new KafkaProtocolBuilder(wrapped.replyTimeout(toScalaDuration(timeout)));
96+
}
97+
98+
@NonNull
99+
public KafkaProtocolBuilder messageMatcher(@NonNull KafkaMessageMatcher matcher) {
100+
return new KafkaProtocolBuilder(wrapped.messageMatcher(KafkaMessageMatchers.toScala(matcher)));
101+
}
102+
103+
@NonNull
104+
public KafkaProtocolBuilder matchByKey() {
105+
return new KafkaProtocolBuilder(wrapped.matchByKey());
106+
}
107+
108+
@NonNull
109+
public KafkaProtocolBuilder matchByValue() {
110+
return new KafkaProtocolBuilder(wrapped.matchByValue());
111+
}
112+
113+
@NonNull
114+
public KafkaProtocolBuilder replyConsumerName(@NonNull String name) {
115+
return new KafkaProtocolBuilder(wrapped.replyConsumerName(name));
116+
}
117+
118+
@Override
119+
public Protocol protocol() {
120+
return wrapped.build();
121+
}
122+
}

0 commit comments

Comments
 (0)