Skip to content

Commit

Permalink
perf(cu): broadcast to all worker threads when eval stream is closed #…
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Sep 3, 2024
1 parent 2f22753 commit 2a632fa
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 61 deletions.
32 changes: 30 additions & 2 deletions servers/cu/src/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { fileURLToPath } from 'node:url'
import { randomBytes } from 'node:crypto'
import { writeFile, mkdir } from 'node:fs/promises'
import { createReadStream } from 'node:fs'
import { BroadcastChannel } from 'node:worker_threads'

import pMap from 'p-map'
import PQueue from 'p-queue'
Expand Down Expand Up @@ -76,13 +77,18 @@ export const createApis = async (ctx) => {
const DB_URL = `${ctx.DB_URL}.sqlite`
const sqlite = await SqliteClient.createSqliteClient({ url: DB_URL, bootstrap: true })

const BROADCAST = 'workers'
const workerBroadcast = new BroadcastChannel(BROADCAST).unref()
const broadcastCloseStream = (streamId) => workerBroadcast.postMessage({ type: 'close-stream', streamId })

const onCreateWorker = (type) => () => {
const workerId = randomBytes(8).toString('hex')
ctx.logger('Spinning up "%s" pool worker with id "%s"...', type, workerId)

return {
workerThreadOpts: {
workerData: {
BROADCAST,
WASM_MODULE_CACHE_MAX_SIZE: ctx.WASM_MODULE_CACHE_MAX_SIZE,
WASM_INSTANCE_CACHE_MAX_SIZE: ctx.WASM_INSTANCE_CACHE_MAX_SIZE,
WASM_BINARY_FILE_DIRECTORY: ctx.WASM_BINARY_FILE_DIRECTORY,
Expand Down Expand Up @@ -334,7 +340,18 @@ export const createApis = async (ctx) => {
* prep work is deferred until the work queue tasks is executed
*/
.then(prep)
.then(([args, options]) => primaryWorkerPool.exec('evaluate', [args], options))
.then(([args, options]) => {
/**
* TODO: is this the best place for this?
*
* It keeps it abstracted away from business logic,
* and tied to the specific evaluator, so seems kosher,
* but also feels kind of misplaced
*/
if (args.close) return broadcastCloseStream(args.streamId)

return primaryWorkerPool.exec('evaluate', [args], options)
})
),
logger
}),
Expand Down Expand Up @@ -377,7 +394,18 @@ export const createApis = async (ctx) => {
.then(prep)
.then(([args, options]) =>
Promise.resolve()
.then(() => dryRunWorkerPool.exec('evaluate', [args], options))
.then(() => {
/**
* TODO: is this the best place for this?
*
* It keeps it abstracted away from business logic,
* and tied to the specific evaluator, so seems kosher,
* but also feels kind of misplaced
*/
if (args.close) return broadcastCloseStream(args.streamId)

return dryRunWorkerPool.exec('evaluate', [args], options)
})
.catch((err) => {
/**
* Hack to detect when the max queue size is being exceeded and to reject
Expand Down
12 changes: 5 additions & 7 deletions servers/cu/src/domain/lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,11 @@ export function evaluateWith (env) {
* See https://nodejs.org/api/stream.html#streamfinishedstream-options-callback
*/
cleanup()
if (!err) {
/**
* Send a flag to the evaluator that the eval stream is finished.
* This will allow for the WASM instance to be removed from the cache.
*/
ctx.evaluator({ close: true })
}
/**
* Signal the evaluator to close any resources spun up as part
* of handling this eval stream
*/
ctx.evaluator({ close: true })
err ? reject(err) : resolve()
}
)
Expand Down
10 changes: 1 addition & 9 deletions servers/cu/src/effects/worker/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,7 @@ export function evaluateWith ({
*
* Finally, evaluates the message and returns the result of the evaluation.
*/
return ({ streamId, moduleId, wasmModule, moduleOptions, processId, noSave, name, deepHash, cron, ordinate, isAssignment, Memory, message, AoGlobal, close }) => {
/**
* If we receive the close flag, it means we have reached the end of the eval stream.
* We will remove the WASM instance from the cache and skip loading the module.
*/
if (close) {
wasmInstanceCache.delete(streamId)
return Promise.resolve()
}
return ({ streamId, moduleId, wasmModule, moduleOptions, processId, noSave, name, deepHash, cron, ordinate, isAssignment, Memory, message, AoGlobal }) => {
/**
* Dynamically load the module, either from cache,
* or from a file
Expand Down
29 changes: 0 additions & 29 deletions servers/cu/src/effects/worker/evaluate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -428,34 +428,5 @@ describe('evaluate', async () => {
assert.equal(cache.size, 0)
})
})

test('returns nothing, removes from cache if close flag received', async () => {
const wasmInstanceCache = new LRUCache({ max: 1 })
wasmInstanceCache.set('stream-123', 'foo')

const evaluate = evaluateWith({
saveEvaluation: async (evaluation) => evaluation,
wasmInstanceCache,
loadWasmModule: () => assert.fail('should not call if close flag received'),
bootstrapWasmInstance: () => assert.fail('should not call if close flag received'),
ARWEAVE_URL: 'https://foo.bar',
logger
})

const args = {
streamId: 'stream-123',
close: true
}

/**
* Should return undefined if close flag is received
*/
const res = await evaluate(args)
assert.ok(!res)
/**
* Should remove stream-123 from cache, leaving size of 0 remaining.
*/
assert.equal(wasmInstanceCache.size, 0)
})
})
})
11 changes: 10 additions & 1 deletion servers/cu/src/effects/worker/evaluator/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { workerData } from 'node:worker_threads'
import { BroadcastChannel, workerData } from 'node:worker_threads'
import { hostname } from 'node:os'

import { fetch, setGlobalDispatcher, Agent } from 'undici'
Expand All @@ -24,6 +24,15 @@ const apis = await createApis({
logger
})

const broadcast = new BroadcastChannel(workerData.BROADCAST)

broadcast.onmessage = (event) => {
const data = event.data
if (data.type === 'close-stream') return apis.close(data.streamId)

logger.warn('Unrecognized event type "%s". Doing nothing...', data.type)
}

/**
* Expose our worker api
*/
Expand Down
18 changes: 5 additions & 13 deletions servers/cu/src/effects/worker/evaluator/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,18 @@ import { evaluateWith } from '../evaluate.js'

export const createApis = async (ctx) => {
const sqlite = await SqliteClient.createSqliteClient({ url: ctx.DB_URL, bootstrap: false })
const wasmInstanceCache = WasmClient.createWasmInstanceCache({ MAX_SIZE: ctx.WASM_INSTANCE_CACHE_MAX_SIZE })

const close = async (streamId) => wasmInstanceCache.delete(streamId)

const evaluate = evaluateWith({
/**
* TODO: no longer needed since the wasmModule
* is passed in. Eventually remove
*/
loadWasmModule: WasmClient.loadWasmModuleWith({
fetch,
ARWEAVE_URL: ctx.ARWEAVE_URL,
WASM_BINARY_FILE_DIRECTORY: ctx.WASM_BINARY_FILE_DIRECTORY,
logger: ctx.logger,
cache: WasmClient.createWasmModuleCache({ MAX_SIZE: ctx.WASM_MODULE_CACHE_MAX_SIZE })
}),
wasmInstanceCache: WasmClient.createWasmInstanceCache({ MAX_SIZE: ctx.WASM_INSTANCE_CACHE_MAX_SIZE }),
wasmInstanceCache,
addExtension: WasmClient.addExtensionWith({ ARWEAVE_URL: ctx.ARWEAVE_URL }),
bootstrapWasmInstance: WasmClient.bootstrapWasmInstanceWith(),
saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db: sqlite, logger: ctx.logger }),
ARWEAVE_URL: ctx.ARWEAVE_URL,
logger: ctx.logger
})

return { evaluate }
return { evaluate, close }
}

0 comments on commit 2a632fa

Please sign in to comment.