Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add BulkImport APIs and cron #211

Closed
wants to merge 12 commits into from
Closed
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [7.1.0] - 2024-04-10

- Adds queries for Bulk Import

sattvikc marked this conversation as resolved.
Show resolved Hide resolved
## [7.0.0] - 2024-03-13

- Replace `TotpNotEnabledError` with `UnknownUserIdTotpError`.
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ plugins {
id 'java-library'
}

version = "7.0.0"
version = "7.1.0"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update branch with latest and update versions accordingly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.storage.postgresql;

import java.sql.Array;
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/supertokens/storage/postgresql/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -3142,4 +3142,13 @@ public void deleteBulkImportUser_Transaction(AppIdentifier appIdentifier, Transa
throw new StorageQueryException(e);
}
}

@Override
public void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException {
    // Thin storage-layer delegate: links a bulk-import row to the primary user
    // created for it. Any SQL failure is surfaced as the storage-agnostic
    // StorageQueryException expected by callers of this interface method.
    try {
        BulkImportQueries.updateBulkImportUserPrimaryUserId(this, appIdentifier, bulkImportUserId,
                primaryUserId);
    } catch (SQLException sqlException) {
        throw new StorageQueryException(sqlException);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ static String getQueryToCreateBulkImportUsersTable(Start start) {
return "CREATE TABLE IF NOT EXISTS " + tableName + " ("
+ "id CHAR(36),"
+ "app_id VARCHAR(64) NOT NULL DEFAULT 'public',"
+ "primary_user_id VARCHAR(64),"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this 64 chars? SuperTokens user id is 36 chars

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated it to 36 chars.

+ "raw_data TEXT NOT NULL,"
+ "status VARCHAR(128) DEFAULT 'NEW',"
+ "error_msg TEXT,"
Expand All @@ -63,9 +64,14 @@ public static String getQueryToCreateStatusUpdatedAtIndex(Start start) {
+ Config.getConfig(start).getBulkImportUsersTable() + " (app_id, status, updated_at)";
}

public static String getQueryToCreateCreatedAtIndex(Start start) {
return "CREATE INDEX IF NOT EXISTS bulk_import_users_created_at_index ON "
+ Config.getConfig(start).getBulkImportUsersTable() + " (app_id, created_at)";
// DDL for the first pagination index: (app_id, status, created_at DESC, id DESC).
// Presumably backs status-filtered paginated listings ordered by newest first —
// matches the (created_at, id) keyset comparison used by getBulkImportUsers.
public static String getQueryToCreatePaginationIndex1(Start start) {
    String tableName = Config.getConfig(start).getBulkImportUsersTable();
    return "CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index1 ON " + tableName
            + " (app_id, status, created_at DESC, id DESC)";
}

// DDL for the second pagination index: (app_id, created_at DESC, id DESC).
// Same shape as pagination_index1 but without the status column, for
// unfiltered listings of bulk-import users.
public static String getQueryToCreatePaginationIndex2(Start start) {
    String tableName = Config.getConfig(start).getBulkImportUsersTable();
    return "CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index2 ON " + tableName
            + " (app_id, created_at DESC, id DESC)";
}

public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List<BulkImportUser> users)
Expand Down Expand Up @@ -96,7 +102,8 @@ public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifie
public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier,
@Nonnull String bulkImportUserId, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage)
throws SQLException {
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?";
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
+ " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?";

List<Object> parameters = new ArrayList<>();

Expand All @@ -113,16 +120,18 @@ public static void updateBulkImportUserStatus_Transaction(Start start, Connectio
});
}

public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing(Start start, AppIdentifier appIdentifier,
public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing(Start start,
AppIdentifier appIdentifier,
@Nonnull Integer limit)
throws StorageQueryException, StorageTransactionLogicException {

return start.startTransaction(con -> {
Connection sqlCon = (Connection) con.getConnection();
try {
// NOTE: On average, we take about 66 seconds to process 1000 users. If, for any reason, the bulk import users were marked as processing but couldn't be processed within 10 minutes, we'll attempt to process them again.
String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable()
+ " WHERE status = 'NEW' AND app_id = ? "
+ " OR (status = 'PROCESSING' AND updated_at < EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 - 60 * 1000) "
+ " WHERE app_id = ?"
+ " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000) - 10 * 60 * 1000))" /* 10 mins */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is 10 mins a good value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On average we take around 66 seconds to process 1000 users but we chose 10 mins wait time to give us ample amount of time before retrying the users in case the processing was delayed.

The processing can fail because of reasons like (DB failure), 10 mins gives some time for those services to recover before next retry.

+ " LIMIT ? FOR UPDATE SKIP LOCKED";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we had decided not to use SKIP LOCKED?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had decided to use SKIP LOCKED and we tested that it was safe to do so. I have added comment explaining why we need this.


List<BulkImportUser> bulkImportUsers = new ArrayList<>();
Expand All @@ -137,30 +146,21 @@ public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing
return null;
});

String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, updated_at = ? WHERE app_id = ?";
StringBuilder queryBuilder = new StringBuilder(baseQuery);

List<Object> parameters = new ArrayList<>();

parameters.add(BULK_IMPORT_USER_STATUS.PROCESSING.toString());
parameters.add(System.currentTimeMillis());
parameters.add(appIdentifier.getAppId());

queryBuilder.append(" AND id IN (");
for (int i = 0; i < bulkImportUsers.size(); i++) {
if (i != 0) {
queryBuilder.append(", ");
}
queryBuilder.append("?");
parameters.add(bulkImportUsers.get(i).id);
if (bulkImportUsers.isEmpty()) {
return new ArrayList<>();
}
queryBuilder.append(")");

String updateQuery = queryBuilder.toString();


String updateQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
+ " SET status = ?, updated_at = ? WHERE app_id = ? AND id IN (" + Utils
.generateCommaSeperatedQuestionMarks(bulkImportUsers.size()) + ")";

update(sqlCon, updateQuery, pst -> {
for (int i = 0; i < parameters.size(); i++) {
pst.setObject(i + 1, parameters.get(i));
int index = 1;
pst.setString(index++, BULK_IMPORT_USER_STATUS.PROCESSING.toString());
pst.setLong(index++, System.currentTimeMillis());
pst.setString(index++, appIdentifier.getAppId());
for (BulkImportUser user : bulkImportUsers) {
pst.setObject(index++, user.id);
}
});
return bulkImportUsers;
Expand Down Expand Up @@ -190,7 +190,7 @@ public static List<BulkImportUser> getBulkImportUsers(Start start, AppIdentifier

if (bulkImportUserId != null && createdAt != null) {
queryBuilder
.append(" AND created_at < ? OR (created_at = ? AND id <= ?)");
.append(" AND (created_at < ? OR (created_at = ? AND id <= ?))");
parameters.add(createdAt);
parameters.add(createdAt);
parameters.add(bulkImportUserId);
Expand Down Expand Up @@ -264,6 +264,20 @@ public static void deleteBulkImportUser_Transaction(Start start, Connection con,
});
}

/**
 * Records the primary user id created for a bulk-import row and refreshes its
 * updated_at timestamp.
 *
 * @param start             storage instance used to resolve config/connection
 * @param appIdentifier     scopes the update to a single app
 * @param bulkImportUserId  id of the bulk-import row to update
 * @param primaryUserId     primary user id to store on that row
 * @throws SQLException if the UPDATE fails
 */
public static void updateBulkImportUserPrimaryUserId(Start start, AppIdentifier appIdentifier,
        @Nonnull String bulkImportUserId,
        @Nonnull String primaryUserId) throws SQLException {
    String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
            + " SET primary_user_id = ?, updated_at = ? WHERE app_id = ? and id = ?";

    update(start, query, pst -> {
        int paramIndex = 0;
        pst.setString(++paramIndex, primaryUserId);
        pst.setLong(++paramIndex, System.currentTimeMillis());
        pst.setString(++paramIndex, appIdentifier.getAppId());
        pst.setString(++paramIndex, bulkImportUserId);
    });
}

private static class BulkImportUserRowMapper implements RowMapper<BulkImportUser, ResultSet> {
private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper();

Expand All @@ -278,7 +292,8 @@ private static BulkImportUserRowMapper getInstance() {
public BulkImportUser map(ResultSet result) throws Exception {
return BulkImportUser.fromRawDataFromDbStorage(result.getString("id"), result.getString("raw_data"),
BULK_IMPORT_USER_STATUS.valueOf(result.getString("status")),
result.getString("error_msg"), result.getLong("created_at"), result.getLong("updated_at"));
result.getString("primary_user_id"), result.getString("error_msg"), result.getLong("created_at"),
result.getLong("updated_at"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,8 @@ public static void createTablesIfNotExists(Start start) throws SQLException, Sto
update(start, BulkImportQueries.getQueryToCreateBulkImportUsersTable(start), NO_OP_SETTER);
// index:
update(start, BulkImportQueries.getQueryToCreateStatusUpdatedAtIndex(start), NO_OP_SETTER);
update(start, BulkImportQueries.getQueryToCreateCreatedAtIndex(start), NO_OP_SETTER);
update(start, BulkImportQueries.getQueryToCreatePaginationIndex1(start), NO_OP_SETTER);
update(start, BulkImportQueries.getQueryToCreatePaginationIndex2(start), NO_OP_SETTER);
}

} catch (Exception e) {
Expand Down
Loading