Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor sls client manager #1954

Merged
merged 47 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
08ad80b
refactor sls client manager
henryzhx8 Dec 9, 2024
6d4f05d
polish
henryzhx8 Dec 9, 2024
c9d725b
polish
henryzhx8 Dec 10, 2024
270a311
polish
henryzhx8 Dec 10, 2024
b83200c
polish
henryzhx8 Dec 10, 2024
37a8c1e
polish
henryzhx8 Dec 10, 2024
6c48535
polish
henryzhx8 Dec 10, 2024
1977793
polish
henryzhx8 Dec 10, 2024
4324e64
polish
henryzhx8 Dec 11, 2024
297eaea
polish
henryzhx8 Dec 12, 2024
5bcff46
polish
henryzhx8 Dec 12, 2024
d91e98a
polish
henryzhx8 Dec 12, 2024
dfdad39
polish
henryzhx8 Dec 12, 2024
ae97365
Merge branch 'main' into feat/client
henryzhx8 Dec 13, 2024
ecb3398
polish
henryzhx8 Dec 13, 2024
3ca2ae7
polish
henryzhx8 Dec 13, 2024
598b506
polish
henryzhx8 Dec 13, 2024
3bf8f08
polish
henryzhx8 Dec 17, 2024
587d0b5
Merge branch 'main' into feat/client
henryzhx8 Dec 19, 2024
6c0c5de
polish
henryzhx8 Dec 19, 2024
0f9d6b0
polish
henryzhx8 Dec 19, 2024
5d37ced
polish
henryzhx8 Dec 19, 2024
f7e4f83
polish
henryzhx8 Dec 20, 2024
019045c
polish
henryzhx8 Dec 23, 2024
d4183b1
polish
henryzhx8 Dec 23, 2024
e4039dc
polish
henryzhx8 Dec 23, 2024
f390d99
Merge branch 'main' into feat/client
henryzhx8 Dec 23, 2024
f637af8
polish
henryzhx8 Dec 23, 2024
f319b53
polish
henryzhx8 Dec 23, 2024
ed59495
polish
henryzhx8 Dec 23, 2024
5d21207
polish
henryzhx8 Dec 23, 2024
9c499ef
polish
henryzhx8 Dec 26, 2024
7ec64fc
polish
henryzhx8 Dec 27, 2024
114387e
Merge branch 'main' into feat/client
henryzhx8 Dec 27, 2024
23ba530
polish
henryzhx8 Dec 27, 2024
9d61d3e
polish
henryzhx8 Dec 27, 2024
b916dd7
polish
henryzhx8 Dec 28, 2024
bc0cd0d
polish
henryzhx8 Dec 28, 2024
842453b
polish
henryzhx8 Dec 28, 2024
9d76a1e
polish
henryzhx8 Dec 28, 2024
b216e85
polish
henryzhx8 Dec 28, 2024
4aa1c7d
polish
henryzhx8 Dec 30, 2024
5d937c4
Merge branch 'main' into feat/client
henryzhx8 Dec 30, 2024
4ac9671
polish
henryzhx8 Dec 30, 2024
848b7f3
polish
henryzhx8 Dec 30, 2024
8c3276f
polish
henryzhx8 Dec 30, 2024
664e61a
Merge branch 'main' into feat/client
henryzhx8 Dec 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ set(SUB_DIRECTORIES_LIST
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
prometheus prometheus/labels prometheus/schedulers prometheus/async
ebpf ebpf/observer ebpf/security ebpf/handler
parser sls_control sdk
parser
)
if (LINUX)
if (ENABLE_ENTERPRISE)
Expand Down
17 changes: 0 additions & 17 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ DECLARE_FLAG_INT32(reader_close_unused_file_time);
DECLARE_FLAG_INT32(batch_send_interval);
DECLARE_FLAG_INT32(batch_send_metric_size);

DECLARE_FLAG_BOOL(send_prefer_real_ip);
DECLARE_FLAG_INT32(send_switch_real_ip_interval);
DECLARE_FLAG_INT32(truncate_pos_skip_bytes);
DECLARE_FLAG_INT32(default_tail_limit_kb);

Expand Down Expand Up @@ -973,26 +971,11 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
mCheckPointFilePath = AbsolutePath(mCheckPointFilePath, mProcessExecutionDir);
LOG_INFO(sLogger, ("logtail checkpoint path", mCheckPointFilePath));

if (confJson.isMember("send_prefer_real_ip") && confJson["send_prefer_real_ip"].isBool()) {
BOOL_FLAG(send_prefer_real_ip) = confJson["send_prefer_real_ip"].asBool();
}

if (confJson.isMember("send_switch_real_ip_interval") && confJson["send_switch_real_ip_interval"].isInt()) {
INT32_FLAG(send_switch_real_ip_interval) = confJson["send_switch_real_ip_interval"].asInt();
}

LoadInt32Parameter(INT32_FLAG(truncate_pos_skip_bytes),
confJson,
"truncate_pos_skip_bytes",
"ALIYUN_LOGTAIL_TRUNCATE_POS_SKIP_BYTES");

