Skip to content

Commit 881a419

Browse files
authored
Merge pull request #10791 from rakhmets/topic/direct-nic-rkey-pack
UCP: Direct NIC Flush.
2 parents 1084d5f + e816f53 commit 881a419

File tree

20 files changed

+479
-49
lines changed

20 files changed

+479
-49
lines changed

src/ucp/core/ucp_ep.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ static ucp_ep_h ucp_ep_allocate(ucp_worker_h worker, const char *peer_name)
229229
ep->ext->unflushed_lanes = 0;
230230
ep->ext->fence_seq = 0;
231231
ep->ext->uct_eps = NULL;
232+
ep->ext->flush_sys_dev_map = 0;
232233

233234
UCS_STATIC_ASSERT(sizeof(ep->ext->ep_match) >=
234235
sizeof(ep->ext->flush_state));
@@ -537,8 +538,9 @@ void ucp_ep_flush_state_reset(ucp_ep_h ep)
537538
(flush_state->cmpl_sn == 0) &&
538539
ucs_hlist_is_empty(&flush_state->reqs)));
539540

540-
flush_state->send_sn = 0;
541-
flush_state->cmpl_sn = 0;
541+
flush_state->send_sn = 0;
542+
flush_state->cmpl_sn = 0;
543+
flush_state->mem_in_progress = 0;
542544
ucs_hlist_head_init(&flush_state->reqs);
543545
ucp_ep_update_flags(ep, UCP_EP_FLAG_FLUSH_STATE_VALID, 0);
544546
}

src/ucp/core/ucp_ep.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,7 @@ typedef struct {
500500
are waiting for remote completion */
501501
uint32_t send_sn; /* Sequence number of sent operations */
502502
uint32_t cmpl_sn; /* Sequence number of completions */
503+
uint32_t mem_in_progress; /* Track ongoing memory flushes for this endpoint */
503504
} ucp_ep_flush_state_t;
504505

505506

@@ -554,6 +555,12 @@ typedef struct ucp_ep_ext {
554555
* structure. TODO allocate this array dynamically.
555556
*/
556557
uct_ep_h *uct_eps;
558+
559+
560+
/**
561+
* Map of system devices that require a flush operation
562+
*/
563+
ucp_sys_dev_map_t flush_sys_dev_map;
557564
} ucp_ep_ext_t;
558565

559566

@@ -935,4 +942,16 @@ ucs_status_t ucp_ep_realloc_lanes(ucp_ep_h ep, unsigned new_num_lanes);
935942
*/
936943
void ucp_ep_set_cfg_index(ucp_ep_h ep, ucp_worker_cfg_index_t cfg_index);
937944

945+
946+
/**
947+
* @brief Progress function for memory specific remote flushing.
948+
*
949+
* This call starts and progresses all memory specific remote flushes.
950+
*
951+
* @param[in] self Pending request tracking the flush.
952+
*
953+
* @return Error code as defined by @ref ucs_status_t
954+
*/
955+
ucs_status_t ucp_ep_flush_mem_progress(uct_pending_req_t *self);
956+
938957
#endif

