-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.ts
166 lines (140 loc) · 4.69 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/**
* Provides an Azure EventHubs collector that can be used with `@paychex/core` Tracker.
*
* @module main
*/
import { pull, noop, identity, defaults } from 'lodash';
import { EventDataBatch, EventHubProducerClient } from '@azure/event-hubs';
import type { TrackingInfo, TrackingSubscriber } from '@paychex/core/types/trackers';
export type { TrackingInfo, TrackingSubscriber }
/** @ignore */
export interface EventHubProvider {
(...args: any[]): EventHubProducerClient
}
export interface EventHubConfiguration {
/** The name of the Event Hub to connect to. */
name: string,
/** The full connection string of the Event Hub to connect to. */
connection: string,
/**
* The formatter to use to convert a TrackingInfo item into
* a payload suitable for the EventHub. If not provided, the
* entry will be persisted as a normal JSON object.
*/
formatter?: (info: TrackingInfo) => any,
/** @ignore */
provider?: EventHubProvider
}
function CONSOLE_LOGGER(info: TrackingInfo): void {
const label = info.label;
const type = info.type.toUpperCase();
const duration = info.duration > 0 ?
` (${info.duration} ms)` : '';
console.log(`[${type}] ${label}${duration}`);
}
function AZURE_EVENTHUB_PROVIDER(connection: string, name: string): EventHubProducerClient {
return new EventHubProducerClient(connection, name);
}
/** Collects {@link TrackingInfo} instances to an EventHub. */
export interface EventHubSubscriber extends TrackingSubscriber {
/** Flushes any pending batches to the underlying EventHub collector. */
flush(): void
}
/**
* Constructs a collector method that can persist TrackingInfo items to an
* Azure EventHub in batches. Uses known size and constraint limitations to
* ensure events reach the hub, and retries if any failures occur.
*
* @param options The options required to create the EventHub.
* @returns A collector that can be passed to @paychex/core's `createTracker` method.
* @example
* ```js
* const hub = eventHub({
* name: process.env.HUB_NAME,
* connection: process.env.HUB_CONNECTION
* });
*
* const tracker = trackers.create(hub);
*
* // this data will be sent to the EventHub
* tracker.event('label', { optional: 'data' });
*
* // we send events to the eventHub every 1 second;
* // you can force an immediate send by calling flush:
* hub.flush();
* ```
*/
export function eventHub(options: Partial<EventHubConfiguration> = Object.create(null)): EventHubSubscriber {
const {
name,
connection,
formatter,
provider,
} = defaults(options, {
formatter: identity,
provider: AZURE_EVENTHUB_PROVIDER,
});
let hub: EventHubProducerClient = null;
const queue: TrackingInfo[] = [];
function collect(info: TrackingInfo): void {
hub ?
queue.push(info) :
CONSOLE_LOGGER(info);
}
async function connectHubs() {
if (!name || !connection)
console.warn('An EventHub name and connection string are required. Logging to console instead.');
if (hub)
await hub.close().catch(noop);
hub = provider(connection, name);
}
async function sendBatch() {
let item: TrackingInfo,
result = true;
const sent: TrackingInfo[] = [];
const items: TrackingInfo[] = queue.concat();
const batch: EventDataBatch = await hub.createBatch();
while (result && (item = items.shift())) {
result = batch.tryAdd({ body: formatter(item) });
result ? sent.push(item) : items.unshift(item);
}
if (batch.count) {
// send batch and remove
// sent items from the queue
await hub.sendBatch(batch);
pull(queue, ...sent);
} else if (item) {
// batch is empty but there
// was an item to send, so it
// must be too large for our
// EventHub; we'll need to drop
// it; otherwise, our other
// events would never be sent
pull(queue, item);
pull(items, item);
}
if (items.length) {
// recurse until there are no
// more events to send
await sendBatch();
}
}
async function flushQueue() {
try {
await sendBatch();
} catch (e) {
console.error(e.message, e.stack);
await connectHubs();
} finally {
scheduleFlush();
}
}
function scheduleFlush() {
setTimeout(flushQueue, 1000);
}
connectHubs()
.then(scheduleFlush)
.catch(console.error);
collect.flush = flushQueue;
return collect;
}