From 9dece9dd5b8e48d04915cc7e867198ec9b30e986 Mon Sep 17 00:00:00 2001 From: bokassa Date: Sat, 19 Jul 2025 05:41:32 +0200 Subject: [PATCH 1/5] Add a modernized lex definition AC_PROG_LEX([yywrap]) --- configure.ac | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configure.ac b/configure.ac index 4bfe8b3..329ffc6 100644 --- a/configure.ac +++ b/configure.ac @@ -105,7 +105,7 @@ AC_PROG_CC AC_PROG_INSTALL AC_PROG_RANLIB AC_PROG_YACC -AC_PROG_LEX +AC_PROG_LEX([yywrap]) AC_PROG_LN_S AC_PROG_MAKE_SET From 3055aa626a8e5021e5db04ce6c82373c978d7e22 Mon Sep 17 00:00:00 2001 From: spagochitarra Date: Fri, 5 Sep 2025 17:57:09 -0400 Subject: [PATCH 2/5] Add service files. Modified .gitignore to accept the files in config. Remove a space from Makefile.am --- .gitignore | 1 - config/volclava-lim.service | 20 ++++++++++++++++++++ config/volclava-res.service | 19 +++++++++++++++++++ config/volclava-sbatchd.service | 19 +++++++++++++++++++ lsf/Makefile.am | 2 +- 5 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 config/volclava-lim.service create mode 100644 config/volclava-res.service create mode 100644 config/volclava-sbatchd.service diff --git a/.gitignore b/.gitignore index 568d7a4..c5f0a75 100644 --- a/.gitignore +++ b/.gitignore @@ -12,7 +12,6 @@ configure depcomp install-sh missing -openlava-* config.h config.log config.status diff --git a/config/volclava-lim.service b/config/volclava-lim.service new file mode 100644 index 0000000..db91c40 --- /dev/null +++ b/config/volclava-lim.service @@ -0,0 +1,20 @@ +[Unit] +Description=Volclava LIM (Load Information Manager) +Wants=network-online.target +After=network-online.target +Before=volclava-res.service volclava-sbatchd.service + +[Service] +Type=forking +# Point LIM to the directory that contains lsf.conf and the daemons +Environment=LSF_ENVDIR=/opt/volclava/etc +ExecStart=/opt/volclava/sbin/lim +KillMode=control-group +GuessMainPID=no +Restart=on-failure +RestartSec=3 +LimitNOFILE=65536 + 
+[Install] +WantedBy=multi-user.target + diff --git a/config/volclava-res.service b/config/volclava-res.service new file mode 100644 index 0000000..74cd1b0 --- /dev/null +++ b/config/volclava-res.service @@ -0,0 +1,19 @@ +[Unit] +Description=Volclava RES (Remote Execution Server) +Wants=network-online.target +After=network-online.target volclava-lim.service + +[Service] +Type=forking +Environment=LSF_ENVDIR=/opt/volclava/etc +Environment=LSF_SERVERDIR=/opt/volclava/sbin +ExecStart=/opt/volclava/sbin/res +KillMode=control-group +GuessMainPID=no +Restart=on-failure +RestartSec=3 +LimitNOFILE=65536 + +[Install] +WantedBy=multi-user.target + diff --git a/config/volclava-sbatchd.service b/config/volclava-sbatchd.service new file mode 100644 index 0000000..8702b4e --- /dev/null +++ b/config/volclava-sbatchd.service @@ -0,0 +1,19 @@ +[Unit] +Description=Volclava sbatchd (Batch Daemon) +Wants=network-online.target +After=network-online.target volclava-lim.service volclava-res.service + +[Service] +Type=forking +Environment=LSF_ENVDIR=/opt/volclava/etc +Environment=LSF_SERVERDIR=/opt/volclava/sbin +ExecStart=/opt/volclava/sbin/sbatchd +KillMode=control-group +GuessMainPID=no +Restart=on-failure +RestartSec=3 +LimitNOFILE=65536 + +[Install] +WantedBy=multi-user.target + diff --git a/lsf/Makefile.am b/lsf/Makefile.am index 7f11739..99749aa 100644 --- a/lsf/Makefile.am +++ b/lsf/Makefile.am @@ -2,7 +2,7 @@ # Copyright (C) 2011 openlava foundation # -include_HEADERS = lsf.h +include_HEADERS = lsf.h SUBDIRS = intlib lib lim pim lstools lsadm man From 8ae90d0499e455a45ceac8abc0f0a40f198f7918 Mon Sep 17 00:00:00 2001 From: spagochitarra Date: Sat, 6 Sep 2025 21:08:53 -0400 Subject: [PATCH 3/5] Add the software version in the path to the daemons. 
--- config/volclava-lim.service | 4 ++-- config/volclava-res.service | 5 ++--- config/volclava-sbatchd.service | 5 ++--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/config/volclava-lim.service b/config/volclava-lim.service index db91c40..b3e1acd 100644 --- a/config/volclava-lim.service +++ b/config/volclava-lim.service @@ -7,8 +7,8 @@ Before=volclava-res.service volclava-sbatchd.service [Service] Type=forking # Point LIM to the directory that contains lsf.conf and the daemons -Environment=LSF_ENVDIR=/opt/volclava/etc -ExecStart=/opt/volclava/sbin/lim +Environment=LSF_ENVDIR=/opt/volclava-2.0/etc +ExecStart=/opt/volclava-2.0/sbin/lim KillMode=control-group GuessMainPID=no Restart=on-failure diff --git a/config/volclava-res.service b/config/volclava-res.service index 74cd1b0..e75f1af 100644 --- a/config/volclava-res.service +++ b/config/volclava-res.service @@ -5,9 +5,8 @@ After=network-online.target volclava-lim.service [Service] Type=forking -Environment=LSF_ENVDIR=/opt/volclava/etc -Environment=LSF_SERVERDIR=/opt/volclava/sbin -ExecStart=/opt/volclava/sbin/res +Environment=LSF_ENVDIR=/opt/volclava-2.0/etc +ExecStart=/opt/volclava-2.0/sbin/res KillMode=control-group GuessMainPID=no Restart=on-failure diff --git a/config/volclava-sbatchd.service b/config/volclava-sbatchd.service index 8702b4e..387147c 100644 --- a/config/volclava-sbatchd.service +++ b/config/volclava-sbatchd.service @@ -5,9 +5,8 @@ After=network-online.target volclava-lim.service volclava-res.service [Service] Type=forking -Environment=LSF_ENVDIR=/opt/volclava/etc -Environment=LSF_SERVERDIR=/opt/volclava/sbin -ExecStart=/opt/volclava/sbin/sbatchd +Environment=LSF_ENVDIR=/opt/volclava-2.0/etc +ExecStart=/opt/volclava-2.0/sbin/sbatchd KillMode=control-group GuessMainPID=no Restart=on-failure From 13dcb8e1d4ae6af5150ee8f87d9b7dd10557b9e9 Mon Sep 17 00:00:00 2001 From: spagochitarra Date: Mon, 29 Sep 2025 11:11:09 -0400 Subject: [PATCH 4/5] emacs reformat the lsb.jobs.c file. 
--- lsbatch/lib/lsb.jobs.c | 240 ++++++++++++++++++++--------------------- 1 file changed, 120 insertions(+), 120 deletions(-) diff --git a/lsbatch/lib/lsb.jobs.c b/lsbatch/lib/lsb.jobs.c index 131a485..9cf2259 100644 --- a/lsbatch/lib/lsb.jobs.c +++ b/lsbatch/lib/lsb.jobs.c @@ -41,7 +41,7 @@ lsb_openjobinfo (LS_LONG_INT jobId, char *jobName, char *userName, struct jobInfoHead *jobInfoHead; jobInfoHead = lsb_openjobinfo_a (jobId, jobName, userName, queueName, - hostName, options); + hostName, options); if (!jobInfoHead) return (-1); return (jobInfoHead->numJobs); @@ -50,7 +50,7 @@ lsb_openjobinfo (LS_LONG_INT jobId, char *jobName, char *userName, struct jobInfoHead * lsb_openjobinfo_a (LS_LONG_INT jobId, char *jobName, char *userName, - char *queueName, char *hostName, int options) + char *queueName, char *hostName, int options) { static int first = TRUE; static struct jobInfoReq jobInfoReq; @@ -64,9 +64,9 @@ lsb_openjobinfo_a (LS_LONG_INT jobId, char *jobName, char *userName, char lsfUserName[MAXLINELEN]; if (first) { if ( !(jobInfoReq.jobName = (char *) malloc(MAX_CMD_DESC_LEN)) - || !(jobInfoReq.queue = (char *) malloc(MAX_LSB_NAME_LEN)) - || !(jobInfoReq.userName = (char *) malloc(MAX_LSB_NAME_LEN)) - || !(jobInfoReq.host = (char *) malloc(MAXHOSTNAMELEN))) { + || !(jobInfoReq.queue = (char *) malloc(MAX_LSB_NAME_LEN)) + || !(jobInfoReq.userName = (char *) malloc(MAX_LSB_NAME_LEN)) + || !(jobInfoReq.host = (char *) malloc(MAXHOSTNAMELEN))) { lsberrno = LSBE_SYS_CALL; return(NULL); } @@ -80,7 +80,7 @@ lsb_openjobinfo_a (LS_LONG_INT jobId, char *jobName, char *userName, lsberrno = LSBE_BAD_QUEUE; return(NULL); } - TIMEIT(1, strcpy(jobInfoReq.queue, queueName), "strcpy"); + TIMEIT(1, strcpy(jobInfoReq.queue, queueName), "strcpy"); } if (hostName == NULL) @@ -93,28 +93,28 @@ lsb_openjobinfo_a (LS_LONG_INT jobId, char *jobName, char *userName, } else { struct hostent *hp; - TIMEIT(0, (hp = Gethostbyname_(hostName)), "getHostOfficialByName_"); - if (hp != NULL) { - 
struct hostInfo *hostinfo; + TIMEIT(0, (hp = Gethostbyname_(hostName)), "getHostOfficialByName_"); + if (hp != NULL) { + struct hostInfo *hostinfo; char officialNameBuf[MAXHOSTNAMELEN]; strcpy(officialNameBuf, hp->h_name); - hostinfo = ls_gethostinfo("-", + hostinfo = ls_gethostinfo("-", NULL, (char **)&hp->h_name, 1, LOCAL_ONLY); - if (hostinfo == NULL) { - strcpy(jobInfoReq.host, hostName); - } else { - strcpy(jobInfoReq.host, officialNameBuf); - } + if (hostinfo == NULL) { + strcpy(jobInfoReq.host, hostName); + } else { + strcpy(jobInfoReq.host, officialNameBuf); + } } else { if (strlen (hostName) >= MAXHOSTNAMELEN - 1) { lsberrno = LSBE_BAD_HOST; return(NULL); } - strcpy(jobInfoReq.host, hostName); + strcpy(jobInfoReq.host, hostName); } } @@ -127,30 +127,30 @@ lsb_openjobinfo_a (LS_LONG_INT jobId, char *jobName, char *userName, lsberrno = LSBE_BAD_JOB; return(NULL); } - strcpy(jobInfoReq.jobName, jobName); + strcpy(jobInfoReq.jobName, jobName); } if (userName == NULL ) { TIMEIT(0, (cc = getLSFUser_(lsfUserName, MAXLINELEN)), "getLSFUser_"); if (cc != 0) { - return (NULL); + return (NULL); } - TIMEIT(1, strcpy(jobInfoReq.userName, lsfUserName), "strcpy"); + TIMEIT(1, strcpy(jobInfoReq.userName, lsfUserName), "strcpy"); } else { if (strlen (userName) >= MAX_LSB_NAME_LEN - 1) { lsberrno = LSBE_BAD_USER; return(NULL); } - strcpy(jobInfoReq.userName, userName); + strcpy(jobInfoReq.userName, userName); } if ((options & ~(JOBID_ONLY | JOBID_ONLY_ALL | HOST_NAME | NO_PEND_REASONS)) == 0) - jobInfoReq.options = CUR_JOB; + jobInfoReq.options = CUR_JOB; else jobInfoReq.options = options; if (jobId < 0) { - lsberrno = LSBE_BAD_ARG; - return(NULL); + lsberrno = LSBE_BAD_ARG; + return(NULL); } jobInfoReq.jobId = jobId; @@ -160,7 +160,7 @@ lsb_openjobinfo_a (LS_LONG_INT jobId, char *jobName, char *userName, hdr.opCode = mbdReqtype; TIMEIT(1, (aa = xdr_encodeMsg(&xdrs, (char *) &jobInfoReq , &hdr, - xdr_jobInfoReq, 0, NULL)), "xdr_encodeMsg"); + xdr_jobInfoReq, 0, NULL)), 
"xdr_encodeMsg"); if (aa == FALSE) { lsberrno = LSBE_XDR; return(NULL); @@ -169,10 +169,10 @@ lsb_openjobinfo_a (LS_LONG_INT jobId, char *jobName, char *userName, TIMEIT(0, (cc = callmbd (clusterName, request_buf, XDR_GETPOS(&xdrs), - &reply_buf, &hdr, &mbdSock, NULL, NULL)), "callmbd"); + &reply_buf, &hdr, &mbdSock, NULL, NULL)), "callmbd"); if (cc == -1) { xdr_destroy(&xdrs); - return (NULL); + return (NULL); } xdr_destroy(&xdrs); @@ -183,22 +183,22 @@ lsb_openjobinfo_a (LS_LONG_INT jobId, char *jobName, char *userName, if (lsberrno == LSBE_NO_ERROR) { - xdrmem_create(&xdrs2, reply_buf, XDR_DECODE_SIZE_(cc), XDR_DECODE); - if (! xdr_jobInfoHead (&xdrs2, &jobInfoHead, &hdr)) { - lsberrno = LSBE_XDR; + xdrmem_create(&xdrs2, reply_buf, XDR_DECODE_SIZE_(cc), XDR_DECODE); + if (! xdr_jobInfoHead (&xdrs2, &jobInfoHead, &hdr)) { + lsberrno = LSBE_XDR; xdr_destroy(&xdrs2); - if (cc) - free(reply_buf); - return(NULL); + if (cc) + free(reply_buf); + return(NULL); } - xdr_destroy(&xdrs2); - if (cc) - free(reply_buf); + xdr_destroy(&xdrs2); + if (cc) + free(reply_buf); return (&jobInfoHead); } if (cc) - free(reply_buf); + free(reply_buf); return(NULL); } @@ -221,45 +221,45 @@ lsb_readjobinfo(int *more) TIMEIT(0, (num = readNextPacket(&buffer, _lsb_recvtimeout, &hdr, - mbdSock)), "readNextPacket"); + mbdSock)), "readNextPacket"); if (num < 0) { - closeSession(mbdSock); + closeSession(mbdSock); lsberrno = LSBE_EOF; - return NULL; + return NULL; } if (first) { - if ( (submitReq.fromHost = malloc(MAXHOSTNAMELEN)) == NULL - || (submitReq.jobFile = malloc(MAXFILENAMELEN)) == NULL - || (submitReq.inFile = malloc(MAXFILENAMELEN)) == NULL - || (submitReq.outFile = malloc(MAXFILENAMELEN)) == NULL - || (submitReq.errFile = malloc(MAXFILENAMELEN)) == NULL - || (submitReq.inFileSpool = malloc(MAXFILENAMELEN)) == NULL - || (submitReq.commandSpool = malloc(MAXFILENAMELEN)) == NULL - || (submitReq.hostSpec = malloc(MAXHOSTNAMELEN)) == NULL - || (submitReq.chkpntDir = malloc(MAXFILENAMELEN)) 
== NULL - || (submitReq.subHomeDir = malloc(MAXFILENAMELEN)) == NULL - || (jobInfoReply.userName = malloc(MAXLSFNAMELEN)) == NULL - || (submitReq.cwd = malloc(MAXFILENAMELEN)) == NULL) { - lsberrno = LSBE_NO_MEM; - FREEUP(submitReq.fromHost); - FREEUP(submitReq.jobFile); - FREEUP(submitReq.inFile); - FREEUP(submitReq.outFile); - FREEUP(submitReq.errFile); - FREEUP(submitReq.inFileSpool); - FREEUP(submitReq.commandSpool); - FREEUP(submitReq.hostSpec); - FREEUP(submitReq.chkpntDir); - FREEUP(submitReq.subHomeDir); - FREEUP(jobInfoReply.userName); - FREEUP(submitReq.cwd); - - free(buffer); - return NULL; - } - - submitReq.xf = NULL; + if ( (submitReq.fromHost = malloc(MAXHOSTNAMELEN)) == NULL + || (submitReq.jobFile = malloc(MAXFILENAMELEN)) == NULL + || (submitReq.inFile = malloc(MAXFILENAMELEN)) == NULL + || (submitReq.outFile = malloc(MAXFILENAMELEN)) == NULL + || (submitReq.errFile = malloc(MAXFILENAMELEN)) == NULL + || (submitReq.inFileSpool = malloc(MAXFILENAMELEN)) == NULL + || (submitReq.commandSpool = malloc(MAXFILENAMELEN)) == NULL + || (submitReq.hostSpec = malloc(MAXHOSTNAMELEN)) == NULL + || (submitReq.chkpntDir = malloc(MAXFILENAMELEN)) == NULL + || (submitReq.subHomeDir = malloc(MAXFILENAMELEN)) == NULL + || (jobInfoReply.userName = malloc(MAXLSFNAMELEN)) == NULL + || (submitReq.cwd = malloc(MAXFILENAMELEN)) == NULL) { + lsberrno = LSBE_NO_MEM; + FREEUP(submitReq.fromHost); + FREEUP(submitReq.jobFile); + FREEUP(submitReq.inFile); + FREEUP(submitReq.outFile); + FREEUP(submitReq.errFile); + FREEUP(submitReq.inFileSpool); + FREEUP(submitReq.commandSpool); + FREEUP(submitReq.hostSpec); + FREEUP(submitReq.chkpntDir); + FREEUP(submitReq.subHomeDir); + FREEUP(jobInfoReply.userName); + FREEUP(submitReq.cwd); + + free(buffer); + return NULL; + } + + submitReq.xf = NULL; submitReq.nxf = 0; jobInfoReply.numToHosts = 0; submitReq.numAskedHosts = 0; @@ -269,15 +269,15 @@ lsb_readjobinfo(int *more) jobInfoReply.jobBill = &submitReq; if (jobInfoReply.numToHosts > 0) { 
- for (i=0; irLimits[i]; + jobInfo.submit.rLimits[i] = jobInfoReply.jobBill->rLimits[i]; } jobInfo.submit.hostSpec = jobInfoReply.jobBill->hostSpec; jobInfo.submit.sigValue = jobInfoReply.jobBill->sigValue; @@ -407,7 +407,7 @@ lsb_readjobinfo(int *more) jobInfo.chargedSAAP = jobInfoReply.chargedSAAP; if (more) - *more = hdr.reserved; + *more = hdr.reserved; return &jobInfo; } @@ -416,7 +416,7 @@ lsb_readjobinfo(int *more) void lsb_closejobinfo() { - closeSession(mbdSock); + closeSession(mbdSock); } int @@ -433,27 +433,27 @@ lsb_runjob(struct runJobRequest* runJobRequest) if (runJobRequest == NULL - || runJobRequest->numHosts == 0 - || runJobRequest->hostname == NULL - || runJobRequest->jobId < 0 - || ( runJobRequest->options != 0 - && ! (runJobRequest->options & - (RUNJOB_OPT_NORMAL | RUNJOB_OPT_NOSTOP)))) + || runJobRequest->numHosts == 0 + || runJobRequest->hostname == NULL + || runJobRequest->jobId < 0 + || ( runJobRequest->options != 0 + && ! (runJobRequest->options & + (RUNJOB_OPT_NORMAL | RUNJOB_OPT_NOSTOP)))) { - lsberrno = LSBE_BAD_ARG; - return(-1); + lsberrno = LSBE_BAD_ARG; + return(-1); } if (!( runJobRequest->options & (RUNJOB_OPT_NORMAL | RUNJOB_OPT_NOSTOP))) { - runJobRequest->options |= RUNJOB_OPT_NORMAL; + runJobRequest->options |= RUNJOB_OPT_NORMAL; } if (authTicketTokens_(&auth, NULL) == -1) { - lsberrno = LSBE_LSBLIB; - return (-1); + lsberrno = LSBE_LSBLIB; + return (-1); } @@ -461,37 +461,37 @@ lsb_runjob(struct runJobRequest* runJobRequest) xdrmem_create(&xdrs, - request_buf, - MSGSIZE/2, - XDR_ENCODE); + request_buf, + MSGSIZE/2, + XDR_ENCODE); initLSFHeader_(&lsfHeader); lsfHeader.opCode = mbdReqType; if (!xdr_encodeMsg(&xdrs, - (char *)runJobRequest, - &lsfHeader, - xdr_runJobReq, - 0, - &auth)) { - lsberrno = LSBE_XDR; - xdr_destroy(&xdrs); - return(-1); + (char *)runJobRequest, + &lsfHeader, + xdr_runJobReq, + 0, + &auth)) { + lsberrno = LSBE_XDR; + xdr_destroy(&xdrs); + return(-1); } if ((cc = callmbd(NULL, - request_buf, - 
XDR_GETPOS(&xdrs), - &reply_buf, - &lsfHeader, - NULL, - NULL, - NULL)) == -1) { - xdr_destroy(&xdrs); - return(-1); + request_buf, + XDR_GETPOS(&xdrs), + &reply_buf, + &lsfHeader, + NULL, + NULL, + NULL)) == -1) { + xdr_destroy(&xdrs); + return(-1); } @@ -501,9 +501,9 @@ lsb_runjob(struct runJobRequest* runJobRequest) lsberrno = lsfHeader.opCode; if (lsberrno == LSBE_NO_ERROR) - retVal = 0; + retVal = 0; else - retVal = -1; + retVal = -1; return(retVal); @@ -516,11 +516,11 @@ lsb_jobid2str (LS_LONG_INT jobId) if (LSB_ARRAY_IDX(jobId) == 0) { - sprintf(string, "%d", LSB_ARRAY_JOBID(jobId)); + sprintf(string, "%d", LSB_ARRAY_JOBID(jobId)); } else { - sprintf(string, "%d[%d]", LSB_ARRAY_JOBID(jobId), - LSB_ARRAY_IDX(jobId)); + sprintf(string, "%d[%d]", LSB_ARRAY_JOBID(jobId), + LSB_ARRAY_IDX(jobId)); } return(string); From 92bff63d70fd2da7ecaf2c6b75e389fdfe072e75 Mon Sep 17 00:00:00 2001 From: spagochitarra Date: Mon, 29 Sep 2025 11:11:09 -0400 Subject: [PATCH 5/5] epoll integration: Complete network I/O overhaul across Volclava components This commit consolidates comprehensive epoll integration across multiple Volclava components, replacing traditional socket handling with efficient event-driven I/O: Key Features: - Register/deregister epoll channels with static openSock() and %m logging - epollChan_() implementation working across lim, lstools, and API libraries - Proper listening socket cleanup on master restart during request processing - Fixed epoll timeout handling for reliable operation - Enabled epoll support in mbatchd and sbatchd network communication - Create epoll_fd post-fork to prevent descriptor reset in child processes - Non-blocking channel reads when events are ready Code Quality: - Reformatted lsb.jobs.c for consistency - Cleaned up and untabified messy code sections for maintainability The implementation provides scalable, non-blocking I/O handling that properly manages file descriptors across process boundaries and ensures robust error handling 
throughout the event loop. --- config/Makefile.am | 12 +- lsbatch/daemons/daemons.c | 18 +- lsbatch/daemons/mbd.main.c | 83 +++-- lsbatch/daemons/mbd.serv.c | 6 +- lsbatch/daemons/sbd.comm.c | 2 - lsbatch/daemons/sbd.main.c | 649 +++++++++++++++++++------------------ lsbatch/daemons/sbd.misc.c | 90 ++--- lsbatch/lib/lsb.comm.c | 12 +- lsf/lib/lib.channel.c | 308 +++++++++++++++++- lsf/lib/lib.channel.h | 81 +++-- lsf/lim/lim.cluster.c | 37 +-- lsf/lim/lim.h | 2 +- lsf/lim/lim.main.c | 175 +++++----- 13 files changed, 910 insertions(+), 565 deletions(-) diff --git a/config/Makefile.am b/config/Makefile.am index 878f2b2..5e7c5d8 100644 --- a/config/Makefile.am +++ b/config/Makefile.am @@ -3,16 +3,20 @@ # Copyright (C) David Bigagli # -# Install configuration files for volclava base and batch +# Install configuration files for volclava base and batch # in the conf directory. -# Install the shell profile scrips and the system startup +# Install the shell profile scrips and the system startup # script in the etc directory. 
etcdir = $(prefix)/etc config_files = volclava volclava.sh volclava.csh volclava.setup \ lsf.conf lsf.cluster.@volclavacluster@ lsf.shared lsf.task \ - lsb.hosts lsb.params lsb.queues lsb.users +<<<<<<< HEAD + lsb.hosts lsb.params lsb.queues lsb.users +======= + lsb.hosts lsb.params lsb.queues lsb.users +>>>>>>> abf430c (epoll integration: Complete network I/O overhaul across Volclava components) INTERACTIVE=@INTERACTIVE@ @@ -60,7 +64,7 @@ install-data-local: mkdir -p $(etcdir);\ for file in $(config_files); do \ install -m 644 "$(srcdir)/$$file" "$(etcdir)/"; \ - done; \ + done; \ fi clean-local: diff --git a/lsbatch/daemons/daemons.c b/lsbatch/daemons/daemons.c index 7b49e78..ff1ad86 100644 --- a/lsbatch/daemons/daemons.c +++ b/lsbatch/daemons/daemons.c @@ -109,6 +109,16 @@ init_ServSock(u_short port) return -1; } + if (chanEpollInit_() < 0) { + ls_syslog(LOG_ERR, "%s: chanEpollInit_() failed %m", __func__); + return -1; + } + + if (chanRegisterEpoll_(ch, EPOLLIN|EPOLLERR|EPOLLRDHUP) < 0) { + ls_syslog(LOG_ERR, "%s: chanRegisterEpoll_() failed %m", __func__); + return-1; + } + return ch; } @@ -172,7 +182,13 @@ do_readyOp(XDR *xdrs, int chanfd, struct sockaddr_in *from, xdr_destroy(&xdrs2); return(-1); } - + if (chanModEpoll_(chanfd, EPOLLOUT|EPOLLIN|EPOLLERR|EPOLLRDHUP) < 0) { + ls_syslog(LOG_ERR, "%s: chanModEpoll_() failed %m", __func__); + /* xdr_destroy is NOOP it is just a logical operation + */ + xdr_destroy(&xdrs2); + return -1; + } xdr_destroy(&xdrs2); return(0); } diff --git a/lsbatch/daemons/mbd.main.c b/lsbatch/daemons/mbd.main.c index 11d9c3b..b787ce1 100644 --- a/lsbatch/daemons/mbd.main.c +++ b/lsbatch/daemons/mbd.main.c @@ -170,7 +170,7 @@ static int authRequest(struct lsfAuth *, XDR *, struct LSFHeader *, char *, int); static int processClient(struct clientNode *, int *); -static void clientIO(struct Masks *); +static void clientIO(struct chanData *, int); static int forkOnRequest(mbdReqType); static void shutdownSbdConnections(void); static void 
processSbdNode(struct sbdNode *, int); @@ -189,12 +189,8 @@ extern int initLimSock_(void); int main (int argc, char **argv) { - fd_set readmask; - struct Masks sockmask; - struct Masks chanmask; struct timeval timeout; int nready; - int i; int cc; int hsKeeping = FALSE; time_t lastPeriodicCheckTime = 0; @@ -248,7 +244,7 @@ main (int argc, char **argv) } if (debug < 2 && !lsb_CheckMode) { - for (i = sysconf(_SC_OPEN_MAX) ; i >= 3 ; i--) + for (int i = sysconf(_SC_OPEN_MAX) ; i >= 3 ; i--) close(i); } @@ -352,7 +348,7 @@ main (int argc, char **argv) TIMEIT(0, minit(FIRST_START),"minit"); masterHost = ls_getmastername(); - for (i = 0; i < 3 && !masterHost && lserrno == LSE_TIME_OUT; i++) { + for (int i = 0; i < 3 && !masterHost && lserrno == LSE_TIME_OUT; i++) { millisleep_(6000); masterHost = ls_getmastername(); } @@ -417,8 +413,6 @@ main (int argc, char **argv) for (;;) { int maxfd; - FD_ZERO(&readmask); - maxfd = sysconf(_SC_OPEN_MAX); now = time(0); @@ -445,9 +439,8 @@ main (int argc, char **argv) timeout.tv_sec = 0; } - sockmask.rmask = readmask; - - nready = chanSelect_(&sockmask, &chanmask, &timeout); + int tm = timeout.tv_sec * 1000; + nready = chanEpoll_(tm); if (nready < 0) { if (errno != EINTR) ls_syslog(LOG_ERR, "\ @@ -479,12 +472,16 @@ main (int argc, char **argv) timeout.tv_sec = 0; timeout.tv_usec = 0; - if (FD_ISSET(batchSock, &chanmask.rmask)) { - acceptConnection(batchSock); - } + for (int i = 0; i < nready; i++) { + int ch = epoll_events[i].data.u32; - clientIO(&chanmask); + if (chanSock_(batchSock) == channels[ch].handle) { + acceptConnection(batchSock); + continue; + } + clientIO(&channels[ch], ch); + } } /* for (;;) */ } @@ -537,13 +534,12 @@ acceptConnection(int socket) } static void -clientIO(struct Masks *chanmask) +clientIO(struct chanData *chan, int chfd) { struct clientNode *cliPtr; struct clientNode *nextClient; struct sbdNode *sbdPtr; struct sbdNode *nextSbdPtr; - int exception; if (logclass & LC_TRACE) ls_syslog(LOG_DEBUG,"clientIO: 
Entering..."); @@ -553,44 +549,47 @@ clientIO(struct Masks *chanmask) sbdPtr = nextSbdPtr) { nextSbdPtr = sbdPtr->forw; - if (FD_ISSET(sbdPtr->chanfd, &chanmask->rmask) - || FD_ISSET(sbdPtr->chanfd, &chanmask->emask)) { + if (chanSock_(sbdPtr->chanfd) != chan->handle) + continue; - if (FD_ISSET(sbdPtr->chanfd, &chanmask->emask)) - exception = TRUE; - else - exception = FALSE; + if (chan->events & EPOLL_EVENTS_READ + || chan->events & EPOLL_EVENTS_ERROR) { + int exception = (chan->events & EPOLL_EVENTS_ERROR) != 0; processSbdNode(sbdPtr, exception); + // we found and handled the matching SBD; stop scanning + return; } } - + // Find the client which corresponds to this channel for (cliPtr = clientList->forw; cliPtr != clientList; cliPtr = nextClient) { - int needFree; nextClient = cliPtr->forw; - if (FD_ISSET(cliPtr->chanfd, &chanmask->emask)) { - shutDownClient(cliPtr); + if (chanSock_(cliPtr->chanfd) != chan->handle) continue; + + if (chan->events & EPOLL_EVENTS_ERROR) { + shutDownClient(cliPtr); + return; } - needFree = FALSE; - if (FD_ISSET(cliPtr->chanfd, &chanmask->rmask)) { - - int saveChfd; - saveChfd = cliPtr->chanfd; - if (processClient(cliPtr, &needFree) == 0) { - - FD_CLR(saveChfd, &chanmask->rmask); - if (needFree == TRUE) { - offList((struct listEntry *)cliPtr); - FREEUP(cliPtr->fromHost); - FREEUP(cliPtr); - } + + /* The channel is not ready to be read yet + */ + if (! (chan->events & EPOLL_EVENTS_READ)) + continue; + // needFree is unused... 
investigate + int needFree = FALSE; + if (processClient(cliPtr, &needFree) == 0) { + + if (needFree == TRUE) { + offList((struct listEntry *)cliPtr); + FREEUP(cliPtr->fromHost); + FREEUP(cliPtr); } + return; } - } } diff --git a/lsbatch/daemons/mbd.serv.c b/lsbatch/daemons/mbd.serv.c index b07fb2d..558cfe1 100644 --- a/lsbatch/daemons/mbd.serv.c +++ b/lsbatch/daemons/mbd.serv.c @@ -1724,7 +1724,7 @@ do_reconfigReq(XDR *xdrs, if (!xdr_controlReq(xdrs, &mbdCtrlReq, reqHdr)) { memset(&mbdCtrlReq, 0, sizeof(struct controlReq)); ls_syslog(LOG_WARNING, "%s: %s failed to decode comments on reconfig/mbdrestart", __func__, "xdr_controlReq"); - } + } xdrmem_create(&xdrs2, reply_buf, MSGSIZE, XDR_ENCODE); replyHdr.opCode = LSBE_NO_ERROR; @@ -2097,6 +2097,10 @@ doNewJobReply(struct sbdNode *sbdPtr, int exception) } else { sbdPtr->reqCode = MBD_NEW_JOB_KEEP_CHAN; + if (chanModEpoll_(sbdPtr->chanfd, + EPOLLOUT|EPOLLIN|EPOLLERR|EPOLLRDHUP) < 0) { + ls_syslog(LOG_ERR, "%s: chanModEpoll_() failed %m", __func__); + } } } else { diff --git a/lsbatch/daemons/sbd.comm.c b/lsbatch/daemons/sbd.comm.c index 67d3352..c099c85 100644 --- a/lsbatch/daemons/sbd.comm.c +++ b/lsbatch/daemons/sbd.comm.c @@ -525,8 +525,6 @@ msgSbd(LS_LONG_INT jobId, char *req, sbdReqType reqType, int (*xdrFunc)()) myhostnm = "localhost"; } - - cc = call_server(myhostnm, sbd_port, requestBuf, XDR_GETPOS(&xdrs), &reply_buf, &hdr, 100000, 0, NULL, NULL, NULL, 0); if (cc < 0) { diff --git a/lsbatch/daemons/sbd.main.c b/lsbatch/daemons/sbd.main.c index ff996a5..c60aca4 100644 --- a/lsbatch/daemons/sbd.main.c +++ b/lsbatch/daemons/sbd.main.c @@ -24,20 +24,20 @@ #include "sbd.h" #include "../../lsf/lib/lsi18n.h" -#define NL_SETN 11 +#define NL_SETN 11 #include -#define NL_SETN 11 +#define NL_SETN 11 extern void do_sbdDebug(XDR *xdrs, int chfd, struct LSFHeader *reqHdr); void sinit(void); void init_sstate(void); static void processMsg(struct clientNode *); -static void clientIO(struct Masks *); +static void 
clientIO(struct chanData *, int); static void houseKeeping(void); static int authCmdRequest(struct clientNode *client, XDR *xdrs, - struct LSFHeader *reqHdr); + struct LSFHeader *reqHdr); static int isLSFAdmin(struct lsfAuth *auth); #ifdef INTER_DAEMON_AUTH static int authMbdRequest(struct clientNode *, XDR *, struct LSFHeader *); @@ -131,7 +131,6 @@ main (int argc, char **argv) int nready, i; sigset_t oldsigmask, newmask; struct timeval timeout; - struct Masks sockmask, chanmask; int aopt; extern char *optarg; extern int opterr; @@ -144,18 +143,18 @@ main (int argc, char **argv) saveDaemonDir_(argv[0]); - for (i=1; i 1) { @@ -173,7 +172,7 @@ main (int argc, char **argv) } if( (daemonParams[LSB_STDOUT_DIRECT].paramValue != NULL) - &&(daemonParams[LSB_STDOUT_DIRECT].paramValue[0] == 'y' + &&(daemonParams[LSB_STDOUT_DIRECT].paramValue[0] == 'y' || daemonParams[LSB_STDOUT_DIRECT].paramValue[0] == 'Y' ) ) { lsbStdoutDirect = TRUE; } @@ -185,8 +184,8 @@ main (int argc, char **argv) ls_syslog(LOG_WARNING, "\ %s: LSB_STDOUT_DIRECT configured as %s", fname, - (daemonParams[LSB_STDOUT_DIRECT].paramValue) - ? daemonParams[LSB_STDOUT_DIRECT].paramValue :"NULL"); + (daemonParams[LSB_STDOUT_DIRECT].paramValue) + ? 
daemonParams[LSB_STDOUT_DIRECT].paramValue :"NULL"); opterr = 0; while ((aopt = getopt(argc, argv, "hV:d:123")) != EOF) { @@ -199,7 +198,7 @@ main (int argc, char **argv) break; case 'd': env_dir = optarg; - break; + break; case 'h': default: fprintf(stderr, @@ -212,9 +211,9 @@ main (int argc, char **argv) if (!debug && isint_(daemonParams[LSB_DEBUG].paramValue)) { - debug = atoi(daemonParams[LSB_DEBUG].paramValue); - if (debug <= 0 || debug > 3) - debug = 1; + debug = atoi(daemonParams[LSB_DEBUG].paramValue); + if (debug <= 0 || debug > 3) + debug = 1; } @@ -229,29 +228,29 @@ main (int argc, char **argv) if (debug == 0) { if(getuid() != 0) { fprintf(stderr, _i18n_msg_get(ls_catd , NL_SETN, 2, - "%s: Real uid is %d, not root\n"), /* catgets 2 */ - argv[0], (int)getuid()); + "%s: Real uid is %d, not root\n"), /* catgets 2 */ + argv[0], (int)getuid()); exit(1); } if(geteuid() != 0) { fprintf(stderr, _i18n_msg_get(ls_catd , NL_SETN, 3, - "%s: Effective uid is %d, not root\n"), /* catgets 3 */ - argv[0], (int)geteuid()); + "%s: Effective uid is %d, not root\n"), /* catgets 3 */ + argv[0], (int)geteuid()); exit(1); } } else { - if(getuid() == 0) { - fprintf(stderr, _i18n_msg_get(ls_catd , NL_SETN, 4, - "%s: root cannot run in debug mode\n"), /* catgets 4 */ - argv[0]); - exit(1); - } + if(getuid() == 0) { + fprintf(stderr, _i18n_msg_get(ls_catd , NL_SETN, 4, + "%s: root cannot run in debug mode\n"), /* catgets 4 */ + argv[0]); + exit(1); + } } (void)umask(022); if (debug < 2) { - daemonize_(); + daemonize_(); } @@ -261,10 +260,10 @@ main (int argc, char **argv) if (debug > 1) ls_openlog("sbatchd", daemonParams[LSF_LOGDIR].paramValue, TRUE, - daemonParams[LSF_LOG_MASK].paramValue); + daemonParams[LSF_LOG_MASK].paramValue); else ls_openlog("sbatchd", daemonParams[LSF_LOGDIR].paramValue, FALSE, - daemonParams[LSF_LOG_MASK].paramValue); + daemonParams[LSF_LOG_MASK].paramValue); if (logclass) ls_syslog(LOG_DEBUG3, "%s: logclass=%x", fname, logclass); @@ -296,41 +295,41 @@ 
main (int argc, char **argv) getpwnamRetry = atoi(daemonParams[LSF_GETPWNAM_RETRY].paramValue); if (daemonParams[LSB_MEMLIMIT_ENFORCE].paramValue != NULL) { - if (!strcasecmp(daemonParams[LSB_MEMLIMIT_ENFORCE].paramValue, "y")) - { + if (!strcasecmp(daemonParams[LSB_MEMLIMIT_ENFORCE].paramValue, "y")) + { lsbMemEnforce = TRUE; - } + } } lsbJobCpuLimit = -1; if (daemonParams[LSB_JOB_CPULIMIT].paramValue != NULL) { - if (!strcasecmp(daemonParams[LSB_JOB_CPULIMIT].paramValue, "y")) - { - lsbJobCpuLimit = 1; - } else if (!strcasecmp(daemonParams[LSB_JOB_CPULIMIT].paramValue, - "n")) { - lsbJobCpuLimit = 0; - } else { - ls_syslog(LOG_ERR, I18N(5001, - "%s: LSB_JOB_CPULIMIT <%s> in lsf.conf is invalid. Valid values are y|Y or n|N; ignoring the parameter."), - fname, daemonParams[LSB_JOB_CPULIMIT].paramValue); /* catgets 5001 */ - - } + if (!strcasecmp(daemonParams[LSB_JOB_CPULIMIT].paramValue, "y")) + { + lsbJobCpuLimit = 1; + } else if (!strcasecmp(daemonParams[LSB_JOB_CPULIMIT].paramValue, + "n")) { + lsbJobCpuLimit = 0; + } else { + ls_syslog(LOG_ERR, I18N(5001, + "%s: LSB_JOB_CPULIMIT <%s> in lsf.conf is invalid. Valid values are y|Y or n|N; ignoring the parameter."), + fname, daemonParams[LSB_JOB_CPULIMIT].paramValue); /* catgets 5001 */ + + } } lsbJobMemLimit = -1; if (daemonParams[LSB_JOB_MEMLIMIT].paramValue != NULL) { - if (!strcasecmp(daemonParams[LSB_JOB_MEMLIMIT].paramValue, "y")) - { - lsbJobMemLimit = 1; - } else if (!strcasecmp(daemonParams[LSB_JOB_MEMLIMIT].paramValue, - "n")) { - lsbJobMemLimit = 0; - } else { - ls_syslog(LOG_ERR, I18N(5002, - "%s: LSB_JOB_MEMLIMIT <%s> in lsf.conf is invalid. 
Valid values are y|Y or n|N; ignoring the parameter."), - fname, daemonParams[LSB_JOB_MEMLIMIT].paramValue); /* catgets 5002 */ - } + if (!strcasecmp(daemonParams[LSB_JOB_MEMLIMIT].paramValue, "y")) + { + lsbJobMemLimit = 1; + } else if (!strcasecmp(daemonParams[LSB_JOB_MEMLIMIT].paramValue, + "n")) { + lsbJobMemLimit = 0; + } else { + ls_syslog(LOG_ERR, I18N(5002, + "%s: LSB_JOB_MEMLIMIT <%s> in lsf.conf is invalid. Valid values are y|Y or n|N; ignoring the parameter."), + fname, daemonParams[LSB_JOB_MEMLIMIT].paramValue); /* catgets 5002 */ + } } now = time(0); @@ -343,7 +342,7 @@ main (int argc, char **argv) while ((allLsInfo = ls_info()) == NULL) { - millisleep_ (6000); + millisleep_ (6000); } @@ -377,148 +376,152 @@ main (int argc, char **argv) struct sockaddr_in from; struct clientNode *client; - sigemptyset(&newmask); - sigaddset(&newmask, SIGCHLD); - sigprocmask(SIG_BLOCK, &newmask, NULL); + sigemptyset(&newmask); + sigaddset(&newmask, SIGCHLD); + sigprocmask(SIG_BLOCK, &newmask, NULL); - /* job_checking is exclusive*/ + /* job_checking is exclusive*/ if (!debug) - chdir(LSTMPDIR); + chdir(LSTMPDIR); - if (!delay_check) { - TIMEIT(1, job_checking(), "job_checking"); + if (!delay_check) { + TIMEIT(1, job_checking(), "job_checking"); status_report(); - } else { - timeout.tv_sec = sbdSleepTime/10; - } + } else { + timeout.tv_sec = sbdSleepTime/10; + } - if (failcnt && failcnt < 5) - timeout.tv_sec = sbdSleepTime/(5-failcnt); + if (failcnt && failcnt < 5) + timeout.tv_sec = sbdSleepTime/(5-failcnt); - sigprocmask(SIG_SETMASK, &oldsigmask, NULL); - /* unblock SIGCHLD */ + sigprocmask(SIG_SETMASK, &oldsigmask, NULL); + /* unblock SIGCHLD */ if (need_checkfinish) { need_checkfinish = FALSE; - TIMEIT(1, checkFinish(), "checkFinish"); + TIMEIT(1, checkFinish(), "checkFinish"); } - FD_ZERO(&sockmask.rmask); - - houseKeeping(); + houseKeeping(); - if (logclass & LC_COMM) - ls_syslog(LOG_DEBUG3, "Into select"); + if (logclass & LC_COMM) + ls_syslog(LOG_DEBUG3, "Into 
select"); - nready = chanSelect_(&sockmask, &chanmask, &timeout); + int tm = timeout.tv_sec * 1000; + nready = chanEpoll_(tm); - if (logclass & LC_COMM) - ls_syslog(LOG_DEBUG3, "Out of select: nready=%d", nready); + if (logclass & LC_COMM) + ls_syslog(LOG_DEBUG3, "Out of select: nready=%d", nready); now = time(0); if (nready < 0) { - if (errno == EINTR) - delay_check = FALSE; - else + if (errno == EINTR) + delay_check = FALSE; + else ls_syslog(LOG_ERR, I18N_FUNC_FAIL_M, fname, "select"); continue; } - if (sbdSleepTime < 0) { - ls_syslog(LOG_ERR, _i18n_msg_get(ls_catd , NL_SETN, 5004, - "%s: Sleep time <%d> is not a non-negative integer; re-life"), fname, sbdSleepTime); /* catgets 5004 */ - relife(fname, " Sleep time"); - } + if (sbdSleepTime < 0) { + ls_syslog(LOG_ERR, _i18n_msg_get(ls_catd , NL_SETN, 5004, + "%s: Sleep time <%d> is not a non-negative integer; re-life"), fname, sbdSleepTime); /* catgets 5004 */ + relife(fname, " Sleep time"); + } timeout.tv_sec = sbdSleepTime; - sigemptyset(&newmask); - sigaddset(&newmask, SIGCHLD); - sigprocmask(SIG_BLOCK, &newmask, NULL); - /* block SIGCHLD before all */ + sigemptyset(&newmask); + sigaddset(&newmask, SIGCHLD); + sigprocmask(SIG_BLOCK, &newmask, NULL); + /* block SIGCHLD before all */ if (nready == 0) { - if (delay_check) { - delay_check = FALSE; - continue; - } + if (delay_check) { + delay_check = FALSE; + continue; + } continue; } - if (statusChan >= 0 && (FD_ISSET(statusChan, &chanmask.rmask) || - FD_ISSET(statusChan, &chanmask.emask))) { - - if (logclass & LC_COMM) - ls_syslog(LOG_DEBUG, - "main: Exception on statusChan <%d>, rmask <%x>", - statusChan, chanmask.rmask); - chanClose_(statusChan); - statusChan = -1; - } - - if (!FD_ISSET(batchSock, &chanmask.rmask)) { - ls_syslog(LOG_DEBUG,"main: connection already known"); - clientIO(&chanmask); - continue; - } - - s = chanAccept_(batchSock, &from); - if (s == -1) { - ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, fname, "chanAccept_"); - continue; - } + for (i = 0; i < 
nready; i++) { + int ch = epoll_events[i].data.u32; + struct chanData *chan = &channels[ch]; + if (statusChan >= 0 + && (chan->events & EPOLL_EVENTS_READ + || chan->events & EPOLL_EVENTS_ERROR)) { - client = (struct clientNode *)malloc(sizeof(struct clientNode)); - if (!client) { - ls_syslog(LOG_ERR, I18N_FUNC_FAIL_M, fname, "malloc"); - ls_syslog(LOG_ERR, _i18n_msg_get(ls_catd , NL_SETN, 5006, - "%s: Unable to accept connection")); /* catgets 5006 */ - chanClose_(s); - continue; - } + if (logclass & LC_COMM) + ls_syslog(LOG_DEBUG, + "main: Exception on statusChan <%d>, events <%x>", + statusChan, chan->events); + chanClose_(statusChan); + statusChan = -1; + } - client->chanfd = s; + if (chanSock_(batchSock) != chan->handle) { + ls_syslog(LOG_DEBUG,"main: connection already known"); + clientIO(chan, ch); + continue; + } - client->from = from; - client->jp = NULL; - client->jobId = -1; + s = chanAccept_(batchSock, &from); + if (s == -1) { + ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, fname, "chanAccept_"); + continue; + } - inList( (struct listEntry *)clientList, (struct listEntry *) client); + client = (struct clientNode *)malloc(sizeof(struct clientNode)); + if (!client) { + ls_syslog(LOG_ERR, I18N_FUNC_FAIL_M, fname, "malloc"); + ls_syslog(LOG_ERR, _i18n_msg_get(ls_catd , NL_SETN, 5006, + "%s: Unable to accept connection")); /* catgets 5006 */ + chanClose_(s); + continue; + } - if (logclass & LC_COMM ) - ls_syslog(LOG_DEBUG, "%s: Accepted connection from host <%s> on channel <%d>", fname, sockAdd2Str_(&from), client->chanfd); + client->chanfd = s; + client->from = from; + client->jp = NULL; + client->jobId = -1; - clientIO(&chanmask); - } + inList( (struct listEntry *)clientList, (struct listEntry *) client); + continue; + + if (logclass & LC_COMM ) + ls_syslog(LOG_DEBUG, "%s: Accepted connection from host <%s> on channel <%d>", fname, sockAdd2Str_(&from), client->chanfd); + + clientIO(chan, ch); + } + } return(0); } static void -clientIO(struct Masks *chanmask) 
+clientIO(struct chanData *chan, int chfd) { struct clientNode *cliPtr, *nextClient; if (logclass & LC_TRACE) - ls_syslog(LOG_DEBUG,"clientIO: Entering..."); - + ls_syslog(LOG_DEBUG,"clientIO: Entering..."); for(cliPtr=clientList->forw; cliPtr != clientList; cliPtr=nextClient) { nextClient = cliPtr->forw; - if (FD_ISSET(cliPtr->chanfd, &chanmask->emask)) { + if (chanSock_(cliPtr->chanfd) != chan->handle) + continue; + + if (chan->events & EPOLL_EVENTS_ERROR) { shutDownClient(cliPtr); continue; } - if (FD_ISSET(cliPtr->chanfd, &chanmask->rmask)) { - processMsg(cliPtr); + if (chan->events & EPOLL_EVENTS_READ) { + processMsg(cliPtr); } - } } - static void processMsg(struct clientNode *client) { @@ -536,7 +539,7 @@ processMsg(struct clientNode *client) if (chanDequeue_(client->chanfd, &buf) < 0) { ls_syslog(LOG_ERR, I18N_FUNC_FAIL_ENO_D, fname, "chanDequeue_", - cherrno); + cherrno); shutDownClient(client); return; } @@ -546,7 +549,7 @@ processMsg(struct clientNode *client) if (!xdr_LSFHeader(&xdrs, &reqHdr)) { ls_syslog(LOG_ERR, I18N_FUNC_FAIL, fname, "xdr_LSFHeader"); shutDownClient(client); - xdr_destroy(&xdrs); + xdr_destroy(&xdrs); chanFreeBuf_(buf); return; } @@ -554,7 +557,7 @@ processMsg(struct clientNode *client) sbdReqtype = reqHdr.opCode; if (logclass & (LC_TRACE | LC_COMM)) - ls_syslog(LOG_DEBUG,"%s: received msg <%d>",fname, sbdReqtype); + ls_syslog(LOG_DEBUG,"%s: received msg <%d>",fname, sbdReqtype); if (sbdReqtype != PREPARE_FOR_OP) if (io_block_(s) < 0) @@ -563,74 +566,74 @@ processMsg(struct clientNode *client) if (sbdReqtype == MBD_NEW_JOB || sbdReqtype == MBD_SIG_JOB || - sbdReqtype == MBD_SWIT_JOB || sbdReqtype == MBD_PROBE || - sbdReqtype == MBD_REBOOT || sbdReqtype == MBD_SHUTDOWN || - sbdReqtype == MBD_MODIFY_JOB) { + sbdReqtype == MBD_SWIT_JOB || sbdReqtype == MBD_PROBE || + sbdReqtype == MBD_REBOOT || sbdReqtype == MBD_SHUTDOWN || + sbdReqtype == MBD_MODIFY_JOB) { #ifdef INTER_DAEMON_AUTH - if (daemonParams[LSF_AUTH_DAEMONS].paramValue) { - 
char *aux_file, aux_file_buf[MAXPATHLEN]; - - - putEauthAuxDataEnvVar(NULL); - - if (sbdReqtype == MBD_NEW_JOB ) { - aux_file = tempnam(LSTMPDIR, ".auxs"); - if (aux_file) { - putEauthAuxDataEnvVar(aux_file); - free(aux_file); - } - else { - - sprintf(aux_file_buf, "/tmp/.auxsbd_%lu", time(0)); - putEauthAuxDataEnvVar(aux_file_buf); - } - } - - if (authMbdRequest(client, &xdrs, &reqHdr) != LSBE_NO_ERROR) { - ls_syslog(LOG_ERR, I18N_FUNC_S_FAIL, fname, "authMbdRequest", - sockAdd2Str_(&client->from)); - shutDownClient(client); - xdr_destroy(&xdrs); - chanFreeBuf_(buf); - return; - } - } - else { + if (daemonParams[LSF_AUTH_DAEMONS].paramValue) { + char *aux_file, aux_file_buf[MAXPATHLEN]; + + + putEauthAuxDataEnvVar(NULL); + + if (sbdReqtype == MBD_NEW_JOB ) { + aux_file = tempnam(LSTMPDIR, ".auxs"); + if (aux_file) { + putEauthAuxDataEnvVar(aux_file); + free(aux_file); + } + else { + + sprintf(aux_file_buf, "/tmp/.auxsbd_%lu", time(0)); + putEauthAuxDataEnvVar(aux_file_buf); + } + } + + if (authMbdRequest(client, &xdrs, &reqHdr) != LSBE_NO_ERROR) { + ls_syslog(LOG_ERR, I18N_FUNC_S_FAIL, fname, "authMbdRequest", + sockAdd2Str_(&client->from)); + shutDownClient(client); + xdr_destroy(&xdrs); + chanFreeBuf_(buf); + return; + } + } + else { #endif - if (!portok(&client->from)) { - ls_syslog(LOG_ERR, - _i18n_msg_get(ls_catd , NL_SETN, 5010, - "%s: Received request %d from bad port <%s>"), /* catgets 5010 */ - fname, sbdReqtype, sockAdd2Str_(&client->from)); - shutDownClient(client); - xdr_destroy(&xdrs); - chanFreeBuf_(buf); - return; - } + if (!portok(&client->from)) { + ls_syslog(LOG_ERR, + _i18n_msg_get(ls_catd , NL_SETN, 5010, + "%s: Received request %d from bad port <%s>"), /* catgets 5010 */ + fname, sbdReqtype, sockAdd2Str_(&client->from)); + shutDownClient(client); + xdr_destroy(&xdrs); + chanFreeBuf_(buf); + return; + } #ifdef INTER_DAEMON_AUTH - } + } #endif if (get_new_master(&client->from) < 0) { errorBack(client->chanfd, LSBE_NOLSF_HOST, &client->from); - 
shutDownClient(client); - chanFreeBuf_(buf); - xdr_destroy(&xdrs); - return; + shutDownClient(client); + chanFreeBuf_(buf); + xdr_destroy(&xdrs); + return; } } else if (sbdReqtype == CMD_SBD_REBOOT || - sbdReqtype == CMD_SBD_SHUTDOWN || + sbdReqtype == CMD_SBD_SHUTDOWN || sbdReqtype == CMD_SBD_DEBUG) { - if ((cc = authCmdRequest(client, &xdrs, &reqHdr)) != LSBE_NO_ERROR) { - ls_syslog(LOG_ERR, I18N_FUNC_S_D_FAIL_M, fname, "authCmdRequest", - sockAdd2Str_(&client->from), sbdReqtype); + if ((cc = authCmdRequest(client, &xdrs, &reqHdr)) != LSBE_NO_ERROR) { + ls_syslog(LOG_ERR, I18N_FUNC_S_D_FAIL_M, fname, "authCmdRequest", + sockAdd2Str_(&client->from), sbdReqtype); errorBack(client->chanfd, cc, &client->from); - shutDownClient(client); - chanFreeBuf_(buf); - xdr_destroy(&xdrs); - return; - } + shutDownClient(client); + chanFreeBuf_(buf); + xdr_destroy(&xdrs); + return; + } } @@ -657,9 +660,9 @@ processMsg(struct clientNode *client) break; case MBD_MODIFY_JOB: - TIMEIT(2, do_modifyjob (&xdrs, client->chanfd, &reqHdr), "do_modifyjob"); - delay_check = TRUE; - break; + TIMEIT(2, do_modifyjob (&xdrs, client->chanfd, &reqHdr), "do_modifyjob"); + delay_check = TRUE; + break; case MBD_PROBE: TIMEIT(2, do_probe (&xdrs, client->chanfd, &reqHdr), "do_probe"); delay_check = TRUE; @@ -676,46 +679,46 @@ processMsg(struct clientNode *client) break; case CMD_SBD_DEBUG: - TIMEIT(2, do_sbdDebug(&xdrs, client->chanfd, &reqHdr), "do_sbdDebug"); - break; + TIMEIT(2, do_sbdDebug(&xdrs, client->chanfd, &reqHdr), "do_sbdDebug"); + break; case BATCH_JOB_MSG: - NEW_BUCKET(bucket,buf); - if (bucket) { - do_jobMsg(bucket, &xdrs, client->chanfd, &reqHdr); - } else { - ls_syslog(LOG_ERR, _i18n_msg_get(ls_catd , NL_SETN, 5012, - "%s: BATCH_JOB_MSG newBucket failed: %m"), /* catgets 5012 */ - fname); - } - break; + NEW_BUCKET(bucket,buf); + if (bucket) { + do_jobMsg(bucket, &xdrs, client->chanfd, &reqHdr); + } else { + ls_syslog(LOG_ERR, _i18n_msg_get(ls_catd , NL_SETN, 5012, + "%s: BATCH_JOB_MSG 
newBucket failed: %m"), /* catgets 5012 */ + fname); + } + break; case SBD_JOB_SETUP: - TIMEIT(2, do_jobSetup(&xdrs, client->chanfd, &reqHdr), "do_jobSetup"); - delay_check = TRUE; - break; + TIMEIT(2, do_jobSetup(&xdrs, client->chanfd, &reqHdr), "do_jobSetup"); + delay_check = TRUE; + break; case SBD_SYSLOG: - TIMEIT(4, do_jobSyslog(&xdrs, client->chanfd, &reqHdr), "do_jobSyslog");; - delay_check = TRUE; - break; + TIMEIT(4, do_jobSyslog(&xdrs, client->chanfd, &reqHdr), "do_jobSyslog");; + delay_check = TRUE; + break; case RM_CONNECT: - TIMEIT(2, do_rmConn(&xdrs, client->chanfd, &reqHdr, client), "do_rmConn"); - break; + TIMEIT(2, do_rmConn(&xdrs, client->chanfd, &reqHdr, client), "do_rmConn"); + break; case RM_JOB_MSG: - TIMEIT(2, do_lsbMsg(&xdrs, client->chanfd, &reqHdr), "do_lsbMsg"); - break; + TIMEIT(2, do_lsbMsg(&xdrs, client->chanfd, &reqHdr), "do_lsbMsg"); + break; default: ls_syslog(LOG_ERR, _i18n_msg_get(ls_catd , NL_SETN, 5013, - "%s: Unknown request type <%d>"), /* catgets 5013 */ - fname,sbdReqtype); + "%s: Unknown request type <%d>"), /* catgets 5013 */ + fname,sbdReqtype); break; } xdr_destroy(&xdrs); chanFreeBuf_(buf); if (reqHdr.opCode != PREPARE_FOR_OP && - reqHdr.opCode != RM_CONNECT ) + reqHdr.opCode != RM_CONNECT ) shutDownClient(client); } @@ -729,9 +732,9 @@ shutDownClient(struct clientNode *client) offList((struct listEntry *)client); if (client->jp) { - client->jp->client = NULL; + client->jp->client = NULL; - client->jp->regOpFlag &= REG_RUSAGE; + client->jp->regOpFlag &= REG_RUSAGE; } FREEUP(client); @@ -749,26 +752,26 @@ start_master(void) ls_syslog(LOG_DEBUG, "%s: Entering this routine...", __func__); if (mbdStartedBySbd) { - switch (mbdExitVal) { - case MASTER_RECONFIG: - break; - case MASTER_RESIGN: - if (now - lastTime < mbdExitCnt * 60) - return; - break; - case MASTER_FATAL: - case MASTER_MEM: - case MASTER_CONF: - if (now - lastTime < mbdExitCnt * 30) - return; - break; - default: - if (now - lastTime < mbdExitCnt * 
CHECK_MBD_TIME) { - now = time(0); - return; - } - break; - } + switch (mbdExitVal) { + case MASTER_RECONFIG: + break; + case MASTER_RESIGN: + if (now - lastTime < mbdExitCnt * 60) + return; + break; + case MASTER_FATAL: + case MASTER_MEM: + case MASTER_CONF: + if (now - lastTime < mbdExitCnt * 30) + return; + break; + default: + if (now - lastTime < mbdExitCnt * CHECK_MBD_TIME) { + now = time(0); + return; + } + break; + } } lastTime = now; @@ -776,15 +779,15 @@ start_master(void) i = 1; if (debug) { - margv[i] = my_malloc(MAXFILENAMELEN, __func__); - sprintf(margv[i], "-%d", debug); - i++; + margv[i] = my_malloc(MAXFILENAMELEN, __func__); + sprintf(margv[i], "-%d", debug); + i++; } if (env_dir != NULL) { - margv[i] = "-d"; - i++; - margv[i] = env_dir; - i++; + margv[i] = "-d"; + i++; + margv[i] = env_dir; + i++; } margv[i] = NULL; @@ -796,18 +799,18 @@ start_master(void) } if (newMbdPid == 0) { - sigset_t newmask; + sigset_t newmask; - sigemptyset(&newmask); - sigprocmask(SIG_SETMASK, &newmask, NULL); + sigemptyset(&newmask); + sigprocmask(SIG_SETMASK, &newmask, NULL); closeBatchSocket(); - execve(margv[0], margv, environ); - ls_syslog(LOG_ERR, "\ + execve(margv[0], margv, environ); + ls_syslog(LOG_ERR, "\ %s: execve() failed %m", __func__); - lsb_mperr("Cannot execute mbatchd"); - exit(-1); + lsb_mperr("Cannot execute mbatchd"); + exit(-1); } if (debug) @@ -821,8 +824,8 @@ start_master(void) if (newMbdPid > 0) { mbdPid = newMbdPid; - mbdStartedBySbd = TRUE; - ls_syslog(LOG_NOTICE, "\ + mbdStartedBySbd = TRUE; + ls_syslog(LOG_NOTICE, "\ %s: Master [%d] started by sbatchd on host %s", __func__, mbdPid, myhostnm); } @@ -869,10 +872,10 @@ sinit (void) ls_syslog(LOG_DEBUG, "sbatchd/%s: Entering this routine...", fname); if (getBootTime(&bootTime) == -1) { - ls_syslog(LOG_ERR, _i18n_msg_get(ls_catd , NL_SETN, 5021, - "%s: getBootTime() failed; assuming host was not rebooted while sbatchd was down"), fname); /* catgets 5021 */ - bootTime = 0; - die(SLAVE_FATAL); + 
ls_syslog(LOG_ERR, _i18n_msg_get(ls_catd , NL_SETN, 5021, + "%s: getBootTime() failed; assuming host was not rebooted while sbatchd was down"), fname); /* catgets 5021 */ + bootTime = 0; + die(SLAVE_FATAL); } Signal_(SIGALRM, SIG_IGN); @@ -885,27 +888,27 @@ sinit (void) Signal_(SIGPIPE, SIG_IGN); if (!debug) { - Signal_(SIGTTOU, SIG_IGN); - Signal_(SIGTTIN, SIG_IGN); - Signal_(SIGTSTP, SIG_IGN); + Signal_(SIGTTOU, SIG_IGN); + Signal_(SIGTTIN, SIG_IGN); + Signal_(SIGTSTP, SIG_IGN); } jobQueHead = (struct jobCard *) mkListHeader () ; clientList = (struct clientNode *) mkListHeader () ; if (!jobQueHead || !clientList ) { - ls_syslog(LOG_ERR, I18N_FUNC_FAIL, fname, "mkListHeader"); + ls_syslog(LOG_ERR, I18N_FUNC_FAIL, fname, "mkListHeader"); die(SLAVE_FATAL); } if ((clusterName = ls_getclustername()) == NULL) { - ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, fname, "ls_getclustername"); - while ((clusterName = ls_getclustername()) == NULL) - millisleep_(sbdSleepTime * 1000); + ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, fname, "ls_getclustername"); + while ((clusterName = ls_getclustername()) == NULL) + millisleep_(sbdSleepTime * 1000); } if ((masterHost = ls_getmastername()) == NULL) { - ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, fname, "ls_getmastername"); - while ((masterHost = ls_getmastername()) == NULL) - millisleep_(sbdSleepTime * 1000); + ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, fname, "ls_getmastername"); + while ((masterHost = ls_getmastername()) == NULL) + millisleep_(sbdSleepTime * 1000); } ls_syslog(LOG_INFO, (_i18n_msg_get(ls_catd , NL_SETN, 5043, "\ %s: Cluster %s, master %s")), "sbatchd/main", clusterName, masterHost); /* catgets 5043 */ @@ -915,14 +918,14 @@ sinit (void) batchId = getuid(); myhostname = ls_getmyhostname(); if (myhostname == NULL) { - ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, fname, "ls_getmyhostname"); - die(SLAVE_FATAL); + ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, fname, "ls_getmyhostname"); + die(SLAVE_FATAL); } while ((myinfo = ls_gethostinfo(NULL, NULL, 
&myhostname, 1, 0)) == NULL) { - ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, fname, "ls_gethostinfo"); - millisleep_(sbdSleepTime * 1000); + ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, fname, "ls_gethostinfo"); + millisleep_(sbdSleepTime * 1000); } myFactor = myinfo->cpuFactor; @@ -938,20 +941,20 @@ sinit (void) die(SLAVE_FATAL); } if (! debug) { - nice(NICE_LEAST); - nice(NICE_MIDDLE); - nice(0); + nice(NICE_LEAST); + nice(NICE_MIDDLE); + nice(0); } if (!debug) { - if (chdir(LSTMPDIR) < 0) { - ls_syslog(LOG_ERR, I18N_FUNC_S_FAIL_M, fname, "chdir", LSTMPDIR); - lsb_mperr( _i18n_printf(I18N_FUNC_S_FAIL, fname, - "chdir", LSTMPDIR)); - die(SLAVE_FATAL); - } + if (chdir(LSTMPDIR) < 0) { + ls_syslog(LOG_ERR, I18N_FUNC_S_FAIL_M, fname, "chdir", LSTMPDIR); + lsb_mperr( _i18n_printf(I18N_FUNC_S_FAIL, fname, + "chdir", LSTMPDIR)); + die(SLAVE_FATAL); + } } @@ -983,7 +986,7 @@ init_sstate (void) jobTerminateInterval = sbdPackage.jobTerminateInterval; for (i = 0; i < sbdPackage.nAdmins; i++) - FREEUP(sbdPackage.admins[i]); + FREEUP(sbdPackage.admins[i]); FREEUP(sbdPackage.admins); } @@ -1031,25 +1034,25 @@ houseKeeping(void) if (equalHost_(masterHost, myhostnm)) { if (mbdPid != 0) { if (kill(mbdPid, 0) != 0) - { - if ((now - lastStartMbdTime >= - 3 * CHECK_MBD_TIME) || !mbdStartedBySbd) { - start_master(); - lastStartMbdTime = now; - } - } - } - } - } + { + if ((now - lastStartMbdTime >= + 3 * CHECK_MBD_TIME) || !mbdStartedBySbd) { + start_master(); + lastStartMbdTime = now; + } + } + } + } + } } drainMsgQueue(); if (now - lastTime >= sbdSleepTime / 2) { - if (ls_servavail(2, 1) < 0) - ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, "main", "ls_servavail"); - lastTime = now; + if (ls_servavail(2, 1) < 0) + ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, "main", "ls_servavail"); + lastTime = now; } } @@ -1068,37 +1071,37 @@ authCmdRequest(struct clientNode *client, sizeof(in_addr_t), AF_INET); if (hp == NULL) { - ls_syslog(LOG_ERR, "\ + ls_syslog(LOG_ERR, "\ %s: gethostbyaddr() failed for %s", __func__, 
sockAdd2Str_(&client->from)); - return LSBE_NOLSF_HOST; + return LSBE_NOLSF_HOST; } if (!xdr_lsfAuth(xdrs, &auth, reqHdr)) { - ls_syslog(LOG_ERR,"%s: xdr_lsfAuth() failed", __func__); - return LSBE_XDR; + ls_syslog(LOG_ERR,"%s: xdr_lsfAuth() failed", __func__); + return LSBE_XDR; } if (hostOk(hp->h_name, 0) < 0) { - ls_syslog(LOG_ERR, "\ + ls_syslog(LOG_ERR, "\ %s: host %s is not a valid volclava host", __func__, hp->h_name, sockAdd2Str_(&client->from)); - return LSBE_NOLSF_HOST; + return LSBE_NOLSF_HOST; } putEauthClientEnvVar("user"); putEauthServerEnvVar("sbatchd"); if (!userok(s, &client->from, hp->h_name, NULL, &auth, debug)) { - ls_syslog(LOG_ERR, "\ + ls_syslog(LOG_ERR, "\ %s: userok() has faled from host %s", __func__, hp->h_name); - return LSBE_PERMISSION; + return LSBE_PERMISSION; } if (!isLSFAdmin(&auth)) { - return LSBE_PERMISSION; + return LSBE_PERMISSION; } return LSBE_NO_ERROR; @@ -1122,5 +1125,3 @@ isLSFAdmin(struct lsfAuth *auth) return(FALSE); } - - diff --git a/lsbatch/daemons/sbd.misc.c b/lsbatch/daemons/sbd.misc.c index b61e406..ef045e7 100644 --- a/lsbatch/daemons/sbd.misc.c +++ b/lsbatch/daemons/sbd.misc.c @@ -24,17 +24,17 @@ #include "sbd.h" #include "../../lsf/lib/lsi18n.h" -#define NL_SETN 11 +#define NL_SETN 11 extern short mbdExitVal; extern int mbdExitCnt; -#define NL_SETN 11 +#define NL_SETN 11 void milliSleep( int msec ) { struct timeval dtime; - + if (msec < 1) return; dtime.tv_sec = msec/1000; @@ -46,7 +46,7 @@ milliSleep( int msec ) -char +char window_ok (struct jobCard *jobPtr) { windows_t *wp; @@ -56,62 +56,62 @@ window_ok (struct jobCard *jobPtr) time_t now; now = time(0); - active = jobPtr->active; + active = jobPtr->active; if (active && (jobPtr->jobSpecs.options & SUB_WINDOW_SIG)) - ckTime = now + WARN_TIME; + ckTime = now + WARN_TIME; else ckTime = now; if (jobPtr->windEdge > ckTime || jobPtr->windEdge == 0) return (jobPtr->active); - getDayHour (&dayhour, ckTime); - if (jobPtr->week[dayhour.day] == NULL) { + getDayHour 
(&dayhour, ckTime); + if (jobPtr->week[dayhour.day] == NULL) { jobPtr->active = TRUE; jobPtr->windEdge = now + (24.0 - dayhour.hour) * 3600.0; return (jobPtr->active); } - + jobPtr->active = FALSE; jobPtr->windEdge = now + (24.0 - dayhour.hour) * 3600.0; - for (wp = jobPtr->week[dayhour.day]; wp; wp=wp->nextwind) + for (wp = jobPtr->week[dayhour.day]; wp; wp=wp->nextwind) checkWindow(&dayhour, &jobPtr->active, &jobPtr->windEdge, wp, now); if (active && !jobPtr->active && now - jobPtr->windWarnTime >= WARN_TIME && (jobPtr->jobSpecs.options & SUB_WINDOW_SIG)) { - - if (!(jobPtr->jobSpecs.jStatus & JOB_STAT_RUN)) - job_resume(jobPtr); + + if (!(jobPtr->jobSpecs.jStatus & JOB_STAT_RUN)) + job_resume(jobPtr); jobsig (jobPtr, sig_decode (jobPtr->jobSpecs.sigValue), TRUE); jobPtr->windWarnTime = now; } - + return (jobPtr->active); -} +} void shout_err (struct jobCard *jobPtr, char *msg) { char buf[MSGSIZE]; - sprintf(buf, _i18n_msg_get(ls_catd, NL_SETN, 600, + sprintf(buf, _i18n_msg_get(ls_catd, NL_SETN, 600, "We are unable to run your job %s:<%s>. 
The error is:\n%s."), /* catgets 600 */ lsb_jobid2str(jobPtr->jobSpecs.jobId), jobPtr->jobSpecs.command, msg); - - if (jobPtr->jobSpecs.options & SUB_MAIL_USER) + + if (jobPtr->jobSpecs.options & SUB_MAIL_USER) merr_user (jobPtr->jobSpecs.mailUser, jobPtr->jobSpecs.fromHost, buf, I18N_error); else merr_user (jobPtr->jobSpecs.userName, jobPtr->jobSpecs.fromHost, buf, I18N_error); - -} + +} -void +void child_handler (int sig) { int pid; @@ -129,7 +129,7 @@ child_handler (int sig) if (pid == mbdPid) { int sig = WTERMSIG(status); if (mbdExitCnt > 150) - mbdExitCnt = 150; + mbdExitCnt = 150; mbdExitVal = WIFSIGNALED(status); if (mbdExitVal) { if (WCOREDUMP(status)) @@ -157,29 +157,29 @@ child_handler (int sig) } } - ls_ruunix2lsf (&rusage, &lsfRusage); + ls_ruunix2lsf (&rusage, &lsfRusage); cpuTime = lsfRusage.ru_utime + lsfRusage.ru_stime; - - for (jobCard = jobQueHead->forw; (jobCard != jobQueHead); + + for (jobCard = jobQueHead->forw; (jobCard != jobQueHead); jobCard = jobCard->forw) { if (jobCard->exitPid == pid) { jobCard->w_status = LS_STATUS(status); jobCard->exitPid = -1; } - + if (jobCard->jobSpecs.jobPid == pid) { jobCard->collectedChild = TRUE; jobCard->cpuTime = cpuTime; jobCard->w_status = LS_STATUS(status); - jobCard->exitPid = -1; + jobCard->exitPid = -1; memcpy ((char *) &jobCard->lsfRusage, (char *) &lsfRusage, - sizeof (struct lsfRusage)); + sizeof (struct lsfRusage)); jobCard->notReported++; - - - - if (sbd_finish_sleep < 0) { + + + + if (sbd_finish_sleep < 0) { if (daemonParams[LSB_SBD_FINISH_SLEEP].paramValue) { errno = 0; sbd_finish_sleep = atoi(daemonParams[LSB_SBD_FINISH_SLEEP].paramValue); @@ -195,13 +195,13 @@ child_handler (int sig) need_checkfinish = TRUE; - break; + break; } } - } + } -} +} #ifndef BSIZE #define BSIZE 1024 @@ -252,7 +252,7 @@ fcp(char *file1, char *file2, struct hostent *hp) close(fd1); close(fd2); return (0); -} +} #include @@ -276,16 +276,18 @@ rmDir(char *dir) closedir (dirp); return (rmdir(dir)); -} +} -void 
closeBatchSocket (void) +void closeBatchSocket (void) { - if (batchSock > 0) { - chanClose_(batchSock); - batchSock = -1; - } -} + /* close the epoll and back sockets but do not do any + * epoll_* operation as that will affect the epoll + * object we share with the parent process + */ + close(epoll_fd); + close(channels[batchSock].handle); +} void getManagerId(struct sbdPackage *sbdPackage) @@ -317,4 +319,4 @@ getManagerId(struct sbdPackage *sbdPackage) fname); /* catgets 5609 */ die(FATAL_ERR); } -} +} diff --git a/lsbatch/lib/lsb.comm.c b/lsbatch/lib/lsb.comm.c index 92bf58f..c04e01f 100644 --- a/lsbatch/lib/lsb.comm.c +++ b/lsbatch/lib/lsb.comm.c @@ -181,7 +181,6 @@ call_server (char * host, ls_syslog(LOG_ERR, I18N_FUNC_FAIL_MM, "callserver", "chanSetMode"); CLOSECD(serverSock); return (-2); } - if (postSndFunc) tsize += ((struct lenData *)postSndFuncArg)->len + NET_INTSIZE_; @@ -212,6 +211,16 @@ call_server (char * host, CLOSECD(serverSock); return (-2); } + /* Add the server mask to the epoll object + */ + cc = chanRegisterEpoll_(serverSock, EPOLLOUT|EPOLLIN|EPOLLERR|EPOLLRDHUP); + if (cc < 0) { + ls_syslog(LOG_ERR, "%s: chanRegisterEpoll_() failed %m", __func__); + chanFreeBuf_(sndBuf); + CLOSECD(serverSock); + return (-2); + } + } else { cc = chanRpc_(serverSock, &reqbuf, @@ -696,4 +705,3 @@ getCpuFactor (char *host, int name) return (tempPtr); } - diff --git a/lsf/lib/lib.channel.c b/lsf/lib/lib.channel.c index 1542186..344b9f9 100644 --- a/lsf/lib/lib.channel.c +++ b/lsf/lib/lib.channel.c @@ -25,9 +25,6 @@ #define MAXLOOP 3000 -#define DEFAULT_MAX_CHANNELS 64 -#define INVALID_HANDLE -1 - #define NL_SETN 23 #define CLOSEIT(i) { \ @@ -35,7 +32,7 @@ channels[i].state = CH_DISC; \ channels[i].handle = INVALID_HANDLE; } -static struct chanData *channels; +struct chanData *channels; int cherrno = 0; extern int errno; int chanIndex; @@ -50,6 +47,11 @@ static void enqueueTail_(struct Buffer *, struct Buffer *); static void dequeue_(struct Buffer *); static int 
findAFreeChannel(void); +// epoll interface +int epoll_fd; +struct epoll_event *epoll_events; +static int chanOpenSock_(int, int); + int chanInit_(void) { @@ -60,12 +62,19 @@ chanInit_(void) first = FALSE; + // By default this is 1024 is this big enough? chanMaxSize = sysconf(_SC_OPEN_MAX); channels = calloc(chanMaxSize, sizeof(struct chanData)); if (channels == NULL) return -1; + for (int i = 0; i < chanMaxSize; i++) { + channels[i].handle = INVALID_HANDLE; + channels[i].state = CH_FREE; + channels[i].events = EPOLL_EVENTS_NONE; + } + chanIndex = 0; return 0; @@ -88,7 +97,6 @@ chanServSocket_(int type, u_short port, int backlog, int options) lserrno = LSE_SOCK_SYS; return(-1); } - memset((char*)&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(port); @@ -121,7 +129,8 @@ chanServSocket_(int type, u_short port, int backlog, int options) channels[ch].type = CH_TYPE_UDP; else channels[ch].type = CH_TYPE_PASSIVE; - return(ch); + + return ch; } int @@ -318,8 +327,8 @@ chanConnect_(int chfd, struct sockaddr_in *peer, int timeout, int options) return(0); } channels[chfd].state = CH_CONN; - return(0); + return(0); } int @@ -523,7 +532,7 @@ chanOpen: connect() failed, laddr=%s, addr=%s"),/*catgets 5003*/ } /* chanOpen_() */ -int +static int chanOpenSock_(int s, int options) { int i; @@ -554,6 +563,9 @@ chanOpenSock_(int s, int options) lserrno = LSE_MALLOC; return(-1); } + + // Register with epoll + chanRegisterEpoll_(i, EPOLLIN|EPOLLERR|EPOLLRDHUP); return(i); } @@ -575,6 +587,18 @@ chanClose_(int chfd) cherrno = CHANE_BADCHFD; return(-1); } + + /* Unregister from epoll before closing. If we are in + * forked child don't do this, don't touch the epoll object + * as it shared between the child and the parent. 
+ */ + if (chanUnRegisterEpoll_(chfd) < 0) { + // Log but continue cleanup + if (logclass & LC_COMM) { + ls_syslog(LOG_WARNING, "%s: epoll_ctl EPOLL_CTL_DEL failed for chfd " + "%d fd %d: %m", __func__, chfd, channels[chfd].handle); + } + } close(channels[chfd].handle); if (channels[chfd].send @@ -1198,3 +1222,271 @@ findAFreeChannel(void) return i; } + +// epoll interface + +int +chanEpollInit_(void) +{ + /* This is the epoll_events array which is the same size of + * of the channels array. + */ + epoll_events = calloc(chanMaxSize, sizeof(struct epoll_event)); + if (epoll_events == NULL) { + return -1; + } + + epoll_fd = epoll_create1(0); + if (epoll_fd == -1) { + free(epoll_events); + return -1; + } + + return 0; +} + +int +chanEpoll_(int tms) +{ + int i; + int nReady; + + nReady = epoll_wait(epoll_fd, epoll_events, chanMaxSize, tms); + if (nReady <= 0) + return nReady; + + for (i = 0; i < nReady; i++) { + int ch = epoll_events[i].data.u32; + uint32_t ev = epoll_events[i].events; + + if (ch < 0 || ch >= chanMaxSize) + continue; + + if (channels[ch].handle == INVALID_HANDLE) + continue; + + channels[ch].events = EPOLL_EVENTS_NONE; + channels[ch].chanerr = CHANE_NOERR ; + + /* These are accept channels that don't have buffers + */ + if ((channels[ch].send == NULL || channels[ch].recv == NULL) + && channels[ch].state != CH_PRECONN) { + if (ev & EPOLLIN) + channels[ch].events |= EPOLL_EVENTS_READ; + if (ev & EPOLLOUT) + channels[ch].events |= EPOLL_EVENTS_WRITE; + if (ev & (EPOLLERR | EPOLLRDHUP)) + channels[ch].events |= EPOLL_EVENTS_ERROR; + continue; + } + + // Handle error conditions + if ((ev & EPOLLERR) || (ev & EPOLLHUP) || (ev & EPOLLRDHUP)) { + channels[ch].events |= EPOLL_EVENTS_ERROR; + channels[ch].chanerr = CHANE_SYSCALL; + continue; + } + + // Handle pre-connection state + if (channels[ch].state == CH_PRECONN + && (ev & EPOLLOUT)) { + chanHandlePreconn(ch); + continue; + } + + // Handle readable event + if (ev & EPOLLIN) { + doread2(ch); + } + + // 
Handle writable event if there's data to send + if ((channels[ch].send && channels[ch].send->forw != channels[ch].send) + && (ev & EPOLLOUT)) { + dowrite2(ch); + } + } + + return nReady; +} + +void +chanHandlePreconn(int ch) +{ + if (channels[ch].state != CH_PRECONN) + return; + + channels[ch].state = CH_CONN; + channels[ch].send = newBuf(); + channels[ch].recv = newBuf(); + channels[ch].events = EPOLL_EVENTS_WRITE; +} + +void +doread2(int chfd) +{ + struct Buffer *rcvbuf; + int cc; + + if (channels[chfd].recv->forw == channels[chfd].recv) { + rcvbuf = newBuf(); + if (!rcvbuf) { + channels[chfd].chanerr = LSE_MALLOC; + channels[chfd].events |= EPOLL_EVENTS_ERROR; + return; + } + enqueueTail_(rcvbuf, channels[chfd].recv); + } else { + rcvbuf = channels[chfd].recv->forw; + } + + if (!rcvbuf->len) { + rcvbuf->data = malloc(LSF_HEADER_LEN); + if (!rcvbuf->data) { + channels[chfd].chanerr = LSE_MALLOC; + channels[chfd].events |= EPOLL_EVENTS_ERROR; + return; + } + rcvbuf->len = LSF_HEADER_LEN; + rcvbuf->pos = 0; + } + + if (rcvbuf->pos == rcvbuf->len) { + channels[chfd].events |= EPOLL_EVENTS_READ; + return; + } + + errno = 0; + cc = read(channels[chfd].handle, rcvbuf->data + rcvbuf->pos, + rcvbuf->len - rcvbuf->pos); + + if (cc == 0 && errno == EINTR) { + ls_syslog(LOG_ERR, "%s: read() returned EOF on EINTR", __func__); + return; + } + + if (cc <= 0) { + if (cc == 0 || BAD_IO_ERR(errno)) { + channels[chfd].chanerr = CHANE_CONNRESET; + channels[chfd].events |= EPOLL_EVENTS_ERROR; + } + return; + } + + rcvbuf->pos += cc; + + if ((rcvbuf->len == LSF_HEADER_LEN) && (rcvbuf->pos == rcvbuf->len)) { + XDR xdrs; + struct LSFHeader hdr; + char *newdata; + + xdrmem_create(&xdrs, rcvbuf->data, sizeof(struct LSFHeader), XDR_DECODE); + if (!xdr_LSFHeader(&xdrs, &hdr)) { + channels[chfd].chanerr = CHANE_BADHDR; + channels[chfd].events |= EPOLL_EVENTS_ERROR; + xdr_destroy(&xdrs); + return; + } + + if (hdr.length) { + rcvbuf->len = hdr.length + LSF_HEADER_LEN; + newdata = 
realloc(rcvbuf->data, rcvbuf->len); + if (!newdata) { + channels[chfd].chanerr = LSE_MALLOC; + channels[chfd].events |= EPOLL_EVENTS_ERROR; + xdr_destroy(&xdrs); + return; + } + rcvbuf->data = newdata; + } + xdr_destroy(&xdrs); + } + + if (rcvbuf->pos == rcvbuf->len) { + channels[chfd].events |= EPOLL_EVENTS_READ; + } +} + +void +dowrite2(int chfd) +{ + struct Buffer *sendbuf; + int cc; + + if (channels[chfd].send->forw == channels[chfd].send) + return; + + sendbuf = channels[chfd].send->forw; + + cc = write(channels[chfd].handle, + sendbuf->data + sendbuf->pos, + sendbuf->len - sendbuf->pos); + + if (cc < 0 && BAD_IO_ERR(errno)) { + channels[chfd].chanerr = LSE_MSG_SYS; + channels[chfd].events = EPOLL_EVENTS_ERROR; + return; + } + + sendbuf->pos += cc; + + if (sendbuf->pos == sendbuf->len) { + dequeue_(sendbuf); + free(sendbuf->data); + free(sendbuf); + // Remove the EPOLLOUT as all data have been sent + chanModEpoll_(chfd, EPOLLIN|EPOLLERR|EPOLLRDHUP); + } +} + +int +chanRegisterEpoll_(int ch, uint32_t mask) +{ + struct epoll_event ev = {0}; + + if (channels[ch].handle == INVALID_HANDLE) + return -1; + + ev.events = mask; + ev.data.u32 = (uint32_t)ch; + + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, channels[ch].handle, &ev) == -1) { + channels[ch].chanerr = errno; + return -2; + } + + return 0; +} + +extern int batchSock; +int +chanUnRegisterEpoll_(int ch) +{ + struct epoll_event ev = {0}; + /* See BUGS in the man page of epoll_ctl() + */ + if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, channels[ch].handle, &ev) == -1) { + channels[ch].chanerr = errno; + return -1; + } + return 0; +} + + +int +chanModEpoll_(int ch, uint32_t mask) +{ + if (channels[ch].handle == INVALID_HANDLE) { + return -1; + } + + struct epoll_event ev; + ev.events = mask; + ev.data.u32 = (uint32_t)ch; + + if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, channels[ch].handle, &ev) == -1) { + channels[ch].chanerr = errno; + return -2; + } + return 0; +} diff --git a/lsf/lib/lib.channel.h b/lsf/lib/lib.channel.h index 
b07c533..e78ec5b 100644 --- a/lsf/lib/lib.channel.h +++ b/lsf/lib/lib.channel.h @@ -18,6 +18,7 @@ #ifndef CHANNEL_H #define CHANNEL_H #include +#include #include "lib.hdr.h" @@ -26,7 +27,7 @@ enum chanState {CH_FREE, CH_PRECONN, CH_CONN, CH_WAIT, - CH_INACTIVE + CH_INACTIVE }; enum chanType {CH_TYPE_UDP, CH_TYPE_TCP, CH_TYPE_LOCAL, CH_TYPE_PASSIVE, @@ -43,7 +44,8 @@ enum chanType {CH_TYPE_UDP, CH_TYPE_TCP, CH_TYPE_LOCAL, CH_TYPE_PASSIVE, #define CHAN_MODE_BLOCK 0x01 #define CHAN_MODE_NONBLOCK 0x02 -#define CLOSECD(c) { chanClose_((c)); (c) = -1; } +#define INVALID_HANDLE -1 +#define CLOSECD(c) { chanClose_((c)); (c) = -1;} #define CHAN_INIT_BUF(b) memset((b), 0, sizeof(struct Buffer)); @@ -60,20 +62,31 @@ struct Masks { fd_set rmask; fd_set wmask; fd_set emask; + }; +typedef enum { + EPOLL_EVENTS_NONE = 0, + EPOLL_EVENTS_READ = 1, + EPOLL_EVENTS_WRITE = 2, + EPOLL_EVENTS_ERROR = 4 +} epoll_events_t; struct chanData { - int handle; + int handle; enum chanType type; enum chanState state; - enum chanState prestate; - int chanerr; + enum chanState prestate; + int chanerr; struct Buffer *send; struct Buffer *recv; - + epoll_events_t events; }; +extern int epoll_fd; +extern struct epoll_event *epoll_events; +extern struct chanData *channels; + #define CHANE_NOERR 0 #define CHANE_CONNECTED 1 #define CHANE_NOTCONN 2 @@ -93,36 +106,46 @@ int chanInit_(void); #define chanSend_ chanEnqueue_ #define chanRecv_ chanDequeue_ -int chanOpen_(u_int, u_short, int); -int chanEnqueue_(int chfd, struct Buffer *buf); -int chanDequeue_(int chfd, struct Buffer **buf); +extern int chanOpen_(u_int, u_short, int); +extern int chanEnqueue_(int chfd, struct Buffer *buf); +extern int chanDequeue_(int chfd, struct Buffer **buf); -int chanSelect_(struct Masks *, struct Masks *, struct timeval *timeout); -int chanClose_(int chfd); -void chanCloseAll_(void); -int chanSock_(int chfd); +extern int chanSelect_(struct Masks *, struct Masks *, struct timeval *timeout); +extern int chanClose_(int chfd); 
+extern void chanCloseAll_(void); +extern int chanSock_(int chfd); -int chanServSocket_(int, u_short, int, int); -int chanAccept_(int, struct sockaddr_in *); +extern int chanServSocket_(int, u_short, int, int); +extern int chanAccept_(int, struct sockaddr_in *); -int chanClientSocket_(int, int, int); -int chanConnect_(int, struct sockaddr_in *, int , int); +extern int chanClientSocket_(int, int, int); +extern int chanConnect_(int, struct sockaddr_in *, int , int); -int chanSendDgram_(int, char *, int , struct sockaddr_in *); -int chanRcvDgram_(int , char *, int, struct sockaddr_in *, int); -int chanRpc_(int , struct Buffer *, struct Buffer *, struct LSFHeader *, int timeout); -int chanRead_(int, char *, int); -int chanReadNonBlock_(int, char *, int, int); -int chanWrite_(int, char *, int); +extern int chanSendDgram_(int, char *, int , struct sockaddr_in *); +extern int chanRcvDgram_(int , char *, int, struct sockaddr_in *, int); +extern int chanRpc_(int , struct Buffer *, struct Buffer *, + struct LSFHeader *, int timeout); +extern int chanRead_(int, char *, int); +extern int chanReadNonBlock_(int, char *, int, int); +extern int chanWrite_(int, char *, int); -int chanAllocBuf_(struct Buffer **buf, int size); -int chanFreeBuf_(struct Buffer *buf); -int chanFreeStashedBuf_(struct Buffer *buf); -int chanOpenSock_(int , int); -int chanSetMode_(int, int); +extern int chanAllocBuf_(struct Buffer **buf, int size); +extern int chanFreeBuf_(struct Buffer *buf); +extern int chanFreeStashedBuf_(struct Buffer *buf); +extern int chanSetMode_(int, int); extern int chanIndex; extern int cherrno; -#endif +/* epoll API + */ +extern int chanEpollInit_(void); +extern int chanRegisterEpoll_(int, uint32_t); +extern int chanModEpoll_(int, uint32_t); +extern int chanUnRegisterEpoll_(int); +extern int chanEpoll_(int); +extern void chanHandlePreconn(int); +extern void doread2(int); +extern void dowrite2(int); +#endif diff --git a/lsf/lim/lim.cluster.c b/lsf/lim/lim.cluster.c index 
99ba808..639bf51 100644 --- a/lsf/lim/lim.cluster.c +++ b/lsf/lim/lim.cluster.c @@ -17,7 +17,7 @@ */ #include "lim.h" #include "../../lsf/lib/lsi18n.h" - +extern int epoll_fd; #define NL_SETN 24 #define ABORT 1 @@ -37,29 +37,22 @@ static void clientReq(XDR *, struct LSFHeader *, int ); static void shutDownChan(int); void -clientIO(struct Masks *chanmasks) +clientIO(struct chanData *chan, int chfd) { - int i; - - for (i = 0; (i < chanIndex) && (i < MAXCLIENTS); i++) { - - if (i == limSock || i == limTcpSock) - continue; - if (FD_ISSET(i, &chanmasks->emask)) { + if (chan->events & EPOLL_EVENTS_ERROR) { - if (clientMap[i]) + if (clientMap[chfd]) ls_syslog(LOG_ERR, "\ -%s: Lost connection with client %s IO or decode error", +%s: Lost connection with client %s IO or decode error %d", __func__, - sockAdd2Str_(&clientMap[i]->from)); - shutDownChan(i); - continue; - } + sockAdd2Str_(&clientMap[chfd]->from), chan->chanerr); + shutDownChan(chfd); + return; + } - if (FD_ISSET(i, &chanmasks->rmask)) { - processMsg(i); - } + if (chan->events & EPOLL_EVENTS_READ) { + processMsg(chfd); } } @@ -234,9 +227,12 @@ clientReq(XDR *xdrs, struct LSFHeader *hdr, int chfd) } if (pid == 0) { - + // close() limSock should the master restart when processing + // this request. if (! 
limParams[LIM_NO_FORK].paramValue) - chanClose_(limSock); + close(chanSock_(limSock)); + if (epoll_fd >= 0) + close(epoll_fd); XDR_SETPOS(xdrs, oldpos); io_block_(chanSock_(chfd)); @@ -291,4 +287,3 @@ shutDownChan(int chanfd) FREEUP(clientMap[chanfd]); } } - diff --git a/lsf/lim/lim.h b/lsf/lim/lim.h index 5d328d9..71fcba8 100644 --- a/lsf/lim/lim.h +++ b/lsf/lim/lim.h @@ -504,7 +504,7 @@ extern int xdr_loadmatrix(XDR *, int, struct loadVectorStruct *, struct LSFHeader *); extern int xdr_masterReg(XDR *, struct masterReg *, struct LSFHeader *); extern int xdr_statInfo(XDR *, struct statInfo *, struct LSFHeader *); -extern void clientIO(struct Masks *); +extern void clientIO(struct chanData *, int); /* openlava floating host management */ diff --git a/lsf/lim/lim.main.c b/lsf/lim/lim.main.c index 8070746..e80d9b2 100644 --- a/lsf/lim/lim.main.c +++ b/lsf/lim/lim.main.c @@ -107,27 +107,24 @@ extern char *getExtResourcesLoc(char *); extern char *getExtResourcesVal(char *); /* UDP message buffer. 
- */ +*/ static char reqBuf[MSGSIZE]; static void usage(void) { fprintf(stderr, "\ -lim: [-C] [-V] [-h] [-t] [-debug_level] [-d env_dir]\n"); + lim: [-C] [-V] [-h] [-t] [-debug_level] [-d env_dir]\n"); } int li_len = 0; struct liStruct *li = NULL; /* LIM main() - */ +*/ int main(int argc, char **argv) { - fd_set allMask; - struct Masks sockmask; - struct Masks chanmask; struct timeval timer; struct timeval t0; struct timeval t1; @@ -180,8 +177,8 @@ main(int argc, char **argv) } if (lim_debug > 1) - fprintf(stderr, "\ -Reading configuration from %s/lsf.conf\n", env_dir); + fprintf(stderr, "%s: Reading configuration from %s/lsf.conf\n", __func__, + env_dir); if (initenv_(limParams, env_dir) < 0) { @@ -193,7 +190,7 @@ Reading configuration from %s/lsf.conf\n", env_dir); (lim_debug == 2), limParams[LSF_LOG_MASK].paramValue); ls_syslog(LOG_ERR, "\ -%s: initenv() failed reading lsf.conf from %s", __func__, env_dir); + %s: initenv() failed reading lsf.conf from %s", __func__, env_dir); lim_Exit("main"); } @@ -204,7 +201,7 @@ Reading configuration from %s/lsf.conf\n", env_dir); cc = initAndConfig(lim_CheckMode, &kernelPerm); if (cc < 0) { ls_syslog(LOG_ERR, "\ -%s: failed to configure, exiting...", __func__); + %s: failed to configure, exiting...", __func__); return -1; } printTypeModel(); @@ -223,13 +220,12 @@ Reading configuration from %s/lsf.conf\n", env_dir); } if (getuid() != 0) { - fprintf(stderr, "\ -%s: Real uid is %d, not root\n", __func__, (int)getuid()); + fprintf(stderr, "%s: Real uid is %d, not root\n", __func__, getuid()); } if (geteuid() != 0) { - fprintf(stderr, "\ -%s: Effective uid is %d, not root\n", __func__, (int)geteuid()); + fprintf(stderr, "%s: Effective uid is %d, not root\n", __func__, + geteuid()); } /* If started with -2 in debug mode @@ -268,7 +264,7 @@ Reading configuration from %s/lsf.conf\n", env_dir); cc = initAndConfig(lim_CheckMode, &kernelPerm); if (cc < 0) { ls_syslog(LOG_ERR, "\ -%s: failed to configure, exiting...", __func__); + %s: 
failed to configure, exiting...", __func__); return -1; } @@ -284,7 +280,7 @@ Reading configuration from %s/lsf.conf\n", env_dir); if (lim_CheckError == EXIT_WARNING_ERROR) { ls_syslog(LOG_WARNING, "\ -%s: Checking Done. Warning(s)/error(s) found.", __func__); + %s: Checking Done. Warning(s)/error(s) found.", __func__); exit(EXIT_WARNING_ERROR); } @@ -297,14 +293,14 @@ Reading configuration from %s/lsf.conf\n", env_dir); initSignals(); ls_syslog(LOG_INFO, "\ -%s: Daemon running (%d %d %d)", __func__, myClusterPtr->checkSum, + %s: Daemon running (%d %d %d)", __func__, myClusterPtr->checkSum, ntohs(myHostPtr->statInfo.portno), VOLCLAVA_VERSION); ls_syslog(LOG_DEBUG, "\ -%s: sampleIntvl %f exchIntvl %f hostInactivityLimit %d masterInactivityLimit %d retryLimit %d", __func__, sampleIntvl, exchIntvl, + %s: sampleIntvl %f exchIntvl %f hostInactivityLimit %d masterInactivityLimit %d retryLimit %d", __func__, sampleIntvl, exchIntvl, hostInactivityLimit, masterInactivityLimit, retryLimit); /* Initialize and load events. 
- */ + */ logInit(); if (masterMe) @@ -313,7 +309,6 @@ Reading configuration from %s/lsf.conf\n", env_dir); if (lim_debug < 2) chdir("/tmp"); - FD_ZERO(&allMask); /* We use seconds based precision timer * which is good enough, just make sure * that every 5 seconds we read the load @@ -328,35 +323,25 @@ Reading configuration from %s/lsf.conf\n", env_dir); sigset_t newMask; int nReady; - sockmask.rmask = allMask; if (pimPid == -1) startPIM(argc, argv); - ls_syslog(LOG_DEBUG2, "\ -%s: Before select: timer %dsec", __func__, timer.tv_sec); + ls_syslog(LOG_DEBUG2, "%s: Before chanEpoll_", __func__); - nReady = chanSelect_(&sockmask, &chanmask, &timer); + int ms = 5 * 1000; // milliseconds + nReady = chanEpoll_(ms); if (nReady < 0) { if (errno != EINTR) - ls_syslog(LOG_ERR, "\ -%s: chanSelect() failed %M", __func__); + ls_syslog(LOG_ERR, "%s: chanEpoll_() failed: %m", __func__); continue; } - /* Check if timer expired, if not - * reload it with the time till - * its expiration. - */ gettimeofday(&t1, NULL); if (t1.tv_sec - t0.tv_sec >= 5) { - /* set the new timer - */ timer.tv_sec = 5; timer.tv_usec = 0; - /* reset the start time - */ t0.tv_sec = t1.tv_sec; - t0.tv_usec = t1.tv_sec; + t0.tv_usec = t1.tv_usec; alarmed = 1; } else { timer.tv_sec = 5 - (t1.tv_sec - t0.tv_sec); @@ -364,40 +349,44 @@ Reading configuration from %s/lsf.conf\n", env_dir); alarmed = 0; } - ls_syslog(LOG_DEBUG2,"\ -%s: After select: cc %d alarmed %d timer %dsec", - __func__, cc, alarmed, timer.tv_sec); + ls_syslog(LOG_DEBUG2, "%s: After chanEpoll_: nReady %d alarmed %d timer %dsec", + __func__, nReady, alarmed, timer.tv_sec); blockSigs_(0, &newMask, &oldMask); if (alarmed) { periodic(kernelPerm); - sigprocmask(SIG_SETMASK, &oldMask, NULL); } - if (nReady <= 0) { - sigprocmask(SIG_SETMASK, &oldMask, NULL); - continue; - } - - if (FD_ISSET(limSock, &chanmask.rmask)) { - processUDPMsg(); - } + for (int i = 0; i < nReady; i++) { + // Get the channel number registered previously + int ch = 
epoll_events[i].data.u32; - if (FD_ISSET(limTcpSock, &chanmask.rmask)) { - doAcceptConn(); - } + if (channels[ch].handle == INVALID_HANDLE) { + // assert(channels.state == CH_FREE) + continue; + } + if (channels[ch].handle == chanSock_(limSock) + && (channels[ch].events & EPOLL_EVENTS_READ)) { + processUDPMsg(); + continue; + } - clientIO(&chanmask); + if (channels[ch].handle == chanSock_(limTcpSock) + && (channels[ch].events & EPOLL_EVENTS_READ)) { + doAcceptConn(); + continue; + } + clientIO(&channels[ch], ch); + } sigprocmask(SIG_SETMASK, &oldMask, NULL); - - } /* for (;;) */ + } // main loop } /* main() */ /* processUDPMsg() - */ +*/ static int processUDPMsg(void) { @@ -414,7 +403,7 @@ processUDPMsg(void) cc = chanRcvDgram_(limSock, reqBuf, MSGSIZE, &from, -1); if (cc < 0) { ls_syslog(LOG_ERR, "\ -%s: chanRcvDgram() failed limSock %d: %m", + %s: chanRcvDgram() failed limSock %d: %m", __func__, limSock); return -1; } @@ -424,7 +413,7 @@ processUDPMsg(void) if (!xdr_LSFHeader(&xdrs, &reqHdr)) { ls_syslog(LOG_ERR, "\ -%s: failed to decode xdr_LSFHeader %M", __func__); + %s: failed to decode xdr_LSFHeader %M", __func__); xdr_destroy(&xdrs); return -1; } @@ -442,10 +431,10 @@ processUDPMsg(void) if (limParams[LIM_NO_MIGRANT_HOSTS].paramValue) { ls_syslog(LOG_WARNING,"\ -%s: Received request %d from non-LSF host %s", + %s: Received request %d from non-LSF host %s", __func__, limReqCode, sockAdd2Str_(&from)); /* tell the remote that we don't know him. 
- */ + */ errorBack(&from, &reqHdr, LIME_NAUTH_HOST, -1); xdr_destroy(&xdrs); return -1; @@ -460,7 +449,7 @@ processUDPMsg(void) AF_INET); if (hp == NULL) { ls_syslog(LOG_WARNING, "\ -%s: Received request %d from unresolvable address %s", __func__, + %s: Received request %d from unresolvable address %s", __func__, limReqCode, sockAdd2Str_(&from)); errorBack(&from, &reqHdr, LIME_NAUTH_HOST, -1); xdr_destroy(&xdrs); @@ -469,7 +458,7 @@ processUDPMsg(void) } ls_syslog(LOG_DEBUG, "\ -%s: Received request %d from host %s %s", + %s: Received request %d from host %s %s", __func__, limReqCode, (fromHost ? fromHost->hostName : hp->h_name), sockAdd2Str_(&from)); @@ -544,7 +533,7 @@ processUDPMsg(void) errorBack(&from, &reqHdr, LIME_BAD_REQ_CODE, -1); if (limReqCode != lastcode) ls_syslog(LOG_ERR, "\ -%s: Unknown request code %d vers %d from %s", __func__, + %s: Unknown request code %d vers %d from %s", __func__, limReqCode, reqHdr.version, sockAdd2Str_(&from)); lastcode = limReqCode; @@ -566,12 +555,12 @@ doAcceptConn(void) if (logclass & (LC_TRACE | LC_COMM)) ls_syslog(LOG_DEBUG, "\ -%s: Entering this routine...", __func__); + %s: Entering this routine...", __func__); ch = chanAccept_(limTcpSock, &from); if (ch < 0) { ls_syslog(LOG_ERR, "\ -%s: failed accept() new connection socket %d: %M", __func__, limTcpSock); + %s: failed accept() new connection socket %d: %M", __func__, limTcpSock); return; } @@ -582,15 +571,15 @@ doAcceptConn(void) * a tcp operation it should be its registation. 
*/ ls_syslog(LOG_WARNING,"\ -%s: Received request from non-LSF host %s", - __func__, sockAdd2Str_(&from)); + %s: Received request from non-LSF host %s", + __func__, sockAdd2Str_(&from)); return; } client = calloc(1, sizeof(struct clientNode)); if (!client) { ls_syslog(LOG_ERR, "\ -%s: calloc() failed: %M connection from %s dropped", + %s: calloc() failed: %M connection from %s dropped", __func__, sockAdd2Str_(&from)); chanClose_(ch); @@ -659,7 +648,7 @@ initAndConfig(int checkMode, int *kernelPerm) struct tclLsInfo *tclLsInfo; ls_syslog(LOG_DEBUG, "\ -%s: Entering this routine...; checkMode=%d", __func__, checkMode); + %s: Entering this routine...; checkMode=%d", __func__, checkMode); /* LIM is running in a non shared fiel system mode, * contact the master and retrieve the shared file @@ -669,7 +658,7 @@ initAndConfig(int checkMode, int *kernelPerm) cc = getClusterConfig(); if (cc < 0) { ls_syslog(LOG_ERR, "\ -%s: failed getting cluster configuration files %M, exiting...", __func__); + %s: failed getting cluster configuration files %M, exiting...", __func__); return -1; } } @@ -724,7 +713,7 @@ initAndConfig(int checkMode, int *kernelPerm) if (logclass & LC_TRACE) { ls_syslog(LOG_DEBUG2, "\ -%s: LSF_LIM_LOCK %s", __func__, lsfLimLock); + %s: LSF_LIM_LOCK %s", __func__, lsfLimLock); } sscanf(lsfLimLock, "%d %ld", &flag, &lockTime); if (flag > 0) { @@ -777,7 +766,7 @@ periodic(int kernelPerm) } /* term_handler() - */ +*/ static void term_handler(int signum) { @@ -788,7 +777,7 @@ term_handler(int signum) Signal_(signum, SIG_DFL); ls_syslog(LOG_ERR, "\ -%s: Received signal %d, exiting", __func__, signum); + %s: Received signal %d, exiting", __func__, signum); chanClose_(limSock); chanClose_(limTcpSock); @@ -804,7 +793,7 @@ term_handler(int signum) } /* term_handler() */ /* child_handler() - */ +*/ static void child_handler(int sig) { @@ -817,7 +806,7 @@ child_handler(int sig) while ((pid = waitpid(-1, &status, WNOHANG)) > 0) { if (pid == elim_pid) { ls_syslog(LOG_ERR, 
"\ -%s: elim (pid=%d) died (exit_code=%d,exit_sig=%d)", + %s: elim (pid=%d) died (exit_code=%d,exit_sig=%d)", __func__, (int)elim_pid, WEXITSTATUS (status), @@ -827,7 +816,7 @@ child_handler(int sig) if (pid == pimPid) { if (logclass & LC_PIM) ls_syslog(LOG_DEBUG, "\ -child_handler: pim (pid=%d) died", pid); + child_handler: pim (pid=%d) died", pid); pimPid = -1; } } @@ -842,13 +831,13 @@ initSock(int checkMode) if (limParams[LSF_LIM_PORT].paramValue == NULL) { ls_syslog(LOG_ERR, "\ -%s: fatal error LSF_LIM_PORT is not defined in lsf.conf", __func__); + %s: fatal error LSF_LIM_PORT is not defined in lsf.conf", __func__); return -1; } if ((lim_port = atoi(limParams[LSF_LIM_PORT].paramValue)) <= 0) { ls_syslog(LOG_ERR, "\ -%s: LSF_LIM_PORT <%s> must be a positive number", + %s: LSF_LIM_PORT <%s> must be a positive number", __func__, limParams[LSF_LIM_PORT].paramValue); return -1; } @@ -856,7 +845,7 @@ initSock(int checkMode) limSock = chanServSocket_(SOCK_DGRAM, lim_port, -1, 0); if (limSock < 0) { ls_syslog(LOG_ERR, "\ -%s: unable to create datagram socket port %d; another LIM running?: %M ", + %s: unable to create datagram socket port %d; another LIM running?: %M ", __func__, lim_port); return -1; } @@ -875,12 +864,27 @@ initSock(int checkMode) &size) < 0) { ls_syslog(LOG_ERR, "\ -%s: getsockname(%d) failed %M", __func__, limTcpSock); + %s: getsockname(%d) failed %M", __func__, chanSock_(limTcpSock)); return -1; } lim_tcp_port = limTcpSockId.sin_port; + if (chanEpollInit_() < 0) { + ls_syslog(LOG_ERR, "%s: chanEpollInit_() failed %m", __func__); + return -1; + } + + /* Register the UDP and TCP channels + */ + if (chanRegisterEpoll_(limSock, EPOLLIN|EPOLLERR|EPOLLRDHUP) < 0) { + return -1; + } + + if (chanRegisterEpoll_(limTcpSock, EPOLLIN|EPOLLERR|EPOLLRDHUP) < 0) { + return -1; + } + return 0; } @@ -1142,7 +1146,7 @@ printTypeModel(void) } /* initMiscLiStruct() - */ +*/ static void initMiscLiStruct(void) { @@ -1165,7 +1169,7 @@ initMiscLiStruct(void) } /* 
initMiscLiStruct() */ /* getClusterConfig() - */ +*/ static int getClusterConfig(void) { @@ -1178,7 +1182,7 @@ getClusterConfig(void) return 0; ls_syslog(LOG_DEBUG, "\ -%s: volclava non shared fs configured", __func__); + %s: volclava non shared fs configured", __func__); sprintf(buf, "%s/esync", limParams[LSF_BINDIR].paramValue); @@ -1188,14 +1192,14 @@ getClusterConfig(void) * build our own? */ ls_syslog(LOG_ERR, "\ -%s: stat(%s) failed %m", __func__, buf); + %s: stat(%s) failed %m", __func__, buf); return -1; } fp = popen(buf, "r"); if (fp == NULL) { ls_syslog(LOG_ERR, "\ -%s: popen(%s) failed %m", __func__, buf); + %s: popen(%s) failed %m", __func__, buf); } memset(buf, 0, sizeof(buf)); @@ -1207,8 +1211,7 @@ getClusterConfig(void) pclose(fp); ls_syslog(LOG_INFO, "\ -%s: configuration files sync done", __func__); + %s: configuration files sync done", __func__); return 0; - -} /* getClusterConfig() */ +}