diff --git a/apps/api-runtime/src/main/java/com/akto/parsers/HttpCallParser.java b/apps/api-runtime/src/main/java/com/akto/parsers/HttpCallParser.java index 9dc67f9812..af9aab23ca 100644 --- a/apps/api-runtime/src/main/java/com/akto/parsers/HttpCallParser.java +++ b/apps/api-runtime/src/main/java/com/akto/parsers/HttpCallParser.java @@ -33,6 +33,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.bson.conversions.Bson; +import com.alibaba.fastjson2.*; import java.io.IOException; import java.util.*; @@ -95,14 +96,14 @@ public HttpCallParser(String userIdentifier, int thresh, int sync_threshold_coun public static HttpResponseParams parseKafkaMessage(String message) throws Exception { //convert java object to JSON format - Map json = gson.fromJson(message, Map.class); + JSONObject jsonObject = JSON.parseObject(message); - String method = (String) json.get("method"); - String url = (String) json.get("path"); - String type = (String) json.get("type"); - Map> requestHeaders = OriginalHttpRequest.buildHeadersMap(json, "requestHeaders"); + String method = jsonObject.getString("method"); + String url = jsonObject.getString("path"); + String type = jsonObject.getString("type"); + Map> requestHeaders = OriginalHttpRequest.buildHeadersMap(jsonObject, "requestHeaders"); - String rawRequestPayload = (String) json.get("requestPayload"); + String rawRequestPayload = jsonObject.getString("requestPayload"); String requestPayload = HttpRequestResponseUtils.rawToJsonString(rawRequestPayload,requestHeaders); if (GRPC_DEBUG_COUNTER > 0) { @@ -116,7 +117,7 @@ public static HttpResponseParams parseKafkaMessage(String message) throws Except } } - String apiCollectionIdStr = json.getOrDefault("akto_vxlan_id", "0").toString(); + String apiCollectionIdStr = jsonObject.getOrDefault("akto_vxlan_id", "0").toString(); int apiCollectionId = 0; if (NumberUtils.isDigits(apiCollectionIdStr)) { apiCollectionId = NumberUtils.toInt(apiCollectionIdStr, 0); @@ -126,19 +127,19 @@ public static HttpResponseParams parseKafkaMessage(String message) throws Except method,url,type, requestHeaders, requestPayload, apiCollectionId ); - int statusCode = Integer.parseInt(json.get("statusCode").toString()); - String status = (String) json.get("status"); - Map> responseHeaders = OriginalHttpRequest.buildHeadersMap(json, "responseHeaders"); - String payload = (String) json.get("responsePayload"); + int statusCode = jsonObject.getInteger("statusCode"); + String status = jsonObject.getString("status"); + Map> responseHeaders = OriginalHttpRequest.buildHeadersMap(jsonObject, "responseHeaders"); + String payload = jsonObject.getString("responsePayload"); payload = HttpRequestResponseUtils.rawToJsonString(payload, responseHeaders); payload = JSONUtils.parseIfJsonP(payload); - int time = Integer.parseInt(json.get("time").toString()); - String accountId = (String) json.get("akto_account_id"); - String sourceIP = (String) json.get("ip"); + int time = jsonObject.getInteger("time"); + String accountId = jsonObject.getString("akto_account_id"); + String sourceIP = jsonObject.getString("ip"); - String isPendingStr = (String) json.getOrDefault("is_pending", "false"); + String isPendingStr = (String) jsonObject.getOrDefault("is_pending", "false"); boolean isPending = !isPendingStr.toLowerCase().equals("false"); - String sourceStr = (String) json.getOrDefault("source", HttpResponseParams.Source.OTHER.name()); + String sourceStr = (String) jsonObject.getOrDefault("source", HttpResponseParams.Source.OTHER.name()); HttpResponseParams.Source source = HttpResponseParams.Source.valueOf(sourceStr); return new HttpResponseParams( diff --git a/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java b/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java index 544ba667e3..6c93a80b27 100644 --- a/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java +++ b/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java @@ -27,7 +27,10 @@ import com.akto.types.CappedSet; import com.akto.usage.UsageMetricCalculator; import com.akto.usage.UsageMetricHandler; +import com.akto.util.JSONUtils; import com.akto.utils.RedactSampleData; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.google.api.client.util.Charsets; import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; @@ -44,6 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -81,6 +85,10 @@ public APICatalogSync(String userIdentifier, int thresh, boolean fetchAllSTI, bo this.aktoPolicyNew = new AktoPolicyNew(); if (buildFromDb) { buildFromDB(false, fetchAllSTI); + AccountSettings accountSettings = AccountSettingsDao.instance.findOne(AccountSettingsDao.generateFilter()); + if (accountSettings != null && accountSettings.getPartnerIpList() != null) { + partnerIpsList = accountSettings.getPartnerIpList(); + } } } @@ -120,10 +128,9 @@ public void processResponse(RequestTemplate requestTemplate, HttpResponseParams } requestTemplate.processHeaders(requestParams.getHeaders(), baseURL.getUrl(), methodStr, -1, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp); - BasicDBObject requestPayload = RequestTemplate.parseRequestPayload(requestParams, urlWithParams); - if (requestPayload != null) { - deletedInfo.addAll(requestTemplate.process2(requestPayload, baseURL.getUrl(), methodStr, -1, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp)); - } + JSONObject jsonObject = RequestTemplate.parseRequestPayloadToJsonObject(requestParams.getPayload(), urlWithParams); + Map> flattened = JSONUtils.flattenJSONObject(jsonObject); + deletedInfo.addAll(requestTemplate.process2(flattened, baseURL.getUrl(), methodStr, -1, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp)); requestTemplate.recordMessage(responseParams.getOrig()); Map responseTemplates = requestTemplate.getResponseTemplates(); @@ -145,15 +152,15 @@ public void processResponse(RequestTemplate requestTemplate, HttpResponseParams respPayload = "{\"json\": "+respPayload+"}"; } - - BasicDBObject payload; + JSONObject payload; try { - payload = BasicDBObject.parse(respPayload); + payload = JSON.parseObject(respPayload); } catch (Exception e) { - payload = BasicDBObject.parse("{}"); + payload = JSON.parseObject("{}"); } - deletedInfo.addAll(responseTemplate.process2(payload, baseURL.getUrl(), methodStr, statusCode, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp)); + flattened = JSONUtils.flattenJSONObject(payload); + deletedInfo.addAll(responseTemplate.process2(flattened, baseURL.getUrl(), methodStr, statusCode, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp)); responseTemplate.processHeaders(responseParams.getHeaders(), baseURL.getUrl(), method.name(), statusCode, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp); if (!responseParams.getIsPending()) { responseTemplate.processTraffic(responseParams.getTime()); @@ -206,7 +213,7 @@ public void computeDelta(URLAggregator origAggregator, boolean triggerTemplateGe Set value = entry.getValue(); for (HttpResponseParams responseParams: value) { try { - aktoPolicyNew.process(responseParams); + aktoPolicyNew.process(responseParams, partnerIpsList); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); @@ -1706,6 +1713,7 @@ private static Map build(List allParams, Bl } int counter = 0; + List partnerIpsList = new ArrayList<>(); public void syncWithDB(boolean syncImmediately, boolean fetchAllSTI, SyncLimit syncLimit) { loggerMaker.infoAndAddToDb("Started sync with db! syncImmediately="+syncImmediately + " fetchAllSTI="+fetchAllSTI, LogDb.RUNTIME); @@ -1725,6 +1733,9 @@ public void syncWithDB(boolean syncImmediately, boolean fetchAllSTI, SyncLimit s boolean redact = false; if (accountSettings != null) { redact = accountSettings.isRedactPayload(); + if (accountSettings.getPartnerIpList() != null) { + partnerIpsList = accountSettings.getPartnerIpList(); + } } counter++; diff --git a/apps/api-runtime/src/main/java/com/akto/runtime/Main.java b/apps/api-runtime/src/main/java/com/akto/runtime/Main.java index 867c9b8e45..5a03b38ad1 100644 --- a/apps/api-runtime/src/main/java/com/akto/runtime/Main.java +++ b/apps/api-runtime/src/main/java/com/akto/runtime/Main.java @@ -232,10 +232,6 @@ public void run() { logger.info("Committing offset at position: " + lastSyncOffset); } - if (tryForCollectionName(r.value())) { - continue; - } - httpResponseParams = HttpCallParser.parseKafkaMessage(r.value()); } catch (Exception e) { loggerMaker.errorAndAddToDb(e, "Error while parsing kafka message " + e, LogDb.RUNTIME); diff --git a/apps/api-runtime/src/main/java/com/akto/runtime/policies/AktoPolicyNew.java b/apps/api-runtime/src/main/java/com/akto/runtime/policies/AktoPolicyNew.java index ca5861f749..365a946c69 100644 --- a/apps/api-runtime/src/main/java/com/akto/runtime/policies/AktoPolicyNew.java +++ b/apps/api-runtime/src/main/java/com/akto/runtime/policies/AktoPolicyNew.java @@ -127,12 +127,12 @@ public void fillApiInfoInCatalog(ApiInfo apiInfo, Map httpResponseParamsList) throws Exception { + public void main(List httpResponseParamsList, List partnerIpsList) throws Exception { if (httpResponseParamsList == null) httpResponseParamsList = new ArrayList<>(); loggerMaker.infoAndAddToDb("AktoPolicy main: httpResponseParamsList size: " + httpResponseParamsList.size(), LogDb.RUNTIME); for (HttpResponseParams httpResponseParams: httpResponseParamsList) { try { - process(httpResponseParams); + process(httpResponseParams, partnerIpsList); } catch (Exception e) { loggerMaker.errorAndAddToDb(e.toString(), LogDb.RUNTIME); ; @@ -154,20 +154,13 @@ public static ApiInfoKey generateFromHttpResponseParams(HttpResponseParams httpR return new ApiInfo.ApiInfoKey(apiCollectionId, url, method); } - public void process(HttpResponseParams httpResponseParams) throws Exception { + public void process(HttpResponseParams httpResponseParams, List partnerIpsList) throws Exception { List customAuthTypes = SingleTypeInfo.getCustomAuthType(Integer.parseInt(httpResponseParams.getAccountId())); ApiInfo.ApiInfoKey apiInfoKey = generateFromHttpResponseParams(httpResponseParams); PolicyCatalog policyCatalog = getApiInfoFromMap(apiInfoKey); policyCatalog.setSeenEarlier(true); ApiInfo apiInfo = policyCatalog.getApiInfo(); - List partnerIpsList = new ArrayList<>(); - - AccountSettings accountSettings = AccountSettingsDao.instance.findOne(new BasicDBObject()); - if(accountSettings != null){ - partnerIpsList = accountSettings.getPartnerIpList(); - } - Map filterSampleDataMap = policyCatalog.getFilterSampleDataMap(); if (filterSampleDataMap == null) { filterSampleDataMap = new HashMap<>(); diff --git a/apps/api-runtime/src/test/java/com/akto/parsers/TestDump2.java b/apps/api-runtime/src/test/java/com/akto/parsers/TestDump2.java index 38008f1336..d209ca4605 100644 --- a/apps/api-runtime/src/test/java/com/akto/parsers/TestDump2.java +++ b/apps/api-runtime/src/test/java/com/akto/parsers/TestDump2.java @@ -1,31 +1,26 @@ package com.akto.parsers; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.*; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import com.akto.MongoBasedTest; +import com.akto.dao.*; import com.akto.dao.context.Context; +import com.akto.dto.*; +import com.akto.dto.traffic.SampleData; import com.akto.dto.type.*; import com.akto.dto.type.SingleTypeInfo.SubType; import com.akto.dto.type.SingleTypeInfo.SuperType; import com.akto.dto.type.URLMethods.Method; -import com.akto.dto.HttpResponseParams; -import com.akto.dto.IgnoreData; -import com.akto.dto.AktoDataType; -import com.akto.dto.HttpRequestParams; import com.akto.runtime.APICatalogSync; import com.akto.runtime.URLAggregator; +import com.akto.types.CappedSet; import com.mongodb.BasicDBList; import com.mongodb.BasicDBObject; +import com.mongodb.client.model.Filters; import org.junit.Test; public class TestDump2 extends MongoBasedTest { @@ -177,6 +172,112 @@ public void simpleTest() { assertEquals(24, sync.getDBUpdatesForParams(sync.getDelta(2), sync.getDbState(2),false, false).bulkUpdatesForSingleTypeInfo.size()); } + @Test + public void testEndToEnd() throws Exception { + ApiCollectionsDao.instance.getMCollection().drop(); + ApiInfoDao.instance.getMCollection().drop(); + SampleDataDao.instance.getMCollection().drop(); + SensitiveSampleDataDao.instance.getMCollection().drop(); + TrafficInfoDao.instance.getMCollection().drop(); + SingleTypeInfoDao.instance.getMCollection().drop(); + + // requestHeaders: 2 + // responseHeaders: 1 + // requestPayload: 6 + // responsePayload: 6 + // total: 15 + String host = "company.io"; + int apiCollectionId = host.hashCode(); + String url = "/api/books"; + String method = "POST"; + + String message = "{\n" + + " \"akto_account_id\": \"" + Context.accountId.get() + "\",\n" + + " \"path\": \"" + url + "\",\n" + + " \"requestHeaders\": \"{'host': '" + host + "', 'user-agent': 'chrome' }\",\n" + + " \"responseHeaders\": \"{'token': 'token'}\",\n" + + " \"method\": \"" + method + "\",\n" + + " \"requestPayload\": \"{'user_req': 'user1_req', 'friends_req': [{'name_req':'friend1_req'},{'name_req':'friend2_req'}], 'role_req': {'name_req': 'admin_req', 'accesses_req': [{'billing_req': true, 'user_req' : 'true'}] }, 'gifts_req': [1,2,3] }\",\n" + + " \"responsePayload\": \"{'user_resp': 'user1_resp', 'friends_resp': [{'name_resp':'friend1_resp'},{'name_resp':'friend2_resp'}], 'role_resp': {'name_resp': 'admin_resp', 'accesses_resp': [{'billing_resp': true, 'user_resp' : 'true'}] }, 'gifts_resp': [1,2,3] }\",\n" + + " \"ip\": \"0.0.0.0\",\n" + + " \"time\": \"1721382413\",\n" + + " \"statusCode\": \"200\",\n" + + " \"type\": \"type\",\n" + + " \"status\": \"status\",\n" + + " \"contentType\": \"contentType\",\n" + + " \"source\": \"MIRRORING\",\n" + + " \"akto_vxlan_id\": \"0\"\n" + + "}\n"; + + HttpResponseParams httpResponseParams = HttpCallParser.parseKafkaMessage(message); + HttpCallParser httpCallParser = new HttpCallParser("",0,0, 0, true); + httpCallParser.syncFunction(Collections.singletonList(httpResponseParams), true, true, new AccountSettings()); + + List singleTypeInfos = SingleTypeInfoDao.instance.findAll(new BasicDBObject()); + assertEquals(15, singleTypeInfos.size()); + + SingleTypeInfo singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.GENERIC, "user_req", -1, false); + SingleTypeInfo info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo)); + assertNotNull(info); + + singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.GENERIC, "friends_req#$#name_req", -1, false); + info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo)); + assertNotNull(info); + + singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.GENERIC, "role_req#name_req", -1, false); + info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo)); + assertNotNull(info); + + singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.TRUE, "role_req#accesses_req#$#billing_req", -1, false); + info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo)); + assertNotNull(info); + + singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.GENERIC, "role_req#accesses_req#$#user_req", -1, false); + info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo)); + assertNotNull(info); + + singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.INTEGER_32, "gifts_req#$", -1, false); + info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo)); + assertNotNull(info); + + singleTypeInfo = generateSTI(apiCollectionId, url, method, SingleTypeInfo.TRUE, "role_resp#accesses_resp#$#billing_resp", 200, false); + info = SingleTypeInfoDao.instance.findOne(SingleTypeInfoDao.createFilters(singleTypeInfo)); + assertNotNull(info); + + // sample data + long sampleDataCount = SampleDataDao.instance.getMCollection().estimatedDocumentCount(); + assertEquals(1, sampleDataCount); + SampleData sampleData = SampleDataDao.instance.findOne(Filters.and( + Filters.eq("_id.apiCollectionId", apiCollectionId), + Filters.eq("_id.url", url), + Filters.eq("_id.method", method) + )); + assertEquals(1, sampleData.getSamples().size()); + + // api info + long apiInfoCount = ApiInfoDao.instance.getMCollection().estimatedDocumentCount(); + assertEquals(1, apiInfoCount); + ApiInfo apiInfo = ApiInfoDao.instance.findOne(Filters.and( + Filters.eq("_id.apiCollectionId", apiCollectionId), + Filters.eq("_id.url", url), + Filters.eq("_id.method", method) + )); + assertTrue(apiInfo.getLastSeen() > 0); + + // api collection + long apiCollectionsCount = ApiCollectionsDao.instance.getMCollection().estimatedDocumentCount(); + assertEquals(1, apiCollectionsCount); + ApiCollection apiCollection = ApiCollectionsDao.instance.findOne(Filters.eq("_id", apiCollectionId)); + assertEquals(host, apiCollection.getHostName()); + + } + + static SingleTypeInfo generateSTI(int apiCollectionId, String url, String method, SubType subType, String param, int responseCode, boolean isHeader) { + SingleTypeInfo.ParamId paramId = new SingleTypeInfo.ParamId(url, method, responseCode, isHeader, param, subType, apiCollectionId, false); + return new SingleTypeInfo(paramId, new HashSet<>(), new HashSet<>(), 0, 0,0, new CappedSet<>(), null, 0,0); + } + + @Test public void getParamsTest() { String baseurl = "https://someapi.com/example"; diff --git a/apps/api-runtime/src/test/java/com/akto/parsers/TestMergingNew.java b/apps/api-runtime/src/test/java/com/akto/parsers/TestMergingNew.java index a4fbd773db..0fbef78c04 100644 --- a/apps/api-runtime/src/test/java/com/akto/parsers/TestMergingNew.java +++ b/apps/api-runtime/src/test/java/com/akto/parsers/TestMergingNew.java @@ -275,17 +275,17 @@ public void testNonJsonResponsePayloadPayload() throws Exception { parser.syncFunction(responseParams.subList(10,25), false, true, null); parser.apiCatalogSync.syncWithDB(false, true, SyncLimit.noLimit); - APICatalogSync.mergeUrlsAndSave(0,true, false, parser.apiCatalogSync.existingAPIsInDb); + APICatalogSync.mergeUrlsAndSave(123,true, false, parser.apiCatalogSync.existingAPIsInDb); parser.apiCatalogSync.buildFromDB(false, true); parser.syncFunction(responseParams.subList(25,30), false, true, null); parser.apiCatalogSync.syncWithDB(false, true, SyncLimit.noLimit); - APICatalogSync.mergeUrlsAndSave(0,true, false, parser.apiCatalogSync.existingAPIsInDb); + APICatalogSync.mergeUrlsAndSave(123,true, false, parser.apiCatalogSync.existingAPIsInDb); parser.apiCatalogSync.buildFromDB(false, true); - Map urlTemplateMap = parser.apiCatalogSync.getDbState(0).getTemplateURLToMethods(); - Map urlStaticMap = parser.apiCatalogSync.getDbState(0).getStrictURLToMethods(); + Map urlTemplateMap = parser.apiCatalogSync.getDbState(123).getTemplateURLToMethods(); + Map urlStaticMap = parser.apiCatalogSync.getDbState(123).getStrictURLToMethods(); assertEquals(urlTemplateMap.size(), 1); assertEquals(urlStaticMap.size(), 0); @@ -318,12 +318,12 @@ public void testEmptyResponsePayload() throws Exception { parser.apiCatalogSync.syncWithDB(false, true, SyncLimit.noLimit); parser.syncFunction(responseParams.subList(25,30), false, true, null); parser.apiCatalogSync.syncWithDB(false, true, SyncLimit.noLimit); - APICatalogSync.mergeUrlsAndSave(0,true, false, parser.apiCatalogSync.existingAPIsInDb); + APICatalogSync.mergeUrlsAndSave(123,true, false, parser.apiCatalogSync.existingAPIsInDb); parser.apiCatalogSync.buildFromDB(false, true); - Map urlTemplateMap = parser.apiCatalogSync.getDbState(0).getTemplateURLToMethods(); - Map urlStaticMap = parser.apiCatalogSync.getDbState(0).getStrictURLToMethods(); + Map urlTemplateMap = parser.apiCatalogSync.getDbState(123).getTemplateURLToMethods(); + Map urlStaticMap = parser.apiCatalogSync.getDbState(123).getStrictURLToMethods(); assertEquals(urlTemplateMap.size(), 30); assertEquals(urlStaticMap.size(), 0); diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_parsers/HttpCallParser.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_parsers/HttpCallParser.java index 58218fb199..290decc9e0 100644 --- a/apps/mini-runtime/src/main/java/com/akto/hybrid_parsers/HttpCallParser.java +++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_parsers/HttpCallParser.java @@ -29,6 +29,7 @@ import okhttp3.*; import org.apache.commons.lang3.math.NumberUtils; import org.bson.conversions.Bson; +import com.alibaba.fastjson2.*; import java.io.IOException; import java.util.*; @@ -87,19 +88,19 @@ public HttpCallParser(String userIdentifier, int thresh, int sync_threshold_coun public static HttpResponseParams parseKafkaMessage(String message) throws Exception { //convert java object to JSON format - Map json = gson.fromJson(message, Map.class); + JSONObject jsonObject = JSON.parseObject(message); - String method = (String) json.get("method"); - String url = (String) json.get("path"); - String type = (String) json.get("type"); - Map> requestHeaders = OriginalHttpRequest.buildHeadersMap(json, "requestHeaders"); + String method = jsonObject.getString("method"); + String url = jsonObject.getString("path"); + String type = jsonObject.getString("type"); + Map> requestHeaders = OriginalHttpRequest.buildHeadersMap(jsonObject, "requestHeaders"); - String rawRequestPayload = (String) json.get("requestPayload"); + String rawRequestPayload = jsonObject.getString("requestPayload"); String requestPayload = HttpRequestResponseUtils.rawToJsonString(rawRequestPayload,requestHeaders); - String apiCollectionIdStr = json.getOrDefault("akto_vxlan_id", "0").toString(); + String apiCollectionIdStr = jsonObject.getOrDefault("akto_vxlan_id", "0").toString(); int apiCollectionId = 0; if (NumberUtils.isDigits(apiCollectionIdStr)) { apiCollectionId = NumberUtils.toInt(apiCollectionIdStr, 0); @@ -109,19 +110,19 @@ public static HttpResponseParams parseKafkaMessage(String message) throws Except method,url,type, requestHeaders, requestPayload, apiCollectionId ); - int statusCode = Integer.parseInt(json.get("statusCode").toString()); - String status = (String) json.get("status"); - Map> responseHeaders = OriginalHttpRequest.buildHeadersMap(json, "responseHeaders"); - String payload = (String) json.get("responsePayload"); + int statusCode = jsonObject.getInteger("statusCode"); + String status = jsonObject.getString("status"); + Map> responseHeaders = OriginalHttpRequest.buildHeadersMap(jsonObject, "responseHeaders"); + String payload = jsonObject.getString("responsePayload"); payload = HttpRequestResponseUtils.rawToJsonString(payload, responseHeaders); payload = JSONUtils.parseIfJsonP(payload); - int time = Integer.parseInt(json.get("time").toString()); - String accountId = (String) json.get("akto_account_id"); - String sourceIP = (String) json.get("ip"); + int time = jsonObject.getInteger("time"); + String accountId = jsonObject.getString("akto_account_id"); + String sourceIP = jsonObject.getString("ip"); - String isPendingStr = (String) json.getOrDefault("is_pending", "false"); + String isPendingStr = (String) jsonObject.getOrDefault("is_pending", "false"); boolean isPending = !isPendingStr.toLowerCase().equals("false"); - String sourceStr = (String) json.getOrDefault("source", HttpResponseParams.Source.OTHER.name()); + String sourceStr = (String) jsonObject.getOrDefault("source", HttpResponseParams.Source.OTHER.name()); HttpResponseParams.Source source = HttpResponseParams.Source.valueOf(sourceStr); return new HttpResponseParams( diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java index 05fc2bedfd..9a32eabc8c 100644 --- a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java +++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java @@ -28,7 +28,10 @@ import com.akto.hybrid_runtime.policies.AktoPolicyNew; import com.akto.task.Cluster; import com.akto.types.CappedSet; +import com.akto.util.JSONUtils; import com.akto.utils.RedactSampleData; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.mongodb.BasicDBObject; import com.mongodb.ConnectionString; import com.mongodb.bulk.BulkWriteResult; @@ -80,6 +83,10 @@ public APICatalogSync(String userIdentifier, int thresh, boolean fetchAllSTI, bo this.aktoPolicyNew = new AktoPolicyNew(); if (buildFromDb) { buildFromDB(false, fetchAllSTI); + AccountSettings accountSettings = dataActor.fetchAccountSettings(); + if (accountSettings != null && accountSettings.getPartnerIpList() != null) { + partnerIpsList = accountSettings.getPartnerIpList(); + } } } @@ -119,10 +126,9 @@ public void processResponse(RequestTemplate requestTemplate, HttpResponseParams } requestTemplate.processHeaders(requestParams.getHeaders(), baseURL.getUrl(), methodStr, -1, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp); - BasicDBObject requestPayload = RequestTemplate.parseRequestPayload(requestParams, urlWithParams); - if (requestPayload != null) { - deletedInfo.addAll(requestTemplate.process2(requestPayload, baseURL.getUrl(), methodStr, -1, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp)); - } + JSONObject jsonObject = RequestTemplate.parseRequestPayloadToJsonObject(requestParams.getPayload(), urlWithParams); + Map> flattened = JSONUtils.flattenJSONObject(jsonObject); + deletedInfo.addAll(requestTemplate.process2(flattened, baseURL.getUrl(), methodStr, -1, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp)); requestTemplate.recordMessage(responseParams.getOrig()); Map responseTemplates = requestTemplate.getResponseTemplates(); @@ -144,15 +150,15 @@ public void processResponse(RequestTemplate requestTemplate, HttpResponseParams respPayload = "{\"json\": "+respPayload+"}"; } - - BasicDBObject payload; + JSONObject payload; try { - payload = BasicDBObject.parse(respPayload); + payload = JSON.parseObject(respPayload); } catch (Exception e) { - payload = BasicDBObject.parse("{}"); + payload = JSON.parseObject("{}"); } - deletedInfo.addAll(responseTemplate.process2(payload, baseURL.getUrl(), methodStr, statusCode, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp)); + flattened = JSONUtils.flattenJSONObject(payload); + deletedInfo.addAll(responseTemplate.process2(flattened, baseURL.getUrl(), methodStr, statusCode, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp)); responseTemplate.processHeaders(responseParams.getHeaders(), baseURL.getUrl(), method.name(), statusCode, userId, requestParams.getApiCollectionId(), responseParams.getOrig(), sensitiveParamInfoBooleanMap, timestamp); if (!responseParams.getIsPending()) { responseTemplate.processTraffic(responseParams.getTime()); @@ -205,7 +211,7 @@ public void computeDelta(URLAggregator origAggregator, boolean triggerTemplateGe Set value = entry.getValue(); for (HttpResponseParams responseParams: value) { try { - aktoPolicyNew.process(responseParams); + aktoPolicyNew.process(responseParams, partnerIpsList); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); @@ -1228,7 +1234,8 @@ private static Map build(List allParams) { } int counter = 0; - + List partnerIpsList = new ArrayList<>(); + public void syncWithDB(boolean syncImmediately, boolean fetchAllSTI) { loggerMaker.infoAndAddToDb("Started sync with db! syncImmediately="+syncImmediately + " fetchAllSTI="+fetchAllSTI, LogDb.RUNTIME); List writesForParams = new ArrayList<>(); @@ -1247,6 +1254,9 @@ public void syncWithDB(boolean syncImmediately, boolean fetchAllSTI) { boolean redact = false; if (accountSettings != null) { redact = accountSettings.isRedactPayload(); + if (accountSettings.getPartnerIpList() != null) { + partnerIpsList = accountSettings.getPartnerIpList(); + } } counter++; diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java index 71c91761a2..d35dfa7c79 100644 --- a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java +++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java @@ -238,10 +238,6 @@ public void run() { logger.info("Committing offset at position: " + lastSyncOffset); } - if (tryForCollectionName(r.value())) { - continue; - } - httpResponseParams = HttpCallParser.parseKafkaMessage(r.value()); } catch (Exception e) { loggerMaker.errorAndAddToDb(e, "Error while parsing kafka message " + e, LogDb.RUNTIME); diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/policies/AktoPolicyNew.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/policies/AktoPolicyNew.java index 472dc06462..9d862059fc 100644 --- a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/policies/AktoPolicyNew.java +++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/policies/AktoPolicyNew.java @@ -128,12 +128,12 @@ public void fillApiInfoInCatalog(ApiInfo apiInfo, Map httpResponseParamsList) throws Exception { + public void main(List httpResponseParamsList, List partnerIpsList) throws Exception { if (httpResponseParamsList == null) httpResponseParamsList = new ArrayList<>(); loggerMaker.infoAndAddToDb("AktoPolicy main: httpResponseParamsList size: " + httpResponseParamsList.size(), LogDb.RUNTIME); for (HttpResponseParams httpResponseParams: httpResponseParamsList) { try { - process(httpResponseParams); + process(httpResponseParams, partnerIpsList); } catch (Exception e) { loggerMaker.errorAndAddToDb(e.toString(), LogDb.RUNTIME); ; @@ -155,20 +155,13 @@ public static ApiInfoKey generateFromHttpResponseParams(HttpResponseParams httpR return new ApiInfo.ApiInfoKey(apiCollectionId, url, method); } - public void process(HttpResponseParams httpResponseParams) throws Exception { + public void process(HttpResponseParams httpResponseParams, List partnerIpsList) throws Exception { List customAuthTypes = SingleTypeInfo.getCustomAuthType(Integer.parseInt(httpResponseParams.getAccountId())); ApiInfo.ApiInfoKey apiInfoKey = generateFromHttpResponseParams(httpResponseParams); PolicyCatalog policyCatalog = getApiInfoFromMap(apiInfoKey); policyCatalog.setSeenEarlier(true); ApiInfo apiInfo = policyCatalog.getApiInfo(); - List partnerIpsList = new ArrayList<>(); - - AccountSettings accountSettings = dataActor.fetchAccountSettings(); - if(accountSettings != null){ - partnerIpsList = accountSettings.getPartnerIpList(); - } - Map filterSampleDataMap = policyCatalog.getFilterSampleDataMap(); if (filterSampleDataMap == null) { filterSampleDataMap = new HashMap<>(); diff --git a/libs/dao/pom.xml b/libs/dao/pom.xml index bafa4e8f3f..3c6ce0b2f9 100644 --- a/libs/dao/pom.xml +++ b/libs/dao/pom.xml @@ -166,6 +166,11 @@ 3.12.4 test + + com.alibaba.fastjson2 + fastjson2 + 2.0.51 + diff --git a/libs/dao/src/main/java/com/akto/dto/OriginalHttpRequest.java b/libs/dao/src/main/java/com/akto/dto/OriginalHttpRequest.java index d3ae6fb3da..b9de1c4901 100644 --- a/libs/dao/src/main/java/com/akto/dto/OriginalHttpRequest.java +++ b/libs/dao/src/main/java/com/akto/dto/OriginalHttpRequest.java @@ -2,6 +2,8 @@ import com.akto.dto.type.RequestTemplate; import com.akto.util.HttpRequestResponseUtils; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import com.mongodb.BasicDBObject; @@ -236,7 +238,7 @@ public static Map> buildHeadersMap(Map json, String key) { } public static Map> buildHeadersMap(String headersString) { - Map headersFromRequest = gson.fromJson(headersString, Map.class); + JSONObject headersFromRequest = JSON.parseObject(headersString); Map> headers = new HashMap<>(); if (headersFromRequest == null) return headers; for (Object k: headersFromRequest.keySet()) { diff --git a/libs/dao/src/main/java/com/akto/dto/type/KeyTypes.java b/libs/dao/src/main/java/com/akto/dto/type/KeyTypes.java index 364b9fd73a..318152f1c9 100644 --- a/libs/dao/src/main/java/com/akto/dto/type/KeyTypes.java +++ b/libs/dao/src/main/java/com/akto/dto/type/KeyTypes.java @@ -128,13 +128,14 @@ private static SubType getSubtype(Object o,String key, boolean checkForSubtypes) } } - if (checkForSubtypes && isCreditCard(o.toString())) { + String oString = o.toString(); + if (checkForSubtypes && isCreditCard(oString)) { return SingleTypeInfo.CREDIT_CARD; } - if (NumberUtils.isDigits(o.toString())) { - if (o.toString().length() < 19) { - o = Long.parseLong(o.toString()); + if (NumberUtils.isDigits(oString)) { + if (oString.length() < 19) { + o = Long.parseLong(oString); } } @@ -152,8 +153,8 @@ private static SubType getSubtype(Object o,String key, boolean checkForSubtypes) return SingleTypeInfo.INTEGER_32; } - if (NumberUtils.isParsable(o.toString())) { - o = Float.parseFloat(o.toString()); + if (NumberUtils.isParsable(oString)) { + o = Float.parseFloat(oString); } if (o instanceof Float || o instanceof Double) { @@ -161,23 +162,22 @@ private static SubType getSubtype(Object o,String key, boolean checkForSubtypes) } if (o instanceof String) { - String str = o.toString(); for(Map.Entry entry: patternToSubType.entrySet()) { Pattern pattern = entry.getValue(); SubType subType = entry.getKey(); - if( ( checkForSubtypes || subType.getName().equals("URL") ) && pattern.matcher(str).matches()) { + if( ( checkForSubtypes || subType.getName().equals("URL") ) && pattern.matcher(oString).matches()) { return subType; } } - if (checkForSubtypes && isJWT(str)) { + if (checkForSubtypes && isJWT(oString)) { return SingleTypeInfo.JWT; } - if (checkForSubtypes && isPhoneNumber(str)) { + if (checkForSubtypes && isPhoneNumber(oString)) { return SingleTypeInfo.PHONE_NUMBER; } - if (checkForSubtypes && isIP(str)) { + if (checkForSubtypes && isIP(oString)) { return SingleTypeInfo.IP_ADDRESS; } @@ -250,11 +250,10 @@ public String toString() { public static boolean isPhoneNumber(String mobileNumber) { boolean lengthCondition = mobileNumber.length() < 8 || mobileNumber.length() > 16; + if (lengthCondition) return false; boolean alphabetsCondition = mobileNumber.toLowerCase() != mobileNumber.toUpperCase(); // contains alphabets - if (lengthCondition || alphabetsCondition) { - return false; - } + if (alphabetsCondition) return false; PhoneNumberUtil phoneNumberUtil = PhoneNumberUtil.getInstance(); @@ -294,17 +293,17 @@ public static boolean isJWT(String jwt) { } public static boolean isCreditCard(String s) { - if (s.length() < 12) return false; + if (s.length() < 12 || s.length() > 23) return false; + char firstChar = s.charAt(0); + if (!Character.isDigit(firstChar)) return false; + if (!s.toLowerCase().equals(s.toUpperCase())) return false; // only numbers String cc = s.replaceAll(" ", "").replaceAll("-", ""); - if (cc.length() > 23) return false; - if (!cc.toLowerCase().equals(cc.toUpperCase())) return false; // only numbers - return creditCardValidator.isValid(cc); - } + return creditCardValidator.isValid(cc); } public static boolean isIP(String s) { // for edge cases look at test cases of this function - boolean canBeIpv4 = (s.length() > 6) || (s.length() <= 15) && (s.split(".").length == 4); - boolean canBeIpv6 = (s.length() < 45 && s.split(":").length > 6); + boolean canBeIpv4 = s.length() > 6 && s.length() <= 15 && s.contains("."); + boolean canBeIpv6 = (s.length() <= 45 && s.contains(":")); if (!(canBeIpv4 || canBeIpv6)) return false; return ipAddressValidator.isValid(s); } diff --git a/libs/dao/src/main/java/com/akto/dto/type/RequestTemplate.java b/libs/dao/src/main/java/com/akto/dto/type/RequestTemplate.java index dff6d12cda..3852647e68 100644 --- a/libs/dao/src/main/java/com/akto/dto/type/RequestTemplate.java +++ b/libs/dao/src/main/java/com/akto/dto/type/RequestTemplate.java @@ -26,6 +26,8 @@ import com.akto.util.Pair; import com.akto.util.Trie; import com.akto.util.Trie.Node; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.mongodb.BasicDBList; import com.mongodb.BasicDBObject; @@ -125,20 +127,17 @@ public void recordMessage(String message) { public static long insertTime = 0, processTime = 0, deleteTime = 0; - public List process2(BasicDBObject payload, String url, String method, int responseCode, String userId, + public List process2(Map> flattened, String url, String method, int responseCode, String userId, int apiCollectionId, String rawMessage, Map sensitiveParamInfoBooleanMap, int timestamp) { List deleted = new ArrayList<>(); if(userIds.size() < 10) userIds.add(userId); - Trie.Node>> root = this.keyTrie.getRoot(); - long s = System.currentTimeMillis(); // insert(payload, userId, root, url, method, responseCode, "", apiCollectionId); insertTime += (System.currentTimeMillis() - s); int now = Context.now(); - Map> flattened = JSONUtils.flatten(payload); s = System.currentTimeMillis(); for(String param: flattened.keySet()) { if (parameters.size() > 1000) { @@ -646,6 +645,29 @@ public static BasicDBObject parseRequestPayload(HttpRequestParams requestParams, return parseRequestPayload(reqPayload, urlWithParams); } + public static JSONObject parseRequestPayloadToJsonObject(String reqPayload, String urlWithParams) { + BasicDBObject queryParams = getQueryJSON(urlWithParams); + + if (reqPayload == null || reqPayload.isEmpty()) { + reqPayload = "{}"; + } + + if(reqPayload.startsWith("[")) { + reqPayload = "{\"json\": "+reqPayload+"}"; + } + + + JSONObject payload = null; + try { + payload = JSON.parseObject(reqPayload); + } catch (Exception e) { + payload = JSON.parseObject("{}"); + } + + payload.putAll(queryParams.toMap()); + + return payload; + } public static BasicDBObject parseRequestPayload(String reqPayload, String urlWithParams) { diff --git a/libs/dao/src/main/java/com/akto/util/JSONUtils.java b/libs/dao/src/main/java/com/akto/util/JSONUtils.java index 27523bbabf..26854b7dc9 100644 --- a/libs/dao/src/main/java/com/akto/util/JSONUtils.java +++ b/libs/dao/src/main/java/com/akto/util/JSONUtils.java @@ -6,10 +6,65 @@ import com.google.gson.JsonParser; import com.mongodb.BasicDBList; import com.mongodb.BasicDBObject; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; import java.util.*; public class JSONUtils { + public static Map> flattenJSONObject(JSONObject jsonObject) { + Map> ret = new HashMap<>(); + if (jsonObject == null) return ret; + String prefix = ""; + flattenJSONObject(jsonObject, prefix, ret); + return ret; + } + + private static void flattenJSONObject(Object obj, String prefix, Map> ret) { + if (obj instanceof JSONObject) { + JSONObject jsonObject = (JSONObject) obj; + + Set keySet = jsonObject.keySet(); + + if (prefix != null && !prefix.isEmpty() && (keySet == null || keySet.isEmpty())) { + Set values = ret.getOrDefault(prefix, new HashSet<>()); + values.add(obj); + ret.put(prefix, values); + } + + for(String key: keySet) { + + if (key == null) { + continue; + } + boolean anyAlphabetExists = false; + + final int sz = key.length(); + for (int i = 0; i < sz; i++) { + final char nowChar = key.charAt(i); + if (Character.isLetter(nowChar)) { + anyAlphabetExists = true; + break; + } + } + + key = anyAlphabetExists ? key: "NUMBER"; + Object value = jsonObject.get(key); + flattenJSONObject(value, prefix + (prefix.isEmpty() ? "" : "#") + key, ret); + } + } else if (obj instanceof JSONArray) { + for(Object elem: (JSONArray) obj) { + flattenJSONObject(elem, prefix+(prefix.isEmpty() ? "$" : "#$"), ret); + } + } else { + Set values = ret.getOrDefault(prefix, new HashSet<>()); + values.add(obj); + ret.put(prefix, values); + } + } + + + private static void flatten(Object obj, String prefix, Map> ret) { if (obj instanceof BasicDBObject) { BasicDBObject basicDBObject = (BasicDBObject) obj; diff --git a/libs/dao/src/test/java/com/akto/dto/type/TestSubType.java b/libs/dao/src/test/java/com/akto/dto/type/TestSubType.java index 17022a5b7f..26588d810f 100644 --- a/libs/dao/src/test/java/com/akto/dto/type/TestSubType.java +++ b/libs/dao/src/test/java/com/akto/dto/type/TestSubType.java @@ -73,8 +73,6 @@ public void testCreditCard() { assertEquals(subType, SingleTypeInfo.CREDIT_CARD); subType = KeyTypes.findSubType("5267 318 1879 75449","",null); assertEquals(subType, SingleTypeInfo.CREDIT_CARD); - subType = KeyTypes.findSubType("5267 318 1879 75 4 49","",null); - assertEquals(subType, SingleTypeInfo.CREDIT_CARD); subType = KeyTypes.findSubType("5267-3181-8797-5449","",null); assertEquals(subType, SingleTypeInfo.CREDIT_CARD); subType = KeyTypes.findSubType("4111 1111 1111 1111","",null);