diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 76c4030b..420589ca 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -479,12 +479,12 @@ public void shutdown() { private Mono doShutdown() { return Mono.defer( () -> { - LOGGER.debug("[{}] Cluster member is shutting down", localMember); + LOGGER.info("[{}] Cluster member is shutting down", localMember); return Flux.concatDelayError(leaveCluster(), dispose(), transport.stop()) .then() .doFinally(s -> scheduler.dispose()) .doOnSuccess( - avoid -> LOGGER.debug("[{}] Cluster member has been shutdown", localMember)); + avoid -> LOGGER.info("[{}] Cluster member has been shutdown", localMember)); }); } @@ -492,8 +492,8 @@ private Mono leaveCluster() { return membership .leaveCluster() .subscribeOn(scheduler) - .doOnSubscribe(s -> LOGGER.debug("[{}] Cluster member is leaving a cluster", localMember)) - .doOnSuccess(s -> LOGGER.debug("[{}] Cluster member has left a cluster", localMember)) + .doOnSubscribe(s -> LOGGER.info("[{}] Cluster member is leaving a cluster", localMember)) + .doOnSuccess(s -> LOGGER.info("[{}] Cluster member has left a cluster", localMember)) .doOnError( ex -> LOGGER.warn( diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index 7c72de85..7681c090 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -151,7 +151,7 @@ private void doPing() { publishPingResult(period, pingMember, computeMemberStatus(message, period)); }, ex -> { - LOGGER.debug( + LOGGER.warn( "[{}][{}] Failed to get PingAck from {} within {} ms", localMember, period, @@ -198,7 +198,7 @@ private void doPingReq( publishPingResult(period, pingMember, computeMemberStatus(message, period)); }, throwable -> { - LOGGER.debug( + LOGGER.warn( "[{}][{}] Timeout getting transit PingAck from {} to {} within {} ms", localMember, period, @@ -250,7 +250,7 @@ private void onPing(Message message) { .subscribe( null, ex -> - LOGGER.debug( + LOGGER.warn( "[{}][{}] Failed to send PingAck to {}, cause: {}", localMember, period, @@ -276,7 +276,7 @@ private void onPingReq(Message message) { .subscribe( null, ex -> - LOGGER.debug( + LOGGER.warn( "[{}][{}] Failed to send transit Ping to {}, cause: {}", localMember, period, @@ -306,7 +306,7 @@ private void onTransitPingAck(Message message) { .subscribe( null, ex -> - LOGGER.debug( + LOGGER.warn( "[{}][{}] Failed to resend transit PingAck to {}, cause: {}", localMember, period, diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index d6edd8b8..d20bea84 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -299,7 +299,7 @@ private void spreadGossipsTo(long period, Member member) { .subscribe( null, ex -> - LOGGER.debug( + LOGGER.warn( "[{}][{}] Failed to send GossipReq({}) to {}, cause: {}", localMember, period, diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index 837902e2..9ac83b0b 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -261,7 +261,7 @@ private void start0(MonoSink sink) { return; } // If seed addresses are specified in config - send initial sync to those nodes - LOGGER.debug("[{}] Making initial Sync to all seed members: {}", localMember, seedMembers); + LOGGER.info("[{}] Making initial Sync to all seed members: {}", localMember, seedMembers); //noinspection unchecked Mono[] syncs = @@ -289,7 +289,7 @@ private void start0(MonoSink sink) { .subscribe( null, ex -> - LOGGER.debug( + LOGGER.warn( "[{}] Exception on initial SyncAck, cause: {}", localMember, ex.toString())); } @@ -352,7 +352,7 @@ private void doSync() { .subscribe( null, ex -> - LOGGER.debug( + LOGGER.warn( "[{}][doSync] Failed to send Sync to {}, cause: {}", localMember, address, @@ -403,7 +403,7 @@ private Mono onSync(Message syncMsg) { .subscribe( null, ex -> - LOGGER.debug( + LOGGER.warn( "[{}] Failed to send SyncAck to {}, cause: {}", localMember, sender, @@ -433,7 +433,7 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { .subscribe( null, ex -> - LOGGER.debug( + LOGGER.warn( "[{}][onFailureDetectorEvent] Failed to send Sync to {}, cause: {}", localMember, address, @@ -581,7 +581,7 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason .fetchMetadata(r1.member()) .doOnError( ex -> - LOGGER.debug( + LOGGER.warn( "[{}][updateMembership][{}] Skipping to add/update member: {}, " + "due to failed fetchMetadata call (cause: {})", localMember, @@ -676,7 +676,7 @@ private Mono onLeavingDetected(MembershipRecord r0, MembershipRecord r1) { } private void publishEvent(MembershipEvent event) { - LOGGER.debug("[{}][publishEvent] {}", localMember, event); + LOGGER.info("[{}][publishEvent] {}", localMember, event); sink.next(event); } @@ -699,9 +699,9 @@ private Mono onDeadMemberDetected(MembershipRecord r1) { // Log that member leaved gracefully or without notification if (r0.isLeaving()) { - LOGGER.debug("[{}] Member leaved gracefully: {}", localMember, member); + LOGGER.info("[{}] Member leaved gracefully: {}", localMember, member); } else { - LOGGER.warn("[{}] Member leaved without notification: {}", localMember, member); + LOGGER.info("[{}] Member leaved without notification: {}", localMember, member); } final long timestamp = System.currentTimeMillis(); @@ -756,7 +756,7 @@ private void scheduleSuspicionTimeoutTask(MembershipRecord r) { r.member().id(), id -> { LOGGER.debug( - "[{}] Scheduled SuspicionTimeoutTask for {}, suspicionTimeout {}", + "[{}] Scheduled SuspicionTimeoutTask for {}, suspicionTimeout: {}", localMember, id, suspicionTimeout); @@ -794,7 +794,7 @@ private Mono spreadMembershipGossip(MembershipRecord r) { .spread(msg) .doOnError( ex -> - LOGGER.debug( + LOGGER.warn( "[{}] Failed to send membership with gossip, cause: {}", localMember, ex.toString())) @@ -864,10 +864,7 @@ private List getSuspectedMembers() { } private List getRemovedMembers() { - return removedMembersHistory // - .stream() - .map(MembershipEvent::member) - .collect(Collectors.toList()); + return removedMembersHistory.stream().map(MembershipEvent::member).collect(Collectors.toList()); } private List findRecordsByCondition(Predicate condition) { diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java index 200996cf..14a448de 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java @@ -171,10 +171,7 @@ public void testJoinDynamicPort() { } finally { // Shutdown all nodes shutdown( - Stream.concat( - Stream.of(seedNode), // - otherNodes.stream()) - .collect(Collectors.toList())); + Stream.concat(Stream.of(seedNode), otherNodes.stream()).collect(Collectors.toList())); } } @@ -243,9 +240,7 @@ public void onMembershipEvent(MembershipEvent event) { } finally { // Shutdown all nodes shutdown( - Stream.concat( - Stream.of(seedNode, metadataNode), // - otherNodes.stream()) + Stream.concat(Stream.of(seedNode, metadataNode), otherNodes.stream()) .collect(Collectors.toList())); } } @@ -321,9 +316,7 @@ public void onMembershipEvent(MembershipEvent event) { } finally { // Shutdown all nodes shutdown( - Stream.concat( - Stream.of(seedNode, metadataNode), // - otherNodes.stream()) + Stream.concat(Stream.of(seedNode, metadataNode), otherNodes.stream()) .collect(Collectors.toList())); } } @@ -400,9 +393,7 @@ public void onMembershipEvent(MembershipEvent event) { } finally { // Shutdown all nodes shutdown( - Stream.concat( - Stream.of(seedNode, metadataNode), // - otherNodes.stream()) + Stream.concat(Stream.of(seedNode, metadataNode), otherNodes.stream()) .collect(Collectors.toList())); } } diff --git a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java index 54e609c4..36b42090 100644 --- a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java @@ -26,7 +26,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; import reactor.core.publisher.Flux; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -36,8 +35,8 @@ public class FailureDetectorTest extends BaseTest { private Scheduler scheduler; @BeforeEach - void setUp(TestInfo testInfo) { - scheduler = Schedulers.newSingle(testInfo.getDisplayName().replaceAll(" ", "_"), true); + void setUp() { + scheduler = Schedulers.newSingle("scheduler", true); } @AfterEach diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java index 9281a9b8..2622524b 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java @@ -30,7 +30,6 @@ import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -92,8 +91,8 @@ static Stream experiment() { private Scheduler scheduler; @BeforeEach - void setUp(TestInfo testInfo) { - scheduler = Schedulers.newSingle(testInfo.getDisplayName().replaceAll(" ", "_"), true); + void setUp() { + scheduler = Schedulers.newSingle("scheduler", true); } @AfterEach diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index 34e68106..eb38677d 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -30,7 +30,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; import reactor.core.Exceptions; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; @@ -51,8 +50,8 @@ public class MembershipProtocolTest extends BaseTest { private Scheduler scheduler; @BeforeEach - void setUp(TestInfo testInfo) { - scheduler = Schedulers.newSingle(testInfo.getDisplayName().replaceAll(" ", "_"), true); + void setUp() { + scheduler = Schedulers.newSingle("scheduler", true); stopables = new ArrayList<>(); } diff --git a/cluster/src/test/resources/log4j2-test.xml b/cluster/src/test/resources/log4j2-test.xml index 53878182..6ad658d7 100644 --- a/cluster/src/test/resources/log4j2-test.xml +++ b/cluster/src/test/resources/log4j2-test.xml @@ -4,23 +4,21 @@ - %level{length=1} %date{MMdd-HHmm:ss,SSS} %logger{1.} %message [%thread]%n + %level{length=1} %date{HHmm:ss,SSS} %logger{1.} %message [%thread]%n - - - - - - - - - + + + + + + + - + diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java index d6e5e159..86f318eb 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java @@ -197,8 +197,8 @@ public Address sender() { @Override public String toString() { return new StringJoiner(", ", Message.class.getSimpleName() + "[", "]") - .add("headers=" + headers) - .add("data=" + data) + .add("headers(" + headers.size() + ")") + .add("data=" + (data != null ? data.getClass().getName() : null)) .toString(); } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java index 1470ace1..6fd74d08 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java @@ -188,7 +188,7 @@ public Mono bind0() { config.port(), ex.toString())) .map(server -> new TransportImpl(server, this)) - .doOnSuccess(t -> LOGGER.debug("[{}] Bound cluster transport", address)) + .doOnSuccess(t -> LOGGER.info("[{}] Bound cluster transport", t.address())) .cast(Transport.class); } @@ -214,13 +214,13 @@ public final Mono stop() { private Mono doStop() { return Mono.defer( () -> { - LOGGER.debug("[{}] Transport is shutting down", address); + LOGGER.info("[{}] Transport is shutting down", address); // Complete incoming messages observable messageSink.complete(); return Flux.concatDelayError(closeServer(), shutdownLoopResources()) .then() .doFinally(s -> connections.clear()) - .doOnSuccess(avoid -> LOGGER.debug("[{}] Transport has been shut down", address)); + .doOnSuccess(avoid -> LOGGER.info("[{}] Transport has been shut down", address)); }); } @@ -233,6 +233,7 @@ public final Flux listen() { public Mono send(Address address, Message message) { return connections .computeIfAbsent(address, this::connect0) + .doOnSubscribe(s -> LOGGER.debug("[{}] Send {} to {}", this.address, message, address)) .map(Connection::outbound) .flatMap(out -> out.send(Mono.just(message).map(this::toByteBuf), bb -> true).then()) .then(); @@ -267,11 +268,7 @@ public Mono requestResponse(Address address, final Message request) { @SuppressWarnings("unused") private Mono onMessage(NettyInbound in, NettyOutbound out) { - return in.receive() // - .retain() - .map(this::toMessage) - .doOnNext(messageSink::next) - .then(); + return in.receive().retain().map(this::toMessage).doOnNext(messageSink::next).then(); } private Message toMessage(ByteBuf byteBuf) { @@ -300,7 +297,7 @@ private Mono connect0(Address address) { return newTcpClient(address) .doOnDisconnected( c -> { - LOGGER.debug("[{}] Disconnected from: {}, {}", this.address, address, c.channel()); + LOGGER.debug("[{}] Disconnected from {}, {}", this.address, address, c.channel()); connections.remove(address); }) .doOnConnected( @@ -308,7 +305,7 @@ private Mono connect0(Address address) { .connect() .doOnError( th -> { - LOGGER.debug( + LOGGER.warn( "[{}][connect0][{}] Exception occurred: {}", this.address, address, diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/TransportSendOrderTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/TransportSendOrderTest.java index 786ac0b7..290fcc39 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/TransportSendOrderTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/TransportSendOrderTest.java @@ -45,7 +45,7 @@ public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws E int sentPerIteration = 1000; long[] iterationTimeSeries = new long[iterationNum - 1]; for (int i = 0; i < iterationNum; i++) { - LOGGER.info("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); client = createTransport(); final List received = new ArrayList<>(); @@ -74,7 +74,7 @@ public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws E } assertSendOrder(sentPerIteration, received); - LOGGER.info("Iteration time: {} ms", iterationTime); + LOGGER.debug("Iteration time: {} ms", iterationTime); serverSubscriber.dispose(); destroyTransport(client); @@ -82,7 +82,7 @@ public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws E LongSummaryStatistics iterationTimeStats = LongStream.of(iterationTimeSeries).summaryStatistics(); - LOGGER.info("Iteration time stats (ms): {}", iterationTimeStats); + LOGGER.debug("Iteration time stats (ms): {}", iterationTimeStats); } @Test @@ -94,7 +94,7 @@ public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { long[] iterationTimeSeries = new long[iterationNum - 1]; List totalSentTimeSeries = new ArrayList<>(sentPerIteration * (iterationNum - 1)); for (int i = 0; i < iterationNum; i++) { - LOGGER.info("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); List iterSentTimeSeries = new ArrayList<>(sentPerIteration); client = createTransport(); @@ -137,15 +137,15 @@ public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { LongSummaryStatistics iterSentTimeStats = iterSentTimeSeries.stream().mapToLong(v -> v).summaryStatistics(); if (i == 0) { // warm up iteration - LOGGER.info("Warm up iteration time: {} ms", iterationTime); - LOGGER.info("Sent time stats warm up iter (ms): {}", iterSentTimeStats); + LOGGER.debug("Warm up iteration time: {} ms", iterationTime); + LOGGER.debug("Sent time stats warm up iter (ms): {}", iterSentTimeStats); } else { totalSentTimeSeries.addAll(iterSentTimeSeries); LongSummaryStatistics totalSentTimeStats = totalSentTimeSeries.stream().mapToLong(v -> v).summaryStatistics(); - LOGGER.info("Iteration time: {} ms", iterationTime); - LOGGER.info("Sent time stats iter (ms): {}", iterSentTimeStats); - LOGGER.info("Sent time stats total (ms): {}", totalSentTimeStats); + LOGGER.debug("Iteration time: {} ms", iterationTime); + LOGGER.debug("Sent time stats iter (ms): {}", iterSentTimeStats); + LOGGER.debug("Sent time stats total (ms): {}", totalSentTimeStats); } serverSubscriber.dispose(); @@ -154,7 +154,7 @@ public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { LongSummaryStatistics iterationTimeStats = LongStream.of(iterationTimeSeries).summaryStatistics(); - LOGGER.info("Iteration time stats (ms): {}", iterationTimeStats); + LOGGER.debug("Iteration time stats (ms): {}", iterationTimeStats); } @Test @@ -163,7 +163,7 @@ public void testSendOrderMultiThread(TestInfo testInfo) throws Exception { final int total = 1000; for (int i = 0; i < 10; i++) { - LOGGER.info("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); ExecutorService exec = Executors.newFixedThreadPool( 4, diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/TransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/TransportTest.java index 639b0c22..0c5d1e39 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/TransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/TransportTest.java @@ -58,7 +58,7 @@ public void testUnresolvedHostConnection() { public void testInteractWithNoConnection(TestInfo testInfo) { Address serverAddress = Address.from("localhost:49255"); for (int i = 0; i < 10; i++) { - LOGGER.info("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); client = createTransport(); diff --git a/transport-parent/transport-netty/src/test/resources/log4j2-test.xml b/transport-parent/transport-netty/src/test/resources/log4j2-test.xml index 53878182..6ad658d7 100644 --- a/transport-parent/transport-netty/src/test/resources/log4j2-test.xml +++ b/transport-parent/transport-netty/src/test/resources/log4j2-test.xml @@ -4,23 +4,21 @@ - %level{length=1} %date{MMdd-HHmm:ss,SSS} %logger{1.} %message [%thread]%n + %level{length=1} %date{HHmm:ss,SSS} %logger{1.} %message [%thread]%n - - - - - - - - - + + + + + + + - +