Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make some configuration properties optional #174

Merged
merged 3 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -260,26 +260,27 @@ savepoints:
# Interval in which savepoints will be created
intervalSeconds: 300

# Column renaming configuration. If you'd like to rename any columns, specify them like so:
# Optional - Column renaming configuration. If you'd like to rename any columns, specify them like so:
# - from: source_column_name
# to: dest_column_name
renames: []
# Which token ranges to skip. You shouldn't need to fill this in normally; the migrator will
# create a savepoint file with this filled.
skipTokenRanges: []
# renames: []

# Configuration section for running the validator. The validator is run manually (see README).
validation:
# Should WRITETIMEs and TTLs be compared?
compareTimestamps: true
# What difference should we allow between TTLs?
ttlToleranceMillis: 60000
# What difference should we allow between WRITETIMEs?
writetimeToleranceMillis: 1000
# How many differences to fetch and print
failuresToFetch: 100
# What difference should we allow between floating point numbers?
floatingPointTolerance: 0.001
# What difference in ms should we allow between timestamps?
timestampMsTolerance: 0
# Optional - Which token ranges to skip. You shouldn't need to fill this in normally; the migrator will
# create a savepoint file with this filled.
# skipTokenRanges: []

# Configuration section for running the validator. The validator is run manually (see documentation).
# Mandatory if the application is executed in validation mode.
# validation:
# # Should WRITETIMEs and TTLs be compared?
# compareTimestamps: true
# # What difference should we allow between TTLs?
# ttlToleranceMillis: 60000
# # What difference should we allow between WRITETIMEs?
# writetimeToleranceMillis: 1000
# # How many differences to fetch and print
# failuresToFetch: 100
# # What difference should we allow between floating point numbers?
# floatingPointTolerance: 0.001
# # What difference in ms should we allow between timestamps?
# timestampMsTolerance: 0
10 changes: 5 additions & 5 deletions docs/source/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ The configuration file requires the following top-level properties (ie, with no
# Target configuration
target:
# ...
# Columns to rename
# Optional - Columns to rename
renames:
# ...
# Savepoints configuration
savepoints:
# ...
# Validator configuration
# Validator configuration. Required only if the app is executed in validation mode.
validation:
# ...
# Used internally
# Optional- Used internally
skipTokenRanges: []

These top-level properties are documented in the following sections (except ``skipTokenRanges``, which is used internally).
Expand Down Expand Up @@ -319,7 +319,7 @@ DynamoDB Target
Renames
-------

The ``renames`` property lists the item columns to rename along the migration. To not rename any columns, use the empty array ``renames: []``.
The optional ``renames`` property lists the item columns to rename along the migration.

.. code-block:: yaml

Expand Down Expand Up @@ -347,7 +347,7 @@ When migrating data over CQL-compatible storages, the migrator is able to resume
Validation
----------

The properties of the ``validation`` field are used only when the application is executed in :doc:`validation mode </validate>`.
The ``validation`` field and its properties are mandatory only when the application is executed in :doc:`validation mode </validate>`.

.. code-block:: yaml

Expand Down
2 changes: 2 additions & 0 deletions docs/source/rename-columns.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ Indicate in the migration configuration which columns to rename with the ``renam
to: bar
- from: xxx
to: yyy

To not perform any renames, leave out the ``renames`` property from the configuration file.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object Migrator {
spark,
cassandraSource,
cassandraSource.preserveTimestamps,
migratorConfig.skipTokenRanges)
migratorConfig.getSkipTokenRangesOrEmptySet)
ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF)
case (parquetSource: SourceSettings.Parquet, scyllaTarget: TargetSettings.Scylla) =>
val sourceDF = readers.Parquet.readDataFrame(spark, parquetSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ object AlternatorValidator {

// Define some aliases to prevent the Spark engine to try to serialize the whole object graph
val renamedColumn = config.renamesMap
val configValidation = config.validation
val configValidation = config.validation.getOrElse(
sys.error("Missing required property 'validation' in the configuration file."))

val targetByKey: RDD[(List[DdbValue], collection.Map[String, DdbValue])] =
target
Expand All @@ -74,7 +75,7 @@ object AlternatorValidator {
configValidation.floatingPointTolerance
)
}
.take(config.validation.failuresToFetch)
.take(configValidation.failuresToFetch)
.toList
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ import io.circe.{ Decoder, DecodingFailure, Encoder, Error, Json }

case class MigratorConfig(source: SourceSettings,
target: TargetSettings,
renames: List[Rename],
renames: Option[List[Rename]],
savepoints: Savepoints,
skipTokenRanges: Set[(Token[_], Token[_])],
validation: Validation) {
skipTokenRanges: Option[Set[(Token[_], Token[_])]],
validation: Option[Validation]) {
def render: String = this.asJson.asYaml.spaces2

def getRenamesOrNil: List[Rename] = renames.getOrElse(Nil)

/** The list of renames modelled as a Map from the old column name to the new column name */
lazy val renamesMap: Map[String, String] =
renames.map(rename => rename.from -> rename.to).toMap.withDefault(identity)
getRenamesOrNil.map(rename => rename.from -> rename.to).toMap.withDefault(identity)

def getSkipTokenRangesOrEmptySet: Set[(Token[_], Token[_])] = skipTokenRanges.getOrElse(Set.empty)

}
object MigratorConfig {
implicit val tokenEncoder: Encoder[Token[_]] = Encoder.instance {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ object ScyllaMigrator {
log.info(
"Savepoints array defined, size of the array: " + migratorConfig.skipTokenRanges.size)

val diff = allTokenRanges.diff(migratorConfig.skipTokenRanges)
val diff = allTokenRanges.diff(migratorConfig.getSkipTokenRangesOrEmptySet)
log.info("Diff ... total diff of full ranges to savepoints is: " + diff.size)
log.debug("Dump of the missing tokens: ")
log.debug(diff)
Expand All @@ -88,7 +88,7 @@ object ScyllaMigrator {
try {
writers.Scylla.writeDataframe(
target,
migratorConfig.renames,
migratorConfig.getRenamesOrNil,
sourceDF.dataFrame,
sourceDF.timestampColumns,
tokenRangeAccumulator)
Expand Down Expand Up @@ -147,7 +147,7 @@ object ScyllaMigrator {
(range.range.start.asInstanceOf[Token[_]], range.range.end.asInstanceOf[Token[_]]))

val modifiedConfig = config.copy(
skipTokenRanges = config.skipTokenRanges ++ rangesToSkip
skipTokenRanges = Some(config.getSkipTokenRangesOrEmptySet ++ rangesToSkip)
)

Files.write(filename, modifiedConfig.render.getBytes(StandardCharsets.UTF_8))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ object ScyllaValidator {
sourceSettings: SourceSettings.Cassandra,
targetSettings: TargetSettings.Scylla,
config: MigratorConfig)(implicit spark: SparkSession): List[RowComparisonFailure] = {

val validationConfig =
config.validation.getOrElse(
sys.error("Missing required property 'validation' in the configuration file."))

val sourceConnector: CassandraConnector =
Connectors.sourceConnector(spark.sparkContext.getConf, sourceSettings)
val targetConnector: CassandraConnector =
Expand Down Expand Up @@ -118,14 +123,14 @@ object ScyllaValidator {
RowComparisonFailure.compareCassandraRows(
l,
r,
config.validation.floatingPointTolerance,
config.validation.timestampMsTolerance,
config.validation.ttlToleranceMillis,
config.validation.writetimeToleranceMillis,
config.validation.compareTimestamps
validationConfig.floatingPointTolerance,
validationConfig.timestampMsTolerance,
validationConfig.ttlToleranceMillis,
validationConfig.writetimeToleranceMillis,
validationConfig.compareTimestamps
)
}
.take(config.validation.failuresToFetch)
.take(validationConfig.failuresToFetch)
.toList

}
Expand Down
4 changes: 1 addition & 3 deletions tests/src/test/configurations/cassandra-to-scylla-basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ target:
connections: 16
stripTrailingZerosForDecimals: false

renames: []

savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []

validation:
compareTimestamps: true
ttlToleranceMillis: 60000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,3 @@ renames:
savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,7 @@ target:
secretKey: dummy
streamChanges: false

renames: []

# Below are unused but mandatory settings
savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ target:
secretKey: dummy
streamChanges: false

renames: []

# Below are unused but mandatory settings
savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []

validation:
compareTimestamps: true
ttlToleranceMillis: 60000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,7 @@ target:
maxMapTasks: 1
streamChanges: false

renames: []

# Below are unused but mandatory settings
savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,3 @@ renames:
savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
10 changes: 0 additions & 10 deletions tests/src/test/configurations/parquet-to-scylla-basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@ target:
connections: 16
stripTrailingZerosForDecimals: false

renames: []

savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
10 changes: 0 additions & 10 deletions tests/src/test/configurations/scylla-to-scylla-basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,6 @@ target:
connections: 16
stripTrailingZerosForDecimals: false

renames: []

savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
Loading