From f42792a1065070df6861f12a65d36cbdd0b0a922 Mon Sep 17 00:00:00 2001 From: igorshevach Date: Mon, 6 Nov 2023 10:48:41 +0200 Subject: [PATCH] - add throttler.h,c - define config throttler.maxDataRate as max limit on transcoder usage. * measured in times x normal fps. e.g. if maxDataRate = 2 we expect no more than twice the original throughput. * By default maxDataRate is set to Infinity and no data rate limit is performed. --- transcoder/common/json_parser.c | 2 +- transcoder/config_v.json | 7 ++- transcoder/receiver_server.c | 9 +++- transcoder/utils/logger.h | 1 + transcoder/utils/samples_stats.c | 9 +++- transcoder/utils/samples_stats.h | 1 + transcoder/utils/throttler.c | 86 ++++++++++++++++++++++++++++++++ transcoder/utils/throttler.h | 20 ++++++++ 8 files changed, 130 insertions(+), 5 deletions(-) create mode 100644 transcoder/utils/throttler.c create mode 100644 transcoder/utils/throttler.h diff --git a/transcoder/common/json_parser.c b/transcoder/common/json_parser.c index f8957af5..9e1c9536 100644 --- a/transcoder/common/json_parser.c +++ b/transcoder/common/json_parser.c @@ -1000,6 +1000,6 @@ json_status_t json_get_double(const json_value_t* obj,char* path,double defaultV if (jresult->type!=JSON_FRAC) { return JSON_BAD_DATA; } - *result = ((double)jresult->v.num.denom) / ((double)jresult->v.num.num); + *result = ((double)jresult->v.num.num) / ((double)jresult->v.num.denom); return JSON_OK; } diff --git a/transcoder/config_v.json b/transcoder/config_v.json index 229b36e7..aacf72f4 100644 --- a/transcoder/config_v.json +++ b/transcoder/config_v.json @@ -1,12 +1,17 @@ { "input": { "file": "/media/worng_order_video_only.mp4", - "realTime": true, + "realTime": false, "activeStream": 0, "xduration": 9000000, "randomDataPercentage": 0, "jumpOffsetSec": -60 }, + "throttler": { + "maxDataRate": 2.5, + "coldSeconds": 0, + "minThrottleWaitMs": 1 + }, "frameDropper1": { "enabled": false, "queueDuration": 10, diff --git a/transcoder/receiver_server.c b/transcoder/receiver_server.c index 5b5529ae..d04f5365 100644 --- a/transcoder/receiver_server.c +++ b/transcoder/receiver_server.c @@ -9,6 +9,7 @@ #include "receiver_server.h" #include "KMP/KMP.h" #include "transcode_session.h" +#include "utils/throttler.h" int atomFileWrite (char* fileName,char* content,size_t size) @@ -27,8 +28,9 @@ int atomFileWrite (char* fileName,char* content,size_t size) int processedFrameCB(receiver_server_session_t *session,bool completed) { uint64_t now=av_gettime(); + receiver_server_t *server=session->server; if (completed || now-session->lastStatsUpdated>session->diagnosticsIntervalInSeconds) {//1 second interval - receiver_server_t *server=session->server; + char* tmpBuf=av_malloc(MAX_DIAGNOSTICS_STRING_LENGTH); @@ -67,9 +69,11 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran bool autoAckMode; uint64_t received_frame_id=0; kmp_frame_position_t current_position; + throttler_t throttler = {0}; json_get_bool(GetConfig(),"autoAckModeEnabled",false,&autoAckMode); + _S(throttler_init(&server->receiverStats,&throttler)); while (retVal >= 0 && session->kmpClient.socket) { @@ -119,6 +123,9 @@ int clientLoop(receiver_server_t *server,receiver_server_session_t *session,tran pthread_mutex_lock(&server->diagnostics_locker); // lock the critical section samples_stats_add(&server->receiverStats,packet->dts,packet->pos,packet->size); pthread_mutex_unlock(&server->diagnostics_locker); // lock the critical section + + throttler_process(&throttler,transcode_session); + if(add_packet_frame_id_and_pts(packet,received_frame_id,packet->pts)){ LOGGER(CATEGORY_RECEIVER,AV_LOG_ERROR,"[%s] failed to set frame id %lld on packet",session->stream_name,received_frame_id); } diff --git a/transcoder/utils/logger.h b/transcoder/utils/logger.h index dfd5f9fa..8ac370c8 100644 --- a/transcoder/utils/logger.h +++ b/transcoder/utils/logger.h @@ -20,6 +20,7 @@ #define CATEGORY_RECEIVER "RECEIVER" #define CATEGORY_KMP "KMP" #define CATEGORY_HTTP_SERVER "HTTPSERVER" +#define CATEGORY_THROTTLER "THROTTLER" void logger1(const char* category,int level,const char *fmt, ...); void loggerFlush(); diff --git a/transcoder/utils/samples_stats.c b/transcoder/utils/samples_stats.c index c99459fe..59fdfaea 100644 --- a/transcoder/utils/samples_stats.c +++ b/transcoder/utils/samples_stats.c @@ -24,6 +24,7 @@ void sample_stats_init(samples_stats_t* pStats,AVRational basetime) pStats->firstTimeStamp=0; pStats->lastTimeStamp=0; pStats->lastDts=0; + pStats->throttleWait=0; } void drain(samples_stats_t* pStats,uint64_t clock) @@ -102,12 +103,15 @@ void sample_stats_get_diagnostics(samples_stats_t *pStats,json_writer_ctx_t js) JSON_SERIALIZE_STRING("firstTimeStamp",pStats->firstTimeStamp>0 ? ts2str(pStats->firstTimeStamp,false): "N/A") JSON_SERIALIZE_STRING("lastTimeStamp",pStats->lastTimeStamp>0 ? ts2str(pStats->lastTimeStamp,false) : "N/A") JSON_SERIALIZE_INT64("lastDts",pStats->lastDts) + if(pStats->throttleWait > 0) { + JSON_SERIALIZE_INT64("throttle",pStats->throttleWait) + } } void samples_stats_log(const char* category,int level,samples_stats_t *stats,const char *prefix) { - LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf", + LOGGER(category,level,"[%s] Stats: total frames: %ld total errors: %ld total time: %s (%s), clock drift %s,bitrate %.2lf Kbit/s fps=%.2lf rate=x%.2lf throttleWait %s", prefix, stats->totalFrames, stats->totalErrors, @@ -116,5 +120,6 @@ void samples_stats_log(const char* category,int level,samples_stats_t *stats,con pts2str(stats->clockDrift), ((double)stats->currentBitRate)/(1000.0), stats->currentFrameRate, - stats->currentRate) + stats->currentRate, + pts2str(stats->throttleWait)) } diff --git a/transcoder/utils/samples_stats.h b/transcoder/utils/samples_stats.h index 0169bfaa..295c92ea 100644 --- a/transcoder/utils/samples_stats.h +++ b/transcoder/utils/samples_stats.h @@ -41,6 +41,7 @@ typedef struct uint64_t firstTimeStamp,lastTimeStamp; int64_t timeStampPassed; int64_t clockDrift; + int64_t throttleWait; } samples_stats_t; void sample_stats_init(samples_stats_t* pStats,AVRational basetime); diff --git a/transcoder/utils/throttler.c b/transcoder/utils/throttler.c new file mode 100644 index 00000000..6acac7dc --- /dev/null +++ b/transcoder/utils/throttler.c @@ -0,0 +1,86 @@ +// +// throttler.c +// live_transcoder +// +// + + +#include "../transcode/transcode_session.h" +#include "throttler.h" +#include "json_parser.h" + +static void doThrottle(float maxDataRate, + int coldSeconds, + int minThrottleWaitMs, + samples_stats_t *stats, + AVRational targetFramerate); + +int +throttler_init(samples_stats_t *stats,throttler_t *throttler) { + json_value_t *config = GetConfig(); + json_get_double(config,"throttler.maxDataRate",INFINITY,(double*)&throttler->maxDataRate); + *(bool*)&throttler->enabled = throttler->maxDataRate < INFINITY; + if(throttler->enabled){ + json_get_int(config,"throttler.coldSeconds",0,(int*)&throttler->coldSeconds); + json_get_int(config,"throttler.minThrottleWaitMs",1,(int*)&throttler->minThrottleWaitMs); + throttler->stats = stats; + } + return 0; +} + +void +throttler_process(throttler_t *throttler,transcode_session_t *transcode_session) { + if(throttler && throttler->maxDataRate < INFINITY) { + const transcode_mediaInfo_t *mediaInfo = transcode_session ? transcode_session->currentMediaInfo : NULL; + if(mediaInfo && mediaInfo->codecParams){ + const bool isVideo = mediaInfo->codecParams->codec_type == AVMEDIA_TYPE_VIDEO; + const AVRational frameRate = isVideo ? mediaInfo->frameRate : + (AVRational){ .num = mediaInfo->codecParams->sample_rate , + .den = 1024 }; + + doThrottle(throttler->maxDataRate, + throttler->coldSeconds, + throttler->minThrottleWaitMs, + throttler->stats, + frameRate); + } + } +} + +static +void +doThrottle(float maxDataRate, + int coldSeconds, + int minThrottleWaitMs, + samples_stats_t *stats, + AVRational targetFramerate) +{ + + if(targetFramerate.den > 0 && targetFramerate.num > 0) { + float currentDataRate; + + samples_stats_log(CATEGORY_RECEIVER,AV_LOG_DEBUG,stats,"throttleThread-Stats"); + + if(stats->totalFrames < coldSeconds * targetFramerate.num / targetFramerate.den) { + return; + } + + currentDataRate = stats->currentFrameRate * targetFramerate.den / (float)targetFramerate.num; + + LOGGER(CATEGORY_THROTTLER, + AV_LOG_DEBUG,"throttleThread. data rate current: %.3f max: %.3f", + currentDataRate, + maxDataRate); + + if(currentDataRate > maxDataRate) { + // going to sleep for a period of time gained due to race + int throttleWaitUSec = (currentDataRate - maxDataRate) * 1000 * 1000; //av_rescale(1000 * 1000,targetFramerate.den,targetFramerate.num); + const int minThrottleWaitMs = 1; + if(throttleWaitUSec > minThrottleWaitMs * 1000) { + LOGGER(CATEGORY_THROTTLER,AV_LOG_INFO,"throttleThread. throttling %.3f ms",throttleWaitUSec / 1000.f); + stats->throttleWait += av_rescale_q( throttleWaitUSec, clockScale, standard_timebase); + av_usleep(throttleWaitUSec); + } + } + } +} \ No newline at end of file diff --git a/transcoder/utils/throttler.h b/transcoder/utils/throttler.h new file mode 100644 index 00000000..c614ca92 --- /dev/null +++ b/transcoder/utils/throttler.h @@ -0,0 +1,20 @@ +// live_transcoder +// + +#ifndef Throttler_h +#define Throttler_h + +typedef struct { + const bool enabled; + const double maxDataRate; + const int coldSeconds; + const int minThrottleWaitMs; + samples_stats_t *stats; +} throttler_t; + +int throttler_init(samples_stats_t *stats,throttler_t *throttler); + +void +throttler_process(throttler_t *throttler,transcode_session_t *session); + +#endif \ No newline at end of file