Skip to content

Commit

Permalink
fix: review fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tamassoltesz committed Dec 18, 2024
1 parent 12768c9 commit 9803649
Show file tree
Hide file tree
Showing 2 changed files with 251 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public BulkImportProxyConnection(Connection con) {

@Override
public void close() throws SQLException {
//this.con.close(); // why are we against the close?
//this.con.close();
//we don't want to close here because we are trying to reuse existing code but also using the same connection
//for bulk importing
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,39 @@
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.supertokens.ActiveUsers;
import io.supertokens.Main;
import io.supertokens.ProcessState;
import io.supertokens.authRecipe.AuthRecipe;
import io.supertokens.authRecipe.UserPaginationContainer;
import io.supertokens.cronjobs.CronTaskTest;
import io.supertokens.cronjobs.Cronjobs;
import io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers;
import io.supertokens.emailpassword.EmailPassword;
import io.supertokens.emailpassword.ParsedFirebaseSCryptResponse;
import io.supertokens.featureflag.EE_FEATURES;
import io.supertokens.featureflag.FeatureFlag;
import io.supertokens.featureflag.FeatureFlagTestContent;
import io.supertokens.passwordless.Passwordless;
import io.supertokens.pluginInterface.RECIPE_ID;
import io.supertokens.pluginInterface.STORAGE_TYPE;
import io.supertokens.pluginInterface.Storage;
import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo;
import io.supertokens.pluginInterface.authRecipe.LoginMethod;
import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage;
import io.supertokens.pluginInterface.bulkimport.BulkImportStorage;
import io.supertokens.pluginInterface.emailpassword.sqlStorage.EmailPasswordSQLStorage;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;
import io.supertokens.pluginInterface.passwordless.sqlStorage.PasswordlessSQLStorage;
import io.supertokens.pluginInterface.thirdparty.sqlStorage.ThirdPartySQLStorage;
import io.supertokens.session.Session;
import io.supertokens.session.info.SessionInformationHolder;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.test.httpRequest.HttpRequestForTesting;
import io.supertokens.storage.postgresql.test.httpRequest.HttpResponseException;
import io.supertokens.storageLayer.StorageLayer;
import io.supertokens.thirdparty.ThirdParty;
import io.supertokens.useridmapping.UserIdMapping;
Expand All @@ -56,8 +64,12 @@
import org.junit.Test;
import org.junit.rules.TestRule;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
Expand Down Expand Up @@ -969,4 +981,239 @@ private static long measureTime(Supplier<Void> function) {
// Calculate elapsed time in milliseconds
return (endTime - startTime) / 1000000; // Convert to milliseconds
}

@Test
public void testWithOneMillionUsers() throws Exception {
Main main = startCronProcess(String.valueOf(NUM_THREADS));

int NUMBER_OF_USERS_TO_UPLOAD = 1000000; // million

if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) {
return;
}

// Create user roles before inserting bulk users
{
UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null);
UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null);
}

// upload a bunch of users through the API
{
for (int i = 0; i < (NUMBER_OF_USERS_TO_UPLOAD / 10000); i++) {
JsonObject request = generateUsersJson(10000, i * 10000); // API allows 10k users upload at once
JsonObject response = uploadBulkImportUsersJson(main, request);
assertEquals("OK", response.get("status").getAsString());
}

}

long processingStarted = System.currentTimeMillis();

