diff --git a/src/main/scala/com/metabolic/data/core/services/spark/transformations/DemultiplexTransform.scala b/src/main/scala/com/metabolic/data/core/services/spark/transformations/DemultiplexTransform.scala index d54293cc..4618ae02 100644 --- a/src/main/scala/com/metabolic/data/core/services/spark/transformations/DemultiplexTransform.scala +++ b/src/main/scala/com/metabolic/data/core/services/spark/transformations/DemultiplexTransform.scala @@ -24,8 +24,7 @@ class DemultiplexTransform(idColumns: Seq[String], val orderColumns: Seq[String] (SELECT sequence(date_trunc( '$format', to_date('$fromDate')), date_trunc('$format', $endStatement), interval 1 ${format.toLowerCase}) AS dates) - ) AS date) - WHERE date <= now()""" + ) AS date)""" ) } diff --git a/src/test/scala/com/metabolic/data/core/services/spark/transform/DemultiplexTest.scala b/src/test/scala/com/metabolic/data/core/services/spark/transform/DemultiplexTest.scala index ca867dd9..89744cf9 100644 --- a/src/test/scala/com/metabolic/data/core/services/spark/transform/DemultiplexTest.scala +++ b/src/test/scala/com/metabolic/data/core/services/spark/transform/DemultiplexTest.scala @@ -3,6 +3,7 @@ package com.metabolic.data.core.services.spark.transform import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext} import com.metabolic.data.core.services.spark.transformations.DemultiplexTransform import org.apache.spark.sql.Row +import org.apache.spark.sql.functions.not import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} import org.scalatest.BeforeAndAfterAll import org.scalatest.funsuite.AnyFunSuite @@ -14,15 +15,15 @@ class DemultiplexTest extends AnyFunSuite with SharedSparkContext with BeforeAndAfterAll { - ignore("Demultiplex by month") { + test("Demultiplex by month") { val sqlCtx = sqlContext val inputData = Seq( - Row("Marc", new Timestamp(1641026108000L)), // 1 enero - Row("Marc", new Timestamp(1641803708000L)), // 10 de enero - Row("Ausias", new Timestamp(1644482108000L)), // 10 de febrero - Row("Ausias", new Timestamp(1645346108000L)), // 20 de febrero - Row("Marc", new Timestamp(1647765308000L)) // 20 de marzo + Row("Marc", Timestamp.valueOf("2022-01-01 09:35:08")), // 1 enero + Row("Marc", Timestamp.valueOf("2022-01-10 09:35:08")), // 10 de enero + Row("Ausias", Timestamp.valueOf("2022-02-10 09:35:08")), // 10 de febrero + Row("Ausias", Timestamp.valueOf("2022-02-20 09:35:08")), // 20 de febrero + Row("Marc", Timestamp.valueOf("2022-03-20 09:35:08")) // 20 de marzo ) val inputSchema = List( @@ -41,13 +42,13 @@ class DemultiplexTest extends AnyFunSuite .where("period <= '2022-04-30'") val expectedData = Seq( - Row("Marc", new Timestamp(1641803708000L), new Timestamp(1640991600000L)), // 1 de enero - Row("Marc", new Timestamp(1641803708000L), new Timestamp(1643670000000L)), //1 de febrero que no existia - Row("Marc", new Timestamp(1647765308000L), new Timestamp(1646089200000L)), //1 marzo - Row("Marc", new Timestamp(1647765308000L), new Timestamp(1648764000000L)), //1 de abril que no existia - Row("Ausias", new Timestamp(1645346108000L), new Timestamp(1643670000000L)), // 1 de febrero - Row("Ausias", new Timestamp(1645346108000L), new Timestamp(1646089200000L)), //1 marzo que no existia - Row("Ausias", new Timestamp(1645346108000L), new Timestamp(1648764000000L)) //1 de abril que no existia + Row("Marc", Timestamp.valueOf("2022-01-10 09:35:08"), Timestamp.valueOf("2022-01-01 00:00:00")), // 1 de enero + Row("Marc", Timestamp.valueOf("2022-01-10 09:35:08"), Timestamp.valueOf("2022-02-01 00:00:00")), //1 de febrero que no existia + Row("Marc", Timestamp.valueOf("2022-03-20 09:35:08"), Timestamp.valueOf("2022-03-01 00:00:00")), //1 marzo + Row("Marc", Timestamp.valueOf("2022-03-20 09:35:08"), Timestamp.valueOf("2022-04-01 00:00:00")), //1 de abril que no existia + Row("Ausias", Timestamp.valueOf("2022-02-20 09:35:08"), Timestamp.valueOf("2022-02-01 00:00:00")), // 1 de febrero + Row("Ausias", Timestamp.valueOf("2022-02-20 09:35:08"), Timestamp.valueOf("2022-03-01 00:00:00")), //1 marzo que no existia + Row("Ausias", Timestamp.valueOf("2022-02-20 09:35:08"), Timestamp.valueOf("2022-04-01 00:00:00")) //1 de abril que no existia ) val expectedSchema = List( @@ -60,7 +61,8 @@ class DemultiplexTest extends AnyFunSuite StructType(expectedSchema) ) - assertDataFrameNoOrderEquals(outputDF, expectedDf) + assertDataFrameNoOrderEquals(outputDF,expectedDf) + }