diff --git a/build.sbt b/build.sbt index 7c90d201..fc918c8c 100644 --- a/build.sbt +++ b/build.sbt @@ -51,7 +51,7 @@ lazy val migrator = (project in file("migrator")).enablePlugins(BuildInfoPlugin) // Revision number is automatically generated by the submodule when we do publishLocal. It is based on the output of 'git describe --tags' "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0-1-g468079b4", "com.github.jnr" % "jnr-posix" % "3.1.19", // Needed by the cassandra connector - "com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.6.1", + "com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.7.0", "com.scylladb.alternator" % "load-balancing" % "1.0.0", "io.circe" %% "circe-generic" % "0.14.7", "io.circe" %% "circe-parser" % "0.14.7", diff --git a/config.yaml.example b/config.yaml.example index d30a136f..9fa91e3d 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -263,6 +263,10 @@ savepoints: # create a savepoint file with this filled. # skipTokenRanges: [] +# Optional - Which scan segments to skip. You shouldn’t need to fill this in normally; the migrator will +# create a savepoint file with this filled. +# skipSegments: [] + # Configuration section for running the validator. The validator is run manually (see documentation). # Mandatory if the application is executed in validation mode. # validation: diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst index 486b52e4..fb8227c1 100644 --- a/docs/source/configuration.rst +++ b/docs/source/configuration.rst @@ -35,10 +35,12 @@ The configuration file requires the following top-level properties (ie, with no # Validator configuration. Required only if the app is executed in validation mode. validation: # ... - # Optional- Used internally + # Optional - Used internally skipTokenRanges: [] + # Optional - Used internally + skipSegments: [] -These top-level properties are documented in the following sections (except ``skipTokenRanges``, which is used internally). +These top-level properties are documented in the following sections (except ``skipTokenRanges`` and ``skipSegments``, which are used internally). ------ Source @@ -329,15 +331,15 @@ The optional ``renames`` property lists the item columns to rename along the mig Savepoints ---------- -When migrating data over CQL-compatible storages, the migrator is able to resume an interrupted migration. To achieve this, it stores so-called “savepoints” along the process to remember which token have already been migrated and should be skipped when the migration is restarted. This feature is not supported by DynamoDB-compatible storages. +When migrating data from Apache Cassandra or DynamoDB, the migrator is able to :doc:`resume an interrupted migration `. To achieve this, it stores so-called “savepoints” along the process to remember which data items have already been migrated and should be skipped when the migration is restarted. .. code-block:: yaml savepoints: - # Whe should savepoint configurations be stored? This is a path on the host running + # Where should savepoint configurations be stored? This is a path on the host running # the Spark driver - usually the Spark master. path: /app/savepoints - # Interval in which savepoints will be created + # Interval at which savepoints will be created intervalSeconds: 300 ---------- diff --git a/docs/source/getting-started/index.rst b/docs/source/getting-started/index.rst index de1e2d7f..1def8476 100644 --- a/docs/source/getting-started/index.rst +++ b/docs/source/getting-started/index.rst @@ -46,6 +46,7 @@ You might also be interested in the following extra features: * :doc:`rename columns along the migration `, * :doc:`replicate changes applied to the source table after the initial snapshot transfer has completed `, +* :doc:`resume an interrupted migration where it left off `, * :doc:`validate that the migration was complete and correct `. .. toctree:: diff --git a/docs/source/index.rst b/docs/source/index.rst index bfea7952..3679c112 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -33,6 +33,7 @@ Migrator Spark Scala run-the-migration stream-changes rename-columns + resume-interrupted-migration validate configuration tutorials/index diff --git a/docs/source/resume-interrupted-migration.rst b/docs/source/resume-interrupted-migration.rst new file mode 100644 index 00000000..60de10b4 --- /dev/null +++ b/docs/source/resume-interrupted-migration.rst @@ -0,0 +1,13 @@ +================================================= +Resume an Interrupted Migration Where it Left Off +================================================= + +.. note:: This feature is currently supported only when migrating from Apache Cassandra or DynamoDB. + +If, for some reason, the migration is interrupted (e.g., because of a networking issue, or if you need to manually stop it for some reason), the migrator is able to resume it from a “savepoints”. + +Savepoints are configuration files that contain information about the already migrated items, which can be skipped when the migration is resumed. The savepoint files are automatically generated during the migration. To use a savepoint, start a migration using it as configuration file. + +You can control the savepoints location and the interval at which they are generated in the configuration file under the top-level property ``savepoints``. See `the corresponding section of the configuration reference `_. + +During the migration, the savepoints are generated with file names like ``savepoint_xxx.yaml``, where ``xxx`` is a timestamp looking like ``1234567890``. To resume a migration, start a new migration with the latest savepoint as configuration file. diff --git a/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala b/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala index 3a84173a..bb01fbd5 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala @@ -42,10 +42,7 @@ object Migrator { val sourceDF = readers.Parquet.readDataFrame(spark, parquetSource) ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF) case (dynamoSource: SourceSettings.DynamoDB, alternatorTarget: TargetSettings.DynamoDB) => - AlternatorMigrator.migrateFromDynamoDB( - dynamoSource, - alternatorTarget, - migratorConfig.renamesMap) + AlternatorMigrator.migrateFromDynamoDB(dynamoSource, alternatorTarget, migratorConfig) case ( s3Source: SourceSettings.DynamoDBS3Export, alternatorTarget: TargetSettings.DynamoDB) => diff --git a/migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala b/migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala new file mode 100644 index 00000000..0a324c1b --- /dev/null +++ b/migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala @@ -0,0 +1,116 @@ +package com.scylladb.migrator + +import com.scylladb.migrator.config.MigratorConfig +import org.apache.log4j.LogManager +import sun.misc.{ Signal, SignalHandler } + +import java.nio.charset.StandardCharsets +import java.nio.file.{ Files, Paths } +import java.util.concurrent.{ ScheduledThreadPoolExecutor, TimeUnit } + +/** + * A component that manages savepoints. Savepoints provide a way to resume an interrupted migration. + * + * This component periodically stores savepoints according to the schedule defined in the configuration. + * It also automatically stores a savepoint in case of early termination (e.g. due to a SIGTERM signal). + * + * Internally, it works by writing modified copies of the original migration configuration. These copies + * specify which parts of the source dataset have already been migrated and can safely be skipped when + * restarting the migration. + * + * Make sure to call the method `close` when you don’t need the savepoints manager anymore so that it + * releases the resources it was using. + * + * This class is abstract. Subclasses are responsible for implementing how to track the migration progress, + * and for communicating the updated state of the migration via the method `updateConfigWithMigrationState`. + */ +abstract class SavepointsManager(migratorConfig: MigratorConfig) extends AutoCloseable { + + val log = LogManager.getLogger(this.getClass.getName) + private val scheduler = new ScheduledThreadPoolExecutor(1) + + createSavepointsDirectory() + addUSR2Handler() + startSavepointSchedule() + + private def createSavepointsDirectory(): Unit = { + val savepointsDirectory = Paths.get(migratorConfig.savepoints.path) + if (!Files.exists(savepointsDirectory)) { + log.debug( + s"Directory ${savepointsDirectory.normalize().toString} does not exist. Creating it...") + Files.createDirectories(savepointsDirectory) + } + } + + private def savepointFilename(path: String): String = + s"${path}/savepoint_${System.currentTimeMillis / 1000}.yaml" + + private def addUSR2Handler(): Unit = { + log.info( + "Installing SIGINT/TERM/USR2 handler. Send this to dump the current progress to a savepoint.") + + val handler = new SignalHandler { + override def handle(signal: Signal): Unit = + dumpMigrationState(signal.toString) + } + + Signal.handle(new Signal("USR2"), handler) + Signal.handle(new Signal("TERM"), handler) + Signal.handle(new Signal("INT"), handler) + } + + private def startSavepointSchedule(): Unit = { + val runnable = new Runnable { + override def run(): Unit = + try dumpMigrationState("schedule") + catch { + case e: Throwable => + log.error("Could not create the savepoint. This will be retried.", e) + } + } + + log.info( + s"Starting savepoint schedule; will write a savepoint every ${migratorConfig.savepoints.intervalSeconds} seconds") + + scheduler.scheduleAtFixedRate( + runnable, + migratorConfig.savepoints.intervalSeconds, + migratorConfig.savepoints.intervalSeconds, + TimeUnit.SECONDS) + } + + /** + * Dump the current state of the migration into a configuration file that can be + * used to resume the migration. + * @param reason Human-readable, informal, event that caused the dump. + */ + final def dumpMigrationState(reason: String): Unit = { + val filename = + Paths.get(savepointFilename(migratorConfig.savepoints.path)).normalize + + val modifiedConfig = updateConfigWithMigrationState() + + Files.write(filename, modifiedConfig.render.getBytes(StandardCharsets.UTF_8)) + + log.info( + s"Created a savepoint config at ${filename} due to ${reason}. ${describeMigrationState()}") + } + + /** + * Stop the periodic creation of savepoints and release the associated resources. + */ + final def close(): Unit = + scheduler.shutdown() + + /** + * Provide readable logs by describing which parts of the migration have been completed already. + */ + def describeMigrationState(): String + + /** + * A copy of the original migration configuration, updated to describe which parts of the migration + * have been completed already. + */ + def updateConfigWithMigrationState(): MigratorConfig + +} diff --git a/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala b/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala index 23dafa00..2526966b 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala @@ -1,7 +1,7 @@ package com.scylladb.migrator.alternator import com.scylladb.migrator.{ readers, writers, DynamoUtils } -import com.scylladb.migrator.config.{ SourceSettings, TargetSettings } +import com.scylladb.migrator.config.{ MigratorConfig, SourceSettings, TargetSettings } import com.scylladb.migrator.writers.DynamoStreamReplication import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.io.Text @@ -13,16 +13,22 @@ import software.amazon.awssdk.services.dynamodb.model.TableDescription import scala.util.control.NonFatal import scala.jdk.CollectionConverters._ +import scala.util.Using object AlternatorMigrator { private val log = LogManager.getLogger("com.scylladb.migrator.alternator") def migrateFromDynamoDB(source: SourceSettings.DynamoDB, target: TargetSettings.DynamoDB, - renamesMap: Map[String, String])(implicit spark: SparkSession): Unit = { - val (sourceRDD, sourceTableDesc) = readers.DynamoDB.readRDD(spark, source) - val maybeStreamedSource = if (target.streamChanges) Some(source) else None - migrate(sourceRDD, sourceTableDesc, maybeStreamedSource, target, renamesMap) + migratorConfig: MigratorConfig)(implicit spark: SparkSession): Unit = { + val (sourceRDD, sourceTableDesc) = + readers.DynamoDB.readRDD(spark, source, migratorConfig.skipSegments) + val savepointsManager = + DynamoDbSavepointsManager.setup(migratorConfig, sourceRDD, spark.sparkContext) + Using.resource(savepointsManager) { _ => + val maybeStreamedSource = if (target.streamChanges) Some(source) else None + migrate(sourceRDD, sourceTableDesc, maybeStreamedSource, target, migratorConfig.renamesMap) + } } def migrateFromS3Export(source: SourceSettings.DynamoDBS3Export, @@ -96,7 +102,7 @@ object AlternatorMigrator { } } catch { case NonFatal(e) => - log.error("Caught error while writing the RDD.", e) + throw new Exception("Caught error while writing the RDD.", e) } } diff --git a/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorValidator.scala b/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorValidator.scala index 5b849588..fddf3d4f 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorValidator.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorValidator.scala @@ -22,7 +22,7 @@ object AlternatorValidator { targetSettings: TargetSettings.DynamoDB, config: MigratorConfig)(implicit spark: SparkSession): List[RowComparisonFailure] = { - val (source, sourceTableDesc) = readers.DynamoDB.readRDD(spark, sourceSettings) + val (source, sourceTableDesc) = readers.DynamoDB.readRDD(spark, sourceSettings, None) val sourceTableKeys = sourceTableDesc.keySchema.asScala.toList val sourceByKey: RDD[(List[DdbValue], collection.Map[String, DdbValue])] = @@ -44,7 +44,8 @@ object AlternatorValidator { sourceSettings.scanSegments, // Reuse same settings as source table sourceSettings.maxMapTasks, sourceSettings.readThroughput, - sourceSettings.throughputReadPercent + sourceSettings.throughputReadPercent, + skipSegments = None ) // Define some aliases to prevent the Spark engine to try to serialize the whole object graph diff --git a/migrator/src/main/scala/com/scylladb/migrator/alternator/DynamoDbSavepointsManager.scala b/migrator/src/main/scala/com/scylladb/migrator/alternator/DynamoDbSavepointsManager.scala new file mode 100644 index 00000000..78bd89a9 --- /dev/null +++ b/migrator/src/main/scala/com/scylladb/migrator/alternator/DynamoDbSavepointsManager.scala @@ -0,0 +1,98 @@ +package com.scylladb.migrator.alternator + +import com.scylladb.migrator.SavepointsManager +import com.scylladb.migrator.config.MigratorConfig +import org.apache.hadoop.dynamodb.DynamoDBItemWritable +import org.apache.hadoop.dynamodb.split.DynamoDBSplit +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapred.InputSplit +import org.apache.log4j.LogManager +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.{ SparkListener, SparkListenerTaskEnd } +import org.apache.spark.{ Partition, SerializableWritable, SparkContext, Success => TaskEndSuccess } + +import scala.util.{ Failure, Success, Try } + +/** + * Manage DynamoDB-based migrations by tracking the migrated scan segments. + */ +class DynamoDbSavepointsManager(migratorConfig: MigratorConfig, + segmentsAccumulator: IntSetAccumulator) + extends SavepointsManager(migratorConfig) { + + def describeMigrationState(): String = + s"Segments to skip: ${segmentsAccumulator.value}" + + def updateConfigWithMigrationState(): MigratorConfig = + migratorConfig.copy(skipSegments = Some(segmentsAccumulator.value)) + +} + +object DynamoDbSavepointsManager { + + private val log = LogManager.getLogger(classOf[DynamoDbSavepointsManager]) + + def apply(migratorConfig: MigratorConfig, + segmentsAccumulator: IntSetAccumulator): DynamoDbSavepointsManager = + new DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator) + + /** + * Set up a savepoints manager that tracks the scan segments migrated from the source RDD. + */ + def setup(migratorConfig: MigratorConfig, + sourceRDD: RDD[(Text, DynamoDBItemWritable)], + spark: SparkContext): DynamoDbSavepointsManager = { + val segmentsAccumulator = + IntSetAccumulator(migratorConfig.skipSegments.getOrElse(Set.empty)) + spark.addSparkListener(new SparkListener { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + val partitionId = taskEnd.taskInfo.partitionId + log.debug(s"Migration of partition ${partitionId} ended: ${taskEnd.reason}.") + if (taskEnd.reason == TaskEndSuccess) { + scanSegments(sourceRDD, partitionId) match { + case Success(segments) => + segments.forEach(segment => segmentsAccumulator.add(segment)) + log.info(s"Marked segments ${segments} as migrated.") + case Failure(error) => + log.error( + s"Unable to collect the segments scanned in partition ${partitionId}. The next savepoint will not include them.", + error) + } + } + } + }) + DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator) + } + + /** + * @return The scan segments processed in partition `partitionId` of `rdd`. + */ + private def scanSegments(rdd: RDD[(Text, DynamoDBItemWritable)], + partitionId: Int): Try[java.util.List[Integer]] = + if (partitionId >= 0 && partitionId < rdd.getNumPartitions) { + val partition = rdd.partitions(partitionId) + inputSplit(partition).map(_.getSegments) + } else { + Failure(new Exception(s"Partition ${partitionId} not found in the RDD.")) + } + + /** + * @return The `DynamoDBSplit` wrapped by the `partition`. + * Fails if the `partition` is not a `HadoopPartition` containing a `DynamoDBSplit`. + */ + private def inputSplit(partition: Partition): Try[DynamoDBSplit] = Try { + // Unfortunately, class `HadoopPartition` is private, so we can’t simply + // pattern match on it. We use reflection to access its `inputSplit` member. + if (partition.getClass.getName != "org.apache.spark.rdd.HadoopPartition") { + throw new Exception(s"Unexpected partition type: ${partition.getClass.getName}.") + } + val inputSplitMember = partition.getClass.getMethod("inputSplit") + val inputSplitResult = + inputSplitMember.invoke(partition).asInstanceOf[SerializableWritable[InputSplit]] + inputSplitResult.value match { + case dynamoDbSplit: DynamoDBSplit => dynamoDbSplit + case other => throw new Exception(s"Unexpected InputSplit type: ${other.getClass.getName}.") + } + } + +} diff --git a/migrator/src/main/scala/com/scylladb/migrator/alternator/IntSetAccumulator.scala b/migrator/src/main/scala/com/scylladb/migrator/alternator/IntSetAccumulator.scala new file mode 100644 index 00000000..06153906 --- /dev/null +++ b/migrator/src/main/scala/com/scylladb/migrator/alternator/IntSetAccumulator.scala @@ -0,0 +1,37 @@ +package com.scylladb.migrator.alternator + +import org.apache.spark.util.AccumulatorV2 + +import java.util.concurrent.atomic.AtomicReference + +/** + * A Spark Accumulator that accumulates `Int` values into a `Set[Int]`. + * + * We use it to track the indexes of the DynamoDB scan segments that have been + * migrated to the target database. + */ +class IntSetAccumulator(init: Set[Int]) extends AccumulatorV2[Int, Set[Int]] { + + private val ref = new AtomicReference(init) + + def isZero: Boolean = ref.get.isEmpty + + def copy(): AccumulatorV2[Int, Set[Int]] = + new IntSetAccumulator(ref.get) + + def reset(): Unit = + ref.set(Set.empty) + + def add(v: Int): Unit = + ref.getAndUpdate(_ + v) + + def merge(other: AccumulatorV2[Int, Set[Int]]): Unit = + ref.getAndUpdate(_ ++ other.value) + + def value: Set[Int] = ref.get + +} + +object IntSetAccumulator { + def apply(init: Set[Int] = Set.empty): IntSetAccumulator = new IntSetAccumulator(init) +} diff --git a/migrator/src/main/scala/com/scylladb/migrator/config/MigratorConfig.scala b/migrator/src/main/scala/com/scylladb/migrator/config/MigratorConfig.scala index 4932adca..7768abd5 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/config/MigratorConfig.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/config/MigratorConfig.scala @@ -13,6 +13,7 @@ case class MigratorConfig(source: SourceSettings, renames: Option[List[Rename]], savepoints: Savepoints, skipTokenRanges: Option[Set[(Token[_], Token[_])]], + skipSegments: Option[Set[Int]], validation: Option[Validation]) { def render: String = this.asJson.asYaml.spaces2 diff --git a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala index b6b6d6ce..4cdb300c 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala @@ -18,7 +18,8 @@ object DynamoDB { def readRDD( spark: SparkSession, - source: SourceSettings.DynamoDB): (RDD[(Text, DynamoDBItemWritable)], TableDescription) = + source: SourceSettings.DynamoDB, + skipSegments: Option[Set[Int]]): (RDD[(Text, DynamoDBItemWritable)], TableDescription) = readRDD( spark, source.endpoint, @@ -28,7 +29,8 @@ object DynamoDB { source.scanSegments, source.maxMapTasks, source.readThroughput, - source.throughputReadPercent + source.throughputReadPercent, + skipSegments ) /** @@ -43,7 +45,8 @@ object DynamoDB { scanSegments: Option[Int], maxMapTasks: Option[Int], readThroughput: Option[Int], - throughputReadPercent: Option[Float]): (RDD[(Text, DynamoDBItemWritable)], TableDescription) = { + throughputReadPercent: Option[Float], + skipSegments: Option[Set[Int]]): (RDD[(Text, DynamoDBItemWritable)], TableDescription) = { val tableDescription = DynamoUtils .buildDynamoClient(endpoint, credentials.map(_.toProvider), region) @@ -61,7 +64,8 @@ object DynamoDB { maxMapTasks, readThroughput, throughputReadPercent, - tableDescription) + tableDescription, + skipSegments) val rdd = spark.sparkContext.hadoopRDD( @@ -82,7 +86,8 @@ object DynamoDB { maxMapTasks: Option[Int], readThroughput: Option[Int], throughputReadPercent: Option[Float], - description: TableDescription + description: TableDescription, + skipSegments: Option[Set[Int]] ): JobConf = { val maybeItemCount = Option(description.itemCount).map(_.toLong) val maybeAvgItemSize = @@ -125,6 +130,11 @@ object DynamoDB { jobConf, DynamoDBConstants.THROUGHPUT_READ_PERCENT, throughputReadPercent.map(_.toString)) + setOptionalConf( + jobConf, + DynamoDBConstants.EXCLUDED_SCAN_SEGMENTS, + skipSegments.map(_.mkString(",")) + ) jobConf } diff --git a/migrator/src/main/scala/com/scylladb/migrator/scylla/CqlSavepointsManager.scala b/migrator/src/main/scala/com/scylladb/migrator/scylla/CqlSavepointsManager.scala new file mode 100644 index 00000000..e8d54e42 --- /dev/null +++ b/migrator/src/main/scala/com/scylladb/migrator/scylla/CqlSavepointsManager.scala @@ -0,0 +1,30 @@ +package com.scylladb.migrator.scylla + +import com.datastax.spark.connector.rdd.partitioner.dht.Token +import com.datastax.spark.connector.writer.TokenRangeAccumulator +import com.scylladb.migrator.SavepointsManager +import com.scylladb.migrator.config.MigratorConfig + +/** + * Manage CQL-based migrations by tracking the migrated token ranges. + */ +class CqlSavepointsManager(migratorConfig: MigratorConfig, val accumulator: TokenRangeAccumulator) + extends SavepointsManager(migratorConfig) { + def describeMigrationState(): String = + s"Ranges added: ${tokenRanges(accumulator)}" + + def updateConfigWithMigrationState(): MigratorConfig = + migratorConfig.copy( + skipTokenRanges = + Some(migratorConfig.getSkipTokenRangesOrEmptySet ++ tokenRanges(accumulator))) + + private def tokenRanges(accumulator: TokenRangeAccumulator): Set[(Token[_], Token[_])] = + accumulator.value.get.map(range => + (range.range.start.asInstanceOf[Token[_]], range.range.end.asInstanceOf[Token[_]])) +} + +object CqlSavepointsManager { + def apply(migratorConfig: MigratorConfig, + accumulator: TokenRangeAccumulator): CqlSavepointsManager = + new CqlSavepointsManager(migratorConfig, accumulator) +} diff --git a/migrator/src/main/scala/com/scylladb/migrator/scylla/ScyllaMigrator.scala b/migrator/src/main/scala/com/scylladb/migrator/scylla/ScyllaMigrator.scala index 2e1bb3a3..849cbb1e 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/scylla/ScyllaMigrator.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/scylla/ScyllaMigrator.scala @@ -8,11 +8,7 @@ import com.scylladb.migrator.readers.TimestampColumns import com.scylladb.migrator.writers import org.apache.log4j.LogManager import org.apache.spark.sql.{ DataFrame, SparkSession } -import sun.misc.{ Signal, SignalHandler } -import java.nio.charset.StandardCharsets -import java.nio.file.{ Files, Paths } -import java.util.concurrent.{ ScheduledThreadPoolExecutor, TimeUnit } import scala.util.control.NonFatal case class SourceDataFrame(dataFrame: DataFrame, @@ -26,28 +22,16 @@ object ScyllaMigrator { target: TargetSettings.Scylla, sourceDF: SourceDataFrame)(implicit spark: SparkSession): Unit = { - val scheduler = new ScheduledThreadPoolExecutor(1) - log.info("Created source dataframe; resulting schema:") sourceDF.dataFrame.printSchema() - val tokenRangeAccumulator = + val maybeSavepointsManager = if (!sourceDF.savepointsSupported) None else { - val savepointsDirectory = Paths.get(migratorConfig.savepoints.path) - if (!Files.exists(savepointsDirectory)) { - log.debug( - s"Directory ${savepointsDirectory.normalize().toString} does not exist. Creating it...") - Files.createDirectories(savepointsDirectory) - } - val tokenRangeAccumulator = TokenRangeAccumulator.empty spark.sparkContext.register(tokenRangeAccumulator, "Token ranges copied") - addUSR2Handler(migratorConfig, tokenRangeAccumulator) - startSavepointSchedule(scheduler, migratorConfig, tokenRangeAccumulator) - - Some(tokenRangeAccumulator) + Some(CqlSavepointsManager(migratorConfig, tokenRangeAccumulator)) } log.info( @@ -73,7 +57,7 @@ object ScyllaMigrator { log.info("All token ranges extracted from partitions size:" + allTokenRanges.size) - if (migratorConfig.skipTokenRanges != None) { + if (migratorConfig.skipTokenRanges.isDefined) { log.info( "Savepoints array defined, size of the array: " + migratorConfig.skipTokenRanges.size) @@ -92,69 +76,18 @@ object ScyllaMigrator { migratorConfig.getRenamesOrNil, sourceDF.dataFrame, sourceDF.timestampColumns, - tokenRangeAccumulator) + maybeSavepointsManager.map(_.accumulator)) } catch { case NonFatal(e) => // Catching everything on purpose to try and dump the accumulator state log.error( "Caught error while writing the DataFrame. Will create a savepoint before exiting", e) } finally { - tokenRangeAccumulator.foreach(dumpAccumulatorState(migratorConfig, _, "final")) - scheduler.shutdown() - } - } - - def startSavepointSchedule(svc: ScheduledThreadPoolExecutor, - config: MigratorConfig, - acc: TokenRangeAccumulator): Unit = { - val runnable = new Runnable { - override def run(): Unit = - try dumpAccumulatorState(config, acc, "schedule") - catch { - case e: Throwable => - log.error("Could not create the savepoint. This will be retried.", e) - } - } - - log.info( - s"Starting savepoint schedule; will write a savepoint every ${config.savepoints.intervalSeconds} seconds") - - svc.scheduleAtFixedRate(runnable, 0, config.savepoints.intervalSeconds, TimeUnit.SECONDS) - } - - def addUSR2Handler(config: MigratorConfig, acc: TokenRangeAccumulator) = { - log.info( - "Installing SIGINT/TERM/USR2 handler. Send this to dump the current progress to a savepoint.") - - val handler = new SignalHandler { - override def handle(signal: Signal): Unit = - dumpAccumulatorState(config, acc, signal.toString) + for (savePointsManger <- maybeSavepointsManager) { + savePointsManger.dumpMigrationState("final") + savePointsManger.close() + } } - - Signal.handle(new Signal("USR2"), handler) - Signal.handle(new Signal("TERM"), handler) - Signal.handle(new Signal("INT"), handler) - } - - def savepointFilename(path: String): String = - s"${path}/savepoint_${System.currentTimeMillis / 1000}.yaml" - - def dumpAccumulatorState(config: MigratorConfig, - accumulator: TokenRangeAccumulator, - reason: String): Unit = { - val filename = - Paths.get(savepointFilename(config.savepoints.path)).normalize - val rangesToSkip = accumulator.value.get.map(range => - (range.range.start.asInstanceOf[Token[_]], range.range.end.asInstanceOf[Token[_]])) - - val modifiedConfig = config.copy( - skipTokenRanges = Some(config.getSkipTokenRangesOrEmptySet ++ rangesToSkip) - ) - - Files.write(filename, modifiedConfig.render.getBytes(StandardCharsets.UTF_8)) - - log.info( - s"Created a savepoint config at ${filename} due to ${reason}. Ranges added: ${rangesToSkip}") } } diff --git a/tests/src/test/configurations/dynamodb-to-alternator-part1.yaml b/tests/src/test/configurations/dynamodb-to-alternator-part1.yaml new file mode 100644 index 00000000..555025d6 --- /dev/null +++ b/tests/src/test/configurations/dynamodb-to-alternator-part1.yaml @@ -0,0 +1,37 @@ +source: + type: dynamodb + table: SkippedSegments + region: dummy + endpoint: + host: http://dynamodb + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + scanSegments: 3 + +target: + type: dynamodb + table: SkippedSegments + region: dummy + endpoint: + host: http://scylla + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + streamChanges: false + +savepoints: + path: /app/savepoints + intervalSeconds: 300 + +skipSegments: [1, 2] + +validation: + compareTimestamps: true + ttlToleranceMillis: 60000 + writetimeToleranceMillis: 1000 + failuresToFetch: 100 + floatingPointTolerance: 0.001 + timestampMsTolerance: 0 diff --git a/tests/src/test/configurations/dynamodb-to-alternator-part2.yaml b/tests/src/test/configurations/dynamodb-to-alternator-part2.yaml new file mode 100644 index 00000000..9a3a5323 --- /dev/null +++ b/tests/src/test/configurations/dynamodb-to-alternator-part2.yaml @@ -0,0 +1,37 @@ +source: + type: dynamodb + table: SkippedSegments + region: dummy + endpoint: + host: http://dynamodb + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + scanSegments: 3 + +target: + type: dynamodb + table: SkippedSegments + region: dummy + endpoint: + host: http://scylla + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + streamChanges: false + +savepoints: + path: /app/savepoints + intervalSeconds: 300 + +skipSegments: [0] + +validation: + compareTimestamps: true + ttlToleranceMillis: 60000 + writetimeToleranceMillis: 1000 + failuresToFetch: 100 + floatingPointTolerance: 0.001 + timestampMsTolerance: 0 diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala index 23f05ace..38f4a4aa 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala @@ -74,7 +74,8 @@ class DynamoDBInputFormatTest extends munit.FunSuite { maxMapTasks = configuredMaxMapTasks, readThroughput = configuredReadThroughput, throughputReadPercent = configuredThroughputReadPercent, - description = tableDescriptionBuilder.build() + description = tableDescriptionBuilder.build(), + skipSegments = None ) val splits = new DynamoDBInputFormat().getSplits(jobConf, 1) diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala new file mode 100644 index 00000000..d2c0b505 --- /dev/null +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala @@ -0,0 +1,68 @@ +package com.scylladb.migrator.alternator + +import com.scylladb.migrator.SparkUtils.{submitSparkJob, successfullyPerformMigration} +import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest, ResourceNotFoundException, ScanRequest} + +import java.util.UUID +import scala.jdk.CollectionConverters._ +import scala.util.chaining._ + +class SkippedSegmentsTest extends MigratorSuite { + + withTable("SkippedSegments").test("Run partial migrations") { tableName => + // We rely on the fact that both config files have `scanSegments: 3` and + // complementary `skipSegments` properties + val configPart1 = "dynamodb-to-alternator-part1.yaml" + val configPart2 = "dynamodb-to-alternator-part2.yaml" + + createRandomData(tableName) + + // Initially, the target table does not exist + try { + targetAlternator.describeTable(describeTableRequest(tableName)) + fail(s"The table ${tableName} should not exist yet") + } catch { + case _: ResourceNotFoundException => + () // OK + } + + // Perform the first part of the migration + successfullyPerformMigration(configPart1) + + // Verify that some items have been copied to the target database … + val itemCount = targetAlternatorItemCount(tableName) + assert(itemCount > 90L && itemCount < 110L) + // … but not all of them, hence the validator fails + submitSparkJob(configPart2, "com.scylladb.migrator.Validator").exitValue().tap { statusCode => + assertEquals(statusCode, 1) + } + + // Perform the other (complementary) part of the migration + successfullyPerformMigration(configPart2) + + // Validate that all the data from the source have been migrated to the target database + submitSparkJob(configPart2, "com.scylladb.migrator.Validator").exitValue().tap { statusCode => + assertEquals(statusCode, 0, "Validation failed") + } + } + + def createRandomData(tableName: String): Unit = { + for (_ <- 1 to 300) { + val itemData = Map( + "id" -> AttributeValue.fromS(UUID.randomUUID().toString), + "foo" -> AttributeValue.fromS(UUID.randomUUID().toString), + "bar" -> AttributeValue.fromS(UUID.randomUUID().toString), + "baz" -> AttributeValue.fromS(UUID.randomUUID().toString) + ) + sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build()) + } + } + + def targetAlternatorItemCount(tableName: String): Long = + targetAlternator + .scanPaginator(ScanRequest.builder().tableName(tableName).build()) + .items() + .stream() + .count() + +}