Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
<version>${skyscreamer.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.skyscreamer.jsonassert.JSONAssert;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.testng.annotations.Test;

public class NamespaceOwnershipStatusTest {
Expand Down Expand Up @@ -55,6 +57,10 @@ public void testSerialization() throws Exception {
assertTrue(nsStatus.is_active);
}
}
assertEquals(jsonMapper.writeValueAsString(nsMap), jsonStr);
JSONAssert.assertEquals(
jsonMapper.writeValueAsString(nsMap),
jsonStr,
JSONCompareMode.STRICT
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.skyscreamer.jsonassert.JSONAssert;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.testng.annotations.Test;

public class NamespaceIsolationPoliciesTest {
Expand Down Expand Up @@ -67,7 +69,11 @@ public void testJsonSerialization() throws Exception {
assertEquals(new String(secondaryBrokersJson), "[\"prod1-broker.*.use.example.com\"]");

byte[] outJson = jsonMapperForWriter.writeValueAsBytes(policies.getPolicies());
assertEquals(new String(outJson), this.defaultJson);
JSONAssert.assertEquals(
new String(outJson),
this.defaultJson,
JSONCompareMode.STRICT
);

Map<String, String> parameters = new HashMap<>();
parameters.put("min_limit", "1");
Expand Down
7 changes: 7 additions & 0 deletions pulsar-functions/utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
<version>${skyscreamer.version}</version>
<scope>test</scope>
</dependency>

</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.json.JSONException;
import org.skyscreamer.jsonassert.JSONAssert;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -83,7 +86,7 @@ public void testAutoAckConvertFailed() {
}

@Test
public void testConvertBackFidelity() {
public void testConvertBackFidelity() throws JSONException {
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant("test-tenant");
functionConfig.setNamespace("test-namespace");
Expand Down Expand Up @@ -121,9 +124,10 @@ public void testConvertBackFidelity() {
functionConfig.setResources(Resources.getDefaultResources());
// set default cleanupSubscription config
functionConfig.setCleanupSubscription(true);
assertEquals(
JSONAssert.assertEquals(
new Gson().toJson(functionConfig),
new Gson().toJson(convertedConfig)
new Gson().toJson(convertedConfig),
JSONCompareMode.STRICT
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.json.JSONException;
import org.skyscreamer.jsonassert.JSONAssert;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -99,7 +102,7 @@ public void testAutoAckConvertFailed() throws IOException {
}

@Test
public void testConvertBackFidelity() throws IOException {
public void testConvertBackFidelity() throws IOException, JSONException {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setTenant("test-tenant");
sinkConfig.setNamespace("test-namespace");
Expand Down Expand Up @@ -143,9 +146,10 @@ public void testConvertBackFidelity() throws IOException {
new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
assertEquals(Function.SubscriptionType.SHARED, functionDetails.getSource().getSubscriptionType());
SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(
JSONAssert.assertEquals(
new Gson().toJson(convertedConfig),
new Gson().toJson(sinkConfig)
new Gson().toJson(sinkConfig),
JSONCompareMode.STRICT
);

sinkConfig.setRetainOrdering(true);
Expand All @@ -155,9 +159,11 @@ public void testConvertBackFidelity() throws IOException {
new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
assertEquals(Function.SubscriptionType.FAILOVER, functionDetails.getSource().getSubscriptionType());
convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(
JSONAssert.assertEquals(
new Gson().toJson(convertedConfig),
new Gson().toJson(sinkConfig));
new Gson().toJson(sinkConfig),
JSONCompareMode.STRICT
);

sinkConfig.setRetainOrdering(false);
sinkConfig.setRetainKeyOrdering(true);
Expand All @@ -166,9 +172,11 @@ public void testConvertBackFidelity() throws IOException {
new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
assertEquals(Function.SubscriptionType.KEY_SHARED, functionDetails.getSource().getSubscriptionType());
convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(
JSONAssert.assertEquals(
new Gson().toJson(convertedConfig),
new Gson().toJson(sinkConfig));
new Gson().toJson(sinkConfig),
JSONCompareMode.STRICT
);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.io.core.BatchSourceTriggerer;
import org.apache.pulsar.io.core.SourceContext;
import org.json.JSONException;
import org.skyscreamer.jsonassert.JSONAssert;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -115,13 +118,14 @@ public void testMergeEqual() {
}

@Test
public void testBatchConfigMergeEqual() {
public void testBatchConfigMergeEqual() throws JSONException {
SourceConfig sourceConfig = createSourceConfigWithBatch();
SourceConfig newSourceConfig = createSourceConfigWithBatch();
SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
assertEquals(
JSONAssert.assertEquals(
new Gson().toJson(sourceConfig),
new Gson().toJson(mergedConfig)
new Gson().toJson(mergedConfig),
JSONCompareMode.STRICT
);
}

Expand Down
7 changes: 7 additions & 0 deletions pulsar-io/elastic-search/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
<version>${skyscreamer.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.skyscreamer.jsonassert.JSONAssert;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -99,8 +101,12 @@ public GenericObject getValue() {
"keyIgnore", "true"), null);
Pair<String, String> pair = elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertEquals(pair.getLeft(), "1");
assertEquals(pair.getRight(), "{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,"
+ "\"f\":1.0,\"i\":1,\"l\":10}}");
JSONAssert.assertEquals(
pair.getRight(),
"{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,"
+ "\"f\":1.0,\"i\":1,\"l\":10}}",
JSONCompareMode.STRICT
);
elasticSearchSink.close();

// two fields PK
Expand All @@ -112,9 +118,20 @@ public GenericObject getValue() {
"schemaEnable", "true",
"keyIgnore", "true"), null);
Pair<String, String> pair2 = elasticSearchSink2.extractIdAndDocument(genericObjectRecord);
assertEquals(pair2.getLeft(), "[\"1\",1]");
assertEquals(pair2.getRight(), "{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,"
+ "\"f\":1.0,\"i\":1,\"l\":10}}");

// NON_EXTENSIBLE is NOT extensible and does NOT have strict ordering so both
// possibilities ["1",1] and [1,"1"] will pass
JSONAssert.assertEquals(
pair2.getLeft(),
"[\"1\",1]",
JSONCompareMode.NON_EXTENSIBLE
);
JSONAssert.assertEquals(
pair2.getRight(),
"{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,"
+ "\"f\":1.0,\"i\":1,\"l\":10}}",
JSONCompareMode.STRICT
);
elasticSearchSink2.close();

// default config with null PK => indexed with auto generated _id
Expand All @@ -124,8 +141,12 @@ public GenericObject getValue() {
"compatibilityMode", "ELASTICSEARCH"), null);
Pair<String, String> pair3 = elasticSearchSink3.extractIdAndDocument(genericObjectRecord);
assertNull(pair3.getLeft());
assertEquals(pair3.getRight(), "{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,"
+ "\"f\":1.0,\"i\":1,\"l\":10}}");
JSONAssert.assertEquals(
pair3.getRight(),
"{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,"
+ "\"f\":1.0,\"i\":1,\"l\":10}}",
JSONCompareMode.STRICT
);
elasticSearchSink3.close();

// default config with null PK + null value
Expand Down
7 changes: 7 additions & 0 deletions pulsar-io/kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
<version>${skyscreamer.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.io.kinesis.fbs.KeyValue;
import org.apache.pulsar.io.kinesis.fbs.Message;
import org.skyscreamer.jsonassert.JSONAssert;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Maps;
Expand Down Expand Up @@ -510,15 +512,21 @@ public Optional<Long> getEventTime() {
ObjectMapper objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
String json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, false);

assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
+ "\"payload\":{},"
+ "\"eventTime\":1648502845803}");
JSONAssert.assertEquals(
json,
"{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
+ "\"payload\":{},\"eventTime\":1648502845803}",
JSONCompareMode.STRICT
);

json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, true);

assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
+ "\"payload\":{},"
+ "\"eventTime\":1648502845803}");
JSONAssert.assertEquals(
json,
"{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
+ "\"payload\":{},\"eventTime\":1648502845803}",
JSONCompareMode.STRICT
);
}

@Test
Expand Down
Loading