diff --git a/tests/src/test/configurations/dynamodb-to-alternator-part1.yaml b/tests/src/test/configurations/dynamodb-to-alternator-part1.yaml new file mode 100644 index 00000000..555025d6 --- /dev/null +++ b/tests/src/test/configurations/dynamodb-to-alternator-part1.yaml @@ -0,0 +1,37 @@ +source: + type: dynamodb + table: SkippedSegments + region: dummy + endpoint: + host: http://dynamodb + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + scanSegments: 3 + +target: + type: dynamodb + table: SkippedSegments + region: dummy + endpoint: + host: http://scylla + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + streamChanges: false + +savepoints: + path: /app/savepoints + intervalSeconds: 300 + +skipSegments: [1, 2] + +validation: + compareTimestamps: true + ttlToleranceMillis: 60000 + writetimeToleranceMillis: 1000 + failuresToFetch: 100 + floatingPointTolerance: 0.001 + timestampMsTolerance: 0 diff --git a/tests/src/test/configurations/dynamodb-to-alternator-part2.yaml b/tests/src/test/configurations/dynamodb-to-alternator-part2.yaml new file mode 100644 index 00000000..9a3a5323 --- /dev/null +++ b/tests/src/test/configurations/dynamodb-to-alternator-part2.yaml @@ -0,0 +1,37 @@ +source: + type: dynamodb + table: SkippedSegments + region: dummy + endpoint: + host: http://dynamodb + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + scanSegments: 3 + +target: + type: dynamodb + table: SkippedSegments + region: dummy + endpoint: + host: http://scylla + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + streamChanges: false + +savepoints: + path: /app/savepoints + intervalSeconds: 300 + +skipSegments: [0] + +validation: + compareTimestamps: true + ttlToleranceMillis: 60000 + writetimeToleranceMillis: 1000 + failuresToFetch: 100 + floatingPointTolerance: 0.001 + timestampMsTolerance: 0 diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala index 277c144e..9e60bb55 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala @@ -69,7 +69,8 @@ class DynamoDBInputFormatTest extends munit.FunSuite { maxMapTasks = configuredMaxMapTasks, readThroughput = configuredReadThroughput, throughputReadPercent = configuredThroughputReadPercent, - description = tableDescriptionBuilder.build() + description = tableDescriptionBuilder.build(), + skipSegments = None ) val splits = new DynamoDBInputFormat().getSplits(jobConf, 1) diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala new file mode 100644 index 00000000..d2c0b505 --- /dev/null +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala @@ -0,0 +1,68 @@ +package com.scylladb.migrator.alternator + +import com.scylladb.migrator.SparkUtils.{submitSparkJob, successfullyPerformMigration} +import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest, ResourceNotFoundException, ScanRequest} + +import java.util.UUID +import scala.jdk.CollectionConverters._ +import scala.util.chaining._ + +class SkippedSegmentsTest extends MigratorSuite { + + withTable("SkippedSegments").test("Run partial migrations") { tableName => + // We rely on the fact that both config files have `scanSegments: 3` and + // complementary `skipSegments` properties + val configPart1 = "dynamodb-to-alternator-part1.yaml" + val configPart2 = "dynamodb-to-alternator-part2.yaml" + + createRandomData(tableName) + + // Initially, the target table does not exist + try { + targetAlternator.describeTable(describeTableRequest(tableName)) + fail(s"The table ${tableName} should not exist yet") + } catch { + case _: ResourceNotFoundException => + () // OK + } + + // Perform the first part of the migration + successfullyPerformMigration(configPart1) + + // Verify that some items have been copied to the target database … + val itemCount = targetAlternatorItemCount(tableName) + assert(itemCount > 90L && itemCount < 110L) + // … but not all of them, hence the validator fails + submitSparkJob(configPart2, "com.scylladb.migrator.Validator").exitValue().tap { statusCode => + assertEquals(statusCode, 1) + } + + // Perform the other (complementary) part of the migration + successfullyPerformMigration(configPart2) + + // Validate that all the data from the source have been migrated to the target database + submitSparkJob(configPart2, "com.scylladb.migrator.Validator").exitValue().tap { statusCode => + assertEquals(statusCode, 0, "Validation failed") + } + } + + def createRandomData(tableName: String): Unit = { + for (_ <- 1 to 300) { + val itemData = Map( + "id" -> AttributeValue.fromS(UUID.randomUUID().toString), + "foo" -> AttributeValue.fromS(UUID.randomUUID().toString), + "bar" -> AttributeValue.fromS(UUID.randomUUID().toString), + "baz" -> AttributeValue.fromS(UUID.randomUUID().toString) + ) + sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build()) + } + } + + def targetAlternatorItemCount(tableName: String): Long = + targetAlternator + .scanPaginator(ScanRequest.builder().tableName(tableName).build()) + .items() + .stream() + .count() + +}