From 65738701322e58654843cf0f6095b26aaa27a532 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 5 Jul 2023 11:51:29 +0200 Subject: [PATCH 1/3] MINIFICPP-2157 Move response node implementations to source files --- .../core/state/nodes/AgentInformation.h | 543 +----------------- .../core/state/nodes/BuildInformation.h | 79 +-- .../core/state/nodes/DeviceInformation.h | 348 +---------- .../core/state/nodes/FlowInformation.h | 142 +---- .../include/core/state/nodes/QueueMetrics.h | 37 +- .../core/state/nodes/RepositoryMetrics.h | 20 +- .../core/state/nodes/SchedulingNodes.h | 52 +- .../include/core/state/nodes/StateMonitor.h | 18 +- .../core/state/nodes/TreeUpdateListener.h | 82 --- .../src/core/state/nodes/AgentInformation.cpp | 378 ++++++++++++ .../src/core/state/nodes/BuildInformation.cpp | 32 +- .../core/state/nodes/DeviceInformation.cpp | 294 ++++++++++ .../src/core/state/nodes/FlowInformation.cpp | 71 +++ .../src/core/state/nodes/QueueMetrics.cpp | 16 + .../core/state/nodes/RepositoryMetrics.cpp | 12 + .../src/core/state/nodes/SchedulingNodes.cpp | 39 ++ .../core/state/nodes/SupportedOperations.cpp | 44 +- 17 files changed, 902 insertions(+), 1305 deletions(-) delete mode 100644 libminifi/include/core/state/nodes/TreeUpdateListener.h create mode 100644 libminifi/src/core/state/nodes/SchedulingNodes.cpp diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h index 00a0983720..d4d013f189 100644 --- a/libminifi/include/core/state/nodes/AgentInformation.h +++ b/libminifi/include/core/state/nodes/AgentInformation.h @@ -20,47 +20,17 @@ #include #include #include - -#ifndef WIN32 -#include -#include - -#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) -#include -#include - -#endif -#include -#include -#include -#include -#include -#include - -#endif -#include -#include -#include - -#include #include -#include #include "agent/agent_docs.h" -#include "agent/agent_version.h" #include "agent/build_description.h" -#include "Connection.h" -#include "core/ClassLoader.h" -#include "core/ProcessorConfig.h" #include "core/state/nodes/MetricsBase.h" #include "core/state/nodes/StateMonitor.h" -#include "io/ClientSocket.h" -#include "SchedulingNodes.h" -#include "utils/OsUtils.h" #include "utils/ProcessCpuUsageTracker.h" #include "core/AgentIdentificationProvider.h" #include "utils/Export.h" -#include "SupportedOperations.h" +#include "core/RepositoryMetricsSource.h" +#include "controllers/UpdatePolicyControllerService.h" #include "RepositoryMetricsSourceStore.h" namespace org::apache::nifi::minifi::state::response { @@ -81,220 +51,10 @@ class ComponentManifest : public DeviceInformation { return CoreComponent::getName(); } - std::vector serialize() override { - std::vector serialized; - SerializedResponseNode resp; - resp.name = "componentManifest"; - struct Components group = build_description_.getClassDescriptions(getName()); - serializeClassDescription(group.processors_, "processors", resp); - serializeClassDescription(group.controller_services_, "controllerServices", resp); - serialized.push_back(resp); - return serialized; - } + std::vector serialize() override; protected: - void serializeClassDescription(const std::vector& descriptions, const std::string& name, SerializedResponseNode& response) const { - if (!descriptions.empty()) { - SerializedResponseNode type; - type.name = name; - type.array = true; - std::vector serialized; - for (const auto& group : descriptions) { - SerializedResponseNode desc; - desc.name = group.full_name_; - SerializedResponseNode className; - className.name = "type"; - className.value = group.full_name_; - - if (!group.class_properties_.empty()) { - SerializedResponseNode props; - props.name = "propertyDescriptors"; - for (auto && prop : group.class_properties_) { - SerializedResponseNode child; - child.name = prop.getName(); - - SerializedResponseNode descriptorName; - descriptorName.name = "name"; - descriptorName.value = prop.getName(); - - SerializedResponseNode descriptorDescription; - descriptorDescription.name = "description"; - descriptorDescription.value = prop.getDescription(); - - SerializedResponseNode validatorName; - validatorName.name = "validator"; - validatorName.value = std::string{prop.getValidator().getValidatorName()}; - - SerializedResponseNode supportsExpressionLanguageScope; - supportsExpressionLanguageScope.name = "expressionLanguageScope"; - supportsExpressionLanguageScope.value = prop.supportsExpressionLanguage() ? "FLOWFILE_ATTRIBUTES" : "NONE"; - - SerializedResponseNode descriptorRequired; - descriptorRequired.name = "required"; - descriptorRequired.value = prop.getRequired(); - - SerializedResponseNode descriptorDefaultValue; - descriptorDefaultValue.name = "defaultValue"; - descriptorDefaultValue.value = prop.getValue(); - - SerializedResponseNode descriptorDependentProperties; - descriptorDependentProperties.name = "dependentProperties"; - - for (const auto &propName : prop.getDependentProperties()) { - SerializedResponseNode descriptorDependentProperty; - descriptorDependentProperty.name = propName; - descriptorDependentProperties.children.push_back(descriptorDependentProperty); - } - - SerializedResponseNode descriptorExclusiveOfProperties; - descriptorExclusiveOfProperties.name = "exclusiveOfProperties"; - - for (const auto &exclusiveProp : prop.getExclusiveOfProperties()) { - SerializedResponseNode descriptorExclusiveOfProperty; - descriptorExclusiveOfProperty.name = exclusiveProp.first; - descriptorExclusiveOfProperty.value = exclusiveProp.second; - descriptorExclusiveOfProperties.children.push_back(descriptorExclusiveOfProperty); - } - - const auto &allowed_types = prop.getAllowedTypes(); - if (!allowed_types.empty()) { - SerializedResponseNode allowed_type; - allowed_type.name = "typeProvidedByValue"; - for (const auto &type : allowed_types) { - std::string class_name = utils::StringUtils::split(type, "::").back(); - SerializedResponseNode typeNode; - typeNode.name = "type"; - std::string typeClazz = type; - utils::StringUtils::replaceAll(typeClazz, "::", "."); - typeNode.value = typeClazz; - - SerializedResponseNode bgroup; - bgroup.name = "group"; - bgroup.value = GROUP_STR; - - SerializedResponseNode artifact; - artifact.name = "artifact"; - artifact.value = core::ClassLoader::getDefaultClassLoader().getGroupForClass(class_name).value_or(""); - allowed_type.children.push_back(typeNode); - allowed_type.children.push_back(bgroup); - allowed_type.children.push_back(artifact); - } - child.children.push_back(allowed_type); - } - - child.children.push_back(descriptorName); - - if (prop.getName() != prop.getDisplayName()) { - SerializedResponseNode displayName; - displayName.name = "displayName"; - displayName.value = prop.getDisplayName(); - child.children.push_back(displayName); - } - child.children.push_back(descriptorDescription); - child.children.push_back(validatorName); - child.children.push_back(descriptorRequired); - child.children.push_back(supportsExpressionLanguageScope); - child.children.push_back(descriptorDefaultValue); - child.children.push_back(descriptorDependentProperties); - child.children.push_back(descriptorExclusiveOfProperties); - - if (!prop.getAllowedValues().empty()) { - SerializedResponseNode allowedValues; - allowedValues.name = "allowableValues"; - allowedValues.array = true; - for (const auto &av : prop.getAllowedValues()) { - SerializedResponseNode allowableValue; - allowableValue.name = "allowableValues"; - - SerializedResponseNode allowedValue; - allowedValue.name = "value"; - allowedValue.value = av; - SerializedResponseNode allowedDisplayName; - allowedDisplayName.name = "displayName"; - allowedDisplayName.value = av; - - allowableValue.children.push_back(allowedValue); - allowableValue.children.push_back(allowedDisplayName); - - allowedValues.children.push_back(allowableValue); - } - child.children.push_back(allowedValues); - } - - props.children.push_back(child); - } - - desc.children.push_back(props); - } - - SerializedResponseNode dyn_prop; - dyn_prop.name = "supportsDynamicProperties"; - dyn_prop.value = group.supports_dynamic_properties_; - - SerializedResponseNode dyn_relat; - dyn_relat.name = "supportsDynamicRelationships"; - dyn_relat.value = group.supports_dynamic_relationships_; - - // only for processors - if (!group.class_relationships_.empty()) { - SerializedResponseNode inputReq; - inputReq.name = "inputRequirement"; - inputReq.value = group.inputRequirement_; - - SerializedResponseNode isSingleThreaded; - isSingleThreaded.name = "isSingleThreaded"; - isSingleThreaded.value = group.isSingleThreaded_; - - SerializedResponseNode relationships; - relationships.name = "supportedRelationships"; - relationships.array = true; - - for (const auto &relationship : group.class_relationships_) { - SerializedResponseNode child; - child.name = "supportedRelationships"; - - SerializedResponseNode nameNode; - nameNode.name = "name"; - nameNode.value = relationship.getName(); - - SerializedResponseNode descriptorDescription; - descriptorDescription.name = "description"; - descriptorDescription.value = relationship.getDescription(); - child.children.push_back(nameNode); - child.children.push_back(descriptorDescription); - - relationships.children.push_back(child); - } - desc.children.push_back(inputReq); - desc.children.push_back(isSingleThreaded); - desc.children.push_back(relationships); - } - - auto lastOfIdx = group.full_name_.find_last_of('.'); - std::string processorName = group.full_name_; - if (lastOfIdx != std::string::npos) { - lastOfIdx++; // if a value is found, increment to move beyond the . - size_t nameLength = group.full_name_.length() - lastOfIdx; - processorName = group.full_name_.substr(lastOfIdx, nameLength); - } - - - { - SerializedResponseNode proc_desc; - proc_desc.name = "typeDescription"; - proc_desc.value = group.description_; - desc.children.push_back(proc_desc); - } - - desc.children.push_back(dyn_relat); - desc.children.push_back(dyn_prop); - desc.children.push_back(className); - - type.children.push_back(desc); - } - response.children.push_back(type); - } - } + static void serializeClassDescription(const std::vector& descriptions, const std::string& name, SerializedResponseNode& response); private: BuildDescription build_description_; @@ -310,16 +70,7 @@ class ExternalManifest : public ComponentManifest { : ComponentManifest(std::move(name)) { } - std::vector serialize() override { - std::vector serialized; - SerializedResponseNode resp; - resp.name = "componentManifest"; - struct Components group = ExternalBuildDescription::getClassDescriptions(getName()); - serializeClassDescription(group.processors_, "processors", resp); - serializeClassDescription(group.controller_services_, "controllerServices", resp); - serialized.push_back(resp); - return serialized; - } + std::vector serialize() override; }; class Bundles : public DeviceInformation { @@ -338,66 +89,7 @@ class Bundles : public DeviceInformation { return "bundles"; } - std::vector serialize() override { - std::vector serialized; - for (auto group : AgentBuild::getExtensions()) { - SerializedResponseNode bundle; - bundle.name = "bundles"; - - ComponentManifest component_manifest(group); - const auto components = component_manifest.serialize(); - gsl_Expects(components.size() == 1); - if (components[0].children.empty()) { - continue; - } - bundle.children.push_back(components[0]); - - SerializedResponseNode bgroup; - bgroup.name = "group"; - bgroup.value = GROUP_STR; - SerializedResponseNode artifact; - artifact.name = "artifact"; - artifact.value = group; - SerializedResponseNode version; - version.name = "version"; - version.value = AgentBuild::VERSION; - - bundle.children.push_back(bgroup); - bundle.children.push_back(artifact); - bundle.children.push_back(version); - - serialized.push_back(bundle); - } - - // let's provide our external manifests. - for (auto group : ExternalBuildDescription::getExternalGroups()) { - SerializedResponseNode bundle; - bundle.name = "bundles"; - - SerializedResponseNode bgroup; - bgroup.name = "group"; - bgroup.value = group.group; - SerializedResponseNode artifact; - artifact.name = "artifact"; - artifact.value = group.artifact; - SerializedResponseNode version; - version.name = "version"; - version.value = group.version; - - bundle.children.push_back(bgroup); - bundle.children.push_back(artifact); - bundle.children.push_back(version); - - ExternalManifest compMan(group.artifact); - // serialize the component information. - for (auto component : compMan.serialize()) { - bundle.children.push_back(component); - } - serialized.push_back(bundle); - } - - return serialized; - } + std::vector serialize() override; }; /** @@ -434,122 +126,16 @@ class AgentStatus : public StateMonitorNode { repository_metrics_source_store_.addRepository(repo); } - std::vector serialize() override { - std::vector serialized; - auto serializedRepositories = serializeRepositories(); - if (!serializedRepositories.empty()) { - serialized.push_back(serializedRepositories); - } - serialized.push_back(serializeUptime()); - - auto serializedComponents = serializeComponents(); - if (!serializedComponents.empty()) { - serialized.push_back(serializedComponents); - } - - serialized.push_back(serializeResourceConsumption()); - - return serialized; - } - - std::vector calculateMetrics() override { - auto metrics = repository_metrics_source_store_.calculateMetrics(); - if (nullptr != monitor_) { - auto uptime = monitor_->getUptime(); - metrics.push_back({"uptime_milliseconds", static_cast(uptime), {{"metric_class", getName()}}}); - } - - if (nullptr != monitor_) { - monitor_->executeOnAllComponents([this, &metrics](StateController& component){ - metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0), - {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", getName()}}}); - }); - } - - metrics.push_back({"agent_memory_usage_bytes", static_cast(utils::OsUtils::getCurrentProcessPhysicalMemoryUsage()), {{"metric_class", getName()}}}); - - double cpu_usage = -1.0; - { - std::lock_guard guard(cpu_load_tracker_mutex_); - cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection(); - } - metrics.push_back({"agent_cpu_utilization", cpu_usage, {{"metric_class", getName()}}}); - return metrics; - } + std::vector serialize() override; + std::vector calculateMetrics() override; protected: - SerializedResponseNode serializeRepositories() const { - SerializedResponseNode repositories; - repositories.name = "repositories"; - repositories.children = repository_metrics_source_store_.serialize(); - return repositories; - } - - SerializedResponseNode serializeUptime() const { - SerializedResponseNode uptime; - - uptime.name = "uptime"; - if (nullptr != monitor_) { - uptime.value = monitor_->getUptime(); - } else { - uptime.value = "0"; - } - - return uptime; - } - - SerializedResponseNode serializeComponents() const { - SerializedResponseNode components_node; - components_node.collapsible = false; - components_node.name = "components"; - if (monitor_ != nullptr) { - monitor_->executeOnAllComponents([&components_node](StateController& component){ - SerializedResponseNode component_node; - component_node.collapsible = false; - component_node.name = component.getComponentName(); - - SerializedResponseNode uuid_node; - uuid_node.name = "uuid"; - uuid_node.value = std::string{component.getComponentUUID().to_string()}; - - SerializedResponseNode component_status_node; - component_status_node.name = "running"; - component_status_node.value = component.isRunning(); - - component_node.children.push_back(component_status_node); - component_node.children.push_back(uuid_node); - components_node.children.push_back(component_node); - }); - } - return components_node; - } - - SerializedResponseNode serializeAgentMemoryUsage() const { - SerializedResponseNode used_physical_memory; - used_physical_memory.name = "memoryUsage"; - used_physical_memory.value = utils::OsUtils::getCurrentProcessPhysicalMemoryUsage(); - return used_physical_memory; - } - - SerializedResponseNode serializeAgentCPUUsage() const { - double system_cpu_usage = -1.0; - { - std::lock_guard guard(cpu_load_tracker_mutex_); - system_cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection(); - } - SerializedResponseNode cpu_usage; - cpu_usage.name = "cpuUtilization"; - cpu_usage.value = system_cpu_usage; - return cpu_usage; - } - - SerializedResponseNode serializeResourceConsumption() const { - SerializedResponseNode resource_consumption; - resource_consumption.name = "resourceConsumption"; - resource_consumption.children.push_back(serializeAgentMemoryUsage()); - resource_consumption.children.push_back(serializeAgentCPUUsage()); - return resource_consumption; - } + SerializedResponseNode serializeRepositories() const; + SerializedResponseNode serializeUptime() const; + SerializedResponseNode serializeComponents() const; + static SerializedResponseNode serializeAgentMemoryUsage(); + static SerializedResponseNode serializeAgentCPUUsage(); + static SerializedResponseNode serializeResourceConsumption(); RepositoryMetricsSourceStore repository_metrics_source_store_; @@ -625,38 +211,7 @@ class AgentManifest : public DeviceInformation { configuration_reader_ = std::move(configuration_reader); } - std::vector serialize() override { - std::vector serialized = { - {.name = "identifier", .value = AgentBuild::BUILD_IDENTIFIER}, - {.name = "agentType", .value = "cpp"}, - {.name = "buildInfo", .children = { - {.name = "flags", .value = AgentBuild::COMPILER_FLAGS}, - {.name = "compiler", .value = AgentBuild::COMPILER}, - {.name = "version", .value = AgentBuild::VERSION}, - {.name = "revision", .value = AgentBuild::BUILD_REV}, - {.name = "timestamp", .value = static_cast(std::stoull(AgentBuild::BUILD_DATE))} - }} - }; - { - auto bundles = Bundles{"bundles"}.serialize(); - std::move(std::begin(bundles), std::end(bundles), std::back_inserter(serialized)); - } - { - auto schedulingDefaults = SchedulingDefaults{"schedulingDefaults"}.serialize(); - std::move(std::begin(schedulingDefaults), std::end(schedulingDefaults), std::back_inserter(serialized)); - } - { - auto supportedOperations = [this]() { - SupportedOperations supported_operations("supportedOperations"); - supported_operations.setStateMonitor(monitor_); - supported_operations.setUpdatePolicyController(update_policy_controller_); - supported_operations.setConfigurationReader(configuration_reader_); - return supported_operations.serialize(); - }(); - std::move(std::begin(supportedOperations), std::end(supportedOperations), std::back_inserter(serialized)); - } - return serialized; - } + std::vector serialize() override; private: state::StateMonitor* monitor_ = nullptr; @@ -685,56 +240,10 @@ class AgentNode : public DeviceInformation, public AgentMonitor, public AgentIde } protected: - std::vector serialize() override { - std::vector serialized = { - {.name = "identifier", .value = provider_->getAgentIdentifier()}, - }; - - const auto agent_class = provider_->getAgentClass(); - if (agent_class) { - serialized.push_back({.name = "agentClass", .value = *agent_class}); - } - - serialized.push_back({.name = "agentManifestHash", .value = getAgentManifestHash()}); - return serialized; - } - - std::vector getAgentManifest() const { - if (agent_manifest_cache_) { return std::vector{*agent_manifest_cache_}; } - agent_manifest_cache_ = {.name = "agentManifest", .children = [this] { - AgentManifest manifest{"manifest"}; - manifest.setStateMonitor(monitor_); - manifest.setUpdatePolicyController(update_policy_controller_); - manifest.setConfigurationReader(configuration_reader_); - return manifest.serialize(); - }()}; - agent_manifest_hash_cache_.clear(); - return std::vector{ *agent_manifest_cache_ }; - } - - std::string getAgentManifestHash() const { - if (agent_manifest_hash_cache_.empty()) { - agent_manifest_hash_cache_ = hashResponseNodes(getAgentManifest()); - } - return agent_manifest_hash_cache_; - } - - std::vector getAgentStatus() const { - std::vector serialized; - - AgentStatus status("status", getName()); - status.setRepositories(repositories_); - status.setStateMonitor(monitor_); - - SerializedResponseNode agentStatus; - agentStatus.name = "status"; - for (auto &ser : status.serialize()) { - agentStatus.children.push_back(std::move(ser)); - } - - serialized.push_back(agentStatus); - return serialized; - } + std::vector serialize() override; + std::vector getAgentManifest() const; + std::string getAgentManifestHash() const; + std::vector getAgentStatus() const; private: mutable std::optional agent_manifest_cache_; @@ -773,19 +282,7 @@ class AgentInformation : public AgentNode { include_agent_status_ = include; } - std::vector serialize() override { - std::vector serialized(AgentNode::serialize()); - if (include_agent_manifest_) { - auto manifest = getAgentManifest(); - serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end())); - } - - if (include_agent_status_) { - auto status = getAgentStatus(); - serialized.insert(serialized.end(), std::make_move_iterator(status.begin()), std::make_move_iterator(status.end())); - } - return serialized; - } + std::vector serialize() override; protected: bool include_agent_status_; diff --git a/libminifi/include/core/state/nodes/BuildInformation.h b/libminifi/include/core/state/nodes/BuildInformation.h index 973487359c..fc8e096ac1 100644 --- a/libminifi/include/core/state/nodes/BuildInformation.h +++ b/libminifi/include/core/state/nodes/BuildInformation.h @@ -20,37 +20,7 @@ #include #include -#ifndef WIN32 - -#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) -#include -#include -#endif - -#include -#include -#include -#include -#include -#include -#include -#include - -#endif - -#include -#include -#include - -#include -#include -#include - -#include "../../../agent/agent_version.h" -#include "../nodes/MetricsBase.h" -#include "Connection.h" -#include "core/ClassLoader.h" -#include "io/ClientSocket.h" +#include "core/state/nodes/MetricsBase.h" namespace org::apache::nifi::minifi::state::response { @@ -74,52 +44,7 @@ class BuildInformation : public DeviceInformation { return "BuildInformation"; } - std::vector serialize() override { - std::vector serialized; - - SerializedResponseNode build_version; - build_version.name = "build_version"; - build_version.value = AgentBuild::VERSION; - - SerializedResponseNode build_rev; - build_rev.name = "build_rev"; - build_rev.value = AgentBuild::BUILD_REV; - - SerializedResponseNode build_date; - build_date.name = "build_date"; - build_date.value = AgentBuild::BUILD_DATE; - - SerializedResponseNode compiler; - compiler.name = "compiler"; - { - SerializedResponseNode compiler_command; - compiler_command.name = "compiler_command"; - compiler_command.value = AgentBuild::COMPILER; - - SerializedResponseNode compiler_version; - compiler_version.name = "compiler_version"; - compiler_version.value = AgentBuild::COMPILER_VERSION; - - SerializedResponseNode compiler_flags; - compiler_flags.name = "compiler_flags"; - compiler_flags.value = AgentBuild::COMPILER_FLAGS; - - compiler.children.push_back(compiler_command); - compiler.children.push_back(compiler_version); - compiler.children.push_back(compiler_flags); - } - SerializedResponseNode device_id; - device_id.name = "device_id"; - device_id.value = AgentBuild::BUILD_IDENTIFIER; - - serialized.push_back(build_version); - serialized.push_back(build_rev); - serialized.push_back(build_date); - serialized.push_back(compiler); - serialized.push_back(device_id); - - return serialized; - } + std::vector serialize() override; }; } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/DeviceInformation.h b/libminifi/include/core/state/nodes/DeviceInformation.h index 1faee8336a..a934e48dde 100644 --- a/libminifi/include/core/state/nodes/DeviceInformation.h +++ b/libminifi/include/core/state/nodes/DeviceInformation.h @@ -37,24 +37,16 @@ #pragma comment(lib, "iphlpapi.lib") #include #include +#include #endif -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include -#include "../nodes/MetricsBase.h" +#include "core/state/nodes/MetricsBase.h" #include "Connection.h" -#include "io/ClientSocket.h" #include "utils/OsUtils.h" #include "utils/NetworkInterfaceInfo.h" #include "utils/SystemCpuUsageTracker.h" @@ -64,218 +56,15 @@ namespace org::apache::nifi::minifi::state::response { class Device { public: - Device() { - initialize(); - } - void initialize() { - addrinfo hints; - memset(&hints, 0, sizeof hints); // make sure the struct is empty - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_CANONNAME; - hints.ai_protocol = 0; /* any protocol */ - - char hostname[1024]; - hostname[1023] = '\0'; - gethostname(hostname, 1023); - - std::ifstream device_id_file(".device_id"); - if (device_id_file) { - std::string line; - while (device_id_file) { - if (std::getline(device_id_file, line)) - device_id_ += line; - } - device_id_file.close(); - } else { - device_id_ = getDeviceId(); - - std::ofstream outputFile(".device_id"); - if (outputFile) { - outputFile.write(device_id_.c_str(), device_id_.length()); - } - outputFile.close(); - } - - canonical_hostname_ = hostname; - - std::stringstream ips; - auto ipaddressess = getIpAddresses(); - for (auto ip : ipaddressess) { - if (ipaddressess.size() > 1 && (ip.find("127") == 0 || ip.find("192") == 0)) - continue; - ip_ = ip; - break; - } - } + Device(); std::string canonical_hostname_; std::string ip_; std::string device_id_; protected: - std::vector getIpAddresses() { - static std::vector ips; - if (ips.empty()) { - const auto filter = [](const utils::NetworkInterfaceInfo& interface_info) { - return !interface_info.isLoopback() && interface_info.isRunning(); - }; - auto network_interface_infos = utils::NetworkInterfaceInfo::getNetworkInterfaceInfos(filter); - for (const auto& network_interface_info : network_interface_infos) - for (const auto& ip_v4_address : network_interface_info.getIpV4Addresses()) - ips.push_back(ip_v4_address); - } - return ips; - } - -#if __linux__ - std::string getDeviceId() { - std::hash hash_fn; - std::string macs; - struct ifaddrs *ifaddr, *ifa; - int family, s, n; - char host[NI_MAXHOST]; - - if (getifaddrs(&ifaddr) == -1) { - exit(EXIT_FAILURE); - } - - /* Walk through linked list, maintaining head pointer so we - can free list later */ - for (ifa = ifaddr, n = 0; ifa != NULL; ifa = ifa->ifa_next, n++) { - if (ifa->ifa_addr == NULL) - continue; - - family = ifa->ifa_addr->sa_family; - - /* Display interface name and family (including symbolic - form of the latter for the common families) */ - - /* For an AF_INET* interface address, display the address */ - - if (family == AF_INET || family == AF_INET6) { - s = getnameinfo(ifa->ifa_addr, (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6), host, NI_MAXHOST, - NULL, - 0, NI_NUMERICHOST); - if (s != 0) { - printf("getnameinfo() failed: %s\n", gai_strerror(s)); - exit(EXIT_FAILURE); - } - } - } - - freeifaddrs(ifaddr); - - int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_IP); - struct ifreq ifr; - struct ifconf ifc; - char buf[1024]; - ifc.ifc_len = sizeof(buf); - ifc.ifc_buf = buf; - if (ioctl(sock, SIOCGIFCONF, &ifc) == -1) { /* handle error */ - } - - struct ifreq* it = ifc.ifc_req; - const struct ifreq* const end = it + (ifc.ifc_len / sizeof(struct ifreq)); - - for (; it != end; ++it) { - strcpy(ifr.ifr_name, it->ifr_name); // NOLINT - if (ioctl(sock, SIOCGIFFLAGS, &ifr) == 0) { - if (!(ifr.ifr_flags & IFF_LOOPBACK)) { // don't count loopback - if (ioctl(sock, SIOCGIFHWADDR, &ifr) == 0) { - unsigned char mac[6]; - - memcpy(mac, ifr.ifr_hwaddr.sa_data, 6); - - char mac_add[13]; - snprintf(mac_add, 13, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); // NOLINT - - macs += mac_add; - } - } - - } else { /* handle error */ - } - } - - close(sock); - - return std::to_string(hash_fn(macs)); - } -#elif(defined(__unix__) || defined(__APPLE__) || defined(__MACH__) || defined(BSD)) // should work on bsd variants as well - std::string getDeviceId() { - ifaddrs* iflist; - std::hash hash_fn; - std::set macs; - - if (getifaddrs(&iflist) == 0) { - for (ifaddrs* cur = iflist; cur; cur = cur->ifa_next) { - if (cur->ifa_addr && (cur->ifa_addr->sa_family == AF_LINK) && (reinterpret_cast(cur->ifa_addr))->sdl_alen) { - sockaddr_dl* sdl = reinterpret_cast(cur->ifa_addr); - - if (sdl->sdl_type != IFT_ETHER) { - continue; - } else { - } - char mac[32]; - memcpy(mac, LLADDR(sdl), sdl->sdl_alen); - char mac_add[13]; - snprintf(mac_add, 13, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); // NOLINT - // macs += mac_add; - macs.insert(mac_add); - } - } - - freeifaddrs(iflist); - } - std::string macstr; - for (auto &mac : macs) { - macstr += mac; - } - return macstr.length() > 0 ? std::to_string(hash_fn(macstr)) : "8675309"; - } -#else - std::string getDeviceId() { - PIP_ADAPTER_INFO adapterPtr; - PIP_ADAPTER_INFO adapter = NULL; - - DWORD dwRetVal = 0; - - std::hash hash_fn; - std::set macs; - - ULONG adapterLen = sizeof(IP_ADAPTER_INFO); - adapterPtr = reinterpret_cast(malloc(sizeof(IP_ADAPTER_INFO))); - if (adapterPtr == NULL) { - return ""; - } - if (GetAdaptersInfo(adapterPtr, &adapterLen) == ERROR_BUFFER_OVERFLOW) { - free(adapterPtr); - adapterPtr = reinterpret_cast(malloc(adapterLen)); - if (adapterPtr == NULL) { - return ""; - } - } - - if ((dwRetVal = GetAdaptersInfo(adapterPtr, &adapterLen)) == NO_ERROR) { - adapter = adapterPtr; - while (adapter) { - char mac_add[13]; - snprintf(mac_add, 13, "%02X%02X%02X%02X%02X%02X", adapter->Address[0], adapter->Address[1], adapter->Address[2], adapter->Address[3], adapter->Address[4], adapter->Address[5]); // NOLINT - macs.insert(mac_add); - adapter = adapter->Next; - } - } - - if (adapterPtr) - free(adapterPtr); - std::string macstr; - for (auto &mac : macs) { - macstr += mac; - } - return macstr.length() > 0 ? std::to_string(hash_fn(macstr)) : "8675309"; - } -#endif + static std::vector getIpAddresses(); + static std::string getDeviceId(); // connection information int32_t socket_file_descriptor_; @@ -310,122 +99,21 @@ class DeviceInfoNode : public DeviceInformation { return "deviceInfo"; } - std::vector serialize() override { - std::vector serialized; - - serialized.push_back(serializeIdentifier()); - serialized.push_back(serializeSystemInfo()); - serialized.push_back(serializeNetworkInfo()); - - return serialized; - } - - std::vector calculateMetrics() override { - double system_cpu_usage = -1.0; - { - std::lock_guard guard(cpu_load_tracker_mutex_); - system_cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection(); - } - SerializedResponseNode cpu_usage; - cpu_usage.name = "cpuUtilization"; - cpu_usage.value = system_cpu_usage; - return { - {"physical_mem", static_cast(utils::OsUtils::getSystemTotalPhysicalMemory()), {{"metric_class", "DeviceInfoNode"}}}, - {"memory_usage", static_cast(utils::OsUtils::getSystemPhysicalMemoryUsage()), {{"metric_class", "DeviceInfoNode"}}}, - {"cpu_utilization", system_cpu_usage, {{"metric_class", "DeviceInfoNode"}}}, - }; - } + std::vector serialize() override; + std::vector calculateMetrics() override; protected: - SerializedResponseNode serializeIdentifier() const { - SerializedResponseNode identifier; - identifier.name = "identifier"; - identifier.value = device_id_; - return identifier; - } - - SerializedResponseNode serializeVCoreInfo() const { - SerializedResponseNode v_cores; - v_cores.name = "vCores"; - v_cores.value = std::thread::hardware_concurrency(); - return v_cores; - } - - SerializedResponseNode serializeOperatingSystemType() const { - SerializedResponseNode os_type; - os_type.name = "operatingSystem"; - os_type.value = getOperatingSystem(); - return os_type; - } - - SerializedResponseNode serializeTotalPhysicalMemoryInformation() const { - SerializedResponseNode total_physical_memory; - total_physical_memory.name = "physicalMem"; - total_physical_memory.value = utils::OsUtils::getSystemTotalPhysicalMemory(); - return total_physical_memory; - } - - SerializedResponseNode serializePhysicalMemoryUsageInformation() const { - SerializedResponseNode used_physical_memory; - used_physical_memory.name = "memoryUsage"; - used_physical_memory.value = utils::OsUtils::getSystemPhysicalMemoryUsage(); - return used_physical_memory; - } - - SerializedResponseNode serializeSystemCPUUsageInformation() const { - double system_cpu_usage = -1.0; - { - std::lock_guard guard(cpu_load_tracker_mutex_); - system_cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection(); - } - SerializedResponseNode cpu_usage; - cpu_usage.name = "cpuUtilization"; - cpu_usage.value = system_cpu_usage; - return cpu_usage; - } - - SerializedResponseNode serializeArchitectureInformation() const { - SerializedResponseNode arch; - arch.name = "machineArch"; - arch.value = utils::OsUtils::getMachineArchitecture(); - return arch; - } - - SerializedResponseNode serializeSystemInfo() const { - SerializedResponseNode systemInfo; - systemInfo.name = "systemInfo"; - - systemInfo.children.push_back(serializeVCoreInfo()); - systemInfo.children.push_back(serializeOperatingSystemType()); - systemInfo.children.push_back(serializeTotalPhysicalMemoryInformation()); - systemInfo.children.push_back(serializeArchitectureInformation()); - systemInfo.children.push_back(serializePhysicalMemoryUsageInformation()); - systemInfo.children.push_back(serializeSystemCPUUsageInformation()); - - return systemInfo; - } - - SerializedResponseNode serializeHostNameInfo() const { - SerializedResponseNode hostname; - hostname.name = "hostname"; - hostname.value = hostname_; - return hostname; - } - - SerializedResponseNode serializeIPAddress() const { - SerializedResponseNode ip; - ip.name = "ipAddress"; - ip.value = !ip_.empty() ? ip_ : "127.0.0.1"; - return ip; - } - - SerializedResponseNode serializeNetworkInfo() const { - SerializedResponseNode network_info; - network_info.name = "networkInfo"; - network_info.children.push_back(serializeHostNameInfo()); - network_info.children.push_back(serializeIPAddress()); - return network_info; - } + SerializedResponseNode serializeIdentifier() const; + static SerializedResponseNode serializeVCoreInfo(); + static SerializedResponseNode serializeOperatingSystemType(); + static SerializedResponseNode serializeTotalPhysicalMemoryInformation(); + static SerializedResponseNode serializePhysicalMemoryUsageInformation(); + static SerializedResponseNode serializeSystemCPUUsageInformation(); + static SerializedResponseNode serializeArchitectureInformation(); + static SerializedResponseNode serializeSystemInfo(); + SerializedResponseNode serializeHostNameInfo() const; + SerializedResponseNode serializeIPAddress() const; + SerializedResponseNode serializeNetworkInfo() const; /** * Have found various ways of identifying different operating system variants diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index f864f7d215..494257cd6d 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -16,30 +16,16 @@ */ #pragma once -#include #include #include #include #include -#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) -#include -#include - -#endif -#include -#include -#include - -#include -#include - -#include "../FlowIdentifier.h" -#include "../nodes/MetricsBase.h" -#include "../nodes/StateMonitor.h" +#include "core/state/FlowIdentifier.h" +#include "core/state/nodes/MetricsBase.h" +#include "core/state/nodes/StateMonitor.h" #include "Connection.h" -#include "io/ClientSocket.h" -#include "../ConnectionStore.h" +#include "core/state/ConnectionStore.h" namespace org::apache::nifi::minifi::state::response { @@ -93,26 +79,7 @@ class FlowVersion : public DeviceInformation { identifier = std::make_shared(url, bucket_id, flow_id.empty() ? utils::IdGenerator::getIdGenerator()->generate().to_string() : flow_id); } - std::vector serialize() override { - std::lock_guard lock(guard); - std::vector serialized; - SerializedResponseNode ru; - ru.name = "registryUrl"; - ru.value = identifier->getRegistryUrl(); - - SerializedResponseNode bucketid; - bucketid.name = "bucketId"; - bucketid.value = identifier->getBucketId(); - - SerializedResponseNode flowId; - flowId.name = "flowId"; - flowId.value = identifier->getFlowId(); - - serialized.push_back(ru); - serialized.push_back(bucketid); - serialized.push_back(flowId); - return serialized; - } + std::vector serialize() override; FlowVersion &operator=(FlowVersion &&fv) noexcept { identifier = std::move(fv.identifier); @@ -167,103 +134,8 @@ class FlowInformation : public FlowMonitor { return "flowInfo"; } - std::vector serialize() override { - std::vector serialized; - - SerializedResponseNode fv; - fv.name = "flowId"; - fv.value = flow_version_->getFlowId(); - - SerializedResponseNode uri; - uri.name = "versionedFlowSnapshotURI"; - for (auto &entry : flow_version_->serialize()) { - uri.children.push_back(entry); - } - - serialized.push_back(fv); - serialized.push_back(uri); - - const auto& connections = connection_store_.getConnections(); - if (!connections.empty()) { - SerializedResponseNode queues; - queues.collapsible = false; - queues.name = "queues"; - - for (const auto& queue : connections) { - SerializedResponseNode repoNode; - repoNode.collapsible = false; - repoNode.name = queue.second->getName(); - - SerializedResponseNode queueUUIDNode; - queueUUIDNode.name = "uuid"; - queueUUIDNode.value = std::string{queue.second->getUUIDStr()}; - - SerializedResponseNode queuesize; - queuesize.name = "size"; - queuesize.value = queue.second->getQueueSize(); - - SerializedResponseNode queuesizemax; - queuesizemax.name = "sizeMax"; - queuesizemax.value = queue.second->getBackpressureThresholdCount(); - - SerializedResponseNode datasize; - datasize.name = "dataSize"; - datasize.value = queue.second->getQueueDataSize(); - SerializedResponseNode datasizemax; - - datasizemax.name = "dataSizeMax"; - datasizemax.value = queue.second->getBackpressureThresholdDataSize(); - - repoNode.children.push_back(queuesize); - repoNode.children.push_back(queuesizemax); - repoNode.children.push_back(datasize); - repoNode.children.push_back(datasizemax); - repoNode.children.push_back(queueUUIDNode); - - queues.children.push_back(repoNode); - } - serialized.push_back(queues); - } - - if (nullptr != monitor_) { - SerializedResponseNode componentsNode; - componentsNode.collapsible = false; - componentsNode.name = "components"; - - monitor_->executeOnAllComponents([&componentsNode](StateController& component){ - SerializedResponseNode componentNode; - componentNode.collapsible = false; - componentNode.name = component.getComponentName(); - - SerializedResponseNode uuidNode; - uuidNode.name = "uuid"; - uuidNode.value = std::string{component.getComponentUUID().to_string()}; - - SerializedResponseNode componentStatusNode; - componentStatusNode.name = "running"; - componentStatusNode.value = component.isRunning(); - - componentNode.children.push_back(componentStatusNode); - componentNode.children.push_back(uuidNode); - componentsNode.children.push_back(componentNode); - }); - serialized.push_back(componentsNode); - } - - return serialized; - } - - std::vector calculateMetrics() override { - std::vector metrics = connection_store_.calculateConnectionMetrics("FlowInformation"); - - if (nullptr != monitor_) { - monitor_->executeOnAllComponents([&metrics](StateController& component){ - metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0), - {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", "FlowInformation"}}}); - }); - } - return metrics; - } + std::vector serialize() override; + std::vector calculateMetrics() override; }; } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/QueueMetrics.h b/libminifi/include/core/state/nodes/QueueMetrics.h index c9436ec776..57ac99944b 100644 --- a/libminifi/include/core/state/nodes/QueueMetrics.h +++ b/libminifi/include/core/state/nodes/QueueMetrics.h @@ -17,14 +17,12 @@ */ #pragma once -#include #include -#include #include -#include "../nodes/MetricsBase.h" #include "Connection.h" -#include "../ConnectionStore.h" +#include "core/state/nodes/MetricsBase.h" +#include "core/state/ConnectionStore.h" namespace org::apache::nifi::minifi::state::response { @@ -57,36 +55,7 @@ class QueueMetrics : public ResponseNode { connection_store_.updateConnection(connection); } - std::vector serialize() override { - std::vector serialized; - for (const auto& [_, connection] : connection_store_.getConnections()) { - SerializedResponseNode parent; - parent.name = connection->getName(); - SerializedResponseNode datasize; - datasize.name = "datasize"; - datasize.value = std::to_string(connection->getQueueDataSize()); - - SerializedResponseNode datasizemax; - datasizemax.name = "datasizemax"; - datasizemax.value = std::to_string(connection->getBackpressureThresholdDataSize()); - - SerializedResponseNode queuesize; - queuesize.name = "queued"; - queuesize.value = std::to_string(connection->getQueueSize()); - - SerializedResponseNode queuesizemax; - queuesizemax.name = "queuedmax"; - queuesizemax.value = std::to_string(connection->getBackpressureThresholdCount()); - - parent.children.push_back(datasize); - parent.children.push_back(datasizemax); - parent.children.push_back(queuesize); - parent.children.push_back(queuesizemax); - - serialized.push_back(parent); - } - return serialized; - } + std::vector serialize() override; std::vector calculateMetrics() override { return connection_store_.calculateConnectionMetrics("QueueMetrics"); diff --git a/libminifi/include/core/state/nodes/RepositoryMetrics.h b/libminifi/include/core/state/nodes/RepositoryMetrics.h index 788beb0062..da3c2d51fd 100644 --- a/libminifi/include/core/state/nodes/RepositoryMetrics.h +++ b/libminifi/include/core/state/nodes/RepositoryMetrics.h @@ -21,11 +21,9 @@ #include #include #include -#include -#include -#include "../nodes/MetricsBase.h" -#include "Connection.h" +#include "core/state/nodes/MetricsBase.h" +#include "core/RepositoryMetricsSource.h" #include "RepositoryMetricsSourceStore.h" namespace org::apache::nifi::minifi::state::response { @@ -58,17 +56,9 @@ class RepositoryMetrics : public ResponseNode { return "RepositoryMetrics"; } - void addRepository(const std::shared_ptr &repo) { - return repository_metrics_source_store_.addRepository(repo); - } - - std::vector serialize() override { - return repository_metrics_source_store_.serialize(); - } - - std::vector calculateMetrics() override { - return repository_metrics_source_store_.calculateMetrics(); - } + void addRepository(const std::shared_ptr &repo); + std::vector serialize() override; + std::vector calculateMetrics() override; protected: RepositoryMetricsSourceStore repository_metrics_source_store_; diff --git a/libminifi/include/core/state/nodes/SchedulingNodes.h b/libminifi/include/core/state/nodes/SchedulingNodes.h index a98bb3bd69..841fcb15c8 100644 --- a/libminifi/include/core/state/nodes/SchedulingNodes.h +++ b/libminifi/include/core/state/nodes/SchedulingNodes.h @@ -15,8 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_SCHEDULINGNODES_H_ -#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_SCHEDULINGNODES_H_ +#pragma once #include #include @@ -41,54 +40,7 @@ class SchedulingDefaults : public DeviceInformation { return "schedulingDefaults"; } - std::vector serialize() override { - std::vector serialized; - - SerializedResponseNode schedulingDefaults; - schedulingDefaults.name = "schedulingDefaults"; - - SerializedResponseNode defaultSchedulingStrategy; - defaultSchedulingStrategy.name = "defaultSchedulingStrategy"; - defaultSchedulingStrategy.value = core::DEFAULT_SCHEDULING_STRATEGY; - - schedulingDefaults.children.push_back(defaultSchedulingStrategy); - - SerializedResponseNode defaultSchedulingPeriod; - defaultSchedulingPeriod.name = "defaultSchedulingPeriodMillis"; - defaultSchedulingPeriod.value = int64_t{core::DEFAULT_SCHEDULING_PERIOD_MILLIS.count()}; - - schedulingDefaults.children.push_back(defaultSchedulingPeriod); - - SerializedResponseNode defaultRunDuration; - defaultRunDuration.name = "defaultRunDurationNanos"; - defaultRunDuration.value = int64_t{core::DEFAULT_RUN_DURATION.count()}; - - schedulingDefaults.children.push_back(defaultRunDuration); - - SerializedResponseNode defaultMaxConcurrentTasks; - defaultMaxConcurrentTasks.name = "defaultMaxConcurrentTasks"; - defaultMaxConcurrentTasks.value = core::DEFAULT_MAX_CONCURRENT_TASKS; - - schedulingDefaults.children.push_back(defaultMaxConcurrentTasks); - - SerializedResponseNode yieldDuration; - yieldDuration.name = "yieldDurationMillis"; - yieldDuration.value = int64_t{std::chrono::milliseconds(core::DEFAULT_YIELD_PERIOD_SECONDS).count()}; - - schedulingDefaults.children.push_back(yieldDuration); - - SerializedResponseNode penalizationPeriod; - penalizationPeriod.name = "penalizationPeriodMillis"; - penalizationPeriod.value = int64_t{std::chrono::milliseconds{core::DEFAULT_PENALIZATION_PERIOD}.count()}; - - schedulingDefaults.children.push_back(penalizationPeriod); - - serialized.push_back(schedulingDefaults); - - return serialized; - } + std::vector serialize() override; }; } // namespace org::apache::nifi::minifi::state::response - -#endif // LIBMINIFI_INCLUDE_CORE_STATE_NODES_SCHEDULINGNODES_H_ diff --git a/libminifi/include/core/state/nodes/StateMonitor.h b/libminifi/include/core/state/nodes/StateMonitor.h index 3f6d74d2ff..7278645aaf 100644 --- a/libminifi/include/core/state/nodes/StateMonitor.h +++ b/libminifi/include/core/state/nodes/StateMonitor.h @@ -15,25 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_STATEMONITOR_H_ -#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_STATEMONITOR_H_ +#pragma once -#include -#include - -#include -#include -#include #include #include -#include "../nodes/MetricsBase.h" -#include "agent/agent_version.h" -#include "agent/build_description.h" -#include "Connection.h" -#include "core/ClassLoader.h" +#include "core/state/nodes/MetricsBase.h" #include "core/state/UpdateController.h" -#include "io/ClientSocket.h" namespace org::apache::nifi::minifi::state::response { @@ -57,5 +45,3 @@ class StateMonitorNode : public DeviceInformation { }; } // namespace org::apache::nifi::minifi::state::response - -#endif // LIBMINIFI_INCLUDE_CORE_STATE_NODES_STATEMONITOR_H_ diff --git a/libminifi/include/core/state/nodes/TreeUpdateListener.h b/libminifi/include/core/state/nodes/TreeUpdateListener.h deleted file mode 100644 index 5636017e43..0000000000 --- a/libminifi/include/core/state/nodes/TreeUpdateListener.h +++ /dev/null @@ -1,82 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_TREEUPDATELISTENER_H_ -#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_TREEUPDATELISTENER_H_ - -#include -#include -#include - -#include "../nodes/MetricsBase.h" -#include "core/state/UpdateController.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace response { - -/** - * Purpose: Class that will represent the metrics updates, which can be performed asynchronously. - */ -class MetricsUpdate : public Update { - public: - MetricsUpdate(UpdateStatus status) // NOLINT - : Update(status) { - } - virtual bool validate() { - return true; - } -}; - -class OperationWatcher : public utils::AfterExecute { - public: - explicit OperationWatcher(std::atomic *running) - : running_(running) { - } - - explicit OperationWatcher(OperationWatcher && other) - : running_(std::move(other.running_)) { - } - - ~OperationWatcher() = default; - - virtual bool isFinished(const Update &result) { - if (result.getStatus().getState() == UpdateState::READ_COMPLETE && running_) { - return false; - } else { - return true; - } - } - virtual bool isCancelled(const Update &result) { - return false; - } - - protected: - std::atomic *running_; -}; - -} // namespace response -} // namespace state -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org - -#endif // LIBMINIFI_INCLUDE_CORE_STATE_NODES_TREEUPDATELISTENER_H_ diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/libminifi/src/core/state/nodes/AgentInformation.cpp index 7bbf44d3b8..52d9b1ab8e 100644 --- a/libminifi/src/core/state/nodes/AgentInformation.cpp +++ b/libminifi/src/core/state/nodes/AgentInformation.cpp @@ -16,13 +16,391 @@ * limitations under the License. */ #include "core/state/nodes/AgentInformation.h" + +#include "agent/agent_version.h" #include "core/Resource.h" +#include "core/ClassLoader.h" +#include "utils/OsUtils.h" +#include "core/state/nodes/SchedulingNodes.h" +#include "core/state/nodes/SupportedOperations.h" namespace org::apache::nifi::minifi::state::response { utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_; std::mutex AgentStatus::cpu_load_tracker_mutex_; +std::vector ComponentManifest::serialize() { + std::vector serialized; + SerializedResponseNode resp; + resp.name = "componentManifest"; + struct Components group = build_description_.getClassDescriptions(getName()); + serializeClassDescription(group.processors_, "processors", resp); + serializeClassDescription(group.controller_services_, "controllerServices", resp); + serialized.push_back(resp); + return serialized; +} + +void ComponentManifest::serializeClassDescription(const std::vector& descriptions, const std::string& name, SerializedResponseNode& response) { + if (descriptions.empty()) { + return; + } + SerializedResponseNode type{.name = name, .array = true}; + std::vector serialized; + for (const auto& group : descriptions) { + SerializedResponseNode desc{.name = group.full_name_}; + + if (!group.class_properties_.empty()) { + SerializedResponseNode props{.name = "propertyDescriptors"}; + for (auto&& prop : group.class_properties_) { + SerializedResponseNode child = {.name = prop.getName()}; + SerializedResponseNode descriptorDependentProperties{.name = "dependentProperties"}; + for (const auto &propName : prop.getDependentProperties()) { + SerializedResponseNode descriptorDependentProperty{.name = propName}; + descriptorDependentProperties.children.push_back(descriptorDependentProperty); + } + + SerializedResponseNode descriptorExclusiveOfProperties{.name = "exclusiveOfProperties"}; + + for (const auto &exclusiveProp : prop.getExclusiveOfProperties()) { + SerializedResponseNode descriptorExclusiveOfProperty{.name = exclusiveProp.first, .value = exclusiveProp.second}; + descriptorExclusiveOfProperties.children.push_back(descriptorExclusiveOfProperty); + } + + const auto &allowed_types = prop.getAllowedTypes(); + if (!allowed_types.empty()) { + SerializedResponseNode allowed_type; + allowed_type.name = "typeProvidedByValue"; + for (const auto &type : allowed_types) { + std::string class_name = utils::StringUtils::split(type, "::").back(); + std::string typeClazz = type; + utils::StringUtils::replaceAll(typeClazz, "::", "."); + allowed_type.children.push_back({.name = "type", .value = typeClazz}); + allowed_type.children.push_back({.name = "group", .value = GROUP_STR}); + allowed_type.children.push_back({.name = "artifact", .value = core::ClassLoader::getDefaultClassLoader().getGroupForClass(class_name).value_or("")}); + } + child.children.push_back(allowed_type); + } + + child.children.push_back({.name = "name", .value = prop.getName()}); + + if (prop.getName() != prop.getDisplayName()) { + SerializedResponseNode displayName{.name = "displayName", .value = prop.getDisplayName()}; + child.children.push_back(displayName); + } + + child.children.push_back({.name = "description", .value = prop.getDescription()}); + child.children.push_back({.name = "validator", .value = std::string{prop.getValidator().getValidatorName()}}); + child.children.push_back({.name = "required", .value = prop.getRequired()}); + child.children.push_back({.name = "expressionLanguageScope", .value = prop.supportsExpressionLanguage() ? "FLOWFILE_ATTRIBUTES" : "NONE"}); + child.children.push_back({.name = "defaultValue", .value = prop.getValue()}); + child.children.push_back(descriptorDependentProperties); + child.children.push_back(descriptorExclusiveOfProperties); + + if (!prop.getAllowedValues().empty()) { + SerializedResponseNode allowedValues{.name = "allowableValues", .array = true}; + for (const auto &av : prop.getAllowedValues()) { + SerializedResponseNode allowableValue{ + .name = "allowableValues", + .children = { + {.name = "value", .value = av}, + {.name = "displayName", .value = av}, + } + }; + + allowedValues.children.push_back(allowableValue); + } + child.children.push_back(allowedValues); + } + + props.children.push_back(child); + } + + desc.children.push_back(props); + } + + // only for processors + if (!group.class_relationships_.empty()) { + desc.children.push_back({.name = "inputRequirement", .value = group.inputRequirement_}); + desc.children.push_back({.name = "isSingleThreaded", .value = group.isSingleThreaded_}); + + SerializedResponseNode relationships{.name = "supportedRelationships", .array = true}; + for (const auto &relationship : group.class_relationships_) { + SerializedResponseNode child{.name = "supportedRelationships"}; + child.children.push_back({.name = "name", .value = relationship.getName()}); + child.children.push_back({.name = "description", .value = relationship.getDescription()}); + relationships.children.push_back(child); + } + + desc.children.push_back(relationships); + } + + desc.children.push_back({.name = "typeDescription", .value = group.description_}); + desc.children.push_back({.name = "supportsDynamicRelationships", .value = group.supports_dynamic_relationships_}); + desc.children.push_back({.name = "supportsDynamicProperties", .value = group.supports_dynamic_properties_}); + desc.children.push_back({.name = "type", .value = group.full_name_}); + + type.children.push_back(desc); + } + response.children.push_back(type); +} + +std::vector ExternalManifest::serialize() { + std::vector serialized; + SerializedResponseNode resp; + resp.name = "componentManifest"; + struct Components group = ExternalBuildDescription::getClassDescriptions(getName()); + serializeClassDescription(group.processors_, "processors", resp); + serializeClassDescription(group.controller_services_, "controllerServices", resp); + serialized.push_back(resp); + return serialized; +} + +std::vector Bundles::serialize() { + std::vector serialized; + for (const auto& group : AgentBuild::getExtensions()) { + ComponentManifest component_manifest(group); + const auto components = component_manifest.serialize(); + gsl_Expects(components.size() == 1); + if (components[0].children.empty()) { + continue; + } + + SerializedResponseNode bundle { + .name = "bundles", + .children = { + components[0], + {.name = "group", .value = GROUP_STR}, + {.name = "artifact", .value = group}, + {.name = "version", .value = AgentBuild::VERSION}, + } + }; + + serialized.push_back(bundle); + } + + // let's provide our external manifests. + for (const auto& group : ExternalBuildDescription::getExternalGroups()) { + SerializedResponseNode bundle { + .name = "bundles", + .children = { + {.name = "group", .value = group.group}, + {.name = "artifact", .value = group.artifact}, + {.name = "version", .value = group.version}, + } + }; + + ExternalManifest compMan(group.artifact); + // serialize the component information. + for (const auto& component : compMan.serialize()) { + bundle.children.push_back(component); + } + serialized.push_back(bundle); + } + + return serialized; +} + +std::vector AgentStatus::serialize() { + std::vector serialized; + auto serializedRepositories = serializeRepositories(); + if (!serializedRepositories.empty()) { + serialized.push_back(serializedRepositories); + } + serialized.push_back(serializeUptime()); + + auto serializedComponents = serializeComponents(); + if (!serializedComponents.empty()) { + serialized.push_back(serializedComponents); + } + + serialized.push_back(serializeResourceConsumption()); + + return serialized; +} + +std::vector AgentStatus::calculateMetrics() { + auto metrics = repository_metrics_source_store_.calculateMetrics(); + if (nullptr != monitor_) { + auto uptime = monitor_->getUptime(); + metrics.push_back({"uptime_milliseconds", static_cast(uptime), {{"metric_class", getName()}}}); + } + + if (nullptr != monitor_) { + monitor_->executeOnAllComponents([this, &metrics](StateController& component){ + metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0), + {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", getName()}}}); + }); + } + + metrics.push_back({"agent_memory_usage_bytes", static_cast(utils::OsUtils::getCurrentProcessPhysicalMemoryUsage()), {{"metric_class", getName()}}}); + + double cpu_usage = -1.0; + { + std::lock_guard guard(cpu_load_tracker_mutex_); + cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection(); + } + metrics.push_back({"agent_cpu_utilization", cpu_usage, {{"metric_class", getName()}}}); + return metrics; +} + +SerializedResponseNode AgentStatus::serializeRepositories() const { + SerializedResponseNode repositories; + repositories.name = "repositories"; + repositories.children = repository_metrics_source_store_.serialize(); + return repositories; +} + +SerializedResponseNode AgentStatus::serializeUptime() const { + SerializedResponseNode uptime; + + uptime.name = "uptime"; + if (nullptr != monitor_) { + uptime.value = monitor_->getUptime(); + } else { + uptime.value = "0"; + } + + return uptime; +} + +SerializedResponseNode AgentStatus::serializeComponents() const { + SerializedResponseNode components_node; + components_node.collapsible = false; + components_node.name = "components"; + if (monitor_ != nullptr) { + monitor_->executeOnAllComponents([&components_node](StateController& component){ + SerializedResponseNode component_node { + .name = component.getComponentName(), + .collapsible = false, + .children = { + {.name = "running", .value = component.isRunning()}, + {.name = "uuid", .value = std::string{component.getComponentUUID().to_string()}}, + } + }; + components_node.children.push_back(component_node); + }); + } + return components_node; +} + +SerializedResponseNode AgentStatus::serializeAgentMemoryUsage() { + return {.name = "memoryUsage", .value = utils::OsUtils::getCurrentProcessPhysicalMemoryUsage()}; +} + +SerializedResponseNode AgentStatus::serializeAgentCPUUsage() { + double system_cpu_usage = -1.0; + { + std::lock_guard guard(cpu_load_tracker_mutex_); + system_cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection(); + } + return {.name = "cpuUtilization", .value = system_cpu_usage}; +} + +SerializedResponseNode AgentStatus::serializeResourceConsumption() { + return { + .name = "resourceConsumption", + .children = {serializeAgentMemoryUsage(), serializeAgentCPUUsage()} + }; +} + +std::vector AgentManifest::serialize() { + std::vector serialized = { + {.name = "identifier", .value = AgentBuild::BUILD_IDENTIFIER}, + {.name = "agentType", .value = "cpp"}, + {.name = "buildInfo", .children = { + {.name = "flags", .value = AgentBuild::COMPILER_FLAGS}, + {.name = "compiler", .value = AgentBuild::COMPILER}, + {.name = "version", .value = AgentBuild::VERSION}, + {.name = "revision", .value = AgentBuild::BUILD_REV}, + {.name = "timestamp", .value = static_cast(std::stoull(AgentBuild::BUILD_DATE))} + }} + }; + { + auto bundles = Bundles{"bundles"}.serialize(); + std::move(std::begin(bundles), std::end(bundles), std::back_inserter(serialized)); + } + { + auto schedulingDefaults = SchedulingDefaults{"schedulingDefaults"}.serialize(); + std::move(std::begin(schedulingDefaults), std::end(schedulingDefaults), std::back_inserter(serialized)); + } + { + auto supportedOperations = [this]() { + SupportedOperations supported_operations("supportedOperations"); + supported_operations.setStateMonitor(monitor_); + supported_operations.setUpdatePolicyController(update_policy_controller_); + supported_operations.setConfigurationReader(configuration_reader_); + return supported_operations.serialize(); + }(); + std::move(std::begin(supportedOperations), std::end(supportedOperations), std::back_inserter(serialized)); + } + return serialized; +} + +std::vector AgentNode::serialize() { + std::vector serialized = { + {.name = "identifier", .value = provider_->getAgentIdentifier()}, + }; + + const auto agent_class = provider_->getAgentClass(); + if (agent_class) { + serialized.push_back({.name = "agentClass", .value = *agent_class}); + } + + serialized.push_back({.name = "agentManifestHash", .value = getAgentManifestHash()}); + return serialized; +} + +std::vector AgentNode::getAgentManifest() const { + if (agent_manifest_cache_) { return std::vector{*agent_manifest_cache_}; } + agent_manifest_cache_ = {.name = "agentManifest", .children = [this] { + AgentManifest manifest{"manifest"}; + manifest.setStateMonitor(monitor_); + manifest.setUpdatePolicyController(update_policy_controller_); + manifest.setConfigurationReader(configuration_reader_); + return manifest.serialize(); + }()}; + agent_manifest_hash_cache_.clear(); + return std::vector{ *agent_manifest_cache_ }; +} + +std::string AgentNode::getAgentManifestHash() const { + if (agent_manifest_hash_cache_.empty()) { + agent_manifest_hash_cache_ = hashResponseNodes(getAgentManifest()); + } + return agent_manifest_hash_cache_; +} + +std::vector AgentNode::getAgentStatus() const { + std::vector serialized; + + AgentStatus status("status", getName()); + status.setRepositories(repositories_); + status.setStateMonitor(monitor_); + + SerializedResponseNode agentStatus; + agentStatus.name = "status"; + for (auto &ser : status.serialize()) { + agentStatus.children.push_back(std::move(ser)); + } + + serialized.push_back(agentStatus); + return serialized; +} + +std::vector AgentInformation::serialize() { + std::vector serialized(AgentNode::serialize()); + if (include_agent_manifest_) { + auto manifest = getAgentManifest(); + serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end())); + } + + if (include_agent_status_) { + auto status = getAgentStatus(); + serialized.insert(serialized.end(), std::make_move_iterator(status.begin()), std::make_move_iterator(status.end())); + } + return serialized; +} + REGISTER_RESOURCE(AgentInformation, DescriptionOnly); REGISTER_RESOURCE(AgentStatus, DescriptionOnly); diff --git a/libminifi/src/core/state/nodes/BuildInformation.cpp b/libminifi/src/core/state/nodes/BuildInformation.cpp index 596403a9ea..6b469f4299 100644 --- a/libminifi/src/core/state/nodes/BuildInformation.cpp +++ b/libminifi/src/core/state/nodes/BuildInformation.cpp @@ -18,20 +18,28 @@ #include "core/state/nodes/BuildInformation.h" #include "core/Resource.h" +#include "agent/agent_version.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace state { -namespace response { +namespace org::apache::nifi::minifi::state::response { + +std::vector BuildInformation::serialize() { + return { + {.name = "build_version", .value = AgentBuild::VERSION}, + {.name = "build_rev", .value = AgentBuild::BUILD_REV}, + {.name = "build_date", .value = AgentBuild::BUILD_DATE}, + { + .name = "compiler", + .children = { + {.name = "compiler_command", .value = AgentBuild::COMPILER}, + {.name = "compiler_version", .value = AgentBuild::COMPILER_VERSION}, + {.name = "compiler_flags", .value = AgentBuild::COMPILER_FLAGS}, + } + }, + {.name = "device_id", .value = AgentBuild::BUILD_IDENTIFIER} + }; +} REGISTER_RESOURCE(BuildInformation, DescriptionOnly); -} // namespace response -} // namespace state -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/src/core/state/nodes/DeviceInformation.cpp b/libminifi/src/core/state/nodes/DeviceInformation.cpp index 48991dc10e..0bb593aef3 100644 --- a/libminifi/src/core/state/nodes/DeviceInformation.cpp +++ b/libminifi/src/core/state/nodes/DeviceInformation.cpp @@ -15,6 +15,10 @@ * limitations under the License. */ #include "core/state/nodes/DeviceInformation.h" + +#include +#include + #include "core/Resource.h" namespace org::apache::nifi::minifi::state::response { @@ -22,6 +26,296 @@ namespace org::apache::nifi::minifi::state::response { utils::SystemCpuUsageTracker DeviceInfoNode::cpu_load_tracker_; std::mutex DeviceInfoNode::cpu_load_tracker_mutex_; +Device::Device() { + addrinfo hints; + memset(&hints, 0, sizeof hints); // make sure the struct is empty + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_CANONNAME; + hints.ai_protocol = 0; /* any protocol */ + + char hostname[1024]; + hostname[1023] = '\0'; + gethostname(hostname, 1023); + + std::ifstream device_id_file(".device_id"); + if (device_id_file) { + std::string line; + while (device_id_file) { + if (std::getline(device_id_file, line)) + device_id_ += line; + } + device_id_file.close(); + } else { + device_id_ = getDeviceId(); + + std::ofstream outputFile(".device_id"); + if (outputFile) { + outputFile.write(device_id_.c_str(), device_id_.length()); + } + outputFile.close(); + } + + canonical_hostname_ = hostname; + + std::stringstream ips; + auto ipaddressess = getIpAddresses(); + for (const auto& ip : ipaddressess) { + if (ipaddressess.size() > 1 && (ip.find("127") == 0 || ip.find("192") == 0)) + continue; + ip_ = ip; + break; + } +} + +std::vector Device::getIpAddresses() { + static std::vector ips; + if (ips.empty()) { + const auto filter = [](const utils::NetworkInterfaceInfo& interface_info) { + return !interface_info.isLoopback() && interface_info.isRunning(); + }; + auto network_interface_infos = utils::NetworkInterfaceInfo::getNetworkInterfaceInfos(filter); + for (const auto& network_interface_info : network_interface_infos) + for (const auto& ip_v4_address : network_interface_info.getIpV4Addresses()) + ips.push_back(ip_v4_address); + } + return ips; +} + +#if __linux__ +std::string Device::getDeviceId() { + std::hash hash_fn; + std::string macs; + ifaddrs *ifaddr; + ifaddrs *ifa; + int family; + int s; + int n; + char host[NI_MAXHOST]; + + if (getifaddrs(&ifaddr) == -1) { + exit(EXIT_FAILURE); + } + + /* Walk through linked list, maintaining head pointer so we + can free list later */ + for (ifa = ifaddr, n = 0; ifa != nullptr; ifa = ifa->ifa_next, n++) { + if (ifa->ifa_addr == nullptr) + continue; + + family = ifa->ifa_addr->sa_family; + + /* Display interface name and family (including symbolic + form of the latter for the common families) */ + + /* For an AF_INET* interface address, display the address */ + + if (family == AF_INET || family == AF_INET6) { + s = getnameinfo(ifa->ifa_addr, (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6), host, NI_MAXHOST, + nullptr, + 0, NI_NUMERICHOST); + if (s != 0) { + printf("getnameinfo() failed: %s\n", gai_strerror(s)); + exit(EXIT_FAILURE); + } + } + } + + freeifaddrs(ifaddr); + + int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_IP); + struct ifreq ifr; + struct ifconf ifc; + char buf[1024]; + ifc.ifc_len = sizeof(buf); + ifc.ifc_buf = buf; + if (ioctl(sock, SIOCGIFCONF, &ifc) == -1) { /* handle error */ + } + + struct ifreq* it = ifc.ifc_req; + const struct ifreq* const end = it + (ifc.ifc_len / sizeof(struct ifreq)); + + for (; it != end; ++it) { + strcpy(ifr.ifr_name, it->ifr_name); // NOLINT + if (ioctl(sock, SIOCGIFFLAGS, &ifr) == 0) { + if (!(ifr.ifr_flags & IFF_LOOPBACK)) { // don't count loopback + if (ioctl(sock, SIOCGIFHWADDR, &ifr) == 0) { + unsigned char mac[6]; + + memcpy(mac, ifr.ifr_hwaddr.sa_data, 6); + + char mac_add[13]; + snprintf(mac_add, 13, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); // NOLINT + + macs += mac_add; + } + } + + } else { /* handle error */ + } + } + + close(sock); + + return std::to_string(hash_fn(macs)); +} +#elif(defined(__unix__) || defined(__APPLE__) || defined(__MACH__) || defined(BSD)) // should work on bsd variants as well +std::string Device::getDeviceId() { + ifaddrs* iflist; + std::hash hash_fn; + std::set macs; + + if (getifaddrs(&iflist) == 0) { + for (ifaddrs* cur = iflist; cur; cur = cur->ifa_next) { + if (cur->ifa_addr && (cur->ifa_addr->sa_family == AF_LINK) && (reinterpret_cast(cur->ifa_addr))->sdl_alen) { + sockaddr_dl* sdl = reinterpret_cast(cur->ifa_addr); + + if (sdl->sdl_type != IFT_ETHER) { + continue; + } else { + } + char mac[32]; + memcpy(mac, LLADDR(sdl), sdl->sdl_alen); + char mac_add[13]; + snprintf(mac_add, 13, "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); // NOLINT + // macs += mac_add; + macs.insert(mac_add); + } + } + + freeifaddrs(iflist); + } + std::string macstr; + for (auto &mac : macs) { + macstr += mac; + } + return macstr.length() > 0 ? std::to_string(hash_fn(macstr)) : "8675309"; +} +#else +std::string Device::getDeviceId() { + PIP_ADAPTER_INFO adapterPtr; + PIP_ADAPTER_INFO adapter = nullptr; + + DWORD dwRetVal = 0; + + std::hash hash_fn; + std::set macs; + + ULONG adapterLen = sizeof(IP_ADAPTER_INFO); + adapterPtr = reinterpret_cast(malloc(sizeof(IP_ADAPTER_INFO))); + if (adapterPtr == nullptr) { + return ""; + } + if (GetAdaptersInfo(adapterPtr, &adapterLen) == ERROR_BUFFER_OVERFLOW) { + free(adapterPtr); + adapterPtr = reinterpret_cast(malloc(adapterLen)); + if (adapterPtr == nullptr) { + return ""; + } + } + + if ((dwRetVal = GetAdaptersInfo(adapterPtr, &adapterLen)) == NO_ERROR) { + adapter = adapterPtr; + while (adapter) { + char mac_add[13]; + snprintf(mac_add, 13, "%02X%02X%02X%02X%02X%02X", adapter->Address[0], adapter->Address[1], adapter->Address[2], adapter->Address[3], adapter->Address[4], adapter->Address[5]); // NOLINT + macs.insert(mac_add); + adapter = adapter->Next; + } + } + + if (adapterPtr) + free(adapterPtr); + std::string macstr; + for (auto &mac : macs) { + macstr += mac; + } + return macstr.length() > 0 ? std::to_string(hash_fn(macstr)) : "8675309"; +} +#endif + +std::vector DeviceInfoNode::serialize() { + return {serializeIdentifier(), serializeSystemInfo(), serializeNetworkInfo()}; +} + +std::vector DeviceInfoNode::calculateMetrics() { + double system_cpu_usage = -1.0; + { + std::lock_guard guard(cpu_load_tracker_mutex_); + system_cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection(); + } + SerializedResponseNode cpu_usage; + cpu_usage.name = "cpuUtilization"; + cpu_usage.value = system_cpu_usage; + return { + {"physical_mem", static_cast(utils::OsUtils::getSystemTotalPhysicalMemory()), {{"metric_class", "DeviceInfoNode"}}}, + {"memory_usage", static_cast(utils::OsUtils::getSystemPhysicalMemoryUsage()), {{"metric_class", "DeviceInfoNode"}}}, + {"cpu_utilization", system_cpu_usage, {{"metric_class", "DeviceInfoNode"}}}, + }; +} + +SerializedResponseNode DeviceInfoNode::serializeIdentifier() const { + return {.name = "identifier", .value = device_id_}; +} + +SerializedResponseNode DeviceInfoNode::serializeVCoreInfo() { + return {.name = "vCores", .value = std::thread::hardware_concurrency()}; +} + +SerializedResponseNode DeviceInfoNode::serializeOperatingSystemType() { + return {.name = "operatingSystem", .value = getOperatingSystem()}; +} + +SerializedResponseNode DeviceInfoNode::serializeTotalPhysicalMemoryInformation() { + return {.name = "physicalMem", .value = utils::OsUtils::getSystemTotalPhysicalMemory()}; +} + +SerializedResponseNode DeviceInfoNode::serializePhysicalMemoryUsageInformation() { + SerializedResponseNode used_physical_memory; + used_physical_memory.name = "memoryUsage"; + used_physical_memory.value = utils::OsUtils::getSystemPhysicalMemoryUsage(); + return used_physical_memory; +} + +SerializedResponseNode DeviceInfoNode::serializeSystemCPUUsageInformation() { + double system_cpu_usage = -1.0; + { + std::lock_guard guard(cpu_load_tracker_mutex_); + system_cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection(); + } + return {.name = "cpuUtilization", .value = system_cpu_usage}; +} + +SerializedResponseNode DeviceInfoNode::serializeArchitectureInformation() { + return {.name = "machineArch", .value = utils::OsUtils::getMachineArchitecture()}; +} + +SerializedResponseNode DeviceInfoNode::serializeSystemInfo() { + return { + .name = "systemInfo", + .children = { + serializeVCoreInfo(), + serializeOperatingSystemType(), + serializeTotalPhysicalMemoryInformation(), + serializeArchitectureInformation(), + serializePhysicalMemoryUsageInformation(), + serializeSystemCPUUsageInformation() + } + }; +} + +SerializedResponseNode DeviceInfoNode::serializeHostNameInfo() const { + return {.name = "hostname", .value = hostname_}; +} + +SerializedResponseNode DeviceInfoNode::serializeIPAddress() const { + return {.name = "ipAddress", .value = !ip_.empty() ? ip_ : "127.0.0.1"}; +} + +SerializedResponseNode DeviceInfoNode::serializeNetworkInfo() const { + return {.name = "networkInfo", .children = { serializeHostNameInfo(), serializeIPAddress()}}; +} + REGISTER_RESOURCE(DeviceInfoNode, DescriptionOnly); } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp index 5ef30ed8fc..5756072acf 100644 --- a/libminifi/src/core/state/nodes/FlowInformation.cpp +++ b/libminifi/src/core/state/nodes/FlowInformation.cpp @@ -20,6 +20,77 @@ namespace org::apache::nifi::minifi::state::response { +std::vector FlowVersion::serialize() { + std::lock_guard lock(guard); + return { + {.name = "registryUrl", .value = identifier->getRegistryUrl()}, + {.name = "bucketId", .value = identifier->getBucketId()}, + {.name = "flowId", .value = identifier->getFlowId()} + }; +} + +std::vector FlowInformation::serialize() { + std::vector serialized = { + {.name = "flowId", .value = flow_version_->getFlowId()} + }; + + SerializedResponseNode uri; + uri.name = "versionedFlowSnapshotURI"; + for (auto &entry : flow_version_->serialize()) { + uri.children.push_back(entry); + } + serialized.push_back(uri); + + const auto& connections = connection_store_.getConnections(); + if (!connections.empty()) { + SerializedResponseNode queues{.name = "queues", .collapsible = false}; + + for (const auto& queue : connections) { + queues.children.push_back({ + .name = queue.second->getName(), + .collapsible = false, + .children = { + {.name = "size", .value = queue.second->getQueueSize()}, + {.name = "sizeMax", .value = queue.second->getBackpressureThresholdCount()}, + {.name = "dataSize", .value = queue.second->getQueueDataSize()}, + {.name = "dataSizeMax", .value = queue.second->getBackpressureThresholdDataSize()}, + {.name = "uuid", .value = std::string{queue.second->getUUIDStr()}} + } + }); + } + serialized.push_back(queues); + } + + if (nullptr != monitor_) { + SerializedResponseNode componentsNode{.name = "components", .collapsible = false}; + monitor_->executeOnAllComponents([&componentsNode](StateController& component){ + componentsNode.children.push_back({ + .name = component.getComponentName(), + .collapsible = false, + .children = { + {.name = "running", .value = component.isRunning()}, + {.name = "uuid", .value = std::string{component.getComponentUUID().to_string()}} + } + }); + }); + serialized.push_back(componentsNode); + } + + return serialized; +} + +std::vector FlowInformation::calculateMetrics() { + std::vector metrics = connection_store_.calculateConnectionMetrics("FlowInformation"); + + if (nullptr != monitor_) { + monitor_->executeOnAllComponents([&metrics](StateController& component){ + metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0), + {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", "FlowInformation"}}}); + }); + } + return metrics; +} + REGISTER_RESOURCE(FlowInformation, DescriptionOnly); } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/src/core/state/nodes/QueueMetrics.cpp b/libminifi/src/core/state/nodes/QueueMetrics.cpp index d372681025..2a6f0fd655 100644 --- a/libminifi/src/core/state/nodes/QueueMetrics.cpp +++ b/libminifi/src/core/state/nodes/QueueMetrics.cpp @@ -21,6 +21,22 @@ namespace org::apache::nifi::minifi::state::response { +std::vector QueueMetrics::serialize() { + std::vector serialized; + for (const auto& [_, connection] : connection_store_.getConnections()) { + serialized.push_back({ + .name = connection->getName(), + .children = { + {.name = "datasize", .value = std::to_string(connection->getQueueDataSize())}, + {.name = "datasizemax", .value = std::to_string(connection->getBackpressureThresholdDataSize())}, + {.name = "queued", .value = std::to_string(connection->getQueueSize())}, + {.name = "queuedmax", .value = std::to_string(connection->getBackpressureThresholdCount())}, + } + }); + } + return serialized; +} + REGISTER_RESOURCE(QueueMetrics, DescriptionOnly); } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/src/core/state/nodes/RepositoryMetrics.cpp b/libminifi/src/core/state/nodes/RepositoryMetrics.cpp index a8d402b0b6..40748e20ce 100644 --- a/libminifi/src/core/state/nodes/RepositoryMetrics.cpp +++ b/libminifi/src/core/state/nodes/RepositoryMetrics.cpp @@ -21,6 +21,18 @@ namespace org::apache::nifi::minifi::state::response { +void RepositoryMetrics::addRepository(const std::shared_ptr &repo) { + return repository_metrics_source_store_.addRepository(repo); +} + +std::vector RepositoryMetrics::serialize() { + return repository_metrics_source_store_.serialize(); +} + +std::vector RepositoryMetrics::calculateMetrics() { + return repository_metrics_source_store_.calculateMetrics(); +} + REGISTER_RESOURCE(RepositoryMetrics, DescriptionOnly); } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/src/core/state/nodes/SchedulingNodes.cpp b/libminifi/src/core/state/nodes/SchedulingNodes.cpp new file mode 100644 index 0000000000..c3d1d7d5d1 --- /dev/null +++ b/libminifi/src/core/state/nodes/SchedulingNodes.cpp @@ -0,0 +1,39 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/state/nodes/SchedulingNodes.h" + +namespace org::apache::nifi::minifi::state::response { + +std::vector SchedulingDefaults::serialize() { + return { + { + .name = "schedulingDefaults", + .children = { + {.name = "defaultSchedulingStrategy", .value = core::DEFAULT_SCHEDULING_STRATEGY}, + {.name = "defaultSchedulingPeriodMillis", .value = int64_t{core::DEFAULT_SCHEDULING_PERIOD_MILLIS.count()}}, + {.name = "defaultRunDurationNanos", .value = int64_t{core::DEFAULT_RUN_DURATION.count()}}, + {.name = "defaultMaxConcurrentTasks", .value = core::DEFAULT_MAX_CONCURRENT_TASKS}, + {.name = "yieldDurationMillis", .value = int64_t{std::chrono::milliseconds(core::DEFAULT_YIELD_PERIOD_SECONDS).count()}}, + {.name = "penalizationPeriodMillis", .value = int64_t{std::chrono::milliseconds{core::DEFAULT_PENALIZATION_PERIOD}.count()}}, + } + } + }; +} + +} // namespace org::apache::nifi::minifi::state::response + diff --git a/libminifi/src/core/state/nodes/SupportedOperations.cpp b/libminifi/src/core/state/nodes/SupportedOperations.cpp index 230c17c788..d21cba27a0 100644 --- a/libminifi/src/core/state/nodes/SupportedOperations.cpp +++ b/libminifi/src/core/state/nodes/SupportedOperations.cpp @@ -38,29 +38,19 @@ std::string SupportedOperations::getName() const { } void SupportedOperations::addProperty(SerializedResponseNode& properties, const std::string& operand, const Metadata& metadata) { - SerializedResponseNode operand_node; - operand_node.name = operand; - operand_node.keep_empty = true; + SerializedResponseNode operand_node{.name = operand, .keep_empty = true}; for (const auto& [key, value_array] : metadata) { - SerializedResponseNode metadata_item; - metadata_item.name = key; - metadata_item.array = true; - + SerializedResponseNode metadata_item{.name = key, .array = true}; for (const auto& value_object : value_array) { SerializedResponseNode value_child; for (const auto& pair: value_object) { - SerializedResponseNode object_element; - object_element.name = pair.first; - object_element.value = pair.second; - value_child.children.push_back(object_element); + value_child.children.push_back({.name = pair.first, .value = pair.second}); } metadata_item.children.push_back(value_child); } - operand_node.children.push_back(metadata_item); } - properties.children.push_back(operand_node); } @@ -125,31 +115,23 @@ void SupportedOperations::fillProperties(SerializedResponseNode& properties, min } std::vector SupportedOperations::serialize() { - std::vector serialized; - SerializedResponseNode supported_operation; - supported_operation.name = "supportedOperations"; - supported_operation.array = true; - - for (const auto& operation : minifi::c2::Operation::values) { - SerializedResponseNode child; - child.name = "supportedOperations"; + SerializedResponseNode supported_operation{.name = "supportedOperations", .array = true}; - SerializedResponseNode operation_type; - operation_type.name = "type"; - operation_type.value = std::string(operation); - - SerializedResponseNode properties; - properties.name = "properties"; + for (const auto& operation : minifi::c2::Operation::values()) { + SerializedResponseNode child{ + .name = "supportedOperations", + .children = { + {.name = "type", .value = std::string(operation)} + } + }; + SerializedResponseNode properties{.name = "properties"}; fillProperties(properties, minifi::c2::Operation::parse(operation, {}, false)); - - child.children.push_back(operation_type); child.children.push_back(properties); supported_operation.children.push_back(child); } - serialized.push_back(supported_operation); - return serialized; + return {supported_operation}; } REGISTER_RESOURCE(SupportedOperations, DescriptionOnly); From 5ded6d64376772de2f2c2bb7ce66ea53114a294b Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Thu, 20 Jul 2023 09:54:55 +0200 Subject: [PATCH 2/3] Fix after rebase --- libminifi/src/core/state/nodes/SupportedOperations.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libminifi/src/core/state/nodes/SupportedOperations.cpp b/libminifi/src/core/state/nodes/SupportedOperations.cpp index d21cba27a0..03d35373b7 100644 --- a/libminifi/src/core/state/nodes/SupportedOperations.cpp +++ b/libminifi/src/core/state/nodes/SupportedOperations.cpp @@ -117,7 +117,7 @@ void SupportedOperations::fillProperties(SerializedResponseNode& properties, min std::vector SupportedOperations::serialize() { SerializedResponseNode supported_operation{.name = "supportedOperations", .array = true}; - for (const auto& operation : minifi::c2::Operation::values()) { + for (const auto& operation : minifi::c2::Operation::values) { SerializedResponseNode child{ .name = "supportedOperations", .children = { From 35ee94afcd51bd3ec6abaf6ba53e9fa1e1aa45ca Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Thu, 20 Jul 2023 14:23:57 +0200 Subject: [PATCH 3/3] Review update --- libminifi/src/core/state/nodes/DeviceInformation.cpp | 7 ------- 1 file changed, 7 deletions(-) diff --git a/libminifi/src/core/state/nodes/DeviceInformation.cpp b/libminifi/src/core/state/nodes/DeviceInformation.cpp index 0bb593aef3..65e02370dc 100644 --- a/libminifi/src/core/state/nodes/DeviceInformation.cpp +++ b/libminifi/src/core/state/nodes/DeviceInformation.cpp @@ -27,13 +27,6 @@ utils::SystemCpuUsageTracker DeviceInfoNode::cpu_load_tracker_; std::mutex DeviceInfoNode::cpu_load_tracker_mutex_; Device::Device() { - addrinfo hints; - memset(&hints, 0, sizeof hints); // make sure the struct is empty - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_CANONNAME; - hints.ai_protocol = 0; /* any protocol */ - char hostname[1024]; hostname[1023] = '\0'; gethostname(hostname, 1023);