Skip to content

Commit

Permalink
Turn off Statsbeat if not able to connect to endpoint (#956)
Browse files Browse the repository at this point in the history
* WIP

* WIP

* Adding tests

* Package-lock.json

* Addressing comments
  • Loading branch information
hectorhdzg authored Apr 27, 2022
1 parent 491aea2 commit 141b6d9
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 170 deletions.
8 changes: 3 additions & 5 deletions AutoCollection/Statsbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class Statsbeat {
let statsbeatConnectionString = this._getConnectionString(config);
this._statsbeatConfig = new Config(statsbeatConnectionString);
this._statsbeatConfig.samplingPercentage = 100; // Do not sample
this._sender = new Sender(this._statsbeatConfig, null, null, this._handleNetworkError);
this._sender = new Sender(this._statsbeatConfig, null, null, null, null, true, this._shutdownStatsbeat);
}

public enable(isEnabled: boolean) {
Expand Down Expand Up @@ -351,10 +351,8 @@ class Statsbeat {
});
}

private _handleNetworkError(error: Error) {
if (error && error.message && error.message.indexOf("ENOTFOUND") > -1) { // ENOTFOUND
this.enable(false);// Disable Statsbeat as is possible SDK is running in private or restricted network
}
/**
 * Callback handed to the Statsbeat Sender at construction time.
 * Invoked by the Sender after 3 consecutive ingestion failures during
 * initialization, which suggests the SDK is running in a private or
 * restricted network where the ingestion endpoint is unreachable.
 */
private _shutdownStatsbeat() {
    this.enable(false); // Disable Statsbeat: it failed 3 times consecutively during initialization, so the SDK is possibly running in a private or restricted network
}

private _getConnectionString(config: Config): string {
Expand Down
78 changes: 60 additions & 18 deletions Library/Sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { FileAccessControl } from "./FileAccessControl";

const legacyThrottleStatusCode = 439; // - Too many requests and refresh cache
const throttleStatusCode = 402; // Monthly Quota Exceeded (new SDK)
const RESPONSE_CODES_INDICATING_REACHED_BREEZE = [200, 206, 402, 408, 429, 439, 500];

class Sender {
private static TAG = "Sender";
Expand All @@ -30,6 +31,10 @@ class Sender {
public static TEMPDIR_PREFIX: string = "appInsights-node";

private _config: Config;
private _isStatsbeatSender: boolean;
private _shutdownStatsbeat: () => void;
private _failedToIngestCounter: number;
private _statsbeatHasReachedIngestionAtLeastOnce: boolean;
private _statsbeat: Statsbeat;
private _onSuccess: (response: string) => void;
private _onError: (error: Error) => void;
Expand All @@ -44,7 +49,7 @@ class Sender {
protected _resendInterval: number;
protected _maxBytesOnDisk: number;

constructor(config: Config, getAuthorizationHandler?: (config: Config) => AuthorizationHandler, onSuccess?: (response: string) => void, onError?: (error: Error) => void, statsbeat?: Statsbeat) {
constructor(config: Config, getAuthorizationHandler?: (config: Config) => AuthorizationHandler, onSuccess?: (response: string) => void, onError?: (error: Error) => void, statsbeat?: Statsbeat, isStatsbeatSender?: boolean, shutdownStatsbeat?: () => void) {
this._config = config;
this._onSuccess = onSuccess;
this._onError = onError;
Expand All @@ -59,6 +64,10 @@ class Sender {
this._fileCleanupTimer = null;
// tmpdir is /tmp for *nix and USERDIR/AppData/Local/Temp for Windows
this._tempDir = path.join(os.tmpdir(), Sender.TEMPDIR_PREFIX + this._config.instrumentationKey);
this._isStatsbeatSender = isStatsbeatSender || false;
this._shutdownStatsbeat = shutdownStatsbeat;
this._failedToIngestCounter = 0;
this._statsbeatHasReachedIngestionAtLeastOnce = false;
}

/**
Expand All @@ -78,7 +87,7 @@ class Sender {

if (value && !FileAccessControl.OS_PROVIDES_FILE_PROTECTION) {
this._enableDiskRetryMode = false;
Logging.warn(Sender.TAG, "Ignoring request to enable disk retry mode. Sufficient file protection capabilities were not detected.")
this._logWarn("Ignoring request to enable disk retry mode. Sufficient file protection capabilities were not detected.")
}
if (this._enableDiskRetryMode) {
if (this._statsbeat) {
Expand Down Expand Up @@ -131,7 +140,7 @@ class Sender {
this._storeToDisk(envelopes);
}
errorMsg += "Error:" + authError.toString();
Logging.warn(Sender.TAG, errorMsg);
this._logWarn(errorMsg);

if (typeof callback === "function") {
callback(errorMsg);
Expand All @@ -158,15 +167,15 @@ class Sender {
zlib.gzip(payload, (err, buffer) => {
var dataToSend = buffer;
if (err) {
Logging.warn(Sender.TAG, err);
this._logWarn(Util.dumpObj(err));
dataToSend = payload; // something went wrong so send without gzip
options.headers["Content-Length"] = payload.length.toString();
} else {
options.headers["Content-Encoding"] = "gzip";
options.headers["Content-Length"] = buffer.length.toString();
}

Logging.info(Sender.TAG, options);
this._logInfo(Util.dumpObj(options));

// Ensure this request is not captured by auto-collection.
(<any>options)[AutoCollectHttpDependencies.disableCollectionRequestOption] = true;
Expand All @@ -186,6 +195,15 @@ class Sender {
let endTime = +new Date();
let duration = endTime - startTime;
this._numConsecutiveFailures = 0;
// Handling of Statsbeat instance sending data, should turn it off if is not able to reach ingestion endpoint
if (this._isStatsbeatSender && !this._statsbeatHasReachedIngestionAtLeastOnce) {
if (RESPONSE_CODES_INDICATING_REACHED_BREEZE.includes(res.statusCode)) {
this._statsbeatHasReachedIngestionAtLeastOnce = true;
}
else {
this._statsbeatFailedToIngest();
}
}
if (this._statsbeat) {
if (res.statusCode == throttleStatusCode || res.statusCode == legacyThrottleStatusCode) { // Throttle
this._statsbeat.countThrottle(Constants.StatsbeatNetworkCategory.Breeze, endpointHost);
Expand Down Expand Up @@ -257,7 +275,7 @@ class Sender {
if (typeof callback === "function") {
callback(responseString);
}
Logging.info(Sender.TAG, responseString);
this._logInfo(responseString);
if (typeof this._onSuccess === "function") {
this._onSuccess(responseString);
}
Expand All @@ -268,6 +286,9 @@ class Sender {
var req = Util.makeRequest(this._config, endpointUrl, options, requestCallback);

req.on("error", (error: Error) => {
if (this._isStatsbeatSender && !this._statsbeatHasReachedIngestionAtLeastOnce) {
this._statsbeatFailedToIngest();
}
// todo: handle error codes better (group to recoverable/non-recoverable and persist)
this._numConsecutiveFailures++;
if (this._statsbeat) {
Expand All @@ -282,10 +303,10 @@ class Sender {
if (this._enableDiskRetryMode) {
notice = `Ingestion endpoint could not be reached ${this._numConsecutiveFailures} consecutive times. There may be resulting telemetry loss. Most recent error:`;
}
Logging.warn(Sender.TAG, notice, Util.dumpObj(error));
this._logWarn(notice, Util.dumpObj(error));
} else {
let notice = "Transient failure to reach ingestion endpoint. This batch of telemetry items will be retried. Error:";
Logging.info(Sender.TAG, notice, Util.dumpObj(error));
this._logInfo(notice, Util.dumpObj(error));
}
this._onErrorHelper(error);

Expand Down Expand Up @@ -327,31 +348,52 @@ class Sender {
);
}

/**
 * Logs an informational message unless this sender belongs to Statsbeat.
 * Statsbeat senders are kept silent so internal telemetry does not spam
 * the user's diagnostic logs.
 * @param message Primary value to log.
 * @param optionalParams Additional values forwarded to the logger.
 */
private _logInfo(message?: any, ...optionalParams: any[]) {
    if (!this._isStatsbeatSender) {
        // Spread the rest parameters so Logging.info receives them as
        // individual arguments instead of one nested array.
        Logging.info(Sender.TAG, message, ...optionalParams);
    }
}

/**
 * Logs a warning message unless this sender belongs to Statsbeat.
 * Statsbeat senders are kept silent so internal telemetry does not spam
 * the user's diagnostic logs.
 * @param message Primary value to log.
 * @param optionalParams Additional values forwarded to the logger.
 */
private _logWarn(message?: any, ...optionalParams: any[]) {
    if (!this._isStatsbeatSender) {
        // Spread the rest parameters so Logging.warn receives them as
        // individual arguments instead of one nested array.
        Logging.warn(Sender.TAG, message, ...optionalParams);
    }
}

/**
 * Records one failed Statsbeat ingestion attempt and, once three attempts
 * have failed, shuts Statsbeat down via the callback supplied at construction.
 */
private _statsbeatFailedToIngest() {
    // Without a shutdown callback there is nothing to track or trigger.
    if (!this._shutdownStatsbeat) {
        return;
    }
    this._failedToIngestCounter++;
    if (this._failedToIngestCounter >= 3) {
        this._shutdownStatsbeat();
    }
}

/**
* Stores the payload as a json file on disk in the temp directory
*/
private async _storeToDisk(envelopes: Contracts.EnvelopeTelemetry[]): Promise<void> {
try {
Logging.info(Sender.TAG, "Checking existence of data storage directory: " + this._tempDir);
this._logInfo("Checking existence of data storage directory: " + this._tempDir);
await FileSystemHelper.confirmDirExists(this._tempDir);
}
catch (ex) {
Logging.warn(Sender.TAG, "Failed to create folder to put telemetry: " + (ex && ex.message));
this._logWarn("Failed to create folder to put telemetry: " + Util.dumpObj(ex));
this._onErrorHelper(ex);
return;
}
try {
await FileAccessControl.applyACLRules(this._tempDir);
}
catch (ex) {
Logging.warn(Sender.TAG, "Failed to apply file access control to folder: " + (ex && ex.message));
this._logWarn("Failed to apply file access control to folder: " + Util.dumpObj(ex));
this._onErrorHelper(ex);
return;
}
try {
let size = await FileSystemHelper.getShallowDirectorySize(this._tempDir);
if (size > this._maxBytesOnDisk) {
Logging.warn(Sender.TAG, "Not saving data due to max size limit being met. Directory size in bytes is: " + size);
this._logWarn("Not saving data due to max size limit being met. Directory size in bytes is: " + size);
return;
}
//create file - file name for now is the timestamp, a better approach would be a UUID but that
Expand All @@ -361,11 +403,11 @@ class Sender {

// Mode 600 is w/r for creator and no read access for others (only applies on *nix)
// For Windows, ACL rules are applied to the entire directory (see logic in _confirmDirExists and _applyACLRules)
Logging.info(Sender.TAG, "saving data to disk at: " + fileFullPath);
this._logInfo("saving data to disk at: " + fileFullPath);
FileSystemHelper.writeFileAsync(fileFullPath, Util.stringify(envelopes), { mode: 0o600 });
}
catch (ex) {
Logging.warn(Sender.TAG, "Failed to persist telemetry to disk: " + (ex && ex.message));
this._logWarn("Failed to persist telemetry to disk: " + Util.dumpObj(ex));
this._onErrorHelper(ex);
return;
}
Expand All @@ -377,7 +419,7 @@ class Sender {
*/
private _storeToDiskSync(payload: any) {
try {
Logging.info(Sender.TAG, "Checking existence of data storage directory: " + this._tempDir);
this._logInfo("Checking existence of data storage directory: " + this._tempDir);
if (!fs.existsSync(this._tempDir)) {
fs.mkdirSync(this._tempDir);
}
Expand All @@ -387,7 +429,7 @@ class Sender {

let dirSize = FileSystemHelper.getShallowDirectorySizeSync(this._tempDir);
if (dirSize > this._maxBytesOnDisk) {
Logging.info(Sender.TAG, "Not saving data due to max size limit being met. Directory size in bytes is: " + dirSize);
this._logInfo("Not saving data due to max size limit being met. Directory size in bytes is: " + dirSize);
return;
}

Expand All @@ -397,11 +439,11 @@ class Sender {
var fileFullPath = path.join(this._tempDir, fileName);

// Mode 600 is w/r for creator and no access for anyone else (only applies on *nix)
Logging.info(Sender.TAG, "saving data before crash to disk at: " + fileFullPath);
this._logInfo("saving data before crash to disk at: " + fileFullPath);
fs.writeFileSync(fileFullPath, payload, { mode: 0o600 });

} catch (error) {
Logging.warn(Sender.TAG, "Error while saving data to disk: " + (error && error.message));
this._logWarn("Error while saving data to disk: " + Util.dumpObj(error));
this._onErrorHelper(error);
}
}
Expand Down
23 changes: 15 additions & 8 deletions Library/Util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,15 +404,22 @@ class Util {
* Returns string representation of an object suitable for diagnostics logging.
*/
public static dumpObj(object: any): string {
const objectTypeDump: string = Object["prototype"].toString.call(object);
let propertyValueDump: string = "";
if (objectTypeDump === "[object Error]") {
propertyValueDump = "{ stack: '" + object.stack + "', message: '" + object.message + "', name: '" + object.name + "'";
} else {
propertyValueDump = JSON.stringify(object);
}
if (object) {
try {
const objectTypeDump: string = Object["prototype"].toString.call(object);
let propertyValueDump: string = "";
if (objectTypeDump === "[object Error]") {
propertyValueDump = "{ stack: '" + object.stack + "', message: '" + object.message + "', name: '" + object.name + "'";
} else {
propertyValueDump = this.stringify(object);
}

return objectTypeDump + propertyValueDump;
return objectTypeDump + propertyValueDump;
}
catch (ex) {
return object.toString();
}
}
}

public static stringify(payload: any) {
Expand Down
77 changes: 77 additions & 0 deletions Tests/Library/Sender.tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,83 @@ describe("Library/Sender", () => {
});
});

// Once the Statsbeat sender has reached the ingestion endpoint a single time,
// later failures must never trigger the shutdown callback.
it("[Statsbeat Sender] should not turn Statsbeat off after successfully reaching ingestion endpoint at least once", (done) => {
    var config = new Config("InstrumentationKey=1aa11111-bbbb-1ccc-8ddd-eeeeffff3333");
    let shutdownCalled = false;
    let shutdown = () => {
        shutdownCalled = true;
    };
    // First request succeeds, marking the endpoint as reachable.
    nockScope = interceptor.reply(200, breezeResponse);
    let testSender = new Sender(config, null, null, null, null, true, shutdown);
    assert.equal(testSender["_statsbeatHasReachedIngestionAtLeastOnce"], false);
    testSender.setDiskRetryMode(false);
    testSender.send([testEnvelope], (responseText) => {
        assert.equal(testSender["_statsbeatHasReachedIngestionAtLeastOnce"], true);
        // Three consecutive failures would normally shut Statsbeat down,
        // but not after ingestion was reached at least once.
        nockScope = interceptor.reply(503, null);
        testSender.send([testEnvelope], (responseText) => {
            assert.equal(shutdownCalled, false);
            testSender.send([testEnvelope], (responseText) => {
                assert.equal(shutdownCalled, false);
                testSender.send([testEnvelope], (responseText) => {
                    assert.equal(shutdownCalled, false);
                    done();
                });
            });
        });
    });
});

// The shutdown callback must fire on exactly the third consecutive ingestion
// failure when the endpoint was never reached during initialization.
it("[Statsbeat Sender] should turn Statsbeat off if there are 3 failures after initialization", (done) => {
    const config = new Config("InstrumentationKey=1aa11111-bbbb-1ccc-8ddd-eeeeffff3333");
    let shutdownCalled = false;
    const shutdown = () => { shutdownCalled = true; };
    const testSender = new Sender(config, null, null, null, null, true, shutdown);
    testSender.setDiskRetryMode(false);
    // Every request fails with a server error.
    nockScope = interceptor.reply(503, null);

    testSender.send([testEnvelope], () => {
        // Failure #1: counter advances, shutdown not yet triggered.
        assert.equal(shutdownCalled, false);
        assert.equal(testSender["_failedToIngestCounter"], 1);
        testSender.send([testEnvelope], () => {
            // Failure #2: still below the threshold.
            assert.equal(shutdownCalled, false);
            assert.equal(testSender["_failedToIngestCounter"], 2);
            testSender.send([testEnvelope], () => {
                // Failure #3: threshold hit, callback invoked.
                assert.equal(testSender["_failedToIngestCounter"], 3);
                assert.equal(shutdownCalled, true);
                done();
            });
        });
    });
});

// A Statsbeat sender must suppress warning-level logging even when a
// network error occurs.
it("[Statsbeat Sender] should turn off warn logging", (done) => {
    var config = new Config("InstrumentationKey=1aa11111-bbbb-1ccc-8ddd-eeeeffff3333");
    let testSender = new Sender(config, null, null, null, null, true, () => { });
    testSender.setDiskRetryMode(true);
    // Renamed from "warntub" for consistency with infoStub in the sibling test.
    let warnStub = sandbox.stub(Logging, "warn");
    nockScope = interceptor.replyWithError("Test Error");
    testSender.send([testEnvelope], (responseText) => {
        assert.ok(warnStub.notCalled);
        assert.equal(testSender["_failedToIngestCounter"], 1);
        done();
    });
});

// A Statsbeat sender must suppress info-level logging even when a
// network error occurs.
it("[Statsbeat Sender] should turn off info logging", (done) => {
    const config = new Config("InstrumentationKey=1aa11111-bbbb-1ccc-8ddd-eeeeffff3333");
    const testSender = new Sender(config, null, null, null, null, true, () => { });
    testSender.setDiskRetryMode(false);
    const infoStub = sandbox.stub(Logging, "info");
    // Simulate a network-level failure so the error path is exercised.
    nockScope = interceptor.replyWithError("Test Error");
    testSender.send([testEnvelope], () => {
        // The logger was never invoked and the failure counter advanced once.
        assert.ok(infoStub.notCalled);
        assert.equal(testSender["_failedToIngestCounter"], 1);
        done();
    });
});

it("Exception counts", (done) => {
statsbeatSender.setDiskRetryMode(false);
var statsbeatSpy = sandbox.spy(statsbeat, "countRequest");
Expand Down
Loading

0 comments on commit 141b6d9

Please sign in to comment.