Skip to content

Commit

Permalink
Merge pull request #297 from scalecube/update/logging
Browse files Browse the repository at this point in the history
Updated logging
  • Loading branch information
artem-v authored Jan 23, 2020
2 parents a7fba99 + 3f91dfd commit cfb058d
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 93 deletions.
8 changes: 4 additions & 4 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,21 +479,21 @@ public void shutdown() {
private Mono<Void> doShutdown() {
return Mono.defer(
() -> {
LOGGER.debug("[{}] Cluster member is shutting down", localMember);
LOGGER.info("[{}] Cluster member is shutting down", localMember);
return Flux.concatDelayError(leaveCluster(), dispose(), transport.stop())
.then()
.doFinally(s -> scheduler.dispose())
.doOnSuccess(
avoid -> LOGGER.debug("[{}] Cluster member has been shutdown", localMember));
avoid -> LOGGER.info("[{}] Cluster member has been shutdown", localMember));
});
}

private Mono<Void> leaveCluster() {
return membership
.leaveCluster()
.subscribeOn(scheduler)
.doOnSubscribe(s -> LOGGER.debug("[{}] Cluster member is leaving a cluster", localMember))
.doOnSuccess(s -> LOGGER.debug("[{}] Cluster member has left a cluster", localMember))
.doOnSubscribe(s -> LOGGER.info("[{}] Cluster member is leaving a cluster", localMember))
.doOnSuccess(s -> LOGGER.info("[{}] Cluster member has left a cluster", localMember))
.doOnError(
ex ->
LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private void doPing() {
publishPingResult(period, pingMember, computeMemberStatus(message, period));
},
ex -> {
LOGGER.debug(
LOGGER.warn(
"[{}][{}] Failed to get PingAck from {} within {} ms",
localMember,
period,
Expand Down Expand Up @@ -198,7 +198,7 @@ private void doPingReq(
publishPingResult(period, pingMember, computeMemberStatus(message, period));
},
throwable -> {
LOGGER.debug(
LOGGER.warn(
"[{}][{}] Timeout getting transit PingAck from {} to {} within {} ms",
localMember,
period,
Expand Down Expand Up @@ -250,7 +250,7 @@ private void onPing(Message message) {
.subscribe(
null,
ex ->
LOGGER.debug(
LOGGER.warn(
"[{}][{}] Failed to send PingAck to {}, cause: {}",
localMember,
period,
Expand All @@ -276,7 +276,7 @@ private void onPingReq(Message message) {
.subscribe(
null,
ex ->
LOGGER.debug(
LOGGER.warn(
"[{}][{}] Failed to send transit Ping to {}, cause: {}",
localMember,
period,
Expand Down Expand Up @@ -306,7 +306,7 @@ private void onTransitPingAck(Message message) {
.subscribe(
null,
ex ->
LOGGER.debug(
LOGGER.warn(
"[{}][{}] Failed to resend transit PingAck to {}, cause: {}",
localMember,
period,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private void spreadGossipsTo(long period, Member member) {
.subscribe(
null,
ex ->
LOGGER.debug(
LOGGER.warn(
"[{}][{}] Failed to send GossipReq({}) to {}, cause: {}",
localMember,
period,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private void start0(MonoSink<Object> sink) {
return;
}
// If seed addresses are specified in config - send initial sync to those nodes
LOGGER.debug("[{}] Making initial Sync to all seed members: {}", localMember, seedMembers);
LOGGER.info("[{}] Making initial Sync to all seed members: {}", localMember, seedMembers);

//noinspection unchecked
Mono<Message>[] syncs =
Expand Down Expand Up @@ -289,7 +289,7 @@ private void start0(MonoSink<Object> sink) {
.subscribe(
null,
ex ->
LOGGER.debug(
LOGGER.warn(
"[{}] Exception on initial SyncAck, cause: {}", localMember, ex.toString()));
}

Expand Down Expand Up @@ -352,7 +352,7 @@ private void doSync() {
.subscribe(
null,
ex ->
LOGGER.debug(
LOGGER.warn(
"[{}][doSync] Failed to send Sync to {}, cause: {}",
localMember,
address,
Expand Down Expand Up @@ -403,7 +403,7 @@ private Mono<Void> onSync(Message syncMsg) {
.subscribe(
null,
ex ->
LOGGER.debug(
LOGGER.warn(
"[{}] Failed to send SyncAck to {}, cause: {}",
localMember,
sender,
Expand Down Expand Up @@ -433,7 +433,7 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
.subscribe(
null,
ex ->
LOGGER.debug(
LOGGER.warn(
"[{}][onFailureDetectorEvent] Failed to send Sync to {}, cause: {}",
localMember,
address,
Expand Down Expand Up @@ -581,7 +581,7 @@ private Mono<Void> updateMembership(MembershipRecord r1, MembershipUpdateReason
.fetchMetadata(r1.member())
.doOnError(
ex ->
LOGGER.debug(
LOGGER.warn(
"[{}][updateMembership][{}] Skipping to add/update member: {}, "
+ "due to failed fetchMetadata call (cause: {})",
localMember,
Expand Down Expand Up @@ -676,7 +676,7 @@ private Mono<Void> onLeavingDetected(MembershipRecord r0, MembershipRecord r1) {
}

private void publishEvent(MembershipEvent event) {
LOGGER.debug("[{}][publishEvent] {}", localMember, event);
LOGGER.info("[{}][publishEvent] {}", localMember, event);
sink.next(event);
}

Expand All @@ -699,9 +699,9 @@ private Mono<Void> onDeadMemberDetected(MembershipRecord r1) {

// Log that member leaved gracefully or without notification
if (r0.isLeaving()) {
LOGGER.debug("[{}] Member leaved gracefully: {}", localMember, member);
LOGGER.info("[{}] Member leaved gracefully: {}", localMember, member);
} else {
LOGGER.warn("[{}] Member leaved without notification: {}", localMember, member);
LOGGER.info("[{}] Member leaved without notification: {}", localMember, member);
}

final long timestamp = System.currentTimeMillis();
Expand Down Expand Up @@ -756,7 +756,7 @@ private void scheduleSuspicionTimeoutTask(MembershipRecord r) {
r.member().id(),
id -> {
LOGGER.debug(
"[{}] Scheduled SuspicionTimeoutTask for {}, suspicionTimeout {}",
"[{}] Scheduled SuspicionTimeoutTask for {}, suspicionTimeout: {}",
localMember,
id,
suspicionTimeout);
Expand Down Expand Up @@ -794,7 +794,7 @@ private Mono<Void> spreadMembershipGossip(MembershipRecord r) {
.spread(msg)
.doOnError(
ex ->
LOGGER.debug(
LOGGER.warn(
"[{}] Failed to send membership with gossip, cause: {}",
localMember,
ex.toString()))
Expand Down Expand Up @@ -864,10 +864,7 @@ private List<Member> getSuspectedMembers() {
}

private List<Member> getRemovedMembers() {
return removedMembersHistory //
.stream()
.map(MembershipEvent::member)
.collect(Collectors.toList());
return removedMembersHistory.stream().map(MembershipEvent::member).collect(Collectors.toList());
}

private List<Member> findRecordsByCondition(Predicate<MembershipRecord> condition) {
Expand Down
17 changes: 4 additions & 13 deletions cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,7 @@ public void testJoinDynamicPort() {
} finally {
// Shutdown all nodes
shutdown(
Stream.concat(
Stream.of(seedNode), //
otherNodes.stream())
.collect(Collectors.toList()));
Stream.concat(Stream.of(seedNode), otherNodes.stream()).collect(Collectors.toList()));
}
}

Expand Down Expand Up @@ -243,9 +240,7 @@ public void onMembershipEvent(MembershipEvent event) {
} finally {
// Shutdown all nodes
shutdown(
Stream.concat(
Stream.of(seedNode, metadataNode), //
otherNodes.stream())
Stream.concat(Stream.of(seedNode, metadataNode), otherNodes.stream())
.collect(Collectors.toList()));
}
}
Expand Down Expand Up @@ -321,9 +316,7 @@ public void onMembershipEvent(MembershipEvent event) {
} finally {
// Shutdown all nodes
shutdown(
Stream.concat(
Stream.of(seedNode, metadataNode), //
otherNodes.stream())
Stream.concat(Stream.of(seedNode, metadataNode), otherNodes.stream())
.collect(Collectors.toList()));
}
}
Expand Down Expand Up @@ -400,9 +393,7 @@ public void onMembershipEvent(MembershipEvent event) {
} finally {
// Shutdown all nodes
shutdown(
Stream.concat(
Stream.of(seedNode, metadataNode), //
otherNodes.stream())
Stream.concat(Stream.of(seedNode, metadataNode), otherNodes.stream())
.collect(Collectors.toList()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
Expand All @@ -36,8 +35,8 @@ public class FailureDetectorTest extends BaseTest {
private Scheduler scheduler;

@BeforeEach
void setUp(TestInfo testInfo) {
scheduler = Schedulers.newSingle(testInfo.getDisplayName().replaceAll(" ", "_"), true);
void setUp() {
scheduler = Schedulers.newSingle("scheduler", true);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -92,8 +91,8 @@ static Stream<Arguments> experiment() {
private Scheduler scheduler;

@BeforeEach
void setUp(TestInfo testInfo) {
scheduler = Schedulers.newSingle(testInfo.getDisplayName().replaceAll(" ", "_"), true);
void setUp() {
scheduler = Schedulers.newSingle("scheduler", true);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import reactor.core.Exceptions;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
Expand All @@ -51,8 +50,8 @@ public class MembershipProtocolTest extends BaseTest {
private Scheduler scheduler;

@BeforeEach
void setUp(TestInfo testInfo) {
scheduler = Schedulers.newSingle(testInfo.getDisplayName().replaceAll(" ", "_"), true);
void setUp() {
scheduler = Schedulers.newSingle("scheduler", true);
stopables = new ArrayList<>();
}

Expand Down
20 changes: 9 additions & 11 deletions cluster/src/test/resources/log4j2-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,21 @@
<Appenders>
<Console name="console" target="SYSTEM_OUT">
<PatternLayout>
<pattern>%level{length=1} %date{MMdd-HHmm:ss,SSS} %logger{1.} %message [%thread]%n</pattern>
<pattern>%level{length=1} %date{HHmm:ss,SSS} %logger{1.} %message [%thread]%n</pattern>
</PatternLayout>
</Console>
</Appenders>

<Loggers>
<Logger name="io.netty" level="info"/>
<Logger name="reactor.netty" level="info"/>
<Logger name="io.scalecube.cluster.transport" level="info"/>
<Logger name="io.scalecube.cluster.fdetector" level="info"/>
<Logger name="io.scalecube.cluster.gossip" level="info"/>
<Logger name="io.scalecube.cluster.membership" level="info"/>
<Logger name="io.scalecube.cluster.metadata" level="info"/>
<Logger name="io.scalecube.cluster" level="info"/>
<Logger name="io.scalecube.services" level="info"/>
<Logger name="io.netty" level="${env:nettyLogLevel:-error}"/>
<Logger name="reactor.netty" level="${env:reactorNettyLogLevel:-error}"/>
<Logger name="io.scalecube.cluster.transport" level="${env:clusterTransport:-error}"/>
<Logger name="io.scalecube.cluster.fdetector" level="${env:clusterFdetector:-error}"/>
<Logger name="io.scalecube.cluster.gossip" level="${env:clusterGossip:-error}"/>
<Logger name="io.scalecube.cluster.membership" level="${env:clusterMembership:-error}"/>
<Logger name="io.scalecube.cluster.metadata" level="${env:clusterMetadata:-error}"/>

<Root level="info">
<Root level="${env:logLevel:-error}">
<AppenderRef ref="console"/>
</Root>
</Loggers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ public Address sender() {
@Override
public String toString() {
return new StringJoiner(", ", Message.class.getSimpleName() + "[", "]")
.add("headers=" + headers)
.add("data=" + data)
.add("headers(" + headers.size() + ")")
.add("data=" + (data != null ? data.getClass().getName() : null))
.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public Mono<Transport> bind0() {
config.port(),
ex.toString()))
.map(server -> new TransportImpl(server, this))
.doOnSuccess(t -> LOGGER.debug("[{}] Bound cluster transport", address))
.doOnSuccess(t -> LOGGER.info("[{}] Bound cluster transport", t.address()))
.cast(Transport.class);
}

Expand All @@ -214,13 +214,13 @@ public final Mono<Void> stop() {
private Mono<Void> doStop() {
return Mono.defer(
() -> {
LOGGER.debug("[{}] Transport is shutting down", address);
LOGGER.info("[{}] Transport is shutting down", address);
// Complete incoming messages observable
messageSink.complete();
return Flux.concatDelayError(closeServer(), shutdownLoopResources())
.then()
.doFinally(s -> connections.clear())
.doOnSuccess(avoid -> LOGGER.debug("[{}] Transport has been shut down", address));
.doOnSuccess(avoid -> LOGGER.info("[{}] Transport has been shut down", address));
});
}

Expand All @@ -233,6 +233,7 @@ public final Flux<Message> listen() {
public Mono<Void> send(Address address, Message message) {
return connections
.computeIfAbsent(address, this::connect0)
.doOnSubscribe(s -> LOGGER.debug("[{}] Send {} to {}", this.address, message, address))
.map(Connection::outbound)
.flatMap(out -> out.send(Mono.just(message).map(this::toByteBuf), bb -> true).then())
.then();
Expand Down Expand Up @@ -267,11 +268,7 @@ public Mono<Message> requestResponse(Address address, final Message request) {

@SuppressWarnings("unused")
private Mono<Void> onMessage(NettyInbound in, NettyOutbound out) {
return in.receive() //
.retain()
.map(this::toMessage)
.doOnNext(messageSink::next)
.then();
return in.receive().retain().map(this::toMessage).doOnNext(messageSink::next).then();
}

private Message toMessage(ByteBuf byteBuf) {
Expand Down Expand Up @@ -300,15 +297,15 @@ private Mono<? extends Connection> connect0(Address address) {
return newTcpClient(address)
.doOnDisconnected(
c -> {
LOGGER.debug("[{}] Disconnected from: {}, {}", this.address, address, c.channel());
LOGGER.debug("[{}] Disconnected from {}, {}", this.address, address, c.channel());
connections.remove(address);
})
.doOnConnected(
c -> LOGGER.debug("[{}] Connected to {}, {}", this.address, address, c.channel()))
.connect()
.doOnError(
th -> {
LOGGER.debug(
LOGGER.warn(
"[{}][connect0][{}] Exception occurred: {}",
this.address,
address,
Expand Down
Loading

0 comments on commit cfb058d

Please sign in to comment.