Fix parquet file loading #16

Open · wants to merge 3 commits into master
2 changes: 1 addition & 1 deletion src/main/scala/com/twosigma/flint/rdd/Conversion.scala
@@ -108,7 +108,7 @@ object Conversion {
)
val iter = PeekableIterator(
OrderedIterator(
- PartitionsIterator(rdd, thisDep.parents, context)
+ PartitionsIterator(rdd, thisDep.parents, context, preservesPartitionsOrdering = true)
).filterByRange(thisDep.range)
)
new InterruptibleIterator[(K, V)](context, iter)
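For intuition, a minimal sketch of why this flag matters (illustrative plain Scala, not flint's actual PartitionsIterator; only the parameter name preservesPartitionsOrdering comes from the diff): data that is sorted within each parent partition stays globally sorted only if the partitions are traversed in key order.

// Each parent partition is internally sorted.
val parents = Seq(Seq(0, 30, 60, 99), Seq(100, 130, 160, 199))
// Traversing partitions in key order preserves global ordering...
val inOrder = parents.flatten
assert(inOrder == inOrder.sorted)
// ...while traversing them in an arbitrary (here, reversed) order does not.
val outOfOrder = parents.reverse.flatten
assert(outOfOrder != outOfOrder.sorted)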
45 changes: 31 additions & 14 deletions src/main/scala/com/twosigma/flint/rdd/RangeDependency.scala
@@ -23,7 +23,6 @@ import scala.reflect.ClassTag
* :: DeveloperApi ::
*/
private[rdd] object RangeDependency {
-
  /**
* Normalize the ranges of partitions from a sorted [[org.apache.spark.rdd.RDD]].
*
@@ -39,19 +38,9 @@ private[rdd] object RangeDependency {
normalizationStrategy: PartitionNormalizationStrategy = HeavyKeysNormalizationStrategy
)(implicit ord: Ordering[K]): Seq[RangeDependency[K, P]] = {
require(headers.nonEmpty, "Need at least one partition")
+ import OrderedPartitionHeaderUtils.HeaderOrdering

- val sortedHeaders = headers.sortBy(_.partition.index).toArray
- // Assume partitions are sorted, i.e. the keys of ith partition are less or equal than those of (i + 1)th partition.
- sortedHeaders.reduceOption {
-   (h1, h2) =>
-     if (ord.lteq(h1.firstKey, h2.firstKey)) {
-       h2
-     } else {
-       sys.error(s"Partitions are not sorted. " +
-         s"The partition ${h1.partition.index} has the first key ${h1.firstKey} and " +
-         s"the partition ${h2.partition.index} has the first key ${h2.firstKey}.")
-     }
- }
+ val sortedHeaders = headers.sorted.toArray

val (nonNormalizedPartitions, nonNormalizedRanges) = sortedHeaders.zipWithIndex.map {
case (hdr, idx) =>
@@ -102,6 +91,32 @@ private[rdd] case class OrderedPartitionHeader[K, P <: Partition](
secondKey: Option[K]
)

private[rdd] object OrderedPartitionHeaderUtils {
  implicit class HeaderOrdering[K, P <: Partition](header: OrderedPartitionHeader[K, P])(implicit ord: Ordering[K])
    extends Ordered[OrderedPartitionHeader[K, P]] {
    import scala.math.Ordered.orderingToOrdered

    def compare(that: OrderedPartitionHeader[K, P]): Int = (header, that) match {
      // If the first keys differ, order by first key.
      case (OrderedPartitionHeader(_, thisFirst, _), OrderedPartitionHeader(_, thatFirst, _))
        if thisFirst != thatFirst =>
        thisFirst.compare(thatFirst)
      // If the first keys are equal, the header with a Some second key is larger than the one with None.
      case (OrderedPartitionHeader(_, _, Some(_)), OrderedPartitionHeader(_, _, None)) => 1
      case (OrderedPartitionHeader(_, _, None), OrderedPartitionHeader(_, _, Some(_))) => -1
      // If both partitions contain only the same single key, compare by partition index to preserve
      // previous behaviour.
      case (OrderedPartitionHeader(thisPartition, _, None), OrderedPartitionHeader(thatPartition, _, None)) =>
        thisPartition.index.compare(thatPartition.index)
      // Two partitions sharing a first key while both holding a second key means the input was not sorted.
      case (OrderedPartitionHeader(thisPartition, thisFirst, Some(thisSecond)),
        OrderedPartitionHeader(thatPartition, _, Some(thatSecond))) =>
        sys.error(s"Partitions are not sorted. Two partitions both have first key $thisFirst " +
          s"and non-empty second keys: $thisSecond and $thatSecond. " +
          s"The partitions are: $thisPartition and $thatPartition.")
    }
  }
}


/**
* :: DeveloperApi ::
* Base class for range dependency.
@@ -214,7 +229,9 @@ private[rdd] object HeavyKeysNormalizationStrategy extends PartitionNormalizationStrategy
implicit
ord: Ordering[K]
): Seq[CloseOpen[K]] = {
- val sortedHeaders = headers.sortBy(_.partition.index)
+ import OrderedPartitionHeaderUtils.HeaderOrdering
+
+ val sortedHeaders = headers.sorted

val partitionBoundaries = sortedHeaders.head.firstKey +: sortedHeaders.tail.map {
header => header.secondKey.getOrElse(header.firstKey)
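To illustrate the new header ordering and how HeavyKeysNormalizationStrategy consumes it, a minimal sketch (not part of the diff; it assumes Int keys and the OrderedRDDPartition constructor used in the specs below, and must live inside package com.twosigma.flint.rdd since the utils are private[rdd]):

import OrderedPartitionHeaderUtils.HeaderOrdering

// Partition 0 holds the larger keys, so its header must sort after partition 1's,
// regardless of partition index.
val headers = Seq(
  OrderedPartitionHeader(OrderedRDDPartition(0), 100, Some(130)),
  OrderedPartitionHeader(OrderedRDDPartition(1), 0, Some(30))
)
val sortedHeaders = headers.sorted
// sortedHeaders.map(_.partition.index) == Seq(1, 0)

// Boundaries then follow the normalization above: the first header's firstKey,
// followed by each later header's secondKey (falling back to its firstKey).
val boundaries = sortedHeaders.head.firstKey +:
  sortedHeaders.tail.map(h => h.secondKey.getOrElse(h.firstKey))
// boundaries == Seq(0, 130)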
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
16 changes: 16 additions & 0 deletions src/test/scala/com/twosigma/flint/rdd/ConversionSpec.scala
@@ -164,6 +164,22 @@ class ConversionSpec extends FlatSpec with SharedSparkContext with Timeouts {
assert(false, "Should not completed as the job has been killed.")
case Failure(_) =>
}
}

"fromSortedRDD" should "sort partitions, and have partition indexes increasing" in {

// Create an RDD with data data sorted within partitions, but partitions not sorted
// Data is:
// Partition 0: 100, 130, 160, 199
// Partition 1: 0, 30, 60, 99
val data = Seq(100, 130, 160, 199, 0, 30, 60, 99)
val kvData = data.zip(data)
val rdd = sc.makeRDD(kvData, 2)

val orderedRdd = Conversion.fromSortedRDD(rdd)
assert(orderedRdd.partitions(0).index == 0)
assert(orderedRdd.partitions(1).index == 1)

assert(orderedRdd.collect().toSeq == kvData.sorted)
}
}
64 changes: 63 additions & 1 deletion src/test/scala/com/twosigma/flint/rdd/RangeDependencySpec.scala
@@ -17,8 +17,9 @@
package com.twosigma.flint.rdd

import org.scalatest.FlatSpec
+ import org.scalatest.prop.TableDrivenPropertyChecks

- class RangeDependencySpec extends FlatSpec {
+ class RangeDependencySpec extends FlatSpec with TableDrivenPropertyChecks {
// partition 0: [1, 1, 2, ..., 4]
// partition 1: [4, ..., 4]
// partition 2: [4, 4, 5, ..., 7]
@@ -141,4 +142,65 @@ class RangeDependencySpec extends FlatSpec {
assert(RangeDependency(2, CloseOpen(8, Some(14)), List(Split(3), Split(4))) == dep(2))
assert(RangeDependency(3, CloseOpen(14, None), List(Split(4))) == dep(3))
}

private def makeHeader(
firstKey: Int,
secondKey: Option[Int],
partitionNumber: Int = 0):
OrderedPartitionHeader[Int, OrderedRDDPartition] =
OrderedPartitionHeader(OrderedRDDPartition(partitionNumber), firstKey, secondKey)

import OrderedPartitionHeaderUtils.HeaderOrdering
import org.scalatest.Matchers.{an, thrownBy}

"HeaderOrdering" should "throw error if first keys are both equal, and second keys are both Some" in {
val a = makeHeader(1, Some(2))
val b = makeHeader(1, Some(2))
an[Exception] shouldBe thrownBy {
a.compare(b)
}
}

val secondKeyExamples = Table(
("left", "right"),
(Some(1), Some(2)),
(Some(2), Some(1)),
(None, Some(1)),
(Some(1), None),
(None, None))


forAll(secondKeyExamples) { (leftSecondKey, rightSecondKey) => {
it should s"say a < b if a.firstKey < b.firstKey with second keys ($leftSecondKey, $rightSecondKey)" in {
val left = makeHeader(1, leftSecondKey)
val right = makeHeader(2, rightSecondKey)
assert(left.compare(right) < 0)
}
}
}

forAll(secondKeyExamples) { (leftSecondKey, rightSecondKey) => {
it should s"say a > b if a.firstKey > b.firstKey with second keys ($leftSecondKey, $rightSecondKey)" in {
import OrderedPartitionHeaderUtils.HeaderOrdering
val left = makeHeader(2, leftSecondKey)
val right = makeHeader(1, rightSecondKey)
assert(left.compare(right) > 0)
}
}
}

it should "say the header with Some second key is larger if first keys are equal" in {
val smaller = makeHeader(1, None)
val larger = makeHeader(1, Some(2))

assert(smaller.compare(larger) < 0)
assert(larger.compare(smaller) > 0)
}

it should "say headers are ordered by partition number if first keys are equal and second keys are both None" in {
val a = makeHeader(1, None, 1)
val b = makeHeader(1, None, 2)
assert(a.compare(b) < 0)
assert(b.compare(a) > 0)
}
}
@@ -636,6 +636,25 @@ class TimeSeriesRDDSpec extends TimeSeriesSuite {
}
}

  // Parquet files no longer load with their partitions in the order they were written; see:
  // https://issues.apache.org/jira/browse/SPARK-20144
  // This tests that the data loads correctly despite this. The parquet file in question was generated
  // in spark-shell with the following code:
  //
  // import org.apache.spark.sql.types._
  // import org.apache.spark.sql._
  //
  // val rdd = sc.makeRDD(Seq.range(0L, 10L).map(Row(_)), 5)
  // val schema = StructType(Array(StructField("time", LongType)))
  // val df = spark.sqlContext.createDataFrame(rdd, schema)
  // df.write.parquet("/small.parquet")
  it should "load from parquet" taggedAs (Slow) in {
    withResource("/timeseries/parquet/small.parquet") { source =>
      val tsRdd = TimeSeriesRDD.fromParquet(sc, source)(true, TimeUnit.NANOSECONDS)
      val loadedData = tsRdd.collect().map(_.getLong(0))
      assert(loadedData.toSeq == Seq.range(0, 10))
    }
  }

// This test is temporarily tagged as "Slow" so that scalatest runner could exclude this test optionally.
it should "not modify original rows during conversions/modifications" taggedAs (Slow) ignore {
withResource("/timeseries/parquet/PriceWithHeader.parquet") { source =>
Expand All @@ -652,5 +671,4 @@ class TimeSeriesRDDSpec extends TimeSeriesSuite {
assert(rows.deep == finalRows.deep)
}
}
-
}