diff --git a/src/context/context.js b/src/context/context.js index fd6618b..8061c10 100644 --- a/src/context/context.js +++ b/src/context/context.js @@ -1641,6 +1641,35 @@ class ZitiContext extends EventEmitter { } + async shouldRouteOverZitiSync(url) { + + let parsedURL = new URL(url); + let hostname = parsedURL.hostname; + let port = parseInt(parsedURL.port, 10); + if ((port === '') || (parsedURL.port === '')) { + if ((parsedURL.protocol === 'https:') || (parsedURL.protocol === 'wss:')) { + port = 443; + } else { + port = 80; + } + } + + let self = this; + + let serviceName = result(find(this._services, function(obj) { + + if (self._getMatchConfigTunnelerClientV1( obj.config['ziti-tunneler-client.v1'], hostname, port )) { + return true; + } + + return self._getMatchConfigInterceptV1( obj.config['intercept.v1'], hostname, port ); + + }), 'name'); + + return serviceName; + + } + /** * Determine if the given URL should be handled via CORS Proxy. * @@ -2007,12 +2036,12 @@ class ZitiContext extends EventEmitter { } else { buffer = options.body; } - req.write( buffer ); + req.end( buffer ); } + } else { + req.end(); } - req.end(); - } req.on('error', err => { diff --git a/src/http/_http_outgoing.js b/src/http/_http_outgoing.js index 68f280f..473bfd5 100644 --- a/src/http/_http_outgoing.js +++ b/src/http/_http_outgoing.js @@ -810,12 +810,40 @@ OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) { const outputData = this.outputData; socket.cork(); let ret; - // Retain for(;;) loop for performance reasons - // Refs: https://github.com/nodejs/node/pull/30958 + + var mergedArrayLength = 0; + var mergedArrayCursor = 0; + const utf8EncodeText = new TextEncoder(); + + // Get combined length of all items in output array for (let i = 0; i < outputLength; i++) { - const { data, encoding, callback } = outputData[i]; - ret = socket.write(data, encoding, callback); + const { data } = outputData[i]; + mergedArrayLength += data.length; } + var mergedArray = new Uint8Array(mergedArrayLength); + + // Create the merged Array + for (let i = 0; i < outputLength; i++) { + const { data } = outputData[i]; + var byteArray; + if (typeof data === 'string') { + byteArray = utf8EncodeText.encode(data); + } else { + byteArray = data; + } + if (i==0) { + mergedArray.set(byteArray); + mergedArrayCursor = byteArray.length; + } else { + mergedArray.set(byteArray, mergedArrayCursor); + mergedArrayCursor += byteArray.length; + } + } + + // Write the merged array to the ER + const { data, encoding, callback } = outputData[0]; + ret = socket.write(mergedArray, encoding, callback); + socket.uncork(); this.outputData = []; diff --git a/src/http/ziti-inner-tls-socket.js b/src/http/ziti-inner-tls-socket.js index be0a476..6c02388 100644 --- a/src/http/ziti-inner-tls-socket.js +++ b/src/http/ziti-inner-tls-socket.js @@ -155,6 +155,8 @@ class ZitiInnerTLSSocket extends EventEmitter { this._tlsReadActive = false; + this.pendingWriteArray = new Uint8Array(0) + } getWASMFD() { @@ -348,12 +350,21 @@ class ZitiInnerTLSSocket extends EventEmitter { } } - + _appendBuffer(buffer1, buffer2) { + var tmp = new Uint8Array(buffer1.byteLength + buffer2.byteLength); + tmp.set(new Uint8Array(buffer1), 0); + tmp.set(new Uint8Array(buffer2), buffer1.byteLength); + return tmp; + }; + /** * */ async write(conn, buffer) { + let MAX_IMMEDIATE_WRITE_LENGTH = 10; // data shorter than this goes out immediately + let MAX_DELAY_WRITE_TIME = 50; // number of ms to wait for more data before writing + // Complete the TLS handshake if necessary let isConnected = await this.isConnected(); if (!isConnected) { @@ -362,7 +373,32 @@ class ZitiInnerTLSSocket extends EventEmitter { } if (buffer.length > 0) { - conn.channel.write(conn, buffer); + + if (buffer.length < MAX_IMMEDIATE_WRITE_LENGTH) { + + conn.channel.write(conn, buffer); + + } + else { + + this.pendingWriteArray = this._appendBuffer(this.pendingWriteArray, buffer); + + this._zitiContext.logger.trace(`ZitiInnerTLSSocket.write() buffer.length[${buffer.length}] pendingWriteArray[${this.pendingWriteArray.length}]`); + + if (this.pendingWriteArray.length == buffer.length) { + + setTimeout((self, conn) => { + + this._zitiContext.logger.trace(`ZitiInnerTLSSocket.write() AFTER TIMEOUT, now writing pendingWriteArray[${self.pendingWriteArray.length}]`); + + conn.channel.write(conn, self.pendingWriteArray); + + self.pendingWriteArray = new Uint8Array(0); + + }, MAX_DELAY_WRITE_TIME, this, conn) + + } + } } } @@ -581,9 +617,7 @@ class ZitiInnerTLSSocket extends EventEmitter { let { self } = args; - let isConnected = await this.isConnected(); - - self._zitiContext.logger.trace('ZitiInnerTLSSocket.processDataDecryption() fd[%d] isConnected[%o] starting to decrypt enqueued data, calling tls_read', self.wasmFD, isConnected); + self._zitiContext.logger.trace('ZitiInnerTLSSocket.processDataDecryption() fd[%d] starting to decrypt enqueued data, calling tls_read', self.wasmFD ); let decryptedData = await self._zitiContext.tls_read(self._wasmInstance, self._SSL); // TLS-decrypt some data from the queue diff --git a/src/http/ziti-websocket-wrapper-ctor.js b/src/http/ziti-websocket-wrapper-ctor.js index 1be2bd3..9d36c31 100644 --- a/src/http/ziti-websocket-wrapper-ctor.js +++ b/src/http/ziti-websocket-wrapper-ctor.js @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ +import { isNull } from 'lodash-es'; import { ZitiWebSocketWrapper } from './ziti-websocket-wrapper'; /** @@ -33,18 +34,29 @@ class ZitiWebSocketWrapperCtor { // We only want to intercept WebSockets that target the Ziti BrowZer Bootstrapper var regex = new RegExp( zitiBrowzerRuntime.zitiConfig.browzer.bootstrapper.self.host, 'g' ); - if (address.match( regex )) { // the request is targeting the Ziti BrowZer Bootstrapper + let ws; - let ws = new ZitiWebSocketWrapper(address, protocols, options, zitiBrowzerRuntime.zitiContext, zitiBrowzerRuntime.zitiConfig); + if (address.match( regex )) { // the request is targeting the Ziti BrowZer Bootstrapper - return ws; + ws = new ZitiWebSocketWrapper(address, protocols, options, zitiBrowzerRuntime.zitiContext, zitiBrowzerRuntime.zitiConfig); } else { - return new window._ziti_realWebSocket(address, protocols, options); + let service = zitiBrowzerRuntime.zitiContext.shouldRouteOverZitiSync(address); + + if (!isNull(service)) { + + ws = new ZitiWebSocketWrapper(address, protocols, options, zitiBrowzerRuntime.zitiContext, zitiBrowzerRuntime.zitiConfig); + } else { + + ws = new window._ziti_realWebSocket(address, protocols, options); + + } } + return ws; + } } diff --git a/src/http/ziti-websocket-wrapper.js b/src/http/ziti-websocket-wrapper.js index cb16d08..270792a 100644 --- a/src/http/ziti-websocket-wrapper.js +++ b/src/http/ziti-websocket-wrapper.js @@ -568,13 +568,31 @@ async function initAsClient(websocket, address, protocols, options) { opts.host = serviceName; } - } else { // the request is targeting the raw internet + } else { + serviceName = await websocket._zitiContext.shouldRouteOverZiti( address ); + + if (!isNull(serviceName)) { + + let newUrl = new URL( address ); + + opts.createConnection = zitiConnect; // We're going over Ziti + + let configHostAndPort = await websocket._zitiContext.getConfigHostAndPortByServiceName (serviceName); + + newUrl.protocol = websocket._zitiConfig.browzer.bootstrapper.target.scheme + ":"; + opts.href = newUrl.protocol + '//' + configHostAndPort.host.toLowerCase() + newUrl.pathname + newUrl.search; + opts.origin = websocket._zitiConfig.browzer.bootstrapper.target.scheme + "://" + configHostAndPort.host.toLowerCase() + ":" + configHostAndPort.port; + opts.host = serviceName; - websocket._zitiContext.logger.warn('ZitiWebSocketWrapper(): no associated serviceConfig, bypassing intercept of [%s]', address); - opts.createConnection = isSecure ? tlsConnect : netConnect; - opts.host = parsedUrl.hostname.startsWith('[') - ? parsedUrl.hostname.slice(1, -1) - : parsedUrl.hostname; + } + else { // the request is targeting the raw internet + + websocket._zitiContext.logger.warn('ZitiWebSocketWrapper(): no associated serviceConfig, bypassing intercept of [%s]', address); + opts.createConnection = isSecure ? tlsConnect : netConnect; + opts.host = parsedUrl.hostname.startsWith('[') + ? parsedUrl.hostname.slice(1, -1) + : parsedUrl.hostname; + } } @@ -947,6 +965,10 @@ function sendAfterClose(websocket, data, cb) { if (typeof data === 'string') { this[CONSTANTS.kWebSocket]._zitiContext.logger.info('ZitiWebSocketWrapper.receiverOnMessage() entered, emitting STRING: %o', data); this[CONSTANTS.kWebSocket].emit('message', data); + } + else if (typeof data === 'object') { + this[CONSTANTS.kWebSocket]._zitiContext.logger.info('ZitiWebSocketWrapper.receiverOnMessage() entered, emitting object: %o', data); + this[CONSTANTS.kWebSocket].emit('message', data); } else { let blob = new Blob([new Uint8Array(data, 0, data.byteLength)]); this[CONSTANTS.kWebSocket]._zitiContext.logger.info('ZitiWebSocketWrapper.receiverOnMessage() entered, emitting BLOB: %o', blob);