@@ -121,7 +121,6 @@ impl FlowService for FlowServiceBase {
121
121
}
122
122
123
123
mod test {
124
-
125
124
use std:: sync:: { Arc } ;
126
125
use redis:: AsyncCommands ;
127
126
use tokio:: sync:: Mutex ;
@@ -130,7 +129,49 @@ mod test {
130
129
use crate :: service:: flow_service:: { FlowService , FlowServiceBase } ;
131
130
132
131
#[ tokio:: test]
133
- async fn test_insert_flow_positive ( ) {
132
+ async fn test_get_all_flow_ids_redis_error ( ) {
133
+ let ( connection, _container) = setup_redis_test_container ( ) . await ;
134
+ let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
135
+ let mut service = FlowServiceBase :: new ( redis_client. clone ( ) ) . await ;
136
+
137
+ drop ( _container) ;
138
+
139
+ let flow_ids = service. get_all_flow_ids ( ) . await ;
140
+ assert ! ( flow_ids. is_err( ) , "Expected an error due to Redis disconnection" ) ;
141
+ }
142
+
143
+ #[ tokio:: test]
144
+ async fn test_insert_flow_once ( ) {
145
+ let ( connection, _container) = setup_redis_test_container ( ) . await ;
146
+ let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
147
+
148
+ let mut service = FlowServiceBase :: new ( redis_client. clone ( ) ) . await ;
149
+
150
+ let flow = Flow {
151
+ flow_id : 1 ,
152
+ start_node : None ,
153
+ definition : None ,
154
+ } ;
155
+
156
+ service. insert_flow ( flow. clone ( ) ) . await ;
157
+
158
+ let result: Option < String > = {
159
+ let mut conn = redis_client. lock ( ) . await ;
160
+ conn. get ( "1" ) . await . unwrap ( )
161
+ } ;
162
+
163
+ assert ! ( result. is_some( ) ) ;
164
+
165
+ let decoded_flow: Flow = serde_json:: from_str ( & result. unwrap ( ) ) . unwrap ( ) ;
166
+ assert_eq ! ( decoded_flow. flow_id, flow. flow_id) ;
167
+
168
+ let flow_ids = service. get_all_flow_ids ( ) . await ;
169
+ assert ! ( flow_ids. is_ok( ) ) ;
170
+ assert_eq ! ( flow_ids. unwrap( ) . len( ) , 1 ) ;
171
+ }
172
+
173
+ #[ tokio:: test]
174
+ async fn test_insert_flow_with_same_id_will_overwrite ( ) {
134
175
let ( connection, _container) = setup_redis_test_container ( ) . await ;
135
176
let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
136
177
@@ -144,11 +185,241 @@ mod test {
144
185
145
186
service. insert_flow ( flow. clone ( ) ) . await ;
146
187
147
- let mut conn = redis_client. lock ( ) . await ;
148
- let result: Option < String > = conn. get ( "1" ) . await . unwrap ( ) ;
188
+ let result: Option < String > = {
189
+ let mut conn = redis_client. lock ( ) . await ;
190
+ conn. get ( "1" ) . await . unwrap ( )
191
+ } ;
192
+
149
193
assert ! ( result. is_some( ) ) ;
150
194
151
195
let decoded_flow: Flow = serde_json:: from_str ( & result. unwrap ( ) ) . unwrap ( ) ;
152
196
assert_eq ! ( decoded_flow. flow_id, flow. flow_id) ;
197
+
198
+ service. insert_flow ( flow. clone ( ) ) . await ;
199
+ assert_eq ! ( service. get_all_flow_ids( ) . await . unwrap( ) . len( ) , 1 ) ;
200
+ }
201
+
202
+ #[ tokio:: test]
203
+ async fn test_insert_flows_once ( ) {
204
+ let ( connection, _container) = setup_redis_test_container ( ) . await ;
205
+ let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
206
+
207
+ let mut service = FlowServiceBase :: new ( redis_client. clone ( ) ) . await ;
208
+
209
+ let flow_1 = Flow {
210
+ flow_id : 1 ,
211
+ start_node : None ,
212
+ definition : None ,
213
+ } ;
214
+
215
+ let flow_2 = Flow {
216
+ flow_id : 2 ,
217
+ start_node : None ,
218
+ definition : None ,
219
+ } ;
220
+
221
+ let flows = vec ! [ flow_1. clone( ) , flow_2. clone( ) ] ;
222
+
223
+ service. insert_flows ( flows) . await ;
224
+
225
+ let results: ( Option < String > , Option < String > ) = {
226
+ let mut conn = redis_client. lock ( ) . await ;
227
+ ( conn. get ( "1" ) . await . unwrap ( ) , conn. get ( "2" ) . await . unwrap ( ) )
228
+ } ;
229
+
230
+ assert ! ( results. 0 . is_some( ) ) ;
231
+ assert ! ( results. 1 . is_some( ) ) ;
232
+ assert_eq ! ( service. get_all_flow_ids( ) . await . unwrap( ) . len( ) , 2 ) ;
233
+ }
234
+
235
+ #[ tokio:: test]
236
+ async fn test_insert_flows_empty ( ) {
237
+ let ( connection, _container) = setup_redis_test_container ( ) . await ;
238
+ let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
239
+
240
+ let mut service = FlowServiceBase :: new ( redis_client. clone ( ) ) . await ;
241
+ let flows: Vec < Flow > = vec ! [ ] ;
242
+
243
+ service. insert_flows ( flows) . await ;
244
+ assert_eq ! ( service. get_all_flow_ids( ) . await . unwrap( ) . len( ) , 0 ) ;
245
+ }
246
+
247
+ #[ tokio:: test]
248
+ async fn test_insert_flows_with_duplicate_id ( ) {
249
+ let ( connection, _container) = setup_redis_test_container ( ) . await ;
250
+ let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
251
+
252
+ let mut service = FlowServiceBase :: new ( redis_client. clone ( ) ) . await ;
253
+
254
+ let flow_1 = Flow {
255
+ flow_id : 1 ,
256
+ start_node : None ,
257
+ definition : None ,
258
+ } ;
259
+
260
+ let flow_2 = Flow {
261
+ flow_id : 1 ,
262
+ start_node : None ,
263
+ definition : None ,
264
+ } ;
265
+
266
+ let flows = vec ! [ flow_1. clone( ) , flow_2. clone( ) ] ;
267
+
268
+ service. insert_flows ( flows) . await ;
269
+
270
+ let result: Option < String > = {
271
+ let mut conn = redis_client. lock ( ) . await ;
272
+ conn. get ( "1" ) . await . unwrap ( )
273
+ } ;
274
+
275
+ assert ! ( result. is_some( ) ) ;
276
+ assert_eq ! ( service. get_all_flow_ids( ) . await . unwrap( ) . len( ) , 1 ) ;
277
+ }
278
+
279
+ #[ tokio:: test]
280
+ async fn test_get_all_flow_ids_empty ( ) {
281
+ let ( connection, _container) = setup_redis_test_container ( ) . await ;
282
+ let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
283
+ let mut service = FlowServiceBase :: new ( redis_client. clone ( ) ) . await ;
284
+
285
+ let flow_ids = service. get_all_flow_ids ( ) . await ;
286
+
287
+ assert ! ( flow_ids. is_ok( ) ) ;
288
+ assert ! ( flow_ids. unwrap( ) . is_empty( ) ) ;
289
+ }
290
+
291
+ #[ tokio:: test]
292
+ async fn test_delete_exising_flow ( ) {
293
+ let ( connection, _container) = setup_redis_test_container ( ) . await ;
294
+ let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
295
+ let mut service = FlowServiceBase :: new ( redis_client. clone ( ) ) . await ;
296
+
297
+ let flow = Flow {
298
+ flow_id : 1 ,
299
+ start_node : None ,
300
+ definition : None ,
301
+ } ;
302
+
303
+ service. insert_flow ( flow. clone ( ) ) . await ;
304
+ let result: Option < String > = {
305
+ let mut conn = redis_client. lock ( ) . await ;
306
+ conn. get ( "1" ) . await . unwrap ( )
307
+ } ;
308
+
309
+ assert ! ( result. is_some( ) ) ;
310
+ assert_eq ! ( service. get_all_flow_ids( ) . await . unwrap( ) . len( ) , 1 ) ;
311
+
312
+ service. delete_flow ( 1 ) . await ;
313
+
314
+ let result_after: Option < String > = {
315
+ let mut conn = redis_client. lock ( ) . await ;
316
+ conn. get ( "1" ) . await . unwrap ( )
317
+ } ;
318
+
319
+ assert ! ( result_after. is_none( ) ) ;
320
+ assert_eq ! ( service. get_all_flow_ids( ) . await . unwrap( ) . len( ) , 0 ) ;
321
+ }
322
+
323
+ #[ tokio:: test]
324
+ async fn test_delete_non_existing_flow_does_not_crash ( ) {
325
+ let ( connection, _container) = setup_redis_test_container ( ) . await ;
326
+ let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
327
+ let mut service = FlowServiceBase :: new ( redis_client. clone ( ) ) . await ;
328
+
329
+ let result_after: Option < String > = {
330
+ let mut conn = redis_client. lock ( ) . await ;
331
+ conn. get ( "1" ) . await . unwrap ( )
332
+ } ;
333
+ assert ! ( result_after. is_none( ) ) ;
334
+ assert_eq ! ( service. get_all_flow_ids( ) . await . unwrap( ) . len( ) , 0 ) ;
335
+
336
+ service. delete_flow ( 1 ) . await ;
337
+ assert_eq ! ( service. get_all_flow_ids( ) . await . unwrap( ) . len( ) , 0 ) ;
338
+ }
339
+
340
+ #[ tokio:: test]
341
+ async fn test_delete_existing_flow ( ) {
342
+ let ( connection, _container) = setup_redis_test_container ( ) . await ;
343
+ let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
344
+ let mut service = FlowServiceBase :: new ( redis_client. clone ( ) ) . await ;
345
+
346
+ let flow_1 = Flow {
347
+ flow_id : 1 ,
348
+ start_node : None ,
349
+ definition : None ,
350
+ } ;
351
+
352
+ let flow_2 = Flow {
353
+ flow_id : 2 ,
354
+ start_node : None ,
355
+ definition : None ,
356
+ } ;
357
+
358
+ let flows = vec ! [ flow_1. clone( ) , flow_2. clone( ) ] ;
359
+
360
+ service. insert_flows ( flows) . await ;
361
+
362
+ let results: ( Option < String > , Option < String > ) = {
363
+ let mut conn = redis_client. lock ( ) . await ;
364
+ ( conn. get ( "1" ) . await . unwrap ( ) , conn. get ( "2" ) . await . unwrap ( ) )
365
+ } ;
366
+
367
+ assert ! ( results. 0 . is_some( ) ) ;
368
+ assert ! ( results. 1 . is_some( ) ) ;
369
+ assert_eq ! ( service. get_all_flow_ids( ) . await . unwrap( ) . len( ) , 2 ) ;
370
+
371
+ service. delete_flows ( vec ! [ 1 , 2 ] ) . await ;
372
+ let results_after: ( Option < String > , Option < String > ) = {
373
+ let mut conn = redis_client. lock ( ) . await ;
374
+ ( conn. get ( "1" ) . await . unwrap ( ) , conn. get ( "2" ) . await . unwrap ( ) )
375
+ } ;
376
+
377
+ assert ! ( results_after. 0 . is_none( ) ) ;
378
+ assert ! ( results_after. 1 . is_none( ) ) ;
379
+ assert_eq ! ( service. get_all_flow_ids( ) . await . unwrap( ) . len( ) , 0 ) ;
380
+ }
381
+
382
+ #[ tokio:: test]
383
+ async fn test_delete_flows_mixed_ids ( ) {
384
+ let ( connection, _container) = setup_redis_test_container ( ) . await ;
385
+ let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
386
+ let mut service = FlowServiceBase :: new ( redis_client. clone ( ) ) . await ;
387
+
388
+ let flow_1 = Flow { flow_id : 1 , start_node : None , definition : None } ;
389
+ let flow_2 = Flow { flow_id : 2 , start_node : None , definition : None } ;
390
+ service. insert_flows ( vec ! [ flow_1. clone( ) , flow_2. clone( ) ] ) . await ;
391
+
392
+ service. delete_flows ( vec ! [ 1 , 3 ] ) . await ;
393
+
394
+ let result_1: Option < String > = {
395
+ let mut conn = redis_client. lock ( ) . await ;
396
+ conn. get ( "1" ) . await . unwrap ( )
397
+ } ;
398
+ let result_2: Option < String > = {
399
+ let mut conn = redis_client. lock ( ) . await ;
400
+ conn. get ( "2" ) . await . unwrap ( )
401
+ } ;
402
+
403
+ assert ! ( result_1. is_none( ) , "Flow with ID 1 should be deleted" ) ;
404
+ assert ! ( result_2. is_some( ) , "Flow with ID 2 should still exist" ) ;
405
+ }
406
+
407
+ #[ tokio:: test]
408
+ async fn test_delete_flows_empty_list ( ) {
409
+ let ( connection, _container) = setup_redis_test_container ( ) . await ;
410
+ let redis_client = Arc :: new ( Mutex :: new ( Box :: new ( connection) ) ) ;
411
+ let mut service = FlowServiceBase :: new ( redis_client. clone ( ) ) . await ;
412
+
413
+ let flow = Flow { flow_id : 1 , start_node : None , definition : None } ;
414
+ service. insert_flow ( flow. clone ( ) ) . await ;
415
+
416
+ service. delete_flows ( vec ! [ ] ) . await ;
417
+
418
+ let result: Option < String > = {
419
+ let mut conn = redis_client. lock ( ) . await ;
420
+ conn. get ( "1" ) . await . unwrap ( )
421
+ } ;
422
+
423
+ assert ! ( result. is_some( ) ) ;
153
424
}
154
425
}
0 commit comments