1
1
package com .quantifind .utils
2
2
3
- import java .util
4
-
5
- import kafka .api .{ApiVersion , LeaderAndIsr }
6
- import kafka .cluster .{EndPoint , BrokerEndPoint , Cluster , Broker }
7
- import kafka .common .TopicAndPartition
3
+ import kafka .cluster .Broker
8
4
import kafka .consumer .ConsumerThreadId
9
- import kafka .controller .{ReassignedPartitionsContext , LeaderIsrAndControllerEpoch }
10
5
import kafka .utils .ZkUtils
11
- import org .I0Itec .zkclient .ZkClient
12
- import org .apache .kafka .common .protocol .SecurityProtocol
13
- import org .apache .zookeeper .data .{ACL , Stat }
6
+ import org .apache .zookeeper .data .Stat
14
7
15
8
import scala .collection .mutable
16
9
10
+
17
11
/*
18
12
This class is mainly to help us mock the ZkUtils class. It is really painful to get powermock to work with scalatest,
19
13
so we created this class with a little help from IntelliJ to auto-generate the delegation code
@@ -22,114 +16,21 @@ class ZkUtilsWrapper(zkUtils: ZkUtils) {
22
16
23
17
val ConsumersPath = ZkUtils .ConsumersPath
24
18
25
- val delegator = zkUtils
26
-
27
- // def updatePersistentPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.updatePersistentPath(path, data, acls)
28
-
29
- // def updatePartitionReassignmentData(partitionsToBeReassigned: collection.Map[TopicAndPartition, Seq[Int]]): Unit = delegator.updatePartitionReassignmentData(partitionsToBeReassigned)
30
-
31
- // def updateEphemeralPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.updateEphemeralPath(path, data, acls)
32
-
33
- // def setupCommonPaths(): Unit = delegator.setupCommonPaths()
34
-
35
- // def replicaAssignmentZkData(map: collection.Map[String, Seq[Int]]): String = delegator.replicaAssignmentZkData(map)
36
-
37
- // def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], jmxPort: Int, rack: Option[String], apiVersion: ApiVersion): Unit = delegator.registerBrokerInZk(id, host, port, advertisedEndpoints, jmxPort, rack, apiVersion)
19
+ val delegator : ZkUtils = zkUtils
38
20
39
21
def readDataMaybeNull (path : String ): (Option [String ], Stat ) = delegator.readDataMaybeNull(path)
40
22
41
23
def readData (path : String ): (String , Stat ) = delegator.readData(path)
42
24
43
- // def pathExists(path: String): Boolean = delegator.pathExists(path)
44
-
45
- // def parseTopicsData(jsonData: String): Seq[String] = delegator.parseTopicsData(jsonData)
46
-
47
- // def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = delegator.parsePartitionReassignmentDataWithoutDedup(jsonData)
48
-
49
- // def parsePartitionReassignmentData(jsonData: String): collection.Map[TopicAndPartition, Seq[Int]] = delegator.parsePartitionReassignmentData(jsonData)
50
-
51
- // def makeSurePersistentPathExists(path: String, acls: util.List[ACL]): Unit = delegator.makeSurePersistentPathExists(path, acls)
52
-
53
- // def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = delegator.leaderAndIsrZkData(leaderAndIsr, controllerEpoch)
54
-
55
- // def getTopicsByConsumerGroup(consumerGroup: String): Seq[String] = delegator.getTopicsByConsumerGroup(consumerGroup)
56
-
57
- // def getSortedBrokerList(): Seq[Int] = delegator.getSortedBrokerList()
58
-
59
- // def getSequenceId(path: String, acls: util.List[ACL]): Int = delegator.getSequenceId(path, acls)
60
-
61
- // def getReplicasForPartition(topic: String, partition: Int): Seq[Int] = delegator.getReplicasForPartition(topic, partition)
62
-
63
- // def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = delegator.getReplicaAssignmentForTopics(topics)
64
-
65
- // def getPartitionsUndergoingPreferredReplicaElection(): collection.Set[TopicAndPartition] = delegator.getPartitionsUndergoingPreferredReplicaElection()
66
-
67
25
def getPartitionsForTopics (topics : Seq [String ]): mutable.Map [String , Seq [Int ]] = delegator.getPartitionsForTopics(topics)
68
26
69
- // def getPartitionsBeingReassigned(): collection.Map[TopicAndPartition, ReassignedPartitionsContext] = delegator.getPartitionsBeingReassigned()
70
-
71
- // def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: collection.Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = delegator.getPartitionLeaderAndIsrForTopics(zkClient, topicAndPartitions)
72
-
73
- // def getPartitionAssignmentForTopics(topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = delegator.getPartitionAssignmentForTopics(topics)
74
-
75
27
def getLeaderForPartition (topic : String , partition : Int ): Option [Int ] = delegator.getLeaderForPartition(topic, partition)
76
28
77
- // def getLeaderAndIsrForPartition(topic: String, partition: Int): Option[LeaderAndIsr] = delegator.getLeaderAndIsrForPartition(topic, partition)
78
-
79
- // def getInSyncReplicasForPartition(topic: String, partition: Int): Seq[Int] = delegator.getInSyncReplicasForPartition(topic, partition)
80
-
81
- // def getEpochForPartition(topic: String, partition: Int): Int = delegator.getEpochForPartition(topic, partition)
82
-
83
- // def getController(): Int = delegator.getController()
84
-
85
29
def getConsumersPerTopic (group : String , excludeInternalTopics : Boolean ): mutable.Map [String , List [ConsumerThreadId ]] = delegator.getConsumersPerTopic(group, excludeInternalTopics)
86
30
87
- // def getConsumersInGroup(group: String): Seq[String] = delegator.getConsumersInGroup(group)
88
-
89
- // def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = delegator.getConsumerPartitionOwnerPath(group, topic, partition)
90
-
91
- // def getConsumerGroups(): Seq[String] = delegator.getConsumerGroups()
92
-
93
- // def getChildrenParentMayNotExist(path: String): Seq[String] = delegator.getChildrenParentMayNotExist(path)
94
-
95
31
def getChildren (path : String ): Seq [String ] = delegator.getChildren(path)
96
32
97
- // def getBrokerSequenceId(MaxReservedBrokerId: Int): Int = delegator.getBrokerSequenceId(MaxReservedBrokerId)
98
-
99
- // def getBrokerInfo(brokerId: Int): Option[Broker] = delegator.getBrokerInfo(brokerId)
100
-
101
- // def getAllTopics(): Seq[String] = delegator.getAllTopics()
102
-
103
- // def getAllPartitions(): collection.Set[TopicAndPartition] = delegator.getAllPartitions()
104
-
105
- // def getAllEntitiesWithConfig(entityType: String): Seq[String] = delegator.getAllEntitiesWithConfig(entityType)
106
-
107
- // def getAllConsumerGroupsForTopic(topic: String): collection.Set[String] = delegator.getAllConsumerGroupsForTopic(topic)
108
-
109
33
def getAllBrokersInCluster (): Seq [Broker ] = delegator.getAllBrokersInCluster()
110
34
111
- // def getAllBrokerEndPointsForChannel(protocolType: SecurityProtocol): Seq[BrokerEndPoint] = delegator.getAllBrokerEndPointsForChannel(protocolType)
112
-
113
- // def formatAsReassignmentJson(partitionsToBeReassigned: collection.Map[TopicAndPartition, Seq[Int]]): String = delegator.formatAsReassignmentJson(partitionsToBeReassigned)
114
-
115
- // def deletePathRecursive(path: String): Unit = delegator.deletePathRecursive(path)
116
-
117
- // def deletePath(path: String): Boolean = delegator.deletePath(path)
118
-
119
- // def deletePartition(brokerId: Int, topic: String): Unit = delegator.deletePartition(brokerId, topic)
120
-
121
- // def createSequentialPersistentPath(path: String, data: String, acls: util.List[ACL]): String = delegator.createSequentialPersistentPath(path, data, acls)
122
-
123
- // def createPersistentPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.createPersistentPath(path, data, acls)
124
-
125
- // def createEphemeralPathExpectConflict(path: String, data: String, acls: util.List[ACL]): Unit = delegator.createEphemeralPathExpectConflict(path, data, acls)
126
-
127
- // def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = delegator.conditionalUpdatePersistentPathIfExists(path, data, expectVersion)
128
-
129
- // def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int, optionalChecker: Option[(ZkUtils, String, String) => (Boolean, Int)]): (Boolean, Int) = delegator.conditionalUpdatePersistentPath(path, data, expectVersion, optionalChecker)
130
-
131
- // def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = delegator.conditionalDeletePath(path, expectedVersion)
132
-
133
35
def close (): Unit = delegator.close()
134
-
135
36
}
0 commit comments