Skip to content

Commit b574371

Browse files
Merge pull request #181 from kaltura/master-throttle-transcoder
LIV-1205: Transcoder overload protection.
2 parents 94f6723 + 7acde1f commit b574371

File tree

10 files changed

+350
-12
lines changed

10 files changed

+350
-12
lines changed

transcoder/common/json_parser.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,6 @@ json_status_t json_get_double(const json_value_t* obj,char* path,double defaultV
10001000
if (jresult->type!=JSON_FRAC) {
10011001
return JSON_BAD_DATA;
10021002
}
1003-
*result = ((double)jresult->v.num.denom) / ((double)jresult->v.num.num);
1003+
*result = ((double)jresult->v.num.num) / ((double)jresult->v.num.denom);
10041004
return JSON_OK;
10051005
}

transcoder/debug/file_streamer.c

+28-7
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,14 @@ void* thread_stream_from_file(void *vargp)
4949
char channelId[KMP_MAX_CHANNEL_ID];
5050
json_get_string(GetConfig(),"input.channelId","1_abcdefgh",channelId,sizeof(channelId));
5151

52-
int jumpOffsetSec=0;
52+
int64_t jumpOffsetSec=0;
5353
json_get_int64(GetConfig(),"input.jumpoffsetsec",0,&jumpOffsetSec);
5454

55+
int64_t hiccupDurationSec, hiccupIntervalSec;
56+
json_get_int64(GetConfig(),"input.hiccupDurationSec",0,&hiccupDurationSec);
57+
58+
json_get_int64(GetConfig(),"input.hiccupIntervalSec",0,&hiccupIntervalSec);
59+
5560
AVPacket packet;
5661
av_init_packet(&packet);
5762

@@ -86,8 +91,10 @@ void* thread_stream_from_file(void *vargp)
8691
LOGGER("SENDER",AV_LOG_INFO,"Realtime = %s",realTime ? "true" : "false");
8792
srand((int)time(NULL));
8893
uint64_t lastDts=0;
89-
int64_t start_time=av_gettime_relative();
90-
94+
int64_t start_time=av_gettime_relative(),
95+
hiccup_duration = hiccupDurationSec * 1000 * 1000,
96+
hiccup_interval = hiccupIntervalSec * 1000 * 1000,
97+
next_hiccup = start_time + hiccup_interval;
9198

9299
samples_stats_t stats;
93100
sample_stats_init(&stats,standard_timebase);
@@ -143,12 +150,26 @@ void* thread_stream_from_file(void *vargp)
143150

144151
if (realTime && lastDts > 0) {
145152

146-
int64_t timePassed=av_rescale_q(packet.dts-lastDts,standard_timebase,AV_TIME_BASE_Q);
147-
//LOGGER("SENDER",AV_LOG_DEBUG,"XXXX dt=%ld dd=%ld", (av_gettime_relative() - start_time),timePassed);
148-
while ((av_gettime_relative() - start_time) < timePassed) {
153+
int64_t timePassed=av_rescale_q(packet.dts,standard_timebase,AV_TIME_BASE_Q) + start_time,
154+
clockPassed = av_gettime_relative();
155+
156+
if(clockPassed >= next_hiccup && clockPassed < next_hiccup + hiccup_duration) {
157+
next_hiccup += hiccup_duration;
158+
LOGGER("SENDER",AV_LOG_INFO,"hiccup! [ %ld - %ld ]",
159+
ts2str((clockPassed - start_time) * 90,true),
160+
ts2str((next_hiccup - start_time) * 90,true));
161+
162+
av_usleep(next_hiccup - clockPassed);
163+
164+
next_hiccup = av_gettime_relative() + hiccup_interval;
165+
}
166+
167+
LOGGER("SENDER",AV_LOG_DEBUG,"XXXX clockPassed=%ld timePassed=%ld", clockPassed - start_time,timePassed - start_time);
168+
while (clockPassed < timePassed) {
149169

150170
LOGGER0("SENDER",AV_LOG_DEBUG,"XXXX Sleep 10ms");
151171
av_usleep(10*1000);//10ms
172+
clockPassed = av_gettime_relative();
152173
}
153174
}
154175

@@ -180,7 +201,7 @@ void* thread_stream_from_file(void *vargp)
180201
LOGGER("SENDER",AV_LOG_DEBUG,"sent packet pts=%s dts=%s size=%d",
181202
ts2str(packet.pts,true),
182203
ts2str(packet.dts,true),
183-
packet.dts,packet.size);
204+
packet.size);
184205

