Commit 21cbe54

Merge commit 'afeeb33077647181282ce3344f34f7d6c565f288' into feature/traffic_processor_miniruntime_changes

TangoBeeAkto committed Sep 19, 2024
2 parents a5dd0d4 + afeeb33, commit 21cbe54

Showing 13 changed files with 364 additions and 3 deletions.
@@ -1,13 +1,18 @@
package com.akto.action.traffic_metrics;

import com.akto.action.UserAction;
import com.akto.dao.traffic_metrics.RuntimeMetricsDao;
import com.akto.dao.traffic_metrics.TrafficMetricsDao;
import com.akto.dto.traffic_metrics.RuntimeMetrics;
import com.akto.dto.traffic_metrics.TrafficMetrics;
import com.mongodb.BasicDBObject;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;

import org.bson.Document;
import org.bson.conversions.Bson;

@@ -16,11 +21,16 @@
public class TrafficMetricsAction extends UserAction {
private int startTimestamp;
private int endTimestamp;
private String instanceId;


private List<TrafficMetrics.Name> names;
private String groupBy;
private String host;
private int vxlanID;
List<RuntimeMetrics> runtimeMetrics;
List<String> instanceIds;


public static final String ID = "_id.";

@@ -113,6 +123,39 @@ public String execute() {
return SUCCESS.toUpperCase();
}

public String fetchRuntimeInstances() {
instanceIds = new ArrayList<>();
Bson filters = RuntimeMetricsDao.buildFilters(startTimestamp, endTimestamp);
runtimeMetrics = RuntimeMetricsDao.instance.findAll(filters, 0, 0, Sorts.descending("timestamp"), Projections.include("instanceId"));
for (RuntimeMetrics metric: runtimeMetrics) {
instanceIds.add(metric.getInstanceId());
}

return SUCCESS.toUpperCase();
}

public String fetchRuntimeMetrics() {
Bson filters = RuntimeMetricsDao.buildFilters(startTimestamp, endTimestamp, instanceId);
// runtimeMetrics = RuntimeMetricsDao.instance.findAll(filters, 0, 0, Sorts.descending("timestamp"));
runtimeMetrics = new ArrayList<>();

try (MongoCursor<BasicDBObject> cursor = RuntimeMetricsDao.instance.getMCollection().aggregate(
Arrays.asList(
Aggregates.match(filters),
Aggregates.sort(Sorts.descending("timestamp")),
Aggregates.group(new BasicDBObject("name", "$name"), Accumulators.first("latestDoc", "$$ROOT"))
), BasicDBObject.class
).cursor()) {
while (cursor.hasNext()) {
BasicDBObject basicDBObject = cursor.next();
BasicDBObject latestDoc = (BasicDBObject) basicDBObject.get("latestDoc");
runtimeMetrics.add(new RuntimeMetrics(latestDoc.getString("name"), 0, instanceId, latestDoc.getDouble("val")));
}
}

return SUCCESS.toUpperCase();
}

public String fetchTrafficMetricsDesciptions(){
names = Arrays.asList(TrafficMetrics.Name.values());
return SUCCESS.toUpperCase();
@@ -150,4 +193,29 @@ public void setHost(String host) {
public void setVxlanID(int vxlanID) {
this.vxlanID = vxlanID;
}

public List<RuntimeMetrics> getRuntimeMetrics() {
return runtimeMetrics;
}

public void setRuntimeMetrics(List<RuntimeMetrics> runtimeMetrics) {
this.runtimeMetrics = runtimeMetrics;
}

public List<String> getInstanceIds() {
return instanceIds;
}

public void setInstanceIds(List<String> instanceIds) {
this.instanceIds = instanceIds;
}

public String getInstanceId() {
return instanceId;
}

public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
}

}
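
For reference, the aggregation in fetchRuntimeMetrics() sorts the filtered documents newest-first and then groups them by metric name, keeping the first document of each group, so the endpoint returns only the latest value of every metric for the requested instance and time window. Below is a minimal sketch of the same pipeline written with raw BSON Documents; the class name is illustrative and not part of this commit.

import java.util.Arrays;
import java.util.List;

import org.bson.Document;

// Sketch of the pipeline built in fetchRuntimeMetrics(): match the time window and
// instanceId, sort newest first, then keep the first (latest) document per metric name.
public class LatestMetricPipelineSketch {
    public static List<Document> build(int startTs, int endTs, String instanceId) {
        Document match = new Document("$match",
                new Document("timestamp", new Document("$gte", startTs).append("$lte", endTs))
                        .append("instanceId", instanceId));
        Document sort = new Document("$sort", new Document("timestamp", -1));
        Document group = new Document("$group",
                new Document("_id", new Document("name", "$name"))
                        .append("latestDoc", new Document("$first", "$$ROOT")));
        return Arrays.asList(match, sort, group);
    }
}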
22 changes: 22 additions & 0 deletions apps/dashboard/src/main/resources/struts.xml
@@ -2849,6 +2849,28 @@
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>

<action name="api/fetchRuntimeInstances" class="com.akto.action.traffic_metrics.TrafficMetricsAction" method="fetchRuntimeInstances">
<interceptor-ref name="json"/>
<interceptor-ref name="defaultStack" />
<result name="SUCCESS" type="json"/>
<result name="ERROR" type="json">
<param name="statusCode">422</param>
<param name="ignoreHierarchy">false</param>
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>

<action name="api/fetchRuntimeMetrics" class="com.akto.action.traffic_metrics.TrafficMetricsAction" method="fetchRuntimeMetrics">
<interceptor-ref name="json"/>
<interceptor-ref name="defaultStack" />
<result name="SUCCESS" type="json"/>
<result name="ERROR" type="json">
<param name="statusCode">422</param>
<param name="ignoreHierarchy">false</param>
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>

<action name="api/fetchTrafficMetricsDesciptions" class="com.akto.action.traffic_metrics.TrafficMetricsAction" method="fetchTrafficMetricsDesciptions">
<interceptor-ref name="json"/>
@@ -235,6 +235,8 @@ public void setDeltaUsage(int deltaUsage) {
}

DataControlSettings dataControlSettings;
BasicDBList metricsData;

public String fetchDataControlSettings() {
try {
String prevCommand = "";
@@ -1654,6 +1656,15 @@ public String fetchActiveAdvancedFilters(){
return Action.SUCCESS.toUpperCase();
}

public String insertRuntimeMetricsData() {
try {
DbLayer.insertRuntimeMetricsData(metricsData);
} catch (Exception e) {
return Action.ERROR.toUpperCase();
}
return Action.SUCCESS.toUpperCase();
}

public List<CustomDataTypeMapper> getCustomDataTypes() {
return customDataTypes;
}
@@ -2540,4 +2551,12 @@ public void setActiveAdvancedFilters(List<YamlTemplate> activeAdvancedFilters) {
this.activeAdvancedFilters = activeAdvancedFilters;
}

public BasicDBList getMetricsData() {
return metricsData;
}

public void setMetricsData(BasicDBList metricsData) {
this.metricsData = metricsData;
}

}
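
The new insertRuntimeMetricsData action simply forwards the posted BasicDBList to DbLayer.insertRuntimeMetricsData, whose body is not shown in this diff. A plausible sketch of that method follows, assuming each payload entry carries name/timestamp/instanceId/val fields and using the RuntimeMetrics constructor order seen in TrafficMetricsAction together with the bulkInsertMetrics helper added in RuntimeMetricsDao; treat it as an illustration, not the committed implementation.

import java.util.ArrayList;

import com.akto.dao.traffic_metrics.RuntimeMetricsDao;
import com.akto.dto.traffic_metrics.RuntimeMetrics;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;

// Sketch only: the real DbLayer.insertRuntimeMetricsData is not part of this diff.
// The payload field names ("name", "timestamp", "instanceId", "val") are assumptions.
public class RuntimeMetricsIngestSketch {
    public static void insertRuntimeMetricsData(BasicDBList metricsData) {
        ArrayList<WriteModel<RuntimeMetrics>> bulkUpdates = new ArrayList<>();
        for (Object obj : metricsData) {
            BasicDBObject entry = (BasicDBObject) obj;
            RuntimeMetrics metric = new RuntimeMetrics(
                    entry.getString("name"),
                    entry.getInt("timestamp"),
                    entry.getString("instanceId"),
                    entry.getDouble("val"));
            bulkUpdates.add(new InsertOneModel<>(metric));
        }
        if (!bulkUpdates.isEmpty()) {
            RuntimeMetricsDao.bulkInsertMetrics(bulkUpdates);
        }
    }
}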
11 changes: 11 additions & 0 deletions apps/database-abstractor/src/main/resources/struts.xml
@@ -1146,6 +1146,17 @@
</result>
</action>

<action name="api/insertRuntimeMetricsData" class="com.akto.action.DbAction" method="insertRuntimeMetricsData">
<interceptor-ref name="json"/>
<interceptor-ref name="defaultStack" />
<result name="SUCCESS" type="json"/>
<result name="ERROR" type="json">
<param name="statusCode">422</param>
<param name="ignoreHierarchy">false</param>
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>

</package>

</struts>
@@ -1,6 +1,10 @@
package com.akto.hybrid_runtime;

import java.io.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Executors;
@@ -25,6 +29,7 @@
import com.akto.util.DashboardMode;
import com.google.gson.Gson;
import com.mongodb.BasicDBObject;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -416,6 +421,17 @@ public void run() {
continue;
}

if (UsageMetricUtils.checkActiveEndpointOverage(accountIdInt)) {
int now = Context.now();
int lastSent = logSentMap.getOrDefault(accountIdInt, 0);
if (now - lastSent > LoggerMaker.LOG_SAVE_INTERVAL) {
logSentMap.put(accountIdInt, now);
loggerMaker.infoAndAddToDb("Active endpoint overage detected for account " + accountIdInt
+ ". Ingestion stopped " + now, LogDb.RUNTIME);
}
continue;
}

if (!httpCallParserMap.containsKey(accountId)) {
HttpCallParser parser = new HttpCallParser(
apiConfig.getUserIdentifier(), apiConfig.getThreshold(), apiConfig.getSync_threshold_count(),
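
The block added above throttles the "active endpoint overage" log line: logSentMap remembers, per account, when the message was last written, and the message is re-emitted only after LoggerMaker.LOG_SAVE_INTERVAL has elapsed, while the continue skips ingestion for that account either way. The same pattern in isolation, with illustrative names only:

import java.util.HashMap;
import java.util.Map;

// Generic per-key log throttle: emit a message for a key at most once every intervalSeconds.
public class ThrottledLogger {
    private final Map<Integer, Integer> lastSentEpoch = new HashMap<>();
    private final int intervalSeconds;

    public ThrottledLogger(int intervalSeconds) {
        this.intervalSeconds = intervalSeconds;
    }

    public boolean shouldLog(int key, int nowEpochSeconds) {
        int lastSent = lastSentEpoch.getOrDefault(key, 0);
        if (nowEpochSeconds - lastSent > intervalSeconds) {
            lastSentEpoch.put(key, nowEpochSeconds);
            return true;
        }
        return false;
    }
}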
6 changes: 5 additions & 1 deletion libs/dao/src/main/java/com/akto/DaoInit.java
@@ -8,6 +8,7 @@
import com.akto.dao.testing.TestingRunResultDao;
import com.akto.dao.testing.TestingRunResultSummariesDao;
import com.akto.dao.testing_run_findings.TestingRunIssuesDao;
import com.akto.dao.traffic_metrics.RuntimeMetricsDao;
import com.akto.dao.traffic_metrics.TrafficMetricsDao;
import com.akto.dao.usage.UsageMetricsDao;
import com.akto.dto.*;
@@ -36,6 +37,7 @@
import com.akto.dto.third_party_access.Credential;
import com.akto.dto.third_party_access.ThirdPartyAccess;
import com.akto.dto.traffic.SampleData;
import com.akto.dto.traffic_metrics.RuntimeMetrics;
import com.akto.dto.traffic_metrics.TrafficMetrics;
import com.akto.dto.traffic_metrics.TrafficMetricsAlert;
import com.akto.dto.type.SingleTypeInfo;
@@ -246,6 +248,7 @@ public static CodecRegistry createCodecRegistry(){
ClassModel<CodeAnalysisApiInfo.CodeAnalysisApiInfoKey> codeAnalysisApiInfoKeyClassModel = ClassModel.builder(CodeAnalysisApiInfo.CodeAnalysisApiInfoKey.class).enableDiscriminator(true).build();
ClassModel<RiskScoreTestingEndpoints> riskScoreTestingEndpointsClassModel = ClassModel.builder(RiskScoreTestingEndpoints.class).enableDiscriminator(true).build();
ClassModel<OrganizationFlags> OrganizationFlagsClassModel = ClassModel.builder(OrganizationFlags.class).enableDiscriminator(true).build();
ClassModel<RuntimeMetrics> RuntimeMetricsClassModel = ClassModel.builder(RuntimeMetrics.class).enableDiscriminator(true).build();

CodecRegistry pojoCodecRegistry = fromProviders(PojoCodecProvider.builder().register(
configClassModel, signupInfoClassModel, apiAuthClassModel, attempResultModel, urlTemplateModel,
@@ -275,7 +278,7 @@ public static CodecRegistry createCodecRegistry(){
yamlNodeDetails, multiExecTestResultClassModel, workflowTestClassModel, dependencyNodeClassModel, paramInfoClassModel,
nodeClassModel, connectionClassModel, edgeClassModel, replaceDetailClassModel, modifyHostDetailClassModel, fileUploadClassModel
,fileUploadLogClassModel, codeAnalysisCollectionClassModel, codeAnalysisApiLocationClassModel, codeAnalysisApiInfoClassModel, codeAnalysisApiInfoKeyClassModel,
riskScoreTestingEndpointsClassModel, OrganizationFlagsClassModel).automatic(true).build());
riskScoreTestingEndpointsClassModel, OrganizationFlagsClassModel, RuntimeMetricsClassModel).automatic(true).build());

final CodecRegistry customEnumCodecs = CodecRegistries.fromCodecs(
new EnumCodec<>(Conditions.Operator.class),
@@ -374,6 +377,7 @@ public static void createIndices() {
DependencyFlowNodesDao.instance.createIndicesIfAbsent();
CodeAnalysisCollectionDao.instance.createIndicesIfAbsent();
CodeAnalysisApiInfoDao.instance.createIndicesIfAbsent();
RuntimeMetricsDao.instance.createIndicesIfAbsent();
}

}
@@ -0,0 +1,82 @@
package com.akto.dao.traffic_metrics;

import java.util.ArrayList;

import org.bson.conversions.Bson;

import com.akto.dao.AccountsContextDao;
import com.akto.dao.MCollection;
import com.akto.dao.context.Context;
import com.akto.dto.traffic_metrics.RuntimeMetrics;
import com.akto.dto.type.URLMethods;
import com.akto.util.DbMode;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.WriteModel;

public class RuntimeMetricsDao extends AccountsContextDao<RuntimeMetrics> {

public static final RuntimeMetricsDao instance = new RuntimeMetricsDao();
public static final int maxDocuments = 100_000;
public static final int sizeInBytes = 100_000_000;

@Override
public String getCollName() {
return "runtime_metrics";
}

@Override
public Class<RuntimeMetrics> getClassT() {
return RuntimeMetrics.class;
}

public void createIndicesIfAbsent() {
boolean exists = false;
String dbName = Context.accountId.get()+"";
MongoDatabase db = clients[0].getDatabase(dbName);
for (String col: db.listCollectionNames()){
if (getCollName().equalsIgnoreCase(col)){
exists = true;
break;
}
}

// Create the collection once; use a capped collection when the backing store supports it,
// so stored runtime metrics stay bounded by maxDocuments/sizeInBytes.
if (!exists) {
if (DbMode.allowCappedCollections()) {
db.createCollection(getCollName(), new CreateCollectionOptions().capped(true).maxDocuments(maxDocuments).sizeInBytes(sizeInBytes));
} else {
db.createCollection(getCollName());
}
}

MCollection.createIndexIfAbsent(getDBName(), getCollName(),
new String[] { "timestamp" }, true);
MCollection.createIndexIfAbsent(getDBName(), getCollName(),
new String[] { "timestamp", "instanceId" }, true);
}

public static void bulkInsertMetrics(ArrayList<WriteModel<RuntimeMetrics>> bulkUpdates) {
RuntimeMetricsDao.instance.getMCollection().bulkWrite(bulkUpdates);
}

public static Bson buildFilters(int startTs, int endTs) {
return Filters.and(
Filters.gte("timestamp", startTs),
Filters.lte("timestamp", endTs)
);
}

public static Bson buildFilters(int startTs, int endTs, String instanceId) {
return Filters.and(
Filters.gte("timestamp", startTs),
Filters.lte("timestamp", endTs),
Filters.eq("instanceId", instanceId)
);
}

}
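
When DbMode.allowCappedCollections() is true, the collection is created capped at 100,000 documents / 100 MB, so old runtime metrics are discarded automatically; the two indexes back the time-window queries produced by the buildFilters overloads. A short usage sketch of those builders follows; the instance id is a made-up example.

import org.bson.conversions.Bson;

import com.akto.dao.context.Context;
import com.akto.dao.traffic_metrics.RuntimeMetricsDao;

// Illustrative only: build filters covering the last hour, for all instances and for one instance.
public class RuntimeMetricsQuerySketch {
    public static void main(String[] args) {
        int now = Context.now();
        int oneHourAgo = now - 3600;

        Bson allInstances = RuntimeMetricsDao.buildFilters(oneHourAgo, now);
        Bson singleInstance = RuntimeMetricsDao.buildFilters(oneHourAgo, now, "runtime-instance-1");

        System.out.println(allInstances);
        System.out.println(singleInstance);
    }
}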