diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/admin/AdminClient.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/admin/AdminClient.scala index 17ab8b07..5e762468 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/admin/AdminClient.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/admin/AdminClient.scala @@ -11,7 +11,7 @@ import com.wixpress.dst.greyhound.core.zioutils.KafkaFutures._ import com.wixpress.dst.greyhound.core._ import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource -import org.apache.kafka.clients.admin.{AlterConfigOp, Config, ConfigEntry, ListConsumerGroupOffsetsOptions, ListConsumerGroupOffsetsSpec, NewPartitions, NewTopic, OffsetSpec, AdminClient => KafkaAdminClient, AdminClientConfig => KafkaAdminClientConfig} +import org.apache.kafka.clients.admin.{AdminClient => KafkaAdminClient, AdminClientConfig => KafkaAdminClientConfig, AlterConfigOp, Config, ConfigEntry, ListConsumerGroupOffsetsOptions, ListConsumerGroupOffsetsSpec, NewPartitions, NewTopic, OffsetSpec} import org.apache.kafka.common import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource.Type.TOPIC @@ -28,7 +28,11 @@ trait AdminClient { def listTopics()(implicit trace: Trace): RIO[Any, Set[String]] def listEndOffsets( - tps: Set[TopicPartition] + tps: Set[TopicPartition] + )(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]] + + def listBeginningOffsets( + tps: Set[TopicPartition] )(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]] def topicExists(topic: String)(implicit trace: Trace): RIO[Any, Boolean] @@ -36,8 +40,8 @@ trait AdminClient { def topicsExist(topics: Set[Topic])(implicit trace: Trace): ZIO[Any, Throwable, Map[Topic, Boolean]] def createTopics( - configs: Set[TopicConfig], - ignoreErrors: Throwable => Boolean = isTopicExistsError + configs: Set[TopicConfig], + ignoreErrors: Throwable => Boolean = isTopicExistsError )(implicit trace: Trace): RIO[GreyhoundMetrics, Map[String, Option[Throwable]]] def numberOfBrokers(implicit trace: Trace): RIO[Any, Int] @@ -45,7 +49,7 @@ trait AdminClient { def propertiesFor(topics: Set[Topic])(implicit trace: Trace): RIO[Any, Map[Topic, TopicPropertiesResult]] def commit(group: Group, commits: Map[TopicPartition, OffsetAndMetadata])( - implicit trace: Trace + implicit trace: Trace ): ZIO[Any, Throwable, Unit] def listGroups()(implicit trace: Trace): RIO[Any, Set[String]] @@ -53,7 +57,7 @@ trait AdminClient { def groupOffsets(groups: Set[Group])(implicit trace: Trace): RIO[Any, Map[GroupTopicPartition, PartitionOffset]] def groupOffsetsSpecific(requestedTopicPartitions: Map[Group, Set[TopicPartition]])( - implicit trace: Trace + implicit trace: Trace ): RIO[Any, Map[GroupTopicPartition, PartitionOffset]] def groupState(groups: Set[Group])(implicit trace: Trace): RIO[Any, Map[String, GroupState]] @@ -63,19 +67,19 @@ trait AdminClient { def describeConsumerGroups(groupIds: Set[Group])(implicit trace: Trace): RIO[Any, Map[Group, ConsumerGroupDescription]] def consumerGroupOffsets(groupId: Group, onlyPartitions: Option[Set[TopicPartition]] = None)( - implicit trace: Trace + implicit trace: Trace ): RIO[Any, Map[TopicPartition, OffsetAndMetadata]] def consumerGroupsOffsets( - groups: Map[Group, Option[Set[TopicPartition]]] + groups: Map[Group, Option[Set[TopicPartition]]] )(implicit trace: Trace): RIO[Any, Map[Group, Map[TopicPartition, OffsetAndMetadata]]] def increasePartitions(topic: Topic, newCount: Int)(implicit trace: Trace): RIO[Any with GreyhoundMetrics, Unit] def updateTopicConfigProperties( - topic: Topic, - configProperties: Map[String, ConfigPropOp], - useNonIncrementalAlter: Boolean = false + topic: Topic, + configProperties: Map[String, ConfigPropOp], + useNonIncrementalAlter: Boolean = false )(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] def attributes: Map[String, String] @@ -145,13 +149,14 @@ object AdminClient { .values() .asScala .headOption - .map { case (_, topicResult) => - topicResult.asZio.either.flatMap { - case Right(_) => ZIO.succeed(true) - case Left(_: UnknownTopicOrPartitionException) => ZIO.succeed(false) - case Left(_: InvalidTopicException) => ZIO.succeed(false) - case Left(ex) => ZIO.fail(ex) - } + .map { + case (_, topicResult) => + topicResult.asZio.either.flatMap { + case Right(_) => ZIO.succeed(true) + case Left(_: UnknownTopicOrPartitionException) => ZIO.succeed(false) + case Left(_: InvalidTopicException) => ZIO.succeed(false) + case Left(ex) => ZIO.fail(ex) + } } .getOrElse(ZIO.succeed(false)) } @@ -159,35 +164,37 @@ object AdminClient { override def topicsExist(topics: Set[Topic])(implicit trace: Trace): ZIO[Any, Throwable, Map[Topic, Boolean]] = attemptBlocking(client.describeTopics(topics.asJava)).flatMap { result => ZIO - .foreach(result.values().asScala.toSeq) { case (topic, topicResult) => - topicResult.asZio.either.flatMap { - case Right(_) => ZIO.succeed(topic -> true) - case Left(_: UnknownTopicOrPartitionException) => ZIO.succeed(topic -> false) - case Left(ex) => ZIO.fail(ex) - } + .foreach(result.values().asScala.toSeq) { + case (topic, topicResult) => + topicResult.asZio.either.flatMap { + case Right(_) => ZIO.succeed(topic -> true) + case Left(_: UnknownTopicOrPartitionException) => ZIO.succeed(topic -> false) + case Left(ex) => ZIO.fail(ex) + } } .map(_.toMap) } override def createTopics( - configs: Set[TopicConfig], - ignoreErrors: Throwable => Boolean = isTopicExistsError + configs: Set[TopicConfig], + ignoreErrors: Throwable => Boolean = isTopicExistsError )(implicit trace: Trace): RIO[GreyhoundMetrics, Map[String, Option[Throwable]]] = { val configsByTopic = configs.map(c => c.name -> c).toMap attemptBlocking(client.createTopics(configs.map(toNewTopic).asJava)).flatMap { result => ZIO - .foreach(result.values.asScala.toSeq) { case (topic, topicResult) => - topicResult.asZio.unit - .reporting(res => - TopicCreated( - topic, - configsByTopic(topic).partitions, - attributes, - res.mapExit(fromExit(isTopicExistsError)) + .foreach(result.values.asScala.toSeq) { + case (topic, topicResult) => + topicResult.asZio.unit + .reporting(res => + TopicCreated( + topic, + configsByTopic(topic).partitions, + attributes, + res.mapExit(fromExit(isTopicExistsError)) + ) ) - ) - .either - .map(topic -> _.left.toOption.filterNot(ignoreErrors)) + .either + .map(topic -> _.left.toOption.filterNot(ignoreErrors)) } .map(_.toMap) } @@ -198,7 +205,7 @@ object AdminClient { .flatMap(_.nodes().asZio.map(_.size)) override def propertiesFor( - topics: Set[Topic] + topics: Set[Topic] )(implicit trace: Trace): RIO[Any, Map[Topic, TopicPropertiesResult]] = (describeConfigs(client, topics) zipPar describePartitions(client, topics)).map { case (configsPerTopic, partitionsAndReplicationPerTopic) => @@ -219,17 +226,33 @@ object AdminClient { topics <- result.names().asZio } yield topics.asScala.toSet + override def listBeginningOffsets( + tps: Set[TopicPartition] + )(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]] = { + val j: java.util.Map[org.apache.kafka.common.TopicPartition, OffsetSpec] = + tps.map { tp => (tp.asKafka, OffsetSpec.earliest()) }.toMap.asJava + + for { + result <- attemptBlocking(client.listOffsets(j)) + results <- result.all.asZio.map(_.asScala.toMap.map { + case (tp, offset) => + (TopicPartition.fromKafka(tp), offset.offset()) + }) + } yield results + } + override def listEndOffsets( - tps: Set[TopicPartition] + tps: Set[TopicPartition] )(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]] = { val j: java.util.Map[org.apache.kafka.common.TopicPartition, OffsetSpec] = tps.map { tp => (tp.asKafka, OffsetSpec.latest()) }.toMap.asJava for { - result <- attemptBlocking(client.listOffsets(j)) - results <- result.all.asZio.map(_.asScala.toMap.map { case (tp, offset) => - (TopicPartition.fromKafka(tp), offset.offset()) - }) + result <- attemptBlocking(client.listOffsets(j)) + results <- result.all.asZio.map(_.asScala.toMap.map { + case (tp, offset) => + (TopicPartition.fromKafka(tp), offset.offset()) + }) } yield results } @@ -243,7 +266,7 @@ object AdminClient { } yield groups.asScala.map(_.groupId()).toSet override def commit(group: Group, commits: Map[TopicPartition, OffsetAndMetadata])( - implicit trace: Trace + implicit trace: Trace ): ZIO[Any, Throwable, Unit] = attemptBlocking( client @@ -251,68 +274,66 @@ object AdminClient { ).unit override def groupOffsetsSpecific( - requestedTopicPartitions: Map[Group, Set[TopicPartition]] + requestedTopicPartitions: Map[Group, Set[TopicPartition]] )(implicit trace: Trace): RIO[Any, Map[GroupTopicPartition, PartitionOffset]] = for { - result <- ZIO.flatten( - ZIO - .attemptBlocking( - client.listConsumerGroupOffsets( - requestedTopicPartitions - .mapValues(tps => - new ListConsumerGroupOffsetsSpec().topicPartitions(tps.map(_.asKafka).asJavaCollection) - ) - .asJava - ) - ) - .map(_.all.asZio) - ) - rawOffsets = result.asScala.toMap.mapValues(_.asScala.toMap) - offset = - rawOffsets.map { case (group, offsets) => - offsets - .map { case (tp, offset) => - ( - GroupTopicPartition(group, TopicPartition.fromKafka(tp)), - PartitionOffset(Option(offset).map(_.offset()).getOrElse(-1L)) - ) - } - .filter { case (_, o) => o.offset >= 0 } + result <- ZIO.flatten( + ZIO + .attemptBlocking( + client.listConsumerGroupOffsets( + requestedTopicPartitions + .mapValues(tps => new ListConsumerGroupOffsetsSpec().topicPartitions(tps.map(_.asKafka).asJavaCollection)) + .asJava + ) + ) + .map(_.all.asZio) + ) + rawOffsets = result.asScala.toMap.mapValues(_.asScala.toMap) + offset = + rawOffsets.map { + case (group, offsets) => + offsets + .map { + case (tp, offset) => + ( + GroupTopicPartition(group, TopicPartition.fromKafka(tp)), + PartitionOffset(Option(offset).map(_.offset()).getOrElse(-1L)) + ) + } + .filter { case (_, o) => o.offset >= 0 } } groupOffsets = offset.foldLeft(Map.empty[GroupTopicPartition, PartitionOffset])((x, y) => x ++ y) } yield groupOffsets override def groupOffsets( - groups: Set[String] + groups: Set[String] )(implicit trace: Trace): RIO[Any, Map[GroupTopicPartition, PartitionOffset]] = for { - result <- ZIO.foreach(groups)(group => attemptBlocking(group -> client.listConsumerGroupOffsets(group))) + result <- ZIO.foreach(groups)(group => attemptBlocking(group -> client.listConsumerGroupOffsets(group))) // TODO: remove ._1 , ._2 rawOffsetsEffects = result.toMap.mapValues(_.partitionsToOffsetAndMetadata().asZio) - offsetsEffects = + offsetsEffects = rawOffsetsEffects.map(offset => offset._2.map(f => - f.asScala.map(p => - p.copy(GroupTopicPartition(offset._1, core.TopicPartition(p._1)), PartitionOffset(p._2.offset())) - ) + f.asScala.map(p => p.copy(GroupTopicPartition(offset._1, core.TopicPartition(p._1)), PartitionOffset(p._2.offset()))) ) ) - offsetsMapSets <- ZIO.collectAll(offsetsEffects) - groupOffsets = offsetsMapSets.foldLeft(Map.empty[GroupTopicPartition, PartitionOffset])((x, y) => x ++ y) + offsetsMapSets <- ZIO.collectAll(offsetsEffects) + groupOffsets = offsetsMapSets.foldLeft(Map.empty[GroupTopicPartition, PartitionOffset])((x, y) => x ++ y) } yield groupOffsets override def groupState(groups: Set[String])(implicit trace: Trace): RIO[Any, Map[String, GroupState]] = for { - result <- attemptBlocking(client.describeConsumerGroups(groups.asJava)) + result <- attemptBlocking(client.describeConsumerGroups(groups.asJava)) groupEffects = result.describedGroups().asScala.mapValues(_.asZio).toMap - groupsList <- ZIO.collectAll(groupEffects.values) - membersMap = groupsList.groupBy(_.groupId()).mapValues(_.flatMap(_.members().asScala)).toMap - groupState = membersMap - .mapValues(members => { - val topicPartitionsMap = members.flatMap(_.assignment().topicPartitions().asScala) - GroupState(topicPartitionsMap.map(TopicPartition(_)).toSet) - }) - .toMap + groupsList <- ZIO.collectAll(groupEffects.values) + membersMap = groupsList.groupBy(_.groupId()).mapValues(_.flatMap(_.members().asScala)).toMap + groupState = membersMap + .mapValues(members => { + val topicPartitionsMap = members.flatMap(_.assignment().topicPartitions().asScala) + GroupState(topicPartitionsMap.map(TopicPartition(_)).toSet) + }) + .toMap } yield groupState override def deleteTopic(topic: Topic)(implicit trace: Trace): RIO[Any, Unit] = { @@ -322,7 +343,7 @@ object AdminClient { } override def describeConsumerGroups( - groupIds: Set[Group] + groupIds: Set[Group] )(implicit trace: Trace): RIO[Any, Map[Group, ConsumerGroupDescription]] = { for { desc <- attemptBlocking(client.describeConsumerGroups(groupIds.asJava).all()) @@ -331,45 +352,45 @@ object AdminClient { } override def consumerGroupOffsets( - groupId: Group, - onlyPartitions: Option[Set[TopicPartition]] = None + groupId: Group, + onlyPartitions: Option[Set[TopicPartition]] = None )(implicit trace: Trace): RIO[Any, Map[TopicPartition, OffsetAndMetadata]] = { val maybePartitions: util.List[common.TopicPartition] = onlyPartitions.map(_.map(_.asKafka).toList.asJava).orNull for { desc <- attemptBlocking( - client - .listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions().topicPartitions(maybePartitions)) - ) - res <- attemptBlocking(desc.partitionsToOffsetAndMetadata().get()) + client + .listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions().topicPartitions(maybePartitions)) + ) + res <- attemptBlocking(desc.partitionsToOffsetAndMetadata().get()) } yield res.asScala.toMap.map { case (tp, om) => (TopicPartition(tp), OffsetAndMetadata(om)) } } override def consumerGroupsOffsets( - groups: Map[Group, Option[Set[TopicPartition]]] + groups: Map[Group, Option[Set[TopicPartition]]] )(implicit trace: Trace): RIO[Any, Map[Group, Map[TopicPartition, OffsetAndMetadata]]] = for { - desc <- attemptBlocking( - client - .listConsumerGroupOffsets( - groups - .mapValues(tps => - new ListConsumerGroupOffsetsSpec().topicPartitions(tps.map(_.map(_.asKafka).toList.asJava).orNull) - ) - .asJava - ) - ) - res <- attemptBlocking(groups.map(g => (g._1, desc.partitionsToOffsetAndMetadata(g._1).get()))) - } yield res.map { case (group, o) => - ( - group, - o.asScala.toSeq - .map(om => (TopicPartition.fromKafka(om._1), OffsetAndMetadata(om._2.offset(), om._2.metadata()))) - .toMap - ) + desc <- + attemptBlocking( + client + .listConsumerGroupOffsets( + groups + .mapValues(tps => new ListConsumerGroupOffsetsSpec().topicPartitions(tps.map(_.map(_.asKafka).toList.asJava).orNull)) + .asJava + ) + ) + res <- attemptBlocking(groups.map(g => (g._1, desc.partitionsToOffsetAndMetadata(g._1).get()))) + } yield res.map { + case (group, o) => + ( + group, + o.asScala.toSeq + .map(om => (TopicPartition.fromKafka(om._1), OffsetAndMetadata(om._2.offset(), om._2.metadata()))) + .toMap + ) } override def increasePartitions(topic: Topic, newCount: Int)( - implicit trace: Trace + implicit trace: Trace ): RIO[GreyhoundMetrics, Unit] = { attemptBlocking(client.createPartitions(Map(topic -> NewPartitions.increaseTo(newCount)).asJava)) .flatMap(_.all().asZio) @@ -378,9 +399,9 @@ object AdminClient { } override def updateTopicConfigProperties( - topic: Topic, - configProperties: Map[String, ConfigPropOp], - useNonIncrementalAlter: Boolean = false + topic: Topic, + configProperties: Map[String, ConfigPropOp], + useNonIncrementalAlter: Boolean = false )(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = { if (useNonIncrementalAlter) updateTopicConfigUsingAlter(topic, configProperties) else updateTopicConfigIncremental(topic, configProperties) @@ -395,24 +416,25 @@ object AdminClient { described <- describeConfigs(client, Set(topic)) beforeProps <- described.values.head.getOrFail beforeConfig = beforeProps.propertiesThat(_.isTopicSpecific) - configToSet = configProperties.foldLeft(beforeConfig) { - case (acc, (key, ConfigPropOp.Delete)) => acc - key - case (acc, (key, ConfigPropOp.Set(value))) => acc + (key -> value) - } - configJava = new Config(configToSet.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava) - _ <- attemptBlocking(client.alterConfigs(Map(resource -> configJava).asJava)) - .flatMap(_.all().asZio) + configToSet = configProperties.foldLeft(beforeConfig) { + case (acc, (key, ConfigPropOp.Delete)) => acc - key + case (acc, (key, ConfigPropOp.Set(value))) => acc + (key -> value) + } + configJava = new Config(configToSet.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava) + _ <- attemptBlocking(client.alterConfigs(Map(resource -> configJava).asJava)) + .flatMap(_.all().asZio) } yield () ).reporting(TopicConfigUpdated(topic, configProperties, incremental = false, attributes, _)) } private def updateTopicConfigIncremental(topic: Topic, configProperties: Map[String, ConfigPropOp]) = { val resource = new ConfigResource(ConfigResource.Type.TOPIC, topic) - val ops = configProperties.map { case (key, value) => - value match { - case ConfigPropOp.Delete => new AlterConfigOp(new ConfigEntry(key, null), OpType.DELETE) - case ConfigPropOp.Set(value) => new AlterConfigOp(new ConfigEntry(key, value), OpType.SET) - } + val ops = configProperties.map { + case (key, value) => + value match { + case ConfigPropOp.Delete => new AlterConfigOp(new ConfigEntry(key, null), OpType.DELETE) + case ConfigPropOp.Set(value) => new AlterConfigOp(new ConfigEntry(key, value), OpType.SET) + } }.asJavaCollection attemptBlocking(client.incrementalAlterConfigs(Map(resource -> ops).asJava)) .flatMap(_.all().asZio) @@ -424,11 +446,11 @@ object AdminClient { } private def describeConfigs(client: KafkaAdminClient, topics: Set[Topic]): RIO[Any, Map[Topic, TopicPropertiesResult]] = - attemptBlocking(client.describeConfigs(topics.map(t => new ConfigResource(TOPIC, t)).asJavaCollection)) flatMap { - result => - ZIO - .collectAll( - result.values.asScala.toMap.map { case (resource, kf) => + attemptBlocking(client.describeConfigs(topics.map(t => new ConfigResource(TOPIC, t)).asJavaCollection)) flatMap { result => + ZIO + .collectAll( + result.values.asScala.toMap.map { + case (resource, kf) => kf.asZio .map { config => resource.name -> @@ -439,36 +461,39 @@ object AdminClient { 0 ) } - .catchSome { case _: UnknownTopicOrPartitionException => - ZIO.succeed(resource.name -> TopicPropertiesResult.TopicDoesnExist(resource.name)) + .catchSome { + case _: UnknownTopicOrPartitionException => + ZIO.succeed(resource.name -> TopicPropertiesResult.TopicDoesnExist(resource.name)) } - } - ) - .map(_.toMap) + } + ) + .map(_.toMap) } private def describePartitions( - client: KafkaAdminClient, - topics: Set[Topic] + client: KafkaAdminClient, + topics: Set[Topic] ): RIO[Any, Map[Topic, TopicPropertiesResult]] = attemptBlocking(client.describeTopics(topics.asJavaCollection)) .flatMap { result => ZIO - .collectAll(result.values.asScala.toMap.map { case (topic, kf) => - kf.asZio - .map { desc => - val replication = desc.partitions.asScala.map(_.replicas.size).sorted.headOption.getOrElse(0) - topic -> - TopicPropertiesResult.TopicProperties( - topic, - desc.partitions.size, - Seq.empty, - replication - ) - } - .catchSome { case _: UnknownTopicOrPartitionException => - ZIO.succeed(topic -> TopicPropertiesResult.TopicDoesnExist(topic)) - } + .collectAll(result.values.asScala.toMap.map { + case (topic, kf) => + kf.asZio + .map { desc => + val replication = desc.partitions.asScala.map(_.replicas.size).sorted.headOption.getOrElse(0) + topic -> + TopicPropertiesResult.TopicProperties( + topic, + desc.partitions.size, + Seq.empty, + replication + ) + } + .catchSome { + case _: UnknownTopicOrPartitionException => + ZIO.succeed(topic -> TopicPropertiesResult.TopicDoesnExist(topic)) + } }) .map(_.toMap) } @@ -477,8 +502,7 @@ object AdminClient { Option(e.getCause).exists(_.isInstanceOf[TopicExistsException]) } -case class AdminClientConfig(bootstrapServers: String, extraProperties: Map[String, String] = Map.empty) - extends CommonGreyhoundConfig { +case class AdminClientConfig(bootstrapServers: String, extraProperties: Map[String, String] = Map.empty) extends CommonGreyhoundConfig { override def kafkaProps: Map[String, String] = Map(KafkaAdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers) ++ extraProperties