Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions centraldogma/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
implementation "org.slf4j:slf4j-api:$slf4jVersion"
api "com.linecorp.centraldogma:centraldogma-client:$centralDogmaVersion"
implementation "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion"

testImplementation "org.hamcrest:hamcrest:$hamcrestVersion"
testImplementation "com.linecorp.centraldogma:centraldogma-testing-junit:$centralDogmaVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.linecorp.decaton.centraldogma;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -38,22 +40,26 @@
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.ChangeConflictException;
import com.linecorp.centraldogma.common.PathPattern;
import com.linecorp.centraldogma.common.Query;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.decaton.processor.runtime.DynamicProperty;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.PropertyDefinition;
import com.linecorp.decaton.processor.runtime.PropertySupplier;

import static com.linecorp.decaton.processor.runtime.ProcessorProperties.PROPERTY_DEFINITIONS;

/**
* A {@link PropertySupplier} implementation with Central Dogma backend.
*
* <p>
* This implementation maps property's {@link PropertyDefinition#name()} as the absolute field name in the file
* on Central Dogma.
*
* <p>
* You can use json or yaml format for the property file.
* You cannot nest keys in both formats. Keys must be top-level fields.
* <p>
* An example JSON format would be look like:
* {@code
* <pre>{@code
* {
* "decaton.partition.concurrency": 10,
* "decaton.ignore.keys": [
Expand All @@ -62,7 +68,16 @@
* ],
* "decaton.processing.rate.per.partition": 50
* }
* }
* }</pre>
*
* An example YAML format would be look like:
* <pre>{@code
* decaton.partition.concurrency: 10
* decaton.ignore.keys:
* - "123456"
* - "79797979"
* decaton.processing.rate.per.partition: 50
* }</pre>
*/
public class CentralDogmaPropertySupplier implements PropertySupplier, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(CentralDogmaPropertySupplier.class);
Expand All @@ -73,7 +88,6 @@ public class CentralDogmaPropertySupplier implements PropertySupplier, AutoClose
private static final ObjectMapper objectMapper = new ObjectMapper();

private final Watcher<JsonNode> rootWatcher;

private final ConcurrentMap<String, DynamicProperty<?>> cachedProperties = new ConcurrentHashMap<>();