src/ucp/core/ucp_mm.c

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,15 @@ typedef struct {
3535

3636
ucp_mem_dummy_handle_t ucp_mem_dummy_handle = {
3737
.memh = {
38-
.alloc_method = UCT_ALLOC_METHOD_LAST,
38+
.alloc_method = UCT_ALLOC_METHOD_LAST,
3939
.alloc_md_index = UCP_NULL_RESOURCE,
40-
.parent = &ucp_mem_dummy_handle.memh,
40+
.parent = &ucp_mem_dummy_handle.memh,
41+
.mem_type = UCS_MEMORY_TYPE_HOST,
42+
.sys_dev = UCS_SYS_DEVICE_ID_UNKNOWN,
43+
.packed_sys_dev = UCS_SYS_DEVICE_ID_UNKNOWN,
44+
.md_map = 0,
45+
.inv_md_map = 0,
46+
.reg_id = 0,
4147
},
4248
.uct = { UCT_MEM_HANDLE_NULL }
4349
};
@@ -709,7 +715,8 @@ void ucp_memh_disable_gva(ucp_mem_h memh, ucp_md_map_t md_map)
709715

710716
static void ucp_memh_init(ucp_mem_h memh, ucp_context_h context,
711717
uint8_t memh_flags, unsigned uct_flags,
712-
uct_alloc_method_t method, ucs_memory_type_t mem_type)
718+
uct_alloc_method_t method, ucs_memory_type_t mem_type,
719+
ucs_sys_device_t sys_dev)
713720
{
714721
memh->md_map = 0;
715722
memh->inv_md_map = 0;
@@ -719,6 +726,14 @@ static void ucp_memh_init(ucp_mem_h memh, ucp_context_h context,
719726
memh->alloc_md_index = UCP_NULL_RESOURCE;
720727
memh->alloc_method = method;
721728
memh->mem_type = mem_type;
729+
memh->sys_dev = sys_dev;
730+
731+
/* Cache sys_dev in a format packed to rkey to minimize overhead during
732+
* rndv protocols. TODO remove if using another method to mark rkey with
733+
* remote flush requirement. */
734+
memh->packed_sys_dev = (sys_dev == UCS_SYS_DEVICE_ID_UNKNOWN) ?
735+
UCS_SYS_DEVICE_ID_UNKNOWN :
736+
ucp_rkey_pack_sys_dev(memh);
722737
}
723738

724739
static ucs_status_t
@@ -736,11 +751,9 @@ ucp_memh_create(ucp_context_h context, void *address, size_t length,
736751

737752
memh->super.super.start = (uintptr_t)address;
738753
memh->super.super.end = (uintptr_t)address + length;
739-
ucp_memh_init(memh, context, memh_flags, uct_flags, method, mem_type);
740-
741-
ucp_memory_detect(context, ucp_memh_address(memh), ucp_memh_length(memh),
742-
&info);
743-
memh->sys_dev = info.sys_dev;
754+
ucp_memory_detect(context, address, length, &info);
755+
ucp_memh_init(memh, context, memh_flags, uct_flags, method, mem_type,
756+
info.sys_dev);
744757

745758
*memh_p = memh;
746759
return UCS_OK;
@@ -1632,7 +1645,7 @@ ucp_mem_rcache_mem_reg_cb(void *ctx, ucs_rcache_t *rcache, void *arg,
16321645
ucp_mem_h memh = ucs_derived_of(rregion, ucp_mem_t);
16331646

16341647
ucp_memh_init(memh, context, 0, reg_ctx->uct_flags, UCT_ALLOC_METHOD_LAST,
1635-
reg_ctx->mem_type);
1648+
reg_ctx->mem_type, UCS_SYS_DEVICE_ID_UNKNOWN);
16361649
memh->reg_id = context->next_memh_reg_id++;
16371650

16381651
if (rcache_mem_reg_flags & UCS_RCACHE_MEM_REG_HIDE_ERRORS) {

src/ucp/core/ucp_mm.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ typedef struct ucp_mem {
6868
ucp_context_h context; /* UCP context that owns a memory handle */
6969
uct_alloc_method_t alloc_method; /* Method used to allocate the memory */
7070
ucs_sys_device_t sys_dev; /* System device index */
71+
ucs_sys_device_t packed_sys_dev; /* System device index */
7172
ucs_memory_type_t mem_type; /* Type of allocated or registered memory */
7273
ucp_md_index_t alloc_md_index; /* Index of MD used to allocate the memory */
7374
uint64_t remote_uuid; /* Remote UUID */

src/ucp/core/ucp_request.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,24 @@ enum {
121121
};
122122

123123

124+
/**
125+
* Memory flush related structure
126+
*/
127+
typedef struct {
128+
/* Number of 0-read flush to complete overall */
129+
int count;
130+
131+
/* Number of 0-read flush started */
132+
int started;
133+
134+
/* Shared completion to track remaining */
135+
uct_completion_t uct_comp;
136+
137+
/* List of memory areas to track 0-read flush operations */
138+
ucp_mem_area_t *entries;
139+
} ucp_mem_flush_t;
140+
141+
124142
/**
125143
* Request in progress.
126144
*/
@@ -335,6 +353,7 @@ struct ucp_request {
335353
uint8_t sw_done;
336354
uint8_t num_lanes; /* How many lanes are being flushed */
337355
ucp_lane_map_t started_lanes; /* Which lanes need were flushed */
356+
ucp_mem_flush_t mem; /* Memory specific flushes */
338357
} flush;
339358

340359
struct {

src/ucp/core/ucp_rkey.c

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,31 @@ ucp_rkey_unpack_distance(const ucp_rkey_packed_distance_t *packed_distance,
125125
distance->bandwidth = UCS_FP8_UNPACK(BANDWIDTH, packed_distance->bandwidth);
126126
}
127127

128+
ucs_sys_device_t ucp_rkey_pack_sys_dev(ucp_mem_h memh)
129+
{
130+
ucs_sys_device_t sys_dev_packed = memh->sys_dev;
131+
ucp_md_index_t md_index;
132+
ucp_sys_dev_map_t sys_dev_map;
133+
ucs_sys_device_t sys_dev;
134+
135+
ucs_assert(sys_dev_packed <= UCP_SYS_DEVICE_MAX_PACKED);
136+
137+
ucs_for_each_bit(md_index, memh->md_map) {
138+
sys_dev_map = memh->context->tl_mds[md_index].sys_dev_map;
139+
ucs_for_each_bit(sys_dev, sys_dev_map) {
140+
if (ucs_topo_is_sibling(sys_dev, sys_dev_packed)) {
141+
/* PUT operation on such rkey requires remote flush.
142+
* Set a flag for the peer to recognize it. */
143+
sys_dev_packed |= UCP_SYS_DEVICE_FLUSH_BIT;
144+
goto out;
145+
}
146+
}
147+
}
148+
149+
out:
150+
return sys_dev_packed;
151+
}
152+
128153
UCS_PROFILE_FUNC(ssize_t, ucp_rkey_pack_memh,
129154
(context, md_map, memh, address, length, mem_info, sys_dev_map,
130155
sys_distance, uct_flags, buffer),
@@ -184,7 +209,7 @@ UCS_PROFILE_FUNC(ssize_t, ucp_rkey_pack_memh,
184209
}
185210

186211
/* Pack system device id */
187-
*ucs_serialize_next(&p, uint8_t) = mem_info->sys_dev;
212+
*ucs_serialize_next(&p, uint8_t) = memh->packed_sys_dev;
188213

189214
/* Pack distance from sys_dev to each device in distance_dev_map */
190215
ucs_for_each_bit(sys_dev, sys_dev_map) {
@@ -577,12 +602,11 @@ ucp_memh_packed_size(ucp_mem_h memh, uint64_t flags, int rkey_compat)
577602
ucp_memh_export_md_map(memh));
578603
}
579604

580-
if (rkey_compat) {
581-
return ucp_rkey_packed_size(context, memh->md_map,
582-
UCS_SYS_DEVICE_ID_UNKNOWN, 0);
605+
if (!rkey_compat) {
606+
ucs_fatal("packing rkey using ucp_memh_pack() is unsupported");
583607
}
584608

585-
ucs_fatal("packing rkey using ucp_memh_pack() is unsupported");
609+
return ucp_rkey_packed_size(context, memh->md_map, memh->sys_dev, 0);
586610
}
587611

588612
static ssize_t ucp_memh_do_pack(ucp_mem_h memh, uint64_t flags,
@@ -594,15 +618,15 @@ static ssize_t ucp_memh_do_pack(ucp_mem_h memh, uint64_t flags,
594618
return ucp_memh_exported_pack(memh, memh_buffer);
595619
}
596620

597-
if (rkey_compat) {
598-
mem_info.type = memh->mem_type;
599-
mem_info.sys_dev = UCS_SYS_DEVICE_ID_UNKNOWN;
600-
return ucp_rkey_pack_memh(memh->context, memh->md_map, memh,
601-
ucp_memh_address(memh), ucp_memh_length(memh),
602-
&mem_info, 0, NULL, 0, memh_buffer);
621+
if (!rkey_compat) {
622+
ucs_fatal("packing rkey using ucp_memh_pack() is unsupported");
603623
}
604624

605-
ucs_fatal("packing rkey using ucp_memh_pack() is unsupported");
625+
mem_info.type = memh->mem_type;
626+
mem_info.sys_dev = memh->sys_dev;
627+
return ucp_rkey_pack_memh(memh->context, memh->md_map, memh,
628+
ucp_memh_address(memh), ucp_memh_length(memh),
629+
&mem_info, 0, NULL, 0, memh_buffer);
606630
}
607631

608632
int ucp_memh_buffer_is_dummy(const void *exported_memh_buffer)

src/ucp/core/ucp_rkey.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ struct ucp_rkey_config_key {
6565
};
6666

6767

68+
#define UCP_SYS_DEVICE_FLUSH_BIT UCS_BIT(7)
69+
#define UCP_SYS_DEVICE_MAX_PACKED UCP_SYS_DEVICE_FLUSH_BIT - 1
70+
71+
6872
/**
6973
* Rkey configuration
7074
*/
@@ -238,4 +242,7 @@ void ucp_rkey_proto_select_dump(ucp_worker_h worker,
238242
ucp_worker_cfg_index_t rkey_cfg_index,
239243
ucs_string_buffer_t *strb);
240244

245+
246+
ucs_sys_device_t ucp_rkey_pack_sys_dev(ucp_mem_h memh);
247+
241248
#endif

src/ucp/core/ucp_rkey.inl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,12 @@ ucp_ep_rkey_unpack_reachable(ucp_ep_h ep, const void *buffer, size_t length,
6464
UCS_SYS_DEVICE_ID_UNKNOWN, rkey_p);
6565
}
6666

67+
static UCS_F_ALWAYS_INLINE int
68+
ucp_rkey_need_remote_flush(const ucp_rkey_config_key_t *key)
69+
{
70+
return (key->sys_dev != UCS_SYS_DEVICE_ID_UNKNOWN) &&
71+
(key->sys_dev & UCP_SYS_DEVICE_FLUSH_BIT);
72+
73+
}
74+
6775
#endif

src/ucp/core/ucp_worker.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2504,6 +2504,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
25042504
ucs_list_head_init(&worker->internal_eps);
25052505
kh_init_inplace(ucp_worker_rkey_config, &worker->rkey_config_hash);
25062506
kh_init_inplace(ucp_worker_discard_uct_ep_hash, &worker->discard_uct_ep_hash);
2507+
kh_init_inplace(ucp_worker_remote_flush, &worker->remote_flush_hash);
25072508
worker->counters.ep_creations = 0;
25082509
worker->counters.ep_creation_failures = 0;
25092510
worker->counters.ep_closures = 0;
@@ -2714,6 +2715,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
27142715
kh_destroy_inplace(ucp_worker_discard_uct_ep_hash,
27152716
&worker->discard_uct_ep_hash);
27162717
kh_destroy_inplace(ucp_worker_rkey_config, &worker->rkey_config_hash);
2718+
kh_destroy_inplace(ucp_worker_remote_flush, &worker->remote_flush_hash);
27172719
ucp_worker_destroy_configs(worker);
27182720
ucs_free(worker);
27192721
return status;
@@ -2963,6 +2965,7 @@ void ucp_worker_destroy(ucp_worker_h worker)
29632965
ucs_strided_alloc_cleanup(&worker->ep_alloc);
29642966
kh_destroy_inplace(ucp_worker_discard_uct_ep_hash,
29652967
&worker->discard_uct_ep_hash);
2968+
kh_destroy_inplace(ucp_worker_remote_flush, &worker->remote_flush_hash);
29662969
kh_destroy_inplace(ucp_worker_rkey_config, &worker->rkey_config_hash);
29672970
ucp_worker_destroy_configs(worker);
29682971
ucs_free(worker);

src/ucp/core/ucp_worker.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,27 @@ KHASH_TYPE(ucp_worker_mpool_hash, ucp_worker_mpool_key_t, ucs_mpool_t);
230230
typedef khash_t(ucp_worker_mpool_hash) ucp_worker_mpool_hash_t;
231231

232232

233+
typedef struct {
234+
ucp_ep_h ep;
235+
ucs_sys_device_t sys_dev;
236+
} ucp_worker_remote_flush_key_t;
237+
238+
239+
/**
240+
* Remote memory to be used for device targeted remote flushing.
241+
*/
242+
typedef struct {
243+
uct_rkey_t uct_rkey;
244+
uct_ep_t *uct_ep;
245+
uint64_t address;
246+
} ucp_mem_area_t;
247+
248+
249+
/* Hash map to find what remote devices ep needs to flush */
250+
KHASH_TYPE(ucp_worker_remote_flush, ucp_worker_remote_flush_key_t,
251+
ucp_mem_area_t);
252+
253+
233254
/* EP configurations storage */
234255
UCS_ARRAY_DECLARE_TYPE(ucp_ep_config_arr_t, unsigned, ucp_ep_config_t);
235256

@@ -342,6 +363,7 @@ typedef struct ucp_worker {
342363
mapping */
343364
UCS_PTR_MAP_T(request) request_map; /* UCP requests key to
344365
ptr mapping */
366+
kh_ucp_worker_remote_flush_t remote_flush_hash;
345367

346368
ucp_ep_config_arr_t ep_config; /* EP configurations storage */
347369

0 commit comments

Comments
 (0)