Skip to content

Commit c3780f0

Browse files
authored
Merge pull request #49 from sauliusvl/uuid7
Use UUIDv7 based Iceberg file names
2 parents 952ecae + fda0cf8 commit c3780f0

File tree

6 files changed

+73
-20
lines changed

6 files changed

+73
-20
lines changed

build.sbt

+15-15
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ ThisBuild / git.remoteRepo := {
3434
}
3535
}
3636

37-
val scalaTestVersion = "3.2.18"
37+
val scalaTestVersion = "3.2.19"
3838
val scalaCheckVersion = "1.18.0"
39-
val scalaCheckTestVersion = "3.2.18.0"
39+
val scalaCheckTestVersion = "3.2.19.0"
4040

4141
val hadoopVersion = "3.4.0"
4242
val parquetVersion = "1.14.1"
43-
val icebergVersion = "1.5.2"
43+
val icebergVersion = "1.6.0"
4444

4545
lazy val `stream-loader-core` = project
4646
.in(file("stream-loader-core"))
@@ -51,17 +51,17 @@ lazy val `stream-loader-core` = project
5151
buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, git.gitHeadCommit),
5252
libraryDependencies ++= Seq(
5353
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
54-
"org.apache.kafka" % "kafka-clients" % "3.7.0",
54+
"org.apache.kafka" % "kafka-clients" % "3.8.0",
5555
"org.log4s" %% "log4s" % "1.10.0",
5656
"org.apache.commons" % "commons-compress" % "1.26.2",
5757
"org.xerial.snappy" % "snappy-java" % "1.1.10.5",
5858
"org.lz4" % "lz4-java" % "1.8.0",
59-
"com.github.luben" % "zstd-jni" % "1.5.6-3",
59+
"com.github.luben" % "zstd-jni" % "1.5.6-4",
6060
"com.univocity" % "univocity-parsers" % "2.9.1",
6161
"org.json4s" %% "json4s-native" % "4.0.7",
62-
"io.micrometer" % "micrometer-core" % "1.13.1",
62+
"io.micrometer" % "micrometer-core" % "1.13.2",
6363
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
64-
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
64+
"org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
6565
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
6666
"ch.qos.logback" % "logback-classic" % "1.5.6" % "test"
6767
)
@@ -75,9 +75,9 @@ lazy val `stream-loader-clickhouse` = project
7575
resolvers += "jitpack" at "https://jitpack.io",
7676
libraryDependencies ++= Seq(
7777
"org.apache.httpcomponents.client5" % "httpclient5" % "5.3.1",
78-
"com.clickhouse" % "clickhouse-jdbc" % "0.6.1",
78+
"com.clickhouse" % "clickhouse-jdbc" % "0.6.3",
7979
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
80-
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
80+
"org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
8181
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test"
8282
)
8383
)
@@ -116,14 +116,14 @@ lazy val `stream-loader-s3` = project
116116
.settings(commonSettings)
117117
.settings(
118118
libraryDependencies ++= Seq(
119-
"software.amazon.awssdk" % "s3" % "2.26.3",
119+
"software.amazon.awssdk" % "s3" % "2.26.25",
120120
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
121-
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.744" % "test",
121+
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.765" % "test",
122122
"org.gaul" % "s3proxy" % "2.2.0" % "test"
123123
)
124124
)
125125

126-
val verticaVersion = "24.2.0-0"
126+
val verticaVersion = "24.3.0-0"
127127

128128
lazy val `stream-loader-vertica` = project
129129
.in(file("stream-loader-vertica"))
@@ -133,7 +133,7 @@ lazy val `stream-loader-vertica` = project
133133
libraryDependencies ++= Seq(
134134
"com.vertica.jdbc" % "vertica-jdbc" % verticaVersion % "provided",
135135
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
136-
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
136+
"org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
137137
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test"
138138
)
139139
)
@@ -167,11 +167,11 @@ lazy val `stream-loader-tests` = project
167167
"com.vertica.jdbc" % "vertica-jdbc" % verticaVersion,
168168
"org.scalacheck" %% "scalacheck" % scalaCheckVersion,
169169
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
170-
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
170+
"org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
171171
"org.slf4j" % "log4j-over-slf4j" % "2.0.13" % "test",
172172
"org.mandas" % "docker-client" % "7.0.8" % "test",
173173
"org.jboss.resteasy" % "resteasy-client" % "6.2.9.Final" % "test",
174-
"com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.1" % "test",
174+
"com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.2" % "test",
175175
"org.duckdb" % "duckdb_jdbc" % duckdbVersion % "test"
176176
),
177177
inConfig(IntegrationTest)(Defaults.testTasks),