185206

186207
av_packet_unref(&packet);

transcoder/receiver_server.c

+6
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "receiver_server.h"
1010
#include "KMP/KMP.h"
1111
#include "transcode_session.h"
12+
#include "utils/throttler.h"
1213

1314

1415
int atomFileWrite (char* fileName,char* content,size_t size)
@@ -67,9 +68,11 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran
6768
bool autoAckMode;
6869
uint64_t received_frame_id=0;
6970
kmp_frame_position_t current_position;
71+
throttler_t throttler = {0};
7072

7173
json_get_bool(GetConfig(),"autoAckModeEnabled",false,&autoAckMode);
7274

75+
_S(throttler_init(&server->receiverStats,&throttler));
7376

7477
while (retVal >= 0 && session->kmpClient.socket) {
7578

@@ -119,6 +122,9 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran
119122
pthread_mutex_lock(&server->diagnostics_locker); // lock the critical section
120123
samples_stats_add(&server->receiverStats,packet->dts,packet->pos,packet->size);
121124
pthread_mutex_unlock(&server->diagnostics_locker); // lock the critical section
125+
126+
throttler_process(&throttler,transcode_session);
127+
122128
if(add_packet_frame_id_and_pts(packet,received_frame_id,packet->pts)){
123129
LOGGER(CATEGORY_RECEIVER,AV_LOG_ERROR,"[%s] failed to set frame id %lld on packet",session->stream_name,received_frame_id);
124130
}

transcoder/tests/config_a.json

+147
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
{
2+
"input": {
3+
"file": "/media/worng_order_audio_only.ts",
4+
"realTime": false,
5+
"activeStream": 0,
6+
"zduration": 9000000,
7+
"randomDataPercentage": 0,
8+
"xhiccupIntervalSec": 5,
9+
"xhiccupDurationSec": 2
10+
},
11+
"throttler": {
12+
"maxDataRate": 1.5,
13+
"useStatsDataRate": false,
14+
"minThrottleWaitMs": 1
15+
},
16+
"frameDropper1": {
17+
"enabled": false,
18+
"queueDuration": 10,
19+
"queueSize": 0,
20+
"nonKeyFrameDropperThreshold": 4,
21+
"decodedFrameDropperThreshold": 2
22+
},
23+
"logger": {
24+
"logLevels": ["DEBUG","VERBOSE","INFO","WARN","ERROR","FATAL","PANIC"],
25+
"logLevel": "DEBUG"
26+
},
27+
"kmp": {
28+
"listenPort": 16543,
29+
"listenAddress": "0.0.0.0",
30+
"acceptTimeout": 15,
31+
"idleTimeout": 10,
32+
"connectTimeout": 10
33+
},
34+
"control": {
35+
"listenPort": 18001,
36+
"listenAddress": "0.0.0.0"
37+
},
38+
"debug": {
39+
"diagnosticsIntervalInSeconds": 1
40+
},
41+
"output": {
42+
"saveFile": true,
43+
"outputFileNamePattern": "./output_%s.ts",
44+
"streamingUrla": "kmp://localhost:6543",
45+
"streamingUrl12": "kmp://192.168.11.59:6543",
46+
"streamingUrl1": ""
47+
},
48+
"engine": {
49+
"encoders": {
50+
"h264": ["h264_nvenc","libx264","h264_videotoolbox"]
51+
},
52+
"presets": {
53+
"A": {
54+
"h264_videotoolbox": "default",
55+
"libx264": "veryfast",
56+
"h264_nvenc": "fast"
57+
}
58+
}
59+
},
60+
"errorPolicy": {
61+
"exitOnError": false
62+
},
63+
"outputTracks": [
64+
{
65+
"trackId": "v32",
66+
"enabled": false,
67+
"passthrough": true
68+
},
69+
{
70+
"trackId": "a32",
71+
"enabled": true,
72+
"passthrough": true
73+
},
74+
{
75+
"trackId": "a33",
76+
"enabled": true,
77+
"bitrate": 64,
78+
"passthrough": false,
79+
"codec": "aac",
80+
"audioParams": {
81+
"samplingRate": -1,
82+
"channels": 2
83+
}
84+
},
85+
{
86+
"trackId": "v33",
87+
"passthrough": false,
88+
"enabled": false,
89+
"bitrate": 900,
90+
"codec": "h264",
91+
"videoParams": {
92+
"profile": "main",
93+
"preset": "A",
94+
"height": 480
95+
}
96+
},
97+
{
98+
"trackId": "v34",
99+
"enabled": false,
100+
"passthrough": false,
101+
"bitrate": 600,
102+
"codec": "h264",
103+
"videoParams": {
104+
"profile": "baseline",
105+
"preset": "A",
106+
"height": 360
107+
}
108+
},
109+
{
110+
"trackId": "v35",
111+
"enabled": false,
112+
"passthrough": false,
113+
"bitrate": 400,
114+
"codec": "h264",
115+
"videoParams": {
116+
"profile": "baseline",
117+
"preset": "A",
118+
"height": 360
119+
}
120+
},
121+
{
122+
"trackId": "v42",
123+
"enabled": false,
124+
"passthrough": false,
125+
"bitrate": 1500,
126+
"codec": "h264",
127+
"videoParams": {
128+
"profile": "high",
129+
"preset": "A",
130+
"height": 720
131+
}
132+
},
133+
{
134+
"trackId": "v43",
135+
"enabled": false,
136+
"passthrough": false,
137+
"bitrate": 2500,
138+
"codec": "h264",
139+
"videoParams": {
140+
"profile": "high",
141+
"preset": "A",
142+
"height": 2160,
143+
"skipFrame": 1
144+
}
145+
}
146+
]
147+
}

transcoder/config_v.json renamed to transcoder/tests/config_v.json

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
{
22
"input": {
33
"file": "/media/worng_order_video_only.mp4",
4-
"realTime": true,
4+
"realTime": false,
55
"activeStream": 0,
66
"xduration": 9000000,
77
"randomDataPercentage": 0,
8-
"jumpOffsetSec": -60
8+
"jumpOffsetSec": -60,
9+
"hiccupIntervalSec": 0,
10+
"hiccupDurationSec": 0
11+
},
12+
"throttler": {
13+
"maxDataRate": 1.5,
14+
"useStatsDataRate": false,
15+
"minThrottleWaitMs": 1
916
},
1017
"frameDropper1": {
1118
"enabled": false,

transcoder/utils/logger.h

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#define CATEGORY_RECEIVER "RECEIVER"
2121
#define CATEGORY_KMP "KMP"
2222
#define CATEGORY_HTTP_SERVER "HTTPSERVER"
23+
#define CATEGORY_THROTTLER "THROTTLER"
2324

2425
void logger1(const char* category,int level,const char *fmt, ...);
2526
void loggerFlush();

transcoder/utils/samples_stats.c

+7-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ void sample_stats_init(samples_stats_t* pStats,AVRational basetime)
2424
pStats->firstTimeStamp=0;
2525
pStats->lastTimeStamp=0;
2626
pStats->lastDts=0;
27+
pStats->throttleWait=0;
2728
}
2829

2930
void drain(samples_stats_t* pStats,uint64_t clock)
@@ -102,12 +103,15 @@ void sample_stats_get_diagnostics(samples_stats_t *pStats,json_writer_ctx_t js)
102103
JSON_SERIALIZE_STRING("firstTimeStamp",pStats->firstTimeStamp>0 ? ts2str(pStats->firstTimeStamp,false): "N/A")
103104
JSON_SERIALIZE_STRING("lastTimeStamp",pStats->lastTimeStamp>0 ? ts2str(pStats->lastTimeStamp,false) : "N/A")
104105
JSON_SERIALIZE_INT64("lastDts",pStats->lastDts)
106+
if(pStats->throttleWait > 0) {
107+
JSON_SERIALIZE_INT64("throttle",pStats->throttleWait)
108+
}
105109
}
106110

107111

108112
void samples_stats_log(const char* category,int level,samples_stats_t *stats,const char *prefix)
109113
{
110-
LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf",
114+
LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf throttleWait %s",
111115
prefix,
112116
stats->totalFrames,
113117
stats->totalErrors,
@@ -116,5 +120,6 @@ void samples_stats_log(const char* category,int level,samples_stats_t *stats,con
116120
pts2str(stats->clockDrift),
117121
((double)stats->currentBitRate)/(1000.0),
118122
stats->currentFrameRate,
119-
stats->currentRate)
123+
stats->currentRate,
124+
pts2str(stats->throttleWait))
120125
}

transcoder/utils/samples_stats.h

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ typedef struct
4141
uint64_t firstTimeStamp,lastTimeStamp;
4242
int64_t timeStampPassed;
4343
int64_t clockDrift;
44+
int64_t throttleWait;
4445
} samples_stats_t;
4546

4647
void sample_stats_init(samples_stats_t* pStats,AVRational basetime);

0 commit comments

Comments
 (0)