24
24
*/
25
25
typedef struct ucp_device_request {
26
26
uct_device_completion_t comp;
27
+ ucs_status_t status;
27
28
uct_device_ep_h device_ep;
28
29
} ucp_device_request_t ;
29
30
@@ -51,9 +52,6 @@ UCS_F_DEVICE void ucp_device_request_init(uct_device_ep_t *device_ep,
51
52
if (req != nullptr ) {
52
53
comp = &req->comp ;
53
54
req->device_ep = device_ep;
54
- uct_device_completion_init (comp);
55
- /* TODO: Handle multiple device posts with same req? */
56
- ++comp->count ;
57
55
} else {
58
56
comp = nullptr ;
59
57
}
@@ -63,16 +61,20 @@ UCS_F_DEVICE void ucp_device_request_init(uct_device_ep_t *device_ep,
63
61
/* *
64
62
* Macro for device put operations with retry logic
65
63
*/
66
- #define UCP_DEVICE_SEND_BLOCKING (_level, _uct_device_ep_send, _device_ep, ...) \
64
+ #define UCP_DEVICE_SEND_BLOCKING (_level, _uct_device_ep_send, _device_ep, \
65
+ _req, ...) \
67
66
({ \
68
67
ucs_status_t _status; \
69
68
do { \
70
69
_status = _uct_device_ep_send<_level>(_device_ep, __VA_ARGS__); \
71
70
if (_status != UCS_ERR_NO_RESOURCE) { \
72
71
break ; \
73
72
} \
74
- _status = uct_device_ep_progress<_level>(_device_ep); \
75
- } while (!UCS_STATUS_IS_ERR (_status)); \
73
+ uct_device_ep_progress<_level>(_device_ep); \
74
+ } while (1 ); \
75
+ if (_req != nullptr ) { \
76
+ _req->status = _status; \
77
+ } \
76
78
_status; \
77
79
})
78
80
@@ -148,8 +150,8 @@ UCS_F_DEVICE ucs_status_t ucp_device_put_single(
148
150
}
149
151
150
152
return UCP_DEVICE_SEND_BLOCKING (level, uct_device_ep_put_single, device_ep,
151
- uct_elem, address, remote_address, length ,
152
- flags, comp);
153
+ req, uct_elem, address, remote_address,
154
+ length, flags, comp);
153
155
}
154
156
155
157
@@ -199,8 +201,8 @@ UCS_F_DEVICE ucs_status_t ucp_device_counter_inc(
199
201
}
200
202
201
203
return UCP_DEVICE_SEND_BLOCKING (level, uct_device_ep_atomic_add, device_ep,
202
- uct_elem, inc_value, remote_address, flags ,
203
- comp);
204
+ req, uct_elem, inc_value, remote_address,
205
+ flags, comp);
204
206
}
205
207
206
208
@@ -263,8 +265,9 @@ UCS_F_DEVICE ucs_status_t ucp_device_put_multi(
263
265
}
264
266
265
267
return UCP_DEVICE_SEND_BLOCKING (level, uct_device_ep_put_multi, device_ep,
266
- uct_mem_list, mem_list_h->mem_list_length ,
267
- addresses, remote_addresses, lengths,
268
+ req, uct_mem_list,
269
+ mem_list_h->mem_list_length , addresses,
270
+ remote_addresses, lengths,
268
271
counter_inc_value, counter_remote_address,
269
272
flags, comp);
270
273
}
@@ -338,10 +341,11 @@ UCS_F_DEVICE ucs_status_t ucp_device_put_multi_partial(
338
341
}
339
342
340
343
return UCP_DEVICE_SEND_BLOCKING (level, uct_device_ep_put_multi_partial,
341
- device_ep, uct_mem_list, mem_list_indices,
342
- mem_list_count, addresses, remote_addresses,
343
- lengths, counter_index, counter_inc_value,
344
- counter_remote_address, flags, comp);
344
+ device_ep, req, uct_mem_list,
345
+ mem_list_indices, mem_list_count, addresses,
346
+ remote_addresses, lengths, counter_index,
347
+ counter_inc_value, counter_remote_address,
348
+ flags, comp);
345
349
}
346
350
347
351
@@ -409,19 +413,14 @@ UCS_F_DEVICE void ucp_device_counter_write(void *counter_ptr, uint64_t value)
409
413
template <ucs_device_level_t level = UCS_DEVICE_LEVEL_THREAD>
410
414
UCS_F_DEVICE ucs_status_t ucp_device_progress_req (ucp_device_request_t *req)
411
415
{
412
- ucs_status_t status;
413
-
414
- if (ucs_likely (req->comp .count == 0 )) {
415
- return req->comp .status ;
416
- }
417
-
418
- status = uct_device_ep_progress<level>(req->device_ep );
419
- if (status != UCS_OK) {
420
- return status;
416
+ if (ucs_likely (req->status != UCS_INPROGRESS)) {
417
+ return req->status ;
421
418
}
422
419
423
- return (ucs_likely (req->comp .count == 0 )) ? req->comp .status :
424
- UCS_INPROGRESS;
420
+ uct_device_ep_progress<level>(req->device_ep );
421
+ req->status = uct_device_ep_check_completion<level>(req->device_ep ,
422
+ &req->comp );
423
+ return req->status ;
425
424
}
426
425
427
426
#endif /* UCP_DEVICE_IMPL_H */
0 commit comments