Skip to content

Commit

Permalink
Fix set_delete notification path in rail
Browse files Browse the repository at this point in the history
  • Loading branch information
narategithub committed Aug 16, 2024
1 parent f4a672a commit 8200bcc
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 68 deletions.
6 changes: 6 additions & 0 deletions ldms/python/ldms.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ cdef extern from "ldms.h" nogil:
EVENT_ERROR "LDMS_XPRT_EVENT_ERROR"
EVENT_DISCONNECTED "LDMS_XPRT_EVENT_DISCONNECTED"
EVENT_RECV "LDMS_XPRT_EVENT_RECV"
EVENT_SET_DELETE "LDMS_XPRT_EVENT_SET_DELETE"
EVENT_SEND_COMPLETE "LDMS_XPRT_EVENT_SEND_COMPLETE"
EVENT_SEND_CREDIT_DEPOSITED "LDMS_XPRT_EVENT_SEND_CREDIT_DEPOSITED"
EVENT_LAST "LDMS_XPRT_EVENT_LAST"
Expand All @@ -285,19 +286,24 @@ cdef extern from "ldms.h" nogil:
LDMS_XPRT_EVENT_ERROR
LDMS_XPRT_EVENT_DISCONNECTED
LDMS_XPRT_EVENT_RECV
LDMS_XPRT_EVENT_SET_DELETE
LDMS_XPRT_EVENT_SEND_COMPLETE
LDMS_XPRT_EVENT_SEND_CREDIT_DEPOSITED
LDMS_XPRT_EVENT_LAST
cdef struct ldms_xprt_credit_event_data:
uint64_t credit
int ep_idx
cdef struct ldms_xprt_set_delete_data:
void * set
const char *name
cdef struct ldms_xprt_event:
ldms_xprt_event_type type
size_t data_len
# data, credit, and set_delete are in a union. Cython doesn't care. It just
# wants to know the names of the "fields" it can access in C code.
char *data
ldms_xprt_credit_event_data credit
ldms_xprt_set_delete_data set_delete
ctypedef ldms_xprt_event *ldms_xprt_event_t
ctypedef void (*ldms_event_cb_t)(ldms_t x, ldms_xprt_event_t e, void *cb_arg)

Expand Down
24 changes: 24 additions & 0 deletions ldms/python/ldms.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,20 @@ cdef class CreditEventData(object):
def __repr__(self):
return str(self)

cdef class SetDeleteEventData(object):
    """Payload of an EVENT_SET_DELETE transport event

    Attributes:
    - name: the name string given to the constructor.
    - set:  the `Set` object given to the constructor (may be `None`).
    """
    cdef readonly str name
    cdef readonly Set set

    def __cinit__(self, Set _lset, str _name):
        # Keep references to the Python-level objects handed in by the caller.
        self.name = _name
        self.set = _lset

    def __repr__(self):
        # Same text as __str__().
        return str(self)

    def __str__(self):
        return "({}, {})".format(self.set, self.name)


