Skip to content

Commit 97cc29a

Browse files
feat: add events
1 parent 49ac138 commit 97cc29a

File tree

10 files changed

+381
-16
lines changed

10 files changed

+381
-16
lines changed

__tests__/services/abstract-microservice-test.ts

Lines changed: 111 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import AbstractMicroservice from '@services/abstract-microservice';
1212
import Microservice from '@services/microservice';
1313

1414
const notSupposedMessage = 'was not supposed to succeed';
15+
const stopMsMessage = 'socket hang up';
1516

1617
describe('services/abstract-microservice', () => {
1718
const options = {
@@ -52,7 +53,7 @@ describe('services/abstract-microservice', () => {
5253
.resolves({ data: task, method: 'POST' })
5354
// Throw error for exit from infinite loop (stop worker)
5455
.onCall(1)
55-
.rejects({ message: 'socket hang up' });
56+
.rejects({ message: stopMsMessage });
5657

5758
await ms.start();
5859
stubbed.restore();
@@ -63,6 +64,14 @@ describe('services/abstract-microservice', () => {
6364
const testEndpoint = 'endpoint';
6465
const endpointHandler = () => ({ hello: 'world' });
6566

67+
const rpcChannels = {
68+
$info: {},
69+
'ms/demo': { worker_ids: [] },
70+
'ms/example': { worker_ids: ['worker-id'] },
71+
'events/demo': { worker_ids: [] },
72+
'events/example': { worker_ids: ['worker-id'] },
73+
};
74+
6675
beforeEach(() => {
6776
sinon.stub(process, 'exit');
6877
sinon.stub(console, 'info');
@@ -436,16 +445,111 @@ describe('services/abstract-microservice', () => {
436445
});
437446

438447
it('should correctly return list of registered microservices', async () => {
439-
const channels = {
440-
$info: {},
441-
'ms/demo': { worker_ids: [] },
442-
'ms/example': { worker_ids: ['worker-id'] },
443-
};
444-
const stubbed = sinon.stub(axios, 'request').resolves({ data: channels });
448+
const stubbed = sinon.stub(axios, 'request').resolves({ data: rpcChannels });
445449

446450
expect(await ms.lookup()).to.deep.equal(['demo', 'example']);
447451
expect(await ms.lookup(true)).to.deep.equal(['example']);
448452

449453
stubbed.restore();
450454
});
455+
456+
it('should correctly return event channel prefix', () => {
457+
expect(ms).to.have.property('eventChannelPrefix').to.equal(ms.getEventChannelPrefix());
458+
});
459+
460+
it('should correctly add/get/remove event handler', () => {
461+
const handler = () => true;
462+
const channel = 'test.operations.channel';
463+
464+
ms.addEventHandler(channel, handler);
465+
466+
const isAdded = ms.getEventHandlers()[channel].indexOf(handler) !== -1;
467+
468+
ms.removeEventHandler(channel, handler);
469+
470+
const isRemoved = ms.getEventHandlers()[channel].indexOf(handler) === -1;
471+
472+
expect(isAdded).to.ok;
473+
expect(isRemoved).to.ok;
474+
});
475+
476+
it('should correctly publish event', async () => {
477+
const stubbed = sinon
478+
.stub(axios, 'request')
479+
// return rpc channels
480+
.onCall(0)
481+
.resolves({ data: rpcChannels })
482+
// send event on first channel successful
483+
.onCall(1)
484+
.resolves({ status: 200 })
485+
// send event on second channel failed
486+
.onCall(2)
487+
.rejects();
488+
const testData = { test: 1 };
489+
const eventName = 'test.event';
490+
491+
const result = await Microservice.eventPublish(eventName, testData);
492+
const { url, data, headers } = stubbed.secondCall.firstArg;
493+
494+
stubbed.restore();
495+
496+
expect(result).to.equal(1);
497+
expect(url).to.equal(`/${ms.getEventChannelPrefix()}/demo`);
498+
expect(headers).to.deep.equal({ type: 'async' });
499+
expect(data).to.deep.equal({ ...testData, payload: { eventName, sender: options.name } });
500+
});
501+
502+
it('should throw error when publish event', async () => {
503+
const message = 'publish event error';
504+
const stubbed = sinon.stub(axios, 'request').rejects(new Error(message));
505+
const channel = 'test.error';
506+
507+
const result = await Microservice.eventPublish(channel);
508+
509+
stubbed.restore();
510+
511+
expect(result).to.equal(message);
512+
});
513+
514+
it('should successful handle event', async () => {
515+
const eventName = 'sample.event';
516+
const eventParams = { sample: 'param', payload: { sender: 'demo', eventName } };
517+
const handler = sinon.stub();
518+
const stubbedAxios = sinon
519+
.stub(axios, 'request')
520+
// unknown event, just skip
521+
.onCall(0)
522+
.resolves({ data: {} })
523+
// unknown event worker error
524+
.onCall(1)
525+
.rejects({ message: 'unknown error' })
526+
// successful event
527+
.onCall(2)
528+
.resolves({ data: eventParams })
529+
// end
530+
.onCall(3)
531+
.rejects({ message: stopMsMessage });
532+
const clock = sinon.useFakeTimers();
533+
534+
ms.addEventHandler(eventName, handler); // full match
535+
ms.addEventHandler('sample.*', handler); // partial match
536+
ms.addEventHandler('*', handler); // listen all events
537+
ms.addEventHandler('another.event.channel', handler);
538+
539+
ms['options']['workers'] = 0; // temporary disable standard workers
540+
541+
const start = ms.start();
542+
543+
await clock.tickAsync(5000);
544+
await start;
545+
546+
clock.restore();
547+
stubbedAxios.restore();
548+
ms['options']['workers'] = 1;
549+
550+
const receivedParams = handler.firstCall.firstArg;
551+
552+
expect(handler.getCalls().length).to.equal(3);
553+
expect(receivedParams).to.deep.equal(eventParams);
554+
});
451555
});

example/index.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,19 @@ microservice.addEndpoint('test-with-cookies-manipulations', () => ({
3333
},
3434
}));
3535

36+
/**
37+
* Working with events
38+
*/
39+
microservice.addEndpoint('test-event', async () => {
40+
// Send event
41+
const result = await Microservice.eventPublish('test.event', { hello: 'world' });
42+
43+
return { result };
44+
});
45+
microservice.addEventHandler('test.event', (params) => {
46+
console.info('New event:', params);
47+
});
48+
3649
/**
3750
* THIS IS MICROSERVICE: gateway
3851
*

example/scratch.http

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,11 @@ Content-Type: application/json
5959
}]
6060

6161
###
62+
63+
POST http://127.0.0.1:3000
64+
Content-Type: application/json
65+
66+
{
67+
"id": 1,
68+
"method": "demo.test-event"
69+
}

src/helpers/wait-sec.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/**
2+
* Just delay execute on provided seconds
3+
*/
4+
const WaitSec = (seconds: number): Promise<void> =>
5+
new Promise((resolve) => {
6+
setTimeout(resolve, seconds * 1000);
7+
});
8+
9+
export default WaitSec;
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/**
2+
* Microservices data
3+
*/
4+
interface IEventRequestPayload {
5+
sender?: string;
6+
eventName: string;
7+
}
8+
9+
type PayloadExtends<TParams> = TParams & IEventRequestPayload;
10+
11+
type IEventRequest<TParams = Record<string, any>, TPayload = Record<string, any>> = TParams & {
12+
payload?: PayloadExtends<TPayload>;
13+
};
14+
15+
export { IEventRequest, IEventRequestPayload };

src/interfaces/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ export * from './core/i-microservice-response';
66

77
export * from './core/i-microservice-request';
88

9+
export * from './core/i-event-request';
10+
911
export * from './services/i-abstract-microservice';
1012

1113
export * from './services/i-gateway';

src/interfaces/services/i-abstract-microservice.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { AxiosRequestConfig, AxiosResponse } from 'axios';
22
import type { Request } from 'express';
33
import MicroserviceRequest from '@core/microservice-request';
44
import MicroserviceResponse from '@core/microservice-response';
5+
import type { IEventRequest } from '@interfaces/core/i-event-request';
56
import type { IMicroserviceRequest } from '@interfaces/core/i-microservice-request';
67
import type {
78
IMicroserviceResponseResult,
@@ -15,6 +16,8 @@ interface IAbstractMicroserviceOptions {
1516
version: string;
1617
connection: string;
1718
isSRV: boolean;
19+
eventWorkers: number;
20+
eventWorkerTimeout: number;
1821
}
1922

2023
interface IAbstractMicroserviceParams {
@@ -92,6 +95,18 @@ interface IEndpoints {
9295
};
9396
}
9497

98+
interface IEventHandlerOptions {
99+
app: AbstractMicroservice;
100+
sender?: string;
101+
}
102+
103+
interface IEventHandler<TParams = Record<string, any>> {
104+
(params: IEventRequest<TParams>, options: IEventHandlerOptions):
105+
| Promise<void | boolean>
106+
| void
107+
| boolean;
108+
}
109+
95110
interface ITask {
96111
task: MicroserviceRequest | MicroserviceResponse;
97112
req: AxiosResponse<IMicroserviceRequest>;
@@ -119,6 +134,8 @@ export {
119134
IMiddlewareParams,
120135
IEndpoints,
121136
IEndpointOptions,
137+
IEventHandler,
138+
IEventHandlerOptions,
122139
IEndpointHandler,
123140
IEndpointHandlerOptions,
124141
ITask,

0 commit comments

Comments
 (0)