Skip to content

Commit

Permalink
feature: update_service_to_support_adding_endpoint_after_construction (
Browse files Browse the repository at this point in the history
…#1274)

* update_service_to_support_adding_dynamic_endpoint_after_service_construction

* revert format change & restrict empty list to InfoResponse

* revert format change & restrict empty list to InfoResponse

* revert format change & restrict empty list to InfoResponse

* revert format change & restrict empty list to InfoResponse

* adding comments for 2 discovery contexts

* sync with infoResponsse, adding empty list
  • Loading branch information
rexf authored Feb 17, 2025
1 parent a48f61e commit dabcba8
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 29 deletions.
25 changes: 25 additions & 0 deletions src/main/java/io/nats/client/support/JsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,17 @@ public static <T> void _addList(StringBuilder sb, String fname, List<T> list, Li
sb.append("],");
}

/**
* Appends an empty JSON array to a string builder with the specified field name.
* @param sb the string builder to append to
* @param fname the name of the JSON field
*/
private static void _addEmptyList(StringBuilder sb, String fname) {
sb.append(Q);
jsonEncode(sb, fname);
sb.append("\":[],");
}

/**
* Appends a json field to a string builder.
* @param sb string builder
Expand Down Expand Up @@ -367,9 +378,23 @@ private static void _addStrings(StringBuilder sb, String fname, List<String> str
* @param jsons field value
*/
public static void addJsons(StringBuilder sb, String fname, List<? extends JsonSerializable> jsons) {
addJsons(sb, fname, jsons, false);
}

/**
* Appends a json field to a string builder and the additional flag to indicate if an empty list to be added.
* @param sb string builder
* @param fname fieldname
* @param jsons field value
* @param addEmptyList flag to indicate if an empty list to be added
*/
public static void addJsons(StringBuilder sb, String fname, List<? extends JsonSerializable> jsons, boolean addEmptyList) {
if (jsons != null && !jsons.isEmpty()) {
_addList(sb, fname, jsons, (sbs, s) -> sbs.append(s.toJson()));
}
else if (addEmptyList) {
_addEmptyList(sb, fname);
}
}

/**
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/io/nats/service/EndpointContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ class EndpointContext {
}

void start() {
sub = qGroup == null
? dispatcher.subscribe(se.getSubject(), this::onMessage)
: dispatcher.subscribe(se.getSubject(), qGroup, this::onMessage);
started = DateTimeUtils.gmtNow();
if (sub == null) {
sub = qGroup == null
? dispatcher.subscribe(se.getSubject(), this::onMessage)
: dispatcher.subscribe(se.getSubject(), qGroup, this::onMessage);
started = DateTimeUtils.gmtNow();
}
}

public void onMessage(Message msg) throws InterruptedException {
Expand Down
22 changes: 15 additions & 7 deletions src/main/java/io/nats/service/InfoResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ public class InfoResponse extends ServiceResponse {
this.description = description;
this.endpoints = new ArrayList<>();
for (ServiceEndpoint se : serviceEndpoints) {
endpoints.add(new Endpoint(
se.getName(),
se.getSubject(),
se.getQueueGroup(),
se.getMetadata()
));
addServiceEndpoint(se);
}
}

Expand All @@ -61,7 +56,7 @@ private InfoResponse(JsonValue jv) {
@Override
protected void subToJson(StringBuilder sb) {
JsonUtils.addField(sb, DESCRIPTION, description);
JsonUtils.addJsons(sb, ENDPOINTS, endpoints);
JsonUtils.addJsons(sb, ENDPOINTS, endpoints, true);
}

/**
Expand All @@ -80,6 +75,19 @@ public List<Endpoint> getEndpoints() {
return endpoints;
}

/**
* Adds a service endpoint to the list of endpoints.
* @param se the service endpoint to be added
*/
public void addServiceEndpoint(ServiceEndpoint se) {
endpoints.add(new Endpoint(
se.getName(),
se.getSubject(),
se.getQueueGroup(),
se.getMetadata()
));
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
72 changes: 58 additions & 14 deletions src/main/java/io/nats/service/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -46,7 +46,7 @@ public class Service {

private final Connection conn;
private final Duration drainTimeout;
private final Map<String, EndpointContext> serviceContexts;
private final ConcurrentMap<String, EndpointContext> serviceContexts;
private final List<EndpointContext> discoveryContexts;
private final List<Dispatcher> dInternals;
private final PingResponse pingResponse;
Expand All @@ -66,17 +66,9 @@ public class Service {
// set up the service contexts
// ? do we need an internal dispatcher for any user endpoints
Dispatcher dTemp = null;
serviceContexts = new HashMap<>();
serviceContexts = new ConcurrentHashMap<>();
for (ServiceEndpoint se : b.serviceEndpoints.values()) {
if (se.getDispatcher() == null) {
if (dTemp == null) {
dTemp = conn.createDispatcher();
}
serviceContexts.put(se.getName(), new EndpointContext(conn, dTemp, false, se));
}
else {
serviceContexts.put(se.getName(), new EndpointContext(conn, null, false, se));
}
addServiceEndpoint(se);
}
if (dTemp != null) {
dInternals.add(dTemp);
Expand All @@ -96,10 +88,42 @@ public class Service {

discoveryContexts = new ArrayList<>();
addDiscoveryContexts(SRV_PING, pingResponse, b.pingDispatcher, dTemp);
addDiscoveryContexts(SRV_INFO, infoResponse, b.infoDispatcher, dTemp);
addDynamicDiscoveryContexts(SRV_INFO, infoResponse, b.infoDispatcher, dTemp);
addStatsContexts(b.statsDispatcher, dTemp);
}

/**
* Adds a service endpoint to the list of service contexts and starts it if the service is running.
* @param se the service endpoint to be added
*/
public void addServiceEndpoint(ServiceEndpoint se) {
Dispatcher dTemp = null == dInternals || dInternals.isEmpty() ? null : dInternals.get(0);
EndpointContext ctx = null;
if (se.getDispatcher() == null) {
if (dTemp == null) {
dTemp = conn.createDispatcher();
dInternals.add(dTemp);
}
ctx = new EndpointContext(conn, dTemp, false, se);
} else {
ctx = new EndpointContext(conn, null, false, se);
}
serviceContexts.put(se.getName(), ctx);
startStopLock.lock();
try {
if (runningIndicator != null) {
ctx.start();
}
} finally {
startStopLock.unlock();
}

if (null != infoResponse) {
infoResponse.addServiceEndpoint(se);
}

}

private void addDiscoveryContexts(String discoveryName, Dispatcher dUser, Dispatcher dInternal, ServiceMessageHandler handler) {
Endpoint[] endpoints = new Endpoint[] {
internalEndpoint(discoveryName, null, null),
Expand All @@ -114,6 +138,26 @@ private void addDiscoveryContexts(String discoveryName, Dispatcher dUser, Dispat
}
}

/**
* Adds dynamic discovery contexts for the service, dynamically generating the bytes content per call.
* This is different from `addDiscoveryContexts` which reuses the same static bytes at registration.
* @param discoveryName the name of the discovery
* @param dUser the user dispatcher
* @param dInternal the internal dispatcher
* @param handler the service message handler
*/
private void addDynamicDiscoveryContexts(String discoveryName, ServiceResponse sr, Dispatcher dUser, Dispatcher dInternal) {
ServiceMessageHandler handler = smsg -> smsg.respond(conn, sr.serialize());
addDiscoveryContexts(discoveryName, dUser, dInternal, handler);
}

/**
* Adds discovery contexts for the service, reusing the same static bytes at registration.
* @param discoveryName the name of the discovery
* @param sr the service response
* @param dUser the user dispatcher
* @param dInternal the internal dispatcher
*/
private void addDiscoveryContexts(String discoveryName, ServiceResponse sr, Dispatcher dUser, Dispatcher dInternal) {
final byte[] responseBytes = sr.serialize();
ServiceMessageHandler handler = smsg -> smsg.respond(conn, responseBytes);
Expand Down
1 change: 0 additions & 1 deletion src/main/java/io/nats/service/ServiceBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ public Service build() {
required(conn, "Connection");
required(name, "Name");
required(version, "Version");
required(serviceEndpoints, "Service Endpoints");
return new Service(this);
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/nats/service/StatsResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private StatsResponse(JsonValue jv) {

@Override
protected void subToJson(StringBuilder sb) {
JsonUtils.addJsons(sb, ENDPOINTS, endpointStatsList);
JsonUtils.addJsons(sb, ENDPOINTS, endpointStatsList, true);
JsonUtils.addField(sb, STARTED, started);
}

Expand Down
65 changes: 63 additions & 2 deletions src/test/java/io/nats/service/ServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,70 @@ public void testServiceBuilderConstruction() {
() -> Service.builder().connection(conn).name(NAME).addServiceEndpoint(se).build());
assertTrue(iae.getMessage().contains("Version cannot be null or empty"));

iae = assertThrows(IllegalArgumentException.class,
assertDoesNotThrow(
() -> Service.builder().connection(conn).name(NAME).version("1.0.0").build());
assertTrue(iae.getMessage().contains("Endpoints cannot be null or empty"));
}

@Test
public void testAddingEndpointAfterServiceBuilderConstruction() {
Options options = new Options.Builder().build();
Connection conn = new MockNatsConnection(options);
ServiceEndpoint se = ServiceEndpoint.builder()
.endpoint(new Endpoint(name(0)))
.handler(m -> {
})
.build();

// minimum valid service
Service service = Service.builder().connection(conn).name(NAME).version("1.0.0").addServiceEndpoint(se).build();
assertNotNull(service.toString()); // coverage
assertNotNull(service.getId());
assertEquals(NAME, service.getName());
assertEquals(ServiceBuilder.DEFAULT_DRAIN_TIMEOUT, service.getDrainTimeout());
assertEquals("1.0.0", service.getVersion());
assertNull(service.getDescription());

service = Service.builder().connection(conn).name(NAME).version("1.0.0")
.description("desc")
.drainTimeout(Duration.ofSeconds(1))
.build();

service.addServiceEndpoint(se);
assertEquals("desc", service.getDescription());
assertEquals(Duration.ofSeconds(1), service.getDrainTimeout());

assertThrows(IllegalArgumentException.class, () -> Service.builder().name(null));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(EMPTY));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_SPACE));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_PRINTABLE));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_DOT));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(STAR_NOT_SEGMENT)); // invalid in the middle
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(GT_NOT_SEGMENT)); // invalid in the middle
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_DOLLAR));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_LOW));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_127));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_FWD_SLASH));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_BACK_SLASH));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_EQUALS));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_TIC));

assertThrows(IllegalArgumentException.class, () -> Service.builder().version(null));
assertThrows(IllegalArgumentException.class, () -> Service.builder().version(EMPTY));
assertThrows(IllegalArgumentException.class, () -> Service.builder().version("not-semver"));

IllegalArgumentException iae = assertThrows(IllegalArgumentException.class,
() -> Service.builder().name(NAME).version("1.0.0").addServiceEndpoint(se).build());
assertTrue(iae.getMessage().contains("Connection cannot be null"));

iae = assertThrows(IllegalArgumentException.class,
() -> Service.builder().connection(conn).version("1.0.0").addServiceEndpoint(se).build());
assertTrue(iae.getMessage().contains("Name cannot be null or empty"));

iae = assertThrows(IllegalArgumentException.class,
() -> Service.builder().connection(conn).name(NAME).addServiceEndpoint(se).build());
assertTrue(iae.getMessage().contains("Version cannot be null or empty"));

assertDoesNotThrow(() -> Service.builder().connection(conn).name(NAME).version("1.0.0").build());
}

@Test
Expand Down

0 comments on commit dabcba8

Please sign in to comment.