Skip to content

Commit 0c1da8a

Browse files
committed
NO-ISSUE Fix which keys are used to update dynamic properties
1 parent 92530e9 commit 0c1da8a

File tree

3 files changed

+121
-17
lines changed

3 files changed

+121
-17
lines changed

centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,14 @@ public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepositor
121121
}
122122

123123
rootWatcher.watch(node -> {
124-
for (PropertyDefinition<?> definition : PROPERTY_DEFINITIONS) {
125-
DynamicProperty<?> p = cachedProperties.get(definition.name());
126-
if (p != null && node.has(definition.name())) {
124+
for(ConcurrentHashMap.Entry<String, DynamicProperty<?>> cachedProperty : cachedProperties.entrySet()) {
125+
if (node.has(cachedProperty.getKey())) {
127126
try {
128-
setValue(p, node.get(definition.name()));
127+
setValue(cachedProperty.getValue(), node.get(cachedProperty.getKey()));
129128
} catch (Exception e) {
130129
// Catching Exception instead of RuntimeException, since
131130
// Kotlin-implemented DynamicProperty would throw checked exceptions
132-
logger.warn("Failed to set value from CentralDogma for {}", definition.name(), e);
131+
logger.warn("Failed to set value updatedfrom CentralDogma for {}", cachedProperty.getKey(), e);
133132
}
134133
}
135134
}
@@ -165,7 +164,7 @@ public <T> Optional<Property<T>> getProperty(PropertyDefinition<T> definition) {
165164
} catch (Exception e) {
166165
// Catching Exception instead of RuntimeException, since
167166
// Kotlin-implemented DynamicProperty would throw checked exceptions
168-
logger.warn("Failed to set value from CentralDogma for {}", definition.name(), e);
167+
logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e);
169168
}
170169
return prop;
171170
});

centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java

Lines changed: 97 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import com.fasterxml.jackson.databind.ObjectMapper;
4343
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
44+
import com.linecorp.decaton.processor.runtime.PropertyDefinition;
4445
import org.junit.jupiter.api.Test;
4546
import org.junit.jupiter.api.Timeout;
4647
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -222,6 +223,98 @@ void testCDIntegrationYaml() throws Exception {
222223
}).get().get().value().intValue());
223224
}
224225

226+
@Test
227+
@Timeout(50)
228+
public void testCDIntegrationDynamicPropertyJson() throws InterruptedException {
229+
final String FILE = "/subscription.json";
230+
CentralDogma client = extension.client();
231+
232+
client.createProject(PROJECT_NAME).join();
233+
CentralDogmaRepository repo =
234+
client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join();
235+
236+
final String topic = "orders";
237+
final String dynamicName =
238+
"decaton.shaping.topic.processing.rate.per.partition." + topic;
239+
assertFalse(ProcessorProperties.defaultProperties().stream()
240+
.anyMatch(p -> p.definition().name().equals(dynamicName)));
241+
242+
final String ORIGINAL =
243+
"{\n"
244+
+ " \"" + dynamicName + "\": 7,\n"
245+
+ " \"decaton.partition.concurrency\": 10\n"
246+
+ "}\n";
247+
248+
repo.commit("init-json", Change.ofJsonUpsert(FILE, ORIGINAL)).push().join();
249+
250+
CentralDogmaPropertySupplier supplier = new CentralDogmaPropertySupplier(repo, FILE);
251+
252+
PropertyDefinition<Integer> DYNAMIC_INT =
253+
PropertyDefinition.define(dynamicName, Integer.class, 0, v -> v instanceof Integer);
254+
255+
Property<Integer> prop = supplier.getProperty(DYNAMIC_INT).get();
256+
assertEquals(7, prop.value().intValue());
257+
258+
final String UPDATED =
259+
"{\n"
260+
+ " \"" + dynamicName + "\": 11,\n" // This is changed
261+
+ " \"decaton.partition.concurrency\": 10\n"
262+
+ "}\n";
263+
264+
CountDownLatch latch = new CountDownLatch(2);
265+
prop.listen((oldV, newV) -> latch.countDown());
266+
267+
repo.commit("patch-json", Change.ofJsonPatch(FILE, ORIGINAL, UPDATED)).push().join();
268+
269+
latch.await();
270+
assertEquals(11, prop.value().intValue());
271+
}
272+
273+
@Test
274+
@Timeout(50)
275+
public void testCDIntegrationDynamicPropertyYaml() throws Exception {
276+
final String FILE = "/subscription.yaml";
277+
CentralDogma client = extension.client();
278+
279+
client.createProject(PROJECT_NAME).join();
280+
CentralDogmaRepository repo =
281+
client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join();
282+
283+
final String topic = "payments";
284+
final String dynamicName =
285+
"decaton.shaping.topic.processing.rate.per.partition." + topic;
286+
assertFalse(ProcessorProperties.defaultProperties().stream()
287+
.anyMatch(p -> p.definition().name().equals(dynamicName)));
288+
289+
final String ORIGINAL_YAML =
290+
"decaton.partition.concurrency: 10\n"
291+
+ dynamicName + ": 3\n";
292+
293+
repo.commit("init-yaml", Change.ofTextUpsert(FILE, ORIGINAL_YAML))
294+
.push().join();
295+
296+
CentralDogmaPropertySupplier supplier = new CentralDogmaPropertySupplier(repo, FILE);
297+
298+
PropertyDefinition<Integer> DYNAMIC_INT =
299+
PropertyDefinition.define(dynamicName, Integer.class, 0, v -> v instanceof Integer);
300+
301+
Property<Integer> prop = supplier.getProperty(DYNAMIC_INT).get();
302+
assertEquals(3, prop.value().intValue());
303+
304+
final String UPDATED_YAML =
305+
"decaton.partition.concurrency: 10\n"
306+
+ dynamicName + ": 9\n"; // This is changed
307+
308+
CountDownLatch latch = new CountDownLatch(2);
309+
prop.listen((o, n) -> latch.countDown());
310+
311+
repo.commit("patch-yaml", Change.ofTextPatch(FILE, ORIGINAL_YAML, UPDATED_YAML))
312+
.push().join();
313+
314+
latch.await();
315+
assertEquals(9, prop.value().intValue());
316+
}
317+
225318
@Test
226319
@Timeout(10)
227320
public void testCDRegisterSuccessJson() {
@@ -414,7 +507,7 @@ static Stream<FormatCase> formats() {
414507
return Stream.of(JSON, YAML);
415508
}
416509

417-
@ParameterizedTest(name = "registerTimeout‑{0}")
510+
@ParameterizedTest()
418511
@MethodSource("formats")
419512
@Timeout(15)
420513
void testCDRegisterTimeout(FormatCase testCase) {
@@ -433,7 +526,7 @@ void testCDRegisterTimeout(FormatCase testCase) {
433526
});
434527
}
435528

