Skip to content

Commit d44ff68

Browse files
authored
Merge pull request #10483 from ofirfarjun7/topic/fix-copy-header-reconfig-bug-v1_18_x
UCP/PROTO/RECONFIG: Copy AM header when flag is enabled - v1.18.x
2 parents bf836d7 + 70b824a commit d44ff68

File tree

2 files changed

+37
-8
lines changed

2 files changed

+37
-8
lines changed

src/ucp/proto/proto_reconfig.c

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010

1111
#include "proto_debug.h"
1212
#include "proto_select.h"
13+
#include "proto_am.inl"
1314
#include "proto_common.inl"
1415

16+
#include <ucp/am/ucp_am.inl>
1517
#include <ucp/core/ucp_worker.inl>
1618

1719

@@ -30,11 +32,21 @@ static ucs_status_t ucp_proto_reconfig_select_progress(uct_pending_req_t *self)
3032
return req->send.uct.func(&req->send.uct);
3133
}
3234

35+
static void ucp_proto_reconfig_abort(ucp_request_t *req, ucs_status_t status)
36+
{
37+
if (ucp_proto_config_is_am(req->send.proto_config)) {
38+
ucp_am_release_user_header(req);
39+
}
40+
41+
ucp_request_complete_send(req, status);
42+
}
43+
3344
static ucs_status_t ucp_proto_reconfig_progress(uct_pending_req_t *self)
3445
{
3546
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
3647
ucp_ep_h ep = req->send.ep;
3748
UCS_STRING_BUFFER_ONSTACK(strb, 256);
49+
ucs_status_t status;
3850

3951
/* This protocol should not be selected for valid and connected endpoint */
4052
if (ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED) {
@@ -47,10 +59,19 @@ static ucs_status_t ucp_proto_reconfig_progress(uct_pending_req_t *self)
4759
ucp_operation_names, &strb);
4860
ucs_error("cannot find remote protocol for: %s",
4961
ucs_string_buffer_cstr(&strb));
50-
ucp_request_complete_send(req, UCS_ERR_CANCELED);
62+
ucp_proto_request_abort(req, UCS_ERR_CANCELED);
5163
return UCS_OK;
5264
}
5365

66+
if (ucp_proto_config_is_am(req->send.proto_config) &&
67+
(req->send.msg_proto.am.flags & UCP_AM_SEND_FLAG_COPY_HEADER)) {
68+
status = ucp_proto_am_req_copy_header(req);
69+
if (status != UCS_OK) {
70+
ucp_proto_request_abort(req, status);
71+
return UCS_OK;
72+
}
73+
}
74+
5475
if (ep->cfg_index != req->send.proto_config->ep_cfg_index) {
5576
ucp_trace_req(req,
5677
"ep configuration changed from %d to %d,"
@@ -96,6 +117,6 @@ ucp_proto_t ucp_reconfig_proto = {
96117
.probe = ucp_proto_reconfig_probe,
97118
.query = ucp_proto_default_query,
98119
.progress = {ucp_proto_reconfig_progress},
99-
.abort = ucp_request_complete_send,
120+
.abort = ucp_proto_reconfig_abort,
100121
.reset = (ucp_request_reset_func_t)ucs_empty_function_return_success
101122
};

test/gtest/ucp/test_ucp_sockaddr.cc

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2518,6 +2518,7 @@ class test_ucp_sockaddr_protocols : public test_ucp_sockaddr {
25182518
std::string sb(size, 'x');
25192519
std::string rb(size, 'y');
25202520
std::string shdr(hdr_size, 'x');
2521+
std::string shdr_copy = shdr;
25212522
std::string rhdr(hdr_size, 'y');
25222523
ucp_mem_h smemh(NULL);
25232524
ucp_mem_h rmemh(NULL);
@@ -2543,6 +2544,14 @@ class test_ucp_sockaddr_protocols : public test_ucp_sockaddr {
25432544
ucs_status_ptr_t sreq = ucp_am_send_nbx(sender().ep(), 0,
25442545
&shdr[0], hdr_size,
25452546
&sb[0], size, &param);
2547+
/* First message request triggers connection establishment and
2548+
* is placed into pending queue.
2549+
* To check UCP_AM_SEND_FLAG_COPY_HEADER we change AM header
2550+
* content while the request is still in pending queue.*/
2551+
if (flags & UCP_AM_SEND_FLAG_COPY_HEADER) {
2552+
shdr.assign(shdr.size(), 'a');
2553+
}
2554+
25462555
request_wait(sreq);
25472556
wait_for_flag(&arg.received);
25482557
// wait for receive request completion after 'received' flag set to
@@ -2551,7 +2560,7 @@ class test_ucp_sockaddr_protocols : public test_ucp_sockaddr {
25512560
EXPECT_TRUE(arg.received);
25522561

25532562
compare_buffers(sb, rb);
2554-
compare_buffers(shdr, rhdr);
2563+
compare_buffers(shdr_copy, rhdr);
25552564

25562565
set_am_data_handler(receiver(), 0, NULL, NULL);
25572566

@@ -2785,28 +2794,27 @@ UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_protocols,
27852794
test_am_send_recv(64 * UCS_KBYTE, 0, 2, true, true);
27862795
}
27872796
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_protocols, am_short_reset,
2788-
RUNNING_ON_VALGRIND, "PROTO_ENABLE=n", "ZCOPY_THRESH=inf")
2797+
RUNNING_ON_VALGRIND, "ZCOPY_THRESH=inf")
27892798
{
27902799
test_am_send_recv(16, 8, 1, false, false, UCP_AM_SEND_FLAG_COPY_HEADER);
27912800
}
27922801

27932802
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_protocols, am_bcopy_reset,
2794-
RUNNING_ON_VALGRIND,
2795-
"PROTO_ENABLE=n", "ZCOPY_THRESH=inf")
2803+
RUNNING_ON_VALGRIND, "ZCOPY_THRESH=inf")
27962804
{
27972805
test_am_send_recv(2 * UCS_KBYTE, 8, 1, false, false,
27982806
UCP_AM_SEND_FLAG_COPY_HEADER);
27992807
}
28002808

28012809
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_protocols, am_zcopy_reset,
2802-
RUNNING_ON_VALGRIND, "PROTO_ENABLE=n")
2810+
RUNNING_ON_VALGRIND)
28032811
{
28042812
test_am_send_recv(16 * UCS_KBYTE, 8, 1, false, false,
28052813
UCP_AM_SEND_FLAG_COPY_HEADER);
28062814
}
28072815

28082816
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_protocols, am_rndv_reset,
2809-
RUNNING_ON_VALGRIND, "PROTO_ENABLE=n", "RNDV_THRESH=0")
2817+
RUNNING_ON_VALGRIND, "RNDV_THRESH=0")
28102818
{
28112819
test_am_send_recv(16 * UCS_KBYTE, 8, 1, false, false,
28122820
UCP_AM_SEND_FLAG_COPY_HEADER);

0 commit comments

Comments
 (0)