Skip to content

Commit 0de577e

Browse files
committed
UCP/API: Merge branch 'master' into topic/device-api-use-offsets
2 parents 70fdc08 + cd9f7f8 commit 0de577e

36 files changed

+414
-294
lines changed

src/tools/perf/cuda/ucp_cuda_kernel.cu

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ ucp_perf_cuda_send_sync(ucp_perf_cuda_params &params, ucx_perf_counter_t idx,
226226
ucp_device_request_t &req)
227227
{
228228
ucs_status_t status = ucp_perf_cuda_send_nbx<level, cmd>(params, idx, req);
229-
if (status != UCS_OK) {
229+
if (UCS_STATUS_IS_ERR(status)) {
230230
return status;
231231
}
232232

@@ -260,7 +260,7 @@ ucp_perf_cuda_put_multi_bw_kernel(ucx_perf_cuda_context &ctx,
260260

261261
ucp_device_request_t &req = request_mgr.get_request();
262262
status = ucp_perf_cuda_send_nbx<level, cmd>(params, idx, req);
263-
if (status != UCS_OK) {
263+
if (UCS_STATUS_IS_ERR(status)) {
264264
ucs_device_error("send failed: %d", status);
265265
goto out;
266266
}

src/tools/vfs/vfs_main.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,10 +277,12 @@ static int vfs_unlink_socket(int silent_notexist)
277277
}
278278

279279
/* return 0 or the (negative) value of errno in case of error */
280-
static int vfs_listen(int silent_addinuse_err)
280+
static int vfs_listen(int silent_addrinuse_err)
281281
{
282282
int listen_fd, ret;
283283

284+
vfs_log("listening on socket %s", (const char*)g_sockaddr.sun_path);
285+
284286
ret = umask(~S_IRWXU);
285287
if (ret < 0) {
286288
ret = -errno;
@@ -304,7 +306,7 @@ static int vfs_listen(int silent_addinuse_err)
304306
sizeof(g_sockaddr));
305307
if (ret < 0) {
306308
ret = -errno;
307-
if ((errno != EADDRINUSE) || !silent_addinuse_err) {
309+
if ((errno != EADDRINUSE) || !silent_addrinuse_err) {
308310
vfs_error("bind(%s) failed: %m", g_sockaddr.sun_path);
309311
}
310312
goto out_close;

src/ucp/api/device/ucp_device_impl.h

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
*/
2525
typedef struct ucp_device_request {
2626
uct_device_completion_t comp;
27+
ucs_status_t status;
2728
uct_device_ep_h device_ep;
2829
unsigned channel_id;
2930
} ucp_device_request_t;
@@ -52,9 +53,6 @@ UCS_F_DEVICE void ucp_device_request_init(uct_device_ep_t *device_ep,
5253
if (req != nullptr) {
5354
comp = &req->comp;
5455
req->device_ep = device_ep;
55-
uct_device_completion_init(comp);
56-
/* TODO: Handle multiple device posts with same req? */
57-
++comp->count;
5856
} else {
5957
comp = nullptr;
6058
}
@@ -64,16 +62,20 @@ UCS_F_DEVICE void ucp_device_request_init(uct_device_ep_t *device_ep,
6462
/**
6563
* Macro for device put operations with retry logic
6664
*/
67-
#define UCP_DEVICE_SEND_BLOCKING(_level, _uct_device_ep_send, _device_ep, ...) \
65+
#define UCP_DEVICE_SEND_BLOCKING(_level, _uct_device_ep_send, _device_ep, \
66+
_req, ...) \
6867
({ \
6968
ucs_status_t _status; \
7069
do { \
7170
_status = _uct_device_ep_send<_level>(_device_ep, __VA_ARGS__); \
7271
if (_status != UCS_ERR_NO_RESOURCE) { \
7372
break; \
7473
} \
75-
_status = uct_device_ep_progress<_level>(_device_ep); \
76-
} while (!UCS_STATUS_IS_ERR(_status)); \
74+
uct_device_ep_progress<_level>(_device_ep); \
75+
} while (1); \
76+
if (_req != nullptr) { \
77+
_req->status = _status; \
78+
} \
7779
_status; \
7880
})
7981

@@ -154,10 +156,9 @@ UCS_F_DEVICE ucs_status_t ucp_device_put_single(
154156
return status;
155157
}
156158

157-
status = UCP_DEVICE_SEND_BLOCKING(level, uct_device_ep_put_single,
158-
device_ep, uct_elem, address,
159-
remote_address, length, flags, comp);
160-
return status;
159+
return UCP_DEVICE_SEND_BLOCKING(level, uct_device_ep_put_single, device_ep,
160+
req, uct_elem, address, remote_address,
161+
length, flags, comp);
161162
}
162163

163164

@@ -209,8 +210,8 @@ UCS_F_DEVICE ucs_status_t ucp_device_counter_inc(
209210
}
210211

211212
return UCP_DEVICE_SEND_BLOCKING(level, uct_device_ep_atomic_add, device_ep,
212-
uct_elem, inc_value, remote_address, flags,
213-
comp);
213+
req, uct_elem, inc_value, remote_address,
214+
flags, comp);
214215
}
215216

216217

@@ -266,8 +267,9 @@ UCS_F_DEVICE ucs_status_t ucp_device_put_multi(
266267
}
267268

268269
return UCP_DEVICE_SEND_BLOCKING(level, uct_device_ep_put_multi, device_ep,
269-
uct_mem_list, mem_list_h->mem_list_length,
270-
addresses, remote_addresses, lengths,
270+
req, uct_mem_list,
271+
mem_list_h->mem_list_length, addresses,
272+
remote_addresses, lengths,
271273
counter_inc_value, counter_remote_address,
272274
flags, comp);
273275
}
@@ -346,11 +348,12 @@ UCS_F_DEVICE ucs_status_t ucp_device_put_multi_partial(
346348
}
347349

348350
return UCP_DEVICE_SEND_BLOCKING(level, uct_device_ep_put_multi_partial,
349-
device_ep, uct_mem_list, mem_list_indices,
350-
mem_list_count, addresses, remote_addresses,
351-
local_offsets, remote_offsets, lengths,
352-
counter_index, counter_inc_value,
353-
counter_remote_address, flags, comp);
351+
device_ep, req, uct_mem_list,
352+
mem_list_indices, mem_list_count, addresses,
353+
remote_addresses, local_offsets,
354+
remote_offsets, lengths, counter_index,
355+
counter_inc_value, counter_remote_address,
356+
flags, comp);
354357
}
355358

