Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor sls client manager #1954

Merged
merged 47 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
08ad80b
refactor sls client manager
henryzhx8 Dec 9, 2024
6d4f05d
polish
henryzhx8 Dec 9, 2024
c9d725b
polish
henryzhx8 Dec 10, 2024
270a311
polish
henryzhx8 Dec 10, 2024
b83200c
polish
henryzhx8 Dec 10, 2024
37a8c1e
polish
henryzhx8 Dec 10, 2024
6c48535
polish
henryzhx8 Dec 10, 2024
1977793
polish
henryzhx8 Dec 10, 2024
4324e64
polish
henryzhx8 Dec 11, 2024
297eaea
polish
henryzhx8 Dec 12, 2024
5bcff46
polish
henryzhx8 Dec 12, 2024
d91e98a
polish
henryzhx8 Dec 12, 2024
dfdad39
polish
henryzhx8 Dec 12, 2024
ae97365
Merge branch 'main' into feat/client
henryzhx8 Dec 13, 2024
ecb3398
polish
henryzhx8 Dec 13, 2024
3ca2ae7
polish
henryzhx8 Dec 13, 2024
598b506
polish
henryzhx8 Dec 13, 2024
3bf8f08
polish
henryzhx8 Dec 17, 2024
587d0b5
Merge branch 'main' into feat/client
henryzhx8 Dec 19, 2024
6c0c5de
polish
henryzhx8 Dec 19, 2024
0f9d6b0
polish
henryzhx8 Dec 19, 2024
5d37ced
polish
henryzhx8 Dec 19, 2024
f7e4f83
polish
henryzhx8 Dec 20, 2024
019045c
polish
henryzhx8 Dec 23, 2024
d4183b1
polish
henryzhx8 Dec 23, 2024
e4039dc
polish
henryzhx8 Dec 23, 2024
f390d99
Merge branch 'main' into feat/client
henryzhx8 Dec 23, 2024
f637af8
polish
henryzhx8 Dec 23, 2024
f319b53
polish
henryzhx8 Dec 23, 2024
ed59495
polish
henryzhx8 Dec 23, 2024
5d21207
polish
henryzhx8 Dec 23, 2024
9c499ef
polish
henryzhx8 Dec 26, 2024
7ec64fc
polish
henryzhx8 Dec 27, 2024
114387e
Merge branch 'main' into feat/client
henryzhx8 Dec 27, 2024
23ba530
polish
henryzhx8 Dec 27, 2024
9d61d3e
polish
henryzhx8 Dec 27, 2024
b916dd7
polish
henryzhx8 Dec 28, 2024
bc0cd0d
polish
henryzhx8 Dec 28, 2024
842453b
polish
henryzhx8 Dec 28, 2024
9d76a1e
polish
henryzhx8 Dec 28, 2024
b216e85
polish
henryzhx8 Dec 28, 2024
4aa1c7d
polish
henryzhx8 Dec 30, 2024
5d937c4
Merge branch 'main' into feat/client
henryzhx8 Dec 30, 2024
4ac9671
polish
henryzhx8 Dec 30, 2024
848b7f3
polish
henryzhx8 Dec 30, 2024
8c3276f
polish
henryzhx8 Dec 30, 2024
664e61a
Merge branch 'main' into feat/client
henryzhx8 Dec 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ set(SUB_DIRECTORIES_LIST
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
prometheus prometheus/labels prometheus/schedulers prometheus/async prometheus/component
ebpf ebpf/observer ebpf/security ebpf/handler
parser sls_control sdk
parser
)
if (LINUX)
if (ENABLE_ENTERPRISE)
Expand Down
17 changes: 0 additions & 17 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ DECLARE_FLAG_INT32(reader_close_unused_file_time);
DECLARE_FLAG_INT32(batch_send_interval);
DECLARE_FLAG_INT32(batch_send_metric_size);

DECLARE_FLAG_BOOL(send_prefer_real_ip);
DECLARE_FLAG_INT32(send_switch_real_ip_interval);
DECLARE_FLAG_INT32(truncate_pos_skip_bytes);
DECLARE_FLAG_INT32(default_tail_limit_kb);

Expand Down Expand Up @@ -989,26 +987,11 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
mCheckPointFilePath = AbsolutePath(mCheckPointFilePath, mProcessExecutionDir);
LOG_INFO(sLogger, ("logtail checkpoint path", mCheckPointFilePath));

if (confJson.isMember("send_prefer_real_ip") && confJson["send_prefer_real_ip"].isBool()) {
BOOL_FLAG(send_prefer_real_ip) = confJson["send_prefer_real_ip"].asBool();
}

