Skip to content

Commit

Permalink
Clean old procs in bg thread
Browse files Browse the repository at this point in the history
  • Loading branch information
mgrunbauer committed Oct 9, 2024
1 parent c76149b commit d4445ac
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 6 deletions.
56 changes: 50 additions & 6 deletions adagucserverEC/fork_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
#include "CTString.h"
#include "fork_server.h"

static std::map<pid_t, int> child_sockets;
// Check this many seconds for old left-over processes
const int CHECK_CHILD_PROC_INTERVAL = 60;
// Old left-over child process should be killed after this many seconds
// TODO: should this become same as `timeout=300` from CGIRunner.py? Should this be configurable?
const int MAX_CHILD_PROC_TIMEOUT = 300;
const int DEFAULT_QUEUED_CONNECTIONS = 4;

int get_max_pending_connections() {
// Set the max number of queued connections on the socket via ADAGUC_NUMPARALLELPROCESSES
Expand All @@ -19,10 +24,15 @@ int get_max_pending_connections() {
}

// Default to 4 queued connections
return 4;
return DEFAULT_QUEUED_CONNECTIONS;
}

void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp) {
/*
Read bytes from client socket with the assumption that it is the raw QUERY_STRING value.
Set the QUERY_STRING and handle the request normally.
*/

int recv_buf_len = 4096;
char recv_buf[recv_buf_len];
memset(recv_buf, 0, recv_buf_len * sizeof(char));
Expand All @@ -40,7 +50,7 @@ void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char
// fflush(stdout);
// fflush(stderr);

exit(0);
exit(status);
}
}

Expand Down Expand Up @@ -72,21 +82,50 @@ void child_signal_handler(int child_signal) {
child_status = 1;
}

int child_sock = child_sockets[child_pid];
int child_sock = child_procs[child_pid].socket;
// fprintf(stderr, "@ on_child_exit [%d] [%d] [%d] [%d]\n", child_pid, child_sock, child_signal, child_status);

// Write the status code from the child pid into the unix socket back to python
write(child_sock, reinterpret_cast<const char *>(&child_status), sizeof(child_status));
close(child_sock);

child_sockets.erase(child_pid);
child_procs.erase(child_pid);
}
}

void *clean_child_procs(void *arg) {
/*
Every `CHECK_CHILD_PROC_INTERVAL` seconds, check all child procs stored in map.
If child proc was started more than `MAX_CHILD_PROC_TIMEOUT` seconds ago, send SIGKILL.
*/

while (1) {
time_t now = time(NULL);
printf("Checking all child procs\n");

for (const auto &child_proc_mapping : child_procs) {
child_proc_t child_proc = child_proc_mapping.second;

if (difftime(now, child_proc.forked_at) < MAX_CHILD_PROC_TIMEOUT) {
continue;
}

printf("Child timeout!");

if (kill(child_proc_mapping.first, SIGKILL) == -1) {
printf("Failed to send SIGKILL to child process");
// TODO: What to do if we cannot SIGKILL child process?
}
}
sleep(CHECK_CHILD_PROC_INTERVAL);
}
}

int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp) {
/*
Start adaguc in fork mode. This means:
- Set up a signal handler for any child processes
- Start a thread in the background that cleans up left-over child processes
- Set up a socket through system calls: `socket`, `bind`, `listen`
- While true, accept any incoming connections to the socket, through system call `accept`
- If connected, fork this process.
Expand All @@ -104,6 +143,10 @@ int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int
sa.sa_flags = SA_RESTART | SA_NOCLDSTOP;
sigaction(SIGCHLD, &sa, NULL);

// Start cleaning thread in the background
pthread_t clean_child_procs_thread;
pthread_create(&clean_child_procs_thread, NULL, clean_child_procs, NULL);

// Create an endpoint for communicating through a unix socket
int listen_socket = socket(AF_UNIX, SOCK_STREAM, 0);
if (-1 == listen_socket) {
Expand Down Expand Up @@ -153,7 +196,8 @@ int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int
handle_client(client_socket, run_adaguc_once, argc, argv, envp);
} else {
// Parent process keeps track of new socket and returns to listen for new connections
child_sockets[pid] = client_socket;
child_proc_t child_proc = {client_socket, time(NULL)};
child_procs[pid] = child_proc;
}
}

Expand Down
11 changes: 11 additions & 0 deletions adagucserverEC/fork_server.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
#ifndef ADAGUC_SERVER_FORK_SERVER_H
#define ADAGUC_SERVER_FORK_SERVER_H

#include <map>
#include <time.h>

typedef struct {
int socket;
time_t forked_at;
} child_proc_t;

static std::map<pid_t, child_proc_t> child_procs;

int get_max_pending_connections();
void handle_client(int client_socket, int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp);
void child_signal_handler(int child_signal);
void *clean_child_procs(void *arg);
int run_as_fork_service(int (*run_adaguc_once)(int, char **, char **, bool), int argc, char **argv, char **envp);

#endif // ADAGUC_SERVER_FORK_SERVER_H

0 comments on commit d4445ac

Please sign in to comment.