project/build.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version=1.10.0
1+
sbt.version=1.10.1

project/plugins.sbt

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")
1616

1717
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")
1818

19-
libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.5"
19+
libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.6"
2020

2121
addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0")
2222

2323
addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.2.1")
2424

25-
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.10.0")
25+
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.11.1")

stream-loader-tests/src/main/scala/com/adform/streamloader/util/UuidExtensions.scala stream-loader-core/src/main/scala/com/adform/streamloader/util/UuidExtensions.scala

+21
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package com.adform.streamloader.util
1010

1111
import java.nio.ByteBuffer
12+
import java.security.SecureRandom
1213
import java.util.UUID
1314

1415
object UuidExtensions {
@@ -20,4 +21,24 @@ object UuidExtensions {
2021
bb.array
2122
}
2223
}
24+
25+
private val random = new SecureRandom
26+
27+
/**
  * Generates a version 7 UUID: the top 48 bits hold the current wall-clock time in
  * milliseconds, the version nibble is set to 7, the variant bits are set to binary 10
  * and all remaining bits are filled from the shared secure random generator.
  *
  * @return a freshly generated UUID whose leading bits encode `System.currentTimeMillis`.
  */
def randomUUIDv7(): UUID = {
  // Draw all 128 bits of entropy up front; the timestamp/version/variant bits are masked in below.
  val entropy = new Array[Byte](16)
  random.nextBytes(entropy)
  val bits = ByteBuffer.wrap(entropy)

  val millis = System.currentTimeMillis

  // High word: 48-bit timestamp | 4-bit version (0111) | 12 random bits.
  val high = (millis << 16) | 0x7000L | (bits.getLong(0) & 0x0fffL)

  // Low word: 2-bit variant (10) | 62 random bits.
  val low = (bits.getLong(8) & 0x3fffffffffffffffL) | 0x8000000000000000L

  new UUID(high, low)
}
2344
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (c) 2020 Adform
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
package com.adform.streamloader.util
10+
11+
import com.adform.streamloader.util.UuidExtensions.randomUUIDv7
12+
import org.scalatest.funspec.AnyFunSpec
13+
import org.scalatest.matchers.should.Matchers
14+
15+
/** Tests for the UUIDv7 generator `randomUUIDv7`. */
class UuidExtensionsTest extends AnyFunSpec with Matchers {

  it("should generate unique UUIDv7 values") {
    val uuids = (0 until 100).map(_ => randomUUIDv7())
    uuids should contain theSameElementsInOrderAs uuids.distinct
  }

  it("should generate alphabetically sorted UUIDv7 values") {
    val uuids = (0 until 10)
      .map { _ =>
        // Keep the generated value as the lambda result: ending the block with
        // Thread.sleep would yield Unit and make the ordering assertion vacuous.
        val uuid = randomUUIDv7()
        // UUIDs are only ordered by their millisecond timestamp prefix, so make sure
        // consecutive values are generated in different milliseconds.
        Thread.sleep(5)
        uuid
      }
      .map(_.toString)

    uuids should contain theSameElementsInOrderAs uuids.sorted
  }
}

stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatcher.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ import com.adform.streamloader.model.{StreamRange, StreamRecord}
1212
import com.adform.streamloader.sink.batch.{RecordBatch, RecordBatchBuilder, RecordBatcher, RecordFormatter}
1313
import com.adform.streamloader.sink.file.{FileStats, MultiFileCommitStrategy}
1414
import com.adform.streamloader.util.TimeProvider
15+
import com.adform.streamloader.util.UuidExtensions.randomUUIDv7
1516
import org.apache.iceberg.data.{GenericAppenderFactory, Record => IcebergRecord}
1617
import org.apache.iceberg.io.DataWriteResult
1718
import org.apache.iceberg.{FileFormat, PartitionKey, Table}
1819

1920
import java.time.Duration
20-
import java.util.UUID
2121
import scala.collection.mutable
2222
import scala.jdk.CollectionConverters._
2323

@@ -41,7 +41,7 @@ class IcebergRecordBatchBuilder(
4141
private val startTimeMillis = timeProvider.currentMillis
4242
private var recordCount = 0L
4343
private val dataWriter = {
44-
val filename = fileFormat.addExtension(UUID.randomUUID().toString)
44+
val filename = fileFormat.addExtension(randomUUIDv7().toString)
4545
val path = table.locationProvider().newDataLocation(table.spec(), pk, filename)
4646
val output = table.io().newOutputFile(path)
4747

0 commit comments

Comments
 (0)