Skip to content

Commit

Permalink
Merge pull request #174 from julienrf/optional-config-properties
Browse files Browse the repository at this point in the history
Make some configuration properties optional
  • Loading branch information
tarzanek authored Jul 18, 2024
2 parents f7ac7f6 + 170f4cb commit 8389e93
Show file tree
Hide file tree
Showing 16 changed files with 56 additions and 102 deletions.
39 changes: 20 additions & 19 deletions config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -260,26 +260,27 @@ savepoints:
# Interval in which savepoints will be created
intervalSeconds: 300

# Column renaming configuration. If you'd like to rename any columns, specify them like so:
# Optional - Column renaming configuration. If you'd like to rename any columns, specify them like so:
# - from: source_column_name
# to: dest_column_name
renames: []
# Which token ranges to skip. You shouldn't need to fill this in normally; the migrator will
# create a savepoint file with this filled.
skipTokenRanges: []
# renames: []

# Configuration section for running the validator. The validator is run manually (see README).
validation:
# Should WRITETIMEs and TTLs be compared?
compareTimestamps: true
# What difference should we allow between TTLs?
ttlToleranceMillis: 60000
# What difference should we allow between WRITETIMEs?
writetimeToleranceMillis: 1000
# How many differences to fetch and print
failuresToFetch: 100
# What difference should we allow between floating point numbers?
floatingPointTolerance: 0.001
# What difference in ms should we allow between timestamps?
timestampMsTolerance: 0
# Optional - Which token ranges to skip. You shouldn't need to fill this in normally; the migrator will
# create a savepoint file with this filled.
# skipTokenRanges: []

