Support resumability of DynamoDB migrations
- Track in savepoint files the scan segments that have been fully migrated
- Generalize the savepoints management to support both CQL and DynamoDB
- Update documentation accordingly

Fixes #165
julienrf committed Aug 8, 2024
1 parent cd60263 commit def7249
Showing 17 changed files with 335 additions and 100 deletions.
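As an illustration of the first point above, a savepoint file written during a DynamoDB migration is a copy of the original configuration in which the fully migrated scan segments are recorded, so that a restarted run can skip them. A sketch of such a generated file, with purely hypothetical values:

# savepoint_1723107600.yaml (generated by the migrator; all values below are illustrative)
source:
  # ... copied unchanged from the original configuration ...
target:
  # ...
savepoints:
  path: /app/savepoints
  intervalSeconds: 300
# Scan segments that were fully migrated before the interruption; they are skipped on resume
skipSegments: [0, 1, 2, 5, 7]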
2 changes: 1 addition & 1 deletion build.sbt
@@ -51,7 +51,7 @@ lazy val migrator = (project in file("migrator")).enablePlugins(BuildInfoPlugin)
// Revision number is automatically generated by the submodule when we do publishLocal. It is based on the output of 'git describe --tags'
"com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0-1-g468079b4",
"com.github.jnr" % "jnr-posix" % "3.1.19", // Needed by the cassandra connector
"com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.6.0",
"com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.7.0",
"io.circe" %% "circe-generic" % "0.14.7",
"io.circe" %% "circe-parser" % "0.14.7",
"io.circe" %% "circe-yaml" % "0.15.1",
4 changes: 4 additions & 0 deletions config.yaml.example
@@ -269,6 +269,10 @@ savepoints:
# create a savepoint file with this filled.
# skipTokenRanges: []

# Optional - Which scan segments to skip. You shouldn’t need to fill this in normally; the migrator will
# create a savepoint file with this filled.
# skipSegments: []

# Configuration section for running the validator. The validator is run manually (see documentation).
# Mandatory if the application is executed in validation mode.
# validation:
8 changes: 5 additions & 3 deletions docs/source/configuration.rst
@@ -35,10 +35,12 @@ The configuration file requires the following top-level properties (ie, with no
# Validator configuration. Required only if the app is executed in validation mode.
validation:
# ...
# Optional- Used internally
# Optional - Used internally
skipTokenRanges: []
# Optional - Used internally
skipSegments: []
These top-level properties are documented in the following sections (except ``skipTokenRanges``, which is used internally).
These top-level properties are documented in the following sections (except ``skipTokenRanges`` and ``skipSegments``, which are used internally).

------
Source
@@ -332,7 +334,7 @@ The optional ``renames`` property lists the item columns to rename along the mig
Savepoints
----------

When migrating data over CQL-compatible storages, the migrator is able to resume an interrupted migration. To achieve this, it stores so-called “savepoints” along the process to remember which token have already been migrated and should be skipped when the migration is restarted. This feature is not supported by DynamoDB-compatible storages.
When migrating data from Apache Cassandra or DynamoDB, the migrator is able to resume an interrupted migration. To achieve this, it stores so-called “savepoints” along the process to remember which data items have already been migrated and should be skipped when the migration is restarted.

.. code-block:: yaml
2 changes: 1 addition & 1 deletion docs/source/getting-started/ansible.rst
@@ -26,7 +26,7 @@ An `Ansible <https://www.ansible.com/>`_ playbook is provided in the `ansible fo
- Ensure networking is configured to allow you to access the Spark master node via TCP ports 8080 and 4040
- visit ``http://<spark-master-hostname>:8080``

8. `Review and modify config.yaml <../#configure-the-migration>`_ based on whether you're performing a migration to CQL or Alternator
8. `Review and modify config.yaml <./#configure-the-migration>`_ based on whether you're performing a migration to CQL or Alternator

- If you're migrating to the ScyllaDB CQL interface (from Apache Cassandra, ScyllaDB, or another CQL source), make a copy, review the comments in ``config.yaml.example``, and edit as directed.
- If you're migrating to Alternator (from DynamoDB or another ScyllaDB Alternator source), make a copy, review the comments in ``config.dynamodb.yml``, and edit as directed.
2 changes: 1 addition & 1 deletion docs/source/getting-started/aws-emr.rst
@@ -12,7 +12,7 @@ This page describes how to use the Migrator in `Amazon EMR <https://aws.amazon.c
--output-document=config.yaml
2. `Configure the migration <../#configure-the-migration>`_ according to your needs.
2. `Configure the migration <./#configure-the-migration>`_ according to your needs.

3. Download the latest release of the Migrator.

2 changes: 1 addition & 1 deletion docs/source/getting-started/docker.rst
@@ -38,7 +38,7 @@ This page describes how to set up a Spark cluster locally on your machine by usi
127.0.0.1 spark-master
127.0.0.1 spark-worker
5. Rename the file ``config.yaml.example`` to ``config.yaml``, and `configure <../#configure-the-migration>`_ it according to your needs.
5. Rename the file ``config.yaml.example`` to ``config.yaml``, and `configure <./#configure-the-migration>`_ it according to your needs.

6. Finally, run the migration.

4 changes: 2 additions & 2 deletions docs/source/getting-started/spark-standalone.rst
@@ -21,7 +21,7 @@ This page describes how to set up a Spark cluster on your infrastructure and to
wget https://github.com/scylladb/scylla-migrator/raw/master/config.yaml.example \
--output-document=config.yaml
4. `Configure the migration <../#configure-the-migration>`_ according to your needs.
4. `Configure the migration <./#configure-the-migration>`_ according to your needs.

5. Finally, run the migration as follows from the Spark master node.

@@ -32,6 +32,6 @@ This page describes how to set up a Spark cluster on your infrastructure and to
--conf spark.scylla.config=<path to config.yaml> \
<path to scylla-migrator-assembly.jar>
See also our `general recommendations to tune the Spark job <../#run-the-migration>`_.
See also our `general recommendations to tune the Spark job <./#run-the-migration>`_.

6. You can monitor progress from the `Spark web UI <https://spark.apache.org/docs/latest/spark-standalone.html#monitoring-and-logging>`_.
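Because each savepoint file is a complete copy of the migration configuration, one way to resume an interrupted run is to point ``spark.scylla.config`` at the most recent savepoint instead of the original ``config.yaml``. A sketch, assuming the ``spark-submit`` invocation shown above (the savepoint file name is illustrative):

spark-submit ... \
  --conf spark.scylla.config=/app/savepoints/savepoint_1723107600.yaml \
  <path to scylla-migrator-assembly.jar>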
5 changes: 1 addition & 4 deletions migrator/src/main/scala/com/scylladb/migrator/Migrator.scala
@@ -42,10 +42,7 @@ object Migrator {
val sourceDF = readers.Parquet.readDataFrame(spark, parquetSource)
ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF)
case (dynamoSource: SourceSettings.DynamoDB, alternatorTarget: TargetSettings.DynamoDB) =>
AlternatorMigrator.migrateFromDynamoDB(
dynamoSource,
alternatorTarget,
migratorConfig.renamesMap)
AlternatorMigrator.migrateFromDynamoDB(dynamoSource, alternatorTarget, migratorConfig)
case (
s3Source: SourceSettings.DynamoDBS3Export,
alternatorTarget: TargetSettings.DynamoDB) =>
116 changes: 116 additions & 0 deletions migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala
@@ -0,0 +1,116 @@
package com.scylladb.migrator

import com.scylladb.migrator.config.MigratorConfig
import org.apache.log4j.LogManager
import sun.misc.{ Signal, SignalHandler }

import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }
import java.util.concurrent.{ ScheduledThreadPoolExecutor, TimeUnit }

/**
 * A component that manages savepoints. Savepoints provide a way to resume an interrupted migration.
 *
 * This component periodically stores savepoints according to the schedule defined in the configuration.
 * It also automatically stores a savepoint in case of early termination (e.g. due to a SIGTERM signal).
 *
 * Internally, it works by writing modified copies of the original migration configuration. These copies
 * specify which parts of the source dataset have already been migrated and can safely be skipped when
 * restarting the migration.
 *
 * Make sure to call the method `close` when you don’t need the savepoints manager anymore so that it
 * releases the resources it was using.
 *
 * This class is abstract. Subclasses are responsible for implementing how to track the migration progress,
 * and for communicating the updated state of the migration via the method `updateConfigWithMigrationState`.
 */
abstract class SavepointsManager(migratorConfig: MigratorConfig) extends AutoCloseable {

  val log = LogManager.getLogger(this.getClass.getName)
  private val scheduler = new ScheduledThreadPoolExecutor(1)

  createSavepointsDirectory()
  addUSR2Handler()
  startSavepointSchedule()

  private def createSavepointsDirectory(): Unit = {
    val savepointsDirectory = Paths.get(migratorConfig.savepoints.path)
    if (!Files.exists(savepointsDirectory)) {
      log.debug(
        s"Directory ${savepointsDirectory.normalize().toString} does not exist. Creating it...")
      Files.createDirectories(savepointsDirectory)
    }
  }

  private def savepointFilename(path: String): String =
    s"${path}/savepoint_${System.currentTimeMillis / 1000}.yaml"

  private def addUSR2Handler(): Unit = {
    log.info(
      "Installing SIGINT/TERM/USR2 handler. Send this to dump the current progress to a savepoint.")

    val handler = new SignalHandler {
      override def handle(signal: Signal): Unit =
        dumpMigrationState(signal.toString)
    }

    Signal.handle(new Signal("USR2"), handler)
    Signal.handle(new Signal("TERM"), handler)
    Signal.handle(new Signal("INT"), handler)
  }

  private def startSavepointSchedule(): Unit = {
    val runnable = new Runnable {
      override def run(): Unit =
        try dumpMigrationState("schedule")
        catch {
          case e: Throwable =>
            log.error("Could not create the savepoint. This will be retried.", e)
        }
    }

    log.info(
      s"Starting savepoint schedule; will write a savepoint every ${migratorConfig.savepoints.intervalSeconds} seconds")

    scheduler.scheduleAtFixedRate(
      runnable,
      migratorConfig.savepoints.intervalSeconds,
      migratorConfig.savepoints.intervalSeconds,
      TimeUnit.SECONDS)
  }

  /**
   * Dump the current state of the migration into a configuration file that can be
   * used to resume the migration.
   * @param reason Human-readable, informal event that caused the dump.
   */
  final def dumpMigrationState(reason: String): Unit = {
    val filename =
      Paths.get(savepointFilename(migratorConfig.savepoints.path)).normalize

    val modifiedConfig = updateConfigWithMigrationState()

    Files.write(filename, modifiedConfig.render.getBytes(StandardCharsets.UTF_8))

    log.info(
      s"Created a savepoint config at ${filename} due to ${reason}. ${describeMigrationState()}")
  }

  /**
   * Stop the periodic creation of savepoints and release the associated resources.
   */
  final def close(): Unit =
    scheduler.shutdown()

  /**
   * Provide readable logs by describing which parts of the migration have been completed already.
   */
  def describeMigrationState(): String

  /**
   * A copy of the original migration configuration, updated to describe which parts of the migration
   * have been completed already.
   */
  def updateConfigWithMigrationState(): MigratorConfig

}
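For illustration, a hypothetical subclass (not part of this commit) only needs to provide the two abstract members; the DynamoDB implementation added further down in this commit follows the same shape, and call sites wrap the manager in ``Using.resource`` so that the savepoint schedule is stopped even when the migration fails:

// Hypothetical sketch, not part of this commit.
import com.scylladb.migrator.config.MigratorConfig

class NoOpSavepointsManager(config: MigratorConfig) extends SavepointsManager(config) {
  // Describe the progress for the logs.
  def describeMigrationState(): String = "Nothing has been migrated yet."
  // Return a copy of the configuration reflecting the progress (here, unchanged).
  def updateConfigWithMigrationState(): MigratorConfig = config
}

// Typical call-site usage:
// Using.resource(new NoOpSavepointsManager(migratorConfig)) { _ =>
//   runTheMigration()
// }

Because the manager also installs signal handlers, an operator can force a savepoint dump at any time by sending a signal to the driver process, e.g. ``kill -USR2 <driver pid>``.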
@@ -1,7 +1,7 @@
package com.scylladb.migrator.alternator

import com.scylladb.migrator.{ readers, writers, DynamoUtils }
import com.scylladb.migrator.config.{ SourceSettings, TargetSettings }
import com.scylladb.migrator.config.{ MigratorConfig, SourceSettings, TargetSettings }
import com.scylladb.migrator.writers.DynamoStreamReplication
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text
@@ -13,16 +13,22 @@ import software.amazon.awssdk.services.dynamodb.model.TableDescription

import scala.util.control.NonFatal
import scala.jdk.CollectionConverters._
import scala.util.Using

object AlternatorMigrator {
private val log = LogManager.getLogger("com.scylladb.migrator.alternator")

def migrateFromDynamoDB(source: SourceSettings.DynamoDB,
target: TargetSettings.DynamoDB,
renamesMap: Map[String, String])(implicit spark: SparkSession): Unit = {
val (sourceRDD, sourceTableDesc) = readers.DynamoDB.readRDD(spark, source)
val maybeStreamedSource = if (target.streamChanges) Some(source) else None
migrate(sourceRDD, sourceTableDesc, maybeStreamedSource, target, renamesMap)
migratorConfig: MigratorConfig)(implicit spark: SparkSession): Unit = {
val (sourceRDD, sourceTableDesc) =
readers.DynamoDB.readRDD(spark, source, migratorConfig.skipSegments)
val savepointsManager =
DynamoDbSavepointsManager.setup(migratorConfig, sourceRDD, spark.sparkContext)
Using.resource(savepointsManager) { _ =>
val maybeStreamedSource = if (target.streamChanges) Some(source) else None
migrate(sourceRDD, sourceTableDesc, maybeStreamedSource, target, migratorConfig.renamesMap)
}
}

def migrateFromS3Export(source: SourceSettings.DynamoDBS3Export,
@@ -96,7 +102,7 @@ object AlternatorMigrator {
}
} catch {
case NonFatal(e) =>
log.error("Caught error while writing the RDD.", e)
throw new Exception("Caught error while writing the RDD.", e)
}

}
@@ -22,7 +22,7 @@ object AlternatorValidator {
targetSettings: TargetSettings.DynamoDB,
config: MigratorConfig)(implicit spark: SparkSession): List[RowComparisonFailure] = {

val (source, sourceTableDesc) = readers.DynamoDB.readRDD(spark, sourceSettings)
val (source, sourceTableDesc) = readers.DynamoDB.readRDD(spark, sourceSettings, None)
val sourceTableKeys = sourceTableDesc.keySchema.asScala.toList

val sourceByKey: RDD[(List[DdbValue], collection.Map[String, DdbValue])] =
@@ -44,7 +44,8 @@
targetSettings.scanSegments,
targetSettings.maxMapTasks,
readThroughput = None,
throughputReadPercent = None
throughputReadPercent = None,
skipSegments = None
)

// Define some aliases to prevent the Spark engine from trying to serialize the whole object graph
@@ -0,0 +1,97 @@
package com.scylladb.migrator.alternator

import com.scylladb.migrator.SavepointsManager
import com.scylladb.migrator.config.MigratorConfig
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.split.DynamoDBSplit
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.InputSplit
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{ SparkListener, SparkListenerTaskEnd }
import org.apache.spark.{ Partition, SerializableWritable, SparkContext, Success => TaskEndSuccess }

import scala.util.{ Failure, Success, Try }

/**
 * Manage DynamoDB-based migrations by tracking the migrated scan segments.
 */
class DynamoDbSavepointsManager(migratorConfig: MigratorConfig,
                                segmentsAccumulator: IntSetAccumulator)
    extends SavepointsManager(migratorConfig) {

  def describeMigrationState(): String =
    s"Segments to skip: ${segmentsAccumulator.value}"

  def updateConfigWithMigrationState(): MigratorConfig =
    migratorConfig.copy(skipSegments = Some(segmentsAccumulator.value))

}

object DynamoDbSavepointsManager {

  private val log = LogManager.getLogger(classOf[DynamoDbSavepointsManager])

  def apply(migratorConfig: MigratorConfig,
            segmentsAccumulator: IntSetAccumulator): DynamoDbSavepointsManager =
    new DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator)

  /**
   * Set up a savepoints manager that tracks the scan segments migrated from the source RDD.
   */
  def setup(migratorConfig: MigratorConfig,
            sourceRDD: RDD[(Text, DynamoDBItemWritable)],
            spark: SparkContext): DynamoDbSavepointsManager = {
    val segmentsAccumulator = IntSetAccumulator.empty
    spark.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val partitionId = taskEnd.taskInfo.partitionId
        log.debug(s"Migration of partition ${partitionId} ended: ${taskEnd.reason}.")
        if (taskEnd.reason == TaskEndSuccess) {
          scanSegments(sourceRDD, partitionId) match {
            case Success(segments) =>
              segments.forEach(segment => segmentsAccumulator.add(segment))
              log.info(s"Marked segments ${segments} as migrated.")
            case Failure(error) =>
              log.error(
                s"Unable to collect the segments scanned in partition ${partitionId}. The next savepoint will not include them.",
                error)
          }
        }
      }
    })
    DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator)
  }

  /**
   * @return The scan segments processed in partition `partitionId` of `rdd`.
   */
  private def scanSegments(rdd: RDD[(Text, DynamoDBItemWritable)],
                           partitionId: Int): Try[java.util.List[Integer]] =
    if (partitionId >= 0 && partitionId < rdd.getNumPartitions) {
      val partition = rdd.partitions(partitionId)
      inputSplit(partition).map(_.getSegments)
    } else {
      Failure(new Exception(s"Partition ${partitionId} not found in the RDD."))
    }

  /**
   * @return The `DynamoDBSplit` wrapped by the `partition`.
   *         Fails if the `partition` is not a `HadoopPartition` containing a `DynamoDBSplit`.
   */
  private def inputSplit(partition: Partition): Try[DynamoDBSplit] = Try {
    // Unfortunately, class `HadoopPartition` is private, so we can’t simply
    // pattern match on it. We use reflection to access its `inputSplit` member.
    if (partition.getClass.getName != "org.apache.spark.rdd.HadoopPartition") {
      throw new Exception(s"Unexpected partition type: ${partition.getClass.getName}.")
    }
    val inputSplitMember = partition.getClass.getMethod("inputSplit")
    val inputSplitResult =
      inputSplitMember.invoke(partition).asInstanceOf[SerializableWritable[InputSplit]]
    inputSplitResult.value match {
      case dynamoDbSplit: DynamoDBSplit => dynamoDbSplit
      case other => throw new Exception(s"Unexpected InputSplit type: ${other.getClass.getName}.")
    }
  }

}
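The ``IntSetAccumulator`` used above is declared in one of the changed files that is not rendered in this view, so the following is only a rough sketch of what such a helper could look like (the actual implementation in the commit may differ, for example it could be a Spark ``AccumulatorV2``); all it needs is a thread-safe ``add`` and a ``value`` snapshot over a set of segment numbers:

// Hypothetical sketch only; the real IntSetAccumulator is not shown in this view.
import java.util.concurrent.atomic.AtomicReference

class IntSetAccumulator private (ref: AtomicReference[Set[Int]]) extends Serializable {
  // Record one migrated scan segment (thread-safe).
  def add(i: Int): Unit = { ref.updateAndGet(_ + i); () }
  // Snapshot of all segments recorded so far.
  def value: Set[Int] = ref.get()
}

object IntSetAccumulator {
  def empty: IntSetAccumulator = new IntSetAccumulator(new AtomicReference(Set.empty[Int]))
}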
(The remaining changed files in this commit are not rendered in this view.)