Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ffmpeg: Fix short segments flushing without using NULL packets #189

Merged
merged 2 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 203 additions & 0 deletions ffmpeg/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,3 +478,206 @@ func TestTranscoder_API_AlternatingTimestamps(t *testing.T) {
`
run(cmd)
}

// test short segments
func shortSegments(t *testing.T, accel Acceleration, fc int) {
run, dir := setupTest(t)
defer os.RemoveAll(dir)

cmd := `
# generate segments with #fc frames
cp "$1/../transcoder/test.ts" .
frame_count=%d

ffmpeg -loglevel warning -ss 0 -i test.ts -c copy -frames:v $frame_count -copyts short0.ts
ffmpeg -loglevel warning -ss 2 -i test.ts -c copy -frames:v $frame_count -copyts short1.ts
ffmpeg -loglevel warning -ss 4 -i test.ts -c copy -frames:v $frame_count -copyts short2.ts
ffmpeg -loglevel warning -ss 6 -i test.ts -c copy -frames:v $frame_count -copyts short3.ts

ffprobe -loglevel warning -count_frames -show_streams -select_streams v short0.ts | grep nb_read_frames=$frame_count
ffprobe -loglevel warning -count_frames -show_streams -select_streams v short1.ts | grep nb_read_frames=$frame_count
ffprobe -loglevel warning -count_frames -show_streams -select_streams v short2.ts | grep nb_read_frames=$frame_count
ffprobe -loglevel warning -count_frames -show_streams -select_streams v short3.ts | grep nb_read_frames=$frame_count
`
run(fmt.Sprintf(cmd, fc))

// Test if decoding/encoding expected number of frames
tc := NewTranscoder()
defer tc.StopTranscoder()
for i := 0; i < 4; i++ {
fname := fmt.Sprintf("%s/short%d.ts", dir, i)
t.Log("fname ", fname)
in := &TranscodeOptionsIn{Fname: fname, Accel: accel}
out := []TranscodeOptions{{Oname: dir + "/out.ts", Profile: P144p30fps16x9, Accel: accel}}
res, err := tc.Transcode(in, out)
if err != nil {
t.Error(err)
}
if fc != res.Decoded.Frames {
t.Error("Did not decode expected number of frames: ", res.Decoded.Frames)
}
if 0 == res.Encoded[0].Frames {
// TODO not sure what should be a reasonable number here
t.Error("Did not encode any frames: ", res.Encoded[0].Frames)
}
}

// test standalone stream copy
tc.StopTranscoder()
tc = NewTranscoder()
for i := 0; i < 4; i++ {
fname := fmt.Sprintf("%s/short%d.ts", dir, i)
oname := fmt.Sprintf("%s/vcopy%d.ts", dir, i)
in := &TranscodeOptionsIn{Fname: fname, Accel: accel}
out := []TranscodeOptions{
{
Oname: oname,
VideoEncoder: ComponentOptions{Name: "copy", Opts: map[string]string{
"mpegts_flags": "resend_headers,initial_discontinuity",
}},
Accel: accel,
},
}
res, err := tc.Transcode(in, out)
if err != nil {
t.Error(err)
}
if res.Encoded[0].Frames != 0 {
t.Error("Unexpected frame counts from stream copy")
t.Error(res)
}
cmd = `
# extract video track, compare md5sums
i=%d
ffmpeg -i short$i.ts -an -c:v copy -f md5 short$i.md5
ffmpeg -i vcopy$i.ts -an -c:v copy -f md5 vcopy$i.md5
diff -u short$i.md5 vcopy$i.md5
`
run(fmt.Sprintf(cmd, i))
}

// test standalone stream drop
tc.StopTranscoder()
tc = NewTranscoder()
for i := 0; i < 4; i++ {
fname := fmt.Sprintf("%s/short%d.ts", dir, i)
oname := fmt.Sprintf("%s/vdrop%d.ts", dir, i)
// Normal case : drop only video
in := &TranscodeOptionsIn{Fname: fname, Accel: accel}
out := []TranscodeOptions{
{
Oname: oname,
VideoEncoder: ComponentOptions{Name: "drop"},
Accel: accel,
},
}
res, err := tc.Transcode(in, out)
if err != nil {
t.Error(err)
}
if res.Decoded.Frames != 0 || res.Encoded[0].Frames != 0 {
t.Error("Unexpected count of decoded frames ", res.Decoded.Frames, res.Decoded.Pixels)
}

}

// test framerate passthrough
tc.StopTranscoder()
tc = NewTranscoder()
for i := 0; i < 4; i++ {
fname := fmt.Sprintf("%s/short%d.ts", dir, i)
oname := fmt.Sprintf("%s/vpassthru%d.ts", dir, i)
out := []TranscodeOptions{{Profile: P144p30fps16x9, Accel: accel}}
out[0].Profile.Framerate = 0 // Passthrough!

out[0].Oname = oname
in := &TranscodeOptionsIn{Fname: fname, Accel: accel}
res, err := tc.Transcode(in, out)
if err != nil {
t.Error("Could not transcode: ", err)
}
// verify that output frame count is same as input frame count
if res.Decoded.Frames != fc || res.Encoded[0].Frames != fc {
t.Error("Did not get expected frame count; got ", res.Encoded[0].Frames)
}
}

// test low fps (3) to low fps (1)
tc.StopTranscoder()
tc = NewTranscoder()

cmd = `
frame_count=%d
# convert segment to 3fps and trim it to #fc frames
ffmpeg -loglevel warning -i test.ts -vf fps=3/1 -c:v libx264 -c:a copy -frames:v $frame_count short3fps.ts

# sanity check
ffprobe -loglevel warning -show_streams short3fps.ts | grep r_frame_rate=3/1
ffprobe -loglevel warning -count_frames -show_streams -select_streams v short3fps.ts | grep nb_read_frames=$frame_count
`
run(fmt.Sprintf(cmd, fc))

fname := fmt.Sprintf("%s/short3fps.ts", dir)
in := &TranscodeOptionsIn{Fname: fname, Accel: accel}
out := []TranscodeOptions{{Oname: dir + "/out1fps.ts", Profile: P144p30fps16x9, Accel: accel}}
out[0].Profile.Framerate = 1 // Force 1fps
res, err := tc.Transcode(in, out)
if err != nil {
t.Error(err)
}
if fc != res.Decoded.Frames {
t.Error("Did not decode expected number of frames: ", res.Decoded.Frames)
}
if 0 == res.Encoded[0].Frames {
t.Error("Did not encode any frames: ", res.Encoded[0].Frames)
}

// test a bunch of weird cases together
tc.StopTranscoder()
tc = NewTranscoder()
profile_low_fps := P144p30fps16x9
profile_low_fps.Framerate = uint(fc) // use the input frame count as the output fps, why not
profile_passthrough_fps := P144p30fps16x9
profile_passthrough_fps.Framerate = 0
for i := 0; i < 4; i++ {
fname := fmt.Sprintf("%s/short%d.ts", dir, i)
in := &TranscodeOptionsIn{Fname: fname, Accel: accel}
out := []TranscodeOptions{
{
Oname: fmt.Sprintf("%s/lowfps%d.ts", dir, i),
Profile: profile_low_fps,
AudioEncoder: ComponentOptions{Name: "copy"},
Accel: accel,
},
{
Oname: fmt.Sprintf("%s/copyall%d.ts", dir, i),
VideoEncoder: ComponentOptions{Name: "copy"},
AudioEncoder: ComponentOptions{Name: "drop"},
Accel: accel,
},
{
Oname: fmt.Sprintf("%s/passthru%d.ts", dir, i),
Profile: profile_passthrough_fps,
Accel: accel,
},
}
res, err := tc.Transcode(in, out)
if err != nil {
t.Error(err)
}
if res.Encoded[0].Frames == 0 || res.Encoded[1].Frames != 0 || res.Encoded[2].Frames != fc {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking for res.Encoded[0].Frames == 0 makes me sweat a bit, because we don't really know what might be a reasonably correct value. But I understand it's pretty hard to get predictable results across different runs, probably OK for now.

t.Error("Unexpected frame counts from short segment copy-drop-passthrough case")
t.Error(res)
}
}

}

// TestTranscoder_ShortSegments runs the short-segment scenarios with software
// acceleration, once for each input length of 1, 2, 3, 5, 6 and 10 frames.
func TestTranscoder_ShortSegments(t *testing.T) {
	for _, frames := range []int{1, 2, 3, 5, 6, 10} {
		shortSegments(t, Software, frames)
	}
}
73 changes: 57 additions & 16 deletions ffmpeg/lpms_ffmpeg.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ const int lpms_ERR_DTS = FFERRTAG('-','D','T','S');
// Decoder: For audio, we pay the price of closing and re-opening the decoder.
// For video, we cache the first packet we read (input_ctx.first_pkt).
// The pts is set to a sentinel value and fed to the decoder. Once we
// receive a frame from the decoder with the pts set to the sentinel
// value, then we know the decoder has been fully flushed. Ignore any
// subsequent frames we receive with the sentinel pts.
// receive all frames from the decoder OR have sent too many sentinel
// pkts without receiving anything, then we know the decoder has been
// fully flushed.
//
// Filter: The challenge here is around fps filter adding and dropping frames.
// The fps filter expects a strictly monotonic input pts: frames with
Expand Down Expand Up @@ -75,6 +75,16 @@ struct input_ctx {
// Decoder flush
AVPacket *first_pkt;
int flushed;
int flushing;
// The diff of `packets sent - frames recv` serves as an estimate of
// internally buffered packets by the decoder. We're done flushing when this
// becomes 0.
uint16_t pkt_diff;
// We maintain a count of sentinel packets sent without receiving any
// valid frames back, and stop flushing if it crosses SENTINEL_MAX.
// FIXME This is needed due to issue #155 - input/output frame mismatch.
j0sh marked this conversation as resolved.
Show resolved Hide resolved
#define SENTINEL_MAX 5
uint16_t sentinel_count;

// Filter flush
AVFrame *last_frame_v, *last_frame_a;
Expand Down Expand Up @@ -318,7 +328,7 @@ static void send_first_pkt(struct input_ctx *ictx)
char errstr[AV_ERROR_MAX_STRING_SIZE];
av_strerror(ret, errstr, sizeof errstr);
fprintf(stderr, "Error sending flush packet : %s\n", errstr);
}
} else ictx->sentinel_count++;
}

static enum AVPixelFormat hw2pixfmt(AVCodecContext *ctx)
Expand Down Expand Up @@ -955,6 +965,24 @@ static int open_input(input_params *params, struct input_ctx *ctx)
#undef dd_err
}

// Wrapper around avcodec_send_packet that also maintains ictx->pkt_diff.
// When the send succeeds and `dec` is the video decoder (ictx->vc), the
// counter is bumped by one. Returns avcodec_send_packet's result unchanged.
static int lpms_send_packet(struct input_ctx *ictx, AVCodecContext *dec, AVPacket *pkt)
{
  int status = avcodec_send_packet(dec, pkt);
  if (status != 0) return status;
  if (dec == ictx->vc) {
    // one more video packet handed to the decoder
    ictx->pkt_diff++;
  }
  return status;
}

// Wrapper around avcodec_receive_frame that also maintains the flush
// bookkeeping in ictx. Only the video decoder (ictx->vc) is tracked: each
// successfully received frame that is not a flush (sentinel) frame lowers
// pkt_diff by one and, while flushing, resets sentinel_count to zero.
// Returns avcodec_receive_frame's result unchanged.
static int lpms_receive_frame(struct input_ctx *ictx, AVCodecContext *dec, AVFrame *frame)
{
  int status = avcodec_receive_frame(dec, frame);
  if (dec != ictx->vc) return status;
  // nothing to account for on error, missing frame, or a sentinel frame
  if (status != 0 || !frame || is_flush_frame(frame)) return status;
  ictx->pkt_diff--;
  if (ictx->flushing) {
    // a real frame came back, so restart the count of unanswered sentinels
    ictx->sentinel_count = 0;
  }
  return status;
}

int process_in(struct input_ctx *ictx, AVFrame *frame, AVPacket *pkt)
{
#define dec_err(msg) { \
Expand Down Expand Up @@ -998,9 +1026,9 @@ int process_in(struct input_ctx *ictx, AVFrame *frame, AVPacket *pkt)
}
}

ret = avcodec_send_packet(decoder, pkt);
ret = lpms_send_packet(ictx, decoder, pkt);
if (ret < 0) dec_err("Error sending packet to decoder\n");
ret = avcodec_receive_frame(decoder, frame);
ret = lpms_receive_frame(ictx, decoder, frame);
if (ret == AVERROR(EAGAIN)) {
// Distinguish from EAGAIN that may occur with
// av_read_frame or avcodec_send_packet
Expand All @@ -1021,17 +1049,21 @@ int process_in(struct input_ctx *ictx, AVFrame *frame, AVPacket *pkt)
// video frames, so continue on to audio.

// Flush video decoder.
// To accommodate CUDA, we feed the decoder a sentinel (flush) frame.
// Once the flush frame has been decoded, the decoder is fully flushed.
// To accommodate CUDA, we feed the decoder sentinel (flush) frames, till we
// get back all sent frames, or we've made SENTINEL_MAX attempts to retrieve
// buffered frames with no success.
// TODO this is unnecessary for SW decoding! SW process should match audio
if (ictx->vc) {
ictx->flushing = 1;
send_first_pkt(ictx);

ret = avcodec_receive_frame(ictx->vc, frame);
ret = lpms_receive_frame(ictx, ictx->vc, frame);
pkt->stream_index = ictx->vi;
if (!ret) {
if (is_flush_frame(frame)) ictx->flushed = 1;
return ret;
// Keep flushing if we haven't received all frames back but stop after SENTINEL_MAX tries.
if (ictx->pkt_diff != 0 && ictx->sentinel_count <= SENTINEL_MAX && (!ret || ret == AVERROR(EAGAIN))) {
return 0; // ignore actual return value and keep flushing
} else {
ictx->flushed = 1;
if (!ret) return ret;
}
}
// Flush audio decoder.
Expand Down Expand Up @@ -1148,11 +1180,17 @@ int process_out(struct input_ctx *ictx, struct output_ctx *octx, AVCodecContext
// Start filter flushing process if necessary
if (!inf && !filter->flushed) {
// Set input frame to the last frame
// And increment pts offset by pkt_duration
// TODO It may make sense to use the expected output packet duration instead
int is_video = AVMEDIA_TYPE_VIDEO == ost->codecpar->codec_type;
AVFrame *frame = is_video ? ictx->last_frame_v : ictx->last_frame_a;
filter->flush_offset += frame->pkt_duration;
// Increment pts offset by:
// - input packet's duration usually
int64_t dur = frame->pkt_duration;
// - output packet's duration if using fps filter & haven't encoded anything yet
if (is_video && octx->fps.den && !octx->res->frames) {
AVStream *ist = ictx->ic->streams[ictx->vi];
dur = av_rescale_q(frame->pkt_duration, ist->r_frame_rate, octx->fps);
}
filter->flush_offset += dur;
j0sh marked this conversation as resolved.
Show resolved Hide resolved
inf = frame;
inf->opaque = (void*)inf->pts; // value doesn't matter; just needs to be set
is_flushing = 1;
Expand Down Expand Up @@ -1419,6 +1457,9 @@ int transcode(struct transcode_thread *h,
avio_closep(&ictx->ic->pb);
if (dframe) av_frame_free(&dframe);
ictx->flushed = 0;
ictx->flushing = 0;
ictx->pkt_diff = 0;
ictx->sentinel_count = 0;
av_packet_unref(&ipkt); // needed for early exits
if (ictx->first_pkt) av_packet_free(&ictx->first_pkt);
if (ictx->ac) avcodec_free_context(&ictx->ac);
Expand Down
9 changes: 9 additions & 0 deletions ffmpeg/nvidia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,4 +764,13 @@ func TestNvidia_API_AlternatingTimestamps(t *testing.T) {
tc.StopTranscoder()
}

func TestNvidia_ShortSegments(t *testing.T) {
shortSegments(t, Nvidia, 1)
shortSegments(t, Nvidia, 2)
shortSegments(t, Nvidia, 3)
shortSegments(t, Nvidia, 5)
j0sh marked this conversation as resolved.
Show resolved Hide resolved
shortSegments(t, Nvidia, 6)
shortSegments(t, Nvidia, 10)
}

// XXX test bframes or delayed frames