Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Storage changes for ProcessBulkImportUsers cron #205

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
396 changes: 261 additions & 135 deletions src/main/java/io/supertokens/storage/postgresql/Start.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
Expand All @@ -47,8 +48,8 @@ static String getQueryToCreateBulkImportUsersTable(Start start) {
+ "raw_data TEXT NOT NULL,"
+ "status VARCHAR(128) DEFAULT 'NEW',"
+ "error_msg TEXT,"
+ "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP),"
+ "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP),"
+ "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000,"
+ "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000,"
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, null, "pkey")
+ " PRIMARY KEY(app_id, id),"
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, "app_id", "fkey") + " "
Expand Down Expand Up @@ -92,18 +93,19 @@ public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifie
});
}

public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status)
public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage)
throws SQLException, StorageQueryException {
if (bulkImportUserIds.length == 0) {
return;
}

String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, updated_at = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) WHERE app_id = ?";
String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, error_msg = ?, updated_at = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) WHERE app_id = ?";
StringBuilder queryBuilder = new StringBuilder(baseQuery);

List<Object> parameters = new ArrayList<>();

parameters.add(status.toString());
parameters.add(errorMessage);
parameters.add(appIdentifier.getAppId());

queryBuilder.append(" AND id IN (");
Expand All @@ -125,6 +127,39 @@ public static void updateBulkImportUserStatus_Transaction(Start start, Connectio
});
}

public static List<BulkImportUser> getBulkImportUsersForProcessing(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit)
throws StorageQueryException, StorageTransactionLogicException {

return start.startTransaction(con -> {
Connection sqlCon = (Connection) con.getConnection();
try {
String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable()
+ " WHERE status = 'NEW' AND app_id = ? "
+ " OR (status = 'PROCESSING' AND updated_at < EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 - 60 * 1000) "
+ " LIMIT ? FOR UPDATE SKIP LOCKED";

List<BulkImportUser> bulkImportUsers = new ArrayList<>();

execute(sqlCon, selectQuery, pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setInt(2, limit);
}, result -> {
while (result.next()) {
bulkImportUsers.add(BulkImportUserRowMapper.getInstance().mapOrThrow(result));
}
return null;
});

String[] bulkImportUserIds = bulkImportUsers.stream().map(user -> user.id).toArray(String[]::new);

updateBulkImportUserStatus_Transaction(start, sqlCon, appIdentifier, bulkImportUserIds, BULK_IMPORT_USER_STATUS.PROCESSING, null);
return bulkImportUsers;
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
}
});
}

public static List<BulkImportUser> getBulkImportUsers(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status,
@Nullable String bulkImportUserId, @Nullable Long createdAt)
throws SQLException, StorageQueryException {
Expand Down Expand Up @@ -205,6 +240,16 @@ public static List<String> deleteBulkImportUsers(Start start, AppIdentifier appI
return deletedIds;
});
}

public static void deleteBulkImportUser_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String bulkImportUserId) throws SQLException, StorageQueryException {
String query = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable() + " WHERE app_id = ? AND id = ?";

update(con, query, pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, bulkImportUserId);
});
}

private static class BulkImportUserRowMapper implements RowMapper<BulkImportUser, ResultSet> {
private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper();

Expand All @@ -219,7 +264,7 @@ private static BulkImportUserRowMapper getInstance() {
public BulkImportUser map(ResultSet result) throws Exception {
return BulkImportUser.fromRawDataFromDbStorage(result.getString("id"), result.getString("raw_data"),
BULK_IMPORT_USER_STATUS.valueOf(result.getString("status")),
result.getLong("created_at"), result.getLong("updated_at"));
result.getString("error_msg"), result.getLong("created_at"), result.getLong("updated_at"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo;
import io.supertokens.pluginInterface.authRecipe.LoginMethod;
import io.supertokens.pluginInterface.emailpassword.PasswordResetTokenInfo;
import io.supertokens.pluginInterface.emailpassword.exceptions.DuplicateEmailException;
import io.supertokens.pluginInterface.emailpassword.exceptions.UnknownUserIdException;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.storage.postgresql.ConnectionPool;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;
Expand Down Expand Up @@ -266,74 +264,85 @@ public static void addPasswordResetToken(Start start, AppIdentifier appIdentifie
}
}

public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIdentifier, String userId, String email,
String passwordHash, long timeJoined)
throws StorageQueryException, StorageTransactionLogicException {
return start.startTransaction(con -> {
Connection sqlCon = (Connection) con.getConnection();
try {
{ // app_id_to_user_id
String QUERY = "INSERT INTO " + getConfig(start).getAppIdToUserIdTable()
+ "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)";
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, userId);
pst.setString(4, EMAIL_PASSWORD.toString());
});
}
private static AuthRecipeUserInfo signUpQuery(Start start, Connection sqlCon, TenantIdentifier tenantIdentifier, String userId, String email,
String passwordHash, long timeJoined) throws StorageQueryException, StorageTransactionLogicException {
try {
{ // app_id_to_user_id
String QUERY = "INSERT INTO " + getConfig(start).getAppIdToUserIdTable()
+ "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)";
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, userId);
pst.setString(4, EMAIL_PASSWORD.toString());
});
}

{ // all_auth_recipe_users
String QUERY = "INSERT INTO " + getConfig(start).getUsersTable()
+ "(app_id, tenant_id, user_id, primary_or_recipe_user_id, recipe_id, time_joined, primary_or_recipe_user_time_joined)" +
" VALUES(?, ?, ?, ?, ?, ?, ?)";
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, userId);
pst.setString(5, EMAIL_PASSWORD.toString());
pst.setLong(6, timeJoined);
pst.setLong(7, timeJoined);
});
}
{ // all_auth_recipe_users
String QUERY = "INSERT INTO " + getConfig(start).getUsersTable()
+ "(app_id, tenant_id, user_id, primary_or_recipe_user_id, recipe_id, time_joined, primary_or_recipe_user_time_joined)" +
" VALUES(?, ?, ?, ?, ?, ?, ?)";
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, userId);
pst.setString(5, EMAIL_PASSWORD.toString());
pst.setLong(6, timeJoined);
pst.setLong(7, timeJoined);
});
}

