Skip to content

Commit

Permalink
Merge pull request #190 from julienrf/dynamodb-resumability
Browse files Browse the repository at this point in the history
Support resumability of DynamoDB migrations
  • Loading branch information
tarzanek authored Aug 29, 2024
2 parents a05419b + 7d006a6 commit 311931a
Show file tree
Hide file tree
Showing 20 changed files with 492 additions and 99 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ lazy val migrator = (project in file("migrator")).enablePlugins(BuildInfoPlugin)
// Revision number is automatically generated by the submodule when we do publishLocal. It is based on the output of 'git describe --tags'
"com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0-1-g468079b4",
"com.github.jnr" % "jnr-posix" % "3.1.19", // Needed by the cassandra connector
"com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.6.1",
"com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.7.0",
"com.scylladb.alternator" % "load-balancing" % "1.0.0",
"io.circe" %% "circe-generic" % "0.14.7",
"io.circe" %% "circe-parser" % "0.14.7",
Expand Down
4 changes: 4 additions & 0 deletions config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ savepoints:
# create a savepoint file with this filled.
# skipTokenRanges: []

# Optional - Which scan segments to skip. You shouldn’t need to fill this in normally; the migrator will
# create a savepoint file with this filled.
# skipSegments: []

# Configuration section for running the validator. The validator is run manually (see documentation).
# Mandatory if the application is executed in validation mode.
# validation:
Expand Down
12 changes: 7 additions & 5 deletions docs/source/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ The configuration file requires the following top-level properties (ie, with no
# Validator configuration. Required only if the app is executed in validation mode.
validation:
# ...
# Optional- Used internally
# Optional - Used internally
skipTokenRanges: []
# Optional - Used internally
skipSegments: []
These top-level properties are documented in the following sections (except ``skipTokenRanges``, which is used internally).
These top-level properties are documented in the following sections (except ``skipTokenRanges`` and ``skipSegments``, which are used internally).

------
Source
Expand Down Expand Up @@ -329,15 +331,15 @@ The optional ``renames`` property lists the item columns to rename along the mig
Savepoints
----------

When migrating data over CQL-compatible storages, the migrator is able to resume an interrupted migration. To achieve this, it stores so-called “savepoints” along the process to remember which token have already been migrated and should be skipped when the migration is restarted. This feature is not supported by DynamoDB-compatible storages.
When migrating data from Apache Cassandra or DynamoDB, the migrator is able to :doc:`resume an interrupted migration </resume-interrupted-migration>`. To achieve this, it stores so-called “savepoints” along the process to remember which data items have already been migrated and should be skipped when the migration is restarted.

.. code-block:: yaml
savepoints:
# Whe should savepoint configurations be stored? This is a path on the host running
# Where should savepoint configurations be stored? This is a path on the host running
# the Spark driver - usually the Spark master.
path: /app/savepoints
# Interval in which savepoints will be created
# Interval at which savepoints will be created
intervalSeconds: 300
----------
Expand Down
1 change: 1 addition & 0 deletions docs/source/getting-started/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ You might also be interested in the following extra features:

* :doc:`rename columns along the migration </rename-columns>`,
* :doc:`replicate changes applied to the source table after the initial snapshot transfer has completed </stream-changes>`,
* :doc:`resume an interrupted migration where it left off </resume-interrupted-migration>`,
* :doc:`validate that the migration was complete and correct </validate>`.

.. toctree::
Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Migrator Spark Scala
run-the-migration
stream-changes
rename-columns
resume-interrupted-migration
validate
configuration
tutorials/index
13 changes: 13 additions & 0 deletions docs/source/resume-interrupted-migration.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
=================================================
Resume an Interrupted Migration Where it Left Off
=================================================

.. note:: This feature is currently supported only when migrating from Apache Cassandra or DynamoDB.

If, for some reason, the migration is interrupted (e.g., because of a networking issue, or if you need to manually stop it for some reason), the migrator is able to resume it from a “savepoints”.

Savepoints are configuration files that contain information about the already migrated items, which can be skipped when the migration is resumed. The savepoint files are automatically generated during the migration. To use a savepoint, start a migration using it as configuration file.

You can control the savepoints location and the interval at which they are generated in the configuration file under the top-level property ``savepoints``. See `the corresponding section of the configuration reference </configuration#savepoints>`_.

During the migration, the savepoints are generated with file names like ``savepoint_xxx.yaml``, where ``xxx`` is a timestamp looking like ``1234567890``. To resume a migration, start a new migration with the latest savepoint as configuration file.
5 changes: 1 addition & 4 deletions migrator/src/main/scala/com/scylladb/migrator/Migrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ object Migrator {
val sourceDF = readers.Parquet.readDataFrame(spark, parquetSource)
ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF)
case (dynamoSource: SourceSettings.DynamoDB, alternatorTarget: TargetSettings.DynamoDB) =>
AlternatorMigrator.migrateFromDynamoDB(
dynamoSource,
alternatorTarget,
migratorConfig.renamesMap)
AlternatorMigrator.migrateFromDynamoDB(dynamoSource, alternatorTarget, migratorConfig)
case (
s3Source: SourceSettings.DynamoDBS3Export,
alternatorTarget: TargetSettings.DynamoDB) =>
Expand Down
116 changes: 116 additions & 0 deletions migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.scylladb.migrator

import com.scylladb.migrator.config.MigratorConfig
import org.apache.log4j.LogManager
import sun.misc.{ Signal, SignalHandler }

import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }
import java.util.concurrent.{ ScheduledThreadPoolExecutor, TimeUnit }

