Skip to content

Commit

Permalink
LC-68 - Avoid passing round java maps (#1029)
Browse files Browse the repository at this point in the history
* LC-68 - Avoid passing round java maps, only on ingress and egress from the scala code

* Correct weird class name

* Test fix
  • Loading branch information
davidsloan authored Feb 21, 2024
1 parent a190645 commit 7d8f2dc
Show file tree
Hide file tree
Showing 107 changed files with 390 additions and 484 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.apache.kafka.connect.sink.SinkTask

import java.util
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.jdk.CollectionConverters.MapHasAsJava
import scala.jdk.CollectionConverters.MapHasAsScala

/**
Expand Down Expand Up @@ -62,9 +61,9 @@ class S3ConsumerGroupsSinkTask extends SinkTask with ErrorHandler {
logger.debug(s"[{}] S3ConsumerGroupSinkTask.start", fallbackProps.get("name"))

val contextProps = Option(context).flatMap(c => Option(c.configs())).map(_.asScala.toMap).getOrElse(Map.empty)
val props = MapUtils.mergeProps(contextProps, fallbackProps.asScala.toMap).asJava
val props = MapUtils.mergeProps(contextProps, fallbackProps.asScala.toMap)
(for {
taskId <- new ConnectorTaskIdCreator(CONNECTOR_PREFIX).fromProps(fallbackProps)
taskId <- new ConnectorTaskIdCreator(CONNECTOR_PREFIX).fromProps(fallbackProps.asScala.toMap)
config <- S3ConsumerGroupsSinkConfig.fromProps(props)
s3Client <- AwsS3ClientCreator.make(config.config)
uploader = new AwsS3Uploader(s3Client, taskId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask
import io.lenses.streamreactor.connect.cloud.common.sink.WriterManagerCreator
import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager

import scala.jdk.CollectionConverters.MapHasAsJava
import scala.util.Try

object S3SinkTask {}
Expand All @@ -46,7 +45,7 @@ class S3SinkTask

def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[S3FileMetadata]] =
for {
config <- S3SinkConfig.fromProps(props.asJava)
config <- S3SinkConfig.fromProps(props)
s3Client <- AwsS3ClientCreator.make(config.s3Config)
storageInterface = new AwsS3StorageInterface(connectorTaskId, s3Client, config.batchDelete)
_ <- Try(setErrorRetryInterval(config.s3Config)).toEither
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@ import io.lenses.streamreactor.connect.aws.s3.config._
import io.lenses.streamreactor.connect.cloud.common.config.PropertiesHelper
import io.lenses.streamreactor.connect.cloud.common.consumers.CloudObjectKey

import java.util

case class S3ConsumerGroupsSinkConfig(
location: CloudObjectKey,
config: S3Config,
)

object S3ConsumerGroupsSinkConfig extends PropertiesHelper {
def fromProps(
props: util.Map[String, String],
props: Map[String, String],
): Either[Throwable, S3ConsumerGroupsSinkConfig] =
S3ConsumerGroupsSinkConfig(S3ConsumerGroupsSinkConfigDef(props))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.Importance
import org.apache.kafka.common.config.ConfigDef.Type

import java.util
import scala.jdk.CollectionConverters._

object S3ConsumerGroupsSinkConfigDef {
Expand Down Expand Up @@ -116,7 +115,7 @@ object S3ConsumerGroupsSinkConfigDef {
)
}

case class S3ConsumerGroupsSinkConfigDef(props: util.Map[String, String])
case class S3ConsumerGroupsSinkConfigDef(props: Map[String, String])
extends BaseConfig(S3ConfigSettings.CONNECTOR_PREFIX, S3ConsumerGroupsSinkConfigDef.config, props) {
def getParsedValues: Map[String, _] = values().asScala.toMap

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketO
import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkConfig
import io.lenses.streamreactor.connect.cloud.common.sink.config.OffsetSeekerOptions

import java.util

object S3SinkConfig {

def fromProps(
props: util.Map[String, String],
props: Map[String, String],
)(
implicit
connectorTaskId: ConnectorTaskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ import io.lenses.streamreactor.connect.aws.s3.config.DeleteModeSettings
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkConfigDefBuilder

import java.util
import scala.jdk.CollectionConverters.MapHasAsScala

case class S3SinkConfigDefBuilder(props: util.Map[String, String])
case class S3SinkConfigDefBuilder(props: Map[String, String])
extends BaseConfig(S3ConfigSettings.CONNECTOR_PREFIX, S3SinkConfigDef.config, props)
with CloudSinkConfigDefBuilder
with ErrorPolicySettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ class S3SourceTask extends SourceTask with LazyLogging {

logger.debug(s"Received call to S3SourceTask.start with ${props.size()} properties")

val contextProperties = Option(context).flatMap(c => Option(c.configs()).map(_.asScala.toMap)).getOrElse(Map.empty)
val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap).asJava
val contextProperties: Map[String, String] =
Option(context).flatMap(c => Option(c.configs()).map(_.asScala.toMap)).getOrElse(Map.empty)
val mergedProperties: Map[String, String] = MapUtils.mergeProps(contextProperties, props.asScala.toMap)
(for {
result <- S3SourceState.make(mergedProperties, contextOffsetFn)
fiber <- result.partitionDiscoveryLoop.start
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties

import java.util
import scala.util.Try

object S3SourceConfig {

def fromProps(
props: util.Map[String, String],
props: Map[String, String],
): Either[Throwable, S3SourceConfig] =
S3SourceConfig(S3SourceConfigDefBuilder(props))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ import io.lenses.streamreactor.connect.aws.s3.config.DeleteModeSettings
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
import io.lenses.streamreactor.connect.cloud.common.config.CompressionCodecSettings

import java.util
import scala.jdk.CollectionConverters.MapHasAsScala

case class S3SourceConfigDefBuilder(props: util.Map[String, String])
case class S3SourceConfigDefBuilder(props: Map[String, String])
extends BaseConfig(S3ConfigSettings.CONNECTOR_PREFIX, S3SourceConfigDef.config, props)
with KcqlSettings
with ErrorPolicySettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager
import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManagerState
import io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState

import java.util
import scala.jdk.CollectionConverters.IteratorHasAsScala

object S3SourceState extends StrictLogging {
def make(
props: util.Map[String, String],
props: Map[String, String],
contextOffsetFn: CloudLocation => Option[CloudLocation],
)(
implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import io.lenses.streamreactor.connect.cloud.common.consumers.CloudObjectKey
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

import scala.jdk.CollectionConverters._
class S3ConsumerGroupsSinkConfigTest extends AnyFunSuite with Matchers {
test("creates an instance of S3ConsumerGroupsSinkConfig") {
S3ConsumerGroupsSinkConfig.fromProps(
Expand All @@ -34,7 +33,7 @@ class S3ConsumerGroupsSinkConfigTest extends AnyFunSuite with Matchers {
AWS_SECRET_KEY -> "secret",
AUTH_MODE -> "credentials",
CUSTOM_ENDPOINT -> "endpoint",
).asJava,
),
) match {
case Left(value) => fail("Expecting to build a config but got an error instead.", value)
case Right(value) =>
Expand Down Expand Up @@ -68,7 +67,7 @@ class S3ConsumerGroupsSinkConfigTest extends AnyFunSuite with Matchers {
AWS_SECRET_KEY -> "secret",
AUTH_MODE -> "credentials",
CUSTOM_ENDPOINT -> "endpoint",
).asJava,
),
) match {
case Left(value) => fail("Expecting to build a config but got an error instead.", value)
case Right(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._

import scala.jdk.CollectionConverters.MapHasAsJava

class DeleteModeSettingsTest extends AnyFlatSpec with Matchers with LazyLogging {
private val deleteModeMap = Table[String, String, Boolean](
("testName", "value", "expected"),
Expand All @@ -36,7 +34,7 @@ class DeleteModeSettingsTest extends AnyFlatSpec with Matchers with LazyLogging
S3SinkConfigDefBuilder(Map(
"connect.s3.kcql" -> "abc",
"connect.s3.delete.mode" -> value,
).asJava).batchDelete() should be(expected)
)).batchDelete() should be(expected)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import io.lenses.streamreactor.connect.cloud.common.consumers.CloudObjectKey
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

import scala.jdk.CollectionConverters._
class S3ConsumerGroupsSinkConfigTest extends AnyFunSuite with Matchers {
test("creates an instance of S3ConsumerGroupsSinkConfig") {
S3ConsumerGroupsSinkConfig.fromProps(
Expand All @@ -37,7 +36,7 @@ class S3ConsumerGroupsSinkConfigTest extends AnyFunSuite with Matchers {
AWS_SECRET_KEY -> "secret",
AUTH_MODE -> "credentials",
CUSTOM_ENDPOINT -> "endpoint",
).asJava,
),
) match {
case Left(value) => fail("Expecting to build a config but got an error instead.", value)
case Right(value) =>
Expand Down Expand Up @@ -71,7 +70,7 @@ class S3ConsumerGroupsSinkConfigTest extends AnyFunSuite with Matchers {
AWS_SECRET_KEY -> "secret",
AUTH_MODE -> "credentials",
CUSTOM_ENDPOINT -> "endpoint",
).asJava,
),
) match {
case Left(value) => fail("Expecting to build a config but got an error instead.", value)
case Right(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.jdk.CollectionConverters.MapHasAsJava

class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matchers with EitherValues {
class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matchers with EitherValues {

val PrefixName = "streamReactorBackups"
val TopicName = "myTopic"
Expand All @@ -48,7 +47,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName PARTITIONBY _key STOREAS `CSV` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1",
)

val kcql = S3SinkConfigDefBuilder(props.asJava).getKCQL
val kcql = S3SinkConfigDefBuilder(props).getKCQL
kcql should have size 1

val element = kcql.head
Expand All @@ -65,7 +64,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS CSV WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1",
)

CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.Default))
}
Expand All @@ -76,7 +75,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `JSON` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.enabled))
}
Expand All @@ -87,7 +86,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `PARQUET` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false)))
Expand Down Expand Up @@ -116,7 +115,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
|""".stripMargin,
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false),
Expand All @@ -131,7 +130,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
)

val commitPolicy =
S3SinkConfigDefBuilder(props.asJava).commitPolicy(S3SinkConfigDefBuilder(props.asJava).getKCQL.head)
S3SinkConfigDefBuilder(props).commitPolicy(S3SinkConfigDefBuilder(props).getKCQL.head)

commitPolicy.conditions should be(
Seq(
Expand All @@ -149,7 +148,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
)

val commitPolicy =
S3SinkConfigDefBuilder(props.asJava).commitPolicy(S3SinkConfigDefBuilder(props.asJava).getKCQL.head)
S3SinkConfigDefBuilder(props).commitPolicy(S3SinkConfigDefBuilder(props).getKCQL.head)

commitPolicy.conditions should be(
Seq(
Expand All @@ -165,7 +164,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
)

val commitPolicy =
S3SinkConfigDefBuilder(props.asJava).commitPolicy(S3SinkConfigDefBuilder(props.asJava).getKCQL.head)
S3SinkConfigDefBuilder(props).commitPolicy(S3SinkConfigDefBuilder(props).getKCQL.head)

commitPolicy.conditions should be(
Seq(
Expand All @@ -181,7 +180,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName BATCH = 150 STOREAS `CSV` LIMIT 550",
)

val kcql = S3SinkConfigDefBuilder(props.asJava).getKCQL
val kcql = S3SinkConfigDefBuilder(props).getKCQL

kcql.head.getBatchSize should be(150)
kcql.head.getLimit should be(550)
Expand All @@ -192,7 +191,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true,
Expand All @@ -209,7 +208,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true,
Expand All @@ -226,7 +225,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `BYTES_VALUEONLY` WITH_FLUSH_COUNT = 1",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)).left.value.getMessage should startWith(
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)).left.value.getMessage should startWith(
"Unsupported format - BYTES_VALUEONLY. Please note",
)
}
Expand All @@ -236,7 +235,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `BYTES` WITH_FLUSH_COUNT = 3",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)).left.value.getMessage should startWith(
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)).left.value.getMessage should startWith(
"FLUSH_COUNT > 1 is not allowed for BYTES",
)
}
Expand Down
Loading

0 comments on commit 7d8f2dc

Please sign in to comment.