Skip to content

Commit 29aa4eb

Browse files
fix: change abstract microservice handling request and events
1 parent d96cc4d commit 29aa4eb

File tree

1 file changed

+56
-54
lines changed

1 file changed

+56
-54
lines changed

src/services/abstract-microservice.ts

Lines changed: 56 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ abstract class AbstractMicroservice {
209209

210210
/**
211211
* Add process exit handler
212-
* E.g. for close DB connection and etc.
212+
* E.g. for close DB connection etc.
213213
*/
214214
public onExit(handler: ProcessExitHandler): void {
215215
PROCESS_EXIT_EVENT_TYPES.forEach((eventType) => {
@@ -325,7 +325,7 @@ abstract class AbstractMicroservice {
325325
}
326326

327327
/**
328-
* Get task from queue
328+
* Send response (if exist) and get new task from queue
329329
* @protected
330330
*/
331331
protected async getTask(httpAgent: Agent, response?: MicroserviceResponse): Promise<ITask> {
@@ -344,13 +344,6 @@ abstract class AbstractMicroservice {
344344
});
345345

346346
const task = new MicroserviceRequest(req.data);
347-
const taskSender = task.getParams()?.payload?.sender ?? 'client';
348-
349-
this.logDriver(
350-
() => `--> from ${taskSender}: ${task.toString()}`,
351-
LogType.REQ_INTERNAL,
352-
task.getId(),
353-
);
354347

355348
return { task, req };
356349
} catch (e) {
@@ -368,28 +361,6 @@ abstract class AbstractMicroservice {
368361
}
369362
}
370363

371-
/**
372-
* Send result of processing the task and get new task from queue
373-
* @protected
374-
*/
375-
protected sendResponse(
376-
response: MicroserviceResponse,
377-
httpAgent: Agent,
378-
task: ITask['task'],
379-
): Promise<ITask> {
380-
const taskId = response.getId();
381-
const receiver =
382-
task instanceof MicroserviceRequest ? task.getParams()?.payload?.sender ?? 'queue' : 'queue';
383-
384-
this.logDriver(
385-
() => `<-- to ${receiver}: ${response.toString()}`,
386-
LogType.RES_INTERNAL,
387-
taskId,
388-
);
389-
390-
return this.getTask(httpAgent, response);
391-
}
392-
393364
/**
394365
* Execute request
395366
* @protected
@@ -399,6 +370,14 @@ abstract class AbstractMicroservice {
399370
req: ITask['req'],
400371
): Promise<MicroserviceResponse> {
401372
const response = new MicroserviceResponse({ id: task.getId() });
373+
const taskSender =
374+
task instanceof MicroserviceRequest ? task.getParams()?.payload?.sender : null;
375+
376+
this.logDriver(
377+
() => `--> from ${taskSender || 'client'}: ${task.toString()}`,
378+
LogType.REQ_INTERNAL,
379+
task.getId(),
380+
);
402381

403382
// Response error
404383
if (task instanceof MicroserviceResponse) {
@@ -452,9 +431,51 @@ abstract class AbstractMicroservice {
452431
}
453432
}
454433

434+
this.logDriver(
435+
() => `<-- to ${taskSender || 'queue'}: ${response.toString()}`,
436+
LogType.RES_INTERNAL,
437+
response.getId(),
438+
);
439+
455440
return response;
456441
}
457442

443+
/**
444+
* Execute incoming event
445+
* @protected
446+
*/
447+
protected async executeEvent(data: IEventRequest): Promise<void> {
448+
const sender = data?.payload?.sender ?? 'unknown';
449+
const eventName = data?.payload?.eventName;
450+
451+
this.logDriver(
452+
() => `<-- event ${eventName as string} from ${sender}: ${JSON.stringify(data)}`,
453+
LogType.INFO,
454+
);
455+
456+
if (!eventName) {
457+
return;
458+
}
459+
460+
for (const [eventHandlersName, handlers] of Object.entries(this.eventHandlers)) {
461+
if (
462+
eventHandlersName === eventName ||
463+
eventName.startsWith(eventHandlersName.replace('*', ''))
464+
) {
465+
for (const handler of handlers) {
466+
try {
467+
await handler(data, { app: this, sender });
468+
} catch (e) {
469+
this.logDriver(
470+
() => `event handler error ${eventHandlersName}: ${e.message as string}`,
471+
LogType.INFO,
472+
);
473+
}
474+
}
475+
}
476+
}
477+
}
478+
458479
/**
459480
* Start queue worker
460481
* @protected
@@ -469,7 +490,7 @@ abstract class AbstractMicroservice {
469490
while (true) {
470491
const response = await this.executeRequest(task, req);
471492

472-
({ task, req } = await this.sendResponse(response, httpAgent, task));
493+
({ task, req } = await this.getTask(httpAgent, response));
473494
}
474495
}
475496

@@ -505,28 +526,7 @@ abstract class AbstractMicroservice {
505526
timeout: eventWorkerTimeout,
506527
});
507528

508-
const sender = data?.payload?.sender ?? 'unknown';
509-
const eventName = data?.payload?.eventName;
510-
511-
this.logDriver(
512-
() => `<-- event ${eventName as string} from ${sender}: ${JSON.stringify(data)}`,
513-
LogType.INFO,
514-
);
515-
516-
if (!eventName) {
517-
continue;
518-
}
519-
520-
Object.entries(this.eventHandlers).forEach(([eventHandlersName, handlers]) => {
521-
if (
522-
eventHandlersName === eventName ||
523-
eventName.startsWith(eventHandlersName.replace('*', ''))
524-
) {
525-
handlers.forEach((handler) => {
526-
void handler(data, { app: this, sender });
527-
});
528-
}
529-
});
529+
await this.executeEvent(data);
530530
} catch (e) {
531531
// Could not connect to ijson or channel
532532
if (e.message === 'socket hang up' || e.message.includes('ECONNREFUSED')) {
@@ -546,6 +546,7 @@ abstract class AbstractMicroservice {
546546
public static async eventPublish<TParams>(
547547
eventName: string,
548548
params?: IEventRequest<TParams>,
549+
payload?: Record<string, any>,
549550
): Promise<number | string> {
550551
const ms = this.instance;
551552
const {
@@ -570,6 +571,7 @@ abstract class AbstractMicroservice {
570571
payload: {
571572
sender: name,
572573
eventName,
574+
...payload,
573575
},
574576
},
575577
headers: {

0 commit comments

Comments
 (0)