Skip to content

Commit

Permalink
[greyhound-consumer-proxy] start from latest offset when group doesn'…
Browse files Browse the repository at this point in the history
…t exist (temporary until full config is done) (#35428)

* [greyhound-consumer-proxy] start from latest offset when group doesn't exist (temporary until full config is done) #automerge #skipreview

* .

GitOrigin-RevId: 34b953997eee3d167bb2beb23d50045daf03460f
  • Loading branch information
berman7 authored and wix-oss committed Oct 5, 2023
1 parent f8c6082 commit 04698bd
Showing 1 changed file with 36 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import com.wixpress.dst.greyhound.core.admin.AdminClient.isTopicExistsError
import com.wixpress.dst.greyhound.core.admin.TopicPropertiesResult.{TopicDoesnExistException, TopicProperties}
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics
import com.wixpress.dst.greyhound.core.zioutils.KafkaFutures._
import com.wixpress.dst.greyhound.core.{CommonGreyhoundConfig, GHThrowable, Group, GroupTopicPartition, OffsetAndMetadata, Topic, TopicConfig, TopicPartition}
import com.wixpress.dst.greyhound.core.{CommonGreyhoundConfig, GHThrowable, Group, GroupTopicPartition, Offset, OffsetAndMetadata, Topic, TopicConfig, TopicPartition}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin.{AlterConfigOp, Config, ConfigEntry, ListConsumerGroupOffsetsOptions, ListConsumerGroupOffsetsSpec, NewPartitions, NewTopic, TopicDescription, AdminClient => KafkaAdminClient, AdminClientConfig => KafkaAdminClientConfig}
import org.apache.kafka.clients.admin.{AlterConfigOp, Config, ConfigEntry, ListConsumerGroupOffsetsOptions, ListConsumerGroupOffsetsSpec, ListOffsetsOptions, ListOffsetsResult, NewPartitions, NewTopic, OffsetSpec, TopicDescription, AdminClient => KafkaAdminClient, AdminClientConfig => KafkaAdminClientConfig}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException, UnknownTopicOrPartitionException}
Expand All @@ -27,6 +27,10 @@ trait AdminClient {

def listTopics()(implicit trace: Trace): RIO[Any, Set[String]]

def listEndOffsets(
tps: Set[TopicPartition]
)(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]]

def topicExists(topic: String)(implicit trace: Trace): RIO[Any, Boolean]

def topicsExist(topics: Set[Topic])(implicit trace: Trace): ZIO[Any, Throwable, Map[Topic, Boolean]]
Expand All @@ -40,7 +44,9 @@ trait AdminClient {

def propertiesFor(topics: Set[Topic])(implicit trace: Trace): RIO[Any, Map[Topic, TopicPropertiesResult]]

def commit(group: Group, commits: Map[TopicPartition, OffsetAndMetadata])(implicit trace: Trace): ZIO[Any, Throwable, Unit]
def commit(group: Group, commits: Map[TopicPartition, OffsetAndMetadata])(
implicit trace: Trace
): ZIO[Any, Throwable, Unit]

def listGroups()(implicit trace: Trace): RIO[Any, Set[String]]

Expand Down Expand Up @@ -216,6 +222,20 @@ object AdminClient {
topics <- result.names().asZio
} yield topics.asScala.toSet

override def listEndOffsets(
tps: Set[TopicPartition]
)(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]] = {
val j: java.util.Map[org.apache.kafka.common.TopicPartition, OffsetSpec] =
tps.map { tp => (tp.asKafka, OffsetSpec.latest()) }.toMap.asJava

for {
result <- attemptBlocking(client.listOffsets(j))
results <- result.all.asZio.map(_.asScala.toMap.map { case (tp, offset) =>
(TopicPartition.fromKafka(tp), offset.offset())
})
} yield results
}

private def toNewTopic(config: TopicConfig): NewTopic =
new NewTopic(config.name, config.partitions, config.replicationFactor.toShort)
.configs(config.propertiesMap.asJava)
Expand All @@ -225,9 +245,13 @@ object AdminClient {
groups <- result.valid().asZio
} yield groups.asScala.map(_.groupId()).toSet

override def commit(group: Group, commits: Map[TopicPartition, OffsetAndMetadata])(implicit trace: Trace): ZIO[Any, Throwable, Unit] =
attemptBlocking(client.alterConsumerGroupOffsets(group,
commits.map { case (tp, offset) => (tp.asKafka, offset.asKafka) }.asJava)).unit
override def commit(group: Group, commits: Map[TopicPartition, OffsetAndMetadata])(
implicit trace: Trace
): ZIO[Any, Throwable, Unit] =
attemptBlocking(
client
.alterConsumerGroupOffsets(group, commits.map { case (tp, offset) => (tp.asKafka, offset.asKafka) }.asJava)
).unit

override def groupOffsetsSpecific(
requestedTopicPartitions: Map[Group, Set[TopicPartition]]
Expand All @@ -249,9 +273,13 @@ object AdminClient {
rawOffsets = result.asScala.toMap.mapValues(_.asScala.toMap)
offset =
rawOffsets.map { case (group, offsets) =>
offsets.map{case (tp, offset) =>
(GroupTopicPartition(group, TopicPartition.fromKafka(tp)), PartitionOffset(Option(offset).map(_.offset()).getOrElse(0L)))
offsets.map { case (tp, offset) =>
(
GroupTopicPartition(group, TopicPartition.fromKafka(tp)),
PartitionOffset(Option(offset).map(_.offset()).getOrElse(-1L))
)
}
.filter{case (_, o) => o.offset >= 0}
}
groupOffsets = offset.foldLeft(Map.empty[GroupTopicPartition, PartitionOffset])((x, y) => x ++ y)
} yield groupOffsets
Expand Down

0 comments on commit 04698bd

Please sign in to comment.