// wait for the cron job to process them
// periodically check the remaining unprocessed users
// Note1: the cronjob starts the processing automatically
// Note2: the successfully processed users get deleted from the bulk_import_users table
{
long count = NUMBER_OF_USERS_TO_UPLOAD;
while(true) {
try {
JsonObject response = loadBulkImportUsersCountWithStatus(main, null);
assertEquals("OK", response.get("status").getAsString());
count = response.get("count").getAsLong();
int newUsersNumber = loadBulkImportUsersCountWithStatus(main,
BulkImportStorage.BULK_IMPORT_USER_STATUS.NEW).get("count").getAsInt();
int processingUsersNumber = loadBulkImportUsersCountWithStatus(main,
BulkImportStorage.BULK_IMPORT_USER_STATUS.PROCESSING).get("count").getAsInt();
int failedUsersNumber = loadBulkImportUsersCountWithStatus(main,
BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt();
count = newUsersNumber + processingUsersNumber;

if (count == 0) {
break;
}
} catch (Exception e) {
if(e instanceof SocketTimeoutException) {
//ignore
} else {
throw e;
}
}
Thread.sleep(5000);
}
}

long processingFinished = System.currentTimeMillis();
System.out.println("Processed " + NUMBER_OF_USERS_TO_UPLOAD + " users in " + (processingFinished - processingStarted) / 1000
+ " seconds ( or " + (processingFinished - processingStarted) / 60000 + " minutes)");

// after processing finished, make sure every user got processed correctly
{
int failedImportedUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt();
int usersInCore = loadUsersCount(main).get("count").getAsInt();
assertEquals(NUMBER_OF_USERS_TO_UPLOAD, usersInCore + failedImportedUsersNumber);
assertEquals(NUMBER_OF_USERS_TO_UPLOAD, usersInCore);
}
}

private static JsonObject loadBulkImportUsersCountWithStatus(Main main, BulkImportStorage.BULK_IMPORT_USER_STATUS status)
throws HttpResponseException, IOException {
Map<String, String> params = new HashMap<>();
if(status!= null) {
params.put("status", status.name());
}
return HttpRequestForTesting.sendGETRequest(main, "",
"http://localhost:3567/bulk-import/users/count",
params, 10000, 10000, null, SemVer.v5_2.get(), null);
}

private static JsonObject loadBulkImportUsersWithStatus(Main main, BulkImportStorage.BULK_IMPORT_USER_STATUS status)
throws HttpResponseException, IOException {
Map<String, String> params = new HashMap<>();
if(status!= null) {
params.put("status", status.name());
}
return HttpRequestForTesting.sendGETRequest(main, "",
"http://localhost:3567/bulk-import/users",
params, 10000, 10000, null, SemVer.v5_2.get(), null);
}

private static JsonObject loadUsersCount(Main main) throws HttpResponseException, IOException {
Map<String, String> params = new HashMap<>();

return HttpRequestForTesting.sendGETRequest(main, "",
"http://localhost:3567/users/count",
params, 10000, 10000, null, SemVer.v5_2.get(), null);
}

private static JsonObject generateUsersJson(int numberOfUsers, int startIndex) {
JsonObject userJsonObject = new JsonObject();
JsonParser parser = new JsonParser();

JsonArray usersArray = new JsonArray();
for (int i = 0; i < numberOfUsers; i++) {
JsonObject user = new JsonObject();

user.addProperty("externalUserId", UUID.randomUUID().toString());
user.add("userMetadata", parser.parse("{\"key1\":"+ UUID.randomUUID().toString() + ",\"key2\":{\"key3\":\"value3\"}}"));
user.add("userRoles", parser.parse(
"[{\"role\":\"role1\", \"tenantIds\": [\"public\"]},{\"role\":\"role2\", \"tenantIds\": [\"public\"]}]"));
user.add("totpDevices", parser.parse("[{\"secretKey\":\"secretKey\",\"deviceName\":\"deviceName\"}]"));

//JsonArray tenanatIds = parser.parse("[\"public\", \"t1\"]").getAsJsonArray();
JsonArray tenanatIds = parser.parse("[\"public\"]").getAsJsonArray();
String email = " johndoe+" + (i + startIndex) + "@gmail.com ";

Random random = new Random();

JsonArray loginMethodsArray = new JsonArray();
//if(random.nextInt(2) == 0){
loginMethodsArray.add(createEmailLoginMethod(email, tenanatIds));
//}
if(random.nextInt(2) == 0){
loginMethodsArray.add(createThirdPartyLoginMethod(email, tenanatIds));
}
if(random.nextInt(2) == 0){
loginMethodsArray.add(createPasswordlessLoginMethod(email, tenanatIds, "+910000" + (startIndex + i)));
}
if(loginMethodsArray.size() == 0) {
int methodNumber = random.nextInt(3);
switch (methodNumber) {
case 0:
loginMethodsArray.add(createEmailLoginMethod(email, tenanatIds));
break;
case 1:
loginMethodsArray.add(createThirdPartyLoginMethod(email, tenanatIds));
break;
case 2:
loginMethodsArray.add(createPasswordlessLoginMethod(email, tenanatIds, "+911000" + (startIndex + i)));
break;
}
}
user.add("loginMethods", loginMethodsArray);

usersArray.add(user);
}

userJsonObject.add("users", usersArray);
return userJsonObject;
}

