Skip to content

Commit

Permalink
feat: adding version and instanceId for traffic processor
Browse files Browse the repository at this point in the history
  • Loading branch information
TangoBeeAkto committed Sep 19, 2024
1 parent 9e7dbdf commit a4d700b
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 14 deletions.
59 changes: 49 additions & 10 deletions apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.akto.hybrid_runtime;

import java.io.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Executors;
Expand All @@ -23,6 +24,7 @@
import com.akto.data_actor.DataActorFactory;
import com.akto.util.DashboardMode;
import com.google.gson.Gson;
import com.mongodb.BasicDBObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
Expand Down Expand Up @@ -109,6 +111,41 @@ private static void buildKafka() {
}
}

/**
 * Ensures this runtime instance has a stable identifier persisted in {@code creds.txt},
 * then publishes a credentials record (instance id + database-abstractor token/url) to
 * the "credentials" Kafka topic so the traffic processor can be identified downstream.
 *
 * @param brokerUrl Kafka broker URL used to build the producer
 * @return the instance id — reused from {@code creds.txt} when present, otherwise a
 *         freshly generated UUID that is persisted for future restarts
 */
private static String insertCredsRecordInKafka(String brokerUrl) {
    File credsFile = new File("creds.txt");
    String instanceId = UUID.randomUUID().toString();
    if (credsFile.exists()) {
        // Reuse the previously persisted id so restarts keep a stable identity.
        try (FileReader reader = new FileReader(credsFile);
             BufferedReader bufferedReader = new BufferedReader(reader)) {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                String trimmed = line.trim();
                if (!trimmed.isEmpty()) {
                    // The file is written with a single id; take the first non-empty
                    // line and trim stray whitespace/newlines.
                    instanceId = trimmed;
                    break;
                }
            }
        } catch (IOException e) {
            loggerMaker.errorAndAddToDb("Error reading instanceId from file: " + e.getMessage());
        }
    } else {
        // First boot: persist the generated id for future restarts.
        try (FileWriter writer = new FileWriter(credsFile)) {
            writer.write(instanceId);
        } catch (IOException e) {
            loggerMaker.errorAndAddToDb("Error writing instanceId to file: " + e.getMessage());
        }
    }
    // Original code called Integer.parseInt(System.getenv(...)) directly, which throws
    // NumberFormatException (message "null") when the variable is unset — crashing startup.
    // Fall back to producer defaults instead. NOTE(review): confirm fallback values match
    // the deployment's expected producer tuning.
    int batchSize = parseEnvInt("AKTO_KAFKA_PRODUCER_BATCH_SIZE", 100);
    int kafkaLingerMS = parseEnvInt("AKTO_KAFKA_PRODUCER_LINGER_MS", 10);
    kafkaProducer = new Kafka(brokerUrl, kafkaLingerMS, batchSize);
    BasicDBObject creds = new BasicDBObject();
    creds.put("id", instanceId);
    creds.put("token", System.getenv("DATABASE_ABSTRACTOR_SERVICE_TOKEN"));
    creds.put("url", System.getenv("DATABASE_ABSTRACTOR_SERVICE_URL"));
    try {
        kafkaProducer.send(creds.toJson(), "credentials");
    } catch (Exception e) {
        loggerMaker.errorAndAddToDb("Error inserting creds record in kafka: " + e.getMessage());
    }
    return instanceId;
}

/**
 * Parses an integer environment variable, logging and returning {@code defaultValue}
 * when the variable is unset, blank, or not a valid integer.
 *
 * @param name         environment variable name
 * @param defaultValue value used when the variable is missing or malformed
 * @return the parsed value, or {@code defaultValue} on any failure
 */
private static int parseEnvInt(String name, int defaultValue) {
    String raw = System.getenv(name);
    if (raw == null || raw.trim().isEmpty()) {
        loggerMaker.errorAndAddToDb("Env var " + name + " not set, using default: " + defaultValue);
        return defaultValue;
    }
    try {
        return Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
        loggerMaker.errorAndAddToDb("Env var " + name + " is not a valid integer, using default: " + defaultValue);
        return defaultValue;
    }
}

public static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

