Skip to content

Commit

Permalink
- add throttler.h,c
Browse files Browse the repository at this point in the history
- define config throttler.maxDataRate as max limit on transcoder usage.
   * measured in times x normal fps. e.g. if maxDataRate = 2 we expect
   no more than twice the original throughput.
   * By default maxDataRate is set to Infinity and no data rate limit is performed.
  • Loading branch information
igorshevach committed Nov 6, 2023
1 parent cd20f1c commit f42792a
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 5 deletions.
2 changes: 1 addition & 1 deletion transcoder/common/json_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,6 @@ json_status_t json_get_double(const json_value_t* obj,char* path,double defaultV
if (jresult->type!=JSON_FRAC) {
return JSON_BAD_DATA;
}
*result = ((double)jresult->v.num.denom) / ((double)jresult->v.num.num);
*result = ((double)jresult->v.num.num) / ((double)jresult->v.num.denom);
return JSON_OK;
}
7 changes: 6 additions & 1 deletion transcoder/config_v.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
{
"input": {
"file": "/media/worng_order_video_only.mp4",
"realTime": true,
"realTime": false,
"activeStream": 0,
"xduration": 9000000,
"randomDataPercentage": 0,
"jumpOffsetSec": -60
},
"throttler": {
"maxDataRate": 2.5,
"coldSeconds": 0,
"minThrottleWaitMs": 1
},
"frameDropper1": {
"enabled": false,
"queueDuration": 10,
Expand Down
9 changes: 8 additions & 1 deletion transcoder/receiver_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "receiver_server.h"
#include "KMP/KMP.h"
#include "transcode_session.h"
#include "utils/throttler.h"


int atomFileWrite (char* fileName,char* content,size_t size)
Expand All @@ -27,8 +28,9 @@ int atomFileWrite (char* fileName,char* content,size_t size)
int processedFrameCB(receiver_server_session_t *session,bool completed)
{
uint64_t now=av_gettime();
receiver_server_t *server=session->server;
if (completed || now-session->lastStatsUpdated>session->diagnosticsIntervalInSeconds) {//1 second interval
receiver_server_t *server=session->server;



char* tmpBuf=av_malloc(MAX_DIAGNOSTICS_STRING_LENGTH);
Expand Down Expand Up @@ -67,9 +69,11 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran
bool autoAckMode;
uint64_t received_frame_id=0;
kmp_frame_position_t current_position;
throttler_t throttler = {0};

json_get_bool(GetConfig(),"autoAckModeEnabled",false,&autoAckMode);

_S(throttler_init(&server->receiverStats,&throttler));

while (retVal >= 0 && session->kmpClient.socket) {

Expand Down Expand Up @@ -119,6 +123,9 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran
pthread_mutex_lock(&server->diagnostics_locker); // lock the critical section
samples_stats_add(&server->receiverStats,packet->dts,packet->pos,packet->size);
pthread_mutex_unlock(&server->diagnostics_locker); // lock the critical section

throttler_process(&throttler,transcode_session);

if(add_packet_frame_id_and_pts(packet,received_frame_id,packet->pts)){
LOGGER(CATEGORY_RECEIVER,AV_LOG_ERROR,"[%s] failed to set frame id %lld on packet",session->stream_name,received_frame_id);
}
Expand Down
1 change: 1 addition & 0 deletions transcoder/utils/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define CATEGORY_RECEIVER "RECEIVER"
#define CATEGORY_KMP "KMP"
#define CATEGORY_HTTP_SERVER "HTTPSERVER"
#define CATEGORY_THROTTLER "THROTTLER"

void logger1(const char* category,int level,const char *fmt, ...);
void loggerFlush();
Expand Down
9 changes: 7 additions & 2 deletions transcoder/utils/samples_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void sample_stats_init(samples_stats_t* pStats,AVRational basetime)
pStats->firstTimeStamp=0;
pStats->lastTimeStamp=0;
pStats->lastDts=0;
pStats->throttleWait=0;
}

void drain(samples_stats_t* pStats,uint64_t clock)
Expand Down Expand Up @@ -102,12 +103,15 @@ void sample_stats_get_diagnostics(samples_stats_t *pStats,json_writer_ctx_t js)
JSON_SERIALIZE_STRING("firstTimeStamp",pStats->firstTimeStamp>0 ? ts2str(pStats->firstTimeStamp,false): "N/A")
JSON_SERIALIZE_STRING("lastTimeStamp",pStats->lastTimeStamp>0 ? ts2str(pStats->lastTimeStamp,false) : "N/A")
JSON_SERIALIZE_INT64("lastDts",pStats->lastDts)
if(pStats->throttleWait > 0) {
JSON_SERIALIZE_INT64("throttle",pStats->throttleWait)
}
}


void samples_stats_log(const char* category,int level,samples_stats_t *stats,const char *prefix)
{
LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf",
LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf throttleWait %s",
prefix,
stats->totalFrames,
stats->totalErrors,
Expand All @@ -116,5 +120,6 @@ void samples_stats_log(const char* category,int level,samples_stats_t *stats,con
pts2str(stats->clockDrift),
((double)stats->currentBitRate)/(1000.0),
stats->currentFrameRate,
stats->currentRate)
stats->currentRate,
pts2str(stats->throttleWait))
}
1 change: 1 addition & 0 deletions transcoder/utils/samples_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ typedef struct
uint64_t firstTimeStamp,lastTimeStamp;
int64_t timeStampPassed;
int64_t clockDrift;
int64_t throttleWait;
} samples_stats_t;

void sample_stats_init(samples_stats_t* pStats,AVRational basetime);
Expand Down
86 changes: 86 additions & 0 deletions transcoder/utils/throttler.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//
// throttler.c
// live_transcoder
//
//


#include "../transcode/transcode_session.h"
#include "throttler.h"
#include "json_parser.h"

static void doThrottle(float maxDataRate,
int coldSeconds,
int minThrottleWaitMs,
samples_stats_t *stats,
AVRational targetFramerate);

int
throttler_init(samples_stats_t *stats,throttler_t *throttler) {
json_value_t *config = GetConfig();
json_get_double(config,"throttler.maxDataRate",INFINITY,(double*)&throttler->maxDataRate);
*(bool*)&throttler->enabled = throttler->maxDataRate < INFINITY;
if(throttler->enabled){
json_get_int(config,"throttler.coldSeconds",0,(int*)&throttler->coldSeconds);
json_get_int(config,"throttler.minThrottleWaitMs",1,(int*)&throttler->minThrottleWaitMs);
throttler->stats = stats;
}
return 0;
}

void
throttler_process(throttler_t *throttler,transcode_session_t *transcode_session) {
if(throttler && throttler->maxDataRate < INFINITY) {
const transcode_mediaInfo_t *mediaInfo = transcode_session ? transcode_session->currentMediaInfo : NULL;
if(mediaInfo && mediaInfo->codecParams){
const bool isVideo = mediaInfo->codecParams->codec_type == AVMEDIA_TYPE_VIDEO;
const AVRational frameRate = isVideo ? mediaInfo->frameRate :
(AVRational){ .num = mediaInfo->codecParams->sample_rate ,
.den = 1024 };

doThrottle(throttler->maxDataRate,
throttler->coldSeconds,
throttler->minThrottleWaitMs,
throttler->stats,
frameRate);
}
}
}

static
void
doThrottle(float maxDataRate,
int coldSeconds,
int minThrottleWaitMs,
samples_stats_t *stats,
AVRational targetFramerate)
{

if(targetFramerate.den > 0 && targetFramerate.num > 0) {
float currentDataRate;

samples_stats_log(CATEGORY_RECEIVER,AV_LOG_DEBUG,stats,"throttleThread-Stats");

if(stats->totalFrames < coldSeconds * targetFramerate.num / targetFramerate.den) {
return;
}

currentDataRate = stats->currentFrameRate * targetFramerate.den / (float)targetFramerate.num;

LOGGER(CATEGORY_THROTTLER,
AV_LOG_DEBUG,"throttleThread. data rate current: %.3f max: %.3f",
currentDataRate,
maxDataRate);

if(currentDataRate > maxDataRate) {
// going to sleep for a period of time gained due to race
int throttleWaitUSec = (currentDataRate - maxDataRate) * 1000 * 1000; //av_rescale(1000 * 1000,targetFramerate.den,targetFramerate.num);
const int minThrottleWaitMs = 1;
if(throttleWaitUSec > minThrottleWaitMs * 1000) {
LOGGER(CATEGORY_THROTTLER,AV_LOG_INFO,"throttleThread. throttling %.3f ms",throttleWaitUSec / 1000.f);
stats->throttleWait += av_rescale_q( throttleWaitUSec, clockScale, standard_timebase);
av_usleep(throttleWaitUSec);
}
}
}
}
20 changes: 20 additions & 0 deletions transcoder/utils/throttler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// live_transcoder
//

#ifndef Throttler_h
#define Throttler_h

typedef struct {
const bool enabled;
const double maxDataRate;
const int coldSeconds;
const int minThrottleWaitMs;
samples_stats_t *stats;
} throttler_t;

int throttler_init(samples_stats_t *stats,throttler_t *throttler);

void
throttler_process(throttler_t *throttler,transcode_session_t *session);

#endif

0 comments on commit f42792a

Please sign in to comment.