Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding version and instanceId for traffic processor #1515

Open
wants to merge 15 commits into
base: feature/mini-runtime-release
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions .github/workflows/prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ jobs:
node-version: '17'

- name: mvn package command
run: mvn package
env:
IMAGE_TAG: collector-testing
run: mvn package -Dakto-image-tag=$IMAGE_TAG -Dakto-build-time=$(eval "date +%s")

- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1
Expand All @@ -58,19 +60,13 @@ jobs:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
ECR_REPOSITORY: akto-api-security
REGISTRY_ALIAS: p7q3h0z2
IMAGE_TAG: latest
IMAGE_TAG1: testruntime
IMAGE_TAG2: local
IMAGE_TAG3: 1.41.20_local
IMAGE_TAG: collector-testing
run: |
docker buildx create --use
# Build a docker container and push it to DockerHub
cd apps/mini-runtime
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/$REGISTRY_ALIAS/akto-api-security-mini-runtime:$IMAGE_TAG -t $ECR_REGISTRY/$REGISTRY_ALIAS/akto-api-security-mini-runtime:$IMAGE_TAG1 -t $ECR_REGISTRY/$REGISTRY_ALIAS/akto-api-security-mini-runtime:$IMAGE_TAG2 -t $ECR_REGISTRY/$REGISTRY_ALIAS/akto-api-security-mini-runtime:$IMAGE_TAG3 . --push
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/$REGISTRY_ALIAS/akto-api-security-mini-runtime:$IMAGE_TAG . --push
echo "::set-output name=image::$ECR_REGISTRY/akto-api-security-mini-runtime:$IMAGE_TAG"
cd ../mini-testing
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/$REGISTRY_ALIAS/akto-api-security-mini-testing:$IMAGE_TAG -t $ECR_REGISTRY/$REGISTRY_ALIAS/akto-api-security-mini-testing:$IMAGE_TAG1 -t $ECR_REGISTRY/$REGISTRY_ALIAS/akto-api-security-mini-testing:$IMAGE_TAG2 -t $ECR_REGISTRY/$REGISTRY_ALIAS/akto-api-security-mini-testing:$IMAGE_TAG3 . --push
echo "::set-output name=image::$ECR_REGISTRY/akto-api-security-mini-testing:$IMAGE_TAG3"

- name: DockerHub login
env:
Expand All @@ -83,15 +79,10 @@ jobs:
env:
ECR_REGISTRY: aktosecurity
ECR_REPOSITORY: akto-api-security
IMAGE_TAG: latest
IMAGE_TAG1: testruntime
IMAGE_TAG2: local
IMAGE_TAG3: 1.41.20_local
IMAGE_TAG: collector-testing
run: |
echo $IMAGE_TAG >> $GITHUB_STEP_SUMMARY
docker buildx create --use
# Build a docker container and push it to DockerHub
cd apps/mini-runtime
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/mini-runtime:$IMAGE_TAG -t $ECR_REGISTRY/mini-runtime:$IMAGE_TAG1 -t $ECR_REGISTRY/mini-runtime:$IMAGE_TAG2 -t $ECR_REGISTRY/mini-runtime:$IMAGE_TAG3 . --push
cd ../mini-testing
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/mini-testing:$IMAGE_TAG -t $ECR_REGISTRY/mini-testing:$IMAGE_TAG1 -t $ECR_REGISTRY/mini-testing:$IMAGE_TAG2 -t $ECR_REGISTRY/mini-testing:$IMAGE_TAG3 . --push
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/mini-runtime:$IMAGE_TAG . --push
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.akto.action.traffic_metrics;

import com.akto.action.UserAction;
import com.akto.dao.traffic_metrics.RuntimeMetricsDao;
import com.akto.dao.traffic_metrics.TrafficMetricsDao;
import com.akto.dto.traffic_metrics.RuntimeMetrics;
import com.akto.dto.traffic_metrics.TrafficMetrics;
import com.mongodb.BasicDBObject;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;

import org.bson.Document;
import org.bson.conversions.Bson;

