@@ -9,6 +9,10 @@ use crate::rpc::auth::{
9
9
UserAddResponse , UserChangePasswordResponse , UserDeleteResponse , UserGetResponse ,
10
10
UserGrantRoleResponse , UserListResponse , UserRevokeRoleResponse ,
11
11
} ;
12
+ use crate :: rpc:: cluster:: {
13
+ ClusterClient , MemberAddOptions , MemberAddResponse , MemberListResponse , MemberPromoteResponse ,
14
+ MemberRemoveResponse , MemberUpdateResponse ,
15
+ } ;
12
16
use crate :: rpc:: kv:: {
13
17
CompactionOptions , CompactionResponse , DeleteOptions , DeleteResponse , GetOptions , GetResponse ,
14
18
KvClient , PutOptions , PutResponse , Txn , TxnResponse ,
@@ -27,6 +31,8 @@ use tonic::metadata::{Ascii, MetadataValue};
27
31
use tonic:: transport:: Channel ;
28
32
use tonic:: Interceptor ;
29
33
34
+ const HTTP_PREFIX : & str = "http://" ;
35
+
30
36
/// Asynchronous `etcd` client using v3 API.
31
37
pub struct Client {
32
38
kv : KvClient ,
@@ -35,6 +41,7 @@ pub struct Client {
35
41
lock : LockClient ,
36
42
auth : AuthClient ,
37
43
maintenance : MaintenanceClient ,
44
+ cluster : ClusterClient ,
38
45
}
39
46
40
47
impl Client {
@@ -43,8 +50,6 @@ impl Client {
43
50
endpoints : S ,
44
51
options : Option < ConnectOptions > ,
45
52
) -> Result < Self > {
46
- const HTTP_PREFIX : & str = "http://" ;
47
-
48
53
let endpoints = {
49
54
let mut eps = Vec :: new ( ) ;
50
55
for e in endpoints. as_ref ( ) {
@@ -90,6 +95,7 @@ impl Client {
90
95
let lease = LeaseClient :: new ( channel. clone ( ) , interceptor. clone ( ) ) ;
91
96
let lock = LockClient :: new ( channel. clone ( ) , interceptor. clone ( ) ) ;
92
97
let auth = AuthClient :: new ( channel. clone ( ) , interceptor. clone ( ) ) ;
98
+ let cluster = ClusterClient :: new ( channel. clone ( ) , interceptor. clone ( ) ) ;
93
99
let maintenance = MaintenanceClient :: new ( channel, interceptor) ;
94
100
95
101
Ok ( Self {
@@ -99,6 +105,7 @@ impl Client {
99
105
lock,
100
106
auth,
101
107
maintenance,
108
+ cluster,
102
109
} )
103
110
}
104
111
@@ -397,6 +404,55 @@ impl Client {
397
404
pub async fn snapshot ( & mut self ) -> Result < SnapshotStreaming > {
398
405
self . maintenance . snapshot ( ) . await
399
406
}
407
+
408
+ /// Adds current connected server as a member.
409
+ #[ inline]
410
+ pub async fn member_add < E : AsRef < str > , S : AsRef < [ E ] > > (
411
+ & mut self ,
412
+ urls : S ,
413
+ options : Option < MemberAddOptions > ,
414
+ ) -> Result < MemberAddResponse > {
415
+ let mut eps = Vec :: new ( ) ;
416
+ for e in urls. as_ref ( ) {
417
+ let e = e. as_ref ( ) ;
418
+ let url = if e. starts_with ( HTTP_PREFIX ) {
419
+ e. to_string ( )
420
+ } else {
421
+ HTTP_PREFIX . to_owned ( ) + e
422
+ } ;
423
+ eps. push ( url) ;
424
+ }
425
+
426
+ self . cluster . member_add ( eps, options) . await
427
+ }
428
+
429
+ /// Remove a member.
430
+ #[ inline]
431
+ pub async fn member_remove ( & mut self , id : u64 ) -> Result < MemberRemoveResponse > {
432
+ self . cluster . member_remove ( id) . await
433
+ }
434
+
435
+ /// Updates the member.
436
+ #[ inline]
437
+ pub async fn member_update (
438
+ & mut self ,
439
+ id : u64 ,
440
+ url : impl Into < Vec < String > > ,
441
+ ) -> Result < MemberUpdateResponse > {
442
+ self . cluster . member_update ( id, url) . await
443
+ }
444
+
445
+ /// Promotes the member.
446
+ #[ inline]
447
+ pub async fn member_promote ( & mut self , id : u64 ) -> Result < MemberPromoteResponse > {
448
+ self . cluster . member_promote ( id) . await
449
+ }
450
+
451
+ /// Lists members.
452
+ #[ inline]
453
+ pub async fn member_list ( & mut self ) -> Result < MemberListResponse > {
454
+ self . cluster . member_list ( ) . await
455
+ }
400
456
}
401
457
402
458
/// Options for `Connect` operation.
@@ -773,8 +829,8 @@ mod tests {
773
829
Ok ( ( ) )
774
830
}
775
831
776
- #[ tokio:: test]
777
832
#[ ignore]
833
+ #[ tokio:: test]
778
834
async fn test_auth ( ) -> Result < ( ) > {
779
835
let mut client = get_client ( ) . await ?;
780
836
client. auth_enable ( ) . await ?;
@@ -1084,4 +1140,31 @@ mod tests {
1084
1140
}
1085
1141
Ok ( ( ) )
1086
1142
}
1143
+
1144
+ #[ ignore]
1145
+ #[ tokio:: test]
1146
+ async fn test_cluster ( ) -> Result < ( ) > {
1147
+ let node1 = "localhost:2520" ;
1148
+ let node2 = "localhost:2530" ;
1149
+ let node3 = "localhost:2540" ;
1150
+ let mut client = get_client ( ) . await ?;
1151
+ let resp = client
1152
+ . member_add ( [ node1] , Some ( MemberAddOptions :: new ( ) . with_is_learner ( ) ) )
1153
+ . await ?;
1154
+ let id1 = resp. member ( ) . unwrap ( ) . id ( ) ;
1155
+
1156
+ let resp = client. member_add ( [ node2] , None ) . await ?;
1157
+ let id2 = resp. member ( ) . unwrap ( ) . id ( ) ;
1158
+ let resp = client. member_add ( [ node3] , None ) . await ?;
1159
+ let id3 = resp. member ( ) . unwrap ( ) . id ( ) ;
1160
+
1161
+ let resp = client. member_list ( ) . await ?;
1162
+ let members: Vec < _ > = resp. members ( ) . iter ( ) . map ( |member| member. id ( ) ) . collect ( ) ;
1163
+ assert ! ( members. contains( & id1) ) ;
1164
+ assert ! ( members. contains( & id2) ) ;
1165
+ assert ! ( members. contains( & id3) ) ;
1166
+
1167
+ client. member_remove ( id1) . await ?;
1168
+ Ok ( ( ) )
1169
+ }
1087
1170
}
0 commit comments