public static class AccountInfo {
Expand Down Expand Up @@ -171,6 +208,7 @@ public static void main(String[] args) {
fetchAllSTI = false;
}
int maxPollRecordsConfig = Integer.parseInt(System.getenv("AKTO_KAFKA_MAX_POLL_RECORDS_CONFIG"));
String instanceId = insertCredsRecordInKafka(kafkaBrokerUrl);

AccountSettings aSettings = dataActor.fetchAccountSettings();
if (aSettings == null) {
Expand All @@ -188,7 +226,7 @@ public static void main(String[] args) {

dataActor.modifyHybridSaasSetting(RuntimeMode.isHybridDeployment());

initializeRuntime();
initializeRuntime(instanceId);

String centralKafkaTopicName = AccountSettings.DEFAULT_CENTRAL_KAFKA_TOPIC_NAME;

Expand Down Expand Up @@ -510,12 +548,20 @@ public static void changeTargetCollection(Map<String, Map<Pattern, String>> apiC
}
}

public static void initializeRuntime(){
public static void initializeRuntime(String instanceId){

Account account = dataActor.fetchActiveAccount();
Context.accountId.set(account.getId());

AllMetrics.instance.init();
String version = "";
RuntimeVersion runtimeVersion = new RuntimeVersion();
try {
version = runtimeVersion.updateVersion(AccountSettings.API_RUNTIME_VERSION, dataActor);
} catch (Exception e) {
loggerMaker.errorAndAddToDb("error while updating dashboard version: " + e.getMessage(), LogDb.RUNTIME);
}

AllMetrics.instance.init(instanceId, version);
loggerMaker.infoAndAddToDb("All metrics initialized", LogDb.RUNTIME);

Setup setup = dataActor.fetchSetup();
Expand All @@ -526,13 +572,6 @@ public static void initializeRuntime(){
}

isOnprem = dashboardMode.equalsIgnoreCase(DashboardMode.ON_PREM.name());

RuntimeVersion runtimeVersion = new RuntimeVersion();
try {
runtimeVersion.updateVersion(AccountSettings.API_RUNTIME_VERSION, dataActor);
} catch (Exception e) {
loggerMaker.errorAndAddToDb("error while updating dashboard version: " + e.getMessage(), LogDb.RUNTIME);
}

initFromRuntime(account.getId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

public class RuntimeVersion {

public void updateVersion(String fieldName, DataActor dataActor) throws Exception {
public String updateVersion(String fieldName, DataActor dataActor) throws Exception {
try (InputStream in = getClass().getResourceAsStream("/version.txt")) {
if (in != null) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
Expand All @@ -18,6 +18,7 @@ public void updateVersion(String fieldName, DataActor dataActor) throws Exceptio

String version = imageTag + " - " + buildTime;
dataActor.updateRuntimeVersion(fieldName, version);
return version;
} else {
throw new Exception("Input stream null");
}
Expand Down
28 changes: 25 additions & 3 deletions libs/utils/src/main/java/com/akto/metrics/AllMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

public class AllMetrics {

public void init(){
private String instanceId;
private String version;

public void init(String instanceId, String version){
int accountId = Context.accountId.get();
this.setInstanceId(instanceId);
this.setVersion(version);

Organization organization = DataActorFactory.fetchInstance().fetchOrganization(accountId);
String orgId = organization.getId();
Expand Down Expand Up @@ -81,8 +86,10 @@ public void init(){
metricsData.put("metric_id", m.metricId);
metricsData.put("val", metric);
metricsData.put("org_id", m.orgId);
metricsData.put("instance_id", instance_id);
metricsData.put("instance_id", this.getInstanceId());
metricsData.put("version", this.getVersion());
metricsData.put("account_id", m.accountId);
metricsData.put("timestamp", Context.now());
list.add(metricsData);

}
Expand Down Expand Up @@ -110,7 +117,6 @@ private AllMetrics(){}

private final static LoggerMaker loggerMaker = new LoggerMaker(AllMetrics.class);

private static final String instance_id = UUID.randomUUID().toString();
private Metric runtimeKafkaRecordCount;
private Metric runtimeKafkaRecordSize;
private Metric runtimeProcessLatency = null;
Expand Down Expand Up @@ -431,4 +437,20 @@ public static void sendDataToAkto(BasicDBList list){
loggerMaker.infoAndAddToDb("Traffic_metrics not sent", LoggerMaker.LogDb.RUNTIME);
}
}

/** @return the unique id of this runtime instance, as assigned via {@link #setInstanceId(String)}. */
public String getInstanceId() {
return instanceId;
}

/** @param instanceId unique id identifying this runtime instance in emitted metrics. */
public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
}

/** @return the runtime version string, as assigned via {@link #setVersion(String)}. */
public String getVersion() {
return version;
}

/** @param version runtime version string reported alongside each metric. */
public void setVersion(String version) {
this.version = version;
}
}

0 comments on commit a4d700b

Please sign in to comment.