18
18
19
19
import static com .fasterxml .jackson .dataformat .yaml .YAMLGenerator .Feature .WRITE_DOC_START_MARKER ;
20
20
import static com .linecorp .decaton .processor .runtime .ProcessorProperties .CONFIG_PARTITION_CONCURRENCY ;
21
- import static org .junit .jupiter .api .Assertions .*;
21
+ import static org .junit .jupiter .api .Assertions .assertEquals ;
22
+ import static org .junit .jupiter .api .Assertions .assertFalse ;
23
+ import static org .junit .jupiter .api .Assertions .assertSame ;
24
+ import static org .junit .jupiter .api .Assertions .assertThrows ;
25
+ import static org .junit .jupiter .api .Assertions .assertTrue ;
26
+ import static org .junit .jupiter .api .Assertions .fail ;
22
27
import static org .mockito .ArgumentMatchers .any ;
23
28
import static org .mockito .ArgumentMatchers .eq ;
24
29
import static org .mockito .Mockito .doAnswer ;
36
41
37
42
import com .fasterxml .jackson .databind .ObjectMapper ;
38
43
import com .fasterxml .jackson .dataformat .yaml .YAMLFactory ;
39
- import lombok .extern .slf4j .Slf4j ;
40
- import org .junit .jupiter .api .Nested ;
41
44
import org .junit .jupiter .api .Test ;
42
45
import org .junit .jupiter .api .Timeout ;
43
46
import org .junit .jupiter .api .extension .RegisterExtension ;
60
63
import org .junit .jupiter .params .ParameterizedTest ;
61
64
import org .junit .jupiter .params .provider .MethodSource ;
62
65
63
- @ Slf4j
64
66
public class CentralDogmaPropertySupplierIntegrationTest {
65
67
@ RegisterExtension
66
68
final CentralDogmaExtension extension = new CentralDogmaExtension () {
@@ -210,16 +212,14 @@ void testCDIntegrationYaml() throws Exception {
210
212
assertEquals (20 , concurrency .value ());
211
213
assertEquals (java .util .Arrays .asList ("123456" , "79797979" ), ignoreKeys .value ());
212
214
213
- assertEquals (20 ,
214
- IntStream .range (0 , 10_000 )
215
- .mapToObj (i -> CONFIG_PARTITION_CONCURRENCY )
216
- .map (supplier ::getProperty )
217
- .reduce ((l , r ) -> {
218
- assertSame (l .get (), r .get ());
219
- return l ;
220
- })
221
- .orElseThrow ()
222
- .get ().value ().intValue ());
215
+ assertEquals (20 , IntStream
216
+ .range (0 , 10_000 )
217
+ .mapToObj (i -> CONFIG_PARTITION_CONCURRENCY )
218
+ .map (supplier ::getProperty )
219
+ .reduce ((l , r ) -> {
220
+ assertSame (l .get (), r .get ());
221
+ return l ;
222
+ }).get ().get ().value ().intValue ());
223
223
}
224
224
225
225
@ Test
@@ -242,32 +242,26 @@ public void testCDRegisterSuccessJson() {
242
242
243
243
@ Test
244
244
@ Timeout (10 )
245
- public void testCDRegisterSuccessYaml () {
245
+ public void testCDRegisterSuccessYaml () throws Exception {
246
246
String yamlFile = "/subscription.yaml" ;
247
247
CentralDogma client = extension .client ();
248
248
client .createProject (PROJECT_NAME ).join ();
249
249
CentralDogmaRepository centralDogmaRepository = client .createRepository (PROJECT_NAME , REPOSITORY_NAME ).join ();
250
250
251
251
CentralDogmaPropertySupplier .register (centralDogmaRepository , yamlFile );
252
252
253
- String expectedYaml = "decaton.ignore.keys: []\n "
254
- + "decaton.processing.rate.per.partition: -1\n "
255
- + "decaton.partition.concurrency: 1\n "
256
- + "decaton.max.pending.records: 10000\n "
257
- + "decaton.commit.interval.ms: 1000\n "
258
- + "decaton.group.rebalance.timeout.ms: 1000\n "
259
- + "decaton.processing.shutdown.timeout.ms: 0\n "
260
- + "decaton.logging.mdc.enabled: true\n "
261
- + "decaton.client.metrics.micrometer.bound: false\n "
262
- + "decaton.deferred.complete.timeout.ms: -1\n "
263
- + "decaton.processor.threads.termination.timeout.ms: 9223372036854775807\n "
264
- + "decaton.per.key.quota.processing.rate: -1\n "
265
- + "decaton.retry.task.in.legacy.format: false\n "
266
- + "decaton.legacy.parse.fallback.enabled: false\n " ;
267
-
268
253
String actualText = centralDogmaRepository .file (Query .ofText (yamlFile )).get ().join ().content ();
269
- assertEquals (expectedYaml , actualText );
270
- log .info ("Content of {}: {}" , yamlFile , actualText );
254
+
255
+ ObjectMapper yaml = new ObjectMapper (new YAMLFactory ());
256
+ JsonNode actual = yaml .readTree (actualText );
257
+ JsonNode expected = defaultProperties ();
258
+
259
+ assertEquals (expected .toString (), actual .toString (),
260
+ () -> "\n expected: " + expected .toPrettyString ()
261
+ + "\n actual: " + actual .toPrettyString ());
262
+
263
+ assertFalse (actualText .startsWith ("---" ), "YAML should not include doc start marker" );
264
+ assertFalse (actualText .trim ().startsWith ("{" ), "YAML should not be JSON text" );
271
265
}
272
266
273
267
@ Test
@@ -331,7 +325,7 @@ void testCDRegisterConflictYaml() throws Exception {
331
325
"# pushed by user‑B (should win the race)\n "
332
326
+ "foo: bar\n " ;
333
327
334
- JsonNode userBPush = Jackson .readTree ("{\" foo\" :\" bar\" }" );
328
+ JsonNode userBYamlAsJsonNode = Jackson .readTree ("{\" foo\" :\" bar\" }" );
335
329
336
330
String defaultYaml = new ObjectMapper (new YAMLFactory ().disable (WRITE_DOC_START_MARKER ))
337
331
.writeValueAsString (defaultProperties ());
@@ -344,9 +338,7 @@ void testCDRegisterConflictYaml() throws Exception {
344
338
.commit (any (), eq (Change .ofTextUpsert (FILE , defaultYaml )));
345
339
346
340
ExecutorService svc = Executors .newFixedThreadPool (2 );
347
-
348
341
svc .submit (() -> CentralDogmaPropertySupplier .register (userA , FILE ));
349
-
350
342
svc .submit (() -> {
351
343
try {
352
344
userAIsRunning .await ();
@@ -368,7 +360,7 @@ void testCDRegisterConflictYaml() throws Exception {
368
360
369
361
JsonNode actual = new ObjectMapper (new YAMLFactory ())
370
362
.readTree (entry .content ());
371
- assertEquals (userBPush , actual );
363
+ assertEquals (userBYamlAsJsonNode , actual );
372
364
}
373
365
374
366
0 commit comments