Skip to content

Commit 1f569e3

Browse files
author
Krishna Rajendran
authored
Invoke logEvent callbacks for each event when events are actually sent (#253)
1 parent e2718b9 commit 1f569e3

File tree

3 files changed

+174
-161
lines changed

3 files changed

+174
-161
lines changed

src/amplitude-client.js

Lines changed: 80 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -168,21 +168,8 @@ AmplitudeClient.prototype.init = function init(apiKey, opt_userId, opt_config, o
168168

169169
// load unsent events and identifies before any attempt to log new ones
170170
if (this.options.saveEvents) {
171-
// validate event properties for unsent events
172-
for (let i = 0; i < this._unsentEvents.length; i++) {
173-
var eventProperties = this._unsentEvents[i].event_properties;
174-
var groups = this._unsentEvents[i].groups;
175-
this._unsentEvents[i].event_properties = utils.validateProperties(eventProperties);
176-
this._unsentEvents[i].groups = utils.validateGroups(groups);
177-
}
178-
179-
// validate user properties for unsent identifys
180-
for (let j = 0; j < this._unsentIdentifys.length; j++) {
181-
var userProperties = this._unsentIdentifys[j].user_properties;
182-
var identifyGroups = this._unsentIdentifys[j].groups;
183-
this._unsentIdentifys[j].user_properties = utils.validateProperties(userProperties);
184-
this._unsentIdentifys[j].groups = utils.validateGroups(identifyGroups);
185-
}
171+
_validateUnsentEventQueue(this._unsentEvents);
172+
_validateUnsentEventQueue(this._unsentIdentifys);
186173
}
187174

188175
this._lastEventTime = now;
@@ -213,8 +200,8 @@ AmplitudeClient.prototype.init = function init(apiKey, opt_userId, opt_config, o
213200
}
214201
}
215202
if (this.options.saveEvents) {
216-
this._unsentEvents = this._parseSavedUnsentEventsString(values[1]).concat(this._unsentEvents);
217-
this._unsentIdentifys = this._parseSavedUnsentEventsString(values[2]).concat(this._unsentIdentifys);
203+
this._unsentEvents = this._parseSavedUnsentEventsString(values[1]).map(event => ({event})).concat(this._unsentEvents);
204+
this._unsentIdentifys = this._parseSavedUnsentEventsString(values[2]).map(event => ({event})).concat(this._unsentIdentifys);
218205
}
219206
if (DeviceInfo) {
220207
Promise.all([
@@ -248,8 +235,8 @@ AmplitudeClient.prototype.init = function init(apiKey, opt_userId, opt_config, o
248235
});
249236
} else {
250237
if (this.options.saveEvents) {
251-
this._unsentEvents = this._loadSavedUnsentEvents(this.options.unsentKey).concat(this._unsentEvents);
252-
this._unsentIdentifys = this._loadSavedUnsentEvents(this.options.unsentIdentifyKey).concat(this._unsentIdentifys);
238+
this._unsentEvents = this._loadSavedUnsentEvents(this.options.unsentKey).map(event => ({event})).concat(this._unsentEvents);
239+
this._unsentIdentifys = this._loadSavedUnsentEvents(this.options.unsentIdentifyKey).map(event => ({event})).concat(this._unsentIdentifys);
253240
}
254241
initFromStorage();
255242
this.runQueuedFunctions();
@@ -263,6 +250,19 @@ AmplitudeClient.prototype.init = function init(apiKey, opt_userId, opt_config, o
263250
}
264251
};
265252

253+
// validate properties for unsent events
254+
const _validateUnsentEventQueue = (queue) => {
255+
for (let i = 0; i < queue.length; i++) {
256+
const userProperties = queue[i].event.user_properties;
257+
const eventProperties = queue[i].event.event_properties;
258+
const groups = queue[i].event.groups;
259+
260+
queue[i].event.user_properties = utils.validateProperties(userProperties);
261+
queue[i].event.event_properties = utils.validateProperties(eventProperties);
262+
queue[i].event.groups = utils.validateGroups(groups);
263+
}
264+
};
265+
266266
/**
267267
* @private
268268
*/
@@ -487,20 +487,20 @@ AmplitudeClient.prototype._unsentCount = function _unsentCount() {
487487
* Send events if ready. Returns true if events are sent.
488488
* @private
489489
*/
490-
AmplitudeClient.prototype._sendEventsIfReady = function _sendEventsIfReady(callback) {
490+
AmplitudeClient.prototype._sendEventsIfReady = function _sendEventsIfReady() {
491491
if (this._unsentCount() === 0) {
492492
return false;
493493
}
494494

495495
// if batching disabled, send any unsent events immediately
496496
if (!this.options.batchEvents) {
497-
this.sendEvents(callback);
497+
this.sendEvents();
498498
return true;
499499
}
500500

501501
// if batching enabled, check if min threshold met for batch size
502502
if (this._unsentCount() >= this.options.eventUploadThreshold) {
503-
this.sendEvents(callback);
503+
this.sendEvents();
504504
return true;
505505
}
506506

@@ -740,18 +740,22 @@ AmplitudeClient.prototype._saveReferrer = function _saveReferrer(referrer) {
740740
*/
741741
AmplitudeClient.prototype.saveEvents = function saveEvents() {
742742
try {
743+
const serializedUnsentEvents = JSON.stringify(this._unsentEvents.map(({event}) => event));
744+
743745
if (AsyncStorage) {
744-
AsyncStorage.setItem(this.options.unsentKey + this._storageSuffix, JSON.stringify(this._unsentEvents));
746+
AsyncStorage.setItem(this.options.unsentKey + this._storageSuffix, serializedUnsentEvents);
745747
} else {
746-
this._setInStorage(localStorage, this.options.unsentKey, JSON.stringify(this._unsentEvents));
748+
this._setInStorage(localStorage, this.options.unsentKey, serializedUnsentEvents);
747749
}
748750
} catch (e) {}
749751

750752
try {
753+
const serializedIdentifys = JSON.stringify(this._unsentIdentifys.map(unsentIdentify => unsentIdentify.event));
754+
751755
if (AsyncStorage) {
752-
AsyncStorage.setItem(this.options.unsentIdentifyKey + this._storageSuffix, JSON.stringify(this._unsentIdentifys));
756+
AsyncStorage.setItem(this.options.unsentIdentifyKey + this._storageSuffix, serializedIdentifys);
753757
} else {
754-
this._setInStorage(localStorage, this.options.unsentIdentifyKey, JSON.stringify(this._unsentIdentifys));
758+
this._setInStorage(localStorage, this.options.unsentIdentifyKey, serializedIdentifys);
755759
}
756760
} catch (e) {}
757761
};
@@ -1184,20 +1188,18 @@ AmplitudeClient.prototype._logEvent = function _logEvent(eventType, eventPropert
11841188
};
11851189

11861190
if (eventType === Constants.IDENTIFY_EVENT || eventType === Constants.GROUP_IDENTIFY_EVENT) {
1187-
this._unsentIdentifys.push(event);
1191+
this._unsentIdentifys.push({event, callback});
11881192
this._limitEventsQueued(this._unsentIdentifys);
11891193
} else {
1190-
this._unsentEvents.push(event);
1194+
this._unsentEvents.push({event, callback});
11911195
this._limitEventsQueued(this._unsentEvents);
11921196
}
11931197

11941198
if (this.options.saveEvents) {
11951199
this.saveEvents();
11961200
}
11971201

1198-
if (!this._sendEventsIfReady(callback) && type(callback) === 'function') {
1199-
callback(0, 'No request sent', {reason: 'No events to send or upload queued'});
1200-
}
1202+
this._sendEventsIfReady(callback);
12011203

12021204
return eventId;
12031205
} catch (e) {
@@ -1401,25 +1403,31 @@ if (BUILD_COMPAT_2_0) {
14011403
* Remove events in storage with event ids up to and including maxEventId.
14021404
* @private
14031405
*/
1404-
AmplitudeClient.prototype.removeEvents = function removeEvents(maxEventId, maxIdentifyId) {
1405-
_removeEvents(this, '_unsentEvents', maxEventId);
1406-
_removeEvents(this, '_unsentIdentifys', maxIdentifyId);
1406+
AmplitudeClient.prototype.removeEvents = function removeEvents(maxEventId, maxIdentifyId, status, response) {
1407+
_removeEvents(this, '_unsentEvents', maxEventId, status, response);
1408+
_removeEvents(this, '_unsentIdentifys', maxIdentifyId, status, response);
14071409
};
14081410

14091411
/**
14101412
* Helper function to remove events up to maxId from a single queue.
14111413
* Does a true filter in case events get out of order or old events are removed.
14121414
* @private
14131415
*/
1414-
var _removeEvents = function _removeEvents(scope, eventQueue, maxId) {
1416+
var _removeEvents = function _removeEvents(scope, eventQueue, maxId, status, response) {
14151417
if (maxId < 0) {
14161418
return;
14171419
}
14181420

14191421
var filteredEvents = [];
14201422
for (var i = 0; i < scope[eventQueue].length || 0; i++) {
1421-
if (scope[eventQueue][i].event_id > maxId) {
1422-
filteredEvents.push(scope[eventQueue][i]);
1423+
const unsentEvent = scope[eventQueue][i];
1424+
1425+
if (unsentEvent.event.event_id > maxId) {
1426+
filteredEvents.push(unsentEvent);
1427+
} else {
1428+
if (unsentEvent.callback) {
1429+
unsentEvent.callback(status, response);
1430+
}
14231431
}
14241432
}
14251433
scope[eventQueue] = filteredEvents;
@@ -1429,32 +1437,26 @@ var _removeEvents = function _removeEvents(scope, eventQueue, maxId) {
14291437
* Send unsent events. Note: this is called automatically after events are logged if option batchEvents is false.
14301438
* If batchEvents is true, then events are only sent when batch criterias are met.
14311439
* @private
1432-
* @param {Amplitude~eventCallback} callback - (optional) callback to run after events are sent.
1433-
* Note the server response code and response body are passed to the callback as input arguments.
14341440
*/
1435-
AmplitudeClient.prototype.sendEvents = function sendEvents(callback) {
1441+
AmplitudeClient.prototype.sendEvents = function sendEvents() {
14361442
if (!this._apiKeySet('sendEvents()')) {
1437-
if (type(callback) === 'function') {
1438-
callback(0, 'No request sent', {reason: 'API key not set'});
1439-
}
1443+
this.removeEvents(Infinity, Infinity, 0, 'No request sent', {reason: 'API key not set'});
14401444
return;
14411445
}
1446+
14421447
if (this.options.optOut) {
1443-
if (type(callback) === 'function') {
1444-
callback(0, 'No request sent', {reason: 'optOut is set to true'});
1445-
}
1448+
this.removeEvents(Infinity, Infinity, 0, 'No request sent', {reason: 'Opt out is set to true'});
14461449
return;
14471450
}
1451+
1452+
// How is it possible to get into this state?
14481453
if (this._unsentCount() === 0) {
1449-
if (type(callback) === 'function') {
1450-
callback(0, 'No request sent', {reason: 'No events to send'});
1451-
}
14521454
return;
14531455
}
1456+
1457+
// We only make one request at a time. sendEvents will be invoked again once
1458+
// the last request completes.
14541459
if (this._sending) {
1455-
if (type(callback) === 'function') {
1456-
callback(0, 'No request sent', {reason: 'Request already in progress. Events will be sent once this request is complete'});
1457-
}
14581460
return;
14591461
}
14601462

@@ -1467,7 +1469,7 @@ AmplitudeClient.prototype.sendEvents = function sendEvents(callback) {
14671469
var mergedEvents = this._mergeEventsAndIdentifys(numEvents);
14681470
var maxEventId = mergedEvents.maxEventId;
14691471
var maxIdentifyId = mergedEvents.maxIdentifyId;
1470-
var events = JSON.stringify(mergedEvents.eventsToSend);
1472+
var events = JSON.stringify(mergedEvents.eventsToSend.map(({event}) => event));
14711473
var uploadTime = new Date().getTime();
14721474

14731475
var data = {
@@ -1483,33 +1485,35 @@ AmplitudeClient.prototype.sendEvents = function sendEvents(callback) {
14831485
scope._sending = false;
14841486
try {
14851487
if (status === 200 && response === 'success') {
1486-
scope.removeEvents(maxEventId, maxIdentifyId);
1488+
scope.removeEvents(maxEventId, maxIdentifyId, status, response);
14871489

14881490
// Update the event cache after the removal of sent events.
14891491
if (scope.options.saveEvents) {
14901492
scope.saveEvents();
14911493
}
14921494

14931495
// Send more events if any queued during previous send.
1494-
if (!scope._sendEventsIfReady(callback) && type(callback) === 'function') {
1495-
callback(status, response);
1496-
}
1496+
scope._sendEventsIfReady();
14971497

14981498
// handle payload too large
14991499
} else if (status === 413) {
15001500
// utils.log('request too large');
15011501
// Can't even get this one massive event through. Drop it, even if it is an identify.
15021502
if (scope.options.uploadBatchSize === 1) {
1503-
scope.removeEvents(maxEventId, maxIdentifyId);
1503+
scope.removeEvents(maxEventId, maxIdentifyId, status, response);
15041504
}
15051505

15061506
// The server complained about the length of the request. Backoff and try again.
15071507
scope.options.uploadBatchSize = Math.ceil(numEvents / 2);
1508-
scope.sendEvents(callback);
1508+
scope.sendEvents();
15091509

1510-
} else if (type(callback) === 'function') { // If server turns something like a 400
1511-
callback(status, response);
15121510
}
1511+
// else {
1512+
// all the events are still queued, and will be retried when the next
1513+
// event is sent In the interest of debugging, it would be nice to have
1514+
// something like an event emitter for a better debugging experince
1515+
// here.
1516+
// }
15131517
} catch (e) {
15141518
// utils.log('failed upload');
15151519
}
@@ -1529,9 +1533,9 @@ AmplitudeClient.prototype._mergeEventsAndIdentifys = function _mergeEventsAndIde
15291533
var maxIdentifyId = -1;
15301534

15311535
while (eventsToSend.length < numEvents) {
1532-
var event;
1533-
var noIdentifys = identifyIndex >= this._unsentIdentifys.length;
1534-
var noEvents = eventIndex >= this._unsentEvents.length;
1536+
let unsentEvent;
1537+
let noIdentifys = identifyIndex >= this._unsentIdentifys.length;
1538+
let noEvents = eventIndex >= this._unsentEvents.length;
15351539

15361540
// case 0: no events or identifys left
15371541
// note this should not happen, this means we have less events and identifys than expected
@@ -1542,29 +1546,29 @@ AmplitudeClient.prototype._mergeEventsAndIdentifys = function _mergeEventsAndIde
15421546

15431547
// case 1: no identifys - grab from events
15441548
else if (noIdentifys) {
1545-
event = this._unsentEvents[eventIndex++];
1546-
maxEventId = event.event_id;
1549+
unsentEvent = this._unsentEvents[eventIndex++];
1550+
maxEventId = unsentEvent.event.event_id;
15471551

15481552
// case 2: no events - grab from identifys
15491553
} else if (noEvents) {
1550-
event = this._unsentIdentifys[identifyIndex++];
1551-
maxIdentifyId = event.event_id;
1554+
unsentEvent = this._unsentIdentifys[identifyIndex++];
1555+
maxIdentifyId = unsentEvent.event.event_id;
15521556

15531557
// case 3: need to compare sequence numbers
15541558
} else {
15551559
// events logged before v2.5.0 won't have a sequence number, put those first
1556-
if (!('sequence_number' in this._unsentEvents[eventIndex]) ||
1557-
this._unsentEvents[eventIndex].sequence_number <
1558-
this._unsentIdentifys[identifyIndex].sequence_number) {
1559-
event = this._unsentEvents[eventIndex++];
1560-
maxEventId = event.event_id;
1560+
if (!('sequence_number' in this._unsentEvents[eventIndex].event) ||
1561+
this._unsentEvents[eventIndex].event.sequence_number <
1562+
this._unsentIdentifys[identifyIndex].event.sequence_number) {
1563+
unsentEvent = this._unsentEvents[eventIndex++];
1564+
maxEventId = unsentEvent.event.event_id;
15611565
} else {
1562-
event = this._unsentIdentifys[identifyIndex++];
1563-
maxIdentifyId = event.event_id;
1566+
unsentEvent = this._unsentIdentifys[identifyIndex++];
1567+
maxIdentifyId = unsentEvent.event.event_id;
15641568
}
15651569
}
15661570

1567-
eventsToSend.push(event);
1571+
eventsToSend.push(unsentEvent);
15681572
}
15691573

15701574
return {

0 commit comments

Comments
 (0)