Skip to content

Commit

Permalink
Refactored Orchestrator
Browse files Browse the repository at this point in the history
  • Loading branch information
getroot committed Aug 11, 2024
1 parent 9844959 commit c87c6d9
Show file tree
Hide file tree
Showing 102 changed files with 1,957 additions and 2,539 deletions.
2 changes: 1 addition & 1 deletion misc/conf_examples/Edge.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@
<UnusedStreamDeletionTimeout>60000</UnusedStreamDeletionTimeout>
</Properties>
<!--
<Origin>
<Origin>
<Location>/app/stream</Location>
<Pass>
<Scheme>ovt</Scheme>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ namespace api
throw http::HttpError(http::StatusCode::NotFound,
"Could not find publisher: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto application = std::static_pointer_cast<pub::FileApplication>(publisher->GetApplicationByName(app->GetName()));
auto application = std::static_pointer_cast<pub::FileApplication>(publisher->GetApplicationByName(app->GetVHostAppName()));
if (application == nullptr)
{
throw http::HttpError(http::StatusCode::NotFound,
"Could not find application: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto record = ::serdes::RecordFromJson(request_body);
Expand All @@ -82,12 +82,12 @@ namespace api
throw http::HttpError(http::StatusCode::BadRequest,
"Could not parse json context: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}
else
{
record->SetVhost(vhost->GetName());
record->SetApplication(app->GetName().GetAppName());
record->SetApplication(app->GetVHostAppName().GetAppName());
}

std::vector<std::shared_ptr<info::Record>> results;
Expand All @@ -97,7 +97,7 @@ namespace api
{
throw http::HttpError(http::StatusCode::InternalServerError, "Could not get records: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

response = Json::arrayValue;
Expand All @@ -122,16 +122,16 @@ namespace api
throw http::HttpError(http::StatusCode::NotFound,
"Could not find publisher: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto application = std::static_pointer_cast<pub::FileApplication>(publisher->GetApplicationByName(app->GetName()));
auto application = std::static_pointer_cast<pub::FileApplication>(publisher->GetApplicationByName(app->GetVHostAppName()));
if (application == nullptr)
{
throw http::HttpError(http::StatusCode::NotFound,
"Could not find application: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto record = ::serdes::RecordFromJson(request_body);
Expand All @@ -140,12 +140,12 @@ namespace api
throw http::HttpError(http::StatusCode::BadRequest,
"Could not parse json context: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}
else
{
record->SetVhost(vhost->GetName());
record->SetApplication(app->GetName().GetAppName());
record->SetApplication(app->GetVHostAppName().GetAppName());
}

auto error = application->RecordStart(record);
Expand Down Expand Up @@ -178,16 +178,16 @@ namespace api
throw http::HttpError(http::StatusCode::NotFound,
"Could not find publisher: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto application = std::static_pointer_cast<pub::FileApplication>(publisher->GetApplicationByName(app->GetName()));
auto application = std::static_pointer_cast<pub::FileApplication>(publisher->GetApplicationByName(app->GetVHostAppName()));
if (application == nullptr)
{
throw http::HttpError(http::StatusCode::NotFound,
"Could not find application: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto record = ::serdes::RecordFromJson(request_body);
Expand All @@ -196,12 +196,12 @@ namespace api
throw http::HttpError(http::StatusCode::BadRequest,
"Could not parse json context: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}
else
{
record->SetVhost(vhost->GetName());
record->SetApplication(app->GetName().GetAppName());
record->SetApplication(app->GetVHostAppName().GetAppName());
}

auto error = application->RecordStop(record);
Expand Down Expand Up @@ -242,7 +242,7 @@ namespace api
throw http::HttpError(http::StatusCode::BadRequest,
"Could not parse json context: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto publisher{ocst::Orchestrator::GetInstance()->GetPublisherFromType(PublisherType::Push)};
Expand All @@ -251,10 +251,10 @@ namespace api
throw http::HttpError(http::StatusCode::NotFound,
"Could not find publisher: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto application{std::static_pointer_cast<pub::PushApplication>(publisher->GetApplicationByName(app->GetName()))};
auto application{std::static_pointer_cast<pub::PushApplication>(publisher->GetApplicationByName(app->GetVHostAppName()))};
if (application != nullptr)
{
application->GetPushes(push, results);
Expand Down Expand Up @@ -282,7 +282,7 @@ namespace api
throw http::HttpError(http::StatusCode::BadRequest,
"Could not parse json context: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto push_protocol{push->GetProtocol().LowerCaseString()};
Expand All @@ -294,7 +294,7 @@ namespace api
"Could not find protocol '%s': [%s/%s]",
push_protocol.CStr(),
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto publisher{ocst::Orchestrator::GetInstance()->GetPublisherFromType(publisher_type)};
Expand All @@ -303,20 +303,20 @@ namespace api
throw http::HttpError(http::StatusCode::NotFound,
"Could not find publisher: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto application{std::static_pointer_cast<pub::PushApplication>(publisher->GetApplicationByName(app->GetName()))};
auto application{std::static_pointer_cast<pub::PushApplication>(publisher->GetApplicationByName(app->GetVHostAppName()))};
if (application == nullptr)
{
throw http::HttpError(http::StatusCode::NotFound,
"Could not find application: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

push->SetVhost(vhost->GetName());
push->SetApplication(app->GetName().GetAppName());
push->SetApplication(app->GetVHostAppName().GetAppName());

auto error{application->StartPush(push)};
if (error->GetCode() != pub::PushApplication::ErrorCode::Success)
Expand All @@ -328,7 +328,7 @@ namespace api
if (application->GetStream(push->GetStreamName()) == nullptr)
{
auto url = ov::Url::Parse(client->GetRequest()->GetUri());
auto app_name = app->GetName();
auto app_name = app->GetVHostAppName();
auto stream_name = push->GetStreamName();
ocst::Orchestrator::GetInstance()->RequestPullStreamWithOriginMap(url, app_name, stream_name);
}
Expand All @@ -349,22 +349,22 @@ namespace api
throw http::HttpError(http::StatusCode::BadRequest,
"Could not parse json context: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

push->SetVhost(vhost->GetName());
push->SetApplication(app->GetName().GetAppName());
push->SetApplication(app->GetVHostAppName().GetAppName());

auto publisher{ocst::Orchestrator::GetInstance()->GetPublisherFromType(PublisherType::Push)};
if (publisher == nullptr)
{
throw http::HttpError(http::StatusCode::NotFound,
"Could not find publisher: [%s/%s]",
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

auto application{std::static_pointer_cast<pub::PushApplication>(publisher->GetApplicationByName(app->GetName()))};
auto application{std::static_pointer_cast<pub::PushApplication>(publisher->GetApplicationByName(app->GetVHostAppName()))};
if (application != nullptr)
{
auto error{application->StopPush(push)};
Expand All @@ -383,14 +383,14 @@ namespace api
"Could not find id '%s': [%s/%s]",
push->GetId().CStr(),
vhost->GetName().CStr(),
app->GetName().GetAppName().CStr());
app->GetVHostAppName().GetAppName().CStr());
}

ApiResponse AppActionsController::OnGetDummyAction(const std::shared_ptr<http::svr::HttpExchange> &client,
const std::shared_ptr<mon::HostMetrics> &vhost,
const std::shared_ptr<mon::ApplicationMetrics> &app)
{
logte("Called OnGetDummyAction. invoke [%s/%s]", vhost->GetName().CStr(), app->GetName().GetAppName().CStr());
logte("Called OnGetDummyAction. invoke [%s/%s]", vhost->GetName().CStr(), app->GetVHostAppName().GetAppName().CStr());

return app->GetConfig().ToJson();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ namespace api
{
auto &app = item.second;

response.append(app->GetName().GetAppName().CStr());
response.append(app->GetVHostAppName().GetAppName().CStr());
}

return response;
Expand Down Expand Up @@ -176,13 +176,13 @@ namespace api
ocst::Orchestrator::GetInstance()->DeleteApplication(*app),
"delete",
"application",
ov::String::FormatString("%s/%s", vhost->GetName().CStr(), app->GetName().GetAppName().CStr()));
ov::String::FormatString("%s/%s", vhost->GetName().CStr(), app->GetVHostAppName().GetAppName().CStr()));

ThrowIfOrchestratorNotSucceeded(
ocst::Orchestrator::GetInstance()->CreateApplication(*vhost, app_config),
"create",
"application",
ov::String::FormatString("%s/%s", vhost->GetName().CStr(), app->GetName().GetAppName().CStr()));
ov::String::FormatString("%s/%s", vhost->GetName().CStr(), app->GetVHostAppName().GetAppName().CStr()));

auto app_metrics = GetApplication(vhost, app_config.GetName().CStr());

Expand All @@ -199,7 +199,7 @@ namespace api
ocst::Orchestrator::GetInstance()->DeleteApplication(*app),
"delete",
"application",
ov::String::FormatString("%s/%s", vhost->GetName().CStr(), app->GetName().GetAppName().CStr()));
ov::String::FormatString("%s/%s", vhost->GetName().CStr(), app->GetVHostAppName().GetAppName().CStr()));

return {};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace api
throw http::HttpError(http::StatusCode::InternalServerError, "Could not get the multiplex provider");
}

auto multiplex_application = std::static_pointer_cast<pvd::MultiplexApplication>(provider->GetApplicationByName(app->GetName()));
auto multiplex_application = std::static_pointer_cast<pvd::MultiplexApplication>(provider->GetApplicationByName(app->GetVHostAppName()));
if (multiplex_application == nullptr)
{
throw http::HttpError(http::StatusCode::NotFound, "Could not get the multiplex application");
Expand Down Expand Up @@ -127,7 +127,7 @@ namespace api
throw http::HttpError(http::StatusCode::InternalServerError, "Could not get the multiplex provider");
}

auto multiplex_application = std::static_pointer_cast<pvd::MultiplexApplication>(provider->GetApplicationByName(app->GetName()));
auto multiplex_application = std::static_pointer_cast<pvd::MultiplexApplication>(provider->GetApplicationByName(app->GetVHostAppName()));
if (multiplex_application == nullptr)
{
throw http::HttpError(http::StatusCode::NotFound, "Could not get the multiplex application");
Expand Down Expand Up @@ -179,7 +179,7 @@ namespace api
throw http::HttpError(http::StatusCode::InternalServerError, "Could not get the multiplex provider");
}

auto multiplex_application = std::static_pointer_cast<pvd::MultiplexApplication>(provider->GetApplicationByName(app->GetName()));
auto multiplex_application = std::static_pointer_cast<pvd::MultiplexApplication>(provider->GetApplicationByName(app->GetVHostAppName()));
if (multiplex_application == nullptr)
{
throw http::HttpError(http::StatusCode::NotFound, "Could not get the multiplex application");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
http::StatusCode::NotFound, \
"Could not find the output profile: [%s/%s/%s]", \
vhost_metrics->GetName().CStr(), \
app_metrics->GetName().GetAppName().CStr(), \
app_metrics->GetVHostAppName().GetAppName().CStr(), \
output_profile_name.CStr())

namespace api
Expand Down Expand Up @@ -137,7 +137,7 @@ namespace api
std::shared_ptr<mon::ApplicationMetrics> new_app;
Json::Value new_app_json;

new_app = GetApplication(vhost, app->GetName().GetAppName().CStr());
new_app = GetApplication(vhost, app->GetVHostAppName().GetAppName().CStr());
new_app_json = new_app->GetConfig().ToJson();

for (auto &response_profile : response)
Expand Down Expand Up @@ -251,7 +251,7 @@ namespace api

RecreateApplication(vhost, app, app_json);

auto new_app = GetApplication(vhost, app->GetName().GetAppName().CStr());
auto new_app = GetApplication(vhost, app->GetVHostAppName().GetAppName().CStr());

Json::Value value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ namespace api
throw http::HttpError(http::StatusCode::InternalServerError, "Could not get the scheduled provider");
}

auto scheduled_stream = std::static_pointer_cast<pvd::ScheduledStream>(provider->GetStreamByName(app->GetName(), stream->GetName()));
auto scheduled_stream = std::static_pointer_cast<pvd::ScheduledStream>(provider->GetStreamByName(app->GetVHostAppName(), stream->GetName()));
if (scheduled_stream == nullptr)
{
throw http::HttpError(http::StatusCode::InternalServerError, "Could not get the scheduled stream");
Expand Down Expand Up @@ -188,7 +188,7 @@ namespace api
throw http::HttpError(http::StatusCode::InternalServerError, "Could not get the scheduled provider");
}

auto scheduled_stream = std::static_pointer_cast<pvd::ScheduledStream>(provider->GetStreamByName(app->GetName(), stream->GetName()));
auto scheduled_stream = std::static_pointer_cast<pvd::ScheduledStream>(provider->GetStreamByName(app->GetVHostAppName(), stream->GetName()));
if (scheduled_stream == nullptr)
{
throw http::HttpError(http::StatusCode::InternalServerError, "Could not get the scheduled stream");
Expand Down
Loading

0 comments on commit c87c6d9

Please sign in to comment.