356359

@@ -417,19 +420,14 @@ UCS_F_DEVICE void ucp_device_counter_write(void *counter_ptr, uint64_t value)
417420
template<ucs_device_level_t level = UCS_DEVICE_LEVEL_THREAD>
418421
UCS_F_DEVICE ucs_status_t ucp_device_progress_req(ucp_device_request_t *req)
419422
{
420-
ucs_status_t status;
421-
422-
if (ucs_likely(req->comp.count == 0)) {
423-
return req->comp.status;
424-
}
425-
426-
status = uct_device_ep_progress<level>(req->device_ep);
427-
if (status != UCS_OK) {
428-
return status;
423+
if (ucs_likely(req->status != UCS_INPROGRESS)) {
424+
return req->status;
429425
}
430426

431-
return (ucs_likely(req->comp.count == 0)) ? req->comp.status :
432-
UCS_INPROGRESS;
427+
uct_device_ep_progress<level>(req->device_ep);
428+
req->status = uct_device_ep_check_completion<level>(req->device_ep,
429+
&req->comp);
430+
return req->status;
433431
}
434432

435433
#endif /* UCP_DEVICE_IMPL_H */

src/ucp/core/ucp_mm.c

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ ucp_mem_dummy_handle_t ucp_mem_dummy_handle = {
4040
.parent = &ucp_mem_dummy_handle.memh,
4141
.mem_type = UCS_MEMORY_TYPE_HOST,
4242
.sys_dev = UCS_SYS_DEVICE_ID_UNKNOWN,
43-
.packed_sys_dev = UCS_SYS_DEVICE_ID_UNKNOWN,
4443
.md_map = 0,
4544
.inv_md_map = 0,
4645
.reg_id = 0,
@@ -727,13 +726,6 @@ static void ucp_memh_init(ucp_mem_h memh, ucp_context_h context,
727726
memh->alloc_method = method;
728727
memh->mem_type = mem_type;
729728
memh->sys_dev = sys_dev;
730-
731-
/* Cache sys_dev in a format packed to rkey to minimize overhead during
732-
* rndv protocols. TODO remove if using another method to mark rkey with
733-
* remote flush requirement. */
734-
memh->packed_sys_dev = (sys_dev == UCS_SYS_DEVICE_ID_UNKNOWN) ?
735-
UCS_SYS_DEVICE_ID_UNKNOWN :
736-
ucp_rkey_pack_sys_dev(memh);
737729
}
738730

739731
static ucs_status_t

src/ucp/core/ucp_mm.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,21 @@ enum {
3636
/*
3737
* Memory handle was imported and points to some peer's memory buffer.
3838
*/
39-
UCP_MEMH_FLAG_IMPORTED = UCS_BIT(0),
40-
UCP_MEMH_FLAG_MLOCKED = UCS_BIT(1),
41-
UCP_MEMH_FLAG_HAS_AUTO_GVA = UCS_BIT(2),
39+
UCP_MEMH_FLAG_IMPORTED = UCS_BIT(0),
40+
UCP_MEMH_FLAG_MLOCKED = UCS_BIT(1),
41+
UCP_MEMH_FLAG_HAS_AUTO_GVA = UCS_BIT(2),
4242

4343
/**
4444
* Avoid using registration cache for the particular memory region.
4545
*/
46-
UCP_MEMH_FLAG_NO_RCACHE = UCS_BIT(3)
46+
UCP_MEMH_FLAG_NO_RCACHE = UCS_BIT(3),
47+
48+
/**
49+
* Track if sender-side flush is needed, check is only done when needed
50+
* and cached.
51+
*/
52+
UCP_MEMH_FLAG_SEND_FLUSH_CHECKED = UCS_BIT(4),
53+
UCP_MEMH_FLAG_SEND_FLUSH_NEEDED = UCS_BIT(5)
4754
};
4855

4956

@@ -68,7 +75,6 @@ typedef struct ucp_mem {
6875
ucp_context_h context; /* UCP context that owns a memory handle */
6976
uct_alloc_method_t alloc_method; /* Method used to allocate the memory */
7077
ucs_sys_device_t sys_dev; /* System device index */
71-
ucs_sys_device_t packed_sys_dev; /* System device index */
7278
ucs_memory_type_t mem_type; /* Type of allocated or registered memory */
7379
ucp_md_index_t alloc_md_index; /* Index of MD used to allocate the memory */
7480
uint64_t remote_uuid; /* Remote UUID */

src/ucp/core/ucp_rkey.c

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -150,29 +150,38 @@ ucp_sys_dev_map_t ucp_memh_sys_dev_map(ucp_mem_h memh)
150150
return 0;
151151
}
152152

153-
ucs_sys_device_t ucp_rkey_pack_sys_dev(ucp_mem_h memh)
153+
static int ucp_memh_send_flush_is_needed(ucp_mem_h memh)
154154
{
155-
ucs_sys_device_t sys_dev_packed = memh->sys_dev;
156155
ucp_md_index_t md_index;
157156
ucp_sys_dev_map_t sys_dev_map;
158157
ucs_sys_device_t sys_dev;
159158

160-
ucs_assert(sys_dev_packed <= UCP_SYS_DEVICE_MAX_PACKED);
159+
if (memh->flags & UCP_MEMH_FLAG_SEND_FLUSH_CHECKED) {
160+
return !!(memh->flags & UCP_MEMH_FLAG_SEND_FLUSH_NEEDED);
161+
}
162+
163+
memh->flags |= UCP_MEMH_FLAG_SEND_FLUSH_CHECKED;
161164

162-
ucs_for_each_bit(md_index, memh->md_map) {
163-
sys_dev_map = memh->context->tl_mds[md_index].sys_dev_map;
164-
ucs_for_each_bit(sys_dev, sys_dev_map) {
165-
if (ucs_topo_is_sibling(sys_dev, sys_dev_packed)) {
166-
/* PUT operation on such rkey requires remote flush.
167-
* Set a flag for the peer to recognize it. */
168-
sys_dev_packed |= UCP_SYS_DEVICE_FLUSH_BIT;
169-
goto out;
165+
if (memh->sys_dev != UCS_SYS_DEVICE_ID_UNKNOWN) {
166+
ucs_assert(memh->sys_dev <= UCP_SYS_DEVICE_MAX_PACKED);
167+
168+
ucs_for_each_bit(md_index, memh->md_map) {
169+
sys_dev_map = memh->context->tl_mds[md_index].sys_dev_map;
170+
ucs_for_each_bit(sys_dev, sys_dev_map) {
171+
if (ucs_topo_is_sibling(sys_dev, memh->sys_dev)) {
172+
/*
173+
* PUT operation on such device will require remote flush
174+
* when using network devices.
175+
* Set a flag for the peer to recognize it.
176+
*/
177+
memh->flags |= UCP_MEMH_FLAG_SEND_FLUSH_NEEDED;
178+
return 1;
179+
}
170180
}
171181
}
172182
}
173183

174-
out:
175-
return sys_dev_packed;
184+
return 0;
176185
}
177186

178187
UCS_PROFILE_FUNC(ssize_t, ucp_rkey_pack_memh,
@@ -231,8 +240,14 @@ UCS_PROFILE_FUNC(ssize_t, ucp_rkey_pack_memh,
231240

232241
if (md_map != 0) {
233242
/* Since UCX 1.20: always pack sys_dev for non-empty rkeys. */
234-
ucs_assert(memh != NULL);
235-
*ucs_serialize_next(&p, uint8_t) = memh->packed_sys_dev;
243+
ucs_assert(memh != NULL);
244+
245+
sys_dev = memh->sys_dev;
246+
if (ucp_memh_send_flush_is_needed(memh)) {
247+
sys_dev |= UCP_SYS_DEVICE_FLUSH_BIT;
248+
}
249+
250+
*ucs_serialize_next(&p, uint8_t) = sys_dev;
236251
}
237252

238253
if ((mem_info->sys_dev == UCS_SYS_DEVICE_ID_UNKNOWN) || (md_map == 0)) {
@@ -829,15 +844,24 @@ ucp_rkey_unpack_lanes_distance(const ucp_ep_config_key_t *ep_config_key,
829844
}
830845
}
831846

832-
static UCS_F_ALWAYS_INLINE ucs_sys_device_t
847+
static UCS_F_ALWAYS_INLINE void
833848
ucp_rkey_extract_sys_dev(const ucp_ep_config_t *ep_config, ucp_rkey_h rkey,
834-
const void **buffer_p, const void *buffer_end)
849+
const void **buffer_p, const void *buffer_end,
850+
ucp_rkey_config_key_t *rkey_config_key)
835851
{
836852
if ((*buffer_p < buffer_end) ||
837853
((ep_config->key.dst_version > 19) && (rkey->md_map != 0))) {
838-
return *ucs_serialize_next(buffer_p, const uint8_t);
854+
rkey_config_key->sys_dev = *ucs_serialize_next(buffer_p, const uint8_t);
855+
} else {
856+
rkey_config_key->sys_dev = UCS_SYS_DEVICE_ID_UNKNOWN;
857+
}
858+
859+
if ((rkey_config_key->sys_dev != UCS_SYS_DEVICE_ID_UNKNOWN) &&
860+
(rkey_config_key->sys_dev & UCP_SYS_DEVICE_FLUSH_BIT)) {
861+
rkey_config_key->flags = UCP_RKEY_CONFIG_FLAG_FLUSH;
862+
rkey_config_key->sys_dev &= ~UCP_SYS_DEVICE_FLUSH_BIT;
839863
} else {
840-
return UCS_SYS_DEVICE_ID_UNKNOWN;
864+
rkey_config_key->flags = 0;
841865
}
842866
}
843867

@@ -864,8 +888,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rkey_proto_resolve,
864888
rkey_config_key.md_map = rkey->md_map;
865889
rkey_config_key.mem_type = rkey->mem_type;
866890
rkey_config_key.unreachable_md_map = unreachable_md_map;
867-
rkey_config_key.sys_dev = ucp_rkey_extract_sys_dev(
868-
ep_config, rkey, &p, buffer_end);
891+
892+
ucp_rkey_extract_sys_dev(ep_config, rkey, &p, buffer_end, &rkey_config_key);
869893

870894
/* Starting with UCX v1.20, lane distances are always packed if sys_dev is
871895
* not UNKNOWN. Even if the rkey length is not explicitly passed to the API,

src/ucp/core/ucp_rkey.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ enum {
4444
};
4545

4646

47+
/**
48+
* Rkey config flags
49+
*/
50+
enum {
51+
UCP_RKEY_CONFIG_FLAG_FLUSH = UCS_BIT(0) /* Put and atomic operations on this rkey
52+
require remote flush */
53+
};
54+
55+
4756
/**
4857
* Rkey configuration key
4958
*/
@@ -57,6 +66,9 @@ struct ucp_rkey_config_key {
5766
/* Remote system device id */
5867
ucs_sys_device_t sys_dev;
5968

69+
/* Rkey specific flags, like @a UCP_RKEY_CONFIG_FLAG_FLUSH */
70+
uint8_t flags;
71+
6072
/* Remote memory type */
6173
ucs_memory_type_t mem_type;
6274

@@ -65,8 +77,8 @@ struct ucp_rkey_config_key {
6577
};
6678

6779

68-
#define UCP_SYS_DEVICE_FLUSH_BIT UCS_BIT(7)
69-
#define UCP_SYS_DEVICE_MAX_PACKED UCP_SYS_DEVICE_FLUSH_BIT - 1
80+
#define UCP_SYS_DEVICE_FLUSH_BIT UCS_BIT(7)
81+
#define UCP_SYS_DEVICE_MAX_PACKED (UCP_SYS_DEVICE_FLUSH_BIT - 1)
7082

7183

7284
/**
@@ -243,7 +255,4 @@ void ucp_rkey_proto_select_dump(ucp_worker_h worker,
243255
ucp_worker_cfg_index_t rkey_cfg_index,
244256
ucs_string_buffer_t *strb);
245257

246-
247-
ucs_sys_device_t ucp_rkey_pack_sys_dev(ucp_mem_h memh);
248-
249258
#endif

0 commit comments

Comments
 (0)