diff --git a/ldms/python/ldms.pxd b/ldms/python/ldms.pxd index ff7518bdfe..7df6e04817 100644 --- a/ldms/python/ldms.pxd +++ b/ldms/python/ldms.pxd @@ -277,6 +277,7 @@ cdef extern from "ldms.h" nogil: EVENT_ERROR "LDMS_XPRT_EVENT_ERROR" EVENT_DISCONNECTED "LDMS_XPRT_EVENT_DISCONNECTED" EVENT_RECV "LDMS_XPRT_EVENT_RECV" + EVENT_SET_DELETE "LDMS_XPRT_EVENT_SET_DELETE" EVENT_SEND_COMPLETE "LDMS_XPRT_EVENT_SEND_COMPLETE" EVENT_SEND_CREDIT_DEPOSITED "LDMS_XPRT_EVENT_SEND_CREDIT_DEPOSITED" EVENT_LAST "LDMS_XPRT_EVENT_LAST" @@ -285,12 +286,16 @@ cdef extern from "ldms.h" nogil: LDMS_XPRT_EVENT_ERROR LDMS_XPRT_EVENT_DISCONNECTED LDMS_XPRT_EVENT_RECV + LDMS_XPRT_EVENT_SET_DELETE LDMS_XPRT_EVENT_SEND_COMPLETE LDMS_XPRT_EVENT_SEND_CREDIT_DEPOSITED LDMS_XPRT_EVENT_LAST cdef struct ldms_xprt_credit_event_data: uint64_t credit int ep_idx + cdef struct ldms_xprt_set_delete_data: + void * set + const char *name cdef struct ldms_xprt_event: ldms_xprt_event_type type size_t data_len @@ -298,6 +303,7 @@ cdef extern from "ldms.h" nogil: # know the names of the "fields" it can access in C code. char *data ldms_xprt_credit_event_data credit + ldms_xprt_set_delete_data set_delete ctypedef ldms_xprt_event *ldms_xprt_event_t ctypedef void (*ldms_event_cb_t)(ldms_t x, ldms_xprt_event_t e, void *cb_arg) diff --git a/ldms/python/ldms.pyx b/ldms/python/ldms.pyx index 67e91f88b0..c159313d8d 100644 --- a/ldms/python/ldms.pyx +++ b/ldms/python/ldms.pyx @@ -1354,6 +1354,20 @@ cdef class CreditEventData(object): def __repr__(self): return str(self) +cdef class SetDeleteEventData(object): + cdef readonly str name + cdef readonly Set set + + def __cinit__(self, Set _lset, str _name): + self.set = _lset + self.name = _name + + def __str__(self): + return f"({self.set}, {self.name})" + + def __repr__(self): + return str(self) + cdef class XprtEvent(object): """An LDMS transport event @@ -1381,6 +1395,8 @@ cdef class XprtEvent(object): cdef readonly CreditEventData credit """Current send credit (for the EVENT_SEND_CREDIT_DEPOSITED""" + cdef readonly SetDeleteEventData set_delete + # NOTE: This is a Python object wrapper of `struct ldms_xprt_event`. # # The values of the attributes are also new Python objects. So, we @@ -1388,9 +1404,14 @@ cdef class XprtEvent(object): # destroyed in the C level after the callback. def __cinit__(self, Ptr ptr): cdef ldms_xprt_event_t e = ptr.c_ptr + cdef ldms_set_t cset self.type = ldms_xprt_event_type(e.type) if self.type == ldms.LDMS_XPRT_EVENT_SEND_CREDIT_DEPOSITED: self.credit = CreditEventData(e.credit.credit, e.credit.ep_idx) + elif self.type == ldms.LDMS_XPRT_EVENT_SET_DELETE: + cset = e.set_delete.set + lset = Set(None, None, set_ptr=PTR(cset)) if cset else None + self.set_delete = SetDeleteEventData(lset, STR(e.set_delete.name)) else: self.data = e.data[:e.data_len] @@ -1439,6 +1460,9 @@ cdef void xprt_cb(ldms_t _x, ldms_xprt_event *e, void *arg) with gil: elif e.type == EVENT_SEND_CREDIT_DEPOSITED: # do NOT sem_post() return + elif e.type == EVENT_SET_DELETE: + # do NOT sem_post() + return else: raise OSError(EINVAL, "Unknown LDMS event type {}".format(e.type)) sem_post(&x._conn_sem) diff --git a/ldms/src/core/ldms_rail.c b/ldms/src/core/ldms_rail.c index 25e21855b2..11a7128d54 100644 --- a/ldms/src/core/ldms_rail.c +++ b/ldms/src/core/ldms_rail.c @@ -421,11 +421,20 @@ void __rail_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) /* * NOTE: `x` is an xprt that could be: - * - legacy passive endpoint + * - legacy passive endpoint (e.g. old remote peer using 4.3.11) + * - ldms xprt member in a rail */ if (r->xtype == LDMS_XTYPE_PASSIVE_RAIL && r->state == LDMS_RAIL_EP_LISTENING) { - /* x is a legacy xprt accepted by a listening rail */ + /* + * This condition is only for `x` with legacy LDMS remote peer. + * I this case, we shall wrap `x` with a new rail before + * continuing. + * + * NOTE: If the remote is rail, it will use rail connect message + * which is handled by `__rail_zap_handle_conn_req()`, which + * bundles xprt members into the associated rail object. + */ if (e->type != LDMS_XPRT_EVENT_CONNECTED) { /* bad passive legacy xprt does not notify the app */ ldms_xprt_put(x); @@ -1583,3 +1592,85 @@ static ldms_set_t __rail_set_by_name(ldms_t _x, const char *set_name) } return set; } + +/* defined in ldms_xprt.c */ +size_t format_set_delete_req(struct ldms_request *req, uint64_t xid, + const char *inst_name); + +/* Called from ldms_xprt.c. + * Tell the peer that have an RBD for this set that it is being + * deleted. When they all reply, we can delete the set. + */ +void __rail_set_delete(ldms_t _r, struct ldms_set *s, + ldms_set_delete_cb_t cb_fn) +{ + /* + */ + assert(XTYPE_IS_RAIL(_r->xtype)); + + ldms_rail_t r = (ldms_rail_t)_r; + struct ldms_request *req; + struct ldms_context *ctxt; + size_t len; + struct rbn *rbn; + struct xprt_set_coll_entry *ent; + int i; + ldms_t x; + + x = NULL; + + /* for each x in r */ + for (i = 0; i < r->n_eps; i++) { + x = r->eps[i].ep; + pthread_mutex_lock(&x->lock); + rbn = rbt_find(&x->set_coll, s); + goto found; + pthread_mutex_unlock(&x->lock); + } + + /* No rbn found in any x in r. Just use the first x to notify with + * ctxt->set_delete.lookup being 0. */ + x = r->eps[0].ep; + + /* let through */ + + pthread_mutex_lock(&x->lock); + found: + ctxt = __ldms_alloc_ctxt + (x, + sizeof(struct ldms_request) + sizeof(struct ldms_context), + LDMS_CONTEXT_SET_DELETE, + s, + cb_fn); + if (!ctxt) { + ovis_log(xlog, OVIS_LCRIT, "%s:%s:%d Memory allocation failure\n", + __FILE__, __func__, __LINE__); + pthread_mutex_unlock(&x->lock); + return; + } + if (rbn) { + /* We'll put set ref when we receive the reply. */ + ctxt->set_delete.lookup = 1; + rbt_del(&x->set_coll, rbn); + ent = container_of(rbn, struct xprt_set_coll_entry, rbn); + free(ent); + } else { + /* We won't put ref on receiving reply. */ + ctxt->set_delete.lookup = 0; + } + req = (struct ldms_request *)(ctxt + 1); + len = format_set_delete_req(req, (uint64_t)(unsigned long)ctxt, + ldms_set_instance_name_get(s)); + zap_err_t zerr = zap_send(x->zap_ep, req, len); + if (zerr) { + char name[128]; + (void) ldms_xprt_names(x, NULL, 0, NULL, 0, name, 128, + NULL, 0, NI_NUMERICHOST); + ovis_log(xlog, OVIS_LERROR, "%s:%s:%d Error %d sending " + "the LDMS_SET_DELETE message to '%s'\n", + __FILE__, __func__, __LINE__, zerr, name); + x->zerrno = zerr; + __ldms_free_ctxt(x, ctxt); + } + pthread_mutex_unlock(&x->lock); +} diff --git a/ldms/src/core/ldms_xprt.c b/ldms/src/core/ldms_xprt.c index 1fc16b2aee..0980e54211 100644 --- a/ldms/src/core/ldms_xprt.c +++ b/ldms/src/core/ldms_xprt.c @@ -127,8 +127,6 @@ const char *ldms_xprt_event_type_to_str(enum ldms_xprt_event_type t) return xprt_event_type_names[t]; } -void ldms_xprt_set_delete(ldms_t x, struct ldms_set *s, ldms_set_delete_cb_t cb_fn); - static ldms_t __ldms_xprt_get(ldms_t x) { int a; @@ -302,12 +300,15 @@ ldms_t ldms_xprt_by_remote_sin(struct sockaddr *sa) goto next; if (__is_same_addr(sa, (struct sockaddr *)&ss_remote)) { - /* Put the next ref back. */ + /* + * Put the next ref back (taken in ldms_xprt_first() + * or ldms_xprt_next()). + */ ldms_xprt_put(l); r = __ldms_xprt_to_rail(l); ldms_xprt_get(r); /* - * Put back the app reference taken in + * Put back the caller reference taken in * ldms_xprt_first() or ldms_xprt_next(). * * The rail hold a reference on the ldms_xprt object already. @@ -608,13 +609,17 @@ static void __set_delete_cb(ldms_t xprt, int status, ldms_set_t rbd, void *cb_ar ref_put(&set->ref, "xprt_set_coll"); } +/* implementation in ldms_rail.c */ +void __rail_set_delete(ldms_t _r, struct ldms_set *s, + ldms_set_delete_cb_t cb_fn); + void __ldms_dir_del_set(struct ldms_set *set) { /* * LDMS versions >= 4.3.4 do not send LDMS_DIR_DEL, instead * they use the two way handshake provided by - * ldms_xprt_set_delete() to inform the peer and receive - * acknowledgment of the set's disuse. + * __rail_set_delete() (previously __xprt_set_delete() before rail) + * to inform the peer and receive acknowledgment of the set's disuse. * * We still handle LDMS_DIR_DEL and pass it to the application * so that it can interoperate with compute nodes that are @@ -623,10 +628,17 @@ void __ldms_dir_del_set(struct ldms_set *set) * dir_update(set, LDMS_DIR_DEL); */ struct ldms_xprt *x; + ldms_t r; pthread_mutex_lock(&xprt_list_lock); LIST_FOREACH(x, &xprt_list, xprt_link) { - if (x->remote_dir_xid) - ldms_xprt_set_delete(x, set, __set_delete_cb); + if (x->remote_dir_xid) { + /* NOTE: + * There will be only one `x` in `r` that has + * `x->remote_dir_xid != 0`. + */ + r = __ldms_xprt_to_rail(x); + __rail_set_delete(r, set, __set_delete_cb); + } } pthread_mutex_unlock(&xprt_list_lock); } @@ -3838,64 +3850,6 @@ int ldms_register_notify_cb(ldms_t x, ldms_set_t s, int flags, return -1; } -/* - * Tell all peers that have an RBD for this set that it is being - * deleted. When they all reply, we can delete the set. - */ -void ldms_xprt_set_delete(ldms_t x, struct ldms_set *s, ldms_set_delete_cb_t cb_fn) -{ - struct ldms_request *req; - struct ldms_context *ctxt; - size_t len; - struct rbn *rbn; - struct xprt_set_coll_entry *ent; - - pthread_mutex_lock(&x->lock); - if (!ldms_xprt_connected(x)) { - pthread_mutex_unlock(&x->lock); - return; - } - - ctxt = __ldms_alloc_ctxt - (x, - sizeof(struct ldms_request) + sizeof(struct ldms_context), - LDMS_CONTEXT_SET_DELETE, - s, - cb_fn); - if (!ctxt) { - XPRT_LOG(x, OVIS_LCRIT, "%s:%s:%d Memory allocation failure\n", - __FILE__, __func__, __LINE__); - pthread_mutex_unlock(&x->lock); - return; - } - rbn = rbt_find(&x->set_coll, s); - if (rbn) { - /* We'll put set ref when we receive the reply. */ - ctxt->set_delete.lookup = 1; - rbt_del(&x->set_coll, rbn); - ent = container_of(rbn, struct xprt_set_coll_entry, rbn); - free(ent); - } else { - /* We won't put ref on receiving reply. */ - ctxt->set_delete.lookup = 0; - } - req = (struct ldms_request *)(ctxt + 1); - len = format_set_delete_req(req, (uint64_t)(unsigned long)ctxt, - ldms_set_instance_name_get(s)); - zap_err_t zerr = zap_send(x->zap_ep, req, len); - if (zerr) { - char name[128]; - (void) ldms_xprt_names(x, NULL, 0, NULL, 0, name, 128, - NULL, 0, NI_NUMERICHOST); - XPRT_LOG(x, OVIS_LERROR, "%s:%s:%d Error %d sending " - "the LDMS_SET_DELETE message to '%s'\n", - __FILE__, __func__, __LINE__, zerr, name); - x->zerrno = zerr; - __ldms_free_ctxt(x, ctxt); - } - pthread_mutex_unlock(&x->lock); -} - static int send_cancel_notify(ldms_t _x, ldms_set_t s) { struct ldms_xprt *x = _x;