Skip to content

Commit 6297004

Browse files
authored
Refactor Elasticsearch.Extensions.Logging (#149)
This commit refactors Elasticsearch.Extensions.Logging in preparation for an initial release. - Remove caching loggers in a concurrent dictionary in ElasticsearchLoggerProvider Can evaluate later if the performance improvements in doing this would justify the added complexity. Most notably, there is no mechanism to remove loggers from the dictionary, which for an unknown logger naming convention, could allow it to grow unbounded. - Move ElasticsearchLogger required properties into the ctor - Fix bulk item response serialization - Add package details to Elastic.Ingest
1 parent c0ecf81 commit 6297004

File tree

10 files changed

+99
-107
lines changed

10 files changed

+99
-107
lines changed

src/Elastic.Ingest/ChannelBuffer.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public interface IChannelBuffer
2323
internal class ChannelBuffer<TEvent> : IChannelBuffer, IDisposable
2424
{
2525
private readonly int _maxBufferSize;
26+
private CancellationTokenSource _breaker = new CancellationTokenSource();
2627

2728
public TimeSpan ForceFlushAfter { get; }
2829
public List<TEvent> Buffer { get; }
@@ -65,9 +66,6 @@ private TimeSpan Wait
6566
}
6667
}
6768

68-
private CancellationTokenSource _breaker = new CancellationTokenSource();
69-
70-
7169
/// <summary>
7270
/// Call <see cref="ChannelReader{T}.WaitToReadAsync"/> with a timeout to force a flush to happen every
7371
/// <see cref="ForceFlushAfter"/>. This tries to avoid allocation too many <see cref="CancellationTokenSource"/>'s

src/Elastic.Ingest/Elastic.Ingest.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
<PropertyGroup>
44
<TargetFramework>netstandard2.0</TargetFramework>
5-
<Title>Elasticsearch Buffer backed data shipper</Title>
6-
<Description>TODO</Description>
7-
<PackageTags>TODO</PackageTags>
5+
<Title>A buffer-backed channel for indexing documents into Elasticsearch</Title>
6+
<Description>Provides components to build a buffer-backed channel for indexing documents into Elasticsearch</Description>
7+
<PackageTags>elastic, elasticsearch, ingest, search</PackageTags>
88
<LangVersion>latest</LangVersion>
99
<Nullable>enable</Nullable>
1010
<IsPackable>false</IsPackable>

src/Elastic.Ingest/ElasticsearchChannel.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,7 @@ private void UpdateClient()
221221
config = new TransportConfiguration(connectionPool);
222222
}
223223

224-
config = config.Proxy(new Uri("http://localhost:8080"), "", "");
225-
config = config.EnableDebugMode();
226-
227224
var transport = new Transport<TransportConfiguration>(config);
228-
229225
_ = Interlocked.Exchange(ref _transport, transport);
230226
}
231227

src/Elastic.Ingest/Serialization/Model.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,16 @@ public override BulkResponseItem Read(ref Utf8JsonReader reader, Type typeToConv
9393
break;
9494
}
9595
}
96-
BulkResponseItem r = (status == 200)
96+
BulkResponseItem r = status == 200
9797
? OkayBulkResponseItem
9898
: new BulkResponseItem { Action = action!, Status = status, Error = error };
99-
return r;
10099

100+
return r;
101101
}
102102