Expand All @@ -16,11 +21,16 @@
public class TrafficMetricsAction extends UserAction {
private int startTimestamp;
private int endTimestamp;
private String instanceId;


private List<TrafficMetrics.Name> names;
private String groupBy;
private String host;
private int vxlanID;
List<RuntimeMetrics> runtimeMetrics;
List<String> instanceIds;


public static final String ID = "_id.";

Expand Down Expand Up @@ -113,6 +123,39 @@ public String execute() {
return SUCCESS.toUpperCase();
}

/**
 * Populates {@code instanceIds} with the distinct runtime instance ids that
 * reported metrics in [startTimestamp, endTimestamp], most recently seen first.
 */
public String fetchRuntimeInstances() {
    instanceIds = new ArrayList<>();
    Bson filters = RuntimeMetricsDao.buildFilters(startTimestamp, endTimestamp);
    // Only instanceId is needed; project it to avoid pulling full metric documents.
    runtimeMetrics = RuntimeMetricsDao.instance.findAll(filters, 0, 0, Sorts.descending("timestamp"), Projections.include("instanceId"));
    for (RuntimeMetrics metric: runtimeMetrics) {
        String id = metric.getInstanceId();
        // Many metric documents exist per instance in the window; de-duplicate so
        // each instance appears exactly once (linear scan is fine at this scale).
        if (id != null && !instanceIds.contains(id)) {
            instanceIds.add(id);
        }
    }

    return SUCCESS.toUpperCase();
}

/**
 * Populates {@code runtimeMetrics} with the latest sample of each metric name
 * reported by {@code instanceId} within [startTimestamp, endTimestamp]
 * (one entry per distinct metric "name").
 */
public String fetchRuntimeMetrics() {
    Bson filters = RuntimeMetricsDao.buildFilters(startTimestamp, endTimestamp, instanceId);
    runtimeMetrics = new ArrayList<>();

    try (MongoCursor<BasicDBObject> cursor = RuntimeMetricsDao.instance.getMCollection().aggregate(
        Arrays.asList(
            Aggregates.match(filters),
            // Sort newest-first so $first in the group stage picks the latest doc per name.
            Aggregates.sort(Sorts.descending("timestamp")),
            Aggregates.group(new BasicDBObject("name", "$name"), Accumulators.first("latestDoc", "$$ROOT"))
        ), BasicDBObject.class
    ).cursor()) {
        while (cursor.hasNext()) {
            BasicDBObject basicDBObject = cursor.next();
            BasicDBObject latestDoc = (BasicDBObject) basicDBObject.get("latestDoc");
            // Timestamp is not surfaced to the client, hence the literal 0 here.
            runtimeMetrics.add(new RuntimeMetrics(latestDoc.getString("name"), 0, instanceId, latestDoc.getDouble("val")));
        }
    }

    return SUCCESS.toUpperCase();
}

public String fetchTrafficMetricsDesciptions(){
names = Arrays.asList(TrafficMetrics.Name.values());
return SUCCESS.toUpperCase();
Expand Down Expand Up @@ -150,4 +193,29 @@ public void setHost(String host) {
/** Sets the VXLAN id used to filter traffic metrics. */
public void setVxlanID(int vxlanID) {
this.vxlanID = vxlanID;
}

/** Returns the runtime metrics loaded by the last fetch call; serialized into the JSON response. */
public List<RuntimeMetrics> getRuntimeMetrics() {
return runtimeMetrics;
}

/** Sets the runtime metrics list (populated by the struts JSON interceptor). */
public void setRuntimeMetrics(List<RuntimeMetrics> runtimeMetrics) {
this.runtimeMetrics = runtimeMetrics;
}

/** Returns the instance ids collected by fetchRuntimeInstances; serialized into the JSON response. */
public List<String> getInstanceIds() {
return instanceIds;
}

/** Sets the instance id list (populated by the struts JSON interceptor). */
public void setInstanceIds(List<String> instanceIds) {
this.instanceIds = instanceIds;
}

/** Returns the instance id used by fetchRuntimeMetrics to scope the query. */
public String getInstanceId() {
return instanceId;
}

/** Sets the instance id (request parameter bound by the struts JSON interceptor). */
public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
}

}
22 changes: 22 additions & 0 deletions apps/dashboard/src/main/resources/struts.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2849,6 +2849,28 @@
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>

<!-- JSON endpoint: lists distinct runtime instance ids seen in a time window
     (TrafficMetricsAction.fetchRuntimeInstances). Errors map to HTTP 422. -->
<action name="api/fetchRuntimeInstances" class="com.akto.action.traffic_metrics.TrafficMetricsAction" method="fetchRuntimeInstances">
<interceptor-ref name="json"/>
<interceptor-ref name="defaultStack" />
<result name="SUCCESS" type="json"/>
<result name="ERROR" type="json">
<param name="statusCode">422</param>
<param name="ignoreHierarchy">false</param>
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>

<!-- JSON endpoint: latest metric sample per name for one runtime instance
     (TrafficMetricsAction.fetchRuntimeMetrics). Errors map to HTTP 422. -->
<action name="api/fetchRuntimeMetrics" class="com.akto.action.traffic_metrics.TrafficMetricsAction" method="fetchRuntimeMetrics">
<interceptor-ref name="json"/>
<interceptor-ref name="defaultStack" />
<result name="SUCCESS" type="json"/>
<result name="ERROR" type="json">
<param name="statusCode">422</param>
<param name="ignoreHierarchy">false</param>
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>

<action name="api/fetchTrafficMetricsDesciptions" class="com.akto.action.traffic_metrics.TrafficMetricsAction" method="fetchTrafficMetricsDesciptions">
<interceptor-ref name="json"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ public void setDeltaUsage(int deltaUsage) {
}

DataControlSettings dataControlSettings;
BasicDBList metricsData;

public String fetchDataControlSettings() {
try {
String prevCommand = "";
Expand Down Expand Up @@ -1654,6 +1656,15 @@ public String fetchActiveAdvancedFilters(){
return Action.SUCCESS.toUpperCase();
}

/**
 * Persists a batch of runtime metric documents ({@code metricsData}, bound
 * from the JSON request body) via DbLayer. Returns ERROR (mapped to HTTP 422
 * in struts.xml) on any failure.
 */
public String insertRuntimeMetricsData() {
try {
DbLayer.insertRuntimeMetricsData(metricsData);
} catch (Exception e) {
// NOTE(review): the exception is swallowed here — only the ERROR result
// reaches the caller; consider logging it for diagnosis.
return Action.ERROR.toUpperCase();
}
return Action.SUCCESS.toUpperCase();
}

public List<CustomDataTypeMapper> getCustomDataTypes() {
return customDataTypes;
}
Expand Down Expand Up @@ -2540,4 +2551,12 @@ public void setActiveAdvancedFilters(List<YamlTemplate> activeAdvancedFilters) {
this.activeAdvancedFilters = activeAdvancedFilters;
}

/** Returns the runtime metrics batch to insert (bound from the JSON request body). */
public BasicDBList getMetricsData() {
return metricsData;
}

/** Sets the runtime metrics batch (populated by the struts JSON interceptor). */
public void setMetricsData(BasicDBList metricsData) {
this.metricsData = metricsData;
}

}
11 changes: 11 additions & 0 deletions apps/database-abstractor/src/main/resources/struts.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,17 @@
</result>
</action>

<!-- JSON endpoint: ingests a batch of runtime metric documents from an agent
     (DbAction.insertRuntimeMetricsData). Errors map to HTTP 422. -->
<action name="api/insertRuntimeMetricsData" class="com.akto.action.DbAction" method="insertRuntimeMetricsData">
<interceptor-ref name="json"/>
<interceptor-ref name="defaultStack" />
<result name="SUCCESS" type="json"/>
<result name="ERROR" type="json">
<param name="statusCode">422</param>
<param name="ignoreHierarchy">false</param>
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>

</package>

</struts>
75 changes: 65 additions & 10 deletions apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package com.akto.hybrid_runtime;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Executors;
Expand All @@ -23,6 +28,8 @@
import com.akto.data_actor.DataActorFactory;
import com.akto.util.DashboardMode;
import com.google.gson.Gson;
import com.mongodb.BasicDBObject;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
Expand Down Expand Up @@ -109,6 +116,41 @@ private static void buildKafka() {
}
}

/**
 * Publishes this runtime's credentials (instance id plus the DB-abstractor
 * token/url env vars) to the "credentials" Kafka topic, and initializes the
 * shared {@code kafkaProducer} used for it.
 *
 * The instance id is persisted in creds.txt so it is stable across restarts:
 * if the file exists its first line is reused, otherwise a fresh UUID is
 * generated and written out. Failures to read/write the file are logged and
 * tolerated (a new id is simply used for this run).
 *
 * @param brokerUrl Kafka broker url for the producer
 * @return the instance id (persisted or newly generated)
 */
private static String insertCredsRecordInKafka(String brokerUrl) {
    File f = new File("creds.txt");
    String instanceId = UUID.randomUUID().toString();
    if (f.exists()) {
        try (FileReader reader = new FileReader(f);
            BufferedReader bufferedReader = new BufferedReader(reader)) {
            // The file holds a single id on its first line; guard against a
            // blank line so we never end up with an empty instance id.
            String line = bufferedReader.readLine();
            if (line != null && !line.trim().isEmpty()) {
                instanceId = line.trim();
            }
        } catch (IOException e) {
            loggerMaker.errorAndAddToDb("Error reading instanceId from file: " + e.getMessage());
        }
    } else {
        try (FileWriter writer = new FileWriter(f)) {
            writer.write(instanceId);
        } catch (IOException e) {
            // Non-fatal: a fresh id will be generated again on the next restart.
            loggerMaker.errorAndAddToDb("Error writing instanceId to file: " + e.getMessage());
        }
    }
    // NOTE(review): assumes these env vars are set and numeric (parseInt throws
    // otherwise) — confirm they are validated before this point, as in main().
    int batchSize = Integer.parseInt(System.getenv("AKTO_TRAFFIC_BATCH_SIZE"));
    int kafkaLingerMS = Integer.parseInt(System.getenv("AKTO_TRAFFIC_BATCH_TIME_SECS"));
    kafkaProducer = new Kafka(brokerUrl, kafkaLingerMS, batchSize);
    BasicDBObject creds = new BasicDBObject();
    creds.put("id", instanceId);
    creds.put("token", System.getenv("DATABASE_ABSTRACTOR_SERVICE_TOKEN"));
    creds.put("url", System.getenv("DATABASE_ABSTRACTOR_SERVICE_URL"));
    try {
        kafkaProducer.send(creds.toJson(), "credentials");
    } catch (Exception e) {
        loggerMaker.errorAndAddToDb("Error inserting creds record in kafka: " + e.getMessage());
    }
    return instanceId;
}

public static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

public static class AccountInfo {
Expand Down Expand Up @@ -171,6 +213,7 @@ public static void main(String[] args) {
fetchAllSTI = false;
}
int maxPollRecordsConfig = Integer.parseInt(System.getenv("AKTO_KAFKA_MAX_POLL_RECORDS_CONFIG"));
String instanceId = insertCredsRecordInKafka(kafkaBrokerUrl);

AccountSettings aSettings = dataActor.fetchAccountSettings();
if (aSettings == null) {
Expand All @@ -188,7 +231,7 @@ public static void main(String[] args) {

dataActor.modifyHybridSaasSetting(RuntimeMode.isHybridDeployment());

initializeRuntime();
initializeRuntime(instanceId);

String centralKafkaTopicName = AccountSettings.DEFAULT_CENTRAL_KAFKA_TOPIC_NAME;

Expand Down Expand Up @@ -378,6 +421,17 @@ public void run() {
continue;
}

if (UsageMetricUtils.checkActiveEndpointOverage(accountIdInt)) {
int now = Context.now();
int lastSent = logSentMap.getOrDefault(accountIdInt, 0);
if (now - lastSent > LoggerMaker.LOG_SAVE_INTERVAL) {
logSentMap.put(accountIdInt, now);
loggerMaker.infoAndAddToDb("Active endpoint overage detected for account " + accountIdInt
+ ". Ingestion stopped " + now, LogDb.RUNTIME);
}
continue;
}

if (!httpCallParserMap.containsKey(accountId)) {
HttpCallParser parser = new HttpCallParser(
apiConfig.getUserIdentifier(), apiConfig.getThreshold(), apiConfig.getSync_threshold_count(),
Expand Down Expand Up @@ -510,12 +564,20 @@ public static void changeTargetCollection(Map<String, Map<Pattern, String>> apiC
}
}

public static void initializeRuntime(){
public static void initializeRuntime(String instanceId){

Account account = dataActor.fetchActiveAccount();
Context.accountId.set(account.getId());

AllMetrics.instance.init();
String version = "";
RuntimeVersion runtimeVersion = new RuntimeVersion();
try {
version = runtimeVersion.updateVersion(AccountSettings.API_RUNTIME_VERSION, dataActor);
} catch (Exception e) {
loggerMaker.errorAndAddToDb("error while updating dashboard version: " + e.getMessage(), LogDb.RUNTIME);
}

AllMetrics.instance.init(instanceId, version);
loggerMaker.infoAndAddToDb("All metrics initialized", LogDb.RUNTIME);

Setup setup = dataActor.fetchSetup();
Expand All @@ -526,13 +588,6 @@ public static void initializeRuntime(){
}

isOnprem = dashboardMode.equalsIgnoreCase(DashboardMode.ON_PREM.name());

RuntimeVersion runtimeVersion = new RuntimeVersion();
try {
runtimeVersion.updateVersion(AccountSettings.API_RUNTIME_VERSION, dataActor);
} catch (Exception e) {
loggerMaker.errorAndAddToDb("error while updating dashboard version: " + e.getMessage(), LogDb.RUNTIME);
}

initFromRuntime(account.getId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

public class RuntimeVersion {

public void updateVersion(String fieldName, DataActor dataActor) throws Exception {
public String updateVersion(String fieldName, DataActor dataActor) throws Exception {
try (InputStream in = getClass().getResourceAsStream("/version.txt")) {
if (in != null) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
Expand All @@ -18,6 +18,7 @@ public void updateVersion(String fieldName, DataActor dataActor) throws Exceptio

String version = imageTag + " - " + buildTime;
dataActor.updateRuntimeVersion(fieldName, version);
return version;
} else {
throw new Exception("Input stream null");
}
Expand Down
Loading
Loading