Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mongodb transaction giving error when documents have more then 1k to migrate #450

Open
anishbishnoi127 opened this issue Sep 15, 2024 · 0 comments

Comments

@anishbishnoi127
Copy link

Describe the bug
when i have more then 1k documents that time transaction crashing.
To Reproduce
Steps to reproduce the behavior: makes documents 10k and then try to migrate using transaction.

Expected behavior
A clear and concise description of what you expected to happen. it's should migrate

Additional context
giving this error
ERROR: Could not migrate up 20240915073415-trail-schema.js: Transaction with { txnNumber: 2 } has been aborted. MongoBulkWriteError: Transaction with { txnNumber: 2 } has been aborted.
at resultHandler (C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules.pnpm\[email protected]\node_modules\mongodb\lib\bulk\common.js:294:29)
at C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules.pnpm\[email protected]\node_modules\mongodb\lib\bulk\common.js:344:159
at process.processTicksAndRejections (node:internal/process/task_queues:95:5)

my code is

async up(db, client) {
		const startTime = performance.now(); // Start time of the migration
		let retries = 0;
		while (retries < MAX_RETRIES) {
			const session = client.startSession({
				causalConsistency: true,
				defaultTransactionOptions: {
					readConcern: { level: 'majority' },
					writeConcern: { w: 'majority', wtimeout: 2147483646 },
					readPreference: 'primary',
					maxCommitTimeMS: 2147483646
				},
				snapshot: false,
				transactionLifetimeLimitSeconds: 2147483646
			});
			try {
				await session.withTransaction(async () => {
					const BATCH_SIZE = batch_size;
					let skip = 0;
					let batch;

					const missingFieldsFilter = {
						$or: Object.keys(fields_which_need_to_migrate).map((field) => ({
							[field]: { $exists: false }
						}))
					};

					// Check if the MigrationHistory and MigrationChanges collections exist
					const collections = await db.listCollections().toArray();
					const existingCollections = collections.map((col) => col.name);

					// Create collections only if they don't exist
					if (!existingCollections.includes(migration_history_collection_name)) {
						await db.createCollection(migration_history_collection_name, { session });
						await db
							.collection(migration_history_collection_name)
							.createIndex({ version: 1 }, { unique: true, session });
					}

					if (!existingCollections.includes(migration_changes_collection_name)) {
						await db.createCollection(migration_changes_collection_name, { session });
						await db
							.collection(migration_changes_collection_name)
							.createIndex({ migrationVersion: 1 }, { session });
					}

					// Ensure migration version is unique
					const existingMigration = await db
						.collection(migration_history_collection_name)
						.findOne({ version: migrationVersion }, { session });
					if (existingMigration) {
						throw new Error(`Migration version ${migrationVersion} already exists.`);
					}

					const migrationDoc = {
						version: migrationVersion,
						appliedAt: new Date(),
						status: 'in_progress'
					};

					const migrationId = (
						await db.collection(migration_history_collection_name).insertOne(migrationDoc, { session })
					).insertedId;

					do {
						batch = await db
							.collection(collection_name_which_need_to_migrate)
							.find(missingFieldsFilter)
							.skip(skip)
							.limit(BATCH_SIZE)
							.toArray();

						if (batch.length > 0) {
							const bulkOperations = batch.map((doc) => {
								const updateFields = {};
								const originalValues = {};
								const updatedFields = {};

								for (const [field, defaultValue] of Object.entries(fields_which_need_to_migrate)) {
									if (!(field in doc)) {
										updateFields[field] = defaultValue;
										updatedFields[field] = defaultValue;
									} else {
										originalValues[field] = doc[field];
									}
								}

								if (Object.keys(updateFields).length > 0) {
									// Save each change as a separate document in MigrationChanges
									const changeDoc = {
										migrationVersion: migrationVersion,
										migrationId: migrationId,
										collection: collection_name_which_need_to_migrate,
										documentId: doc._id,
										updatedAt: new Date(),
										fieldsUpdated: updatedFields,
										originalValues
									};

									return [
										{
											insertOne: {
												document: changeDoc
											}
										},
										{
											updateOne: {
												filter: { _id: doc._id },
												update: { $set: updateFields }
											}
										}
									];
								}
								return [];
							});

							const operations = bulkOperations.flat();

							if (operations.length > 0) {
								await Promise.all([
									db.collection(migration_changes_collection_name).bulkWrite(
										operations.filter((op) => op.insertOne),
										{ session }
									),
									db.collection(collection_name_which_need_to_migrate).bulkWrite(
										operations.filter((op) => op.updateOne),
										{ session }
									)
								]);
							}

							skip += BATCH_SIZE;
						}
					} while (batch.length === BATCH_SIZE);

					console.log("migration done !")
					// Update the main migration document status to success
					await db
						.collection(migration_history_collection_name)
						.updateOne(
							{ _id: migrationId },
							{ $set: { status: 'success', completedAt: new Date() } },
							{ session }
						);

					console.log(`Migration ${migrationVersion} applied successfully.`);
				});
				break; // Exit the loop if successful
			} catch (error) {
				console.error(`Migration ${migrationVersion} failed:`, error);
				retries += 1;
				if (retries >= MAX_RETRIES) {
					await db
						.collection(migration_history_collection_name)
						.updateOne(
							{ version: migrationVersion },
							{ $set: { status: 'failure', reason: error.message, completedAt: new Date() } }
						);
					throw error;
				}
			} finally {
				const endTime = performance.now(); // End time of the migration
				const duration = (endTime - startTime) / 1000; // Duration in seconds
				await session.endSession();
			}
		}
	},
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant