Skip to content

Commit

Permalink
Updated code for Pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
daveaglick committed Dec 11, 2018
1 parent 514d7b5 commit 1e1d7b5
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 166 deletions.
20 changes: 0 additions & 20 deletions .appveyor.yml

This file was deleted.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ PipelinesTestLogger can report test results automatically to the CI build.

## Credit

This project is based on [appveyor.testlogger](https://github.com/spekt/appveyor.testlogger).
This project is based on [appveyor.testlogger](https://github.com/spekt/appveyor.testlogger) and [xunit](https://github.com/xunit/xunit/blob/master/src/xunit.runner.reporters/VstsReporter.cs).
16 changes: 11 additions & 5 deletions src/PipelinesTestLogger/AsyncProducerConsumerCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void Cancel()
waiting.Clear();
}

foreach (var tcs in allWaiting)
foreach (TaskCompletionSource<T[]> tcs in allWaiting)
{
tcs.TrySetResult(new T[] { });
}
Expand All @@ -34,8 +34,14 @@ public void Add(T item)
TaskCompletionSource<T[]> tcs = null;
lock (collection)
{
if (waiting.Count > 0) tcs = waiting.Dequeue();
else collection.Enqueue(item);
if (waiting.Count > 0)
{
tcs = waiting.Dequeue();
}
else
{
collection.Enqueue(item);
}
}

tcs?.TrySetResult(new [] {item});
Expand All @@ -51,13 +57,13 @@ public Task<T[]> TakeAsync()
{
if (collection.Count > 0)
{
var result = Task.FromResult(collection.ToArray());
Task<T[]> result = Task.FromResult(collection.ToArray());
collection.Clear();
return result;
}
else if (canceled == false)
{
var tcs = new TaskCompletionSource<T[]>();
TaskCompletionSource<T[]> tcs = new TaskCompletionSource<T[]>();
waiting.Enqueue(tcs);
return tcs.Task;
}
Expand Down
77 changes: 32 additions & 45 deletions src/PipelinesTestLogger/LoggerQueue.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -9,45 +10,43 @@ namespace PipelinesTestLogger
{
internal class LoggerQueue
{
private static readonly HttpClient client = new HttpClient();
private static readonly HttpClient _client = new HttpClient();

/// <summary>
/// it is localhost with a random port, e.g. http://localhost:9023/
/// </summary>
private readonly string appveyorApiUrl;
private readonly string _apiUrl;

private readonly AsyncProducerConsumerCollection<string> queue = new AsyncProducerConsumerCollection<string>();
private readonly Task consumeTask;
private readonly CancellationTokenSource consumeTaskCancellationSource = new CancellationTokenSource();
private readonly AsyncProducerConsumerCollection<string> _queue = new AsyncProducerConsumerCollection<string>();
private readonly Task _consumeTask;
private readonly CancellationTokenSource _consumeTaskCancellationSource = new CancellationTokenSource();

private int totalEnqueued = 0;
private int totalSent = 0;

public LoggerQueue(string appveyorApiUrl)
public LoggerQueue(string accessToken, string apiUrl)
{
this.appveyorApiUrl = appveyorApiUrl;
this.consumeTask = ConsumeItemsAsync(consumeTaskCancellationSource.Token);
_client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", accessToken);
_apiUrl = apiUrl;
_consumeTask = ConsumeItemsAsync(_consumeTaskCancellationSource.Token);
}

public void Enqueue(string json)
{
queue.Add(json);
_queue.Add(json);
totalEnqueued++;
}

public void Flush()
{
// Cancel any idle consumers and let them return
queue.Cancel();
_queue.Cancel();

try
{
// any active consumer will circle back around and batch post the remaining queue.
consumeTask.Wait(TimeSpan.FromSeconds(60));
// Any active consumer will circle back around and batch post the remaining queue.
_consumeTask.Wait(TimeSpan.FromSeconds(60));

// Cancel any active HTTP requests if still hasn't finished flushing
consumeTaskCancellationSource.Cancel();
if (!consumeTask.Wait(TimeSpan.FromSeconds(10)))
_consumeTaskCancellationSource.Cancel();
if (!_consumeTask.Wait(TimeSpan.FromSeconds(10)))
{
throw new TimeoutException("cancellation didn't happen quickly");
}
Expand All @@ -56,48 +55,36 @@ public void Flush()
{
Console.WriteLine(ex);
}

#if DEBUG
Console.WriteLine("PipelinesTestLogger: {0} test results reported ({1} enqueued).", totalSent, totalEnqueued);
#endif
}

private async Task ConsumeItemsAsync(CancellationToken cancellationToken)
{
while (true)
{
string[] nextItems = await this.queue.TakeAsync();
if (nextItems == null || nextItems.Length == 0) return; // Queue is cancelling and and empty.

if (nextItems.Length == 1) await PostItemAsync(nextItems[0], cancellationToken);
else if (nextItems.Length > 1) await PostBatchAsync(nextItems, cancellationToken);
string[] nextItems = await _queue.TakeAsync();

if (cancellationToken.IsCancellationRequested) return;
}
}

private async Task PostItemAsync(string json, CancellationToken cancellationToken)
{
HttpContent content = new StringContent(json, Encoding.UTF8, "application/json");
try
{
var response = await client.PostAsync(appveyorApiUrl + "api/tests", content, cancellationToken);
response.EnsureSuccessStatusCode();
totalSent += 1;
}
catch (Exception e)
{
Console.WriteLine(e);
if (nextItems == null || nextItems.Length == 0)
{
// Queue is canceling and is empty
return;
}

await PostResultsAsync(nextItems, cancellationToken);

if (cancellationToken.IsCancellationRequested)
{
return;
}
}
}

private async Task PostBatchAsync(ICollection<string> jsonEntities, CancellationToken cancellationToken)
private async Task PostResultsAsync(ICollection<string> jsonEntities, CancellationToken cancellationToken)
{
var jsonArray = "[" + string.Join(",", jsonEntities) + "]";
string jsonArray = "[" + string.Join(",", jsonEntities) + "]";
HttpContent content = new StringContent(jsonArray, Encoding.UTF8, "application/json");
try
{
var response = await client.PostAsync(appveyorApiUrl + "api/tests/batch", content, cancellationToken);
HttpResponseMessage response = await _client.PostAsync(_apiUrl, content, cancellationToken);
response.EnsureSuccessStatusCode();
totalSent += jsonEntities.Count;
}
Expand Down
Loading

0 comments on commit 1e1d7b5

Please sign in to comment.