34 commits
eaacfa6
code to write daily irs
kambstreat May 30, 2025
40b6cb2
store incremental agg and compute final IRs
kambstreat Jun 3, 2025
a014b6e
Store hops to inc tables
kambstreat Jun 7, 2025
32d559e
add code changes to generate final output from IR for AVG
kambstreat Jun 14, 2025
37293df
change function structure and variable names
kambstreat Jun 19, 2025
6263706
remove unused functions
kambstreat Jun 19, 2025
cb4325b
change function defs
kambstreat Jun 19, 2025
796ef96
make changes
kambstreat Jun 19, 2025
f218b23
change function order
kambstreat Jun 19, 2025
b1d4ee9
add new field is_incremental to python api
kambstreat Jun 20, 2025
2ab7659
get argument for isIncremental in scala spark backend
kambstreat Jun 20, 2025
238c781
add unit test for incremental groupby
kambstreat Jun 20, 2025
8edfd27
reuse table ccreation
kambstreat Jul 18, 2025
e903683
Update GroupByTest
kambstreat Jul 18, 2025
0bdc4fc
Add GroupByTest for events
kambstreat Jul 18, 2025
7987931
changes for incrementalg
kambstreat Sep 3, 2025
2b26d45
resolve merge conflicts
kambstreat Sep 3, 2025
7b62a43
add last hole logic for incrementnal bacckfill
kambstreat Sep 5, 2025
aeeb5ec
fix syntax
kambstreat Sep 5, 2025
9180d23
fix bug : backfill only for missing holes
kambstreat Sep 6, 2025
ee81672
fix none error for inc Table
kambstreat Sep 7, 2025
29a3f28
add incremental table queryable range
kambstreat Sep 19, 2025
aa16010
add logging for tableUtils
kambstreat Sep 19, 2025
ff41cc9
add log
kambstreat Sep 19, 2025
aa25f9f
fill incremental holes
kambstreat Sep 22, 2025
3efe8cd
modify incremental aggregation parts
kambstreat Oct 2, 2025
a3bece6
remove logs for debugging
kambstreat Oct 7, 2025
897d18c
fix output schema from incremenntal aggregations. Added unit tests
kambstreat Oct 12, 2025
a52d7d0
resolve merge conflicts
kambstreat Oct 12, 2025
9c446ec
resolve merge conflict
kambstreat Oct 12, 2025
ca14309
add test case to test struct of Average
kambstreat Nov 2, 2025
dfb9226
add option for isIncremental for backward compatibility
kambstreat Nov 2, 2025
7ca4dfc
resolve merge conflicts
kambstreat Nov 2, 2025
ab994bc
fix count operation from incremental IRS
kambstreat Nov 4, 2025
@@ -70,6 +70,11 @@ class RowAggregator(val inputSchema: Seq[(String, DataType)], val aggregationPar
.toArray
.zip(columnAggregators.map(_.irType))

val incSchema = aggregationParts
Collaborator
I am thinking that we should use the full word "incremental" in the code, and keep "inc" as the suffix for the table so the table names don't get too long. What do you think?

Suggested change
val incSchema = aggregationParts
val incrementalSchema = aggregationParts

Author
Yes, good to use the full name.

.map(_.incOutputColumnName)
.toArray
.zip(columnAggregators.map(_.irType))

val outputSchema: Array[(String, DataType)] = aggregationParts
.map(_.outputColumnName)
.toArray
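A note on why a separate incremental schema is needed here: the incremental table stores intermediate representations (IRs) rather than finalized values, so its column types generally differ from the final output schema. The sketch below is a self-contained illustration in plain Scala (no Chronon classes; AvgIr and AvgIrExample are made-up names). It assumes the IR for AVERAGE is a (sum, count) pair that is only finalized into a double when the windowed output is produced.

// Minimal sketch: IR vs. finalized value for an AVERAGE aggregation.
// Assumption: the incremental (IR) form of AVERAGE is a (sum, count) pair,
// while the finalized output column is a single double.
final case class AvgIr(sum: Double, count: Long) {
  // Merging two IRs is associative, which is what makes daily partial
  // aggregates safe to store now and combine later.
  def merge(other: AvgIr): AvgIr = AvgIr(sum + other.sum, count + other.count)
  def finalized: Double = if (count == 0) Double.NaN else sum / count
}

object AvgIrExample extends App {
  val day1 = AvgIr(sum = 10.0, count = 2) // one day's IR row in the incremental table
  val day2 = AvgIr(sum = 30.0, count = 3) // another day's IR for the same key
  val merged = day1.merge(day2)           // what the final rollup does per window
  println(merged.finalized)               // 8.0 -> value of the finalized output column
}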
6 changes: 6 additions & 0 deletions api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -98,6 +98,7 @@ object Extensions {
def cleanName: String = metaData.name.sanitize

def outputTable = s"${metaData.outputNamespace}.${metaData.cleanName}"
def incOutputTable = s"${metaData.outputNamespace}.${metaData.cleanName}_inc"
def outputLabelTable = s"${metaData.outputNamespace}.${metaData.cleanName}_labels"
def outputFinalView = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled"
def outputLatestLabelView = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled_latest"
@@ -176,8 +177,13 @@ object Extensions {

def outputColumnName =
s"${aggregationPart.inputColumn}_$opSuffix${aggregationPart.window.suffix}${bucketSuffix}"

def incOutputColumnName =
s"${aggregationPart.inputColumn}_$opSuffix${bucketSuffix}"
Collaborator
should we still keep the aggregationPart.window.suffix? Otherwise, how do we reconstruct the final output column?

Author
@pengyu-hou not sure I get it. I can't use the window.suffix here, since the intermediate incremental table is a daily aggregation.


}


implicit class AggregationOps(aggregation: Aggregation) {

// one agg part per bucket per window
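To make the naming discussion above concrete: the incremental column name drops the window suffix because the incremental table holds daily, window-agnostic IRs; windows are only applied when the daily IRs are rolled up into the final output. The snippet below is a hypothetical stand-alone illustration; the suffix strings ("average", "_7d") and helper names are assumptions, not the literal behavior of Extensions.scala.

// Hypothetical, stand-alone illustration of the two naming schemes discussed above.
// The concrete suffixes ("average", "_7d") are assumptions, not Chronon's exact strings.
object ColumnNameSketch extends App {
  def outputColumnName(inputColumn: String, opSuffix: String, windowSuffix: String, bucketSuffix: String = ""): String =
    s"${inputColumn}_$opSuffix$windowSuffix$bucketSuffix"

  // The incremental table is a daily aggregation, so the window suffix is dropped;
  // windows are re-applied later when daily IRs are rolled up into the final output.
  def incOutputColumnName(inputColumn: String, opSuffix: String, bucketSuffix: String = ""): String =
    s"${inputColumn}_$opSuffix$bucketSuffix"

  println(outputColumnName("price", "average", "_7d")) // price_average_7d
  println(incOutputColumnName("price", "average"))     // price_average
}

In this toy example, reconstructing price_average_7d from price_average requires knowing which window the rollup applied, which is presumably why the final output stage, rather than the incremental table, reintroduces the window suffix.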
136 changes: 114 additions & 22 deletions spark/src/main/scala/ai/chronon/spark/GroupBy.scala
@@ -18,6 +18,7 @@ package ai.chronon.spark

import ai.chronon.aggregator.base.TimeTuple
import ai.chronon.aggregator.row.RowAggregator
import ai.chronon.aggregator.windowing.HopsAggregator.HopIr
import ai.chronon.aggregator.windowing._
import ai.chronon.api
import ai.chronon.api.DataModel.{Entities, Events}
@@ -41,7 +42,9 @@ class GroupBy(val aggregations: Seq[api.Aggregation],
val inputDf: DataFrame,
val mutationDfFn: () => DataFrame = null,
skewFilter: Option[String] = None,
finalize: Boolean = true)
finalize: Boolean = true,
incAgg: Boolean = false
)
extends Serializable {
@transient lazy val logger = LoggerFactory.getLogger(getClass)

@@ -88,7 +91,11 @@ class GroupBy(val aggregations: Seq[api.Aggregation],
lazy val aggPartWithSchema = aggregationParts.zip(columnAggregators.map(_.outputType))

lazy val postAggSchema: StructType = {
val valueChrononSchema = if (finalize) windowAggregator.outputSchema else windowAggregator.irSchema
val valueChrononSchema = if (finalize) {
windowAggregator.outputSchema
} else {
windowAggregator.irSchema
}
SparkConversions.fromChrononSchema(valueChrononSchema)
}

@@ -141,12 +148,13 @@ class GroupBy(val aggregations: Seq[api.Aggregation],
}

def snapshotEventsBase(partitionRange: PartitionRange,
resolution: Resolution = DailyResolution): RDD[(Array[Any], Array[Any])] = {
resolution: Resolution = DailyResolution,
incAgg: Boolean = true): RDD[(Array[Any], Array[Any])] = {
val endTimes: Array[Long] = partitionRange.toTimePoints
// add 1 day to the end times to include data [ds 00:00:00.000, ds + 1 00:00:00.000)
val shiftedEndTimes = endTimes.map(_ + tableUtils.partitionSpec.spanMillis)
val sawtoothAggregator = new SawtoothAggregator(aggregations, selectedSchema, resolution)
val hops = hopsAggregate(endTimes.min, resolution)
val hops = hopsAggregate(endTimes.min, resolution, incAgg)

hops
.flatMap {
@@ -356,12 +364,43 @@ class GroupBy(val aggregations: Seq[api.Aggregation],
toDf(outputRdd, Seq(Constants.TimeColumn -> LongType, tableUtils.partitionColumn -> StringType))
}

//def dfToOutputArrayType(df: DataFrame): RDD[(KeyWithHash, HopsAggregator.OutputArrayType)] = {
// val keyBuilder: Row => KeyWithHash =
// FastHashing.generateKeyBuilder(keyColumns.toArray, df.schema)

// df.rdd
// .keyBy(keyBuilder)
// .mapValues(SparkConversions.toChrononRow(_, tsIndex))
// .mapValues(windowAggregator.toTimeSortedArray)
//}

def flattenOutputArrayType(hopsArrays: RDD[(KeyWithHash, HopsAggregator.OutputArrayType)]): RDD[(Array[Any], Array[Any])] = {
hopsArrays.flatMap { case (keyWithHash: KeyWithHash, hopsArray: HopsAggregator.OutputArrayType) =>
val hopsArrayHead: Array[HopIr] = hopsArray.headOption.get
hopsArrayHead.map { array: HopIr =>
// the last element is a timestamp, we need to drop it
// and add it to the key
val timestamp = array.last.asInstanceOf[Long]
val withoutTimestamp = array.dropRight(1)
((keyWithHash.data :+ tableUtils.partitionSpec.at(timestamp)), withoutTimestamp)
}
}
}

def convertHopsToDf(range: PartitionRange,
schema: Array[(String, ai.chronon.api.DataType)]
): DataFrame = {
val hops = hopsAggregate(range.toTimePoints.min, DailyResolution)
val hopsDf = flattenOutputArrayType(hops)
toDf(hopsDf, Seq((tableUtils.partitionColumn, StringType)), Some(SparkConversions.fromChrononSchema(schema)))
}

// convert raw data into IRs, collected by hopSizes
// TODO cache this into a table: interface below
// Class HopsCacher(keySchema, irSchema, resolution) extends RddCacher[(KeyWithHash, HopsOutput)]
// buildTableRow((keyWithHash, hopsOutput)) -> GenericRowWithSchema
// buildRddRow(GenericRowWithSchema) -> (keyWithHash, hopsOutput)
def hopsAggregate(minQueryTs: Long, resolution: Resolution): RDD[(KeyWithHash, HopsAggregator.OutputArrayType)] = {
def hopsAggregate(minQueryTs: Long, resolution: Resolution, incAgg: Boolean = false): RDD[(KeyWithHash, HopsAggregator.OutputArrayType)] = {
val hopsAggregator =
new HopsAggregator(minQueryTs, aggregations, selectedSchema, resolution)
val keyBuilder: Row => KeyWithHash =
Expand All @@ -378,9 +417,9 @@ class GroupBy(val aggregations: Seq[api.Aggregation],
}

protected[spark] def toDf(aggregateRdd: RDD[(Array[Any], Array[Any])],
additionalFields: Seq[(String, DataType)]): DataFrame = {
additionalFields: Seq[(String, DataType)], schema: Option[StructType] = None): DataFrame = {
val finalKeySchema = StructType(keySchema ++ additionalFields.map { case (name, typ) => StructField(name, typ) })
KvRdd(aggregateRdd, finalKeySchema, postAggSchema).toFlatDf
KvRdd(aggregateRdd, finalKeySchema, schema.getOrElse(postAggSchema)).toFlatDf
}

private def normalizeOrFinalize(ir: Array[Any]): Array[Any] =
@@ -461,18 +500,19 @@ object GroupBy {
bloomMapOpt: Option[util.Map[String, BloomFilter]] = None,
skewFilter: Option[String] = None,
finalize: Boolean = true,
showDf: Boolean = false): GroupBy = {
showDf: Boolean = false,
incrementalAgg: Boolean = false): GroupBy = {
logger.info(s"\n----[Processing GroupBy: ${groupByConfOld.metaData.name}]----")
val groupByConf = replaceJoinSource(groupByConfOld, queryRange, tableUtils, computeDependency, showDf)
val inputDf = groupByConf.sources.toScala
.map { source =>
renderDataSourceQuery(groupByConf,
source,
groupByConf.getKeyColumns.toScala,
queryRange,
tableUtils,
groupByConf.maxWindow,
groupByConf.inferredAccuracy)
source,
groupByConf.getKeyColumns.toScala,
queryRange,
tableUtils,
groupByConf.maxWindow,
groupByConf.inferredAccuracy)

}
.map {
@@ -543,15 +583,20 @@
logger.info(s"printing mutation data for groupBy: ${groupByConf.metaData.name}")
df.prettyPrint()
}

df
}

val finalizeValue = if (incrementalAgg) {
!incrementalAgg
} else {
finalize
}
new GroupBy(Option(groupByConf.getAggregations).map(_.toScala).orNull,
keyColumns,
nullFiltered,
mutationDfFn,
finalize = finalize)
keyColumns,
nullFiltered,
mutationDfFn,
finalize = finalizeValue,
incAgg = incrementalAgg,
)
}

def getIntersectedRange(source: api.Source,
@@ -670,12 +715,54 @@ object GroupBy {
query
}

def saveAndGetIncDf(
groupByConf: api.GroupBy,
range: PartitionRange,
tableUtils: TableUtils,
): GroupBy = {
val incOutputTable = groupByConf.metaData.incOutputTable
val tableProps = Option(groupByConf.metaData.tableProperties)
.map(_.toScala)
.orNull
//range should be modified to incremental range
val incGroupByBackfill = from(groupByConf, range, tableUtils, computeDependency = true, incrementalAgg = true)
val selectedSchema = incGroupByBackfill.selectedSchema
//TODO is there any other way to get incSchema?
val incSchema = new RowAggregator(selectedSchema, incGroupByBackfill.aggregations.flatMap(_.unWindowed)).incSchema
val hopsDf = incGroupByBackfill.convertHopsToDf(range, incSchema)
hopsDf.save(incOutputTable, tableProps)

val maxWindow = groupByConf.maxWindow.get
val sourceQueryableRange = PartitionRange(
tableUtils.partitionSpec.minus(range.start, maxWindow),
range.end
)(tableUtils)

val incTableFirstPartition: Option[String] = tableUtils.firstAvailablePartition(incOutputTable)
val incTableLastPartition: Option[String] = tableUtils.lastAvailablePartition(incOutputTable)

val incTableRange = PartitionRange(
incTableFirstPartition.get,
incTableLastPartition.get
)(tableUtils)

val incDfQuery = incTableRange.intersect(sourceQueryableRange).genScanQuery(null, incOutputTable)
val incDf: DataFrame = tableUtils.sql(incDfQuery)

new GroupBy(
incGroupByBackfill.aggregations,
incGroupByBackfill.keyColumns,
incDf
)
}

def computeBackfill(groupByConf: api.GroupBy,
endPartition: String,
tableUtils: TableUtils,
stepDays: Option[Int] = None,
overrideStartPartition: Option[String] = None,
skipFirstHole: Boolean = true): Unit = {
skipFirstHole: Boolean = true,
incrementalAgg: Boolean = true): Unit = {
assert(
groupByConf.backfillStartDate != null,
s"GroupBy:${groupByConf.metaData.name} has null backfillStartDate. This needs to be set for offline backfilling.")
@@ -714,7 +801,12 @@
stepRanges.zipWithIndex.foreach {
case (range, index) =>
logger.info(s"Computing group by for range: $range [${index + 1}/${stepRanges.size}]")
val groupByBackfill = from(groupByConf, range, tableUtils, computeDependency = true)
val groupByBackfill = if (incrementalAgg) {
saveAndGetIncDf(groupByConf, range, tableUtils)
//from(groupByConf, range, tableUtils, computeDependency = true)
} else {
from(groupByConf, range, tableUtils, computeDependency = true)
}
val outputDf = groupByConf.dataModel match {
// group by backfills have to be snapshot only
case Entities => groupByBackfill.snapshotEntities
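Rough shape of the flow that saveAndGetIncDf and computeBackfill wire together, as a hedged, self-contained sketch: daily IRs are written to the incremental table, the partitions covering [end - maxWindow, end] are scanned back, and the per-key IRs are merged and finalized into the windowed output. All names below (AvgIr, IncrementalBackfillSketch, windowedOutput) are illustrative only and do not exist in the codebase.

import java.time.LocalDate

// Illustrative-only model of the incremental backfill flow in this PR:
// 1) daily IRs land in the incremental ("_inc") table,
// 2) the partitions covering the lookback window are scanned,
// 3) IRs are merged per key and finalized into the output value.
final case class AvgIr(sum: Double, count: Long) {
  def merge(other: AvgIr): AvgIr = AvgIr(sum + other.sum, count + other.count)
  def finalized: Double = if (count == 0) Double.NaN else sum / count
}

object IncrementalBackfillSketch extends App {
  // Stand-in for the persisted incremental table: (key, partition date) -> daily IR
  val incTable: Map[(String, LocalDate), AvgIr] = Map(
    ("user_1", LocalDate.parse("2025-01-01")) -> AvgIr(10.0, 2),
    ("user_1", LocalDate.parse("2025-01-03")) -> AvgIr(30.0, 3),
    ("user_2", LocalDate.parse("2025-01-02")) -> AvgIr(5.0, 1)
  )

  def windowedOutput(end: LocalDate, windowDays: Int): Map[String, Double] = {
    val start = end.minusDays(windowDays - 1L)
    incTable.toSeq
      .collect { case ((key, ds), ir) if !ds.isBefore(start) && !ds.isAfter(end) => key -> ir }
      .groupMapReduce(_._1)(_._2)((a, b) => a.merge(b)) // merge daily IRs per key
      .map { case (key, ir) => key -> ir.finalized }
  }

  println(windowedOutput(LocalDate.parse("2025-01-03"), windowDays = 7))
  // e.g. Map(user_1 -> 8.0, user_2 -> 5.0)
}

The property this relies on is that IR merge is associative and commutative, so daily partial aggregates can be combined in any order when the final windowed values are assembled.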