diff --git a/jaeger-core/src/main/java/io/jaegertracing/internal/reporters/RemoteReporter.java b/jaeger-core/src/main/java/io/jaegertracing/internal/reporters/RemoteReporter.java index de30706d7..8cb1ebd0c 100644 --- a/jaeger-core/src/main/java/io/jaegertracing/internal/reporters/RemoteReporter.java +++ b/jaeger-core/src/main/java/io/jaegertracing/internal/reporters/RemoteReporter.java @@ -21,6 +21,8 @@ import io.jaegertracing.internal.senders.SenderResolver; import io.jaegertracing.spi.Reporter; import io.jaegertracing.spi.Sender; +import java.util.HashSet; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ArrayBlockingQueue; @@ -166,17 +168,29 @@ public void execute() throws SenderException { @ToString class QueueProcessor implements Runnable { private boolean open = true; + private final Set> commandFailedBefore = new HashSet>(); @Override public void run() { while (open) { try { RemoteReporter.Command command = commandQueue.take(); + Class commandClass = command.getClass(); + boolean failedBefore = commandFailedBefore.contains(commandClass); try { command.execute(); + if (failedBefore) { + log.info(commandClass.getSimpleName() + " is working again!"); + commandFailedBefore.remove(commandClass); + } } catch (SenderException e) { metrics.reporterFailure.inc(e.getDroppedSpanCount()); + if (!failedBefore) { + log.warn(commandClass.getSimpleName() + + " execution failed! Repeated errors of this command will not be logged.", e); + commandFailedBefore.add(commandClass); + } } } catch (Exception e) { log.error("QueueProcessor error:", e); diff --git a/jaeger-core/src/test/java/io/jaegertracing/internal/reporters/RemoteReporterTest.java b/jaeger-core/src/test/java/io/jaegertracing/internal/reporters/RemoteReporterTest.java index e1f863258..5b784ff2e 100644 --- a/jaeger-core/src/test/java/io/jaegertracing/internal/reporters/RemoteReporterTest.java +++ b/jaeger-core/src/test/java/io/jaegertracing/internal/reporters/RemoteReporterTest.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -39,15 +41,21 @@ import io.jaegertracing.spi.Reporter; import io.jaegertracing.spi.Sender; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.ArgumentMatchers; import org.slf4j.LoggerFactory; public class RemoteReporterTest { @@ -215,15 +223,8 @@ public void testCloseWhenQueueFull() { @Test public void testCloseLogSenderException() throws SenderException { - // set up mocking - ch.qos.logback.classic.Logger root = - (ch.qos.logback.classic.Logger) LoggerFactory - .getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME); - - @SuppressWarnings("unchecked") final Appender mockAppender = - mock(Appender.class); - when(mockAppender.getName()).thenReturn("MOCK"); - root.addAppender(mockAppender); + Appender mockAppender = mockLogger(e -> { + }); final Sender mockedSender = mock(Sender.class); when(mockedSender.close()).thenThrow(SenderException.class); @@ -322,6 +323,74 @@ public int flush() throws SenderException { assertEquals("mySpan", (sender.getReceived().get(0)).getOperationName()); } + @Test + public void testFlushErrorsLoggedJustOnce() throws InterruptedException { + + Object logMonitor = new Object(); + AtomicReference logMsg = new AtomicReference<>(null); + mockLogger(e -> { + synchronized (logMonitor) { + logMsg.set(e.getFormattedMessage()); + logMonitor.notifyAll(); + } + }); + + class FailingSender extends InMemorySender { + private final AtomicInteger flushCounter = new AtomicInteger(0); + + @Override + public int flush() throws SenderException { + int i = super.flush(); + switch (flushCounter.getAndIncrement()) { + case 1: + case 2: + case 3: + throw new SenderException("test1", super.flush()); + default: + return i; + } + } + + private String awaitMessage(AtomicReference ref) throws InterruptedException { + synchronized (logMonitor) { + while (ref.get() == null) { + logMonitor.wait(); + } + return ref.getAndSet(null); + } + } + } + + FailingSender sender = new FailingSender(); + + RemoteReporter remoteReporter = new Builder() + .withSender(sender) + .withFlushInterval(Integer.MAX_VALUE) + .withMaxQueueSize(maxQueueSize) + .withMetrics(metrics) + .build(); + tracer = new JaegerTracer.Builder("test-remote-reporter") + .withReporter(remoteReporter) + .withSampler(new ConstSampler(true)) + .withMetrics(metrics) + .build(); + + tracer.buildSpan("mySpan").start().finish(); + remoteReporter.flush(); + + tracer.buildSpan("mySpan").start().finish(); + remoteReporter.flush(); + + assertEquals("FlushCommand execution failed! Repeated errors of this command will not be logged.", + sender.awaitMessage(logMsg)); + + remoteReporter.flush(); + remoteReporter.flush(); + remoteReporter.flush(); + assertEquals("FlushCommand is working again!", sender.awaitMessage(logMsg)); + + } + @Test public void testUpdateSuccessMetricWhenAppendFlushed() throws InterruptedException { int totalSpans = 3; @@ -396,4 +465,21 @@ public int append(JaegerSpan span) throws SenderException { private JaegerSpan newSpan() { return tracer.buildSpan("x").start(); } + + private static Appender mockLogger(Consumer append) { + ch.qos.logback.classic.Logger root = + (ch.qos.logback.classic.Logger) LoggerFactory + .getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME); + + @SuppressWarnings("unchecked") + final Appender mockAppender = mock(Appender.class); + when(mockAppender.getName()).thenReturn("MOCK"); + doAnswer(i -> { + append.accept(i.getArgument(0)); + return null; + }).when(mockAppender).doAppend(ArgumentMatchers.any(ILoggingEvent.class)); + root.addAppender(mockAppender); + + return mockAppender; + } }