Skip to content

Commit

Permalink
feat: JSPI (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
rentallect authored Oct 23, 2023
1 parent 1f12bd4 commit 2149b5a
Show file tree
Hide file tree
Showing 10 changed files with 1,894 additions and 1,599 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"@rollup/plugin-commonjs": "^22.0.0",
"@rollup/plugin-json": "^4.1.0",
"@rollup/plugin-node-resolve": "^13.3.0",
"@types/node": "^20.6.3",
"chai": "^4.3.6",
"del": "^6.1.1",
"execa": "^6.1.0",
Expand All @@ -53,10 +54,10 @@
"rollup-plugin-esformatter": "^2.0.1",
"rollup-plugin-polyfill-node": "^0.9.0",
"rollup-plugin-terser": "^7.0.2",
"typescript": "^4.6.4"
"typescript": "^5.2.2"
},
"dependencies": {
"@openziti/libcrypto-js": "^0.15.0",
"@openziti/libcrypto-js": "^0.16.1",
"@openziti/ziti-browzer-edge-client": "^0.6.2",
"asn1js": "^2.4.0",
"assert": "^2.0.0",
Expand Down
27 changes: 16 additions & 11 deletions src/channel/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,14 @@ class ZitiChannel {
*/
async hello() {

this._zitiContext.logger.trace('ZitiChannel.hello entered for ch[%o]', this);
this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] entered', this._id);

await this._zws.open();

this._zitiContext.logger.trace('ZitiChannel.hello _zws.open completed for ch[%o]', this);
this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] _zws.open completed', this._id);

if (this.isHelloCompleted) {
this._zitiContext.logger.trace('Hello handshake was previously completed');
this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] Hello handshake was previously completed', this._id);
return new Promise( async (resolve) => {
resolve( {channel: this, data: null});
});
Expand All @@ -248,17 +248,17 @@ class ZitiChannel {

await this._tlsConn.create();

this._zitiContext.logger.debug('initiating TLS handshake');
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] initiating TLS handshake', this._id);

await this._tlsConn.handshake();

await this.awaitTLSHandshakeComplete();

this._zitiContext.logger.debug('TLS handshake complete');
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] TLS handshake complete', this._id);

}

this._zitiContext.logger.debug('initiating message: ZitiEdgeProtocol.content_type.HelloType: ', ZitiEdgeProtocol.header_type.StringType);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] initiating message: ZitiEdgeProtocol.content_type.HelloType: ', this._id, ZitiEdgeProtocol.header_type.StringType);
let uuid = uuidv4();

