Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ configure
depcomp
install-sh
missing
openlava-*
config.h
config.log
config.status
Expand Down
12 changes: 8 additions & 4 deletions config/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
# Copyright (C) David Bigagli
#

# Install configuration files for volclava base and batch
# Install configuration files for volclava base and batch
# in the conf directory.

# Install the shell profile scrips and the system startup
# Install the shell profile scrips and the system startup
# script in the etc directory.

etcdir = $(prefix)/etc
config_files = volclava volclava.sh volclava.csh volclava.setup \
lsf.conf lsf.cluster.@volclavacluster@ lsf.shared lsf.task \
lsb.hosts lsb.params lsb.queues lsb.users
<<<<<<< HEAD
lsb.hosts lsb.params lsb.queues lsb.users
=======
lsb.hosts lsb.params lsb.queues lsb.users
>>>>>>> abf430c (epoll integration: Complete network I/O overhaul across Volclava components)

INTERACTIVE=@INTERACTIVE@

Expand Down Expand Up @@ -60,7 +64,7 @@ install-data-local:
mkdir -p $(etcdir);\
for file in $(config_files); do \
install -m 644 "$(srcdir)/$$file" "$(etcdir)/"; \
done; \
done; \
fi

clean-local:
Expand Down
20 changes: 20 additions & 0 deletions config/volclava-lim.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[Unit]
Description=Volclava LIM (Load Information Manager)
Wants=network-online.target
After=network-online.target
Before=volclava-res.service volclava-sbatchd.service

[Service]
Type=forking
# Point LIM to the directory that contains lsf.conf and the daemons
Environment=LSF_ENVDIR=/opt/volclava-2.0/etc
ExecStart=/opt/volclava-2.0/sbin/lim
KillMode=control-group
GuessMainPID=no
Restart=on-failure
RestartSec=3
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target

18 changes: 18 additions & 0 deletions config/volclava-res.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[Unit]
Description=Volclava RES (Remote Execution Server)
Wants=network-online.target
After=network-online.target volclava-lim.service

[Service]
Type=forking
Environment=LSF_ENVDIR=/opt/volclava-2.0/etc
ExecStart=/opt/volclava-2.0/sbin/res
KillMode=control-group
GuessMainPID=no
Restart=on-failure
RestartSec=3
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target

18 changes: 18 additions & 0 deletions config/volclava-sbatchd.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[Unit]
Description=Volclava sbatchd (Batch Daemon)
Wants=network-online.target
After=network-online.target volclava-lim.service volclava-res.service

[Service]
Type=forking
Environment=LSF_ENVDIR=/opt/volclava-2.0/etc
ExecStart=/opt/volclava-2.0/sbin/sbatchd
KillMode=control-group
GuessMainPID=no
Restart=on-failure
RestartSec=3
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target

2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ AC_PROG_CC
AC_PROG_INSTALL
AC_PROG_RANLIB
AC_PROG_YACC
AC_PROG_LEX
AC_PROG_LEX([yywrap])
AC_PROG_LN_S
AC_PROG_MAKE_SET

Expand Down
18 changes: 17 additions & 1 deletion lsbatch/daemons/daemons.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ init_ServSock(u_short port)
return -1;
}

if (chanEpollInit_() < 0) {
ls_syslog(LOG_ERR, "%s: chanEpollInit_() failed %m", __func__);
return -1;
}

if (chanRegisterEpoll_(ch, EPOLLIN|EPOLLERR|EPOLLRDHUP) < 0) {
ls_syslog(LOG_ERR, "%s: chanRegisterEpoll_() failed %m", __func__);
return-1;
}

return ch;
}

Expand Down Expand Up @@ -172,7 +182,13 @@ do_readyOp(XDR *xdrs, int chanfd, struct sockaddr_in *from,
xdr_destroy(&xdrs2);
return(-1);
}

