diff --git a/api/src/main/scala/ai/chronon/api/Extensions.scala b/api/src/main/scala/ai/chronon/api/Extensions.scala
index 1d0c6e82c9..23674ed3b6 100644
--- a/api/src/main/scala/ai/chronon/api/Extensions.scala
+++ b/api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -907,20 +907,28 @@ object Extensions {
       join
     }
 
-    /*
-     * Compute variants of semantic_hash with different flags. A flag is stored on Hive metadata and used to
-     * indicate which version of semantic_hash logic to use.
-     */
-    def semanticHash(excludeTopic: Boolean): Map[String, String] = {
+    /**
+      * Compute variants of semantic_hash with different flags. A flag is stored on Hive metadata and used to
+      * indicate which version of semantic_hash logic to use.
+      * @param excludeTopic exclude streaming topic for any potential streaming topic migrations
+      * @param excludeBackfillStartDateInJoinPart exclude the backfill start date in the join part, it is intended to
+      *                                           trigger separate group by backfill jobs, and semantic hash is only
+      *                                           used in join for versioning check, therefore, exclude it for more
+      *                                           flexibility of group by backfill jobs
+      */
+    def semanticHash(excludeTopic: Boolean, excludeBackfillStartDateInJoinPart: Boolean = true): Map[String, String] = {
+      // WARN: deepCopy doesn't guarantee same semantic_hash will be produced due to reordering of map keys
+      // but the behavior is deterministic
+      val joinCopy = if (excludeTopic || excludeBackfillStartDateInJoinPart) join.deepCopy() else join
+
       if (excludeTopic) {
-        // WARN: deepCopy doesn't guarantee same semantic_hash will be produced due to reordering of map keys
-        // but the behavior is deterministic
-        val joinCopy = join.deepCopy()
         cleanTopic(joinCopy)
-        joinCopy.baseSemanticHash
-      } else {
-        baseSemanticHash
       }
+
+      if (excludeBackfillStartDateInJoinPart) {
+        joinCopy.getJoinParts.toScala.foreach(_.groupBy.unsetBackfillStartDate())
+      }
+      joinCopy.baseSemanticHash
     }
 
     /*