# Configuration section for running the validator. The validator is run manually (see documentation).
# Mandatory if the application is executed in validation mode.
# validation:
# # Should WRITETIMEs and TTLs be compared?
# compareTimestamps: true
# # What difference should we allow between TTLs?
# ttlToleranceMillis: 60000
# # What difference should we allow between WRITETIMEs?
# writetimeToleranceMillis: 1000
# # How many differences to fetch and print
# failuresToFetch: 100
# # What difference should we allow between floating point numbers?
# floatingPointTolerance: 0.001
# # What difference in ms should we allow between timestamps?
# timestampMsTolerance: 0
10 changes: 5 additions & 5 deletions docs/source/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ The configuration file requires the following top-level properties (ie, with no
# Target configuration
target:
# ...
# Columns to rename
# Optional - Columns to rename
renames:
# ...
# Savepoints configuration
savepoints:
# ...
# Validator configuration
# Validator configuration. Required only if the app is executed in validation mode.
validation:
# ...
# Used internally
# Optional- Used internally
skipTokenRanges: []
These top-level properties are documented in the following sections (except ``skipTokenRanges``, which is used internally).
Expand Down Expand Up @@ -319,7 +319,7 @@ DynamoDB Target
Renames
-------

The ``renames`` property lists the item columns to rename along the migration. To not rename any columns, use the empty array ``renames: []``.
The optional ``renames`` property lists the item columns to rename along the migration.

.. code-block:: yaml
Expand Down Expand Up @@ -347,7 +347,7 @@ When migrating data over CQL-compatible storages, the migrator is able to resume
Validation
----------

The properties of the ``validation`` field are used only when the application is executed in :doc:`validation mode </validate>`.
The ``validation`` field and its properties are mandatory only when the application is executed in :doc:`validation mode </validate>`.

.. code-block:: yaml
Expand Down
2 changes: 2 additions & 0 deletions docs/source/rename-columns.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ Indicate in the migration configuration which columns to rename with the ``renam
to: bar
- from: xxx
to: yyy
To not perform any renames, leave out the ``renames`` property from the configuration file.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object Migrator {
spark,
cassandraSource,
cassandraSource.preserveTimestamps,
migratorConfig.skipTokenRanges)
migratorConfig.getSkipTokenRangesOrEmptySet)
ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF)
case (parquetSource: SourceSettings.Parquet, scyllaTarget: TargetSettings.Scylla) =>
val sourceDF = readers.Parquet.readDataFrame(spark, parquetSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ object AlternatorValidator {

// Define some aliases to prevent the Spark engine to try to serialize the whole object graph
val renamedColumn = config.renamesMap
val configValidation = config.validation
val configValidation = config.validation.getOrElse(
sys.error("Missing required property 'validation' in the configuration file."))

val targetByKey: RDD[(List[DdbValue], collection.Map[String, DdbValue])] =
target
Expand All @@ -74,7 +75,7 @@ object AlternatorValidator {
configValidation.floatingPointTolerance
)
}
.take(config.validation.failuresToFetch)
.take(configValidation.failuresToFetch)
.toList
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ import io.circe.{ Decoder, DecodingFailure, Encoder, Error, Json }

case class MigratorConfig(source: SourceSettings,
target: TargetSettings,
renames: List[Rename],
renames: Option[List[Rename]],
savepoints: Savepoints,
skipTokenRanges: Set[(Token[_], Token[_])],
validation: Validation) {
skipTokenRanges: Option[Set[(Token[_], Token[_])]],
validation: Option[Validation]) {
def render: String = this.asJson.asYaml.spaces2

def getRenamesOrNil: List[Rename] = renames.getOrElse(Nil)

/** The list of renames modelled as a Map from the old column name to the new column name */
lazy val renamesMap: Map[String, String] =
renames.map(rename => rename.from -> rename.to).toMap.withDefault(identity)
getRenamesOrNil.map(rename => rename.from -> rename.to).toMap.withDefault(identity)

def getSkipTokenRangesOrEmptySet: Set[(Token[_], Token[_])] = skipTokenRanges.getOrElse(Set.empty)

}
object MigratorConfig {
implicit val tokenEncoder: Encoder[Token[_]] = Encoder.instance {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ object ScyllaMigrator {
log.info(
"Savepoints array defined, size of the array: " + migratorConfig.skipTokenRanges.size)

val diff = allTokenRanges.diff(migratorConfig.skipTokenRanges)
val diff = allTokenRanges.diff(migratorConfig.getSkipTokenRangesOrEmptySet)
log.info("Diff ... total diff of full ranges to savepoints is: " + diff.size)
log.debug("Dump of the missing tokens: ")
log.debug(diff)
Expand All @@ -88,7 +88,7 @@ object ScyllaMigrator {
try {
writers.Scylla.writeDataframe(
target,
migratorConfig.renames,
migratorConfig.getRenamesOrNil,
sourceDF.dataFrame,
sourceDF.timestampColumns,
tokenRangeAccumulator)
Expand Down Expand Up @@ -147,7 +147,7 @@ object ScyllaMigrator {
(range.range.start.asInstanceOf[Token[_]], range.range.end.asInstanceOf[Token[_]]))

val modifiedConfig = config.copy(
skipTokenRanges = config.skipTokenRanges ++ rangesToSkip
skipTokenRanges = Some(config.getSkipTokenRangesOrEmptySet ++ rangesToSkip)
)

Files.write(filename, modifiedConfig.render.getBytes(StandardCharsets.UTF_8))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ object ScyllaValidator {
sourceSettings: SourceSettings.Cassandra,
targetSettings: TargetSettings.Scylla,
config: MigratorConfig)(implicit spark: SparkSession): List[RowComparisonFailure] = {

val validationConfig =
config.validation.getOrElse(
sys.error("Missing required property 'validation' in the configuration file."))

val sourceConnector: CassandraConnector =
Connectors.sourceConnector(spark.sparkContext.getConf, sourceSettings)
val targetConnector: CassandraConnector =
Expand Down Expand Up @@ -118,14 +123,14 @@ object ScyllaValidator {
RowComparisonFailure.compareCassandraRows(
l,
r,
config.validation.floatingPointTolerance,
config.validation.timestampMsTolerance,
config.validation.ttlToleranceMillis,
config.validation.writetimeToleranceMillis,
config.validation.compareTimestamps
validationConfig.floatingPointTolerance,
validationConfig.timestampMsTolerance,
validationConfig.ttlToleranceMillis,
validationConfig.writetimeToleranceMillis,
validationConfig.compareTimestamps
)
}
.take(config.validation.failuresToFetch)
.take(validationConfig.failuresToFetch)
.toList

}
Expand Down
4 changes: 1 addition & 3 deletions tests/src/test/configurations/cassandra-to-scylla-basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ target:
connections: 16
stripTrailingZerosForDecimals: false

renames: []

savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []

validation:
compareTimestamps: true
ttlToleranceMillis: 60000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,3 @@ renames:
savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,7 @@ target:
secretKey: dummy
streamChanges: false

renames: []

# Below are unused but mandatory settings
savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ target:
secretKey: dummy
streamChanges: false

renames: []

# Below are unused but mandatory settings
savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []

validation:
compareTimestamps: true
ttlToleranceMillis: 60000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,7 @@ target:
maxMapTasks: 1
streamChanges: false

renames: []

# Below are unused but mandatory settings
savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,3 @@ renames:
savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
10 changes: 0 additions & 10 deletions tests/src/test/configurations/parquet-to-scylla-basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@ target:
connections: 16
stripTrailingZerosForDecimals: false

renames: []

savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
10 changes: 0 additions & 10 deletions tests/src/test/configurations/scylla-to-scylla-basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,6 @@ target:
connections: 16
stripTrailingZerosForDecimals: false

renames: []

savepoints:
path: /app/savepoints
intervalSeconds: 300
skipTokenRanges: []
validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0

0 comments on commit 8389e93

Please sign in to comment.