/**
* A component that manages savepoints. Savepoints provide a way to resume an interrupted migration.
*
* This component periodically stores savepoints according to the schedule defined in the configuration.
* It also automatically stores a savepoint in case of early termination (e.g. due to a SIGTERM signal).
*
* Internally, it works by writing modified copies of the original migration configuration. These copies
* specify which parts of the source dataset have already been migrated and can safely be skipped when
* restarting the migration.
*
* Make sure to call the method `close` when you don’t need the savepoints manager anymore so that it
* releases the resources it was using.
*
* This class is abstract. Subclasses are responsible for implementing how to track the migration progress,
* and for communicating the updated state of the migration via the method `updateConfigWithMigrationState`.
*/
abstract class SavepointsManager(migratorConfig: MigratorConfig) extends AutoCloseable {

val log = LogManager.getLogger(this.getClass.getName)
private val scheduler = new ScheduledThreadPoolExecutor(1)

createSavepointsDirectory()
addUSR2Handler()
startSavepointSchedule()

private def createSavepointsDirectory(): Unit = {
val savepointsDirectory = Paths.get(migratorConfig.savepoints.path)
if (!Files.exists(savepointsDirectory)) {
log.debug(
s"Directory ${savepointsDirectory.normalize().toString} does not exist. Creating it...")
Files.createDirectories(savepointsDirectory)
}
}

private def savepointFilename(path: String): String =
s"${path}/savepoint_${System.currentTimeMillis / 1000}.yaml"

private def addUSR2Handler(): Unit = {
log.info(
"Installing SIGINT/TERM/USR2 handler. Send this to dump the current progress to a savepoint.")

val handler = new SignalHandler {
override def handle(signal: Signal): Unit =
dumpMigrationState(signal.toString)
}

Signal.handle(new Signal("USR2"), handler)
Signal.handle(new Signal("TERM"), handler)
Signal.handle(new Signal("INT"), handler)
}

private def startSavepointSchedule(): Unit = {
val runnable = new Runnable {
override def run(): Unit =
try dumpMigrationState("schedule")
catch {
case e: Throwable =>
log.error("Could not create the savepoint. This will be retried.", e)
}
}

log.info(
s"Starting savepoint schedule; will write a savepoint every ${migratorConfig.savepoints.intervalSeconds} seconds")

scheduler.scheduleAtFixedRate(
runnable,
migratorConfig.savepoints.intervalSeconds,
migratorConfig.savepoints.intervalSeconds,
TimeUnit.SECONDS)
}

/**
* Dump the current state of the migration into a configuration file that can be
* used to resume the migration.
* @param reason Human-readable, informal, event that caused the dump.
*/
final def dumpMigrationState(reason: String): Unit = {
val filename =
Paths.get(savepointFilename(migratorConfig.savepoints.path)).normalize

val modifiedConfig = updateConfigWithMigrationState()

Files.write(filename, modifiedConfig.render.getBytes(StandardCharsets.UTF_8))

log.info(
s"Created a savepoint config at ${filename} due to ${reason}. ${describeMigrationState()}")
}

/**
* Stop the periodic creation of savepoints and release the associated resources.
*/
final def close(): Unit =
scheduler.shutdown()

/**
* Provide readable logs by describing which parts of the migration have been completed already.
*/
def describeMigrationState(): String

/**
* A copy of the original migration configuration, updated to describe which parts of the migration
* have been completed already.
*/
def updateConfigWithMigrationState(): MigratorConfig

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.scylladb.migrator.alternator

import com.scylladb.migrator.{ readers, writers, DynamoUtils }
import com.scylladb.migrator.config.{ SourceSettings, TargetSettings }
import com.scylladb.migrator.config.{ MigratorConfig, SourceSettings, TargetSettings }
import com.scylladb.migrator.writers.DynamoStreamReplication
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text
Expand All @@ -13,16 +13,22 @@ import software.amazon.awssdk.services.dynamodb.model.TableDescription

import scala.util.control.NonFatal
import scala.jdk.CollectionConverters._
import scala.util.Using

object AlternatorMigrator {
private val log = LogManager.getLogger("com.scylladb.migrator.alternator")

def migrateFromDynamoDB(source: SourceSettings.DynamoDB,
target: TargetSettings.DynamoDB,
renamesMap: Map[String, String])(implicit spark: SparkSession): Unit = {
val (sourceRDD, sourceTableDesc) = readers.DynamoDB.readRDD(spark, source)
val maybeStreamedSource = if (target.streamChanges) Some(source) else None
migrate(sourceRDD, sourceTableDesc, maybeStreamedSource, target, renamesMap)
migratorConfig: MigratorConfig)(implicit spark: SparkSession): Unit = {
val (sourceRDD, sourceTableDesc) =
readers.DynamoDB.readRDD(spark, source, migratorConfig.skipSegments)
val savepointsManager =
DynamoDbSavepointsManager.setup(migratorConfig, sourceRDD, spark.sparkContext)
Using.resource(savepointsManager) { _ =>
val maybeStreamedSource = if (target.streamChanges) Some(source) else None
migrate(sourceRDD, sourceTableDesc, maybeStreamedSource, target, migratorConfig.renamesMap)
}
}

def migrateFromS3Export(source: SourceSettings.DynamoDBS3Export,
Expand Down Expand Up @@ -96,7 +102,7 @@ object AlternatorMigrator {
}
} catch {
case NonFatal(e) =>
log.error("Caught error while writing the RDD.", e)
throw new Exception("Caught error while writing the RDD.", e)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object AlternatorValidator {
targetSettings: TargetSettings.DynamoDB,
config: MigratorConfig)(implicit spark: SparkSession): List[RowComparisonFailure] = {

val (source, sourceTableDesc) = readers.DynamoDB.readRDD(spark, sourceSettings)
val (source, sourceTableDesc) = readers.DynamoDB.readRDD(spark, sourceSettings, None)
val sourceTableKeys = sourceTableDesc.keySchema.asScala.toList

val sourceByKey: RDD[(List[DdbValue], collection.Map[String, DdbValue])] =
Expand All @@ -44,7 +44,8 @@ object AlternatorValidator {
sourceSettings.scanSegments, // Reuse same settings as source table
sourceSettings.maxMapTasks,
sourceSettings.readThroughput,
sourceSettings.throughputReadPercent
sourceSettings.throughputReadPercent,
skipSegments = None
)

// Define some aliases to prevent the Spark engine to try to serialize the whole object graph
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.scylladb.migrator.alternator

import com.scylladb.migrator.SavepointsManager
import com.scylladb.migrator.config.MigratorConfig
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.split.DynamoDBSplit
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.InputSplit
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{ SparkListener, SparkListenerTaskEnd }
import org.apache.spark.{ Partition, SerializableWritable, SparkContext, Success => TaskEndSuccess }

import scala.util.{ Failure, Success, Try }

/**
* Manage DynamoDB-based migrations by tracking the migrated scan segments.
*/
class DynamoDbSavepointsManager(migratorConfig: MigratorConfig,
segmentsAccumulator: IntSetAccumulator)
extends SavepointsManager(migratorConfig) {

def describeMigrationState(): String =
s"Segments to skip: ${segmentsAccumulator.value}"

def updateConfigWithMigrationState(): MigratorConfig =
migratorConfig.copy(skipSegments = Some(segmentsAccumulator.value))

}

object DynamoDbSavepointsManager {

private val log = LogManager.getLogger(classOf[DynamoDbSavepointsManager])

def apply(migratorConfig: MigratorConfig,
segmentsAccumulator: IntSetAccumulator): DynamoDbSavepointsManager =
new DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator)

/**
* Set up a savepoints manager that tracks the scan segments migrated from the source RDD.
*/
def setup(migratorConfig: MigratorConfig,
sourceRDD: RDD[(Text, DynamoDBItemWritable)],
spark: SparkContext): DynamoDbSavepointsManager = {
val segmentsAccumulator =
IntSetAccumulator(migratorConfig.skipSegments.getOrElse(Set.empty))
spark.addSparkListener(new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val partitionId = taskEnd.taskInfo.partitionId
log.debug(s"Migration of partition ${partitionId} ended: ${taskEnd.reason}.")
if (taskEnd.reason == TaskEndSuccess) {
scanSegments(sourceRDD, partitionId) match {
case Success(segments) =>
segments.forEach(segment => segmentsAccumulator.add(segment))
log.info(s"Marked segments ${segments} as migrated.")
case Failure(error) =>
log.error(
s"Unable to collect the segments scanned in partition ${partitionId}. The next savepoint will not include them.",
error)
}
}
}
})
DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator)
}

/**
* @return The scan segments processed in partition `partitionId` of `rdd`.
*/
private def scanSegments(rdd: RDD[(Text, DynamoDBItemWritable)],
partitionId: Int): Try[java.util.List[Integer]] =
if (partitionId >= 0 && partitionId < rdd.getNumPartitions) {
val partition = rdd.partitions(partitionId)
inputSplit(partition).map(_.getSegments)
} else {
Failure(new Exception(s"Partition ${partitionId} not found in the RDD."))
}

/**
* @return The `DynamoDBSplit` wrapped by the `partition`.
* Fails if the `partition` is not a `HadoopPartition` containing a `DynamoDBSplit`.
*/
private def inputSplit(partition: Partition): Try[DynamoDBSplit] = Try {
// Unfortunately, class `HadoopPartition` is private, so we can’t simply
// pattern match on it. We use reflection to access its `inputSplit` member.
if (partition.getClass.getName != "org.apache.spark.rdd.HadoopPartition") {
throw new Exception(s"Unexpected partition type: ${partition.getClass.getName}.")
}
val inputSplitMember = partition.getClass.getMethod("inputSplit")
val inputSplitResult =
inputSplitMember.invoke(partition).asInstanceOf[SerializableWritable[InputSplit]]
inputSplitResult.value match {
case dynamoDbSplit: DynamoDBSplit => dynamoDbSplit
case other => throw new Exception(s"Unexpected InputSplit type: ${other.getClass.getName}.")
}
}

}
Loading

0 comments on commit 311931a

Please sign in to comment.