103103
public override void Write(Utf8JsonWriter writer, BulkResponseItem value, JsonSerializerOptions options)
104104
{
105-
if (value == null)
105+
if (value is null)
106106
{
107107
writer.WriteNullValue();
108108
return;
@@ -118,9 +118,7 @@ public override void Write(Utf8JsonWriter writer, BulkResponseItem value, JsonSe
118118
JsonSerializer.Serialize(writer, value.Error, options);
119119
}
120120

121-
writer.WritePropertyName("status");
122121
writer.WriteNumber("status", value.Status);
123-
124122
writer.WriteEndObject();
125123
writer.WriteEndObject();
126124
}

src/Elastic.Ingest/ShipTo.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ public ShipTo(IEnumerable<Uri> nodeUris, ConnectionPoolType connectionPoolType)
2323
public ShipTo(string cloudId, string apiKey)
2424
{
2525
if (string.IsNullOrEmpty(cloudId))
26-
throw new ArgumentException("cloudId may not be null.", nameof(cloudId));
26+
throw new ArgumentException("cloudId may not be null or empty.", nameof(cloudId));
2727

2828
if (string.IsNullOrEmpty(apiKey))
29-
throw new ArgumentException("apiKey may not be null.", nameof(apiKey));
29+
throw new ArgumentException("apiKey may not be null or empty.", nameof(apiKey));
3030

3131
CloudId = cloudId;
3232
ApiKey = apiKey;
@@ -36,13 +36,13 @@ public ShipTo(string cloudId, string apiKey)
3636
public ShipTo(string cloudId, string username, string password)
3737
{
3838
if (string.IsNullOrEmpty(cloudId))
39-
throw new ArgumentException("cloudId may not be null.", nameof(cloudId));
39+
throw new ArgumentException("cloudId may not be null or empty.", nameof(cloudId));
4040

4141
if (string.IsNullOrEmpty(username))
42-
throw new ArgumentException("username may not be null.", nameof(username));
42+
throw new ArgumentException("username may not be null or empty.", nameof(username));
4343

4444
if (string.IsNullOrEmpty(password))
45-
throw new ArgumentException("password may not be null.", nameof(password));
45+
throw new ArgumentException("password may not be null or empty.", nameof(password));
4646

4747
CloudId = cloudId;
4848
Username = username;

src/Elasticsearch.Extensions.Logging/ElasticsearchLogger.cs

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,25 @@ public class ElasticsearchLogger : ILogger
1919
{
2020
private readonly string _categoryName;
2121
private readonly ElasticsearchChannel<LogEvent> _channel;
22-
23-
internal ElasticsearchLogger(string categoryName, ElasticsearchChannel<LogEvent> channel)
22+
private readonly ElasticsearchLoggerOptions _options;
23+
private readonly IExternalScopeProvider? _scopeProvider;
24+
25+
internal ElasticsearchLogger(
26+
string categoryName,
27+
ElasticsearchChannel<LogEvent> channel,
28+
ElasticsearchLoggerOptions options,
29+
IExternalScopeProvider? scopeProvider
30+
)
2431
{
2532
_categoryName = categoryName;
2633
_channel = channel;
34+
_options = options;
35+
_scopeProvider = scopeProvider;
2736
}
2837

29-
internal ElasticsearchLoggerOptions Options { get; set; } = default!;
30-
31-
internal IExternalScopeProvider ScopeProvider { get; set; } = default!;
32-
33-
public IDisposable BeginScope<TState>(TState state) => ScopeProvider.Push(state);
34-
35-
public bool IsEnabled(LogLevel logLevel) => Options.IsEnabled;
38+
public IDisposable? BeginScope<TState>(TState state) => _scopeProvider?.Push(state);
3639

40+
public bool IsEnabled(LogLevel logLevel) => _options.IsEnabled;
3741

3842
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception,
3943
Func<TState, Exception, string> formatter
@@ -42,16 +46,13 @@ Func<TState, Exception, string> formatter
4246
try
4347
{
4448
if (!IsEnabled(logLevel)) return;
45-
46-
if (formatter == null) throw new ArgumentNullException(nameof(formatter));
49+
if (formatter is null) throw new ArgumentNullException(nameof(formatter));
4750

4851
// TODO: Want to render state values (separate from message) to pass to log event, for semantic logging
4952
// Maybe render to JSON in-process, then queue bytes for sending to index ??
5053

51-
var elasticsearchData =
52-
BuildLogEvent(_categoryName, logLevel, eventId, state, exception, formatter);
53-
54-
_channel.TryWrite(elasticsearchData);
54+
var logEvent = BuildLogEvent(_categoryName, logLevel, eventId, state, exception, formatter);
55+
_channel.TryWrite(logEvent);
5556
}
5657
catch (Exception ex)
5758
{
@@ -70,14 +71,12 @@ private static void AddException(Exception exception, LogEvent logEvent)
7071

7172
private void AddScopeValues(LogEvent logEvent)
7273
{
73-
var scopeProvider = ScopeProvider;
74-
if (Options.IncludeScopes && scopeProvider != null)
74+
if (_options.IncludeScopes)
7575
{
76-
scopeProvider.ForEachScope((scope, innerData) =>
76+
_scopeProvider?.ForEachScope((scope, le) =>
7777
{
78-
if (logEvent.Labels == null) logEvent.Labels = new Dictionary<string, string>();
79-
80-
if (logEvent.Scopes == null) logEvent.Scopes = new List<string>();
78+
le.Labels ??= new Dictionary<string, string>();
79+
le.Scopes ??= new List<string>();
8180

8281
var isFormattedLogValues = false;
8382
if (scope is IEnumerable<KeyValuePair<string, object>> scopeValues)
@@ -90,14 +89,14 @@ private void AddScopeValues(LogEvent logEvent)
9089
continue;
9190
}
9291

93-
if (CheckTracingValues(logEvent, kvp)) continue;
92+
if (CheckTracingValues(le, kvp)) continue;
9493

95-
logEvent.Labels[kvp.Key] = FormatValue(kvp.Value);
94+
le.Labels[kvp.Key] = FormatValue(kvp.Value);
9695
}
9796
}
9897

9998
var formattedScope = isFormattedLogValues ? scope.ToString() : FormatValue(scope);
100-
logEvent.Scopes.Add(formattedScope);
99+
le.Scopes.Add(formattedScope);
101100
}, logEvent);
102101
}
103102
}
@@ -117,7 +116,6 @@ private void AddStateValues<TState>(TState state, LogEvent logEvent)
117116
if (CheckTracingValues(logEvent, kvp)) continue;
118117

119118
logEvent.Labels ??= new Dictionary<string, string>();
120-
121119
logEvent.Labels[kvp.Key] = FormatValue(kvp.Value);
122120
}
123121
}
@@ -171,13 +169,13 @@ Func<TState, Exception, string> formatter
171169
logEvent.Agent = LogEventToEcsHelper.GetAgent();
172170
logEvent.Service = LogEventToEcsHelper.GetService();
173171

174-
if (Options.Tags != null && Options.Tags.Length > 0) logEvent.Tags = Options.Tags;
172+
if (_options.Tags != null && _options.Tags.Length > 0) logEvent.Tags = _options.Tags;
175173

176-
if (Options.IncludeHost) logEvent.Host = LogEventToEcsHelper.GetHost();
174+
if (_options.IncludeHost) logEvent.Host = LogEventToEcsHelper.GetHost();
177175

178-
if (Options.IncludeProcess) logEvent.Process = LogEventToEcsHelper.GetProcess();
176+
if (_options.IncludeProcess) logEvent.Process = LogEventToEcsHelper.GetProcess();
179177

180-
if (Options.IncludeUser)
178+
if (_options.IncludeUser)
181179
{
182180
logEvent.User = new User
183181
{
@@ -187,7 +185,7 @@ Func<TState, Exception, string> formatter
187185

188186
AddTracing(logEvent);
189187

190-
if (Options.IncludeScopes) AddScopeValues(logEvent);
188+
if (_options.IncludeScopes) AddScopeValues(logEvent);
191189

192190
// These will overwrite any scope values with the same name
193191
AddStateValues(state, logEvent);
@@ -245,7 +243,7 @@ private string FormatEnumerable(IEnumerable enumerable, int depth)
245243
var index = 0;
246244
foreach (var item in enumerable)
247245
{
248-
if (index > 0) stringBuilder.Append(Options.ListSeparator);
246+
if (index > 0) stringBuilder.Append(_options.ListSeparator);
249247

250248
var value = FormatValue(item, depth);
251249
stringBuilder.Append(value);
@@ -315,7 +313,7 @@ private string FormatValue(object value, int depth = 0)
315313
}
316314
}
317315

318-
private void WriteName(StringBuilder stringBuilder, string name)
316+
private static void WriteName(StringBuilder stringBuilder, string name)
319317
{
320318
foreach (var c in name)
321319
{
@@ -328,7 +326,7 @@ private void WriteName(StringBuilder stringBuilder, string name)
328326
}
329327
}
330328

331-
private void WriteValue(StringBuilder stringBuilder, string value)
329+
private static void WriteValue(StringBuilder stringBuilder, string value)
332330
{
333331
foreach (var c in value)
334332
{

src/Elasticsearch.Extensions.Logging/ElasticsearchLoggerOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class ElasticsearchLoggerOptions
2424
/// </summary>
2525
public bool IncludeUser { get; set; } = true;
2626

27-
//TODO index patters are more complex then this, ILM, write alias, buffer tier, datastreams
27+
//TODO index patterns are more complex then this, ILM, write alias, buffer tier, datastreams
2828
/// <summary>
2929
/// Gets or sets the format string for the Elastic search index. The current <c>DateTimeOffset</c> is passed as parameter
3030
/// 0.

src/Elasticsearch.Extensions.Logging/ElasticsearchLoggerProvider.cs

Lines changed: 23 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,69 +16,59 @@ namespace Elasticsearch.Extensions.Logging
1616
public class ElasticsearchLoggerProvider : ILoggerProvider, ISupportExternalScope
1717
{
1818
private readonly IChannelSetup[] _channelConfigurations;
19-
20-
private readonly ConcurrentDictionary<string, ElasticsearchLogger> _loggers;
21-
2219
private readonly IOptionsMonitor<ElasticsearchLoggerOptions> _options;
23-
2420
private readonly IDisposable _optionsReloadToken;
25-
26-
private IExternalScopeProvider _scopeProvider = default!;
21+
private IExternalScopeProvider? _scopeProvider;
2722
private readonly ElasticsearchChannel<LogEvent> _shipper;
2823

2924
public ElasticsearchLoggerProvider(IOptionsMonitor<ElasticsearchLoggerOptions> options,
3025
IEnumerable<IChannelSetup> channelConfigurations
3126
)
3227
{
33-
_options = options;
28+
_options = options ?? throw new ArgumentNullException(nameof(options));
29+
30+
if (channelConfigurations is null)
31+
throw new ArgumentNullException(nameof(channelConfigurations));
32+
3433
_channelConfigurations = channelConfigurations.ToArray();
3534

3635
var channelOptions = CreateChannelOptions(options.CurrentValue, _channelConfigurations);
3736
_shipper = new ElasticsearchChannel<LogEvent>(channelOptions);
3837

39-
_loggers = new ConcurrentDictionary<string, ElasticsearchLogger>();
4038
ReloadLoggerOptions(options.CurrentValue);
41-
_optionsReloadToken = _options.OnChange(ReloadLoggerOptions);
39+
_optionsReloadToken = _options.OnChange(o => ReloadLoggerOptions(o));
4240
}
4341

4442
public static Func<DateTimeOffset> LocalDateTimeProvider { get; set; } = () => DateTimeOffset.UtcNow;
4543

4644
public ILogger CreateLogger(string name) =>
47-
_loggers.GetOrAdd(name,
48-
loggerName =>
49-
new ElasticsearchLogger(name, _shipper) { Options = _options.CurrentValue, ScopeProvider = _scopeProvider });
45+
new ElasticsearchLogger(name, _shipper, _options.CurrentValue, _scopeProvider);
5046

5147
public void Dispose()
5248
{
5349
_optionsReloadToken.Dispose();
5450
_shipper.Dispose();
5551
}
5652

57-
public void SetScopeProvider(IExternalScopeProvider scopeProvider)
58-
{
59-
_scopeProvider = scopeProvider;
60-
foreach (var logger in _loggers) logger.Value.ScopeProvider = scopeProvider;
61-
}
53+
public void SetScopeProvider(IExternalScopeProvider scopeProvider) => _scopeProvider = scopeProvider;
6254

63-
private ElasticsearchChannelOptions<LogEvent> CreateChannelOptions(ElasticsearchLoggerOptions options,
64-
IChannelSetup[] channelConfigurations
65-
)
55+
private static ElasticsearchChannelOptions<LogEvent> CreateChannelOptions(ElasticsearchLoggerOptions options, IChannelSetup[] channelConfigurations)
6656
{
67-
var channelOptions = new ElasticsearchChannelOptions<LogEvent>();
68-
channelOptions.Index = options.Index;
69-
channelOptions.IndexOffset = options.IndexOffset;
70-
channelOptions.ConnectionPoolType = options.ShipTo.ConnectionPoolType;
71-
72-
channelOptions.WriteEvent = async (stream, ctx, l) => await l.SerializeAsync(stream, ctx).ConfigureAwait(false);
73-
channelOptions.TimestampLookup = l => l.Timestamp;
57+
var channelOptions = new ElasticsearchChannelOptions<LogEvent>
58+
{
59+
Index = options.Index,
60+
IndexOffset = options.IndexOffset,
61+
ConnectionPoolType = options.ShipTo.ConnectionPoolType,
62+
WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
63+
TimestampLookup = l => l.Timestamp
64+
};
7465

7566
if (options.ShipTo.ConnectionPoolType == ConnectionPoolType.Cloud
7667
|| options.ShipTo.ConnectionPoolType == ConnectionPoolType.Unknown && !string.IsNullOrEmpty(options.ShipTo.CloudId))
7768
{
78-
if (!string.IsNullOrWhiteSpace(options.ShipTo.Username))
79-
channelOptions.ShipTo = new ShipTo(options.ShipTo.CloudId, options.ShipTo.Username, options.ShipTo.Password);
80-
else
81-
channelOptions.ShipTo = new ShipTo(options.ShipTo.CloudId, options.ShipTo.ApiKey);
69+
channelOptions.ShipTo = !string.IsNullOrWhiteSpace(options.ShipTo.Username)
70+
? new ShipTo(options.ShipTo.CloudId, options.ShipTo.Username, options.ShipTo.Password)
71+
: new ShipTo(options.ShipTo.CloudId, options.ShipTo.ApiKey);
8272
}
8373
else
8474
channelOptions.ShipTo = new ShipTo(options.ShipTo.NodeUris, options.ShipTo.ConnectionPoolType);
@@ -89,12 +79,7 @@ IChannelSetup[] channelConfigurations
8979
return channelOptions;
9080
}
9181

92-
private void ReloadLoggerOptions(ElasticsearchLoggerOptions options)
93-
{
94-
var channelOptions = CreateChannelOptions(options, _channelConfigurations);
95-
_shipper.Options = channelOptions;
96-
97-
foreach (var logger in _loggers) logger.Value.Options = options;
98-
}
82+
private void ReloadLoggerOptions(ElasticsearchLoggerOptions options) =>
83+
_shipper.Options = CreateChannelOptions(options, _channelConfigurations);
9984
}
10085
}

0 commit comments

Comments
 (0)