forked from microsoft/azure-health-data-services-toolkit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMyService.cs
72 lines (64 loc) · 2.57 KB
/
MyService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
using Microsoft.AzureHealth.DataServices.Channels;
using Microsoft.AzureHealth.DataServices.Pipelines;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace EventHubChannelSample
{
public class MyService : IMyService, IDisposable
{
private readonly ILogger _logger;
private readonly IPipeline<HttpRequestMessage, HttpResponseMessage> _pipeline;
private readonly IChannel receiveChannel;
private bool _disposed;
public MyService(MyServiceConfig config, IPipeline<HttpRequestMessage, HttpResponseMessage> pipeline, ILogger<MyService> logger = null)
{
_pipeline = pipeline;
_logger = logger;
IOptions<EventHubChannelOptions> options = Options.Create<EventHubChannelOptions>(new EventHubChannelOptions()
{
ConnectionString = config.ConnectionString,
ExecutionStatusType = config.ExecutionStatusType,
FallbackStorageConnectionString = config.FallbackStorageConnectionString,
FallbackStorageContainer = config.FallbackStorageContainer,
HubName = config.HubName,
ProcessorStorageContainer = config.ProcessorStorageContainer,
});
receiveChannel = new EventHubChannel(options);
}
public event EventHandler<ChannelReceivedEventArgs> OnReceive;
public async Task SendMessageAsync(HttpRequestMessage message)
{
_logger?.LogInformation("Send message to event hub.");
_ = _pipeline.ExecuteAsync(message);
await OpenReceiveChannelAsync();
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing && !_disposed)
{
_disposed = true;
receiveChannel.Dispose();
}
}
private async Task OpenReceiveChannelAsync()
{
_logger?.LogInformation("Open event hub receiver.");
if (receiveChannel.State == ChannelState.None)
{
receiveChannel.OnReceive += ReceiveChannel_OnReceive;
await receiveChannel.OpenAsync();
receiveChannel.ReceiveAsync().GetAwaiter();
}
}
private void ReceiveChannel_OnReceive(object sender, ChannelReceivedEventArgs e)
{
_logger?.LogInformation("Event received from event hub.");
OnReceive?.Invoke(sender, e);
}
}
}