436-
@ParameterizedTest(name = "registerNonExistentProject‑{0}")
529+
@ParameterizedTest()
437530
@MethodSource("formats")
438531
void testCDRegisterNonExistentProject(FormatCase testCase) {
439532
assertThrows(RuntimeException.class, () -> {
@@ -442,7 +535,7 @@ void testCDRegisterNonExistentProject(FormatCase testCase) {
442535
});
443536
}
444537

445-
@ParameterizedTest(name = "fileExists‑{0}")
538+
@ParameterizedTest()
446539
@MethodSource("formats")
447540
void testFileExist(FormatCase testCase) {
448541
CentralDogma client = extension.client();
@@ -457,7 +550,7 @@ void testFileExist(FormatCase testCase) {
457550
.fileExists(centralDogmaRepository, testCase.file(), Revision.HEAD));
458551
}
459552

460-
@ParameterizedTest(name = "fileNonExistent‑{0}")
553+
@ParameterizedTest()
461554
@MethodSource("formats")
462555
void testFileNonExistent(FormatCase testCase) {
463556
CentralDogma client = extension.client();

docs/dynamic-property-configuration.adoc

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
= Dynamic Property Configuration
22
:base_version: 9.0.0
33
:modules: centraldogma,processor
4+
:toc:
45

56
== Property Supplier
67
Decaton provides some properties for you to configure how it processes tasks. These properties don't need to be hard-coded. Decaton lets you configure some of the properties so they can be loaded dynamically.
@@ -79,7 +80,7 @@ public class CentralDogmaSupplierMain {
7980
}
8081
----
8182

82-
== YAML Support
83+
== Use YAML instead of JSON
8384

8485
You can store the property file in YAML as well as JSON.
8586
Nothing changes in your code except the file‐name extension.
@@ -98,8 +99,6 @@ CentralDogmaPropertySupplier supplier =
9899
<1> Use `.yaml` (or `.yml`) instead of `.json`.
99100
All other APIs and behaviours remain exactly the same.
100101

101-
=== Authoring the YAML file
102-
103102
You may keep the flat, dot‑separated keys.
104103

105104
[source,yaml]
@@ -110,11 +109,11 @@ decaton.processing.rate.per.partition: 50
110109
----
111110

112111
As with JSON, you cannot use nested structures in YAML.
113-
Therefore, the following is not allowed:
112+
Therefore, the following is NOT allowed:
114113

115114
[source,yaml]
116115
----
117-
# This is not supported.
116+
# This style is not supported.
118117
decaton:
119118
partition:
120119
concurrency: 8
@@ -143,7 +142,7 @@ Hence you should put supplier with higher priority before others.
143142
----
144143

145144

146-
== JSON Schema
145+
== Validate your configuration file with JSON Schema
147146

148147
Decaton ships a set of https://json-schema.org/[JSON Schema] files that precisely describe every key available in `CentralDogmaPropertySupplier` including each key’s type and default value.
149148
Leveraging these schemas in your configuration files gives you two immediate benefits:
@@ -157,6 +156,7 @@ Leveraging these schemas in your configuration files gives you two immediate ben
157156

158157

159158
[source,json]
159+
.decaton-processor-dynamic-configuration.json
160160
----
161161
{
162162
"$schema": "https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json",
@@ -165,7 +165,19 @@ Leveraging these schemas in your configuration files gives you two immediate ben
165165
...
166166
}
167167
----
168-
For example, you can use JSON Schema by adding a `$schema` directive at the top of the file as shown above.follows.
168+
169+
[source,yaml]
170+
.decaton-processor-dynamic-configuration.yaml
171+
----
172+
# $schema: https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json[https://raw.githubusercontent.com/Yang-33/decaton/support-decaton-processor-property-jsonschema-9-1-2-test/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_2019_09-allow-additional-properties.json]
173+
# yaml-language-server: $schema=https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json
174+
decaton.partition.concurrency: 10000
175+
decaton.processing.rate.per.partition: -1
176+
...
177+
----
178+
179+
180+
For example, you can use JSON Schema by adding a `$schema` directive at the top of the file as shown above follows.
169181
Of course, there may be other ways to use it.
170182
Replace `vX.Y.Z` with the exact Decaton version your application depends on.
171183
If you prefer living at HEAD, you can also reference `master`, but pinning to a release tag guarantees repeatable builds.

0 commit comments

Comments
 (0)