Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
09fd077
Add sst_support.
Jakio815 Jan 31, 2025
3fb0bf9
Add sst_priv_t struct.
Jakio815 Jan 31, 2025
27ac24e
Add initialize_netdrv for sst.
Jakio815 Jan 31, 2025
a814a07
Add free_netdrv for sst
Jakio815 Jan 31, 2025
82e63bd
Add create_server, with also passing path as global var in lf_sst_sup…
Jakio815 Jan 31, 2025
4a5f1b6
Add structure of accept_netdrv
Jakio815 Jan 31, 2025
001b5fe
Merge branch 'networkdriver' of github.com:lf-lang/reactor-c into sst
Jakio815 Jan 31, 2025
532922b
Merge branch 'networkdriver' of github.com:lf-lang/reactor-c into sst
Jakio815 Jan 31, 2025
4d28852
Add comments.
Jakio815 Jan 31, 2025
241d8da
Set handshake with client.
Jakio815 Jan 31, 2025
5be07ad
Add create_client and connect_to_netdrv for sst.
Jakio815 Jan 31, 2025
37145ff
Add user input path of sst config to federate.c
Jakio815 Jan 31, 2025
42a688c
Add get/set functions.
Jakio815 Jan 31, 2025
c374b72
Add read/write/shutdown functions.
Jakio815 Jan 31, 2025
a96d00e
Minor fix on adding void
Jakio815 Feb 1, 2025
8922c0d
Minor fix on adding `void` and new line on EOF.
Jakio815 Feb 1, 2025
ab3d92d
Enable finding the sst-c-api library, and include it in lf_sst_support.h
Jakio815 Feb 1, 2025
cedbddf
Merge branch 'networkdriver' of github.com:lf-lang/reactor-c into sst
Jakio815 Feb 1, 2025
2772e86
Add options to use user specified port for sst.
Jakio815 Feb 2, 2025
cb5b23b
Minor fix on including headers.
Jakio815 Feb 2, 2025
e7cea3b
Add -sst option for federate.
Jakio815 Feb 3, 2025
3fa7c48
Add usage for --sst for RTI.
Jakio815 Feb 4, 2025
a3887e9
Fix read/write to send header separately to match numbers. Fed-to-Fed…
Jakio815 Feb 6, 2025
53b2100
Minor cleanup.
Jakio815 Feb 6, 2025
af591a2
Fix read/write to match for fed2fed messages.
Jakio815 Feb 6, 2025
b92b737
Fix forwarding on port absent messages.
Jakio815 Feb 6, 2025
62bad67
Revert "Fix read/write to send header separately to match numbers. Fe…
Jakio815 Feb 12, 2025
f6e8674
Minor fix.
Jakio815 Feb 28, 2025
b2846c2
Minor fix on formatting.
Jakio815 Feb 28, 2025
01a4b55
Fix names to chan.
Jakio815 Feb 28, 2025
83b91d1
Merge branch 'networkdriver' of github.com:lf-lang/reactor-c into sst
Jakio815 Feb 28, 2025
1c48985
Merge branch 'networkdriver' of github.com:lf-lang/reactor-c into sst
Jakio815 Feb 28, 2025
c10017d
Merge branch 'shutdown' of github.com:lf-lang/reactor-c into sst
Jakio815 Mar 4, 2025
d1a967a
Minor change
Jakio815 Mar 14, 2025
ca6c10e
Merge branch 'networkdriver' of github.com:lf-lang/reactor-c into sst
Jakio815 Nov 10, 2025
ff0aecf
Add include to sst support.h
Jakio815 Nov 10, 2025
813d3f0
Add find openssl in cmake.
Jakio815 Nov 10, 2025
28e64cc
Fix to netchan to net_abstraction
Jakio815 Nov 10, 2025
e21fed9
Minor fix on name.
Jakio815 Nov 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ void usage(int argc, const char* argv[]) {
lf_print(" -a, --auth Turn on HMAC authentication options.\n");
lf_print(" -t, --tracing Turn on tracing.\n");
lf_print(" -d, --disable_dnet Turn off the use of DNET signals.\n");
lf_print(" -sst, --sst SST config path for RTI.\n");

lf_print("Command given:");
for (int i = 0; i < argc; i++) {
Expand Down Expand Up @@ -220,7 +221,7 @@ int process_args(int argc, const char* argv[]) {
rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines
lf_print("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes);
} else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) {
#ifdef COMM_TYPE_TCP
#if defined(COMM_TYPE_TCP) || defined(COMM_TYPE_SST)
if (argc < i + 2) {
lf_print_error("--port needs a short unsigned integer argument ( > 0 and < %d).", UINT16_MAX);
usage(argc, argv);
Expand Down Expand Up @@ -252,6 +253,15 @@ int process_args(int argc, const char* argv[]) {
return 0;
#endif
rti.authentication_enabled = true;
} else if (strcmp(argv[i], "-sst") == 0 || strcmp(argv[i], "--sst") == 0) {
#ifndef COMM_TYPE_SST
lf_print_error("--sst requires the RTI to be built with the --DCOMM_TYPE=SST option.");
usage(argc, argv);
return 0;
#else
i++;
lf_set_sst_config_path(argv[i]);
#endif
} else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) {
rti.base.tracing_enabled = true;
} else if (strcmp(argv[i], "-d") == 0 || strcmp(argv[i], "--dnet_disabled") == 0) {
Expand Down
1 change: 0 additions & 1 deletion core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,6 @@ static int handle_port_absent_message(net_abstraction_t net_abstraction, int fed
* network abstraction in _fed.net_abstractions_for_inbound_p2p_connections
* to -1 and returns, terminating the thread.
* @param _args The remote federate ID (cast to void*).
* @param fed_id_ptr A pointer to a uint16_t containing federate ID being listened to.
* This procedure frees the memory pointed to before returning.
*/
static void* listen_to_federates(void* _args) {
Expand Down
14 changes: 14 additions & 0 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,9 @@ void usage(int argc, const char* argv[]) {
printf(" The address of the RTI, which can be in the form of user@host:port or ip:port.\n\n");
printf(" -l\n");
printf(" Send stdout to individual log files for each federate.\n\n");
#ifdef COMM_TYPE_SST
printf(" -sst, --sst <n>\n");
#endif
#endif

printf("Command given:\n");
Expand Down Expand Up @@ -1123,6 +1126,17 @@ int process_args(int argc, const char* argv[]) {
return 0;
}
}
#endif
#ifdef COMM_TYPE_SST
else if (strcmp(arg, "-sst") == 0 || strcmp(arg, "--sst") == 0) {
if (argc < i + 1) {
lf_print_error("--sst needs a string argument.");
usage(argc, argv);
return 0;
}
const char* fid = argv[i++];
lf_set_sst_config_path(fid);
}
#endif
else if (strcmp(arg, "--ros-args") == 0) {
// FIXME: Ignore ROS arguments for now
Expand Down
2 changes: 1 addition & 1 deletion logging/api/logging_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,4 @@ static const bool _lf_log_level_is_debug = LOG_LEVEL >= LOG_LEVEL_DEBUG;
} \
} while (0)

#endif // LOGGING_MACROS_H
#endif // LOGGING_MACROS_H
15 changes: 15 additions & 0 deletions network/api/lf_sst_support.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef LF_SST_SUPPORT_H
#define LF_SST_SUPPORT_H

#include "socket_common.h"
#include <sst-c-api/c_api.h>

typedef struct sst_priv_t {
socket_priv_t* socket_priv;
SST_ctx_t* sst_ctx;
SST_session_ctx_t* session_ctx;
} sst_priv_t;

void lf_set_sst_config_path(const char* config_path);

#endif /* LF_SST_SUPPORT_H */
3 changes: 3 additions & 0 deletions network/api/net_abstraction.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#define NET_ABSTRACTION_H

#include "socket_common.h"
#if defined(COMM_TYPE_SST)
#include "lf_sst_support.h"
#endif

typedef void* net_abstraction_t;
// net_abstraction_t
Expand Down
5 changes: 5 additions & 0 deletions network/impl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ target_sources(lf-network-impl PUBLIC

if(COMM_TYPE MATCHES TCP)
target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_socket_support.c)
elseif(COMM_TYPE MATCHES SST)
find_package(OpenSSL REQUIRED)
find_package(sst-lib REQUIRED)
target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_sst_support.c)
target_link_libraries(lf-network-impl PUBLIC sst-lib::sst-c-api)
else()
message(FATAL_ERROR "Your communication type is not supported! The C target supports TCP.")
endif()
Expand Down
247 changes: 247 additions & 0 deletions network/impl/src/lf_sst_support.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
#include <stdlib.h> // malloc()
#include <string.h> // strncpy()

#include "net_abstraction.h"
#include "lf_sst_support.h"
#include "util.h"

const char* sst_config_path; // The SST's configuration file path.

static sst_priv_t* get_sst_priv_t(net_abstraction_t net_abs) {
if (net_abs == NULL) {
lf_print_error("Network abstraction is already closed.");
return NULL;
}
return (sst_priv_t*)net_abs;
}

net_abstraction_t initialize_net_abstraction() {
// Initialize sst_priv.
sst_priv_t* sst_priv = malloc(sizeof(sst_priv_t));
if (sst_priv == NULL) {
lf_print_error_and_exit("Falied to malloc sst_priv_t.");
}
// Initialize socket_priv.
socket_priv_t* socket_priv = malloc(sizeof(socket_priv_t));
if (socket_priv == NULL) {
lf_print_error_and_exit("Falied to malloc socket_priv_t.");
}

// Server initialization.
socket_priv->port = 0;
socket_priv->user_specified_port = 0;
socket_priv->socket_descriptor = -1;

// Federate initialization
strncpy(socket_priv->server_hostname, "localhost", INET_ADDRSTRLEN);
socket_priv->server_ip_addr.s_addr = 0;
socket_priv->server_port = -1;

sst_priv->socket_priv = socket_priv;

// SST initialization. Only set pointers to NULL.
sst_priv->sst_ctx = NULL;
sst_priv->session_ctx = NULL;

return (net_abstraction_t)sst_priv;
}

void free_net_abstraction(net_abstraction_t net_abs) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
free(priv->socket_priv);
free(priv);
}

int create_server(net_abstraction_t net_abs, bool increment_port_on_retry) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
SST_ctx_t* ctx = init_SST(sst_config_path);
priv->sst_ctx = ctx;
return create_socket_server(priv->socket_priv->user_specified_port, &priv->socket_priv->socket_descriptor,
&priv->socket_priv->port, TCP, increment_port_on_retry);
}

net_abstraction_t accept_net_abstraction(net_abstraction_t server_chan, net_abstraction_t rti_chan) {
sst_priv_t* serv_priv = get_sst_priv_t(server_chan);
int rti_socket;
if (rti_chan == NULL) {
// Set to -1, to indicate that this accept_net_abstraction() call is not trying to check if the rti_chan is
// available, inside the accept_socket() function.
rti_socket = -1;
} else {
sst_priv_t* rti_priv = get_sst_priv_t(rti_chan);
rti_socket = rti_priv->socket_priv->socket_descriptor;
}
net_abstraction_t fed_net_abstraction = initialize_net_abstraction();
sst_priv_t* fed_priv = get_sst_priv_t(fed_net_abstraction);

int sock = accept_socket(serv_priv->socket_priv->socket_descriptor, rti_socket);
if (sock == -1) {
free_net_abstraction(fed_net_abstraction);
return NULL;
}
fed_priv->socket_priv->socket_descriptor = sock;
// Get the peer address from the connected socket_id. Saving this for the address query.
if (get_peer_address(fed_priv->socket_priv) != 0) {
lf_print_error("RTI failed to get peer address.");
};

// TODO: Do we need to copy sst_ctx form server_chan to fed_chan?
session_key_list_t* s_key_list = init_empty_session_key_list();
SST_session_ctx_t* session_ctx =
server_secure_comm_setup(serv_priv->sst_ctx, fed_priv->socket_priv->socket_descriptor, s_key_list);
// Session key used is copied to the session_ctx.
free_session_key_list_t(s_key_list);
fed_priv->session_ctx = session_ctx;
return fed_net_abstraction;
}

void create_client(net_abstraction_t net_abs) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
priv->socket_priv->socket_descriptor = create_real_time_tcp_socket_errexit();
SST_ctx_t* ctx = init_SST(sst_config_path);
priv->sst_ctx = ctx;
}

int connect_to_net_abstraction(net_abstraction_t net_abs) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
int ret = connect_to_socket(priv->socket_priv->socket_descriptor, priv->socket_priv->server_hostname,
priv->socket_priv->server_port);
if (ret != 0) {
return ret;
}
session_key_list_t* s_key_list = get_session_key(priv->sst_ctx, NULL);
SST_session_ctx_t* session_ctx =
secure_connect_to_server_with_socket(&s_key_list->s_key[0], priv->socket_priv->socket_descriptor);
priv->session_ctx = session_ctx;
return 0;
}

