Skip to content

Commit

Permalink
Merge pull request #1287 from akto-api-security/feature/runtime_optim…
Browse files Browse the repository at this point in the history
…isations

use fastjson, optimised findSubTypes, removed tryCollectionName
  • Loading branch information
avneesh-akto authored Jul 20, 2024
2 parents 631d601 + cb989aa commit a649f84
Show file tree
Hide file tree
Showing 16 changed files with 311 additions and 128 deletions.
33 changes: 17 additions & 16 deletions apps/api-runtime/src/main/java/com/akto/parsers/HttpCallParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.bson.conversions.Bson;
import com.alibaba.fastjson2.*;

import java.io.IOException;
import java.util.*;
Expand Down Expand Up @@ -95,14 +96,14 @@ public HttpCallParser(String userIdentifier, int thresh, int sync_threshold_coun
public static HttpResponseParams parseKafkaMessage(String message) throws Exception {

//convert java object to JSON format
Map<String, Object> json = gson.fromJson(message, Map.class);
JSONObject jsonObject = JSON.parseObject(message);

String method = (String) json.get("method");
String url = (String) json.get("path");
String type = (String) json.get("type");
Map<String,List<String>> requestHeaders = OriginalHttpRequest.buildHeadersMap(json, "requestHeaders");
String method = jsonObject.getString("method");
String url = jsonObject.getString("path");
String type = jsonObject.getString("type");
Map<String,List<String>> requestHeaders = OriginalHttpRequest.buildHeadersMap(jsonObject, "requestHeaders");

String rawRequestPayload = (String) json.get("requestPayload");
String rawRequestPayload = jsonObject.getString("requestPayload");
String requestPayload = HttpRequestResponseUtils.rawToJsonString(rawRequestPayload,requestHeaders);

if (GRPC_DEBUG_COUNTER > 0) {
Expand All @@ -116,7 +117,7 @@ public static HttpResponseParams parseKafkaMessage(String message) throws Except
}
}

String apiCollectionIdStr = json.getOrDefault("akto_vxlan_id", "0").toString();
String apiCollectionIdStr = jsonObject.getOrDefault("akto_vxlan_id", "0").toString();
int apiCollectionId = 0;
if (NumberUtils.isDigits(apiCollectionIdStr)) {
apiCollectionId = NumberUtils.toInt(apiCollectionIdStr, 0);
Expand All @@ -126,19 +127,19 @@ public static HttpResponseParams parseKafkaMessage(String message) throws Except
method,url,type, requestHeaders, requestPayload, apiCollectionId
);

int statusCode = Integer.parseInt(json.get("statusCode").toString());
String status = (String) json.get("status");
Map<String,List<String>> responseHeaders = OriginalHttpRequest.buildHeadersMap(json, "responseHeaders");
String payload = (String) json.get("responsePayload");
int statusCode = jsonObject.getInteger("statusCode");
String status = jsonObject.getString("status");
Map<String,List<String>> responseHeaders = OriginalHttpRequest.buildHeadersMap(jsonObject, "responseHeaders");
String payload = jsonObject.getString("responsePayload");
payload = HttpRequestResponseUtils.rawToJsonString(payload, responseHeaders);
payload = JSONUtils.parseIfJsonP(payload);
int time = Integer.parseInt(json.get("time").toString());
String accountId = (String) json.get("akto_account_id");
String sourceIP = (String) json.get("ip");
int time = jsonObject.getInteger("time");
String accountId = jsonObject.getString("akto_account_id");
String sourceIP = jsonObject.getString("ip");

String isPendingStr = (String) json.getOrDefault("is_pending", "false");
String isPendingStr = (String) jsonObject.getOrDefault("is_pending", "false");
boolean isPending = !isPendingStr.toLowerCase().equals("false");
String sourceStr = (String) json.getOrDefault("source", HttpResponseParams.Source.OTHER.name());
String sourceStr = (String) jsonObject.getOrDefault("source", HttpResponseParams.Source.OTHER.name());
HttpResponseParams.Source source = HttpResponseParams.Source.valueOf(sourceStr);

return new HttpResponseParams(
Expand Down
31 changes: 21 additions & 10 deletions apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import com.akto.types.CappedSet;
import com.akto.usage.UsageMetricCalculator;
import com.akto.usage.UsageMetricHandler;
import com.akto.util.JSONUtils;
import com.akto.utils.RedactSampleData;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.google.api.client.util.Charsets;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
Expand All @@ -44,6 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -81,6 +85,10 @@ public APICatalogSync(String userIdentifier, int thresh, boolean fetchAllSTI, bo
this.aktoPolicyNew = new AktoPolicyNew();
if (buildFromDb) {
buildFromDB(false, fetchAllSTI);
AccountSettings accountSettings = AccountSettingsDao.instance.findOne(AccountSettingsDao.generateFilter());
if (accountSettings != null && accountSettings.getPartnerIpList() != null) {
partnerIpsList = accountSettings.getPartnerIpList();
}
}
}

Expand Down Expand Up @@ -120,10 +128,9 @@ public void processResponse(RequestTemplate requestTemplate, HttpResponseParams
}

requestTemplate.processHeaders(requestParams.getHeaders(), baseURL.getUrl(), methodStr, -1, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp);
BasicDBObject requestPayload = RequestTemplate.parseRequestPayload(requestParams, urlWithParams);
if (requestPayload != null) {
deletedInfo.addAll(requestTemplate.process2(requestPayload, baseURL.getUrl(), methodStr, -1, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp));
}
JSONObject jsonObject = RequestTemplate.parseRequestPayloadToJsonObject(requestParams.getPayload(), urlWithParams);
Map<String, Set<Object>> flattened = JSONUtils.flattenJSONObject(jsonObject);
deletedInfo.addAll(requestTemplate.process2(flattened, baseURL.getUrl(), methodStr, -1, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp));
requestTemplate.recordMessage(responseParams.getOrig());

Map<Integer, RequestTemplate> responseTemplates = requestTemplate.getResponseTemplates();
Expand All @@ -145,15 +152,15 @@ public void processResponse(RequestTemplate requestTemplate, HttpResponseParams
respPayload = "{\"json\": "+respPayload+"}";
}


BasicDBObject payload;
JSONObject payload;
try {
payload = BasicDBObject.parse(respPayload);
payload = JSON.parseObject(respPayload);
} catch (Exception e) {
payload = BasicDBObject.parse("{}");
payload = JSON.parseObject("{}");
}

deletedInfo.addAll(responseTemplate.process2(payload, baseURL.getUrl(), methodStr, statusCode, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp));
flattened = JSONUtils.flattenJSONObject(payload);
deletedInfo.addAll(responseTemplate.process2(flattened, baseURL.getUrl(), methodStr, statusCode, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp));
responseTemplate.processHeaders(responseParams.getHeaders(), baseURL.getUrl(), method.name(), statusCode, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp);
if (!responseParams.getIsPending()) {
responseTemplate.processTraffic(responseParams.getTime());
Expand Down Expand Up @@ -206,7 +213,7 @@ public void computeDelta(URLAggregator origAggregator, boolean triggerTemplateGe
Set<HttpResponseParams> value = entry.getValue();
for (HttpResponseParams responseParams: value) {
try {
aktoPolicyNew.process(responseParams);
aktoPolicyNew.process(responseParams, partnerIpsList);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
Expand Down Expand Up @@ -1706,6 +1713,7 @@ private static Map<Integer, APICatalog> build(List<SingleTypeInfo> allParams, Bl
}

int counter = 0;
List<String> partnerIpsList = new ArrayList<>();

public void syncWithDB(boolean syncImmediately, boolean fetchAllSTI, SyncLimit syncLimit) {
loggerMaker.infoAndAddToDb("Started sync with db! syncImmediately="+syncImmediately + " fetchAllSTI="+fetchAllSTI, LogDb.RUNTIME);
Expand All @@ -1725,6 +1733,9 @@ public void syncWithDB(boolean syncImmediately, boolean fetchAllSTI, SyncLimit s
boolean redact = false;
if (accountSettings != null) {
redact = accountSettings.isRedactPayload();
if (accountSettings.getPartnerIpList() != null) {
partnerIpsList = accountSettings.getPartnerIpList();
}
}

counter++;
Expand Down
4 changes: 0 additions & 4 deletions apps/api-runtime/src/main/java/com/akto/runtime/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,6 @@ public void run() {
logger.info("Committing offset at position: " + lastSyncOffset);
}

if (tryForCollectionName(r.value())) {
continue;
}

httpResponseParams = HttpCallParser.parseKafkaMessage(r.value());
} catch (Exception e) {
loggerMaker.errorAndAddToDb(e, "Error while parsing kafka message " + e, LogDb.RUNTIME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ public void fillApiInfoInCatalog(ApiInfo apiInfo, Map<Integer, FilterSampleData

}

public void main(List<HttpResponseParams> httpResponseParamsList) throws Exception {
public void main(List<HttpResponseParams> httpResponseParamsList, List<String> partnerIpsList) throws Exception {
if (httpResponseParamsList == null) httpResponseParamsList = new ArrayList<>();
loggerMaker.infoAndAddToDb("AktoPolicy main: httpResponseParamsList size: " + httpResponseParamsList.size(), LogDb.RUNTIME);
for (HttpResponseParams httpResponseParams: httpResponseParamsList) {
try {
process(httpResponseParams);
process(httpResponseParams, partnerIpsList);
} catch (Exception e) {
loggerMaker.errorAndAddToDb(e.toString(), LogDb.RUNTIME);
;
Expand All @@ -154,20 +154,13 @@ public static ApiInfoKey generateFromHttpResponseParams(HttpResponseParams httpR
return new ApiInfo.ApiInfoKey(apiCollectionId, url, method);
}

public void process(HttpResponseParams httpResponseParams) throws Exception {
public void process(HttpResponseParams httpResponseParams, List<String> partnerIpsList) throws Exception {
List<CustomAuthType> customAuthTypes = SingleTypeInfo.getCustomAuthType(Integer.parseInt(httpResponseParams.getAccountId()));
ApiInfo.ApiInfoKey apiInfoKey = generateFromHttpResponseParams(httpResponseParams);
PolicyCatalog policyCatalog = getApiInfoFromMap(apiInfoKey);
policyCatalog.setSeenEarlier(true);
ApiInfo apiInfo = policyCatalog.getApiInfo();

List<String> partnerIpsList = new ArrayList<>();

AccountSettings accountSettings = AccountSettingsDao.instance.findOne(new BasicDBObject());
if(accountSettings != null){
partnerIpsList = accountSettings.getPartnerIpList();
}

Map<Integer, FilterSampleData> filterSampleDataMap = policyCatalog.getFilterSampleDataMap();
if (filterSampleDataMap == null) {
filterSampleDataMap = new HashMap<>();
Expand Down
125 changes: 113 additions & 12 deletions apps/api-runtime/src/test/java/com/akto/parsers/TestDump2.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,26 @@
package com.akto.parsers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.*;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;

import com.akto.MongoBasedTest;
import com.akto.dao.*;
import com.akto.dao.context.Context;
import com.akto.dto.*;
import com.akto.dto.traffic.SampleData;
import com.akto.dto.type.*;
import com.akto.dto.type.SingleTypeInfo.SubType;
import com.akto.dto.type.SingleTypeInfo.SuperType;
import com.akto.dto.type.URLMethods.Method;
import com.akto.dto.HttpResponseParams;
import com.akto.dto.IgnoreData;
import com.akto.dto.AktoDataType;
import com.akto.dto.HttpRequestParams;
import com.akto.runtime.APICatalogSync;
import com.akto.runtime.URLAggregator;
import com.akto.types.CappedSet;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;

import com.mongodb.client.model.Filters;
import org.junit.Test;

public class TestDump2 extends MongoBasedTest {
Expand Down Expand Up @@ -177,6 +172,112 @@ public void simpleTest() {
assertEquals(24, sync.getDBUpdatesForParams(sync.getDelta(2), sync.getDbState(2),false, false).bulkUpdatesForSingleTypeInfo.size());
}

@Test
public void testEndToEnd() throws Exception {
ApiCollectionsDao.instance.getMCollection().drop();
ApiInfoDao.instance.getMCollection().drop();
SampleDataDao.instance.getMCollection().drop();
SensitiveSampleDataDao.instance.getMCollection().drop();
TrafficInfoDao.instance.getMCollection().drop();
SingleTypeInfoDao.instance.getMCollection().drop();

// requestHeaders: 2
// responseHeaders: 1
// requestPayload: 6
// responsePayload: 6
// total: 15
String host = "company.io";
int apiCollectionId = host.hashCode();
String url = "/api/books";
String method = "POST";

String message = "{\n" +
" \"akto_account_id\": \"" + Context.accountId.get() + "\",\n" +
" \"path\": \"" + url + "\",\n" +
" \"requestHeaders\": \"{'host': '" + host + "', 'user-agent': 'chrome' }\",\n" +
" \"responseHeaders\": \"{'token': 'token'}\",\n" +
" \"method\": \"" + method + "\",\n" +
" \"requestPayload\": \"{'user_req': 'user1_req', 'friends_req': [{'name_req':'friend1_req'},{'name_req':'friend2_req'}], 'role_req': {'name_req': 'admin_req', 'accesses_req': [{'billing_req': true, 'user_req' : 'true'}] }, 'gifts_req': [1,2,3] }\",\n" +
" \"responsePayload\": \"{'user_resp': 'user1_resp', 'friends_resp': [{'name_resp':'friend1_resp'},{'name_resp':'friend2_resp'}], 'role_resp': {'name_resp': 'admin_resp', 'accesses_resp': [{'billing_resp': true, 'user_resp' : 'true'}] }, 'gifts_resp': [1,2,3] }\",\n" +
" \"ip\": \"0.0.0.0\",\n" +
" \"time\": \"1721382413\",\n" +
" \"statusCode\": \"200\",\n" +
" \"type\": \"type\",\n" +
" \"status\": \"status\",\n" +
" \"contentType\": \"contentType\",\n" +
" \"source\": \"MIRRORING\",\n" +
" \"akto_vxlan_id\": \"0\"\n" +
"}\n";

HttpResponseParams httpResponseParams = HttpCallParser.parseKafkaMessage(message);
HttpCallParser httpCallParser = new HttpCallParser("",0,0, 0, true);
httpCallParser.syncFunction(Collections.singletonList(httpResponseParams), true, true, new AccountSettings());

List<SingleTypeInfo> singleTypeInfos = SingleTypeInfoDao.instance.findAll(new BasicDBObject());
assertEquals(15, singleTypeInfos.size());

SingleTypeInfo singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.GENERIC, "user_req", -1, false);
SingleTypeInfo info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo));
assertNotNull(info);

singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.GENERIC, "friends_req#$#name_req", -1, false);
info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo));
assertNotNull(info);

singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.GENERIC, "role_req#name_req", -1, false);
info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo));
assertNotNull(info);

singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.TRUE, "role_req#accesses_req#$#billing_req", -1, false);
info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo));
assertNotNull(info);

singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.GENERIC, "role_req#accesses_req#$#user_req", -1, false);
info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo));
assertNotNull(info);

singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.INTEGER_32, "gifts_req#$", -1, false);
info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo));
assertNotNull(info);

singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.TRUE, "role_resp#accesses_resp#$#billing_resp", 200, false);
info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo));
assertNotNull(info);

// sample data
long sampleDataCount = SampleDataDao.instance.getMCollection().estimatedDocumentCount();
assertEquals(1, sampleDataCount);
SampleData sampleData = SampleDataDao.instance.findOne(Filters.and(
Filters.eq("_id.apiCollectionId", apiCollectionId),
Filters.eq("_id.url", url),
Filters.eq("_id.method", method)
));
assertEquals(1, sampleData.getSamples().size());

// api info
long apiInfoCount = ApiInfoDao.instance.getMCollection().estimatedDocumentCount();
assertEquals(1, apiInfoCount);
ApiInfo apiInfo = ApiInfoDao.instance.findOne(Filters.and(
Filters.eq("_id.apiCollectionId", apiCollectionId),
Filters.eq("_id.url", url),
Filters.eq("_id.method", method)
));
assertTrue(apiInfo.getLastSeen() > 0);

// api collection
long apiCollectionsCount = ApiCollectionsDao.instance.getMCollection().estimatedDocumentCount();
assertEquals(1, apiCollectionsCount);
ApiCollection apiCollection = ApiCollectionsDao.instance.findOne(Filters.eq("_id", apiCollectionId));
assertEquals(host, apiCollection.getHostName());

}

static SingleTypeInfo generateSTI(int apiCollectionId, String url, String method, SubType subType, String param, int responseCode, boolean isHeader) {
SingleTypeInfo.ParamId paramId = new SingleTypeInfo.ParamId(url, method, responseCode, isHeader, param, subType, apiCollectionId, false);
return new SingleTypeInfo(paramId, new HashSet<>(), new HashSet<>(), 0, 0,0, new CappedSet<>(), null, 0,0);
}


@Test
public void getParamsTest() {
String baseurl = "https://someapi.com/example";
Expand Down
Loading

0 comments on commit a649f84

Please sign in to comment.