Skip to content

Commit 1895e1a

Browse files
committed
Fix input & not-serialized output on reconnect
1 parent d840bcf commit 1895e1a

File tree

10 files changed

+130
-56
lines changed

10 files changed

+130
-56
lines changed

packages/adapters/src/process-instance-adapter.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,9 @@ class ProcessInstanceAdapter implements
254254
process.kill(this.processPID, 0);
255255
} catch (e) {
256256
this.logger.error("Runner process not exists", e);
257-
/** process not exists */
257+
258+
clearInterval(interval);
259+
258260
reject("pid not exists");
259261
}
260262
}

packages/host/src/lib/csi-controller.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ export class CSIController extends TypedEmitter<Events> {
116116
apiInputEnabled = true;
117117

118118
executionTime: number = -1;
119+
inputHeadersSent = false;
119120

120121
/**
121122
* Topic to which the output stream should be routed
@@ -222,7 +223,8 @@ export class CSIController extends TypedEmitter<Events> {
222223
this.logger.info("Instance status: errored", e);
223224

224225
this.status ||= InstanceStatus.ERRORED;
225-
this.executionTime = (Date.now() - this.info.created!.getTime()) / 1000;
226+
227+
this.executionTime = this.info.created ? (Date.now() - this.info.created!.getTime()) / 1000 : -1;
226228

227229
this.setExitInfo(e.exitcode, e.message);
228230

@@ -403,9 +405,10 @@ export class CSIController extends TypedEmitter<Events> {
403405
.pipe(this.upStreams[CC.CONTROL]);
404406

405407
this.communicationHandler.addMonitoringHandler(RunnerMessageCode.PING, async (message) => {
406-
const { status, payload } = message[1];
408+
const { status, payload, inputHeadersSent } = message[1];
407409

408410
this.status = status || InstanceStatus.RUNNING;
411+
this.inputHeadersSent = inputHeadersSent;
409412

410413
if (!payload) {
411414
this.emit("error", "No payload in ping!");
@@ -485,6 +488,10 @@ export class CSIController extends TypedEmitter<Events> {
485488
this.logger.trace("Received a PING message with ports config");
486489
}
487490

491+
this.inputHeadersSent = !!message[1].inputHeadersSent;
492+
493+
this.logger.info("Headers already sent for input?", this.inputHeadersSent);
494+
488495
if (this.instanceAdapter.setRunner) {
489496
await this.instanceAdapter.setRunner({
490497
...message[1].payload.system,
@@ -537,8 +544,6 @@ export class CSIController extends TypedEmitter<Events> {
537544
}
538545

539546
createInstanceAPIRouter() {
540-
let inputHeadersSent = false;
541-
542547
if (!this.upStreams) {
543548
throw new AppError("UNATTACHED_STREAMS");
544549
}
@@ -551,11 +556,11 @@ export class CSIController extends TypedEmitter<Events> {
551556
* @experimental
552557
*/
553558
this.router.duplex("/inout", (duplex, _headers) => {
554-
if (!inputHeadersSent) {
559+
if (!this.inputHeadersSent) {
555560
this.downStreams![CC.IN].write(`Content-Type: ${_headers["content-type"]}\r\n`);
556561
this.downStreams![CC.IN].write("\r\n");
557562

558-
inputHeadersSent = true;
563+
this.inputHeadersSent = true;
559564
}
560565

561566
(duplex as unknown as DuplexStream).input.pipe(this.downStreams![CC.IN], { end: false });
@@ -597,15 +602,15 @@ export class CSIController extends TypedEmitter<Events> {
597602
const contentType = req.headers["content-type"];
598603

599604
// @TODO: Check if subsequent requests have the same content-type.
600-
if (!inputHeadersSent) {
605+
if (!this.inputHeadersSent) {
601606
if (contentType === undefined) {
602607
return { opStatus: ReasonPhrases.NOT_ACCEPTABLE, error: "Content-Type must be defined" };
603608
}
604609

605610
stream.write(`Content-Type: ${contentType}\r\n`);
606611
stream.write("\r\n");
607612

608-
inputHeadersSent = true;
613+
this.inputHeadersSent = true;
609614
}
610615

611616
return stream;

packages/host/src/lib/csi-dispatcher.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,14 @@ export class CSIDispatcher extends TypedEmitter<Events> {
6666
id,
6767
sequenceInfo,
6868
payload,
69-
status: InstanceStatus.INITIALIZING
69+
status: InstanceStatus.INITIALIZING,
70+
inputHeadersSent: false
7071
}, communicationHandler, config, instanceProxy, this.STHConfig.runtimeAdapter);
7172

7273
this.logger.trace("CSIController created", id, sequenceInfo);
7374

7475
csiController.logger.pipe(this.logger, { end: false });
76+
7577
communicationHandler.logger.pipe(this.logger, { end: false });
7678

7779
csiController
@@ -99,15 +101,15 @@ export class CSIDispatcher extends TypedEmitter<Events> {
99101
this.logger.warn("Missing topic content-type");
100102
}
101103

102-
if (data.requires && !csiController.inputRouted && data.contentType) {
103-
this.logger.trace("Routing topic to Sequence input", data.requires);
104+
if (data.requires && data.contentType) {
105+
this.logger.trace("Routing topic to Instance input", data.requires);
104106

105107
await this.serviceDiscovery.routeTopicToStream(
106108
{ topic: new TopicId(data.requires), contentType: data.contentType as ContentType },
107109
csiController.getInputStream()
108110
);
109111

110-
csiController.inputRouted = true;
112+
csiController.inputHeadersSent = true;
111113

112114
await this.serviceDiscovery.update({
113115
requires: data.requires, contentType: data.contentType, topicName: data.requires, status: "add"

packages/host/src/lib/host.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ export class Host implements IComponent {
340340

341341
const seq = this.sequenceStore.getById(instance.sequence.id);
342342

343-
if (!seq) {
343+
if (!seq && this.cpmConnector?.connected) {
344344
this.logger.info("Sequence not found. Checking Store...");
345345

346346
try {

packages/host/src/lib/serviceDiscovery/sd-adapter.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,8 @@ export class ServiceDiscovery {
179179
}
180180

181181
async update(data: STHTopicEventData) {
182-
this.logger.trace("Topic update. Send topic info to CPM", data);
183-
184182
if (this.cpmConnector?.connected) {
183+
this.logger.trace("Topic update. Send topic info to CPM", data);
185184
await this.cpmConnector?.sendTopicInfo(data);
186185
}
187186
}

packages/host/src/lib/socket-server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ export class SocketServer extends TypedEmitter<Events> implements IComponent {
7777
this.server!
7878
.listen(this.port, this.hostname, () => {
7979
this.logger.info("SocketServer on", this.server?.address());
80+
8081
res();
8182
})
8283
.on("error", rej);

packages/runner/src/host-client.ts

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
import { ObjLogger } from "@scramjet/obj-logger";
33
import { CommunicationChannel as CC } from "@scramjet/symbols";
44
import { IHostClient, IObjectLogger, UpstreamStreamsConfig, } from "@scramjet/types";
5+
import { defer } from "@scramjet/utility";
56
import { Agent } from "http";
67
import net, { Socket, createConnection } from "net";
8+
import { PassThrough } from "stream";
79

810
type HostOpenConnections = [
911
net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket
@@ -42,20 +44,24 @@ class HostClient implements IHostClient {
4244
async init(id: string): Promise<void> {
4345
const openConnections = await Promise.all(
4446
Array.from(Array(9))
45-
.map(() => {
47+
.map((_e: any, i: number) => {
4648
// Error handling for each connection is process crash for now
4749
let connection: Socket;
4850

4951
try {
5052
connection = net.createConnection(this.instancesServerPort, this.instancesServerHost);
51-
connection.on("error", () => {});
53+
connection.on("error", () => {
54+
this.logger.warn(`${i} Stream error`);
55+
});
5256
connection.setNoDelay(true);
5357
} catch (e) {
5458
return Promise.reject(e);
5559
}
5660

5761
return new Promise<net.Socket>(res => {
58-
connection.on("connect", () => res(connection));
62+
connection.on("connect", () => {
63+
res(connection);
64+
});
5965
});
6066
})
6167
.map((connPromised, index) => {
@@ -74,6 +80,26 @@ class HostClient implements IHostClient {
7480

7581
this._streams = openConnections as HostOpenConnections;
7682

83+
const input = this._streams[CC.IN];
84+
85+
const inputTarget = new PassThrough({ emitClose: false });
86+
87+
input.on("end", async () => {
88+
await defer(500);
89+
90+
if ((this._streams![CC.CONTROL] as net.Socket).readableEnded) {
91+
this.logger.info("Input end. Control is also ended... We are disconnected.");
92+
} else {
93+
this.logger.info("Input end. Control not ended. We are online. Desired input end.");
94+
inputTarget.end();
95+
}
96+
});
97+
98+
input.pipe(inputTarget, { end: false });
99+
100+
this._streams[CC.IN] = inputTarget;
101+
//this._streams[CC.STDIN] = this._streams[CC.STDIN].pipe(new PassThrough({ emitClose: false }), { end: false });
102+
77103
try {
78104
this.bpmux = new BPMux(this._streams[CC.PACKAGE]);
79105
} catch (e) {
@@ -118,6 +144,11 @@ class HostClient implements IHostClient {
118144
const streamsExitedPromised: Promise<void>[] = this.streams.map((stream, i) =>
119145
new Promise(
120146
(res) => {
147+
if ([CC.IN, CC.STDIN, CC.CONTROL].includes(i)) {
148+
res();
149+
return;
150+
}
151+
121152
if (!hard && "writable" in stream!) {
122153
stream
123154
.on("error", (e) => {

0 commit comments

Comments
 (0)