// TODO: Still need to fix...
int read_from_net_abstraction(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
return read_from_socket(priv->socket_priv->socket_descriptor, num_bytes, buffer);
}

int read_from_net_abstraction_close_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
int read_failed = read_from_net_abstraction(net_abs, num_bytes, buffer);
if (read_failed) {
// Read failed.
// Socket has probably been closed from the other side.
// Shut down and close the socket from this side.
shutdown_socket(&priv->socket_priv->socket_descriptor, false);
return -1;
}
return 0;
}

void read_from_net_abstraction_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer,
char* format, ...) {
va_list args;
int read_failed = read_from_net_abstraction_close_on_error(net_abs, num_bytes, buffer);
if (read_failed) {
// Read failed.
if (format != NULL) {
va_start(args, format);
lf_print_error_system_failure(format, args);
va_end(args);
} else {
lf_print_error_system_failure("Failed to read from socket.");
}
}
}

int write_to_net_abstraction(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
return write_to_socket(priv->socket_priv->socket_descriptor, num_bytes, buffer);
}

int write_to_net_abstraction_close_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
int result = write_to_net_abstraction(net_abs, num_bytes, buffer);
if (result) {
// Write failed.
// Socket has probably been closed from the other side.
// Shut down and close the socket from this side.
shutdown_socket(&priv->socket_priv->socket_descriptor, false);
}
return result;
}

