Merge pull request #634 from akto-api-security/develop
Develop
avneesh-akto authored Feb 22, 2023
2 parents 5e0c37b + 95589e7 commit 6a857cd
Showing 67 changed files with 1,015 additions and 362 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -15,4 +15,4 @@
**/gen
libawesome.dylib
temp_*

*.templates-config.json
7 changes: 4 additions & 3 deletions apps/api-analyser/src/main/java/com/akto/analyser/Main.java
@@ -6,6 +6,7 @@
import com.akto.dao.context.Context;
import com.akto.dto.AccountSettings;
import com.akto.dto.HttpResponseParams;
import com.akto.log.LoggerMaker;
import com.akto.parsers.HttpCallParser;
import com.mongodb.ConnectionString;
import com.mongodb.client.model.Updates;
@@ -23,7 +24,7 @@

public class Main {
private Consumer<String, String> consumer;
private static final Logger logger = LoggerFactory.getLogger(Main.class);
private static final LoggerMaker loggerMaker = new LoggerMaker(Main.class);

public static void main(String[] args) {
String mongoURI = System.getenv("AKTO_MONGO_CONN");;
@@ -70,7 +71,7 @@ public void run() {
ConsumerRecords<String, String> records = main.consumer.poll(Duration.ofMillis(10000));
main.consumer.commitSync();
for (ConsumerRecord<String,String> r: records) {
if ( (i<1000 && i%100 == 0) || (i>10_000 && i%10_000==0)) logger.info(i+"");
if ( (i<1000 && i%100 == 0) || (i>10_000 && i%10_000==0)) loggerMaker.infoAndAddToDb("Count: " + i, LoggerMaker.LogDb.ANALYSER);
i ++;

try {
@@ -84,7 +85,7 @@ public void run() {
resourceAnalyser.analyse(httpResponseParams);
} catch (Exception e) {
// todo: check cause
logger.error("Error parsing http response params : " + e.getMessage() + " " + e.getCause());
loggerMaker.errorAndAddToDb("Error parsing http response params : " + e.getMessage() + " " + e.getCause(), LoggerMaker.LogDb.ANALYSER);
}
}
}
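Across this PR the pattern is the same: the plain SLF4J Logger field is swapped for Akto's LoggerMaker, which, judging by the method names, also writes each line to a Mongo-backed log collection selected by the LogDb argument. A minimal sketch of the migration, using only the constructor and method signatures visible in these hunks (the class name is a placeholder):

import com.akto.log.LoggerMaker;
import com.akto.log.LoggerMaker.LogDb;

public class Example {
    // Before: private static final Logger logger = LoggerFactory.getLogger(Example.class);
    private static final LoggerMaker loggerMaker = new LoggerMaker(Example.class);

    void process(int count) {
        // Same call sites as before, but the line is also persisted to the ANALYSER log db
        loggerMaker.infoAndAddToDb("Count: " + count, LogDb.ANALYSER);
        try {
            // ... parse and analyse the message ...
        } catch (Exception e) {
            loggerMaker.errorAndAddToDb("Error parsing http response params : " + e.getMessage(), LogDb.ANALYSER);
        }
    }
}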
ResourceAnalyser.java
@@ -4,6 +4,7 @@
import com.akto.dao.context.Context;
import com.akto.dto.*;
import com.akto.dto.type.*;
import com.akto.log.LoggerMaker;
import com.akto.parsers.HttpCallParser;
import com.akto.runtime.APICatalogSync;
import com.akto.runtime.URLAggregator;
@@ -13,6 +14,7 @@
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.mongodb.BasicDBObject;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.*;
import org.bson.conversions.Bson;

@@ -25,6 +27,8 @@ public class ResourceAnalyser {

int last_sync = 0;

private static final LoggerMaker loggerMaker = new LoggerMaker(ResourceAnalyser.class);

public ResourceAnalyser(int duplicateCheckerBfSize, double duplicateCheckerBfFpp, int valuesBfSize, double valuesBfFpp) {
duplicateCheckerBF = BloomFilter.create(
Funnels.stringFunnel(Charsets.UTF_8), duplicateCheckerBfSize, duplicateCheckerBfFpp
@@ -81,6 +85,8 @@ public URLStatic matchWithUrlStatic(int apiCollectionId, String url, String meth
return null;
}

private final Set<String> hostsSeen = new HashSet<>();


public void analyse(HttpResponseParams responseParams) {
if (responseParams.statusCode < 200 || responseParams.statusCode >= 300) return;
@@ -94,17 +100,29 @@ public void analyse(HttpResponseParams responseParams) {

// user id
Map<String,List<String>> headers = requestParams.getHeaders();
if (headers == null) return;
if (headers == null) {
loggerMaker.infoAndAddToDb("No headers", LoggerMaker.LogDb.ANALYSER);
return;
}

List<String> ipList = headers.get(X_FORWARDED_FOR);
if (ipList == null || ipList.isEmpty()) return;
if (ipList == null || ipList.isEmpty()) {
loggerMaker.infoAndAddToDb("IP not found: " + headers.keySet(), LoggerMaker.LogDb.ANALYSER);
return;
}
String userId = ipList.get(0);

// get actual api collection id
Integer apiCollectionId = requestParams.getApiCollectionId();
String hostName = HttpCallParser.getHeaderValue(requestParams.getHeaders(), "host");
apiCollectionId = findTrueApiCollectionId(apiCollectionId, hostName, responseParams.getSource());

if (apiCollectionId == null) return;
if (hostName != null) hostsSeen.add(hostName);

if (apiCollectionId == null) {
loggerMaker.infoAndAddToDb("API collection not found: " + apiCollectionId + " " + hostName + " " + responseParams.getSource(), LoggerMaker.LogDb.ANALYSER);
return;
}

String method = requestParams.getMethod();
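analyse() now records why a message was skipped (missing headers, missing client IP, unresolved collection) instead of returning silently; the caller is still identified by the first entry of X-Forwarded-For. A condensed sketch of that guard chain, assuming the X_FORWARDED_FOR constant holds the usual lower-case header name:

import java.util.List;
import java.util.Map;

public class UserIdSketch {
    static String extractUserId(Map<String, List<String>> headers) {
        if (headers == null) return null;                      // logged as "No headers"
        List<String> ipList = headers.get("x-forwarded-for");  // X_FORWARDED_FOR (assumed value)
        if (ipList == null || ipList.isEmpty()) return null;   // logged as "IP not found"
        return ipList.get(0);  // first hop in the chain is treated as the user id
    }
}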

@@ -217,6 +235,8 @@ public void analysePayload(Object paramObject, String param, String combinedUrl,

public void buildCatalog() {
List<ApiInfo.ApiInfoKey> apis = SingleTypeInfoDao.instance.fetchEndpointsInCollection(-1);
loggerMaker.infoAndAddToDb("APIs fetched from db: " + apis.size(), LoggerMaker.LogDb.ANALYSER);

for (ApiInfo.ApiInfoKey apiInfoKey: apis) {

int apiCollectionId = apiInfoKey.getApiCollectionId();
@@ -245,20 +265,25 @@ public void buildCatalog() {


public void syncWithDb() {
loggerMaker.infoAndAddToDb("Hosts seen till now: " + hostsSeen, LoggerMaker.LogDb.ANALYSER);

buildCatalog();
populateHostNameToIdMap();

List<WriteModel<SingleTypeInfo>> dbUpdates = getDbUpdatesForSingleTypeInfo();
System.out.println("total count: " + dbUpdates.size());
loggerMaker.infoAndAddToDb("total db updates count: " + dbUpdates.size(), LoggerMaker.LogDb.ANALYSER);
countMap = new HashMap<>();
last_sync = Context.now();
if (dbUpdates.size() > 0) {
SingleTypeInfoDao.instance.getMCollection().bulkWrite(dbUpdates);
BulkWriteResult bulkWriteResult = SingleTypeInfoDao.instance.getMCollection().bulkWrite(dbUpdates);
loggerMaker.infoAndAddToDb("bulkWriteResult: " + bulkWriteResult, LoggerMaker.LogDb.ANALYSER);
}
}

public List<WriteModel<SingleTypeInfo>> getDbUpdatesForSingleTypeInfo() {
List<WriteModel<SingleTypeInfo>> bulkUpdates = new ArrayList<>();
loggerMaker.infoAndAddToDb("countMap keySet size: " + countMap.size(), LoggerMaker.LogDb.ANALYSER);

for (SingleTypeInfo singleTypeInfo: countMap.values()) {
if (singleTypeInfo.getUniqueCount() == 0 && singleTypeInfo.getPublicCount() == 0) continue;
Bson filter = SingleTypeInfoDao.createFiltersWithoutSubType(singleTypeInfo);
@@ -269,6 +294,11 @@ public List<WriteModel<SingleTypeInfo>> getDbUpdatesForSingleTypeInfo() {
bulkUpdates.add(new UpdateManyModel<>(filter, update, new UpdateOptions().upsert(false)));
}

int i = bulkUpdates.size();
int total = countMap.values().size();

loggerMaker.infoAndAddToDb("bulkUpdates: " + i + " total countMap size: " + total, LoggerMaker.LogDb.ANALYSER);

return bulkUpdates;
}
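getDbUpdatesForSingleTypeInfo() emits one UpdateManyModel per tracked parameter, and syncWithDb() now keeps the BulkWriteResult for logging instead of discarding it. A self-contained sketch of the same MongoDB driver pattern, with hypothetical collection, filter, and update values:

import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.*;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.ArrayList;
import java.util.List;

public class BulkSyncSketch {
    static void sync(MongoCollection<Document> coll) {
        List<WriteModel<Document>> dbUpdates = new ArrayList<>();
        Bson filter = Filters.eq("url", "/api/books");   // placeholder filter
        Bson update = Updates.inc("uniqueCount", 5);     // placeholder update
        // upsert(false): only touch documents that already exist, as in the PR
        dbUpdates.add(new UpdateManyModel<>(filter, update, new UpdateOptions().upsert(false)));
        if (dbUpdates.size() > 0) {
            BulkWriteResult bulkWriteResult = coll.bulkWrite(dbUpdates);
            // BulkWriteResult's toString() summarises matched/modified counts,
            // which is what the new "bulkWriteResult: ..." log line captures
            System.out.println("bulkWriteResult: " + bulkWriteResult);
        }
    }
}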

@@ -305,6 +335,7 @@ public String convertToParamValue(Object value) {
private Map<String, Integer> hostNameToIdMap = new HashMap<>();

public Integer findTrueApiCollectionId(int originalApiCollectionId, String hostName, HttpResponseParams.Source source) {

if (!HttpCallParser.useHostCondition(hostName, source)) {
return originalApiCollectionId;
}
@@ -314,6 +345,10 @@ public Integer findTrueApiCollectionId(int originalApiCollectionId, String hostN

if (hostNameToIdMap.containsKey(key)) {
trueApiCollectionId = hostNameToIdMap.get(key);

} else if (hostNameToIdMap.containsKey(hostName + "$0")) {
trueApiCollectionId = hostNameToIdMap.get(hostName + "$0");

}

// todo: what if we don't find because of cycles
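The new branch falls back to the hostName + "$0" key when the exact hostName$vxlanId key is absent, presumably so hosts whose collection was registered under a vxlan id of 0 still resolve. A reduced sketch of the lookup, reusing the key scheme from populateHostNameToIdMap():

import java.util.HashMap;
import java.util.Map;

public class HostLookupSketch {
    private final Map<String, Integer> hostNameToIdMap = new HashMap<>();

    Integer findTrueApiCollectionId(Integer originalId, String hostName, int vxlanId) {
        String key = hostName + "$" + vxlanId;
        Integer trueApiCollectionId = null;
        if (hostNameToIdMap.containsKey(key)) {
            trueApiCollectionId = hostNameToIdMap.get(key);
        } else if (hostNameToIdMap.containsKey(hostName + "$0")) {
            // fallback added in this PR: match on host even when the
            // collection was stored with vxlan id 0
            trueApiCollectionId = hostNameToIdMap.get(hostName + "$0");
        }
        return trueApiCollectionId;
    }
}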
@@ -328,6 +363,7 @@ public void populateHostNameToIdMap() {
String key = apiCollection.getHostName() + "$" + apiCollection.getVxlanId();
hostNameToIdMap.put(key, apiCollection.getId());
}
loggerMaker.infoAndAddToDb("hostNameToIdMap: " + hostNameToIdMap, LoggerMaker.LogDb.ANALYSER);
}


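For context, ResourceAnalyser's duplicate detection rests on the Guava Bloom filters built in its constructor (first hunk of this file). A standalone sketch of that construction, with illustrative sizes and a made-up key layout:

import com.google.common.base.Charsets;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class BloomSketch {
    public static void main(String[] args) {
        // 1M expected insertions at a 0.1% false-positive rate (illustrative values)
        BloomFilter<CharSequence> duplicateCheckerBF = BloomFilter.create(
                Funnels.stringFunnel(Charsets.UTF_8), 1_000_000, 0.001);

        String key = "userId|collectionId|url|method|param";  // hypothetical key layout
        if (!duplicateCheckerBF.mightContain(key)) {
            duplicateCheckerBF.put(key);
            // first sighting: safe to count as unique (modulo false positives)
        }
    }
}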
HttpCallParser.java
@@ -7,6 +7,8 @@
import com.akto.dao.ApiCollectionsDao;
import com.akto.dao.context.Context;
import com.akto.dto.*;
import com.akto.log.LoggerMaker;
import com.akto.log.LoggerMaker.LogDb;
import com.akto.runtime.APICatalogSync;
import com.akto.runtime.URLAggregator;

@@ -31,8 +33,7 @@ public class HttpCallParser {
private final int sync_threshold_time;
private int sync_count = 0;
private int last_synced;
private static final Logger logger = LoggerFactory.getLogger(HttpCallParser.class);

private static final LoggerMaker loggerMaker = new LoggerMaker(HttpCallParser.class);
public APICatalogSync apiCatalogSync;
private Map<String, Integer> hostNameToIdMap = new HashMap<>();

@@ -139,7 +140,7 @@ public int createCollectionBasedOnHostName(int id, String host) throws Exceptio
flag = true;
break;
} catch (Exception e) {
logger.error("Error while inserting apiCollection, trying again " + i + " " + e.getMessage());
loggerMaker.errorAndAddToDb("Error while inserting apiCollection, trying again " + i + " " + e.getMessage(), LogDb.RUNTIME);
}
}
if (flag) { // flag tells if we were successfully able to insert collection
@@ -218,7 +219,7 @@ public List<HttpResponseParams> filterHttpResponseParams(List<HttpResponseParams

hostNameToIdMap.put(key, apiCollectionId);
} catch (Exception e) {
logger.error("Failed to create collection for host : " + hostName);
loggerMaker.errorAndAddToDb("Failed to create collection for host : " + hostName, LogDb.RUNTIME);
createCollectionSimple(vxlanId);
hostNameToIdMap.put("null " + vxlanId, vxlanId);
apiCollectionId = httpResponseParam.requestParams.getApiCollectionId();
@@ -274,7 +275,7 @@ public boolean aggregate(List<HttpResponseParams> responses) {
}
}

logger.info("added " + count + " urls");
loggerMaker.infoAndAddToDb("added " + count + " urls", LogDb.RUNTIME);
return ret;
}

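createCollectionBasedOnHostName keeps a bounded retry around the collection insert, and each failed attempt is now written to the RUNTIME log db. A stripped-down sketch of that retry-and-flag pattern; the Inserter interface is a hypothetical stand-in for the DAO call:

public class RetrySketch {
    // Stand-in for ApiCollectionsDao's insert, which can throw on races
    interface Inserter { void insert(String host) throws Exception; }

    static boolean insertWithRetry(Inserter dao, String host, int attempts) {
        boolean flag = false;
        for (int i = 0; i < attempts; i++) {
            try {
                dao.insert(host);
                flag = true;  // success: stop retrying
                break;
            } catch (Exception e) {
                System.err.println("Error while inserting apiCollection, trying again " + i + " " + e.getMessage());
            }
        }
        return flag;  // flag tells if we were successfully able to insert collection
    }
}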
25 changes: 14 additions & 11 deletions apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java
@@ -21,6 +21,8 @@
import com.akto.dto.type.SingleTypeInfo.SubType;
import com.akto.dto.type.SingleTypeInfo.SuperType;
import com.akto.dto.type.URLMethods.Method;
import com.akto.log.LoggerMaker;
import com.akto.log.LoggerMaker.LogDb;
import com.akto.parsers.HttpCallParser;
import com.akto.runtime.merge.MergeOnHostOnly;
import com.akto.task.Cluster;
@@ -43,6 +45,7 @@ public class APICatalogSync {
public int thresh;
public String userIdentifier;
private static final Logger logger = LoggerFactory.getLogger(APICatalogSync.class);
private static final LoggerMaker loggerMaker = new LoggerMaker(APICatalogSync.class);
public Map<Integer, APICatalog> dbState;
public Map<Integer, APICatalog> delta;
public Map<SensitiveParamInfo, Boolean> sensitiveParamInfoBooleanMap;
@@ -74,7 +77,7 @@ public void processResponse(RequestTemplate requestTemplate, Collection<HttpResp
processResponse(requestTemplate, iter.next(), deletedInfo);
} catch (Exception e) {
e.printStackTrace();
logger.error("processResponse: " + e.getMessage());
loggerMaker.errorAndAddToDb("processResponse: " + e.getMessage(), LogDb.RUNTIME);
}
}
}
@@ -141,7 +144,7 @@ public void processResponse(RequestTemplate requestTemplate, HttpResponseParams
}

} catch (JsonParseException e) {
logger.error("Failed to parse json payload " + e.getMessage());
loggerMaker.errorAndAddToDb("Failed to parse json payload " + e.getMessage(), LogDb.RUNTIME);
}
}

@@ -780,7 +783,7 @@ private Map<URLStatic, RequestTemplate> createRequestTemplates(URLAggregator agg
iterator.remove();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
loggerMaker.errorAndAddToDb(e.toString(), LogDb.RUNTIME);
}

return ret;
@@ -811,7 +814,7 @@ private void processKnownStaticURLs(URLAggregator aggregator, APICatalog deltaCa

}
} catch (Exception e) {
logger.info(e.getMessage(), e);
loggerMaker.errorAndAddToDb(e.toString(),LogDb.RUNTIME);
}
}

@@ -1257,7 +1260,7 @@ private static Map<Integer, APICatalog> build(List<SingleTypeInfo> allParams) {
}
keyTypes.getOccurrences().put(param.getSubType(), param);
} catch (Exception e) {
logger.error("ERROR while parsing url param position: " + p);
loggerMaker.errorAndAddToDb("ERROR while parsing url param position: " + p, LogDb.RUNTIME);
}
continue;
}
@@ -1327,7 +1330,7 @@ public void syncWithDB(boolean syncImmediately, boolean fetchAllSTI) {
writesForSampleData.addAll(getDBUpdatesForSampleData(apiCollectionId, deltaCatalog,dbCatalog, redact, forceUpdate));
}

logger.info("adding " + writesForParams.size() + " updates for params");
loggerMaker.infoAndAddToDb("adding " + writesForParams.size() + " updates for params", LogDb.RUNTIME);

long start = System.currentTimeMillis();

@@ -1338,23 +1341,23 @@
writesForParams
);

logger.info((System.currentTimeMillis() - start) + ": " + res.getInserts().size() + " " +res.getUpserts().size());
loggerMaker.infoAndAddToDb((System.currentTimeMillis() - start) + ": " + res.getInserts().size() + " " +res.getUpserts().size(), LogDb.RUNTIME);
}

logger.info("adding " + writesForTraffic.size() + " updates for traffic");
loggerMaker.infoAndAddToDb("adding " + writesForTraffic.size() + " updates for traffic", LogDb.RUNTIME);
if(writesForTraffic.size() > 0) {
BulkWriteResult res = TrafficInfoDao.instance.getMCollection().bulkWrite(writesForTraffic);

logger.info(res.getInserts().size() + " " +res.getUpserts().size());
loggerMaker.infoAndAddToDb(res.getInserts().size() + " " +res.getUpserts().size(), LogDb.RUNTIME);

}


logger.info("adding " + writesForSampleData.size() + " updates for samples");
loggerMaker.infoAndAddToDb("adding " + writesForSampleData.size() + " updates for samples", LogDb.RUNTIME);
if(writesForSampleData.size() > 0) {
BulkWriteResult res = SampleDataDao.instance.getMCollection().bulkWrite(writesForSampleData);

logger.info(res.getInserts().size() + " " +res.getUpserts().size());
loggerMaker.infoAndAddToDb(res.getInserts().size() + " " +res.getUpserts().size(), LogDb.RUNTIME);

}

KafkaHealthMetricSyncTask.java
@@ -3,6 +3,8 @@
import com.akto.dao.KafkaHealthMetricsDao;
import com.akto.dao.context.Context;
import com.akto.dto.KafkaHealthMetric;
import com.akto.log.LoggerMaker;
import com.akto.log.LoggerMaker.LogDb;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;

@@ -20,6 +22,7 @@ public class KafkaHealthMetricSyncTask implements Runnable{
Consumer<String, String> consumer;
public Map<String,KafkaHealthMetric> kafkaHealthMetricsMap = new HashMap<>();
private static final Logger logger = LoggerFactory.getLogger(KafkaHealthMetricSyncTask.class);
private static final LoggerMaker loggerMaker = new LoggerMaker(APICatalogSync.class);


public KafkaHealthMetricSyncTask(Consumer<String, String> consumer) {
@@ -56,7 +59,7 @@ public void run() {
}
logger.info("SYNC DONE");
} catch (Exception e) {
logger.error("ERROR in kafka data sync from api runtime", e);
loggerMaker.errorAndAddToDb("ERROR in kafka data sync from api runtime" + e, LogDb.RUNTIME);
}
}
}
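Both Main's ingest loop (first file above) and this sync task sit on the same Kafka consumption shape: poll with a timeout, commit synchronously, then walk the batch. A minimal loop under those assumptions; broker address, group id, and topic name are placeholders, not Akto's actual configuration:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "analyser-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("akto.api.traffic"));
            while (true) {
                // same 10s poll + synchronous commit as Main.run()
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                consumer.commitSync();
                for (ConsumerRecord<String, String> r : records) {
                    // parse r.value() into HttpResponseParams and analyse it ...
                }
            }
        }
    }
}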