Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions apps/peertube-runner/src/server/process/shared/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,38 @@ export function scheduleTranscodingProgress (options: {
runnerToken: string
job: JobWithToken
progressGetter: () => number
onAbort?: () => void
}) {
const { job, server, progressGetter, runnerToken } = options
const { job, server, progressGetter, runnerToken, onAbort } = options

const updateInterval = ConfigManager.Instance.isTestInstance()
? 500
: 60000

let aborted = false

const update = () => {
if (aborted) return

job.progress = progressGetter() || 0

server.runnerJobs.update({
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
progress: job.progress
}).catch(err => logger.error({ err }, 'Cannot send job progress'))
}).catch(err => {
// Job was deleted on the server, gracefully abort processing
if (err.res?.status === 404) {
logger.info({ jobUUID: job.uuid }, 'Job was deleted on the server, aborting processing')
aborted = true
clearInterval(interval)
if (onAbort) onAbort()
return
}

logger.error({ err }, 'Cannot send job progress')
})
}

const interval = setInterval(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,30 @@ export async function processGenerateStoryboard (options: ProcessOptions<RunnerJ

let ffmpegProgress: number
let videoInputPath: string
let jobAborted = false

const outputPath = join(ConfigManager.Instance.getStoryboardDirectory(), `storyboard-${buildUUID()}.jpg`)

const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => ffmpegProgress
progressGetter: () => ffmpegProgress,
onAbort: () => {
jobAborted = true
}
})

try {
logger.info(`Downloading input file ${payload.input.videoFileUrl} for storyboard job ${job.jobToken}`)

videoInputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })

if (jobAborted) {
logger.info(`Job ${job.uuid} was aborted, stopping processing`)
return
}

logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Generating storyboard.`)

const ffmpegImage = buildFFmpegImage()
Expand All @@ -39,6 +48,11 @@ export async function processGenerateStoryboard (options: ProcessOptions<RunnerJ
sprites: payload.sprites
})

if (jobAborted) {
logger.info(`Job ${job.uuid} was aborted during storyboard generation, stopping processing`)
return
}

const successBody: GenerateStoryboardSuccess = {
storyboardFile: outputPath
}
Expand All @@ -50,6 +64,13 @@ export async function processGenerateStoryboard (options: ProcessOptions<RunnerJ
payload: successBody,
reqPayload: payload
})
} catch (err) {
// If job was aborted, don't report the error
if (jobAborted) {
logger.info(`Job ${job.uuid} processing stopped after abort`)
return
}
throw err
} finally {
if (videoInputPath) await remove(videoInputPath)
if (outputPath) await remove(outputPath)
Expand Down
28 changes: 27 additions & 1 deletion apps/peertube-runner/src/server/process/shared/process-studio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo
let outputPath: string

let tasksProgress = 0
let jobAborted = false

const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => tasksProgress
progressGetter: () => tasksProgress,
onAbort: () => {
jobAborted = true
}
})

try {
Expand All @@ -50,12 +54,22 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo
videoInputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
separatedAudioInputPath = await downloadSeparatedAudioFileIfNeeded({ urls: payload.input.separatedAudioFileUrl, runnerToken, job })

if (jobAborted) {
logger.info(`Job ${job.uuid} was aborted, stopping processing`)
return
}

tmpVideoInputFilePath = videoInputPath
tmpSeparatedAudioInputFilePath = separatedAudioInputPath

logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`)

for (const task of payload.tasks) {
if (jobAborted) {
logger.info(`Job ${job.uuid} was aborted during processing, stopping`)
return
}

const outputFilename = 'output-edition-' + buildUUID() + '.mp4'
outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename)

Expand All @@ -78,6 +92,11 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo
tasksProgress += Math.floor(100 / payload.tasks.length)
}

if (jobAborted) {
logger.info(`Job ${job.uuid} was aborted, stopping processing`)
return
}

const successBody: VideoStudioTranscodingSuccess = {
videoFile: outputPath
}
Expand All @@ -89,6 +108,13 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo
payload: successBody,
reqPayload: payload
})
} catch (err) {
// If job was aborted, don't report the error
if (jobAborted) {
logger.info(`Job ${job.uuid} processing stopped after abort`)
return
}
throw err
} finally {
if (tmpVideoInputFilePath) await remove(tmpVideoInputFilePath)
if (tmpSeparatedAudioInputFilePath) await remove(tmpSeparatedAudioInputFilePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ export async function processVideoTranscription (options: ProcessOptions<RunnerJ
const payload = job.payload

let inputPath: string
let jobAborted = false

const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => undefined
progressGetter: () => undefined,
onAbort: () => {
jobAborted = true
}
})

const outputPath = join(ConfigManager.Instance.getTranscriptionDirectory(), buildSUUID())
Expand All @@ -38,6 +42,11 @@ export async function processVideoTranscription (options: ProcessOptions<RunnerJ

inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })

if (jobAborted) {
logger.info(`Job ${job.uuid} was aborted, stopping processing`)
return
}

logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running transcription.`)

if (await hasAudioStream(inputPath) !== true) {
Expand All @@ -51,6 +60,11 @@ export async function processVideoTranscription (options: ProcessOptions<RunnerJ
return
}

if (jobAborted) {
logger.info(`Job ${job.uuid} was aborted, stopping processing`)
return
}

const transcriptFile = await transcriber.transcribe({
mediaFilePath: inputPath,
model: config.modelPath
Expand All @@ -60,6 +74,11 @@ export async function processVideoTranscription (options: ProcessOptions<RunnerJ
transcriptDirectory: outputPath
})

if (jobAborted) {
logger.info(`Job ${job.uuid} was aborted during transcription, stopping processing`)
return
}

const successBody: TranscriptionSuccess = {
inputLanguage: transcriptFile.language,
vttFile: transcriptFile.path
Expand All @@ -72,6 +91,13 @@ export async function processVideoTranscription (options: ProcessOptions<RunnerJ
payload: successBody,
reqPayload: payload
})
} catch (err) {
// If job was aborted, don't report the error
if (jobAborted) {
logger.info(`Job ${job.uuid} processing stopped after abort`)
return
}
throw err
} finally {
if (inputPath) await remove(inputPath)
if (outputPath) await remove(outputPath)
Expand Down
Loading
Loading