Skip to content

Commit

Permalink
feat: multithreaded bulk import (#162)
Browse files Browse the repository at this point in the history
* feat: Add BulkImport APIs and cron

* fix: PR changes

* fix: PR changes

* fix: Update version and changelog

* fix: PR changes

* fix: PR changes

* fix: fixing transaction rolled back issues with multithreaded bulk import

* fix: changelog

* fix: reusing gson object

* feat: bulk inserting the bulk migration data

* fix: fixes and error handling changes

* fix: fixing tests

* chore: update build version and changelog

* fix: adding exeption thrown declaration for bulk import and mysql

---------

Co-authored-by: Ankit Tiwari <[email protected]>
  • Loading branch information
tamassoltesz and anku255 authored Dec 19, 2024
1 parent aa11e40 commit 435504f
Show file tree
Hide file tree
Showing 27 changed files with 615 additions and 2 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [6.4.0]

- Adds support for Bulk Import
- Adds `BulkImportUser` class to represent a bulk import user
- Adds `BulkImportStorage` interface
- Adds `DuplicateUserIdException` class
- Adds `createBulkImportProxyStorageInstance` method in `Storage` class
- Adds `closeConnectionForBulkImportProxyStorage`, `commitTransactionForBulkImportProxyStorage`, and `rollbackTransactionForBulkImportProxyStorage` method in `SQLStorage` class
- Adds `BulkImportTransactionRolledBackException` for signaling if the transaction was rolled back by the DBMS

## [6.3.0] - 2024-10-02

- Adds `OAuthStorage` interface for OAuth Provider support
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ plugins {
id 'java-library'
}

version = "6.3.0"
version = "6.4.0"

repositories {
mavenCentral()
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/supertokens/pluginInterface/Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;

import java.util.List;
import java.util.Map;
import java.util.Set;

public interface Storage {

// if silent is true, do not log anything out on the console
void constructor(String processId, boolean silent, boolean isTesting);

Storage createBulkImportProxyStorageInstance();

void loadConfig(JsonObject jsonConfig, Set<LOG_LEVEL> logLevels, TenantIdentifier tenantIdentifier)
throws InvalidConfigException;

Expand Down Expand Up @@ -78,6 +81,9 @@ void setKeyValue(TenantIdentifier tenantIdentifier, String key, KeyValueInfo inf
boolean isUserIdBeingUsedInNonAuthRecipe(AppIdentifier appIdentifier, String className, String userId)
throws StorageQueryException;

Map<String, List<String>> findNonAuthRecipesWhereForUserIdsUsed(AppIdentifier appIdentifier, List<String> userIds)
throws StorageQueryException;

// to be used for testing purposes only. This function will add dummy data to non-auth tables.
void addInfoToNonAuthRecipesBasedOnUserId(TenantIdentifier tenantIdentifier, String className, String userId)
throws StorageQueryException;
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/supertokens/pluginInterface/StorageUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.supertokens.pluginInterface;

import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage;
import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage;
import io.supertokens.pluginInterface.dashboard.sqlStorage.DashboardSQLStorage;
import io.supertokens.pluginInterface.emailpassword.sqlStorage.EmailPasswordSQLStorage;
import io.supertokens.pluginInterface.emailverification.sqlStorage.EmailVerificationSQLStorage;
Expand Down Expand Up @@ -134,6 +135,15 @@ public static MultitenancyStorage getMultitenancyStorage(Storage storage) {
return (MultitenancyStorage) storage;
}

public static BulkImportSQLStorage getBulkImportStorage(Storage storage) {
if (storage.getType() != STORAGE_TYPE.SQL) {
// we only support SQL for now
throw new UnsupportedOperationException("");
}

return (BulkImportSQLStorage) storage;
}

public static OAuthStorage getOAuthStorage(Storage storage) {
if (storage.getType() != STORAGE_TYPE.SQL) {
// we only support SQL for now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;

public interface AuthRecipeStorage extends Storage {

Expand All @@ -45,6 +46,8 @@ AuthRecipeUserInfo[] getUsers(TenantIdentifier tenantIdentifier, @Nonnull Intege

boolean doesUserIdExist(TenantIdentifier tenantIdentifierIdentifier, String userId) throws StorageQueryException;

List<String> findExistingUserIds(AppIdentifier appIdentifier, List<String> userIds) throws StorageQueryException;

AuthRecipeUserInfo getPrimaryUserById(AppIdentifier appIdentifier, String userId) throws StorageQueryException;

String getPrimaryUserIdStrForUserId(AppIdentifier appIdentifier, String userId) throws StorageQueryException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@
import io.supertokens.pluginInterface.sqlStorage.SQLStorage;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;

import java.util.List;
import java.util.Map;

public interface AuthRecipeSQLStorage extends AuthRecipeStorage, SQLStorage {

AuthRecipeUserInfo getPrimaryUserById_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
String userId)
throws StorageQueryException;

List<AuthRecipeUserInfo> getPrimaryUsersByIds_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
List<String> userIds)
throws StorageQueryException;

// lock order:
// - emailpassword table
// - thirdparty table
Expand All @@ -38,6 +45,13 @@ AuthRecipeUserInfo[] listPrimaryUsersByEmail_Transaction(AppIdentifier appIdenti
String email)
throws StorageQueryException;

//helper method for bulk import
AuthRecipeUserInfo[] listPrimaryUsersByMultipleEmailsOrPhoneNumbersOrThirdparty_Transaction(AppIdentifier appIdentifier,
TransactionConnection con,
List<String> emails, List<String> phones,
Map<String, String> thirdpartyIdToThirdpartyUserId)
throws StorageQueryException;

// locks only passwordless table
AuthRecipeUserInfo[] listPrimaryUsersByPhoneNumber_Transaction(AppIdentifier appIdentifier,
TransactionConnection con, String phoneNumber)
Expand All @@ -52,9 +66,15 @@ AuthRecipeUserInfo[] listPrimaryUsersByThirdPartyInfo_Transaction(AppIdentifier
void makePrimaryUser_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String userId)
throws StorageQueryException;

void makePrimaryUsers_Transaction(AppIdentifier appIdentifier, TransactionConnection con, List<String> userIds)
throws StorageQueryException;

void linkAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String recipeUserId,
String primaryUserId) throws StorageQueryException;

void linkMultipleAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
Map<String, String> recipeUserIdByPrimaryUserId) throws StorageQueryException;

void unlinkAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String primaryUserId,
String recipeUserId)
throws StorageQueryException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.pluginInterface.bulkimport;

import java.util.List;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;
import io.supertokens.pluginInterface.nonAuthRecipe.NonAuthRecipeStorage;

public interface BulkImportStorage extends NonAuthRecipeStorage {
/**
* Add users to the bulk_import_users table
*/
void addBulkImportUsers(AppIdentifier appIdentifier, List<BulkImportUser> users)
throws StorageQueryException,
TenantOrAppNotFoundException,
io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException;

/**
* Get users from the bulk_import_users table
*/
List<BulkImportUser> getBulkImportUsers(AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status,
@Nullable String bulkImportUserId, @Nullable Long createdAt) throws StorageQueryException;

/**
* Delete users by id from the bulk_import_users table
*/
List<String> deleteBulkImportUsers(AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds) throws StorageQueryException;

/**
* Returns the users from the bulk_import_users table for processing
*/
List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing(AppIdentifier appIdentifier, @Nonnull Integer limit) throws StorageQueryException;


/**
* Update the bulk_import_user's primary_user_id by bulk_import_user_id
*/
void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException;

/**
* Returns the count of users from the bulk_import_users table
*/
long getBulkImportUsersCount(AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws StorageQueryException;

public enum BULK_IMPORT_USER_STATUS {
NEW, PROCESSING, FAILED
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.pluginInterface.bulkimport;

import java.util.List;

import com.google.gson.Gson;
import com.google.gson.JsonObject;

import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS;

public class BulkImportUser {
public String id;
public String externalUserId;
public JsonObject userMetadata;
public List<UserRole> userRoles;
public List<TotpDevice> totpDevices;
public List<LoginMethod> loginMethods;

// Following fields come from the DB Record.
public BULK_IMPORT_USER_STATUS status;
public String primaryUserId;
public String errorMessage;
public Long createdAt;
public Long updatedAt;

private static final Gson gson = new Gson();

public BulkImportUser(String id, String externalUserId, JsonObject userMetadata, List<UserRole> userRoles,
List<TotpDevice> totpDevices, List<LoginMethod> loginMethods) {
this.id = id;
this.externalUserId = externalUserId;
this.userMetadata = userMetadata;
this.userRoles = userRoles;
this.totpDevices = totpDevices;
this.loginMethods = loginMethods;
}

public static BulkImportUser forTesting_fromJson(JsonObject jsonObject) {
return gson.fromJson(jsonObject, BulkImportUser.class);
}

// The bulk_import_users table stores users to be imported via a Cron Job.
// It has a `raw_data` column containing user data in JSON format.

// The BulkImportUser class represents this `raw_data`, including additional fields like `status`, `createdAt`, and `updatedAt`.
// First, we validate all fields of `raw_data` using the BulkImportUser class, then store this data in the bulk_import_users table.
// This function retrieves the `raw_data` after removing the additional fields.
public String toRawDataForDbStorage() {
JsonObject jsonObject = gson.fromJson(new Gson().toJson(this), JsonObject.class);
jsonObject.remove("status");
jsonObject.remove("createdAt");
jsonObject.remove("updatedAt");
return jsonObject.toString();
}

// The bulk_import_users table contains a `raw_data` column with user data in JSON format, along with other columns such as `id`, `status`, `primary_user_id`, and `error_msg` etc.

// When creating an instance of the BulkImportUser class, the extra fields must be passed separately as they are not part of the `raw_data`.
// This function creates a BulkImportUser instance from a stored bulk_import_user entry.
public static BulkImportUser fromRawDataFromDbStorage(String id, String rawData, BULK_IMPORT_USER_STATUS status, String primaryUserId, String errorMessage, long createdAt, long updatedAt) {
BulkImportUser user = gson.fromJson(rawData, BulkImportUser.class);
user.id = id;
user.status = status;
user.primaryUserId = primaryUserId;
user.errorMessage = errorMessage;
user.createdAt = createdAt;
user.updatedAt = updatedAt;
return user;
}

public JsonObject toJsonObject() {
return gson.fromJson(gson.toJson(this), JsonObject.class);
}

public static class UserRole {
public String role;
public List<String> tenantIds;

public UserRole(String role, List<String> tenantIds) {
this.role = role;
this.tenantIds = tenantIds;
}
}

public static class TotpDevice {
public String secretKey;
public int period;
public int skew;
public String deviceName;

public TotpDevice(String secretKey, int period, int skew, String deviceName) {
this.secretKey = secretKey;
this.period = period;
this.skew = skew;
this.deviceName = deviceName;
}
}

public static class LoginMethod {
public List<String> tenantIds;
public boolean isVerified;
public boolean isPrimary;
public long timeJoinedInMSSinceEpoch;
public String recipeId;
public String email;
public String passwordHash;
public String hashingAlgorithm;
public String plainTextPassword;
public String thirdPartyId;
public String thirdPartyUserId;
public String phoneNumber;
public String superTokensUserId;
public String externalUserId;

public String getSuperTokenOrExternalUserId() {
return this.externalUserId != null ? this.externalUserId : this.superTokensUserId;
}

public LoginMethod(List<String> tenantIds, String recipeId, boolean isVerified, boolean isPrimary,
long timeJoinedInMSSinceEpoch, String email, String passwordHash, String hashingAlgorithm, String plainTextPassword,
String thirdPartyId, String thirdPartyUserId, String phoneNumber) {
this.tenantIds = tenantIds;
this.recipeId = recipeId;
this.isVerified = isVerified;
this.isPrimary = isPrimary;
this.timeJoinedInMSSinceEpoch = timeJoinedInMSSinceEpoch;
this.email = email;
this.passwordHash = passwordHash;
this.hashingAlgorithm = hashingAlgorithm;
this.plainTextPassword = plainTextPassword;
this.thirdPartyId = thirdPartyId;
this.thirdPartyUserId = thirdPartyUserId;
this.phoneNumber = phoneNumber;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.pluginInterface.bulkimport;

import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;

public class ImportUserBase {

public String userId;
public String email;
public TenantIdentifier tenantIdentifier;
public long timeJoinedMSSinceEpoch;

public ImportUserBase(String userId, String email, TenantIdentifier tenantIdentifier, long timeJoinedMSSinceEpoch) {
this.userId = userId; //this will be the supertokens userId.
this.email = email;
this.tenantIdentifier = tenantIdentifier;
this.timeJoinedMSSinceEpoch = timeJoinedMSSinceEpoch;
}
}
Loading

0 comments on commit 435504f

Please sign in to comment.