/**
Expand All @@ -94,7 +108,9 @@ public CentralDogmaPropertySupplier(CentralDogma centralDogma, String projectNam
* @param fileName the name of the file containing properties as top-level fields.
*/
public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepository, String fileName) {
rootWatcher = centralDogmaRepository.watcher(Query.ofJsonPath(fileName)).start();
DecatonPropertyFileFormat configFile = DecatonPropertyFileFormat.of(fileName);
this.rootWatcher = configFile.createWatcher(centralDogmaRepository, fileName);

try {
rootWatcher.awaitInitialValue(INITIAL_VALUE_TIMEOUT_SECS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Expand All @@ -103,6 +119,20 @@ public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepositor
} catch (TimeoutException e) {
throw new RuntimeException(e);
}

rootWatcher.watch(node -> {
for(ConcurrentHashMap.Entry<String, DynamicProperty<?>> cachedProperty : cachedProperties.entrySet()) {
if (node.has(cachedProperty.getKey())) {
try {
setValue(cachedProperty.getValue(), node.get(cachedProperty.getKey()));
} catch (Exception e) {
// Catching Exception instead of RuntimeException, since
// Kotlin-implemented DynamicProperty would throw checked exceptions
logger.warn("Failed to set value updatedfrom CentralDogma for {}", cachedProperty.getKey(), e);
}
}
}
});
}

// visible for testing
Expand All @@ -129,25 +159,13 @@ public <T> Optional<Property<T>> getProperty(PropertyDefinition<T> definition) {
// for most use cases though, this cache is only filled/read once.
final DynamicProperty<?> cachedProp = cachedProperties.computeIfAbsent(definition.name(), name -> {
DynamicProperty<T> prop = new DynamicProperty<>(definition);
Watcher<JsonNode> child = rootWatcher.newChild(jsonNode -> jsonNode.path(definition.name()));
child.watch(node -> {
try {
setValue(prop, node);
} catch (Exception e) {
// Catching Exception instead of RuntimeException, since
// Kotlin-implemented DynamicProperty would throw checked exceptions
logger.warn("Failed to set value updated from CentralDogma for {}", definition.name(), e);
}
});
try {
JsonNode node = child.initialValueFuture().join().value(); //doesn't fail since it's a child watcher
setValue(prop, node);
setValue(prop, rootWatcher.latestValue().get(definition.name()));
} catch (Exception e) {
// Catching Exception instead of RuntimeException, since
// Kotlin-implemented DynamicProperty would throw checked exceptions
logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e);
}

return prop;
});

Expand Down Expand Up @@ -175,8 +193,7 @@ public void close() {
public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, String project,
String repository, String filename) {
final CentralDogmaRepository centralDogmaRepository = centralDogma.forRepo(project, repository);
createPropertyFile(centralDogmaRepository, filename, ProcessorProperties.defaultProperties());
return new CentralDogmaPropertySupplier(centralDogmaRepository, filename);
return register(centralDogmaRepository, filename);
}

/**
Expand Down Expand Up @@ -215,6 +232,7 @@ public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, S
public static CentralDogmaPropertySupplier register(CentralDogmaRepository centralDogmaRepository,
String filename,
PropertySupplier supplier) {

List<Property<?>> properties = ProcessorProperties.defaultProperties().stream().map(defaultProperty -> {
Optional<? extends Property<?>> prop = supplier.getProperty(defaultProperty.definition());
if (prop.isPresent()) {
Expand All @@ -236,12 +254,18 @@ private static void createPropertyFile(CentralDogmaRepository centralDogmaReposi
long remainingTime = remainingTime(PROPERTY_CREATION_TIMEOUT_MILLIS, startedTime);

JsonNode jsonNodeProperties = convertPropertyListToJsonNode(properties);
Change<?> upsert;
try {
upsert = DecatonPropertyFileFormat.of(fileName).createUpsertChange(fileName, jsonNodeProperties);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

while (!fileExists && remainingTime > 0) {
try {
centralDogmaRepository
.commit(String.format("[CentralDogmaPropertySupplier] Property file created: %s", fileName),
Change.ofJsonUpsert(fileName, jsonNodeProperties))
upsert)
.push(baseRevision)
.get(remainingTime, TimeUnit.MILLISECONDS);
logger.info("New property file {} registered on Central Dogma", fileName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2025 LINE Corporation
*
* Licensed under the Apache License, Version 2.0 …
*/

package com.linecorp.decaton.centraldogma;

import java.io.IOException;
import java.util.Locale;

import com.fasterxml.jackson.databind.JsonNode;
import com.linecorp.centraldogma.client.CentralDogmaRepository;
import com.linecorp.centraldogma.client.Watcher;
import com.linecorp.centraldogma.common.Change;

/**
* Encapsulates Central Dogma–specific concerns for reading and writing
* configuration files in various text formats (JSON, YAML, ...).
* <p>
* Implementations convert between raw file contents managed by Central Dogma
* and {@link JsonNode} values consumed by {@link CentralDogmaPropertySupplier}.
*/
public interface DecatonPropertyFileFormat {
/**
* Create and start a Watcher that emits {@link JsonNode} for each file update.
*/
Watcher<JsonNode> createWatcher(CentralDogmaRepository repo, String fileName);

/**
* Serialize the given node and wrap it as Central Dogma {@link Change} for initial file creation.
*/
Change<?> createUpsertChange(String fileName, JsonNode initialNode) throws IOException;

static DecatonPropertyFileFormat of(String fileName) {
String lower = fileName.toLowerCase(Locale.ROOT);
return (lower.endsWith(".yml") || lower.endsWith(".yaml"))
? new YamlFormat()
: new JsonFormat();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2025 LINE Corporation
*/

package com.linecorp.decaton.centraldogma;

import com.fasterxml.jackson.databind.JsonNode;
import com.linecorp.centraldogma.client.CentralDogmaRepository;
import com.linecorp.centraldogma.client.Watcher;
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.Query;

public class JsonFormat implements DecatonPropertyFileFormat {
@Override
public Watcher<JsonNode> createWatcher(CentralDogmaRepository repo, String fileName) {
return repo.watcher(Query.ofJsonPath(fileName)).start();
}

@Override
public Change<?> createUpsertChange(String fileName, JsonNode initialNode) {
return Change.ofJsonUpsert(fileName, initialNode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2025 LINE Corporation
*
* Licensed under the Apache License, Version 2.0 …
*/

package com.linecorp.decaton.centraldogma;

import java.io.IOException;
import java.io.UncheckedIOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.linecorp.centraldogma.client.CentralDogmaRepository;
import com.linecorp.centraldogma.client.Watcher;
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.Query;

import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER;

public class YamlFormat implements DecatonPropertyFileFormat {
private static final ObjectMapper YAML_MAPPER = new ObjectMapper(
new YAMLFactory()
.disable(WRITE_DOC_START_MARKER)
);

@Override
public Watcher<JsonNode> createWatcher(CentralDogmaRepository repo, String fileName) {
return repo.watcher(Query.ofText(fileName))
.map(text -> {
try {
return YAML_MAPPER.readTree(text);
} catch (IOException e) {
throw new UncheckedIOException("Failed to parse YAML from " + fileName, e);
}
})
.start();
}

@Override
public Change<?> createUpsertChange(String fileName, JsonNode initialNode) throws IOException {
String yaml = YAML_MAPPER.writeValueAsString(initialNode);
return Change.ofTextUpsert(fileName, yaml);
}
}
Loading