private static JsonObject createEmailLoginMethod(String email, JsonArray tenantIds) {
JsonObject loginMethod = new JsonObject();
loginMethod.add("tenantIds", tenantIds);
loginMethod.addProperty("email", email);
loginMethod.addProperty("recipeId", "emailpassword");
loginMethod.addProperty("passwordHash",
"$argon2d$v=19$m=12,t=3,p=1$aGI4enNvMmd0Zm0wMDAwMA$r6p7qbr6HD+8CD7sBi4HVw");
loginMethod.addProperty("hashingAlgorithm", "argon2");
loginMethod.addProperty("isVerified", true);
loginMethod.addProperty("isPrimary", true);
loginMethod.addProperty("timeJoinedInMSSinceEpoch", 0);
return loginMethod;
}

private static JsonObject createThirdPartyLoginMethod(String email, JsonArray tenantIds) {
JsonObject loginMethod = new JsonObject();
loginMethod.add("tenantIds", tenantIds);
loginMethod.addProperty("recipeId", "thirdparty");
loginMethod.addProperty("email", email);
loginMethod.addProperty("thirdPartyId", "google");
loginMethod.addProperty("thirdPartyUserId", String.valueOf(email.hashCode()));
loginMethod.addProperty("isVerified", true);
loginMethod.addProperty("isPrimary", false);
loginMethod.addProperty("timeJoinedInMSSinceEpoch", 0);
return loginMethod;
}

private static JsonObject createPasswordlessLoginMethod(String email, JsonArray tenantIds, String phoneNumber) {
JsonObject loginMethod = new JsonObject();
loginMethod.add("tenantIds", tenantIds);
loginMethod.addProperty("email", email);
loginMethod.addProperty("recipeId", "passwordless");
loginMethod.addProperty("phoneNumber", phoneNumber);
loginMethod.addProperty("isVerified", true);
loginMethod.addProperty("isPrimary", false);
loginMethod.addProperty("timeJoinedInMSSinceEpoch", 0);
return loginMethod;
}

private void setFeatureFlags(Main main, EE_FEATURES[] features) {
FeatureFlagTestContent.getInstance(main).setKeyValue(FeatureFlagTestContent.ENABLED_FEATURES, features);
}


private static JsonObject uploadBulkImportUsersJson(Main main, JsonObject request) throws IOException,
HttpResponseException {
return HttpRequestForTesting.sendJsonPOSTRequest(main, "",
"http://localhost:3567/bulk-import/users",
request, 1000, 10000, null, SemVer.v5_2.get(), null);
}

private Main startCronProcess(String parallelism) throws IOException, InterruptedException,
TenantOrAppNotFoundException {
return startCronProcess(parallelism, 5*60);
}

private Main startCronProcess(String parallelism, int intervalInSeconds) throws IOException, InterruptedException, TenantOrAppNotFoundException {
String[] args = { "../" };

// set processing thread number
Utils.setValueInConfig("bulk_migration_parallelism", parallelism);

TestingProcessManager.TestingProcess process = TestingProcessManager.start(args, false);
Main main = process.getProcess();
setFeatureFlags(main, new EE_FEATURES[] {
EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA });
// We are setting a non-zero initial wait for tests to avoid race condition with the beforeTest process that deletes data in the storage layer
CronTaskTest.getInstance(main).setInitialWaitTimeInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, 5);
CronTaskTest.getInstance(main).setIntervalInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, intervalInSeconds);

process.startProcess();
assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED));

Cronjobs.addCronjob(main, (ProcessBulkImportUsers) main.getResourceDistributor().getResource(new TenantIdentifier(null, null, null), ProcessBulkImportUsers.RESOURCE_KEY));
return main;
}

}

0 comments on commit 9803649

Please sign in to comment.