From a022c3073d9d627b9879b31261b5f61cb749e23e Mon Sep 17 00:00:00 2001 From: artem-v Date: Sun, 19 Jan 2020 21:51:18 +0200 Subject: [PATCH] Enhanced logging --- .../cluster/utils/NetworkEmulator.java | 24 ++--- .../io/scalecube/cluster/ClusterImpl.java | 31 +++---- .../fdetector/FailureDetectorImpl.java | 85 +++++++++-------- .../cluster/gossip/GossipProtocolImpl.java | 33 +++++-- .../membership/MembershipProtocolImpl.java | 81 ++++++++++------- .../cluster/metadata/MetadataStoreImpl.java | 91 +++++++++---------- .../src/test/resources/log4j2-test-trace.xml | 24 ----- cluster/src/test/resources/log4j2-test.xml | 6 +- examples/src/main/resources/log4j2.xml | 7 +- .../transport/netty/ExceptionHandler.java | 4 +- .../transport/netty/TransportImpl.java | 71 ++++++--------- .../src/test/resources/log4j2-test.xml | 7 +- 12 files changed, 238 insertions(+), 226 deletions(-) delete mode 100644 cluster/src/test/resources/log4j2-test-trace.xml diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java index 82d7a758..90bb22c0 100644 --- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java +++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java @@ -70,7 +70,7 @@ public OutboundSettings outboundSettings(Address destination) { public void outboundSettings(Address destination, int lossPercent, int meanDelay) { OutboundSettings settings = new OutboundSettings(lossPercent, meanDelay); outboundSettings.put(destination, settings); - LOGGER.debug("Set outbound settings {} from {} to {}", settings, address, destination); + LOGGER.debug("[{}] Set outbound settings {} to {}", address, settings, destination); } /** @@ -81,21 +81,21 @@ public void outboundSettings(Address destination, int lossPercent, int meanDelay */ public void setDefaultOutboundSettings(int lossPercent, int meanDelay) { defaultOutboundSettings = new OutboundSettings(lossPercent, meanDelay); - LOGGER.debug("Set default outbound settings {} for {}", defaultOutboundSettings, address); + LOGGER.debug("[{}] Set default outbound settings {}", address, defaultOutboundSettings); } /** Blocks outbound messages to all destinations. */ public void blockAllOutbound() { outboundSettings.clear(); setDefaultOutboundSettings(100, 0); - LOGGER.debug("Blocked outbound from {} to all destinations", address); + LOGGER.debug("[{}] Blocked outbound to all destinations", address); } /** Unblocks outbound messages to all destinations. */ public void unblockAllOutbound() { outboundSettings.clear(); setDefaultOutboundSettings(0, 0); - LOGGER.debug("Unblocked outbound from {} to all destinations", address); + LOGGER.debug("[{}] Unblocked outbound to all destinations", address); } /** @@ -116,7 +116,7 @@ public void blockOutbound(Collection
destinations) { for (Address destination : destinations) { outboundSettings.put(destination, new OutboundSettings(100, 0)); } - LOGGER.debug("Blocked outbound from {} to {}", address, destinations); + LOGGER.debug("[{}] Blocked outbound to {}", address, destinations); } /** @@ -135,7 +135,7 @@ public void unblockOutbound(Address... destinations) { */ public void unblockOutbound(Collection
destinations) { destinations.forEach(outboundSettings::remove); - LOGGER.debug("Unblocked outbound from {} to {}", address, destinations); + LOGGER.debug("[{}] Unblocked outbound {}", address, destinations); } /** @@ -221,7 +221,7 @@ public InboundSettings inboundSettings(Address destination) { public void inboundSettings(Address destination, boolean shallPass) { InboundSettings settings = new InboundSettings(shallPass); inboundSettings.put(destination, settings); - LOGGER.debug("Set inbound settings {} from {} to {}", settings, address, destination); + LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination); } /** @@ -231,21 +231,21 @@ public void inboundSettings(Address destination, boolean shallPass) { */ public void setDefaultInboundSettings(boolean shallPass) { defaultInboundSettings = new InboundSettings(shallPass); - LOGGER.debug("Set default inbound settings {} for {}", defaultInboundSettings, address); + LOGGER.debug("[{}] Set default inbound settings {}", address, defaultInboundSettings); } /** Blocks inbound messages from all destinations. */ public void blockAllInbound() { inboundSettings.clear(); setDefaultInboundSettings(false); - LOGGER.debug("Blocked inbound to {} from all destinations", address); + LOGGER.debug("[{}] Blocked inbound from all destinations", address); } /** Unblocks inbound messages to all destinations. */ public void unblockAllInbound() { inboundSettings.clear(); setDefaultInboundSettings(true); - LOGGER.debug("Unblocked inbound to {} from all destinations", address); + LOGGER.debug("[{}] Unblocked inbound from all destinations", address); } /** @@ -266,7 +266,7 @@ public void blockInbound(Collection
destinations) { for (Address destination : destinations) { inboundSettings.put(destination, new InboundSettings(false)); } - LOGGER.debug("Blocked inbound to {} from {}", address, destinations); + LOGGER.debug("[{}] Blocked inbound from {}", address, destinations); } /** @@ -285,7 +285,7 @@ public void unblockInbound(Address... destinations) { */ public void unblockInbound(Collection
destinations) { destinations.forEach(inboundSettings::remove); - LOGGER.debug("Unblocked inbound to {} from {}", address, destinations); + LOGGER.debug("[{}] Unblocked inbound from {}", address, destinations); } /** diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 054d8ffa..76c4030b 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -118,17 +118,15 @@ private void initLifecycle() { .then(doStart()) .doOnSuccess(avoid -> onStart.onComplete()) .doOnError(onStart::onError) - .subscribe( - null, - th -> { - LOGGER.error("Cluster member {} failed on start: ", localMember, th); - shutdown.onComplete(); - }); + .subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th)); - shutdown // + shutdown .then(doShutdown()) .doFinally(s -> onShutdown.onComplete()) - .subscribe(); + .subscribe( + null, + th -> + LOGGER.warn("[{}][doShutdown] Exception occurred: {}", localMember, th.toString())); } /** @@ -352,7 +350,7 @@ private ObjectInstance startJmxMonitor0() throws Exception { } private void onError(Throwable th) { - LOGGER.error("Received unexpected error: ", th); + LOGGER.error("[{}] Received unexpected error:", localMember, th); } private Flux listenMessage() { @@ -481,16 +479,12 @@ public void shutdown() { private Mono doShutdown() { return Mono.defer( () -> { - LOGGER.info("Cluster member {} is shutting down", localMember); + LOGGER.debug("[{}] Cluster member is shutting down", localMember); return Flux.concatDelayError(leaveCluster(), dispose(), transport.stop()) .then() .doFinally(s -> scheduler.dispose()) .doOnSuccess( - avoid -> LOGGER.info("Cluster member {} has been shut down", localMember)) - .doOnError( - th -> - LOGGER.warn( - "Cluster member {} failed on shutdown: {}", localMember, th.toString())); + avoid -> LOGGER.debug("[{}] Cluster member has been shutdown", localMember)); }); } @@ -498,11 +492,12 @@ private Mono leaveCluster() { return membership .leaveCluster() .subscribeOn(scheduler) - .doOnSuccess(s -> LOGGER.debug("Cluster member {} has left a cluster", localMember)) + .doOnSubscribe(s -> LOGGER.debug("[{}] Cluster member is leaving a cluster", localMember)) + .doOnSuccess(s -> LOGGER.debug("[{}] Cluster member has left a cluster", localMember)) .doOnError( ex -> - LOGGER.info( - "Cluster member {} failed on leaveCluster: {}", localMember, ex.toString())) + LOGGER.warn( + "[{}][leaveCluster] Exception occurred: {}", localMember, ex.toString())) .then(); } diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index 362b63cd..7c72de85 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -138,7 +138,7 @@ private void doPing() { PingData pingData = new PingData(localMember, pingMember); Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build(); - LOGGER.trace("Send Ping[{}] to {} [at {}]", period, pingMember, localMember); + LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember); Address address = pingMember.address(); transport .requestResponse(address, pingMsg) @@ -146,22 +146,23 @@ private void doPing() { .publishOn(scheduler) .subscribe( message -> { - LOGGER.trace("Received PingAck[{}] from {} [at {}]", period, pingMember, localMember); + LOGGER.debug( + "[{}][{}] Received PingAck from {}", localMember, period, message.sender()); publishPingResult(period, pingMember, computeMemberStatus(message, period)); }, ex -> { LOGGER.debug( - "Failed to get PingAck[{}] from {} within {} ms [at {}]", + "[{}][{}] Failed to get PingAck from {} within {} ms", + localMember, period, pingMember, - config.pingTimeout(), - localMember); + config.pingTimeout()); final int timeLeft = config.pingInterval() - config.pingTimeout(); final List pingReqMembers = selectPingReqMembers(pingMember); if (timeLeft <= 0 || pingReqMembers.isEmpty()) { - LOGGER.trace("No PingReq[{}] occurred", period); + LOGGER.debug("[{}][{}] No PingReq occurred", localMember, period); publishPingResult(period, pingMember, MemberStatus.SUSPECT); } else { doPingReq(currentPeriod, pingMember, pingReqMembers, cid); @@ -176,8 +177,8 @@ private void doPingReq( .qualifier(PING_REQ) .correlationId(cid) .build(); - LOGGER.trace( - "Send PingReq[{}] to {} for {} [at {}]", period, pingReqMembers, pingMember, localMember); + LOGGER.debug( + "[{}][{}] Send PingReq to {} for {}", localMember, period, pingReqMembers, pingMember); Duration timeout = Duration.ofMillis(config.pingInterval() - config.pingTimeout()); pingReqMembers.forEach( @@ -188,22 +189,22 @@ private void doPingReq( .publishOn(scheduler) .subscribe( message -> { - LOGGER.trace( - "Received transit PingAck[{}] from {} to {} [at {}]", + LOGGER.debug( + "[{}][{}] Received transit PingAck from {} to {}", + localMember, period, message.sender(), - pingMember, - localMember); + pingMember); publishPingResult(period, pingMember, computeMemberStatus(message, period)); }, throwable -> { LOGGER.debug( - "Timeout getting transit PingAck[{}] from {} to {} within {} ms [at {}]", + "[{}][{}] Timeout getting transit PingAck from {} to {} within {} ms", + localMember, period, member, pingMember, - timeout.toMillis(), - localMember); + timeout.toMillis()); publishPingResult(period, pingMember, MemberStatus.SUSPECT); })); } @@ -225,36 +226,42 @@ private void onMessage(Message message) { /** Listens to PING message and answers with ACK. */ private void onPing(Message message) { long period = this.currentPeriod; - LOGGER.trace("Received Ping[{}] [at {}]", period, localMember); + Address sender = message.sender(); + LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender); PingData data = message.data(); data = data.withAckType(AckType.DEST_OK); if (!data.getTo().id().equals(localMember.id())) { - LOGGER.warn( - "Received Ping[{}] to {}, but local member is {}", period, data.getTo(), localMember); + LOGGER.debug( + "[{}][{}] Received Ping from {} to {}, but local member is {}", + localMember, + period, + sender, + data.getTo(), + localMember); data = data.withAckType(AckType.DEST_GONE); } String correlationId = message.correlationId(); Message ackMessage = Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build(); Address address = data.getFrom().address(); - LOGGER.trace("Send PingAck[{}] to {} [at {}]", period, address, localMember); + LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address); transport .send(address, ackMessage) .subscribe( null, ex -> LOGGER.debug( - "Failed to send PingAck[{}] to {} [at {}], cause: {}", + "[{}][{}] Failed to send PingAck to {}, cause: {}", + localMember, period, address, - localMember, ex.toString())); } /** Listens to PING_REQ message and sends PING to requested cluster member. */ private void onPingReq(Message message) { long period = this.currentPeriod; - LOGGER.trace("Received PingReq[{}] [at {}]", period, localMember); + LOGGER.debug("[{}][{}] Received PingReq from {}", localMember, period, message.sender()); PingData data = message.data(); Member target = data.getTo(); Member originalIssuer = data.getFrom(); @@ -263,14 +270,15 @@ private void onPingReq(Message message) { Message pingMessage = Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build(); Address address = target.address(); - LOGGER.trace("Send transit Ping[{}] to {} [at {}]", period, address, localMember); + LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address); transport .send(address, pingMessage) .subscribe( null, ex -> LOGGER.debug( - "Failed to send transit Ping[{}] to {}, cause: {}", + "[{}][{}] Failed to send transit Ping to {}, cause: {}", + localMember, period, address, ex.toString())); @@ -282,7 +290,8 @@ private void onPingReq(Message message) { */ private void onTransitPingAck(Message message) { long period = this.currentPeriod; - LOGGER.trace("Received transit PingAck[{}] [at {}]", period, localMember); + LOGGER.debug( + "[{}][{}] Received transit PingAck from {}", localMember, period, message.sender()); PingData data = message.data(); AckType ackType = data.getAckType(); Member target = data.getOriginalIssuer(); @@ -291,21 +300,22 @@ private void onTransitPingAck(Message message) { Message originalAckMessage = Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build(); Address address = target.address(); - LOGGER.trace("Resend transit PingAck[{}] to {} [at {}]", period, address, localMember); + LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address); transport .send(address, originalAckMessage) .subscribe( null, ex -> LOGGER.debug( - "Failed to resend transit PingAck[{}] to {}, cause: {}", + "[{}][{}] Failed to resend transit PingAck to {}, cause: {}", + localMember, period, address, ex.toString())); } private void onError(Throwable throwable) { - LOGGER.error("Received unexpected error: ", throwable); + LOGGER.error("[{}][{}] Received unexpected error:", localMember, currentPeriod, throwable); } private void onMemberEvent(MembershipEvent event) { @@ -314,10 +324,11 @@ private void onMemberEvent(MembershipEvent event) { boolean removed = pingMembers.remove(member); if (removed) { LOGGER.debug( - "Removed {} from pingMembers list (size={}) [at {}]", + "[{}][{}] Removed {} from pingMembers list (size={})", + localMember, + currentPeriod, member, - pingMembers.size(), - localMember); + pingMembers.size()); } } if (event.isAdded()) { @@ -326,10 +337,11 @@ private void onMemberEvent(MembershipEvent event) { int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0; pingMembers.add(index, member); LOGGER.debug( - "Added {} to pingMembers list (size={}) [at {}]", + "[{}][{}] Added {} to pingMembers list (size={})", + localMember, + currentPeriod, member, - pingMembers.size(), - localMember); + pingMembers.size()); } } @@ -363,7 +375,7 @@ private List selectPingReqMembers(Member pingMember) { } private void publishPingResult(long period, Member member, MemberStatus status) { - LOGGER.debug("Member {} detected as {} period={} [at {}]", member, status, period, localMember); + LOGGER.debug("[{}][{}] Member {} detected as {}", localMember, period, member, status); sink.next(new FailureDetectorEvent(member, status)); } @@ -384,7 +396,8 @@ private MemberStatus computeMemberStatus(Message message, long period) { memberStatus = MemberStatus.DEAD; break; default: - LOGGER.warn("Unknown PingData.AckType received '{}' at [{}]", ackType, period); + LOGGER.warn( + "[{}][{}] Unknown PingData.AckType received '{}'", localMember, period, ackType); memberStatus = MemberStatus.SUSPECT; } return memberStatus; diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index 25aa5c1d..d6edd8b8 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -157,7 +157,7 @@ private void doSpreadGossip() { // Sweep gossips Set gossipsToRemove = getGossipsToRemove(period); if (!gossipsToRemove.isEmpty()) { - LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove); + LOGGER.debug("[{}][{}] Sweep gossips: {}", localMember, period, gossipsToRemove); for (String gossipId : gossipsToRemove) { gossips.remove(gossipId); } @@ -166,7 +166,11 @@ private void doSpreadGossip() { // Check spread gossips Set gossipsThatSpread = getGossipsThatMostLikelyDisseminated(period); if (!gossipsThatSpread.isEmpty()) { - LOGGER.debug("Most likely disseminated gossips[{}]: {}", period, gossipsThatSpread); + LOGGER.debug( + "[{}][{}] Most likely disseminated gossips: {}", + localMember, + period, + gossipsThatSpread); for (String gossipId : gossipsThatSpread) { MonoSink sink = futures.remove(gossipId); if (sink != null) { @@ -175,7 +179,7 @@ private void doSpreadGossip() { } } } catch (Exception ex) { - LOGGER.warn("Exception at doSpreadGossip[{}]: ", period, ex); + LOGGER.warn("[{}][{}][doSpreadGossip] Exception occurred:", localMember, period, ex); } } @@ -219,8 +223,10 @@ private void checkGossipSegmentation() { final SequenceIdCollector sequenceIdCollector = entry.getValue(); if (sequenceIdCollector.size() > intervalsThreshold) { LOGGER.warn( - "Too many missed gossip messages from original gossiper: '{}', " + "[{}][{}] Too many missed gossip messages from original gossiper: '{}', " + "current node({}) was SUSPECTED much for a long time or connection problem", + localMember, + currentPeriod, entry.getKey(), localMember); @@ -235,17 +241,27 @@ private void onMemberEvent(MembershipEvent event) { boolean removed = remoteMembers.remove(member); sequenceIdCollectors.remove(member.id()); if (removed) { - LOGGER.debug("Removed {} from remoteMembers list (size={})", member, remoteMembers.size()); + LOGGER.debug( + "[{}][{}] Removed {} from remoteMembers list (size={})", + localMember, + currentPeriod, + member, + remoteMembers.size()); } } if (event.isAdded()) { remoteMembers.add(member); - LOGGER.debug("Added {} to remoteMembers list (size={})", member, remoteMembers.size()); + LOGGER.debug( + "[{}][{}] Added {} to remoteMembers list (size={})", + localMember, + currentPeriod, + member, + remoteMembers.size()); } } private void onError(Throwable throwable) { - LOGGER.error("Received unexpected error: ", throwable); + LOGGER.error("[{}][{}] Received unexpected error:", localMember, currentPeriod, throwable); } // ================================================ @@ -284,7 +300,8 @@ private void spreadGossipsTo(long period, Member member) { null, ex -> LOGGER.debug( - "Failed to send GossipReq[{}]: {} to {}, cause: {}", + "[{}][{}] Failed to send GossipReq({}) to {}, cause: {}", + localMember, period, message, address, diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index bc0f3d91..837902e2 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -53,8 +53,6 @@ public final class MembershipProtocolImpl implements MembershipProtocol { private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocol.class); - private static final Logger LOGGER_MEMBERSHIP = - LoggerFactory.getLogger("io.scalecube.cluster.Membership"); private enum MembershipUpdateReason { FAILURE_DETECTOR_EVENT, @@ -196,7 +194,7 @@ private boolean checkAddressesNotEqual(Address address0, Address address1) { if (!address0.equals(address1)) { return true; } else { - LOGGER.warn("Filtering out seed address: {}", address0); + LOGGER.warn("[{}] Filtering out seed address: {}", localMember, address0); return false; } } @@ -263,7 +261,7 @@ private void start0(MonoSink sink) { return; } // If seed addresses are specified in config - send initial sync to those nodes - LOGGER.debug("Making initial Sync to all seed members: {}", seedMembers); + LOGGER.debug("[{}] Making initial Sync to all seed members: {}", localMember, seedMembers); //noinspection unchecked Mono[] syncs = @@ -289,7 +287,10 @@ private void start0(MonoSink sink) { sink.success(); }) .subscribe( - null, ex -> LOGGER.debug("Exception on initial SyncAck, cause: {}", ex.toString())); + null, + ex -> + LOGGER.debug( + "[{}] Exception on initial SyncAck, cause: {}", localMember, ex.toString())); } @Override @@ -345,14 +346,17 @@ private void doSync() { Address address = addressOptional.get(); Message message = prepareSyncDataMsg(SYNC, null); - LOGGER.debug("Send Sync: {} to {}", message, address); + LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, address); transport .send(address, message) .subscribe( null, ex -> LOGGER.debug( - "Failed to send Sync: {} to {}, cause: {}", message, address, ex.toString())); + "[{}][doSync] Failed to send Sync to {}, cause: {}", + localMember, + address, + ex.toString())); } // ================================================ @@ -379,7 +383,7 @@ private void onMessage(Message message) { private Mono onSyncAck(Message syncAckMsg, boolean onStart) { return Mono.defer( () -> { - LOGGER.debug("Received SyncAck: {}", syncAckMsg); + LOGGER.debug("[{}] Received SyncAck from {}", localMember, syncAckMsg.sender()); return syncMembership(syncAckMsg.data(), onStart); }); } @@ -388,21 +392,21 @@ private Mono onSyncAck(Message syncAckMsg, boolean onStart) { private Mono onSync(Message syncMsg) { return Mono.defer( () -> { - LOGGER.debug("Received Sync: {}", syncMsg); + final Address sender = syncMsg.sender(); + LOGGER.debug("[{}] Received Sync from {}", localMember, sender); return syncMembership(syncMsg.data(), false) .doOnSuccess( avoid -> { Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId()); - Address address = syncMsg.sender(); transport - .send(address, message) + .send(sender, message) .subscribe( null, ex -> LOGGER.debug( - "Failed to send SyncAck: {} to {}, cause: {}", - message, - address, + "[{}] Failed to send SyncAck to {}, cause: {}", + localMember, + sender, ex.toString())); }); }); @@ -417,7 +421,7 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { if (r0.status() == fdEvent.status()) { // status not changed return; } - LOGGER.debug("Received status change on failure detector event: {}", fdEvent); + LOGGER.debug("[{}][onFailureDetectorEvent] Received status change: {}", localMember, fdEvent); if (fdEvent.status() == ALIVE) { // TODO: Consider to make more elegant solution // Alive won't override SUSPECT so issue instead extra sync with member to force it spread @@ -430,7 +434,10 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { null, ex -> LOGGER.debug( - "Failed to send {} to {}, cause: {}", syncMsg, address, ex.toString())); + "[{}][onFailureDetectorEvent] Failed to send Sync to {}, cause: {}", + localMember, + address, + ex.toString())); } else { MembershipRecord record = new MembershipRecord(r0.member(), fdEvent.status(), r0.incarnation()); @@ -443,7 +450,7 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { private void onMembershipGossip(Message message) { if (MEMBERSHIP_GOSSIP.equals(message.qualifier())) { MembershipRecord record = message.data(); - LOGGER.debug("Received membership gossip: {}", record); + LOGGER.debug("[{}] Received membership gossip: {}", localMember, record); updateMembership(record, MembershipUpdateReason.MEMBERSHIP_GOSSIP) .subscribe(null, this::onError); } @@ -467,7 +474,7 @@ private Optional
selectSyncAddress() { // ================================================ private void onError(Throwable throwable) { - LOGGER.error("Received unexpected error: ", throwable); + LOGGER.error("[{}] Received unexpected error:", localMember, throwable); } @SuppressWarnings("unused") @@ -527,8 +534,10 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason // if current record is LEAVING then we want to process other event too // Check if new record r1 overrides existing membership record r0 if ((r0 == null || !r0.isLeaving()) && !r1.isOverrides(r0)) { - LOGGER_MEMBERSHIP.debug( - "(update reason: {}) skipping update, can't override r0: {} with received r1: {}", + LOGGER.debug( + "[{}][updateMembership][{}] Skipping update, " + + "can't override r0: {} with received r1: {}", + localMember, reason, r0, r1); @@ -572,9 +581,10 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason .fetchMetadata(r1.member()) .doOnError( ex -> - LOGGER_MEMBERSHIP.debug( - "(update reason: {}) skipping to add/update member: {}, " + LOGGER.debug( + "[{}][updateMembership][{}] Skipping to add/update member: {}, " + "due to failed fetchMetadata call (cause: {})", + localMember, reason, r1, ex.toString())) @@ -626,9 +636,11 @@ private Mono onSelfMemberDetected( membershipTable.put(localMember.id(), r2); - LOGGER_MEMBERSHIP.debug( - "(update reason: {}) updating incarnation, local record r0: {} to received r1: {}, " + LOGGER.debug( + "[{}][updateMembership][{}] Updating incarnation, " + + "local record r0: {} to received r1: {}, " + "spreading with increased incarnation r2: {}", + localMember, reason, r0, r1, @@ -664,7 +676,7 @@ private Mono onLeavingDetected(MembershipRecord r0, MembershipRecord r1) { } private void publishEvent(MembershipEvent event) { - LOGGER_MEMBERSHIP.debug("Emitting membership event {}", event); + LOGGER.debug("[{}][publishEvent] {}", localMember, event); sink.next(event); } @@ -687,9 +699,9 @@ private Mono onDeadMemberDetected(MembershipRecord r1) { // Log that member leaved gracefully or without notification if (r0.isLeaving()) { - LOGGER_MEMBERSHIP.debug("Member leaved gracefully: {}", member); + LOGGER.debug("[{}] Member leaved gracefully: {}", localMember, member); } else { - LOGGER_MEMBERSHIP.warn("Member leaved without notification: {}", member); + LOGGER.warn("[{}] Member leaved without notification: {}", localMember, member); } final long timestamp = System.currentTimeMillis(); @@ -728,7 +740,7 @@ private void onAliveMemberDetected( private void cancelSuspicionTimeoutTask(String memberId) { Disposable future = suspicionTimeoutTasks.remove(memberId); if (future != null && !future.isDisposed()) { - LOGGER.debug("Cancelled SuspicionTimeoutTask for {}", memberId); + LOGGER.debug("[{}] Cancelled SuspicionTimeoutTask for {}", localMember, memberId); future.dispose(); } } @@ -744,7 +756,10 @@ private void scheduleSuspicionTimeoutTask(MembershipRecord r) { r.member().id(), id -> { LOGGER.debug( - "Scheduled SuspicionTimeoutTask for {}, suspicionTimeout {}", id, suspicionTimeout); + "[{}] Scheduled SuspicionTimeoutTask for {}, suspicionTimeout {}", + localMember, + id, + suspicionTimeout); return scheduler.schedule( () -> onSuspicionTimeout(id), suspicionTimeout, TimeUnit.MILLISECONDS); }); @@ -754,7 +769,7 @@ private void onSuspicionTimeout(String memberId) { suspicionTimeoutTasks.remove(memberId); MembershipRecord r = membershipTable.get(memberId); if (r != null) { - LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", r); + LOGGER.debug("[{}] Declare SUSPECTED member {} as DEAD by timeout", localMember, r); MembershipRecord deadRecord = new MembershipRecord(r.member(), DEAD, r.incarnation()); updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT) .subscribe(null, this::onError); @@ -774,14 +789,14 @@ private Mono spreadMembershipGossip(MembershipRecord r) { return Mono.defer( () -> { Message msg = Message.withData(r).qualifier(MEMBERSHIP_GOSSIP).build(); - LOGGER.debug("Spead membreship: {} with gossip", msg); + LOGGER.debug("[{}] Send membership with gossip", localMember); return gossipProtocol .spread(msg) .doOnError( ex -> LOGGER.debug( - "Failed to spread membership: {} with gossip, cause: {}", - msg, + "[{}] Failed to send membership with gossip, cause: {}", + localMember, ex.toString())) .then(); }); diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java index e3ab220e..1757bc69 100644 --- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java @@ -80,17 +80,12 @@ public void start() { // Subscribe actionsDisposables.add( // Listen to incoming get_metadata requests from other members - transport - .listen() // - .publishOn(scheduler) - .subscribe(this::onMessage, this::onError)); + transport.listen().publishOn(scheduler).subscribe(this::onMessage, this::onError)); } @Override public void stop() { - // Stop accepting requests actionsDisposables.dispose(); - // Clear metadata hash map membersMetadata.clear(); } @@ -119,13 +114,12 @@ public ByteBuffer updateMetadata(Member member, ByteBuffer metadata) { ByteBuffer value = metadata.slice(); ByteBuffer result = membersMetadata.put(member, value); - // updated if (result == null) { LOGGER.debug( - "Added metadata: {} for member {} [at {}]", value.remaining(), member, localMember); + "[{}] Added metadata(size={}) for member {}", localMember, value.remaining(), member); } else { LOGGER.debug( - "Updated metadata: {} for member {} [at {}]", value.remaining(), member, localMember); + "[{}] Updated metadata(size={}) for member {}", localMember, value.remaining(), member); } return result; } @@ -138,7 +132,11 @@ public ByteBuffer removeMetadata(Member member) { // remove ByteBuffer metadata = membersMetadata.remove(member); if (metadata != null) { - LOGGER.debug("Removed metadata for member {} [at {}]", member, localMember); + LOGGER.debug( + "[{}] Removed metadata(size={}) for member {}", + localMember, + metadata.remaining(), + member); return metadata; } return null; @@ -146,13 +144,13 @@ public ByteBuffer removeMetadata(Member member) { @Override public Mono fetchMetadata(Member member) { - return Mono.create( - sink -> { - LOGGER.debug("Getting metadata for member {} [at {}]", member, localMember); - - // Increment counter + return Mono.defer( + () -> { final String cid = cidGenerator.nextCid(); final Address targetAddress = member.address(); + + LOGGER.debug("[{}][{}] Getting metadata for member {}", localMember, cid, member); + Message request = Message.builder() .qualifier(GET_METADATA_REQ) @@ -160,32 +158,29 @@ public Mono fetchMetadata(Member member) { .data(new GetMetadataRequest(member)) .build(); - transport + return transport .requestResponse(targetAddress, request) .timeout(Duration.ofMillis(config.metadataTimeout()), scheduler) .publishOn(scheduler) - .subscribe( - response -> { - LOGGER.debug( - "Received GetMetadataResp[{}] from {} [at {}]", - cid, - targetAddress, - localMember); - GetMetadataResponse respData = response.data(); - ByteBuffer metadata = respData.getMetadata(); - sink.success(metadata); - }, - th -> { - LOGGER.warn( - "Failed getting GetMetadataResp[{}] " - + "from {} within {} ms [at {}], cause : {}", - cid, - targetAddress, - config.metadataTimeout(), - localMember, - th.toString()); - sink.error(th); - }); + .doOnSuccess( + s -> + LOGGER.debug( + "[{}][{}] Received GetMetadataResp from {}", + localMember, + cid, + targetAddress)) + .map(Message::data) + .map(GetMetadataResponse::getMetadata) + .doOnError( + th -> + LOGGER.warn( + "[{}][{}] Timeout getting GetMetadataResp " + + "from {} within {} ms, cause: {}", + localMember, + cid, + targetAddress, + config.metadataTimeout(), + th.toString())); }); } @@ -200,11 +195,12 @@ private void onMessage(Message message) { } private void onError(Throwable throwable) { - LOGGER.error("Received unexpected error: ", throwable); + LOGGER.error("[{}] Received unexpected error:", localMember, throwable); } private void onMetadataRequest(Message message) { - LOGGER.debug("Received GetMetadataReq: {} [at {}]", message, localMember); + final Address sender = message.sender(); + LOGGER.debug("[{}] Received GetMetadataReq from {}", localMember, sender); GetMetadataRequest reqData = message.data(); Member targetMember = reqData.getMember(); @@ -212,8 +208,9 @@ private void onMetadataRequest(Message message) { // Validate target member if (!targetMember.id().equals(localMember.id())) { LOGGER.warn( - "Received GetMetadataReq: {} to {}, but local member is {}", - message, + "[{}] Received GetMetadataReq from {} to {}, but local member is {}", + localMember, + sender, targetMember, localMember); return; @@ -229,18 +226,16 @@ private void onMetadataRequest(Message message) { .data(respData) .build(); - Address responseAddress = message.sender(); - LOGGER.debug("Send GetMetadataResp: {} to {} [at {}]", response, responseAddress, localMember); + LOGGER.debug("[{}] Send GetMetadataResp to {}", localMember, sender); transport - .send(responseAddress, response) + .send(sender, response) .subscribe( null, ex -> LOGGER.debug( - "Failed to send GetMetadataResp: {} to {} [at {}], cause: {}", - response, - responseAddress, + "[{}] Failed to send GetMetadataResp to {}, cause: {}", localMember, + sender, ex.toString())); } diff --git a/cluster/src/test/resources/log4j2-test-trace.xml b/cluster/src/test/resources/log4j2-test-trace.xml deleted file mode 100644 index ac7925a5..00000000 --- a/cluster/src/test/resources/log4j2-test-trace.xml +++ /dev/null @@ -1,24 +0,0 @@ - - - - - - - %level{length=1} %date{MMdd-HHmm:ss,SSS} %logger{1.} %message [%thread]%n - - - - - - - - - - - - - - - - - diff --git a/cluster/src/test/resources/log4j2-test.xml b/cluster/src/test/resources/log4j2-test.xml index 9accca44..53878182 100644 --- a/cluster/src/test/resources/log4j2-test.xml +++ b/cluster/src/test/resources/log4j2-test.xml @@ -12,7 +12,11 @@ - + + + + + diff --git a/examples/src/main/resources/log4j2.xml b/examples/src/main/resources/log4j2.xml index adcd2a57..53878182 100644 --- a/examples/src/main/resources/log4j2.xml +++ b/examples/src/main/resources/log4j2.xml @@ -11,7 +11,12 @@ - + + + + + + diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/ExceptionHandler.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/ExceptionHandler.java index b27654a4..7db73d0f 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/ExceptionHandler.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/ExceptionHandler.java @@ -17,7 +17,7 @@ final class ExceptionHandler extends ChannelDuplexHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionHandler.class); @Override - public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - LOGGER.debug("Exception caught for channel {}, {}", ctx.channel(), cause.toString()); + public final void exceptionCaught(ChannelHandlerContext ctx, Throwable ex) { + LOGGER.debug("Exception caught for channel {}, cause: {}", ctx.channel(), ex.toString()); } } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java index 3af8d0e6..1470ace1 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java @@ -113,7 +113,8 @@ private TransportImpl(DisposableServer server, TransportImpl other) { // Setup cleanup stop.then(doStop()) .doFinally(s -> onStop.onComplete()) - .subscribe(null, ex -> LOGGER.warn("Exception occurred on transport stop: " + ex)); + .subscribe( + null, ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", address, ex.toString())); } private static Address prepareAddress(DisposableServer server) { @@ -180,23 +181,15 @@ public Mono bind0() { return newTcpServer() .handle(this::onMessage) .bind() - .doOnSuccess( - server -> { - InetAddress address = server.address().getAddress(); - if (address.isAnyLocalAddress()) { - LOGGER.debug("Bound cluster transport on *:{}", server.port()); - } else { - LOGGER.debug( - "Bound cluster transport on {}:{}", address.getHostAddress(), server.port()); - } - }) .doOnError( ex -> LOGGER.error( "Failed to bind cluster transport on port={}, cause: {}", config.port(), ex.toString())) - .map(server -> new TransportImpl(server, this)); + .map(server -> new TransportImpl(server, this)) + .doOnSuccess(t -> LOGGER.debug("[{}] Bound cluster transport", address)) + .cast(Transport.class); } @Override @@ -221,13 +214,13 @@ public final Mono stop() { private Mono doStop() { return Mono.defer( () -> { - LOGGER.debug("Transport is shutting down on {}", address); + LOGGER.debug("[{}] Transport is shutting down", address); // Complete incoming messages observable messageSink.complete(); return Flux.concatDelayError(closeServer(), shutdownLoopResources()) .then() .doFinally(s -> connections.clear()) - .doOnSuccess(avoid -> LOGGER.debug("Transport has been shut down on {}", address)); + .doOnSuccess(avoid -> LOGGER.debug("[{}] Transport has been shut down", address)); }); } @@ -238,7 +231,11 @@ public final Flux listen() { @Override public Mono send(Address address, Message message) { - return getOrConnect(address).flatMap(conn -> send0(conn, message)).then(); + return connections + .computeIfAbsent(address, this::connect0) + .map(Connection::outbound) + .flatMap(out -> out.send(Mono.just(message).map(this::toByteBuf), bb -> true).then()) + .then(); } @Override @@ -281,17 +278,11 @@ private Message toMessage(ByteBuf byteBuf) { try (ByteBufInputStream stream = new ByteBufInputStream(byteBuf, true)) { return messageCodec.deserialize(stream); } catch (Exception e) { - LOGGER.warn( - "Message codec exception occurred at converting bytes to message: " + e.toString()); + LOGGER.warn("[{}][toMessage] Exception occurred: {}", address, e.toString()); throw new DecoderException(e); } } - private Mono send0(Connection conn, Message message) { - // do send - return conn.outbound().send(Mono.just(message).map(this::toByteBuf), bb -> true).then(); - } - private ByteBuf toByteBuf(Message message) { ByteBuf bb = ByteBufAllocator.DEFAULT.buffer(); ByteBufOutputStream stream = new ByteBufOutputStream(bb); @@ -299,33 +290,29 @@ private ByteBuf toByteBuf(Message message) { messageCodec.serialize(message, stream); } catch (Exception e) { bb.release(); - LOGGER.warn( - "Message codec exception occurred at converting message to bytes: " + e.toString()); + LOGGER.warn("[{}][toByteBuf] Exception occurred: {}", address, e.toString()); throw new EncoderException(e); } return bb; } - private Mono getOrConnect(Address address) { - return Mono.create( - sink -> - connections - .computeIfAbsent(address, this::connect0) - .subscribe(sink::success, sink::error)); - } - private Mono connect0(Address address) { return newTcpClient(address) .doOnDisconnected( c -> { - LOGGER.debug("Disconnected from: {} {}", address, c.channel()); + LOGGER.debug("[{}] Disconnected from: {}, {}", this.address, address, c.channel()); connections.remove(address); }) - .doOnConnected(c -> LOGGER.debug("Connected to {}: {}", address, c.channel())) + .doOnConnected( + c -> LOGGER.debug("[{}] Connected to {}, {}", this.address, address, c.channel())) .connect() .doOnError( th -> { - LOGGER.debug("Failed to connect on address {}, cause: {}", address, th.toString()); + LOGGER.debug( + "[{}][connect0][{}] Exception occurred: {}", + this.address, + address, + th.toString()); connections.remove(address); }) .cache(); @@ -337,17 +324,17 @@ private Mono closeServer() { if (server == null) { return Mono.empty(); } - server.dispose(); - return server.onDispose().doOnError(e -> LOGGER.warn("Failed to close server: " + e)); + return Mono.fromRunnable(server::dispose) + .then(server.onDispose()) + .doOnError( + e -> + LOGGER.warn( + "[{}][closeServer] Exception occurred: {}", address, e.toString())); }); } private Mono shutdownLoopResources() { - return Mono.defer( - () -> { - loopResources.dispose(); - return loopResources.disposeLater(); - }); + return Mono.fromRunnable(loopResources::dispose).then(loopResources.disposeLater()); } /** diff --git a/transport-parent/transport-netty/src/test/resources/log4j2-test.xml b/transport-parent/transport-netty/src/test/resources/log4j2-test.xml index adcd2a57..53878182 100644 --- a/transport-parent/transport-netty/src/test/resources/log4j2-test.xml +++ b/transport-parent/transport-netty/src/test/resources/log4j2-test.xml @@ -11,7 +11,12 @@ - + + + + + +