if (chanModEpoll_(chanfd, EPOLLOUT|EPOLLIN|EPOLLERR|EPOLLRDHUP) < 0) {
ls_syslog(LOG_ERR, "%s: chanModEpoll_() failed %m", __func__);
/* xdr_destroy is NOOP it is just a logical operation
*/
xdr_destroy(&xdrs2);
return -1;
}
xdr_destroy(&xdrs2);
return(0);
}
Expand Down
83 changes: 41 additions & 42 deletions lsbatch/daemons/mbd.main.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ static int authRequest(struct lsfAuth *, XDR *, struct LSFHeader *,
char *, int);
static int processClient(struct clientNode *, int *);

static void clientIO(struct Masks *);
static void clientIO(struct chanData *, int);
static int forkOnRequest(mbdReqType);
static void shutdownSbdConnections(void);
static void processSbdNode(struct sbdNode *, int);
Expand All @@ -189,12 +189,8 @@ extern int initLimSock_(void);
int
main (int argc, char **argv)
{
fd_set readmask;
struct Masks sockmask;
struct Masks chanmask;
struct timeval timeout;
int nready;
int i;
int cc;
int hsKeeping = FALSE;
time_t lastPeriodicCheckTime = 0;
Expand Down Expand Up @@ -248,7 +244,7 @@ main (int argc, char **argv)
}

if (debug < 2 && !lsb_CheckMode) {
for (i = sysconf(_SC_OPEN_MAX) ; i >= 3 ; i--)
for (int i = sysconf(_SC_OPEN_MAX) ; i >= 3 ; i--)
close(i);
}

Expand Down Expand Up @@ -352,7 +348,7 @@ main (int argc, char **argv)
TIMEIT(0, minit(FIRST_START),"minit");

masterHost = ls_getmastername();
for (i = 0; i < 3 && !masterHost && lserrno == LSE_TIME_OUT; i++) {
for (int i = 0; i < 3 && !masterHost && lserrno == LSE_TIME_OUT; i++) {
millisleep_(6000);
masterHost = ls_getmastername();
}
Expand Down Expand Up @@ -417,8 +413,6 @@ main (int argc, char **argv)
for (;;) {
int maxfd;

FD_ZERO(&readmask);

maxfd = sysconf(_SC_OPEN_MAX);
now = time(0);

Expand All @@ -445,9 +439,8 @@ main (int argc, char **argv)
timeout.tv_sec = 0;
}

sockmask.rmask = readmask;

nready = chanSelect_(&sockmask, &chanmask, &timeout);
int tm = timeout.tv_sec * 1000;
nready = chanEpoll_(tm);
if (nready < 0) {
if (errno != EINTR)
ls_syslog(LOG_ERR, "\
Expand Down Expand Up @@ -479,12 +472,16 @@ main (int argc, char **argv)
timeout.tv_sec = 0;
timeout.tv_usec = 0;

if (FD_ISSET(batchSock, &chanmask.rmask)) {
acceptConnection(batchSock);
}
for (int i = 0; i < nready; i++) {
int ch = epoll_events[i].data.u32;

clientIO(&chanmask);
if (chanSock_(batchSock) == channels[ch].handle) {
acceptConnection(batchSock);
continue;
}

clientIO(&channels[ch], ch);
}
} /* for (;;) */
}

Expand Down Expand Up @@ -537,13 +534,12 @@ acceptConnection(int socket)
}

