diff --git a/src/main/java/io/supertokens/storage/postgresql/Start.java b/src/main/java/io/supertokens/storage/postgresql/Start.java index c5a72782..75f60f89 100644 --- a/src/main/java/io/supertokens/storage/postgresql/Start.java +++ b/src/main/java/io/supertokens/storage/postgresql/Start.java @@ -25,6 +25,8 @@ import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo; import io.supertokens.pluginInterface.authRecipe.LoginMethod; import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage; +import io.supertokens.pluginInterface.bulkimport.BulkImportUser; +import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage; import io.supertokens.pluginInterface.dashboard.DashboardSearchTags; import io.supertokens.pluginInterface.dashboard.DashboardSessionInfo; import io.supertokens.pluginInterface.dashboard.DashboardUser; @@ -109,7 +111,7 @@ public class Start implements SessionSQLStorage, EmailPasswordSQLStorage, EmailVerificationSQLStorage, ThirdPartySQLStorage, JWTRecipeSQLStorage, PasswordlessSQLStorage, UserMetadataSQLStorage, UserRolesSQLStorage, UserIdMappingStorage, UserIdMappingSQLStorage, MultitenancyStorage, MultitenancySQLStorage, DashboardSQLStorage, TOTPSQLStorage, - ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage { + ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage, BulkImportSQLStorage { // these configs are protected from being modified / viewed by the dev using the SuperTokens // SaaS. If the core is not running in SuperTokens SaaS, this array has no effect. @@ -3039,4 +3041,46 @@ public int getDbActivityCount(String dbname) throws SQLException, StorageQueryEx return -1; }); } + + @Override + public void addBulkImportUsers(AppIdentifier appIdentifier, List users) + throws StorageQueryException, + TenantOrAppNotFoundException, + io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException { + try { + BulkImportQueries.insertBulkImportUsers(this, appIdentifier, users); + } catch (SQLException e) { + if (e instanceof PSQLException) { + ServerErrorMessage serverErrorMessage = ((PSQLException) e).getServerErrorMessage(); + if (isPrimaryKeyError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable())) { + throw new io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException(); + } + if (isForeignKeyConstraintError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable(), "app_id")) { + throw new TenantOrAppNotFoundException(appIdentifier); + } + } + throw new StorageQueryException(e); + } + } + + @Override + public List getBulkImportUsers(AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status, + @Nullable String bulkImportUserId, @Nullable Long createdAt) throws StorageQueryException { + try { + return BulkImportQueries.getBulkImportUsers(this, appIdentifier, limit, status, bulkImportUserId, createdAt); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public void updateBulkImportUserStatus_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status) + throws StorageQueryException { + Connection sqlCon = (Connection) con.getConnection(); + try { + BulkImportQueries.updateBulkImportUserStatus_Transaction(this, sqlCon, appIdentifier, bulkImportUserIds, status); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } } diff --git a/src/main/java/io/supertokens/storage/postgresql/config/PostgreSQLConfig.java b/src/main/java/io/supertokens/storage/postgresql/config/PostgreSQLConfig.java index e0a0c682..bd206366 100644 --- a/src/main/java/io/supertokens/storage/postgresql/config/PostgreSQLConfig.java +++ b/src/main/java/io/supertokens/storage/postgresql/config/PostgreSQLConfig.java @@ -326,6 +326,10 @@ public String getTotpUsedCodesTable() { return addSchemaAndPrefixToTableName("totp_used_codes"); } + public String getBulkImportUsersTable() { + return addSchemaAndPrefixToTableName("bulk_import_users"); + } + private String addSchemaAndPrefixToTableName(String tableName) { return addSchemaToTableName(postgresql_table_names_prefix + tableName); } diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java new file mode 100644 index 00000000..44d05765 --- /dev/null +++ b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved. + * + * This software is licensed under the Apache License, Version 2.0 (the + * "License") as published by the Apache Software Foundation. + * + * You may not use this file except in compliance with the License. You may + * obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.supertokens.storage.postgresql.queries; + +import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update; +import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import io.supertokens.pluginInterface.RowMapper; +import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS; +import io.supertokens.pluginInterface.bulkimport.BulkImportUser; +import io.supertokens.pluginInterface.exceptions.StorageQueryException; +import io.supertokens.pluginInterface.multitenancy.AppIdentifier; +import io.supertokens.storage.postgresql.Start; +import io.supertokens.storage.postgresql.config.Config; +import io.supertokens.storage.postgresql.utils.Utils; + +public class BulkImportQueries { + static String getQueryToCreateBulkImportUsersTable(Start start) { + String schema = Config.getConfig(start).getTableSchema(); + String tableName = Config.getConfig(start).getBulkImportUsersTable(); + return "CREATE TABLE IF NOT EXISTS " + tableName + " (" + + "id CHAR(36)," + + "app_id VARCHAR(64) NOT NULL DEFAULT 'public'," + + "raw_data TEXT NOT NULL," + + "status VARCHAR(128) DEFAULT 'NEW'," + + "error_msg TEXT," + + "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)," + + "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)," + + "CONSTRAINT " + Utils.getConstraintName(schema, tableName, null, "pkey") + + " PRIMARY KEY(app_id, id)," + + "CONSTRAINT " + Utils.getConstraintName(schema, tableName, "app_id", "fkey") + " " + + "FOREIGN KEY(app_id) " + + "REFERENCES " + Config.getConfig(start).getAppsTable() + " (app_id) ON DELETE CASCADE" + + " );"; + } + + public static String getQueryToCreateStatusUpdatedAtIndex(Start start) { + return "CREATE INDEX IF NOT EXISTS bulk_import_users_status_updated_at_index ON " + + Config.getConfig(start).getBulkImportUsersTable() + " (app_id, status, updated_at)"; + } + + public static String getQueryToCreateCreatedAtIndex(Start start) { + return "CREATE INDEX IF NOT EXISTS bulk_import_users_created_at_index ON " + + Config.getConfig(start).getBulkImportUsersTable() + " (app_id, created_at)"; + } + + public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List users) + throws SQLException, StorageQueryException { + StringBuilder queryBuilder = new StringBuilder( + "INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data) VALUES "); + + int userCount = users.size(); + + for (int i = 0; i < userCount; i++) { + queryBuilder.append(" (?, ?, ?)"); + + if (i < userCount - 1) { + queryBuilder.append(","); + } + } + + update(start, queryBuilder.toString(), pst -> { + int parameterIndex = 1; + for (BulkImportUser user : users) { + pst.setString(parameterIndex++, user.id); + pst.setString(parameterIndex++, appIdentifier.getAppId()); + pst.setString(parameterIndex++, user.toRawDataForDbStorage()); + } + }); + } + + public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status) + throws SQLException, StorageQueryException { + if (bulkImportUserIds.length == 0) { + return; + } + + String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, updated_at = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) WHERE app_id = ?"; + StringBuilder queryBuilder = new StringBuilder(baseQuery); + + List parameters = new ArrayList<>(); + + parameters.add(status.toString()); + parameters.add(appIdentifier.getAppId()); + + queryBuilder.append(" AND id IN ("); + for (int i = 0; i < bulkImportUserIds.length; i++) { + if (i != 0) { + queryBuilder.append(", "); + } + queryBuilder.append("?"); + parameters.add(bulkImportUserIds[i]); + } + queryBuilder.append(")"); + + String query = queryBuilder.toString(); + + update(con, query, pst -> { + for (int i = 0; i < parameters.size(); i++) { + pst.setObject(i + 1, parameters.get(i)); + } + }); + } + + public static List getBulkImportUsers(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status, + @Nullable String bulkImportUserId, @Nullable Long createdAt) + throws SQLException, StorageQueryException { + + String baseQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable(); + + StringBuilder queryBuilder = new StringBuilder(baseQuery); + List parameters = new ArrayList<>(); + + queryBuilder.append(" WHERE app_id = ?"); + parameters.add(appIdentifier.getAppId()); + + if (status != null) { + queryBuilder.append(" AND status = ?"); + parameters.add(status.toString()); + } + + if (bulkImportUserId != null && createdAt != null) { + queryBuilder + .append(" AND created_at < ? OR (created_at = ? AND id <= ?)"); + parameters.add(createdAt); + parameters.add(createdAt); + parameters.add(bulkImportUserId); + } + + queryBuilder.append(" ORDER BY created_at DESC, id DESC LIMIT ?"); + parameters.add(limit); + + String query = queryBuilder.toString(); + + return execute(start, query, pst -> { + for (int i = 0; i < parameters.size(); i++) { + pst.setObject(i + 1, parameters.get(i)); + } + }, result -> { + List bulkImportUsers = new ArrayList<>(); + while (result.next()) { + bulkImportUsers.add(BulkImportUserRowMapper.getInstance().mapOrThrow(result)); + } + return bulkImportUsers; + }); + } + + private static class BulkImportUserRowMapper implements RowMapper { + private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper(); + + private BulkImportUserRowMapper() { + } + + private static BulkImportUserRowMapper getInstance() { + return INSTANCE; + } + + @Override + public BulkImportUser map(ResultSet result) throws Exception { + return BulkImportUser.fromRawDataFromDbStorage(result.getString("id"), result.getString("raw_data"), + BULK_IMPORT_USER_STATUS.valueOf(result.getString("status")), + result.getLong("created_at"), result.getLong("updated_at")); + } + } +} diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java index 8bc2d561..5e8574a2 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java @@ -541,6 +541,14 @@ public static void createTablesIfNotExists(Start start) throws SQLException, Sto update(start, TOTPQueries.getQueryToCreateTenantIdIndexForUsedCodesTable(start), NO_OP_SETTER); } + if (!doesTableExists(start, Config.getConfig(start).getBulkImportUsersTable())) { + getInstance(start).addState(CREATING_NEW_TABLE, null); + update(start, BulkImportQueries.getQueryToCreateBulkImportUsersTable(start), NO_OP_SETTER); + // index: + update(start, BulkImportQueries.getQueryToCreateStatusUpdatedAtIndex(start), NO_OP_SETTER); + update(start, BulkImportQueries.getQueryToCreateCreatedAtIndex(start), NO_OP_SETTER); + } + } catch (Exception e) { if (e.getMessage().contains("schema") && e.getMessage().contains("does not exist") && numberOfRetries < 1) { @@ -576,7 +584,14 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer String DROP_QUERY = "DROP INDEX IF EXISTS all_auth_recipe_users_pagination_index"; update(start, DROP_QUERY, NO_OP_SETTER); } - + { + String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_status_updated_at_index"; + update(start, DROP_QUERY, NO_OP_SETTER); + } + { + String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_created_at_index"; + update(start, DROP_QUERY, NO_OP_SETTER); + } { String DROP_QUERY = "DROP TABLE IF EXISTS " + getConfig(start).getAppsTable() + "," @@ -613,7 +628,8 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer + getConfig(start).getDashboardSessionsTable() + "," + getConfig(start).getTotpUsedCodesTable() + "," + getConfig(start).getTotpUserDevicesTable() + "," - + getConfig(start).getTotpUsersTable(); + + getConfig(start).getTotpUsersTable() + "," + + getConfig(start).getBulkImportUsersTable(); update(start, DROP_QUERY, NO_OP_SETTER); } }