Commits (34)
eaacfa6
code to write daily irs
kambstreat May 30, 2025
40b6cb2
store incremental agg and compute final IRs
kambstreat Jun 3, 2025
a014b6e
Store hops to inc tables
kambstreat Jun 7, 2025
32d559e
add code changes to generate final output from IR for AVG
kambstreat Jun 14, 2025
37293df
change function structure and variable names
kambstreat Jun 19, 2025
6263706
remove unused functions
kambstreat Jun 19, 2025
cb4325b
change function defs
kambstreat Jun 19, 2025
796ef96
make changes
kambstreat Jun 19, 2025
f218b23
change function order
kambstreat Jun 19, 2025
b1d4ee9
add new field is_incremental to python api
kambstreat Jun 20, 2025
2ab7659
get argument for isIncremental in scala spark backend
kambstreat Jun 20, 2025
238c781
add unit test for incremental groupby
kambstreat Jun 20, 2025
8edfd27
reuse table creation
kambstreat Jul 18, 2025
e903683
Update GroupByTest
kambstreat Jul 18, 2025
0bdc4fc
Add GroupByTest for events
kambstreat Jul 18, 2025
7987931
changes for incremental
kambstreat Sep 3, 2025
2b26d45
resolve merge conflicts
kambstreat Sep 3, 2025
7b62a43
add last hole logic for incremental backfill
kambstreat Sep 5, 2025
aeeb5ec
fix syntax
kambstreat Sep 5, 2025
9180d23
fix bug: backfill only for missing holes
kambstreat Sep 6, 2025
ee81672
fix none error for inc Table
kambstreat Sep 7, 2025
29a3f28
add incremental table queryable range
kambstreat Sep 19, 2025
aa16010
add logging for tableUtils
kambstreat Sep 19, 2025
ff41cc9
add log
kambstreat Sep 19, 2025
aa25f9f
fill incremental holes
kambstreat Sep 22, 2025
3efe8cd
modify incremental aggregation parts
kambstreat Oct 2, 2025
a3bece6
remove logs for debugging
kambstreat Oct 7, 2025
897d18c
fix output schema from incremental aggregations. Added unit tests
kambstreat Oct 12, 2025
a52d7d0
resolve merge conflicts
kambstreat Oct 12, 2025
9c446ec
resolve merge conflict
kambstreat Oct 12, 2025
ca14309
add test case to test struct of Average
kambstreat Nov 2, 2025
dfb9226
add option for isIncremental for backward compatibility
kambstreat Nov 2, 2025
7ca4dfc
resolve merge conflicts
kambstreat Nov 2, 2025
ab994bc
fix count operation from incremental IRS
kambstreat Nov 4, 2025
@@ -117,6 +117,57 @@ class UniqueCount[T](inputType: DataType) extends SimpleAggregator[T, util.HashS
}
}

class AverageIR extends SimpleAggregator[Array[Any], Array[Any], Double] {
Contributor:

It looks to me like this mostly exists to serialize aggregators. Is there something more generic we can do? @nikhilsimha and I talked about https://fory.apache.org/ for example, but it could be even simpler to move this logic into a different class abstraction.

Collaborator:

I agree that if we could somehow make it more general, then we would not have to implement it for each operation.

Collaborator:

also cc @nikhil-zlai 😁

Collaborator:

I was thinking of sticking with Avro because it is what we use for the online serving path, so that we can keep the ser/de logic the same.

Contributor:

I think the issue with Avro is that it needs to schematize the object, whereas hopIR is just an Array[Any]. If you just reflect the hopIR, there is no need to schematize it.

override def outputType: DataType = DoubleType

override def irType: DataType =
StructType(
"AvgIr",
Array(StructField("sum", DoubleType), StructField("count", IntType))
)

override def prepare(input: Array[Any]): Array[Any] = {
Array(input(0).asInstanceOf[Double], input(1).asInstanceOf[Int])
}

// mutating
override def update(ir: Array[Any], input: Array[Any]): Array[Any] = {
val inputSum = input(0).asInstanceOf[Double]
val inputCount = input(1).asInstanceOf[Int]
ir.update(0, ir(0).asInstanceOf[Double] + inputSum)
ir.update(1, ir(1).asInstanceOf[Int] + inputCount)
ir
}

// mutating
override def merge(ir1: Array[Any], ir2: Array[Any]): Array[Any] = {
ir1.update(0, ir1(0).asInstanceOf[Double] + ir2(0).asInstanceOf[Double])
ir1.update(1, ir1(1).asInstanceOf[Int] + ir2(1).asInstanceOf[Int])
ir1
}

override def finalize(ir: Array[Any]): Double =
ir(0).asInstanceOf[Double] / ir(1).asInstanceOf[Int].toDouble

override def delete(ir: Array[Any], input: Array[Any]): Array[Any] = {
val inputSum = input(0).asInstanceOf[Double]
val inputCount = input(1).asInstanceOf[Int]
ir.update(0, ir(0).asInstanceOf[Double] - inputSum)
ir.update(1, ir(1).asInstanceOf[Int] - inputCount)
ir
}

override def clone(ir: Array[Any]): Array[Any] = {
val arr = new Array[Any](ir.length)
ir.copyToArray(arr)
arr
}

override def isDeletable: Boolean = true
}
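A minimal usage sketch (not part of the diff; the values are invented for illustration) of how AverageIR folds two daily (sum, count) IRs and finalizes them into an average:

val avgIr = new AverageIR

// Day 1: sum 10.0 over 4 events; Day 2: sum 5.0 over 1 event.
val day1 = avgIr.prepare(Array[Any](10.0, 4))
val day2 = avgIr.prepare(Array[Any](5.0, 1))

// merge mutates its first argument in place and returns it.
val merged = avgIr.merge(day1, day2)   // Array(15.0, 5)
val result = avgIr.finalize(merged)    // 15.0 / 5 = 3.0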



class Average extends SimpleAggregator[Double, Array[Any], Double] {
override def outputType: DataType = DoubleType

@@ -216,6 +216,13 @@ object ColumnAggregator {
private def toJavaDouble[A: Numeric](inp: Any) =
implicitly[Numeric[A]].toDouble(inp.asInstanceOf[A]).asInstanceOf[java.lang.Double]


private def toStructArray(inp: Any): Array[Any] = inp match {
case r: org.apache.spark.sql.Row => r.toSeq.toArray
case null => null
case other => throw new IllegalArgumentException(s"Expected Row, got: $other")
}

def construct(baseInputType: DataType,
aggregationPart: AggregationPart,
columnIndices: ColumnIndices,
@@ -341,6 +348,7 @@ object ColumnAggregator {
case ShortType => simple(new Average, toDouble[Short])
case DoubleType => simple(new Average)
case FloatType => simple(new Average, toDouble[Float])
case StructType(name, fields) => simple(new AverageIR, toStructArray)
case _ => mismatchException
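As a quick illustration of the new StructType routing (values invented): toStructArray unwraps a Spark Row holding a stored (sum, count) IR into the Array[Any] that AverageIR consumes, which is why an AVERAGE over an IR struct column is wired up as simple(new AverageIR, toStructArray):

import org.apache.spark.sql.Row

// Hypothetical stored IR for an average: sum = 15.0 over 3 rows.
val irRow: Row = Row(15.0, 3)
val ir: Array[Any] = irRow.toSeq.toArray   // Array(15.0, 3), same conversion as toStructArray(irRow)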
}

@@ -24,7 +24,10 @@ import scala.collection.Seq

// The primary API of the aggregator package.
// the semantics are to mutate values in place for performance reasons
class RowAggregator(val inputSchema: Seq[(String, DataType)], val aggregationParts: Seq[AggregationPart])
// userInputAggregationParts is used when incremental mode is enabled.
class RowAggregator(val inputSchema: Seq[(String, DataType)],
val aggregationParts: Seq[AggregationPart],
val userInputAggregationParts: Option[Seq[AggregationPart]] = None )
extends Serializable
with SimpleAggregator[Row, Array[Any], Array[Any]] {

@@ -70,11 +73,25 @@ class RowAggregator(val inputSchema: Seq[(String, DataType)], val aggregationPar
.toArray
.zip(columnAggregators.map(_.irType))

val outputSchema: Array[(String, DataType)] = aggregationParts
val incrementalOutputSchema: Array[(String, DataType)] = aggregationParts
.map(_.incrementalOutputColumnName)
.toArray
.zip(columnAggregators.map(_.irType))

val aggregationPartsOutputSchema: Array[(String, DataType)] = aggregationParts
.map(_.outputColumnName)
.toArray
.zip(columnAggregators.map(_.outputType))

val outputSchema: Array[(String, DataType)] = userInputAggregationParts
.map{ parts =>
parts
.map(_.outputColumnName)
.toArray
.zip(columnAggregators.map(_.outputType))
}.getOrElse(aggregationPartsOutputSchema)


val isNotDeletable: Boolean = columnAggregators.forall(!_.isDeletable)

// this will mutate in place
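For orientation, a hedged sketch of how the schemas above relate; the column names, window, and types here are assumptions for a 7-day AVERAGE over an "amount" column, not values taken from this PR:

import ai.chronon.api.{DataType, DoubleType, IntType, StructField, StructType}

// Incremental/IR schema entry: the daily table stores the (sum, count) struct.
val irEntry: (String, DataType) =
  "amount_average" -> StructType("AvgIr", Array(StructField("sum", DoubleType), StructField("count", IntType)))

// With userInputAggregationParts supplied, outputSchema keeps the user's windowed
// column name while the value type still comes from the column aggregator.
val outputEntry: (String, DataType) = "amount_average_7d" -> DoubleType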
2 changes: 2 additions & 0 deletions api/py/ai/chronon/group_by.py
@@ -363,6 +363,7 @@ def GroupBy(
derivations: Optional[List[ttypes.Derivation]] = None,
deprecation_date: Optional[str] = None,
description: Optional[str] = None,
is_incremental: Optional[bool] = False,
**kwargs,
) -> ttypes.GroupBy:
"""
@@ -570,6 +571,7 @@ def _normalize_source(source):
backfillStartDate=backfill_start_date,
accuracy=accuracy,
derivations=derivations,
isIncremental=is_incremental,
)
validate_group_by(group_by)
return group_by
9 changes: 8 additions & 1 deletion api/py/test/sample/scripts/spark_submit.sh
@@ -28,13 +28,14 @@

set -euxo pipefail
CHRONON_WORKING_DIR=${CHRONON_TMPDIR:-/tmp}/${USER}
echo $CHRONON_WORKING_DIR
mkdir -p ${CHRONON_WORKING_DIR}
export TEST_NAME="${APP_NAME}_${USER}_test"
unset PYSPARK_DRIVER_PYTHON
unset PYSPARK_PYTHON
unset SPARK_HOME
unset SPARK_CONF_DIR
export LOG4J_FILE="${CHRONON_WORKING_DIR}/log4j_file"
export LOG4J_FILE="${CHRONON_WORKING_DIR}/log4j.properties"
cat > ${LOG4J_FILE} << EOF
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@@ -47,6 +48,9 @@ EOF
$SPARK_SUBMIT_PATH \
--driver-java-options " -Dlog4j.configuration=file:${LOG4J_FILE}" \
--conf "spark.executor.extraJavaOptions= -XX:ParallelGCThreads=4 -XX:+UseParallelGC -XX:+UseCompressedOops" \
--conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Dlog4j.configuration=file:${LOG4J_FILE}" \
--conf "spark.sql.warehouse.dir=/home/chaitu/projects/chronon/spark-warehouse" \
--conf "javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=/home/chaitu/projects/chronon/hive-metastore/metastore_db;create=true" \
--conf spark.sql.shuffle.partitions=${PARALLELISM:-4000} \
--conf spark.dynamicAllocation.maxExecutors=${MAX_EXECUTORS:-1000} \
--conf spark.default.parallelism=${PARALLELISM:-4000} \
@@ -77,3 +81,6 @@ tee ${CHRONON_WORKING_DIR}/${APP_NAME}_spark.log




#--conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Dlog4j.rootLogger=INFO,console" \

2 changes: 1 addition & 1 deletion api/py/test/sample/teams.json
@@ -5,7 +5,7 @@
},
"common_env": {
"VERSION": "latest",
"SPARK_SUBMIT_PATH": "[TODO]/path/to/spark-submit",
"SPARK_SUBMIT_PATH": "spark-submit",
"JOB_MODE": "local[*]",
"HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing",
"CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class",
7 changes: 6 additions & 1 deletion api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -97,7 +97,7 @@ object Extensions {
def cleanName: String = metaData.name.sanitize

def outputTable = s"${metaData.outputNamespace}.${metaData.cleanName}"

def incrementalOutputTable = s"${metaData.outputNamespace}.${metaData.cleanName}_inc"
Collaborator:

Suggested change
def incrementalOutputTable = s"${metaData.outputNamespace}.${metaData.cleanName}_inc"
def incrementalOutputTable = s"${metaData.outputNamespace}.${metaData.cleanName}_daily_inc"

Collaborator:

@abbywh I think we already have a daily here.

def preModelTransformsTable = s"${metaData.outputNamespace}.${metaData.cleanName}_pre_mt"
def outputLabelTable = s"${metaData.outputNamespace}.${metaData.cleanName}_labels"
def outputFinalView = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled"
@@ -178,8 +178,13 @@

def outputColumnName =
s"${aggregationPart.inputColumn}_$opSuffix${aggregationPart.window.suffix}${bucketSuffix}"

def incrementalOutputColumnName =
s"${aggregationPart.inputColumn}_$opSuffix${bucketSuffix}"
Collaborator:

should we still keep the aggregationPart.window.suffix? Otherwise, how do we reconstruct the final output column?

Author:

@pengyu-hou not sure I get it. I cannot use the window.suffix here, right, since the intermediate incremental table is a daily aggregation?


}
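A small worked example (names assumed, mirroring the two definitions above): for an AVERAGE over an "amount" column with a 7-day window, the final output column keeps the window suffix while the incremental (daily) column drops it:

// Hypothetical values, mirroring outputColumnName and incrementalOutputColumnName.
val inputColumn  = "amount"
val opSuffix     = "average"
val windowSuffix = "_7d"
val bucketSuffix = ""

val outputColumnName            = s"${inputColumn}_$opSuffix$windowSuffix$bucketSuffix"  // amount_average_7d
val incrementalOutputColumnName = s"${inputColumn}_$opSuffix$bucketSuffix"                // amount_average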


implicit class AggregationOps(aggregation: Aggregation) {

// one agg part per bucket per window
1 change: 1 addition & 0 deletions api/thrift/api.thrift
@@ -301,6 +301,7 @@ struct GroupBy {
6: optional string backfillStartDate
// Optional derivation list
7: optional list<Derivation> derivations
8: optional bool isIncremental
}

struct JoinPart {
5 changes: 5 additions & 0 deletions spark/src/main/scala/ai/chronon/spark/DataRange.scala
@@ -54,6 +54,11 @@ case class PartitionRange(start: String, end: String)(implicit tableUtils: Table
}
}

def daysBetween: Int = {
if (start == null || end == null) 0
else Stream.iterate(start)(tableUtils.partitionSpec.after).takeWhile(_ <= end).size
}

def isSingleDay: Boolean = {
start == end
}
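A standalone sketch of the counting logic in daysBetween (assuming a daily partition spec with ISO yyyy-MM-dd partitions; the dates are invented): both endpoints are included, so a three-day range yields 3:

import java.time.LocalDate

val start = "2025-01-01"
val end   = "2025-01-03"
val days = Stream.iterate(start)(d => LocalDate.parse(d).plusDays(1).toString)
  .takeWhile(_ <= end)
  .size   // 3, matching PartitionRange("2025-01-01", "2025-01-03").daysBetween under a daily partitionSpec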
3 changes: 2 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -467,7 +467,8 @@ object Driver {
tableUtils,
args.stepDays.toOption,
args.startPartitionOverride.toOption,
!args.runFirstHole()
!args.runFirstHole(),
Option(args.groupByConf.isIncremental).getOrElse(false)
)

if (args.shouldExport()) {