{ // emailpassword_users
String QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUsersTable()
+ "(app_id, user_id, email, password_hash, time_joined)" + " VALUES(?, ?, ?, ?, ?)";

update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, email);
pst.setString(4, passwordHash);
pst.setLong(5, timeJoined);
});
}
{ // emailpassword_users
String QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUsersTable()
+ "(app_id, user_id, email, password_hash, time_joined)" + " VALUES(?, ?, ?, ?, ?)";

{ // emailpassword_user_to_tenant
String QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, email);
pst.setString(4, passwordHash);
pst.setLong(5, timeJoined);
});
}

update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, email);
});
}
{ // emailpassword_user_to_tenant
String QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";

UserInfoPartial userInfo = new UserInfoPartial(userId, email, passwordHash, timeJoined);
fillUserInfoWithTenantIds_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo);
fillUserInfoWithVerified_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo);
sqlCon.commit();
return AuthRecipeUserInfo.create(userId, false, userInfo.toLoginMethod());
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
update(sqlCon, QUERY, pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, email);
});
}

UserInfoPartial userInfo = new UserInfoPartial(userId, email, passwordHash, timeJoined);
fillUserInfoWithTenantIds_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo);
fillUserInfoWithVerified_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo);

return AuthRecipeUserInfo.create(userId, false, userInfo.toLoginMethod());
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
}
}

public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIdentifier, String userId, String email,
String passwordHash, long timeJoined)
throws StorageQueryException, StorageTransactionLogicException {
return start.startTransaction(con -> {
Connection sqlCon = (Connection) con.getConnection();
return signUpQuery(start, sqlCon, tenantIdentifier, userId, email, passwordHash, timeJoined);
});
}

public static AuthRecipeUserInfo bulkImport_signUp_Transaction(Start start, Connection sqlCon, TenantIdentifier tenantIdentifier, String userId, String email,
String passwordHash, long timeJoined)
throws StorageQueryException, StorageTransactionLogicException {
return signUpQuery(start, sqlCon, tenantIdentifier, userId, email, passwordHash, timeJoined);
}

public static void deleteUser_Transaction(Connection sqlCon, Start start, AppIdentifier appIdentifier,
String userId, boolean deleteUserIdMappingToo)
throws StorageQueryException, SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,30 +476,35 @@ public static boolean isUserIdBeingUsedForEmailVerification(Start start, AppIden
}
}

public static void updateIsEmailVerifiedToExternalUserIdQuery(Start start, Connection sqlCon, AppIdentifier appIdentifier, String supertokensUserId, String externalUserId)
throws StorageQueryException, SQLException {
{
String QUERY = "UPDATE " + getConfig(start).getEmailVerificationTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
update(sqlCon, QUERY, pst -> {
pst.setString(1, externalUserId);
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}
{
String QUERY = "UPDATE " + getConfig(start).getEmailVerificationTokensTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
update(sqlCon, QUERY, pst -> {
pst.setString(1, externalUserId);
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}
}

public static void updateIsEmailVerifiedToExternalUserId(Start start, AppIdentifier appIdentifier, String supertokensUserId, String externalUserId)
throws StorageQueryException {
try {
start.startTransaction((TransactionConnection con) -> {
Connection sqlCon = (Connection) con.getConnection();
try {
{
String QUERY = "UPDATE " + getConfig(start).getEmailVerificationTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
update(sqlCon, QUERY, pst -> {
pst.setString(1, externalUserId);
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}
{
String QUERY = "UPDATE " + getConfig(start).getEmailVerificationTokensTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
update(sqlCon, QUERY, pst -> {
pst.setString(1, externalUserId);
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}
updateIsEmailVerifiedToExternalUserIdQuery(start, sqlCon, appIdentifier, supertokensUserId, externalUserId);
} catch (SQLException e) {
throw new StorageTransactionLogicException(e);
}
Expand All @@ -511,6 +516,11 @@ public static void updateIsEmailVerifiedToExternalUserId(Start start, AppIdentif
}
}

public static void bulkImport_updateIsEmailVerifiedToExternalUserId_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, String supertokensUserId, String externalUserId)
throws SQLException, StorageQueryException {
updateIsEmailVerifiedToExternalUserIdQuery(start, sqlCon, appIdentifier, supertokensUserId, externalUserId);
}

private static class EmailVerificationTokenInfoRowMapper
implements RowMapper<EmailVerificationTokenInfo, ResultSet> {
private static final EmailVerificationTokenInfoRowMapper INSTANCE = new EmailVerificationTokenInfoRowMapper();
Expand Down
Loading
Loading