Databricks 3.4 (#214)
* casting strings and maps to UTF8Strings and MapData

* updating version numbers
sabeegrewal authored Nov 28, 2017
1 parent 12d5838 commit c9eb2bc
Showing 4 changed files with 19 additions and 10 deletions.
core/pom.xml — 2 changes: 1 addition & 1 deletion
@@ -23,7 +23,7 @@
   <parent>
     <groupId>com.microsoft.azure</groupId>
     <artifactId>azure-eventhubs-databricks-parent_2.11</artifactId>
-    <version>3.3.0</version>
+    <version>3.4.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <artifactId>azure-eventhubs-databricks_2.11</artifactId>
EventHubsSource.scala — 23 changes: 16 additions & 7 deletions
@@ -26,12 +26,15 @@ import org.apache.spark.eventhubs.common.rdd.{ EventHubsRDD, OffsetRange, Progre
 import org.apache.spark.eventhubs.common.{ EventHubsConnector, NameAndPartition, RateControlUtils }
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
 import org.apache.spark.sql.execution.streaming.{ Offset, SerializedOffset, Source }
 import org.apache.spark.sql.streaming.eventhubs.checkpoint.StructuredStreamingProgressTracker
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{ DataFrame, SQLContext }
+import org.apache.spark.unsafe.types.UTF8String

 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ ExecutionContext, Future }
 import scala.util.{ Failure, Success }

@@ -251,31 +254,37 @@ private[spark] class EventHubsSource private[eventhubs] (
     import scala.collection.JavaConverters._
     val (containsProperties, userDefinedKeys) =
       EventHubsSourceProvider.ifContainsPropertiesAndUserDefinedKeys(eventHubsParams)

     val rowRDD = eventHubsRDD.map(
       eventData =>
         InternalRow.fromSeq(Seq(
           eventData.getBytes,
           eventData.getSystemProperties.getOffset.toLong,
           eventData.getSystemProperties.getSequenceNumber,
           eventData.getSystemProperties.getEnqueuedTime.getEpochSecond,
-          eventData.getSystemProperties.getPublisher,
-          eventData.getSystemProperties.getPartitionKey
+          UTF8String.fromString(eventData.getSystemProperties.getPublisher),
+          UTF8String.fromString(eventData.getSystemProperties.getPartitionKey)
         ) ++ {
           if (containsProperties) {
             if (userDefinedKeys.nonEmpty) {
               userDefinedKeys.map(k => {
-                eventData.getProperties.asScala.getOrElse(k, "").toString
+                UTF8String.fromString(eventData.getProperties.asScala.getOrElse(k, "").toString)
               })
             } else {
-              Seq(eventData.getProperties.asScala.map {
-                case (k, v) =>
-                  k -> (if (v == null) null else v.toString)
-              })
+              val keys = ArrayBuffer[UTF8String]()
+              val values = ArrayBuffer[UTF8String]()
+              for ((k, v) <- eventData.getProperties.asScala) {
+                keys.append(UTF8String.fromString(k))
+                if (v == null) values.append(null)
+                else values.append(UTF8String.fromString(v.toString))
+              }
+              Seq(ArrayBasedMapData(keys.toArray, values.toArray))
             }
           } else {
             Seq()
           }
         }))

     sqlContext.sparkSession.internalCreateDataFrame(rowRDD, schema, isStreaming = true)
   }
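Why the conversions above are needed: Catalyst's InternalRow stores StringType columns as org.apache.spark.unsafe.types.UTF8String and MapType columns as MapData. Handing it plain java.lang.String or scala.collection.Map values still compiles, since InternalRow holds Any, but fails with a ClassCastException once a downstream operator reads the column. Below is a minimal sketch of the same conversion in isolation, separate from the connector code; the property map and column values are hypothetical, and it assumes Spark 2.x's spark-catalyst and spark-unsafe artifacts on the classpath.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.apache.spark.unsafe.types.UTF8String

object Utf8RowSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical user properties, standing in for eventData.getProperties.
    val props: Map[String, AnyRef] = Map("app" -> "sensor-1", "retries" -> Int.box(3))

    // StringType columns must hold UTF8String, never java.lang.String.
    val publisher = UTF8String.fromString("my-publisher")

    // MapType columns must hold MapData; ArrayBasedMapData wraps parallel
    // key and value arrays. Null values are kept as null.
    val keys = props.keys.map(UTF8String.fromString).toArray[Any]
    val values = props.values
      .map(v => if (v == null) null else UTF8String.fromString(v.toString))
      .toArray[Any]

    val row = InternalRow.fromSeq(Seq(publisher, ArrayBasedMapData(keys, values)))
    println(row.numFields) // prints 2
  }
}

ArrayBasedMapData also has an apply overload that takes a Map directly; the commit builds the key and value arrays explicitly, presumably to handle nulls and the String-to-UTF8String conversion in a single pass over the properties.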

examples/pom.xml — 2 changes: 1 addition & 1 deletion
@@ -23,7 +23,7 @@
   <parent>
     <groupId>com.microsoft.azure</groupId>
     <artifactId>azure-eventhubs-databricks-parent_2.11</artifactId>
-    <version>3.3.0</version>
+    <version>3.4.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <artifactId>azure-eventhubs-databricks-examples_2.11</artifactId>
pom.xml — 2 changes: 1 addition & 1 deletion
@@ -26,7 +26,7 @@

   <groupId>com.microsoft.azure</groupId>
   <artifactId>azure-eventhubs-databricks-parent_2.11</artifactId>
-  <version>3.3.0</version>
+  <version>3.4.0</version>
   <packaging>pom</packaging>

   <name>${project.groupId}:${project.artifactId}</name>