void write_to_net_abstraction_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer,
lf_mutex_t* mutex, char* format, ...) {
va_list args;
int result = write_to_net_abstraction_close_on_error(net_abs, num_bytes, buffer);
if (result) {
// Write failed.
if (mutex != NULL) {
LF_MUTEX_UNLOCK(mutex);
}
if (format != NULL) {
va_start(args, format);
lf_print_error_system_failure(format, args);
va_end(args);
} else {
lf_print_error("Failed to write to socket. Closing it.");
}
}
}

bool check_net_abstraction_closed(net_abstraction_t net_abs) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
return check_socket_closed(priv->socket_priv->socket_descriptor);
}

int shutdown_net_abstraction(net_abstraction_t net_abs, bool read_before_closing) {
if (net_abs == NULL) {
lf_print("Socket already closed.");
return 0;
}
sst_priv_t* priv = get_sst_priv_t(net_abs);
int ret = shutdown_socket(&priv->socket_priv->socket_descriptor, read_before_closing);
if (ret != 0) {
lf_print_error("Failed to shutdown socket.");
}
free_net_abstraction(net_abs);
return ret;
}
// END of TODO:

// Get/set functions.
int32_t get_my_port(net_abstraction_t net_abs) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
return priv->socket_priv->port;
}

int32_t get_server_port(net_abstraction_t net_abs) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
return priv->socket_priv->server_port;
}

struct in_addr* get_ip_addr(net_abstraction_t net_abs) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
return &priv->socket_priv->server_ip_addr;
}

char* get_server_hostname(net_abstraction_t net_abs) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
return priv->socket_priv->server_hostname;
}

void set_my_port(net_abstraction_t net_abs, int32_t port) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
priv->socket_priv->port = port;
}

void set_server_port(net_abstraction_t net_abs, int32_t port) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
priv->socket_priv->server_port = port;
}

void set_server_hostname(net_abstraction_t net_abs, const char* hostname) {
sst_priv_t* priv = get_sst_priv_t(net_abs);
memcpy(priv->socket_priv->server_hostname, hostname, INET_ADDRSTRLEN);
}

// Helper function.
void lf_set_sst_config_path(const char* config_path) { sst_config_path = config_path; }
Loading