Skip to content

Commit

Permalink
fix tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianburckhardt committed Jun 28, 2023
1 parent 2de5693 commit 0e34292
Showing 1 changed file with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class BlobBatchReceiver<TEvent> where TEvent : Event
readonly EventHubsTraceHelper lowestTraceLevel;
readonly BlobContainerClient containerClient;
readonly bool keepUntilConfirmed;
readonly bool isClientReceiver;

// Event Hubs discards messages after 24h, so we can throw away batches that are older than that
readonly static TimeSpan expirationTimeSpan = TimeSpan.FromHours(24) + TimeSpan.FromMinutes(1);
Expand All @@ -43,6 +44,7 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,
this.containerClient = serviceClient.GetBlobContainerClient(containerName);
this.keepUntilConfirmed = keepUntilConfirmed;
this.blobDeletions = this.keepUntilConfirmed ? new BlobDeletions(this) : null;
this.isClientReceiver = typeof(TEvent) == typeof(ClientEvent);
}

public async IAsyncEnumerable<(EventData eventData, TEvent[] events, long)> ReceiveEventsAsync(
Expand All @@ -51,6 +53,8 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,
[EnumeratorCancellation] CancellationToken token,
long? nextPacketToReceive = null)
{
int ignoredPacketCount = 0;

foreach (var eventData in hubMessages)
{
var seqno = eventData.SystemProperties.SequenceNumber;
Expand Down Expand Up @@ -87,7 +91,8 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,
{
if (evt == null)
{
this.traceHelper.LogWarning("{context} ignored packet #{seqno} for different taskhub or client", this.traceContext, seqno);
this.lowestTraceLevel?.LogTrace("{context} ignored packet #{seqno} ({size} bytes) because its guid does not match taskhub/client", this.traceContext, seqno, eventData.Body.Count);
ignoredPacketCount++;
}
else
{
Expand Down Expand Up @@ -182,6 +187,20 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,
nextPacketToReceive = seqno + 1;
}
}

if (ignoredPacketCount > 0)
{
if (this.isClientReceiver)
{
// Ignored packets are very common for clients because multiple clients may share the same partition. We log this only for debug purposes.
this.traceHelper.LogDebug("{context} ignored {count} packets for different client", this.traceContext, ignoredPacketCount);
}
else
{
// Ignored packets may indicate misconfiguration (multiple taskhubs using same EH namespace). We create a visible warning.
this.traceHelper.LogWarning("{context} ignored {count} packets for different taskhub", this.traceContext, ignoredPacketCount);
}
}
}

public async Task<int> DeleteBlobAsync(IEnumerable<BlockBlobClient> blobClients)
Expand Down

0 comments on commit 0e34292

Please sign in to comment.