diff --git a/src/main/java/io/nats/client/support/JsonUtils.java b/src/main/java/io/nats/client/support/JsonUtils.java index 624b663f1..58f4f00a6 100644 --- a/src/main/java/io/nats/client/support/JsonUtils.java +++ b/src/main/java/io/nats/client/support/JsonUtils.java @@ -328,6 +328,17 @@ public static void _addList(StringBuilder sb, String fname, List list, Li sb.append("],"); } + /** + * Appends an empty JSON array to a string builder with the specified field name. + * @param sb the string builder to append to + * @param fname the name of the JSON field + */ + private static void _addEmptyList(StringBuilder sb, String fname) { + sb.append(Q); + jsonEncode(sb, fname); + sb.append("\":[],"); + } + /** * Appends a json field to a string builder. * @param sb string builder @@ -367,9 +378,23 @@ private static void _addStrings(StringBuilder sb, String fname, List str * @param jsons field value */ public static void addJsons(StringBuilder sb, String fname, List jsons) { + addJsons(sb, fname, jsons, false); + } + + /** + * Appends a json field to a string builder and the additional flag to indicate if an empty list to be added. + * @param sb string builder + * @param fname fieldname + * @param jsons field value + * @param addEmptyList flag to indicate if an empty list to be added + */ + public static void addJsons(StringBuilder sb, String fname, List jsons, boolean addEmptyList) { if (jsons != null && !jsons.isEmpty()) { _addList(sb, fname, jsons, (sbs, s) -> sbs.append(s.toJson())); } + else if (addEmptyList) { + _addEmptyList(sb, fname); + } } /** diff --git a/src/main/java/io/nats/service/EndpointContext.java b/src/main/java/io/nats/service/EndpointContext.java index 7b419a348..b22ccb249 100644 --- a/src/main/java/io/nats/service/EndpointContext.java +++ b/src/main/java/io/nats/service/EndpointContext.java @@ -54,10 +54,12 @@ class EndpointContext { } void start() { - sub = qGroup == null - ? dispatcher.subscribe(se.getSubject(), this::onMessage) - : dispatcher.subscribe(se.getSubject(), qGroup, this::onMessage); - started = DateTimeUtils.gmtNow(); + if (sub == null) { + sub = qGroup == null + ? dispatcher.subscribe(se.getSubject(), this::onMessage) + : dispatcher.subscribe(se.getSubject(), qGroup, this::onMessage); + started = DateTimeUtils.gmtNow(); + } } public void onMessage(Message msg) throws InterruptedException { diff --git a/src/main/java/io/nats/service/InfoResponse.java b/src/main/java/io/nats/service/InfoResponse.java index fbbbb4f80..734d0aa62 100644 --- a/src/main/java/io/nats/service/InfoResponse.java +++ b/src/main/java/io/nats/service/InfoResponse.java @@ -38,12 +38,7 @@ public class InfoResponse extends ServiceResponse { this.description = description; this.endpoints = new ArrayList<>(); for (ServiceEndpoint se : serviceEndpoints) { - endpoints.add(new Endpoint( - se.getName(), - se.getSubject(), - se.getQueueGroup(), - se.getMetadata() - )); + addServiceEndpoint(se); } } @@ -61,7 +56,7 @@ private InfoResponse(JsonValue jv) { @Override protected void subToJson(StringBuilder sb) { JsonUtils.addField(sb, DESCRIPTION, description); - JsonUtils.addJsons(sb, ENDPOINTS, endpoints); + JsonUtils.addJsons(sb, ENDPOINTS, endpoints, true); } /** @@ -80,6 +75,19 @@ public List getEndpoints() { return endpoints; } + /** + * Adds a service endpoint to the list of endpoints. + * @param se the service endpoint to be added + */ + public void addServiceEndpoint(ServiceEndpoint se) { + endpoints.add(new Endpoint( + se.getName(), + se.getSubject(), + se.getQueueGroup(), + se.getMetadata() + )); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/src/main/java/io/nats/service/Service.java b/src/main/java/io/nats/service/Service.java index e335a6072..d058dae7b 100644 --- a/src/main/java/io/nats/service/Service.java +++ b/src/main/java/io/nats/service/Service.java @@ -21,10 +21,10 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -46,7 +46,7 @@ public class Service { private final Connection conn; private final Duration drainTimeout; - private final Map serviceContexts; + private final ConcurrentMap serviceContexts; private final List discoveryContexts; private final List dInternals; private final PingResponse pingResponse; @@ -66,17 +66,9 @@ public class Service { // set up the service contexts // ? do we need an internal dispatcher for any user endpoints Dispatcher dTemp = null; - serviceContexts = new HashMap<>(); + serviceContexts = new ConcurrentHashMap<>(); for (ServiceEndpoint se : b.serviceEndpoints.values()) { - if (se.getDispatcher() == null) { - if (dTemp == null) { - dTemp = conn.createDispatcher(); - } - serviceContexts.put(se.getName(), new EndpointContext(conn, dTemp, false, se)); - } - else { - serviceContexts.put(se.getName(), new EndpointContext(conn, null, false, se)); - } + addServiceEndpoint(se); } if (dTemp != null) { dInternals.add(dTemp); @@ -96,10 +88,42 @@ public class Service { discoveryContexts = new ArrayList<>(); addDiscoveryContexts(SRV_PING, pingResponse, b.pingDispatcher, dTemp); - addDiscoveryContexts(SRV_INFO, infoResponse, b.infoDispatcher, dTemp); + addDynamicDiscoveryContexts(SRV_INFO, infoResponse, b.infoDispatcher, dTemp); addStatsContexts(b.statsDispatcher, dTemp); } + /** + * Adds a service endpoint to the list of service contexts and starts it if the service is running. + * @param se the service endpoint to be added + */ + public void addServiceEndpoint(ServiceEndpoint se) { + Dispatcher dTemp = null == dInternals || dInternals.isEmpty() ? null : dInternals.get(0); + EndpointContext ctx = null; + if (se.getDispatcher() == null) { + if (dTemp == null) { + dTemp = conn.createDispatcher(); + dInternals.add(dTemp); + } + ctx = new EndpointContext(conn, dTemp, false, se); + } else { + ctx = new EndpointContext(conn, null, false, se); + } + serviceContexts.put(se.getName(), ctx); + startStopLock.lock(); + try { + if (runningIndicator != null) { + ctx.start(); + } + } finally { + startStopLock.unlock(); + } + + if (null != infoResponse) { + infoResponse.addServiceEndpoint(se); + } + + } + private void addDiscoveryContexts(String discoveryName, Dispatcher dUser, Dispatcher dInternal, ServiceMessageHandler handler) { Endpoint[] endpoints = new Endpoint[] { internalEndpoint(discoveryName, null, null), @@ -114,6 +138,26 @@ private void addDiscoveryContexts(String discoveryName, Dispatcher dUser, Dispat } } + /** + * Adds dynamic discovery contexts for the service, dynamically generating the bytes content per call. + * This is different from `addDiscoveryContexts` which reuses the same static bytes at registration. + * @param discoveryName the name of the discovery + * @param dUser the user dispatcher + * @param dInternal the internal dispatcher + * @param handler the service message handler + */ + private void addDynamicDiscoveryContexts(String discoveryName, ServiceResponse sr, Dispatcher dUser, Dispatcher dInternal) { + ServiceMessageHandler handler = smsg -> smsg.respond(conn, sr.serialize()); + addDiscoveryContexts(discoveryName, dUser, dInternal, handler); + } + + /** + * Adds discovery contexts for the service, reusing the same static bytes at registration. + * @param discoveryName the name of the discovery + * @param sr the service response + * @param dUser the user dispatcher + * @param dInternal the internal dispatcher + */ private void addDiscoveryContexts(String discoveryName, ServiceResponse sr, Dispatcher dUser, Dispatcher dInternal) { final byte[] responseBytes = sr.serialize(); ServiceMessageHandler handler = smsg -> smsg.respond(conn, responseBytes); diff --git a/src/main/java/io/nats/service/ServiceBuilder.java b/src/main/java/io/nats/service/ServiceBuilder.java index f61778bf4..05336b3ac 100644 --- a/src/main/java/io/nats/service/ServiceBuilder.java +++ b/src/main/java/io/nats/service/ServiceBuilder.java @@ -157,7 +157,6 @@ public Service build() { required(conn, "Connection"); required(name, "Name"); required(version, "Version"); - required(serviceEndpoints, "Service Endpoints"); return new Service(this); } } diff --git a/src/main/java/io/nats/service/StatsResponse.java b/src/main/java/io/nats/service/StatsResponse.java index 2723d3a4a..ef52a21a7 100644 --- a/src/main/java/io/nats/service/StatsResponse.java +++ b/src/main/java/io/nats/service/StatsResponse.java @@ -87,7 +87,7 @@ private StatsResponse(JsonValue jv) { @Override protected void subToJson(StringBuilder sb) { - JsonUtils.addJsons(sb, ENDPOINTS, endpointStatsList); + JsonUtils.addJsons(sb, ENDPOINTS, endpointStatsList, true); JsonUtils.addField(sb, STARTED, started); } diff --git a/src/test/java/io/nats/service/ServiceTests.java b/src/test/java/io/nats/service/ServiceTests.java index b7cc99e48..71ea0f11f 100644 --- a/src/test/java/io/nats/service/ServiceTests.java +++ b/src/test/java/io/nats/service/ServiceTests.java @@ -562,9 +562,70 @@ public void testServiceBuilderConstruction() { () -> Service.builder().connection(conn).name(NAME).addServiceEndpoint(se).build()); assertTrue(iae.getMessage().contains("Version cannot be null or empty")); - iae = assertThrows(IllegalArgumentException.class, + assertDoesNotThrow( () -> Service.builder().connection(conn).name(NAME).version("1.0.0").build()); - assertTrue(iae.getMessage().contains("Endpoints cannot be null or empty")); + } + + @Test + public void testAddingEndpointAfterServiceBuilderConstruction() { + Options options = new Options.Builder().build(); + Connection conn = new MockNatsConnection(options); + ServiceEndpoint se = ServiceEndpoint.builder() + .endpoint(new Endpoint(name(0))) + .handler(m -> { + }) + .build(); + + // minimum valid service + Service service = Service.builder().connection(conn).name(NAME).version("1.0.0").addServiceEndpoint(se).build(); + assertNotNull(service.toString()); // coverage + assertNotNull(service.getId()); + assertEquals(NAME, service.getName()); + assertEquals(ServiceBuilder.DEFAULT_DRAIN_TIMEOUT, service.getDrainTimeout()); + assertEquals("1.0.0", service.getVersion()); + assertNull(service.getDescription()); + + service = Service.builder().connection(conn).name(NAME).version("1.0.0") + .description("desc") + .drainTimeout(Duration.ofSeconds(1)) + .build(); + + service.addServiceEndpoint(se); + assertEquals("desc", service.getDescription()); + assertEquals(Duration.ofSeconds(1), service.getDrainTimeout()); + + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(null)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(EMPTY)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_SPACE)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_PRINTABLE)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_DOT)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(STAR_NOT_SEGMENT)); // invalid in the middle + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(GT_NOT_SEGMENT)); // invalid in the middle + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_DOLLAR)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_LOW)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_127)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_FWD_SLASH)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_BACK_SLASH)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_EQUALS)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_TIC)); + + assertThrows(IllegalArgumentException.class, () -> Service.builder().version(null)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().version(EMPTY)); + assertThrows(IllegalArgumentException.class, () -> Service.builder().version("not-semver")); + + IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, + () -> Service.builder().name(NAME).version("1.0.0").addServiceEndpoint(se).build()); + assertTrue(iae.getMessage().contains("Connection cannot be null")); + + iae = assertThrows(IllegalArgumentException.class, + () -> Service.builder().connection(conn).version("1.0.0").addServiceEndpoint(se).build()); + assertTrue(iae.getMessage().contains("Name cannot be null or empty")); + + iae = assertThrows(IllegalArgumentException.class, + () -> Service.builder().connection(conn).name(NAME).addServiceEndpoint(se).build()); + assertTrue(iae.getMessage().contains("Version cannot be null or empty")); + + assertDoesNotThrow(() -> Service.builder().connection(conn).name(NAME).version("1.0.0").build()); } @Test