if (BOOL_FLAG(send_prefer_real_ip)) {
LOG_INFO(sLogger,
("change send policy, prefer use real ip, switch interval seconds",
INT32_FLAG(send_switch_real_ip_interval))("truncate skip read offset",
INT32_FLAG(truncate_pos_skip_bytes)));
}

if (confJson.isMember("ignore_dir_inode_changed") && confJson["ignore_dir_inode_changed"].isBool()) {
mIgnoreDirInodeChanged = confJson["ignore_dir_inode_changed"].asBool();
}
Expand Down
3 changes: 2 additions & 1 deletion core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class AppConfig {

public:
AppConfig();
~AppConfig(){};
~AppConfig() {};

void LoadInstanceConfig(const std::map<std::string, std::shared_ptr<InstanceConfig>>&);

Expand Down Expand Up @@ -518,6 +518,7 @@ class AppConfig {
friend class InputPrometheusUnittest;
friend class InputContainerStdioUnittest;
friend class BatcherUnittest;
friend class EnterpriseSLSClientManagerUnittest;
#endif
};

Expand Down
41 changes: 18 additions & 23 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "pipeline/queue/ExactlyOnceQueueManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "prometheus/PrometheusInputRunner.h"
#include "runner/FlusherRunner.h"
Expand Down Expand Up @@ -73,9 +74,6 @@ DEFINE_FLAG_INT32(queue_check_gc_interval_sec, "30s", 30);
DEFINE_FLAG_BOOL(enable_cgroup, "", false);
#endif

DECLARE_FLAG_BOOL(send_prefer_real_ip);
DECLARE_FLAG_BOOL(global_network_success);

using namespace std;

namespace logtail {
Expand Down Expand Up @@ -199,11 +197,13 @@ void Application::Start() { // GCOVR_EXCL_START
#if defined(__ENTERPRISE__) && defined(_MSC_VER)
InitWindowsSignalObject();
#endif
BoundedSenderQueueInterface::SetFeedback(ProcessQueueManager::GetInstance());

HttpSink::GetInstance()->Init();
FlusherRunner::GetInstance()->Init();
// resource monitor
// TODO: move metric related initialization to input Init
LoongCollectorMonitor::GetInstance()->Init();
LogtailMonitor::GetInstance()->Init();

// config provider
{
// add local config dir
filesystem::path localConfigPath = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir())
Expand All @@ -217,18 +217,23 @@ void Application::Start() { // GCOVR_EXCL_START
}
PipelineConfigWatcher::GetInstance()->AddSource(localConfigPath.string());
}

#ifdef __ENTERPRISE__
EnterpriseConfigProvider::GetInstance()->Start();
LegacyConfigProvider::GetInstance()->Init("legacy");
#else
InitRemoteConfigProviders();
#endif

AlarmManager::GetInstance()->Init();
LoongCollectorMonitor::GetInstance()->Init();
LogtailMonitor::GetInstance()->Init();
// runner
BoundedSenderQueueInterface::SetFeedback(ProcessQueueManager::GetInstance());
HttpSink::GetInstance()->Init();
FlusherRunner::GetInstance()->Init();
ProcessorRunner::GetInstance()->Init();

// flusher_sls resource should be explicitly initialized to allow internal metrics and alarms to be sent
FlusherSLS::InitResource();

// plugin registration
PluginRegistry::GetInstance()->LoadPlugins();
InputFeedbackInterfaceRegistry::GetInstance()->LoadFeedbackInterfaces();

Expand Down Expand Up @@ -258,10 +263,10 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

ProcessorRunner::GetInstance()->Init();
// TODO: this should be refactored to internal pipeline
AlarmManager::GetInstance()->Init();

time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0,
lastCheckTagsTime = 0, lastQueueGCTime = 0;
time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0;
#ifndef LOGTAIL_NO_TC_MALLOC
time_t lastTcmallocReleaseMemTime = 0;
#endif
Expand Down Expand Up @@ -393,16 +398,6 @@ void Application::CheckCriticalCondition(int32_t curTime) {
_exit(1);
}
#endif
// if network is fail in 2 hours, force exit (for ant only)
// work around for no network when docker start
if (BOOL_FLAG(send_prefer_real_ip) && !BOOL_FLAG(global_network_success) && curTime - mStartTime > 7200) {
yyuuttaaoo marked this conversation as resolved.
Show resolved Hide resolved
LOG_ERROR(sLogger, ("network is fail", "prepare force exit"));
AlarmManager::GetInstance()->SendAlarm(LOGTAIL_CRASH_ALARM,
"network is fail since " + ToString(mStartTime) + " force exit");
AlarmManager::GetInstance()->ForceToSend();
sleep(10);
_exit(1);
}
}

bool Application::GetUUIDThread() {
Expand Down
144 changes: 75 additions & 69 deletions core/common/DNSCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "DNSCache.h"

#include <cstring>
#if defined(__linux__)
#include <arpa/inet.h>
Expand All @@ -22,86 +23,91 @@
#include <ws2tcpip.h>
#endif

DEFINE_FLAG_INT32(dns_cache_ttl_sec, "", 600);

namespace logtail {
// ParseHost only supports IPv4 now.
bool DnsCache::ParseHost(const char* host, std::string& ip) {
DnsCache::DnsCache(const int32_t ttlSeconds) : mUpdateTime(time(NULL)), mDnsTTL(ttlSeconds) {
}

// ParseHost only supports IPv4 now.
bool DnsCache::ParseHost(const char* host, std::string& ip) {
#if defined(__linux__)
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;

char* buffer = NULL;
if (host && host[0]) {
if (IsRawIp(host)) {
if ((addr.sin_addr.s_addr = inet_addr(host)) == INADDR_NONE)
return false;
} else {
int bufferLen = 2048;
int rc, res;
struct hostent* hp = NULL;
struct hostent h;
while (true) {
buffer = new char[bufferLen];
res = gethostbyname_r(host, &h, buffer, bufferLen, &hp, &rc);
if (res == ERANGE) {
if (buffer != NULL)
delete[] buffer;
bufferLen *= 4;
if (bufferLen > 32768) // 32KB
return false;
continue;
}
if (res != 0 || hp == NULL || hp->h_addr == NULL) {
if (buffer != NULL)
delete[] buffer;
char* buffer = NULL;
if (host && host[0]) {
if (IsRawIp(host)) {
if ((addr.sin_addr.s_addr = inet_addr(host)) == INADDR_NONE)
return false;
} else {
int bufferLen = 2048;
int rc, res;
struct hostent* hp = NULL;
struct hostent h;
while (true) {
buffer = new char[bufferLen];
res = gethostbyname_r(host, &h, buffer, bufferLen, &hp, &rc);
if (res == ERANGE) {
if (buffer != NULL)
delete[] buffer;
bufferLen *= 4;
if (bufferLen > 32768) // 32KB
return false;
} else
break;
continue;
}
addr.sin_addr.s_addr = *((in_addr_t*)(hp->h_addr));
if (res != 0 || hp == NULL || hp->h_addr == NULL) {
if (buffer != NULL)
delete[] buffer;
return false;
} else
break;
}
} else {
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_addr.s_addr = *((in_addr_t*)(hp->h_addr));
}
ip = inet_ntoa(addr.sin_addr);
if (buffer != NULL)
delete[] buffer;
return true;
} else {
addr.sin_addr.s_addr = htonl(INADDR_ANY);
}
ip = inet_ntoa(addr.sin_addr);
if (buffer != NULL)
delete[] buffer;
return true;
#elif defined(_MSC_VER)
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
if (host && host[0]) {
if (IsRawIp(host)) {
if ((addr.sin_addr.s_addr = inet_addr(host)) == INADDR_NONE)
return false;
} else {
addrinfo hints;
struct addrinfo* result = NULL;
std::memset(&hints, 0, sizeof(hints));
auto ret = ::getaddrinfo(host, NULL, &hints, &result);
if (ret != 0) {
return false;
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
if (host && host[0]) {
if (IsRawIp(host)) {
if ((addr.sin_addr.s_addr = inet_addr(host)) == INADDR_NONE)
return false;
} else {
addrinfo hints;
struct addrinfo* result = NULL;
std::memset(&hints, 0, sizeof(hints));
auto ret = ::getaddrinfo(host, NULL, &hints, &result);
if (ret != 0) {
return false;
}

bool found = false;
for (auto ptr = result; ptr != NULL; ptr = ptr->ai_next) {
if (AF_INET == ptr->ai_family) {
addr.sin_addr = ((struct sockaddr_in*)ptr->ai_addr)->sin_addr;
found = true;
break;
}
bool found = false;
for (auto ptr = result; ptr != NULL; ptr = ptr->ai_next) {
if (AF_INET == ptr->ai_family) {
addr.sin_addr = ((struct sockaddr_in*)ptr->ai_addr)->sin_addr;
found = true;
break;
}
freeaddrinfo(result);
if (!found)
return false;
}
} else {
addr.sin_addr.s_addr = htonl(INADDR_ANY);
freeaddrinfo(result);
if (!found)
return false;
}
ip = inet_ntoa(addr.sin_addr);
return true;
#endif
} else {
addr.sin_addr.s_addr = htonl(INADDR_ANY);
}
ip = inet_ntoa(addr.sin_addr);
return true;
#endif
}

} // namespace logtail
} // namespace logtail
7 changes: 6 additions & 1 deletion core/common/DNSCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
*/

#pragma once

#include <cstdint>
#include <ctime>
#include <map>
#include <mutex>

#include "common/Flags.h"

DECLARE_FLAG_INT32(dns_cache_ttl_sec);

namespace logtail {

class DnsCache {
Expand Down Expand Up @@ -75,7 +80,7 @@ class DnsCache {
}

private:
DnsCache(const int32_t ttlSeconds = 60 * 10) : mUpdateTime(time(NULL)), mDnsTTL(ttlSeconds) {}
DnsCache(const int32_t ttlSeconds = INT32_FLAG(dns_cache_ttl_sec));
~DnsCache() = default;

bool IsRawIp(const char* host) {
Expand Down
Loading
Loading