cdef class XprtEvent(object):
"""An LDMS transport event
Expand Down Expand Up @@ -1381,16 +1395,23 @@ cdef class XprtEvent(object):
cdef readonly CreditEventData credit
"""Current send credit (for the EVENT_SEND_CREDIT_DEPOSITED event)"""

cdef readonly SetDeleteEventData set_delete

# NOTE: This is a Python object wrapper of `struct ldms_xprt_event`.
#
# The values of the attributes are also new Python objects. So, we
# don't have to worry about the actual `ldms_xprt_event_t` being
# destroyed in the C level after the callback.
def __cinit__(self, Ptr ptr):
cdef ldms_xprt_event_t e = <ldms_xprt_event_t>ptr.c_ptr
cdef ldms_set_t cset
self.type = ldms_xprt_event_type(e.type)
if self.type == ldms.LDMS_XPRT_EVENT_SEND_CREDIT_DEPOSITED:
self.credit = CreditEventData(e.credit.credit, e.credit.ep_idx)
elif self.type == ldms.LDMS_XPRT_EVENT_SET_DELETE:
cset = <ldms_set_t>e.set_delete.set
lset = Set(None, None, set_ptr=PTR(cset)) if cset else None
self.set_delete = SetDeleteEventData(lset, STR(e.set_delete.name))
else:
self.data = e.data[:e.data_len]

Expand Down Expand Up @@ -1439,6 +1460,9 @@ cdef void xprt_cb(ldms_t _x, ldms_xprt_event *e, void *arg) with gil:
elif e.type == EVENT_SEND_CREDIT_DEPOSITED:
# do NOT sem_post()
return
elif e.type == EVENT_SET_DELETE:
# do NOT sem_post()
return
else:
raise OSError(EINVAL, "Unknown LDMS event type {}".format(e.type))
sem_post(&x._conn_sem)
Expand Down
95 changes: 93 additions & 2 deletions ldms/src/core/ldms_rail.c
Original file line number Diff line number Diff line change
Expand Up @@ -421,11 +421,20 @@ void __rail_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg)

/*
* NOTE: `x` is an xprt that could be:
* - legacy passive endpoint
* - legacy passive endpoint (e.g. old remote peer using 4.3.11)
* - ldms xprt member in a rail
*/

if (r->xtype == LDMS_XTYPE_PASSIVE_RAIL && r->state == LDMS_RAIL_EP_LISTENING) {
/* x is a legacy xprt accepted by a listening rail */
/*
* This condition is only for `x` with legacy LDMS remote peer.
* In this case, we shall wrap `x` with a new rail before
* continuing.
*
* NOTE: If the remote is rail, it will use rail connect message
* which is handled by `__rail_zap_handle_conn_req()`, which
* bundles xprt members into the associated rail object.
*/
if (e->type != LDMS_XPRT_EVENT_CONNECTED) {
/* bad passive legacy xprt does not notify the app */
ldms_xprt_put(x);
Expand Down Expand Up @@ -1583,3 +1592,85 @@ static ldms_set_t __rail_set_by_name(ldms_t _x, const char *set_name)
}
return set;
}

/* defined in ldms_xprt.c */
size_t format_set_delete_req(struct ldms_request *req, uint64_t xid,
const char *inst_name);

/*
 * Called from ldms_xprt.c.
 *
 * Tell the peer that has an RBD for this set that it is being deleted.
 * When they all reply, we can delete the set.
 *
 * The rail members are searched for the one whose `set_coll` contains `s`.
 * If one is found, the LDMS_SET_DELETE request is sent over that member,
 * its `set_coll` entry is removed and freed, and `ctxt->set_delete.lookup`
 * is set to 1. If none of the members has an entry for `s`, the request is
 * sent over the first member with `ctxt->set_delete.lookup` set to 0.
 *
 * \param _r    The rail handle (must satisfy XTYPE_IS_RAIL()).
 * \param s     The set being deleted.
 * \param cb_fn The callback stored in the set-delete context.
 */
void __rail_set_delete(ldms_t _r, struct ldms_set *s,
				ldms_set_delete_cb_t cb_fn)
{
	assert(XTYPE_IS_RAIL(_r->xtype));

	ldms_rail_t r = (ldms_rail_t)_r;
	struct ldms_request *req;
	struct ldms_context *ctxt;
	size_t len;
	struct rbn *rbn = NULL; /* stays NULL if no member references `s` */
	struct xprt_set_coll_entry *ent;
	int i;
	ldms_t x;

	/* for each x in r */
	for (i = 0; i < r->n_eps; i++) {
		x = r->eps[i].ep;
		pthread_mutex_lock(&x->lock);
		rbn = rbt_find(&x->set_coll, s);
		if (rbn)
			goto found; /* NOTE: x->lock is still held */
		pthread_mutex_unlock(&x->lock);
	}

	/* No rbn found in any x in r. Just use the first x to notify with
	 * ctxt->set_delete.lookup being 0. */
	x = r->eps[0].ep;

	/* let through */

	pthread_mutex_lock(&x->lock);
 found:
	/* x->lock is held from here until the end of the function. */
	ctxt = __ldms_alloc_ctxt
		(x,
		 sizeof(struct ldms_request) + sizeof(struct ldms_context),
		 LDMS_CONTEXT_SET_DELETE,
		 s,
		 cb_fn);
	if (!ctxt) {
		ovis_log(xlog, OVIS_LCRIT, "%s:%s:%d Memory allocation failure\n",
			 __FILE__, __func__, __LINE__);
		pthread_mutex_unlock(&x->lock);
		return;
	}
	if (rbn) {
		/* We'll put set ref when we receive the reply. */
		ctxt->set_delete.lookup = 1;
		rbt_del(&x->set_coll, rbn);
		ent = container_of(rbn, struct xprt_set_coll_entry, rbn);
		free(ent);
	} else {
		/* We won't put ref on receiving reply. */
		ctxt->set_delete.lookup = 0;
	}
	/* The request buffer is placed right after the context structure. */
	req = (struct ldms_request *)(ctxt + 1);
	len = format_set_delete_req(req, (uint64_t)(unsigned long)ctxt,
				    ldms_set_instance_name_get(s));
	zap_err_t zerr = zap_send(x->zap_ep, req, len);
	if (zerr) {
		char name[128];
		(void) ldms_xprt_names(x, NULL, 0, NULL, 0, name, 128,
				       NULL, 0, NI_NUMERICHOST);
		ovis_log(xlog, OVIS_LERROR, "%s:%s:%d Error %d sending "
			 "the LDMS_SET_DELETE message to '%s'\n",
			 __FILE__, __func__, __LINE__, zerr, name);
		x->zerrno = zerr;
		/* No reply will come for a request that was never sent. */
		__ldms_free_ctxt(x, ctxt);
	}
	pthread_mutex_unlock(&x->lock);
}
86 changes: 20 additions & 66 deletions ldms/src/core/ldms_xprt.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ const char *ldms_xprt_event_type_to_str(enum ldms_xprt_event_type t)
return xprt_event_type_names[t];
}

void ldms_xprt_set_delete(ldms_t x, struct ldms_set *s, ldms_set_delete_cb_t cb_fn);

static ldms_t __ldms_xprt_get(ldms_t x)
{
int a;
Expand Down Expand Up @@ -302,12 +300,15 @@ ldms_t ldms_xprt_by_remote_sin(struct sockaddr *sa)
goto next;

if (__is_same_addr(sa, (struct sockaddr *)&ss_remote)) {
/* Put the next ref back. */
/*
* Put the next ref back (taken in ldms_xprt_first()
* or ldms_xprt_next()).
*/
ldms_xprt_put(l);
r = __ldms_xprt_to_rail(l);
ldms_xprt_get(r);
/*
* Put back the app reference taken in
* Put back the caller reference taken in
* ldms_xprt_first() or ldms_xprt_next().
*
* The rail hold a reference on the ldms_xprt object already.
Expand Down Expand Up @@ -608,13 +609,17 @@ static void __set_delete_cb(ldms_t xprt, int status, ldms_set_t rbd, void *cb_ar
ref_put(&set->ref, "xprt_set_coll");
}

/* implementation in ldms_rail.c */
void __rail_set_delete(ldms_t _r, struct ldms_set *s,
ldms_set_delete_cb_t cb_fn);

void __ldms_dir_del_set(struct ldms_set *set)
{
/*
* LDMS versions >= 4.3.4 do not send LDMS_DIR_DEL, instead
* they use the two way handshake provided by
* ldms_xprt_set_delete() to inform the peer and receive
* acknowledgment of the set's disuse.
* __rail_set_delete() (previously ldms_xprt_set_delete() before rail)
* to inform the peer and receive acknowledgment of the set's disuse.
*
* We still handle LDMS_DIR_DEL and pass it to the application
* so that it can interoperate with compute nodes that are
Expand All @@ -623,10 +628,17 @@ void __ldms_dir_del_set(struct ldms_set *set)
* dir_update(set, LDMS_DIR_DEL);
*/
struct ldms_xprt *x;
ldms_t r;
pthread_mutex_lock(&xprt_list_lock);
LIST_FOREACH(x, &xprt_list, xprt_link) {
if (x->remote_dir_xid)
ldms_xprt_set_delete(x, set, __set_delete_cb);
if (x->remote_dir_xid) {
/* NOTE:
* There will be only one `x` in `r` that has
* `x->remote_dir_xid != 0`.
*/
r = __ldms_xprt_to_rail(x);
__rail_set_delete(r, set, __set_delete_cb);
}
}
pthread_mutex_unlock(&xprt_list_lock);
}
Expand Down Expand Up @@ -3838,64 +3850,6 @@ int ldms_register_notify_cb(ldms_t x, ldms_set_t s, int flags,
return -1;
}

/*
 * Send an LDMS_SET_DELETE request for set `s` over transport `x`.
 *
 * Nothing is sent if `x` is not connected or if the request context cannot
 * be allocated. If `x->set_coll` has an entry for `s`, the entry is removed
 * and freed and `set_delete.lookup` of the context is set to 1; otherwise it
 * is set to 0. `cb_fn` is stored in the context via __ldms_alloc_ctxt().
 * `x->lock` is held for the whole operation.
 */
void ldms_xprt_set_delete(ldms_t x, struct ldms_set *s, ldms_set_delete_cb_t cb_fn)
{
	struct ldms_context *del_ctxt;
	struct ldms_request *del_req;
	struct xprt_set_coll_entry *coll_ent;
	struct rbn *node;
	size_t req_len;
	zap_err_t zerr;

	pthread_mutex_lock(&x->lock);

	if (!ldms_xprt_connected(x))
		goto out;

	del_ctxt = __ldms_alloc_ctxt(x,
			sizeof(struct ldms_request) + sizeof(struct ldms_context),
			LDMS_CONTEXT_SET_DELETE, s, cb_fn);
	if (!del_ctxt) {
		XPRT_LOG(x, OVIS_LCRIT, "%s:%s:%d Memory allocation failure\n",
			 __FILE__, __func__, __LINE__);
		goto out;
	}

	node = rbt_find(&x->set_coll, s);
	if (!node) {
		/* No entry for this set: no ref to put when the reply arrives. */
		del_ctxt->set_delete.lookup = 0;
	} else {
		/* The set ref is put when the reply is received. */
		del_ctxt->set_delete.lookup = 1;
		rbt_del(&x->set_coll, node);
		coll_ent = container_of(node, struct xprt_set_coll_entry, rbn);
		free(coll_ent);
	}

	/* The request buffer sits immediately after the context structure. */
	del_req = (struct ldms_request *)(del_ctxt + 1);
	req_len = format_set_delete_req(del_req,
					(uint64_t)(unsigned long)del_ctxt,
					ldms_set_instance_name_get(s));

	zerr = zap_send(x->zap_ep, del_req, req_len);
	if (zerr) {
		char name[128];

		(void) ldms_xprt_names(x, NULL, 0, NULL, 0, name, 128,
				       NULL, 0, NI_NUMERICHOST);
		XPRT_LOG(x, OVIS_LERROR, "%s:%s:%d Error %d sending "
			 "the LDMS_SET_DELETE message to '%s'\n",
			 __FILE__, __func__, __LINE__, zerr, name);
		x->zerrno = zerr;
		__ldms_free_ctxt(x, del_ctxt);
	}

out:
	pthread_mutex_unlock(&x->lock);
}

static int send_cancel_notify(ldms_t _x, ldms_set_t s)
{
struct ldms_xprt *x = _x;
Expand Down

0 comments on commit 8200bcc

Please sign in to comment.