12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
+ use crate :: { Error , Result } ;
16
+
17
+ use gax:: paginator:: ItemPaginator as _;
15
18
use pubsub:: { client:: TopicAdmin , model:: Topic } ;
16
19
use rand:: { Rng , distr:: Alphanumeric } ;
17
20
18
- use crate :: Result ;
19
-
20
21
pub async fn basic_topic ( ) -> Result < ( ) > {
21
22
// Enable a basic subscriber. Useful to troubleshoot problems and visually
22
23
// verify tracing is doing something.
@@ -43,7 +44,7 @@ pub async fn basic_topic() -> Result<()> {
43
44
tracing:: info!( "success with get_topic={get_topic:?}" ) ;
44
45
assert_eq ! ( get_topic. name, topic. name) ;
45
46
46
- cleanup_test_topic ( client, topic. name ) . await ?;
47
+ cleanup_test_topic ( & client, topic. name ) . await ?;
47
48
48
49
Ok ( ( ) )
49
50
}
@@ -61,28 +62,76 @@ fn random_topic_name(project: String) -> String {
61
62
}
62
63
63
64
pub async fn create_test_topic ( ) -> Result < ( TopicAdmin , Topic ) > {
64
- let project = crate :: project_id ( ) ?;
65
+ let project_id = crate :: project_id ( ) ?;
65
66
let client = pubsub:: client:: TopicAdmin :: builder ( )
66
67
. with_tracing ( )
67
68
. build ( )
68
69
. await ?;
69
- let topic_name = random_topic_name ( project) ;
70
+
71
+ cleanup_stale_topics ( & client, & project_id) . await ?;
72
+
73
+ let topic_name = random_topic_name ( project_id) ;
74
+ let now = chrono:: Utc :: now ( ) . timestamp ( ) . to_string ( ) ;
70
75
71
76
tracing:: info!( "testing create_topic()" ) ;
72
77
let topic = client
73
78
. create_topic ( )
74
79
. set_name ( topic_name)
75
- . set_labels ( [ ( "integration-test" , "true" ) ] )
80
+ . set_labels ( [ ( "integration-test" , "true" ) , ( "create-time" , & now ) ] )
76
81
. send ( )
77
82
. await ?;
78
83
tracing:: info!( "success on create_topic: {topic:?}" ) ;
79
84
80
85
Ok ( ( client, topic) )
81
86
}
82
87
83
- pub async fn cleanup_test_topic ( client : TopicAdmin , topic_name : String ) -> Result < ( ) > {
88
+ pub async fn cleanup_test_topic ( client : & TopicAdmin , topic_name : String ) -> Result < ( ) > {
84
89
tracing:: info!( "testing delete_topic()" ) ;
85
90
client. delete_topic ( ) . set_topic ( topic_name) . send ( ) . await ?;
86
91
tracing:: info!( "success on delete_topic" ) ;
87
92
Ok ( ( ) )
88
93
}
94
+
95
+ pub async fn cleanup_stale_topics ( client : & TopicAdmin , project_id : & str ) -> Result < ( ) > {
96
+ let stale_deadline = chrono:: Utc :: now ( ) - chrono:: Duration :: hours ( 48 ) ;
97
+
98
+ let mut topics = client
99
+ . list_topics ( )
100
+ . set_project ( format ! ( "projects/{project_id}" ) )
101
+ . by_item ( ) ;
102
+
103
+ let mut pending = Vec :: new ( ) ;
104
+ let mut names = Vec :: new ( ) ;
105
+ while let Some ( topic) = topics. next ( ) . await {
106
+ let topic = topic?;
107
+ if topic
108
+ . labels
109
+ . get ( "integration-test" )
110
+ . is_some_and ( |v| v == "true" )
111
+ && topic
112
+ . labels
113
+ . get ( "create-time" )
114
+ . and_then ( |v| v. parse :: < i64 > ( ) . ok ( ) )
115
+ . and_then ( |s| chrono:: DateTime :: from_timestamp ( s, 0 ) )
116
+ . is_some_and ( |create_time| create_time < stale_deadline)
117
+ {
118
+ let client = client. clone ( ) ;
119
+ let name = topic. name . clone ( ) ;
120
+ pending. push ( tokio:: spawn ( async move {
121
+ cleanup_test_topic ( & client, name) . await
122
+ } ) ) ;
123
+ names. push ( topic. name ) ;
124
+ }
125
+ }
126
+
127
+ let r: std:: result:: Result < Vec < _ > , _ > = futures:: future:: join_all ( pending)
128
+ . await
129
+ . into_iter ( )
130
+ . collect ( ) ;
131
+ r. map_err ( Error :: from) ?
132
+ . into_iter ( )
133
+ . zip ( names)
134
+ . for_each ( |( r, name) | tracing:: info!( "deleting topic {name} resulted in {r:?}" ) ) ;
135
+
136
+ Ok ( ( ) )
137
+ }
0 commit comments