Skip to content

Commit

Permalink
feat: update probe info with custom data
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-yarmosh committed Oct 30, 2023
1 parent a3fee7e commit eb6fb92
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 42 deletions.
4 changes: 2 additions & 2 deletions src/adoption-code/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ export class CodeSender {
return sockets.find(socket => socket.data.probe.ipAddress === ip);
}

private sendToSocket (sockets: RemoteProbeSocket, code: string) {
sockets.emit('probe:adoption:code', {
private sendToSocket (socket: RemoteProbeSocket, code: string) {
socket.emit('probe:adoption:code', {
code,
});
}
Expand Down
30 changes: 15 additions & 15 deletions src/lib/adopted-probes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ const logger = scopedLogger('adopted-probes');

const TABLE_NAME = 'adopted_probes';

type AdoptedProbe = {
export type AdoptedProbe = {
ip: string;
uuid: string;
uuid?: string;
lastSyncDate: string;
tags: string[];
isCustomCity: boolean;
status: string;
version: string;
country: string;
city: string;
latitude: number;
longitude: number;
asn: number;
network: string;
tags?: string[];
isCustomCity: number;
status?: string;
version?: string;
country?: string;
city?: string;
latitude?: number;
longitude?: number;
asn?: number;
network?: string;
}

export class AdoptedProbes {
Expand Down Expand Up @@ -80,7 +80,7 @@ export class AdoptedProbes {
this.syncDashboardData()
.finally(() => this.scheduleSync())
.catch(error => logger.error(error));
}, 60_000);
}, 10_000);
}

async syncDashboardData () {
Expand All @@ -95,7 +95,7 @@ export class AdoptedProbes {
await Bluebird.map(adoptedProbes, ({ ip, lastSyncDate }) => this.updateSyncDate(ip, lastSyncDate), { concurrency: 8 });
}

private async syncProbeIds (ip: string, uuid: string) {
private async syncProbeIds (ip: string, uuid?: string) {
const connectedProbe = this.connectedIpToProbe.get(ip);

if (connectedProbe && connectedProbe.uuid === uuid) { // ip and uuid are synced
Expand All @@ -107,7 +107,7 @@ export class AdoptedProbes {
return;
}

const connectedIp = this.connectedUuidToIp.get(uuid);
const connectedIp = uuid && this.connectedUuidToIp.get(uuid);

if (connectedIp) { // data was found by uuid, but not found by ip, therefore ip is outdated
await this.updateIp(connectedIp, uuid);
Expand Down
51 changes: 48 additions & 3 deletions src/lib/ws/fetch-sockets.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,63 @@
import config from 'config';
import { throttle, LRUOptions } from './helper/throttle.js';
import { fetchRawSockets, RemoteProbeSocket } from './server.js';
import { adoptedProbes } from '../adopted-probes.js';
import { type AdoptedProbe, adoptedProbes } from '../adopted-probes.js';

const throttledFetchSockets = throttle<RemoteProbeSocket[]>(
async () => {
const connected = await fetchRawSockets();
const adopted = adoptedProbes.getAdoptedIpToProbe();
console.log('adopted', adopted);
return connected;
const withAdoptedData = addAdoptedProbesData(connected, adopted);
return withAdoptedData;
},
config.get<number>('ws.fetchSocketsCacheTTL'),
);

const addAdoptedProbesData = (connectedProbes: RemoteProbeSocket[], AdoptedIpToProbe: Map<string, AdoptedProbe>) => {
return connectedProbes.map((connected) => {
const ip = connected.data.probe.ipAddress;
const adopted = AdoptedIpToProbe.get(ip);

if (!adopted) {
return connected;
}

const isCustomCity = adopted.isCustomCity;
const hasUserTags = adopted.tags && adopted.tags.length;

if (!isCustomCity && !hasUserTags) {
return connected;
}

const result = {
...connected,
data: {
...connected.data,
probe: {
...connected.data.probe,
...(isCustomCity && {
location: {
...connected.data.probe.location,
city: adopted.city!,
latitude: adopted.latitude!,
longitude: adopted.longitude!,
},
}),
...((adopted.tags && adopted.tags.length) && {
tags: [
...connected.data.probe.tags,
...adopted.tags.map(tag => ({ type: 'user' as const, value: `u-baderfall-${tag}` })),
],
}),
} },
} as RemoteProbeSocket;

// TODO: Update index here

return result;
});
};

export const fetchSockets = async (options?: LRUOptions) => {
const sockets = await throttledFetchSockets(options);
return sockets;
Expand Down
42 changes: 24 additions & 18 deletions src/probe/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,7 @@ export const buildProbe = async (socket: Socket): Promise<Probe> => {

const tags = getTags(ip, ipInfo);

// Storing index as string[][] so every category will have it's exact position in the index array across all probes
const index = [
[ location.country ],
[ getCountryIso3ByIso2(location.country) ],
[ getCountryByIso(location.country) ],
getCountryAliases(location.country),
[ location.normalizedCity ],
location.state ? [ location.state ] : [],
location.state ? [ getStateNameByIso(location.state) ] : [],
[ location.continent ],
getContinentAliases(location.continent),
[ location.normalizedRegion ],
getRegionAliases(location.normalizedRegion),
[ `as${location.asn}` ],
tags.filter(tag => tag.type === 'system').map(tag => tag.value),
[ location.normalizedNetwork ],
getNetworkAliases(location.normalizedNetwork),
].map(category => category.map(s => s.toLowerCase().replaceAll('-', ' ')));
const index = getIndex(location, tags);

// Todo: add validation and handle missing or partial data
return {
Expand All @@ -100,6 +83,29 @@ export const buildProbe = async (socket: Socket): Promise<Probe> => {
};
};

const getIndex = (location: ProbeLocation, tags: Tag[]) => {
// Storing index as string[][] so every category will have it's exact position in the index array across all probes
const index = [
[ location.country ],
[ getCountryIso3ByIso2(location.country) ],
[ getCountryByIso(location.country) ],
getCountryAliases(location.country),
[ location.normalizedCity ],
location.state ? [ location.state ] : [],
location.state ? [ getStateNameByIso(location.state) ] : [],
[ location.continent ],
getContinentAliases(location.continent),
[ location.normalizedRegion ],
getRegionAliases(location.normalizedRegion),
[ `as${location.asn}` ],
tags.filter(tag => tag.type === 'system').map(tag => tag.value),
[ location.normalizedNetwork ],
getNetworkAliases(location.normalizedNetwork),
].map(category => category.map(s => s.toLowerCase().replaceAll('-', ' ')));

return index;
};

const getLocation = (ipInfo: ProbeLocation): ProbeLocation => ({
continent: ipInfo.continent,
region: ipInfo.region,
Expand Down
128 changes: 124 additions & 4 deletions test/tests/unit/ws/fetch-sockets.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,142 @@ import * as td from 'testdouble';
import { expect } from 'chai';

const fetchRawSockets = sinon.stub();
const getAdoptedIpToProbe = sinon.stub();

describe('fetchSockets', () => {
let fetchSockets;

before(async () => {
await td.replaceEsm('../../../../src/lib/ws/server.ts', {
fetchRawSockets,
});

await td.replaceEsm('../../../../src/lib/ws/server.ts', { fetchRawSockets });
await td.replaceEsm('../../../../src/lib/adopted-probes.ts', { adoptedProbes: { getAdoptedIpToProbe } });
({ fetchSockets } = await import('../../../../src/lib/ws/fetch-sockets.js'));
});

beforeEach(() => {
sinon.resetHistory();
});

after(() => {
td.reset();
});

it('should apply adopted probes data to the result sockets', async () => {
fetchRawSockets.resolves([{
data: {
probe: {
client: 'pf2pER5jLnhxTgBqAAAB',
version: '0.26.0',
nodeVersion: 'v18.17.0',
uuid: 'c873cd81-5ede-4fff-9314-5905ad2bdb58',
ipAddress: '1.1.1.1',
host: '',
location: {
continent: 'EU',
region: 'Western Europe',
normalizedRegion: 'western europe',
country: 'FR',
state: undefined,
city: 'Paris',
normalizedCity: 'paris',
asn: 12876,
latitude: 48.8534,
longitude: 2.3488,
network: 'SCALEWAY S.A.S.',
normalizedNetwork: 'scaleway s.a.s.',
},
index: [
[ 'fr' ],
[ 'fra' ],
[ 'france' ],
[],
[ 'paris' ],
[],
[],
[ 'eu' ],
[ 'eu', 'europe' ],
[ 'western europe' ],
[ 'western europe', 'west europe' ],
[ 'as12876' ],
[ 'datacenter network' ],
[ 'scaleway s.a.s.' ],
[],
],
resolvers: [ 'private' ],
tags: [{ type: 'system', value: 'datacenter-network' }],
stats: { cpu: { count: 8, load: [] }, jobs: { count: 0 } },
status: 'ready',
},
},
}]);

getAdoptedIpToProbe.returns(new Map([ [ '1.1.1.1', {
ip: '1.1.1.1',
uuid: 'c873cd81-5ede-4fff-9314-5905ad2bdb58',
lastSyncDate: '2023-10-29T21:00:00.000Z',
isCustomCity: 1,
tags: [ 'user-tag' ],
status: 'ready',
version: '0.26.0',
asn: 12876,
network: 'SCALEWAY S.A.S.',
country: 'FR',
city: 'Marseille',
latitude: 43.29695,
longitude: 5.38107,
}] ]));

const result = await fetchSockets();
expect(result).to.deep.equal([
{
data: {
probe: {
client: 'pf2pER5jLnhxTgBqAAAB',
version: '0.26.0',
nodeVersion: 'v18.17.0',
uuid: 'c873cd81-5ede-4fff-9314-5905ad2bdb58',
ipAddress: '1.1.1.1',
host: '',
location: {
continent: 'EU',
region: 'Western Europe',
normalizedRegion: 'western europe',
country: 'FR',
state: undefined,
city: 'Marseille',
normalizedCity: 'paris',
asn: 12876,
latitude: 43.29695,
longitude: 5.38107,
network: 'SCALEWAY S.A.S.',
normalizedNetwork: 'scaleway s.a.s.',
},
index: [
[ 'fr' ],
[ 'fra' ],
[ 'france' ],
[],
[ 'marseille' ],
[],
[],
[ 'eu' ],
[ 'eu', 'europe' ],
[ 'western europe' ],
[ 'western europe', 'west europe' ],
[ 'as12876' ],
[ 'datacenter network', 'user tag' ],
[ 'scaleway s.a.s.' ],
[],
],
resolvers: [ 'private' ],
tags: [{ type: 'system', value: 'datacenter-network' }, { type: 'user', value: 'u-baderfall-user-tag' }],
stats: { cpu: { count: 8, load: [] }, jobs: { count: 0 } },
status: 'ready',
},
},
},
]);
});

it('multiple calls to fetchSockets should result in one socket.io fetchSockets call', async () => {
expect(fetchRawSockets.callCount).to.equal(0);

Expand Down

0 comments on commit eb6fb92

Please sign in to comment.