Skip to content

Commit

Permalink
fix alignment problem in ctor, refactor in onSchedule
Browse files Browse the repository at this point in the history
  • Loading branch information
martinzink committed Jul 10, 2023
1 parent 91d8d09 commit 1c380c8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 27 deletions.
76 changes: 49 additions & 27 deletions extensions/standard-processors/processors/GetTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,34 +95,41 @@ void GetTCP::initialize() {
setSupportedRelationships(relationships());
}

void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {

std::vector<utils::net::ConnectionId> GetTCP::parseEndpointList(core::ProcessContext& context) {
std::vector<utils::net::ConnectionId> connections_to_make;
if (auto endpoint_list_str = context->getProperty(EndpointList)) {
for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
if (hostname_service_pair.size() != 2) {
logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
continue;
}
connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
}
if (auto endpoint_list_str = context.getProperty(EndpointList)) {
for (const auto& endpoint_str : utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
auto hostname_service_pair = utils::StringUtils::splitAndTrim(endpoint_str, ":");
if (hostname_service_pair.size() != 2) {
logger_->log_error("%s endpoint is invalid, expected {hostname}:{service} format", endpoint_str);
continue;
}
connections_to_make.emplace_back(hostname_service_pair[0], hostname_service_pair[1]);
}
}

if (connections_to_make.empty())
throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("No valid endpoint in {} property", EndpointList.getName()));

return connections_to_make;
}

char GetTCP::parseDelimiter(core::ProcessContext& context) {
char delimiter = '\n';
if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
if (auto delimiter_str = context.getProperty(GetTCP::MessageDelimiter)) {
auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
if (!parsed_delimiter || !parsed_delimiter->has_value())
throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid delimiter: {} (it must be a single (escaped or not) character", *delimiter_str));
delimiter = **parsed_delimiter;
}
return delimiter;
}

std::optional<asio::ssl::context> GetTCP::parseSSLContext(core::ProcessContext& context) {
std::optional<asio::ssl::context> ssl_context;
if (auto context_name = context->getProperty(SSLContextService)) {
if (auto controller_service = context->getControllerService(*context_name)) {
if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(*context_name))) {
if (auto context_name = context.getProperty(SSLContextService)) {
if (auto controller_service = context.getControllerService(*context_name)) {
if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name))) {
ssl_context = utils::net::getSslContext(*ssl_context_service);
} else {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
Expand All @@ -131,16 +138,29 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, co
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
}
}
return ssl_context;
}

std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);

if (auto max_batch_size = context->getProperty<uint64_t>(MaxBatchSize)) {
uint64_t GetTCP::parseMaxBatchSize(core::ProcessContext& context) {
if (auto max_batch_size = context.getProperty<uint64_t>(MaxBatchSize)) {
if (*max_batch_size == 0) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("{} should be non-zero.", MaxBatchSize.getName()));
}
max_batch_size_ = *max_batch_size;
return *max_batch_size;
}
return MaxBatchSize.getDefaultValue();
}

void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
gsl_Expects(context);

auto connections_to_make = parseEndpointList(*context);
auto delimiter = parseDelimiter(*context);
auto ssl_context = parseSSLContext(*context);

std::optional<size_t> max_queue_size = context->getProperty<uint64_t>(MaxQueueSize);
std::optional<size_t> max_message_size = context->getProperty<uint64_t>(MaxMessageSize);


asio::steady_timer::duration timeout_duration = 1s;
if (auto timeout_value = context->getProperty<core::TimePeriodValue>(Timeout)) {
Expand All @@ -155,6 +175,8 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, co

client_.emplace(delimiter, timeout_duration, reconnection_interval, std::move(ssl_context), max_queue_size, max_message_size, std::move(connections_to_make), logger_);
client_thread_ = std::thread([this]() { client_->run(); }); // NOLINT

max_batch_size_ = parseMaxBatchSize(*context);
}

void GetTCP::notifyStop() {
Expand Down Expand Up @@ -193,13 +215,13 @@ GetTCP::TcpClient::TcpClient(char delimiter,
std::vector<utils::net::ConnectionId> connections,
std::shared_ptr<core::logging::Logger> logger)
: delimiter_(delimiter),
timeout_duration_(timeout_duration),
reconnection_interval_(reconnection_interval),
ssl_context_(std::move(ssl_context)),
max_queue_size_(max_queue_size),
max_message_size_(max_message_size),
connections_(std::move(connections)),
logger_(std::move(logger)) {
timeout_duration_(timeout_duration),
reconnection_interval_(reconnection_interval),
ssl_context_(std::move(ssl_context)),
max_queue_size_(max_queue_size),
max_message_size_(max_message_size),
connections_(std::move(connections)),
logger_(std::move(logger)) {
}

GetTCP::TcpClient::~TcpClient() {
Expand Down
5 changes: 5 additions & 0 deletions extensions/standard-processors/processors/GetTCP.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ class GetTCP : public core::Processor {
private:
static void transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session);

std::vector<utils::net::ConnectionId> parseEndpointList(core::ProcessContext& context);
static char parseDelimiter(core::ProcessContext& context);
static std::optional<asio::ssl::context> parseSSLContext(core::ProcessContext& context);
static uint64_t parseMaxBatchSize(core::ProcessContext& context);

class TcpClient {
public:
TcpClient(char delimiter,
Expand Down

0 comments on commit 1c380c8

Please sign in to comment.