Skip to content

Commit 44ceaff

Browse files
committed
optimizing finetuning downloads
Signed-off-by: wwanarif <[email protected]>
1 parent ddc903d commit 44ceaff

File tree

4 files changed

+173
-107
lines changed

4 files changed

+173
-107
lines changed

setup-scripts/setup-genai-studio/manifests/studio-manifest.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ spec:
426426
storageClassName: local-path
427427
resources:
428428
requests:
429-
storage: 1Gi
429+
storage: 30Gi
430430
---
431431
apiVersion: v1
432432
kind: Service

studio-frontend/packages/server/src/controllers/finetuning/index.ts

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ import { StatusCodes } from 'http-status-codes'
33
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
44
import finetuningService from '../../services/finetuning'
55

6+
// Declare timer globals for Node.js
7+
declare function setTimeout(cb: (...args: any[]) => void, ms?: number): any
8+
69
/**
710
* Upload a training file
811
* POST /api/v1/finetuning/files
@@ -154,6 +157,7 @@ const getFineTuningJobLogs = async (req: Request, res: Response, next: NextFunct
154157
/**
155158
* Download fine-tuning job output as a zip file
156159
* GET /api/v1/finetuning/download-ft/:jobId
160+
* Creates zip, streams it to client, then deletes the zip file after download completes
157161
*/
158162
const downloadFineTuningOutput = async (req: Request, res: Response, next: NextFunction) => {
159163
try {
@@ -166,31 +170,63 @@ const downloadFineTuningOutput = async (req: Request, res: Response, next: NextF
166170
)
167171
}
168172

169-
// Get the zip file path (creates if needed, but returns immediately if already exists)
173+
// Get the zip file path from service
170174
const filePath = await finetuningService.downloadFineTuningOutput(jobId)
175+
171176
if (!filePath) {
172177
throw new InternalFlowiseError(
173178
StatusCodes.NOT_FOUND,
174-
`Error: finetuningController.downloadFineTuningOutput - output not found for job: ${jobId}`
179+
`Error: finetuningController.downloadFineTuningOutput - zip file not found for job: ${jobId}. Please request download via WebSocket first.`
175180
)
176181
}
177182

183+
const fs = require('fs')
184+
178185
// Set response headers for file download
179186
const fileName = `${jobId}-output.zip`
180187
res.setHeader('Content-Type', 'application/zip')
181188
res.setHeader('Content-Disposition', `attachment; filename="${fileName}"`)
182189

183190
// Stream the file
184-
const fs = require('fs')
185191
const fileStream = fs.createReadStream(filePath)
192+
193+
// Log when stream opens
194+
fileStream.on('open', () => {
195+
console.debug(`finetuningController.downloadFineTuningOutput - starting to stream: ${filePath}`)
196+
})
197+
198+
// Delete zip file after response fully finishes (browser completes download)
199+
res.on('finish', () => {
200+
setTimeout(() => {
201+
try {
202+
if (fs.existsSync(filePath)) {
203+
fs.unlinkSync(filePath)
204+
console.debug(`finetuningController.downloadFineTuningOutput - deleted zip after download complete: ${filePath}`)
205+
}
206+
} catch (deleteErr: any) {
207+
console.warn(`finetuningController.downloadFineTuningOutput - failed to delete zip: ${deleteErr?.message || deleteErr}`)
208+
}
209+
}, 100)
210+
})
211+
186212
fileStream.on('error', (err: any) => {
187-
console.error('Error streaming fine-tuning output file:', err)
213+
console.error('finetuningController.downloadFineTuningOutput - error streaming file:', err)
214+
// Try to delete zip on error too
215+
try {
216+
if (fs.existsSync(filePath)) {
217+
fs.unlinkSync(filePath)
218+
console.debug(`finetuningController.downloadFineTuningOutput - deleted zip after error: ${filePath}`)
219+
}
220+
} catch (deleteErr: any) {
221+
console.warn(`finetuningController.downloadFineTuningOutput - failed to delete zip on error: ${deleteErr?.message || deleteErr}`)
222+
}
188223
if (!res.headersSent) {
189224
res.status(StatusCodes.INTERNAL_SERVER_ERROR).json({
190225
error: 'Error streaming fine-tuning output file'
191226
})
192227
}
193228
})
229+
194230
fileStream.pipe(res)
195231
} catch (error) {
196232
next(error)

studio-frontend/packages/server/src/services/finetuning/index.ts

Lines changed: 122 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
1313
import { FineTuningJob } from '../../database/entities/FineTuningJob'
1414
import logger from '../../utils/logger'
1515

16+
// Declare timer globals for Node.js
17+
declare function setTimeout(cb: (...args: any[]) => void, ms?: number): any
18+
declare function clearTimeout(id: any): void
19+
1620
const execAsync = promisify(exec)
1721

1822
const FINETUNING_SERVICE_URL = process.env.FINETUNING_HOST ? `http://${process.env.FINETUNING_HOST}:8015` : 'undefined'
@@ -36,93 +40,6 @@ const axiosClient: AxiosInstance = axios.create({
3640
// In-memory mapping: filename (raw and decoded) -> { id, rawFilename }
3741
const uploadedFileIdMap: Map<string, { id: string; rawFilename: string }> = new Map()
3842

39-
/**
40-
* Helper function to zip a fine-tuning job output directory
41-
* Checks if zip already exists and is up-to-date before creating a new one
42-
* @param outputDir - Full path to the output directory for the job
43-
* @param jobId - ID of the fine-tuning job
44-
* @returns Path to the zipped file or null if failed
45-
*/
46-
const ensureFineTuningOutputZip = async (outputDir: string, jobId: string): Promise<string | null> => {
47-
try {
48-
// eslint-disable-next-line no-console
49-
console.debug(`finetuningService.ensureFineTuningOutputZip - processing output for job: ${jobId}`)
50-
51-
// Validate output directory exists
52-
if (!fs.existsSync(outputDir)) {
53-
// eslint-disable-next-line no-console
54-
console.warn(`finetuningService.ensureFineTuningOutputZip - output directory not found: ${outputDir}`)
55-
return null
56-
}
57-
58-
const zipFilePath = `${outputDir}.zip`
59-
const outputStats = fs.statSync(outputDir)
60-
61-
// Check if zip exists and is up-to-date
62-
if (fs.existsSync(zipFilePath)) {
63-
const zipStats = fs.statSync(zipFilePath)
64-
// If zip is newer than the output directory, skip re-zipping
65-
if (zipStats.mtimeMs > outputStats.mtimeMs) {
66-
// eslint-disable-next-line no-console
67-
console.debug(`finetuningService.ensureFineTuningOutputZip - zip already up-to-date: ${zipFilePath}`)
68-
return zipFilePath
69-
}
70-
// Remove outdated zip
71-
try {
72-
fs.unlinkSync(zipFilePath)
73-
// eslint-disable-next-line no-console
74-
console.debug(`finetuningService.ensureFineTuningOutputZip - removed outdated zip: ${zipFilePath}`)
75-
} catch (e) {
76-
// eslint-disable-next-line no-console
77-
console.warn(`finetuningService.ensureFineTuningOutputZip - failed to remove outdated zip: ${e}`)
78-
}
79-
}
80-
81-
// Create zip file using archiver (standard ZIP format compatible with Windows)
82-
// eslint-disable-next-line no-console
83-
console.debug(`finetuningService.ensureFineTuningOutputZip - starting to zip output for job ${jobId}`)
84-
try {
85-
return await new Promise((resolve, reject) => {
86-
const output = fs.createWriteStream(zipFilePath)
87-
const archive = archiver('zip', {
88-
zlib: { level: 6 } // compression level
89-
})
90-
91-
output.on('close', () => {
92-
// eslint-disable-next-line no-console
93-
console.debug(`finetuningService.ensureFineTuningOutputZip - zip created successfully for job ${jobId}: ${zipFilePath} (${archive.pointer()} bytes)`)
94-
resolve(zipFilePath)
95-
})
96-
97-
output.on('error', (err: any) => {
98-
// eslint-disable-next-line no-console
99-
console.error(`finetuningService.ensureFineTuningOutputZip - write stream error: ${err?.message || err}`)
100-
reject(err)
101-
})
102-
103-
archive.on('error', (err: any) => {
104-
// eslint-disable-next-line no-console
105-
console.error(`finetuningService.ensureFineTuningOutputZip - archiver error: ${err?.message || err}`)
106-
reject(err)
107-
})
108-
109-
archive.pipe(output)
110-
// Add the entire directory to the archive with basename as root
111-
archive.directory(outputDir, path.basename(outputDir))
112-
archive.finalize()
113-
})
114-
} catch (execErr: any) {
115-
// eslint-disable-next-line no-console
116-
console.error(`finetuningService.ensureFineTuningOutputZip - archiver failed for job ${jobId}: ${execErr?.message || execErr}`)
117-
return null
118-
}
119-
} catch (error: any) {
120-
// eslint-disable-next-line no-console
121-
console.error(`finetuningService.ensureFineTuningOutputZip - error: ${error?.message || error}`)
122-
return null
123-
}
124-
}
125-
12643
/**
12744
* Upload a training file to the finetuning service
12845
*/
@@ -783,12 +700,12 @@ const deleteFineTuningJob = async (fineTuningJobId: string) => {
783700
}
784701

785702
/**
786-
* Download fine-tuning job output as a zip file
787-
* Creates zip if needed, or returns existing zip immediately
703+
* Prepare fine-tuning job output as a zip file for download
704+
* Called by WebSocket to create and cache the zip
788705
* @param jobId - ID of the fine-tuning job
789706
* @returns Path to the zipped file or null if not found
790707
*/
791-
const downloadFineTuningOutput = async (jobId: string): Promise<string | null> => {
708+
const prepareFineTuningOutputZip = async (jobId: string): Promise<string | null> => {
792709
try {
793710
if (!jobId) {
794711
throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'Job ID is required')
@@ -816,18 +733,82 @@ const downloadFineTuningOutput = async (jobId: string): Promise<string | null> =
816733
throw new InternalFlowiseError(StatusCodes.FORBIDDEN, 'Invalid job output path')
817734
}
818735

819-
// Ensure the output is zipped (returns immediately if zip is up-to-date)
820-
const finalZipPath = await ensureFineTuningOutputZip(jobOutputDir, jobId)
821-
if (!finalZipPath) {
822-
throw new InternalFlowiseError(
823-
StatusCodes.INTERNAL_SERVER_ERROR,
824-
`Failed to create zip for job ${jobId}`
825-
)
826-
}
736+
const zipFilePath = `${jobOutputDir}.zip`
827737

738+
// Create zip file using archiver
828739
// eslint-disable-next-line no-console
829-
console.debug(`finetuningService.downloadFineTuningOutput - file ready for download: ${finalZipPath}`)
830-
return finalZipPath
740+
console.debug(`finetuningService.downloadFineTuningOutput - creating zip for job ${jobId}`)
741+
742+
// Log directory contents for diagnostics
743+
try {
744+
const dirContents = fs.readdirSync(jobOutputDir)
745+
// eslint-disable-next-line no-console
746+
console.debug(`finetuningService.downloadFineTuningOutput - output directory contains ${dirContents.length} items: ${dirContents.slice(0, 10).join(', ')}${dirContents.length > 10 ? '...' : ''}`)
747+
} catch (e) {
748+
// eslint-disable-next-line no-console
749+
console.warn(`finetuningService.downloadFineTuningOutput - could not list directory: ${e}`)
750+
}
751+
752+
try {
753+
return await new Promise((resolve, reject) => {
754+
const output = fs.createWriteStream(zipFilePath)
755+
const archive = archiver('zip', {
756+
zlib: { level: 0 } // no compression for speed
757+
})
758+
759+
const zipTimeoutMs = 30 * 60 * 1000 // 30 minutes
760+
let resolved = false
761+
762+
const timeoutHandle = setTimeout(() => {
763+
if (!resolved) {
764+
resolved = true
765+
// eslint-disable-next-line no-console
766+
console.error(`finetuningService.downloadFineTuningOutput - archiver timeout for job ${jobId}`)
767+
try { output.destroy() } catch (e) {}
768+
try { archive.destroy() } catch (e) {}
769+
reject(new Error('Archiver timeout'))
770+
}
771+
}, zipTimeoutMs)
772+
773+
output.on('close', () => {
774+
if (!resolved) {
775+
resolved = true
776+
clearTimeout(timeoutHandle)
777+
// eslint-disable-next-line no-console
778+
console.debug(`finetuningService.downloadFineTuningOutput - zip created: ${zipFilePath}`)
779+
resolve(zipFilePath)
780+
}
781+
})
782+
783+
output.on('error', (err: any) => {
784+
if (!resolved) {
785+
resolved = true
786+
clearTimeout(timeoutHandle)
787+
// eslint-disable-next-line no-console
788+
console.error(`finetuningService.downloadFineTuningOutput - write stream error: ${err?.message || err}`)
789+
reject(err)
790+
}
791+
})
792+
793+
archive.on('error', (err: any) => {
794+
if (!resolved) {
795+
resolved = true
796+
clearTimeout(timeoutHandle)
797+
// eslint-disable-next-line no-console
798+
console.error(`finetuningService.downloadFineTuningOutput - archiver error: ${err?.message || err}`)
799+
reject(err)
800+
}
801+
})
802+
803+
archive.pipe(output)
804+
archive.directory(jobOutputDir, path.basename(jobOutputDir))
805+
archive.finalize()
806+
})
807+
} catch (archiverErr: any) {
808+
// eslint-disable-next-line no-console
809+
console.error(`finetuningService.downloadFineTuningOutput - archiver failed for job ${jobId}: ${archiverErr?.message || archiverErr}`)
810+
return null
811+
}
831812
} catch (error: any) {
832813
if (error instanceof InternalFlowiseError) {
833814
throw error
@@ -841,6 +822,46 @@ const downloadFineTuningOutput = async (jobId: string): Promise<string | null> =
841822
}
842823
}
843824

825+
/**
826+
* Download fine-tuning job output - HTTP endpoint
827+
* Returns path to cached ZIP file
828+
* @param jobId - ID of the fine-tuning job
829+
* @returns Path to the zipped file or null if not found
830+
*/
831+
const downloadFineTuningOutput = async (jobId: string): Promise<string | null> => {
832+
try {
833+
if (!jobId) {
834+
return null
835+
}
836+
837+
const OUTPUT_BASE_DIR = '/tmp/finetuning/output'
838+
const zipFilePath = `${OUTPUT_BASE_DIR}/${jobId}.zip`
839+
840+
// Check if zip file exists
841+
if (fs.existsSync(zipFilePath)) {
842+
try {
843+
const stat = fs.statSync(zipFilePath)
844+
if (stat.size > 0) {
845+
// eslint-disable-next-line no-console
846+
console.debug(`finetuningService.downloadFineTuningOutput - returning cached zip: ${zipFilePath}`)
847+
return zipFilePath
848+
}
849+
} catch (e) {
850+
// eslint-disable-next-line no-console
851+
console.warn(`finetuningService.downloadFineTuningOutput - could not stat zip file: ${e}`)
852+
}
853+
}
854+
855+
// eslint-disable-next-line no-console
856+
console.warn(`finetuningService.downloadFineTuningOutput - zip file not found: ${zipFilePath}`)
857+
return null
858+
} catch (error: any) {
859+
// eslint-disable-next-line no-console
860+
console.error(`finetuningService.downloadFineTuningOutput - error: ${error?.message || error}`)
861+
return null
862+
}
863+
}
864+
844865
/**
845866
* Get logs for a fine-tuning job by querying the Ray head node HTTP API.
846867
* It will call: http://<RAY_HEAD_NODE>/api/jobs/<job_id>/logs
@@ -929,5 +950,6 @@ export default {
929950
cancelFineTuningJob,
930951
deleteFineTuningJob,
931952
getFineTuningJobLogs,
953+
prepareFineTuningOutputZip,
932954
downloadFineTuningOutput
933955
}

studio-frontend/packages/server/src/ws/finetuningDownload.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,18 @@ export const setupFineTuningDownloadHandlers = (io: Server) => {
9999

100100
// Kick off the async preparation and store the promise so others can join
101101
task.status = 'zipping'
102+
103+
// Emit progress update to socket immediately
104+
socket.emit('download-finetuning-progress', {
105+
jobId,
106+
status: 'zipping',
107+
message: 'Creating zip archive (this may take a few minutes)...'
108+
})
109+
102110
task.promise = (async () => {
103111
try {
104-
// Call the service to prepare the zip file (returns path)
105-
const zipFilePath = await finetuningService.downloadFineTuningOutput(jobId)
112+
// Call the service to prepare the zip file
113+
const zipFilePath = await finetuningService.prepareFineTuningOutputZip(jobId)
106114

107115
if (!zipFilePath) {
108116
task.status = 'error'

0 commit comments

Comments
 (0)