let headers = [
Expand All @@ -285,7 +285,7 @@ class ZitiChannel {
this._helloCompletedTimestamp = Date.now();
this._helloCompleted = true;
this.state = (ZitiEdgeProtocol.conn_state.Connected);
this._zitiContext.logger.debug('ch[%d] Hello handshake to Edge Router [%s] completed at timestamp[%o]', this._id, this._edgeRouterHost, this._helloCompletedTimestamp);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] Hello handshake to Edge Router [%s] completed at timestamp[%o]', this._id, this._edgeRouterHost, this._helloCompletedTimestamp);

return new Promise( async (resolve) => {
resolve( {channel: this, data: null});
Expand Down Expand Up @@ -406,13 +406,13 @@ class ZitiChannel {

self._zitiContext.logger.debug('about to send Close to Edge Router [%s] for conn[%d]', conn.channel.edgeRouterHost, conn.id);

let msg = await self.sendMessage( ZitiEdgeProtocol.content_type.StateClosed, headers, self._network_session_token, {
self.sendMessageNoWait( ZitiEdgeProtocol.content_type.StateClosed, headers, self._network_session_token, {
conn: conn,
sequence: sequence,
}
);

self._zitiContext.logger.debug('close() completed with response[%o]', msg);
// self._zitiContext.logger.debug('close() completed with response[%o]', msg);

conn.state = (ZitiEdgeProtocol.conn_state.Closed);

Expand Down Expand Up @@ -698,7 +698,8 @@ class ZitiChannel {
// contentType,
// (body ? body.toString() : 'n/a'));

this._zitiContext.logger.debug("send (no wait) -> conn[%o] seq[%o] contentType[%o] bodyLen[%o] ",
this._zitiContext.logger.debug("send (no wait) -> ch[%o] conn[%o] seq[%o] contentType[%o] bodyLen[%o] ",
this._id,
(options.conn ? options.conn.id : 'n/a'),
messageId, contentType,
(body ? body.length : 'n/a')
Expand Down Expand Up @@ -1100,7 +1101,11 @@ class ZitiChannel {
throwIf(isUndefined(connId), formatMessage('Cannot find ConnId header', { } ) );
conn = this._connections._getConnection(connId);
if (!zeroByteData) {
throwIf(isUndefined(conn), formatMessage('Conn not found. Seeking connId { actual }', { actual: connId}) );
if (isUndefined(conn)) {
this._zitiContext.logger.warn("contentType [%d] received for unknown conn.id[%d]", contentType, connId);
release();
return;
}
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/channel/wasm-tls-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import {Mutex, withTimeout, Semaphore} from 'async-mutex';

this._uuid = uuidv4();

this._zitiContext.logger.trace('ZitiWASMTLSConnection.ctor: %o, _ws: %o', this._uuid, this._ws);
this._zitiContext.logger.trace('ZitiWASMTLSConnection.ctor: %s', this._uuid);

/**
* This stream is where we'll put any data arriving from an ER
Expand Down Expand Up @@ -137,7 +137,8 @@ import {Mutex, withTimeout, Semaphore} from 'async-mutex';
// Tie the WASM-based SSL object back to this ZitiWASMTLSConnection so that later when
// the low-level WASM code does fd-level i/o, our WASM-JS will intercept it, and
// interface with this connection, so we can route traffic over the WebSocket to the ER
this._zitiContext.ssl_set_fd( this._wasmInstance, this._SSL, this._ch.id );
this.setWASMFD(this._zitiContext.addWASMFD(this));
this._zitiContext.ssl_set_fd( this._wasmInstance, this._SSL, this.getWASMFD() );

}

Expand Down Expand Up @@ -198,7 +199,7 @@ import {Mutex, withTimeout, Semaphore} from 'async-mutex';
// this._connected_cb = this.handshake_cb;

this._zitiContext.logger.trace('ZitiWASMTLSConnection.handshake(): fd[%d] calling ssl_do_handshake()', this.wasmFD );
let result = this._zitiContext.ssl_do_handshake( this._wasmInstance, this._SSL );
let result = await this._zitiContext.ssl_do_handshake( this._wasmInstance, this._SSL );
this._zitiContext.logger.trace('ZitiWASMTLSConnection.handshake(): fd[%d] back from ssl_do_handshake() for %o: result=%d (now awaiting cb)', this.wasmFD, this._id, result );
}

Expand Down Expand Up @@ -272,7 +273,7 @@ import {Mutex, withTimeout, Semaphore} from 'async-mutex';
// Make sure WASM knows where to callback when decrypted data is ready
// this._read_cb = this.read_cb;

let decryptedData = this._zitiContext.tls_read(this._wasmInstance, this._SSL); // TLS-decrypt some data from the queue (bring back from WASM memory into JS memory)
let decryptedData = await this._zitiContext.tls_read(this._wasmInstance, this._SSL); // TLS-decrypt some data from the queue (bring back from WASM memory into JS memory)

this._zitiContext.logger.trace('ZitiWASMTLSConnection.process[%d]: clear data from the ER is ready <--- len[%d]', this.wasmFD, decryptedData.byteLength);
this._datacb(this._ch, decryptedData.buffer); // propagate clear data to the waiting Promise
Expand Down Expand Up @@ -330,7 +331,7 @@ import {Mutex, withTimeout, Semaphore} from 'async-mutex';
* @param {*} wireData (already TLS-encrypted)
*/
fd_write(wireData) {
// this._zitiContext.logger.trace('ZitiWASMTLSConnection.fd_write[%o] encrypted data is being sent to the ER ---> [%o]', this._uuid, wireData);
this._zitiContext.logger.trace('ZitiWASMTLSConnection.fd_write[%o] encrypted data is being sent to the ER ---> [%o]', this._uuid, wireData);
this._ws.send(wireData);
}

Expand Down
5 changes: 5 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ const ZITI_CONSTANTS =
'ZITI_EVENT_IDP_AUTH_HEALTH': 'idpAuthHealthEvent',
'ZITI_EVENT_CHANNEL_CONNECT_FAIL': 'channelConnectFailEvent',

/**
* Name of event indicating encrypted data for a nestedTLS connection has arrived and needs decryption
*/
'ZITI_EVENT_XGRESS_RX_NESTED_TLS': 'xgressEventNestedTLS',

};


Expand Down
114 changes: 58 additions & 56 deletions src/context/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ class ZitiContext extends EventEmitter {
this._channels = new Map();
this._channelsById = new Map();
this._wasmFDsById = new Map();
this._lockByFD = new Map();

/**
* We start the channel id's at 10 so that they will be well above any 'fd'
Expand Down Expand Up @@ -643,11 +642,11 @@ class ZitiContext extends EventEmitter {
/**
*
*/
ssl_do_handshake(wasmInstance, ssl) {
async ssl_do_handshake(wasmInstance, ssl) {

this.logger.trace('ZitiContext.ssl_do_handshake() entered');

let result = this._libCrypto.ssl_do_handshake(wasmInstance, ssl);
let result = await this._libCrypto.ssl_do_handshake(wasmInstance, ssl);

this.logger.trace('ZitiContext.ssl_do_handshake() exiting, result=', result);

Expand Down Expand Up @@ -683,7 +682,7 @@ class ZitiContext extends EventEmitter {
*/
ssl_set_fd(wasmInstance, ssl, fd) {

this.logger.trace('ZitiContext.ssl_set_fd() entered');
this.logger.trace('ZitiContext.ssl_set_fd() entered SSL[%o] fd[%d]', ssl, fd);

let result = this._libCrypto.ssl_set_fd(wasmInstance, ssl, fd);

Expand Down Expand Up @@ -751,21 +750,23 @@ class ZitiContext extends EventEmitter {

this.logger.trace('ZitiContext.tls_enqueue(%d) [%o] entered', wasmFD, arrayBuffer);

// let lock = this._lockByFD.get( wasmFD );
// if (isUndefined(lock)) {
// lock = withTimeout(new Mutex(), 30 * 1000, new Error('timeout on _lockByFD'));
// this._lockByFD.set(wasmFD, lock);
// }
this._libCrypto.tls_enqueue(wasmInstance, wasmFD, arrayBuffer);

}

// this.logger.trace('ZitiContext.tls_enqueue() attempting to acquire _lockByFD [%o]', wasmFD);
// const release = await lock.acquire();
// this.logger.trace('ZitiContext.tls_enqueue() acquired _lockByFD [%o]', wasmFD);
/**
*
* @param {*} wasmFD // id of socket
*/
peekTLSData(wasmInstance, wasmFD) {

this._libCrypto.tls_enqueue(wasmInstance, wasmFD, arrayBuffer);
this.logger.trace('ZitiContext.peekTLSData(%d) entered', wasmFD);

let item = this._libCrypto.peekTLSData(wasmInstance, wasmFD);

// this.logger.trace('ZitiContext.tls_enqueue() releasing _lockByFD [%o]', wasmFD);
// release();
this.logger.trace('ZitiContext.peekTLSData(%d) returning', item);

return item;
}

/**
Expand All @@ -775,21 +776,8 @@ class ZitiContext extends EventEmitter {

this.logger.trace('ZitiContext.tls_write() entered, ssl, wireData: ', ssl, wireData);

// let lock = this._lockByFD.get( ssl );
// if (isUndefined(lock)) {
// lock = withTimeout(new Mutex(), 30 * 1000, new Error('timeout on _lockByFD'));
// this._lockByFD.set(ssl, lock);
// }

// this.logger.trace('ZitiContext.tls_write() attempting to acquire _lockByFD [%o]', ssl);
// const release = await lock.acquire();
// this.logger.trace('ZitiContext.tls_write() acquired _lockByFD [%o]', ssl);

let result = this._libCrypto.tls_write(wasmInstance, ssl, wireData);

// this.logger.trace('ZitiContext.tls_write() releasing _lockByFD [%o]', ssl);
// release();

this.logger.trace('ZitiContext.tls_write() exiting with: ', result);

return result;
Expand All @@ -798,25 +786,12 @@ class ZitiContext extends EventEmitter {
/**
*
*/
tls_read(wasmInstance, ssl) {
async tls_read(wasmInstance, ssl) {

this.logger.trace('ZitiContext.tls_read(%d) entered', ssl);

// let lock = this._lockByFD.get( ssl );
// if (isUndefined(lock)) {
// lock = withTimeout(new Mutex(), 30 * 1000, new Error('timeout on _lockByFD'));
// this._lockByFD.set(ssl, lock);
// }

// this.logger.trace('ZitiContext.tls_read() attempting to acquire _lockByFD [%o]', ssl);
// const release = await lock.acquire();
// this.logger.trace('ZitiContext.tls_read() acquired _lockByFD [%o]', ssl);

let result = this._libCrypto.tls_read(wasmInstance, ssl);
let result = await this._libCrypto.tls_read(wasmInstance, ssl);

// this.logger.trace('ZitiContext.tls_read() releasing _lockByFD [%o]', ssl);
// release();

return result;
}

Expand Down Expand Up @@ -1198,7 +1173,7 @@ class ZitiContext extends EventEmitter {
throw new Error('response contains no data');
}

this.logger.info('Controller Version acquired: [%o]', this._controllerVersion);
this.logger.info('Controller Version acquired: ', this._controllerVersion.version);

return this._controllerVersion;
}
Expand Down Expand Up @@ -1459,7 +1434,7 @@ class ZitiContext extends EventEmitter {
data: data
});

this.logger.trace('newConnection: conn[%d] data[%o]', conn.id, conn.data);
this.logger.trace('newConnection: conn[%d]', conn.id);

return conn;
};
Expand Down Expand Up @@ -1569,22 +1544,40 @@ class ZitiContext extends EventEmitter {

this.logger.trace('getChannelByEdgeRouter key[%s]', key);

let ch = this._channels.get( key );
let channelsArray = this._channels.get( key );
if (isUndefined(channelsArray)) {
channelsArray = new Array();
this._channels.set(key, channelsArray);
}

// Select a Channel that is currently NOT in use (has no active Connections on it)
let freeChannel;
find(channelsArray, function(ch) {
let activeConnectionCount = ch._connections._items.size;
if (isEqual( activeConnectionCount, 0 )) {
freeChannel = ch;
return true;
}
});


this.logger.trace('getChannelByEdgeRouter ch[%o]', ch);
// let ch = this._channels.get( key );
let ch = freeChannel;

if (!isUndefined(ch)) {

this.logger.debug('ch[%d] state[%d] found for edgeRouter[%s]', ch.id, ch.state, edgeRouter.hostname);
this.logger.trace('ch[%d] state[%d] found for edgeRouter[%s]', ch.id, ch.state, edgeRouter.hostname);

await this.awaitChannelConnectComplete(ch);

this.logger.debug('ch[%d] state[%d] for edgeRouter[%s] is connect-complete', ch.id, ch.state, edgeRouter.hostname);
this.logger.trace('ch[%d] state[%d] for edgeRouter[%s] is connect-complete', ch.id, ch.state, edgeRouter.hostname);

if (!isEqual( ch.state, ZitiEdgeProtocol.conn_state.Connected )) {
this.logger.error('should not be here: ch[%d] has state[%d]', ch.id, ch.state);
}

this.logger.trace('getChannelByEdgeRouter returning existing ch[%d]', ch.id);

return (ch);
}

Expand All @@ -1598,11 +1591,14 @@ class ZitiContext extends EventEmitter {

ch.state = ZitiEdgeProtocol.conn_state.Connecting;

this.logger.debug('Created ch[%o] ', ch);
this._channels.set(key, ch);
this._channelsById.set(ch.id, ch);
this.logger.trace('Created ch[%d] ', ch.id);
// this._channels.set(key, ch);
// this._channelsById.set(ch.id, ch);
channelsArray.push(ch);

this.logger.trace(`getChannelByEdgeRouter channelsArray length [${channelsArray.length}] items`);

this.logger.trace('getChannelByEdgeRouter returning ch[%o]', ch);
this.logger.trace('getChannelByEdgeRouter returning new ch[%d]', ch.id);

return ch;
}
Expand Down Expand Up @@ -2028,8 +2024,14 @@ class ZitiContext extends EventEmitter {
*/
async close(conn) {
let ch = conn.channel;
await ch.close(conn);
ch._connections._deleteConnection(conn);

setTimeout(async (self, ch, conn) => {
await ch.close(conn);
self.logger.trace('ZitiConnection.close: conn.id[%d]', conn.id);
ch._connections._deleteConnection(conn);
self.logger.trace('ZitiConnection.close: ch._connections.length is now [%d]', ch._connections._items.size);
}, 500, this, ch, conn);

}


Expand Down Expand Up @@ -2126,7 +2128,7 @@ class ZitiContext extends EventEmitter {

req.on('error', err => {
self.logger.error('conn[%o] error EVENT: err: %o', req.socket.zitiConnection.id, err);
reject(new Error(`conn[${req.socket.zitiConnection.id}] request to ${request.url} failed, reason: ${err.message}`));
reject(new Error(`conn[${req.socket.zitiConnection.id}] request to ${req.url} failed, reason: ${err.message}`));
});

req.on('response', async res => {
Expand Down
1 change: 1 addition & 0 deletions src/http/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ function statusIsInformational(status) {
function parserOnIncomingClient(res, shouldKeepAlive) {
const socket = this.socket;
const req = socket._httpMessage;
// console.log(`parserOnIncomingClient() entered req.path[${req.path}] res: `, res);

// debug('AGENT incoming response!');

Expand Down
Loading

0 comments on commit 2149b5a

Please sign in to comment.