if (confJson.isMember("send_switch_real_ip_interval") && confJson["send_switch_real_ip_interval"].isInt()) {
INT32_FLAG(send_switch_real_ip_interval) = confJson["send_switch_real_ip_interval"].asInt();
}

LoadInt32Parameter(INT32_FLAG(truncate_pos_skip_bytes),
confJson,
"truncate_pos_skip_bytes",
"ALIYUN_LOGTAIL_TRUNCATE_POS_SKIP_BYTES");

if (BOOL_FLAG(send_prefer_real_ip)) {
LOG_INFO(sLogger,
("change send policy, prefer use real ip, switch interval seconds",
INT32_FLAG(send_switch_real_ip_interval))("truncate skip read offset",
INT32_FLAG(truncate_pos_skip_bytes)));
}

if (confJson.isMember("ignore_dir_inode_changed") && confJson["ignore_dir_inode_changed"].isBool()) {
mIgnoreDirInodeChanged = confJson["ignore_dir_inode_changed"].asBool();
}
Expand Down
4 changes: 3 additions & 1 deletion core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ class AppConfig {

public:
AppConfig();
~AppConfig(){};
~AppConfig() {};

void LoadInstanceConfig(const std::map<std::string, std::shared_ptr<InstanceConfig>>&);

Expand Down Expand Up @@ -533,6 +533,8 @@ class AppConfig {
friend class InputPrometheusUnittest;
friend class InputContainerStdioUnittest;
friend class BatcherUnittest;
friend class EnterpriseSLSClientManagerUnittest;
friend class FlusherRunnerUnittest;
friend class PipelineUpdateUnittest;
#endif
};
Expand Down
41 changes: 18 additions & 23 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "pipeline/queue/ExactlyOnceQueueManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "prometheus/PrometheusInputRunner.h"
#include "runner/FlusherRunner.h"
Expand Down Expand Up @@ -73,9 +74,6 @@ DEFINE_FLAG_INT32(queue_check_gc_interval_sec, "30s", 30);
DEFINE_FLAG_BOOL(enable_cgroup, "", false);
#endif

DECLARE_FLAG_BOOL(send_prefer_real_ip);
DECLARE_FLAG_BOOL(global_network_success);

using namespace std;

namespace logtail {
Expand Down Expand Up @@ -199,11 +197,13 @@ void Application::Start() { // GCOVR_EXCL_START
#if defined(__ENTERPRISE__) && defined(_MSC_VER)
InitWindowsSignalObject();
#endif
BoundedSenderQueueInterface::SetFeedback(ProcessQueueManager::GetInstance());

HttpSink::GetInstance()->Init();
FlusherRunner::GetInstance()->Init();
// resource monitor
// TODO: move metric related initialization to input Init
LoongCollectorMonitor::GetInstance()->Init();
LogtailMonitor::GetInstance()->Init();

// config provider
{
// add local config dir
filesystem::path localConfigPath = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir())
Expand All @@ -217,18 +217,23 @@ void Application::Start() { // GCOVR_EXCL_START
}
PipelineConfigWatcher::GetInstance()->AddSource(localConfigPath.string());
}

#ifdef __ENTERPRISE__
EnterpriseConfigProvider::GetInstance()->Start();
LegacyConfigProvider::GetInstance()->Init("legacy");
#else
InitRemoteConfigProviders();
#endif

AlarmManager::GetInstance()->Init();
LoongCollectorMonitor::GetInstance()->Init();
LogtailMonitor::GetInstance()->Init();
// runner
BoundedSenderQueueInterface::SetFeedback(ProcessQueueManager::GetInstance());
HttpSink::GetInstance()->Init();
FlusherRunner::GetInstance()->Init();
ProcessorRunner::GetInstance()->Init();

// flusher_sls resource should be explicitly initialized to allow internal metrics and alarms to be sent
FlusherSLS::InitResource();

// plugin registration
PluginRegistry::GetInstance()->LoadPlugins();
InputFeedbackInterfaceRegistry::GetInstance()->LoadFeedbackInterfaces();

Expand Down Expand Up @@ -258,10 +263,10 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

ProcessorRunner::GetInstance()->Init();
// TODO: this should be refactored to internal pipeline
AlarmManager::GetInstance()->Init();

time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0,
lastCheckTagsTime = 0, lastQueueGCTime = 0;
time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0;
#ifndef LOGTAIL_NO_TC_MALLOC
time_t lastTcmallocReleaseMemTime = 0;
#endif
Expand Down Expand Up @@ -393,16 +398,6 @@ void Application::CheckCriticalCondition(int32_t curTime) {
_exit(1);
}
#endif
// if network is fail in 2 hours, force exit (for ant only)
// work around for no network when docker start
if (BOOL_FLAG(send_prefer_real_ip) && !BOOL_FLAG(global_network_success) && curTime - mStartTime > 7200) {
yyuuttaaoo marked this conversation as resolved.
Show resolved Hide resolved
LOG_ERROR(sLogger, ("network is fail", "prepare force exit"));
AlarmManager::GetInstance()->SendAlarm(LOGTAIL_CRASH_ALARM,
"network is fail since " + ToString(mStartTime) + " force exit");
AlarmManager::GetInstance()->ForceToSend();
sleep(10);
_exit(1);
}
}

bool Application::GetUUIDThread() {
Expand Down
171 changes: 0 additions & 171 deletions core/common/CompressTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,141 +15,9 @@
#include "CompressTools.h"

#include <lz4/lz4.h>
#ifdef __ANDROID__
#include <zlib.h>
#else
#include <zlib/zlib.h>
#endif
#include <zstd/zstd.h>

#include <cstring>

#include "protobuf/sls/sls_logs.pb.h"

namespace logtail {

const int32_t ZSTD_DEFAULT_LEVEL = 1;

bool UncompressData(sls_logs::SlsCompressType compressType,
const std::string& src,
uint32_t rawSize,
std::string& dst) {
switch (compressType) {
case sls_logs::SLS_CMP_NONE:
dst = src;
return true;
case sls_logs::SLS_CMP_LZ4:
return UncompressLz4(src, rawSize, dst);
case sls_logs::SLS_CMP_DEFLATE:
return UncompressDeflate(src, rawSize, dst);
case sls_logs::SLS_CMP_ZSTD:
return UncompressZstd(src, rawSize, dst);
default:
return false;
}
}

bool CompressData(sls_logs::SlsCompressType compressType, const std::string& src, std::string& dst) {
switch (compressType) {
case sls_logs::SLS_CMP_NONE:
dst = src;
return true;
case sls_logs::SLS_CMP_LZ4:
return CompressLz4(src, dst);
case sls_logs::SLS_CMP_DEFLATE:
return CompressDeflate(src, dst);
case sls_logs::SLS_CMP_ZSTD:
return CompressZstd(src, dst, ZSTD_DEFAULT_LEVEL);
default:
return false;
}
}

bool CompressData(sls_logs::SlsCompressType compressType, const char* src, uint32_t size, std::string& dst) {
switch (compressType) {
case sls_logs::SLS_CMP_NONE: {
dst.assign(src, size);
return true;
}
case sls_logs::SLS_CMP_LZ4:
return CompressLz4(src, size, dst);
case sls_logs::SLS_CMP_DEFLATE:
return CompressDeflate(src, size, dst);
case sls_logs::SLS_CMP_ZSTD:
return CompressZstd(src, size, dst, ZSTD_DEFAULT_LEVEL);
default:
return false;
}
}

bool UncompressLz4(const std::string& src, const uint32_t rawSize, char* dst) {
uint32_t length = 0;
try {
length = LZ4_decompress_safe(src.c_str(), dst, src.length(), rawSize);
} catch (...) {
return false;
}
if (length != rawSize) {
return false;
}
return true;
}

bool UncompressLz4(const char* srcPtr, const uint32_t srcSize, const uint32_t rawSize, std::string& dst) {
dst.resize(rawSize);
char* unCompressed = const_cast<char*>(dst.c_str());
uint32_t length = 0;
try {
length = LZ4_decompress_safe(srcPtr, unCompressed, srcSize, rawSize);
} catch (...) {
return false;
}
if (length != rawSize) {
return false;
}
return true;
}
bool CompressDeflate(const char* srcPtr, const uint32_t srcSize, std::string& dst) {
int64_t dstLen = compressBound(srcSize);
dst.resize(dstLen);
if (compress((Bytef*)(dst.c_str()), (uLongf*)&dstLen, (const Bytef*)srcPtr, srcSize) == Z_OK) {
dst.resize(dstLen);
return true;
}
return false;
}

bool CompressDeflate(const std::string& src, std::string& dst) {
int64_t dstLen = compressBound(src.size());
dst.resize(dstLen);
if (compress((Bytef*)(dst.c_str()), (uLongf*)&dstLen, (const Bytef*)(src.c_str()), src.size()) == Z_OK) {
dst.resize(dstLen);
return true;
}
return false;
}

bool UncompressDeflate(const char* srcPtr, const uint32_t srcSize, const int64_t rawSize, std::string& dst) {
static const int64_t MAX_UMCOMPRESS_SIZE = 128 * 1024 * 1024;
if (rawSize > MAX_UMCOMPRESS_SIZE) {
return false;
}
dst.resize(rawSize);
if (uncompress((Bytef*)(dst.c_str()), (uLongf*)&rawSize, (const Bytef*)(srcPtr), srcSize) != Z_OK) {
return false;
}
return true;
}


bool UncompressDeflate(const std::string& src, const int64_t rawSize, std::string& dst) {
return UncompressDeflate(src.c_str(), src.size(), rawSize, dst);
}


bool UncompressLz4(const std::string& src, const uint32_t rawSize, std::string& dst) {
return UncompressLz4(src.c_str(), src.length(), rawSize, dst);
}
bool CompressLz4(const char* srcPtr, const uint32_t srcSize, std::string& dst) {
uint32_t encodingSize = LZ4_compressBound(srcSize);
dst.resize(encodingSize);
Expand All @@ -169,43 +37,4 @@ bool CompressLz4(const std::string& src, std::string& dst) {
return CompressLz4(src.c_str(), src.length(), dst);
}

bool UncompressZstd(const std::string& src, const uint32_t rawSize, std::string& dst) {
return UncompressZstd(src.c_str(), src.length(), rawSize, dst);
}

bool UncompressZstd(const char* srcPtr, const uint32_t srcSize, const uint32_t rawSize, std::string& dst) {
dst.resize(rawSize);
char* unCompressed = const_cast<char*>(dst.c_str());
uint32_t length = 0;
try {
length = ZSTD_decompress(unCompressed, rawSize, srcPtr, srcSize);
} catch (...) {
return false;
}
if (length != rawSize) {
return false;
}
return true;
}

bool CompressZstd(const char* srcPtr, const uint32_t srcSize, std::string& dst, int32_t level) {
uint32_t encodingSize = ZSTD_compressBound(srcSize);
dst.resize(encodingSize);
char* compressed = const_cast<char*>(dst.c_str());
try {
size_t const cmp_size = ZSTD_compress(compressed, encodingSize, srcPtr, srcSize, level);
if (ZSTD_isError(cmp_size)) {
return false;
}
dst.resize(cmp_size);
return true;
} catch (...) {
}
return false;
}

bool CompressZstd(const std::string& src, std::string& dst, int32_t level) {
return CompressZstd(src.c_str(), src.length(), dst, level);
}

} // namespace logtail
28 changes: 3 additions & 25 deletions core/common/CompressTools.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,14 @@
*/

#pragma once
#include <string>
#include <cstdint>
#include "protobuf/sls/sls_logs.pb.h"

namespace logtail {

extern const int32_t ZSTD_DEFAULT_LEVEL;

bool UncompressData(sls_logs::SlsCompressType compressType, const std::string& src, uint32_t rawSize, std::string& dst);

bool CompressData(sls_logs::SlsCompressType compressType, const std::string& src, std::string& dst);
bool CompressData(sls_logs::SlsCompressType compressType, const char* src, uint32_t size, std::string& dst);

bool UncompressDeflate(const std::string& src, const int64_t rawSize, std::string& dst);
bool UncompressDeflate(const char* srcPtr, const uint32_t srcSize, const int64_t rawSize, std::string& dst);
#include <cstdint>

bool CompressDeflate(const std::string& src, std::string& dst);
bool CompressDeflate(const char* srcPtr, const uint32_t srcSize, std::string& dst);
#include <string>

bool UncompressLz4(const std::string& src, const uint32_t rawSize, std::string& dst);
bool UncompressLz4(const std::string& src, const uint32_t rawSize, char* dst);
bool UncompressLz4(const char* srcPtr, const uint32_t srcSize, const uint32_t rawSize, std::string& dst);
namespace logtail {

bool CompressLz4(const std::string& src, std::string& dst);
bool CompressLz4(const char* srcPtr, const uint32_t srcSize, std::string& dest);

bool UncompressZstd(const std::string& src, const uint32_t rawSize, std::string& dst);
bool UncompressZstd(const char* srcPtr, const uint32_t srcSize, const uint32_t rawSize, std::string& dst);

bool CompressZstd(const char* srcPtr, const uint32_t srcSize, std::string& dst, int32_t level);
bool CompressZstd(const std::string& src, std::string& dst, int32_t level);

} // namespace logtail
Loading
Loading