static void
clientIO(struct Masks *chanmask)
clientIO(struct chanData *chan, int chfd)
{
struct clientNode *cliPtr;
struct clientNode *nextClient;
struct sbdNode *sbdPtr;
struct sbdNode *nextSbdPtr;
int exception;

if (logclass & LC_TRACE)
ls_syslog(LOG_DEBUG,"clientIO: Entering...");
Expand All @@ -553,44 +549,47 @@ clientIO(struct Masks *chanmask)
sbdPtr = nextSbdPtr) {
nextSbdPtr = sbdPtr->forw;

if (FD_ISSET(sbdPtr->chanfd, &chanmask->rmask)
|| FD_ISSET(sbdPtr->chanfd, &chanmask->emask)) {
if (chanSock_(sbdPtr->chanfd) != chan->handle)
continue;

if (FD_ISSET(sbdPtr->chanfd, &chanmask->emask))
exception = TRUE;
else
exception = FALSE;
if (chan->events & EPOLL_EVENTS_READ
|| chan->events & EPOLL_EVENTS_ERROR) {
int exception = (chan->events & EPOLL_EVENTS_ERROR) != 0;
processSbdNode(sbdPtr, exception);
// we found and handled the matching SBD; stop scanning
return;
}
}


// Find the client which corresponds to this channel
for (cliPtr = clientList->forw;
cliPtr != clientList;
cliPtr = nextClient) {
int needFree;
nextClient = cliPtr->forw;

if (FD_ISSET(cliPtr->chanfd, &chanmask->emask)) {
shutDownClient(cliPtr);
if (chanSock_(cliPtr->chanfd) != chan->handle)
continue;

if (chan->events & EPOLL_EVENTS_ERROR) {
shutDownClient(cliPtr);
return;
}
needFree = FALSE;
if (FD_ISSET(cliPtr->chanfd, &chanmask->rmask)) {

int saveChfd;
saveChfd = cliPtr->chanfd;
if (processClient(cliPtr, &needFree) == 0) {

FD_CLR(saveChfd, &chanmask->rmask);
if (needFree == TRUE) {
offList((struct listEntry *)cliPtr);
FREEUP(cliPtr->fromHost);
FREEUP(cliPtr);
}

/* The channel is not ready to be read yet
*/
if (! (chan->events & EPOLL_EVENTS_READ))
continue;
// needFree is unused... investigate
int needFree = FALSE;
if (processClient(cliPtr, &needFree) == 0) {

if (needFree == TRUE) {
offList((struct listEntry *)cliPtr);
FREEUP(cliPtr->fromHost);
FREEUP(cliPtr);
}
return;
}

}
}

Expand Down
6 changes: 5 additions & 1 deletion lsbatch/daemons/mbd.serv.c
Original file line number Diff line number Diff line change
Expand Up @@ -1724,7 +1724,7 @@ do_reconfigReq(XDR *xdrs,
if (!xdr_controlReq(xdrs, &mbdCtrlReq, reqHdr)) {
memset(&mbdCtrlReq, 0, sizeof(struct controlReq));
ls_syslog(LOG_WARNING, "%s: %s failed to decode comments on reconfig/mbdrestart", __func__, "xdr_controlReq");
}
}

xdrmem_create(&xdrs2, reply_buf, MSGSIZE, XDR_ENCODE);
replyHdr.opCode = LSBE_NO_ERROR;
Expand Down Expand Up @@ -2097,6 +2097,10 @@ doNewJobReply(struct sbdNode *sbdPtr, int exception)
} else {

sbdPtr->reqCode = MBD_NEW_JOB_KEEP_CHAN;
if (chanModEpoll_(sbdPtr->chanfd,
EPOLLOUT|EPOLLIN|EPOLLERR|EPOLLRDHUP) < 0) {
ls_syslog(LOG_ERR, "%s: chanModEpoll_() failed %m", __func__);
}
}
} else {

Expand Down
2 changes: 0 additions & 2 deletions lsbatch/daemons/sbd.comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,6 @@ msgSbd(LS_LONG_INT jobId, char *req, sbdReqType reqType, int (*xdrFunc)())
myhostnm = "localhost";
}



cc = call_server(myhostnm, sbd_port, requestBuf, XDR_GETPOS(&xdrs),
&reply_buf, &hdr, 100000, 0, NULL, NULL, NULL, 0);
if (cc < 0) {
Expand Down
Loading