Skip to content

Commit

Permalink
Merge pull request #26 from metabolicdata/feature/demultiplex_to_future
Browse files Browse the repository at this point in the history
Allow demultiplex to future
  • Loading branch information
margon8 authored Jan 25, 2024
2 parents 6ddc6a4 + ecee749 commit 549df77
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ class DemultiplexTransform(idColumns: Seq[String], val orderColumns: Seq[String]
(SELECT sequence(date_trunc( '$format', to_date('$fromDate')),
date_trunc('$format', $endStatement),
interval 1 ${format.toLowerCase}) AS dates)
) AS date)
WHERE date <= now()"""
) AS date)"""
)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.metabolic.data.core.services.spark.transform
import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import com.metabolic.data.core.services.spark.transformations.DemultiplexTransform
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.not
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
Expand All @@ -14,15 +15,15 @@ class DemultiplexTest extends AnyFunSuite
with SharedSparkContext
with BeforeAndAfterAll {

ignore("Demultiplex by month") {
test("Demultiplex by month") {
val sqlCtx = sqlContext

val inputData = Seq(
Row("Marc", new Timestamp(1641026108000L)), // 1 enero
Row("Marc", new Timestamp(1641803708000L)), // 10 de enero
Row("Ausias", new Timestamp(1644482108000L)), // 10 de febrero
Row("Ausias", new Timestamp(1645346108000L)), // 20 de febrero
Row("Marc", new Timestamp(1647765308000L)) // 20 de marzo
Row("Marc", Timestamp.valueOf("2022-01-01 09:35:08")), // 1 enero
Row("Marc", Timestamp.valueOf("2022-01-10 09:35:08")), // 10 de enero
Row("Ausias", Timestamp.valueOf("2022-02-10 09:35:08")), // 10 de febrero
Row("Ausias", Timestamp.valueOf("2022-02-20 09:35:08")), // 20 de febrero
Row("Marc", Timestamp.valueOf("2022-03-20 09:35:08")) // 20 de marzo
)

val inputSchema = List(
Expand All @@ -41,13 +42,13 @@ class DemultiplexTest extends AnyFunSuite
.where("period <= '2022-04-30'")

val expectedData = Seq(
Row("Marc", new Timestamp(1641803708000L), new Timestamp(1640991600000L)), // 1 de enero
Row("Marc", new Timestamp(1641803708000L), new Timestamp(1643670000000L)), //1 de febrero que no existia
Row("Marc", new Timestamp(1647765308000L), new Timestamp(1646089200000L)), //1 marzo
Row("Marc", new Timestamp(1647765308000L), new Timestamp(1648764000000L)), //1 de abril que no existia
Row("Ausias", new Timestamp(1645346108000L), new Timestamp(1643670000000L)), // 1 de febrero
Row("Ausias", new Timestamp(1645346108000L), new Timestamp(1646089200000L)), //1 marzo que no existia
Row("Ausias", new Timestamp(1645346108000L), new Timestamp(1648764000000L)) //1 de abril que no existia
Row("Marc", Timestamp.valueOf("2022-01-10 09:35:08"), Timestamp.valueOf("2022-01-01 00:00:00")), // 1 de enero
Row("Marc", Timestamp.valueOf("2022-01-10 09:35:08"), Timestamp.valueOf("2022-02-01 00:00:00")), //1 de febrero que no existia
Row("Marc", Timestamp.valueOf("2022-03-20 09:35:08"), Timestamp.valueOf("2022-03-01 00:00:00")), //1 marzo
Row("Marc", Timestamp.valueOf("2022-03-20 09:35:08"), Timestamp.valueOf("2022-04-01 00:00:00")), //1 de abril que no existia
Row("Ausias", Timestamp.valueOf("2022-02-20 09:35:08"), Timestamp.valueOf("2022-02-01 00:00:00")), // 1 de febrero
Row("Ausias", Timestamp.valueOf("2022-02-20 09:35:08"), Timestamp.valueOf("2022-03-01 00:00:00")), //1 marzo que no existia
Row("Ausias", Timestamp.valueOf("2022-02-20 09:35:08"), Timestamp.valueOf("2022-04-01 00:00:00")) //1 de abril que no existia
)

val expectedSchema = List(
Expand All @@ -60,7 +61,8 @@ class DemultiplexTest extends AnyFunSuite
StructType(expectedSchema)
)

assertDataFrameNoOrderEquals(outputDF, expectedDf)
assertDataFrameNoOrderEquals(outputDF,expectedDf)


}

Expand Down

0 comments on commit 549df77

Please sign in to comment.