Skip to content

Commit

Permalink
Events: APIs to set/get global options (#672)
Browse files Browse the repository at this point in the history
* first cut

* first cut lib code with unit test

* save it just in case, as VM under risk

* partial update, as vm at risk

* intermediate saving

* First run code complete

* self review update

* partial compile

* In middle of compilation

* compiled OK

* common ut passes

* events_service unit tests complete

* test code in progress

* evens publish covere by unit test

* events 75% covered by UT

* subscribe UT done

* Drop internal readme

* Update upon self review - mostly on comments

* More on comments update

* minor fix of copy/paste error

* Comments update

* more comments

* minor updates; merged with master via shared

* Per review comments

* minor updates

* minor: name change for a typedef

* Added EXIT code

* Minor name change; Enabled py build for events

* Minor signature update to adapt to SWIG generated python

* Made zmq send & receive thread safe

* temp commit to enable merge

* removed hacks; Added set log level API

* Minor signature change

* Added unit test for C wrap

* Corrected per review comments; Addede log message for published events

* minor updates;no logical code changes

* corrected per review comments; Added accidentally removed test

* Added back accidentally removed test code file

* retire runtime id on deinit

* restore accidental removal

* Switched to shared_ptr per review comments

* minor non logical code changes

* fix syntax

* few follow up changes for bare ptr to shared ptr conversion

* send event as JSON string

* syntax

* syntax

* syntax

* syntax

* syntax

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* changing wrap signature to use struct instead of JSON string

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* compile fix

* Add libzmq5 as dependency for ubuntu-20.04 build (sonic-net#7)

Co-authored-by: Ubuntu <zain@zb-dev-vm.022x1jpnpm4u1iy2d325acts3c.yx.internal.cloudapp.net>

* Modify azp (sonic-net#8)

* Add libzmq5 as dependency for ubuntu-20.04 build

* Add libzmq3-dev dependency to build script

Co-authored-by: Ubuntu <zain@zb-dev-vm.022x1jpnpm4u1iy2d325acts3c.yx.internal.cloudapp.net>

* Modify azp (sonic-net#9)

* Add libzmq5 as dependency for ubuntu-20.04 build

* Add libzmq3-dev dependency to build script

* Add uuid-dev and libboost-serialization-dev dependencies

Co-authored-by: Ubuntu <zain@zb-dev-vm.022x1jpnpm4u1iy2d325acts3c.yx.internal.cloudapp.net>

* Modify azp (sonic-net#10)

* Add libzmq5 as dependency for ubuntu-20.04 build

* Add libzmq3-dev dependency to build script

* Add uuid-dev and libboost-serialization-dev dependencies

* Add dependencies for bazel build

Co-authored-by: Ubuntu <zain@zb-dev-vm.022x1jpnpm4u1iy2d325acts3c.yx.internal.cloudapp.net>

* compile errors

* compile errors

* Compile error from Ubuntu

* drop unused macro

* compile fix

* compile fix

* remove pedantic

* restored ABORT_IF_NOT

* revert flag add

* comments correction

* Log every published event

* Install dependencies for vstest (sonic-net#15)

* Install deps vstest (sonic-net#16)

* Install dependencies for vstest

* Add comments

* Install deps vstest (sonic-net#17)

* Install dependencies for vstest

* Add comments

* Correct lib spelling

* Added LINGER timeout to service sockets

* Added heartbeat

* compile fix

* compile fix

* compile fix

* set LINGER_TIMEOUT before connect/bind

* Add global options

* Add global options

* Add global options

* Add global options

* Add global options

* Add global options

* Add global options

* Add global options

* Add global options

* added define

* Moved C API to C header file

* minor upates

* minor upates

* Updated comments

* comments update

* comments update

* Fix build issue

* Modify comment

* Modify comment

* Modify comment

* Modify comment

* Modify comment

* Modify comment

* Modify comment

Co-authored-by: Renuka Manavalan <[email protected]>
Co-authored-by: Renuka Manavalan <[email protected]>
Co-authored-by: Ubuntu <zain@zb-dev-vm.022x1jpnpm4u1iy2d325acts3c.yx.internal.cloudapp.net>
  • Loading branch information
4 people authored Aug 24, 2022
1 parent 5467c89 commit 56b0f18
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .azure-pipelines/build_and_install_module.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ function build_and_install_kmodule()
apt-get install -y build-essential linux-headers-${KERNEL_RELEASE} autoconf pkg-config fakeroot
apt-get install -y flex bison libssl-dev libelf-dev
apt-get install -y libnl-route-3-200 libnl-route-3-dev libnl-cli-3-200 libnl-cli-3-dev libnl-3-dev
# Install libs required by libswsscommon
# Install libs required by libswsscommon for build
apt-get install -y libzmq3-dev libzmq5 libboost-serialization1.71.0 uuid-dev

# Add the apt source mirrors and download the linux image source code
Expand Down
63 changes: 58 additions & 5 deletions common/events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

lst_publishers_t EventPublisher::s_publishers;
int EventPublisher::LINGER_TIMEOUT = 100; // In milliseconds

event_handle_t
EventPublisher::get_publisher(const string event_source)
Expand Down Expand Up @@ -73,15 +72,18 @@ get_uuid()

int EventPublisher::init(const string event_source)
{
int rc = -1;
m_zmq_ctx = zmq_ctx_new();
void *sock = zmq_socket (m_zmq_ctx, ZMQ_PUB);

int rc = zmq_connect (sock, get_config(XSUB_END_KEY).c_str());
RET_ON_ERR(rc == 0, "Publisher fails to connect %s", get_config(XSUB_END_KEY).c_str());
void *sock = zmq_socket (m_zmq_ctx, ZMQ_PUB);
RET_ON_ERR(sock != NULL, "Failed to ZMQ_PUB socket");

rc = zmq_setsockopt (sock, ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));
RET_ON_ERR(rc == 0, "Failed to ZMQ_LINGER to %d", LINGER_TIMEOUT);

rc = zmq_connect (sock, get_config(XSUB_END_KEY).c_str());
RET_ON_ERR(rc == 0, "Publisher fails to connect %s", get_config(XSUB_END_KEY).c_str());

/*
* Event service could be down. So have a timeout.
*
Expand Down Expand Up @@ -191,7 +193,7 @@ EventPublisher::publish(const string tag, const event_params_t *params)
}

str_data = convert_to_json(m_event_source + ":" + tag, *params);
SWSS_LOG_INFO("EVENT_PUBLISHED: %s", str_data.c_str());
SWSS_LOG_ERROR("EVENT_PUBLISHED: %s", str_data.c_str());

rc = send_evt(str_data);
RET_ON_ERR(rc == 0, "failed to send event str[%d]= %s", (int)str_data.size(),
Expand Down Expand Up @@ -681,8 +683,59 @@ event_receive_wrap(void *handle, event_receive_op_C_t *evt)

}


void swssSetLogPriority(int pri)
{
swss::Logger::setMinPrio((swss::Logger::Priority) pri);
}


int
event_set_global_options(const char *options)
{
int ret = -1, rc;
void *zmq_ctx;
event_service svc;

zmq_ctx = zmq_ctx_new();
RET_ON_ERR(zmq_ctx != NULL, "Failed to get zmq ctx");

rc = svc.init_client(zmq_ctx);
RET_ON_ERR (rc == 0, "Failed to init event service rc=%d", rc);

rc = svc.global_options_set(options);
RET_ON_ERR (rc == 0, "Failed to set options in event service rc=%d", rc);
ret = 0;
out:
svc.close_service();
zmq_ctx_term(zmq_ctx);

return ret;
}


int
event_get_global_options(char *options, int options_size)
{
int ret = -1, rc;
void *zmq_ctx;
event_service svc;

zmq_ctx = zmq_ctx_new();
RET_ON_ERR(zmq_ctx != NULL, "Failed to get zmq ctx");

rc = svc.init_client(zmq_ctx);
RET_ON_ERR (rc == 0, "Failed to init event service rc=%d", rc);

rc = svc.global_options_get(options, options_size);
RET_ON_ERR (rc >= 0, "Failed to set options in event service rc=%d", rc);

ret = rc;

out:
svc.close_service();
zmq_ctx_term(zmq_ctx);

return ret;
}

6 changes: 3 additions & 3 deletions common/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <map>

/*
* Events library
* Events library APIs.
*
* APIs are for publishing & receiving events with source, tag and params along with timestamp.
* Used by event publishers and those interested in receiving published events.
Expand All @@ -31,7 +31,7 @@ typedef void* event_handle_t;
* is complete. Hence recommend, do the init as soon as the process starts.
*
* Input:
* event_source
* event_source:
* The YANG module name for the event source. All events published with the handle
* returned by this call is tagged with this source, transparently. The receiver
* could subscribe with this source as filter.
Expand All @@ -47,7 +47,7 @@ event_handle_t events_init_publisher(const std::string event_source);
* De-init/free the publisher
*
* Input:
* Handle returned from events_init_publisher
* Handle returned from events_init_publisher.
*
* Output:
* Handle is nullified.
Expand Down
3 changes: 2 additions & 1 deletion common/events_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ using namespace chrono;
SWSS_LOG_ERROR("last:errno=%d", _e); \
goto out; }

static const int LINGER_TIMEOUT = 100; /* Linger timeout in milliseconds */

/* helper API to print variable type */
/*
Expand Down Expand Up @@ -210,7 +211,7 @@ typedef string runtime_id_t;
#define EVENT_STR_CTRL_PREFIX_SZ ((int)sizeof(EVENT_STR_CTRL_PREFIX) - 1)

/* The internal code that caches runtime-IDs could retire upon de-init */
#define EVENT_STR_CTRL_DEINIT "CONTROL_DEINIT"
#define EVENT_STR_CTRL_DEINIT EVENT_STR_CTRL_PREFIX "DEINIT"

typedef vector<internal_event_t> internal_events_lst_t;

Expand Down
1 change: 0 additions & 1 deletion common/events_pi.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ typedef map <string, EventPublisher_ptr_t> lst_publishers_t;
class EventPublisher : public events_base
{
static lst_publishers_t s_publishers;
static int LINGER_TIMEOUT;

public:
virtual ~EventPublisher();
Expand Down
44 changes: 44 additions & 0 deletions common/events_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ event_service::init_client(void *zmq_ctx, int block_ms)
void *sock = zmq_socket (zmq_ctx, ZMQ_REQ);
RET_ON_ERR(sock != NULL, "Failed to get ZMQ_REQ socket rc=%d", rc);

rc = zmq_setsockopt (sock, ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));
RET_ON_ERR(rc == 0, "Failed to ZMQ_LINGER to %d", LINGER_TIMEOUT);

rc = zmq_connect (sock, get_config(REQ_REP_END_KEY).c_str());
RET_ON_ERR(rc == 0, "Failed to connect to %s", get_config(REQ_REP_END_KEY).c_str());

Expand All @@ -60,6 +63,9 @@ event_service::init_server(void *zmq_ctx, int block_ms)
void *sock = zmq_socket (zmq_ctx, ZMQ_REP);
RET_ON_ERR(sock != NULL, "Failed to get ZMQ_REP socket rc=%d", rc);

rc = zmq_setsockopt (sock, ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));
RET_ON_ERR(rc == 0, "Failed to ZMQ_LINGER to %d", LINGER_TIMEOUT);

rc = zmq_bind (sock, get_config(REQ_REP_END_KEY).c_str());
RET_ON_ERR(rc == 0, "Failed to bind to %s", get_config(REQ_REP_END_KEY).c_str());

Expand Down Expand Up @@ -155,6 +161,44 @@ event_service::cache_read(event_serialized_lst_t &lst)
}


int
event_service::global_options_set(const char *val)
{
int rc;
event_serialized_lst_t lst;

lst.push_back(string(val));

RET_ON_ERR((rc = send_recv(EVENT_OPTIONS, &lst, NULL)) == 0,
"Failed to send global options request rc=%d", rc);
out:
return rc;
}


int
event_service::global_options_get(char *val, int sz)
{
int ret = -1, rc;
string s;
event_serialized_lst_t lst;

RET_ON_ERR((rc = send_recv(EVENT_OPTIONS, NULL, &lst)) == 0,
"Failed to receive global options request rc=%d", rc);

if (!lst.empty()) {
s = *lst.begin();
}

strncpy(val, s.c_str(), sz);

val[sz - 1] = 0;
ret = (int)s.size();
out:
return ret;
}


int
event_service::channel_read(int &code, event_serialized_lst_t &data)
{
Expand Down
33 changes: 33 additions & 0 deletions common/events_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ typedef enum {
EVENT_CACHE_STOP, /* Stop the cache */
EVENT_CACHE_READ, /* Read cached events */
EVENT_ECHO, /* Echoes the received data in request via response */
EVENT_OPTIONS, /* global options Set/Get */
EVENT_EXIT /* Exit the eventd service -- Useful for unit test.*/
} event_req_type_t;

Expand Down Expand Up @@ -217,6 +218,38 @@ class event_service {
*/
int echo_receive(string &s);


/*
* Global options request set
*
* Input:
* val -- Put the interval for set
*
* Return:
* 0 - On Success
* -1 - On Failure
*/
int global_options_set(const char *val);


/*
* Global options request get.
*
* Input:
* val_sz -- Size of val buffer
*
* Output:
* val -- Get the current val
*
* Return:
* > 0 - Count of bytes to copied/to-be-copied.
* Result is truncated if given size <= this value.
* But copied string is *always* null termninated.
*
* -1 - On Failure
*/
int global_options_get(char *val, int val_sz);

/*
* The read for req/resp from client/server. The APIs above use this
* to read response and the server use this to read request.
Expand Down
71 changes: 71 additions & 0 deletions common/events_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,77 @@ int event_receive_wrap(void *handle, event_receive_op_C_t *evt);
*/
void swssSetLogPriority(int pri);


/*
* Global configurable options can be set via this API.
*
* The options are provided as JSON object with key/values as JSON string.
* key = <option name case insensitive>
* value = <option value>
*
* Supported options
*
* Option name: HEARTBEAT_INTERVAL_SECS
* Option Value: interval in seconds as int
* A value of -1 implies no heartbeat
* A value of 0 implies the lowest possible interval as possible/supported.
* This depends on implementation.
* Any non zero value implies count of seconds.
* NOTE: System will round it to the multiple of lowest interval supported.
* Any negative value other than -1 is treated as invalid.
*
*
* Option name: OFFLINE_CACHE_SIZE TODO/Not Yet Implemented
* Option Value: Size in count MBs as int
* A value of 0 implies the system default
* This depends on implementation.
* Any non zero value is accepted.
* This is constraint on available memory. Hence this is only a
* guideline to take if possible.
* Any negative value other than -1 is treated as invalid.
*
* Input:
* options - A c string holding JSON string of the options.
*
* Return:
* 0 - Implies success. It also implies that the provided options and
* their values were valid.
* < 0 - Implies failure. Either internal failure or invalid options or
* invalid values. Look at syslog for details.
*/

#define GLOBAL_OPTION_HEARTBEAT "HEARTBEAT_INTERVAL"

int event_set_global_options(const char *options);



/*
* A way to read the current values for global options.
* Refer above for details.
*
* Input:
* None
*
* Output:
* options - A buffer for c string holding JSON string of the options.
*
* options_size - Size of options buffer. The size must include space
* for terminating NULL character. If string to be copied
* is same or greater size, (options_size - 1) bytes
* are copied with a terminating NULL.
*
*
* Return:
* > 0 - Count of characters of the final JSON string to return. If given
* size is less/equal, then it implies the buffer carries truncated string.
* NOTE: The final copied string is always null terminated.
*
* < 0 - Implies failure to reach eventd service.
*
*/
int event_get_global_options(char *options, int options_size);

#ifdef __cplusplus
}
#endif
Expand Down
21 changes: 20 additions & 1 deletion tests/events_service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ static event_serialized_lst_t server_rd_lst, server_wr_lst;
void serve_commands()
{
int code;
event_serialized_lst_t lst;
event_serialized_lst_t lst, opt_lst;
EXPECT_EQ(0, service_svr.init_server(zmq_ctx, 1000));
while(!do_terminate) {
if (0 != service_svr.channel_read(code, lst)) {
Expand Down Expand Up @@ -51,6 +51,16 @@ void serve_commands()
server_ret = 0;
server_wr_lst = lst;
break;
case EVENT_OPTIONS:
server_ret = 0;
if (lst.empty()) {
server_wr_lst = opt_lst;
}
else {
opt_lst = lst;
server_wr_lst.clear();
}
break;
default:
EXPECT_TRUE(false);
server_ret = -1;
Expand Down Expand Up @@ -118,10 +128,19 @@ TEST(events_common, cache_cmds)
EXPECT_FALSE(server_rd_lst.empty());
EXPECT_EQ(s1, s);

string sopt("{\"HEARTBEAT_INTERVAL\": 2000, \"OFFLINE_CACHE_SIZE\": 500}");
char rd_opt[100];
rd_opt[0] = 0;
EXPECT_EQ(0, service_cl.global_options_set(sopt.c_str()));
EXPECT_LT(0, service_cl.global_options_get(rd_opt, (int)sizeof(rd_opt)));
EXPECT_EQ(EVENT_OPTIONS, server_rd_code);
EXPECT_EQ(sopt, string(rd_opt));

do_terminate = true;
service_cl.close_service();
EXPECT_FALSE(service_cl.is_active());
thr.join();
zmq_ctx_term(zmq_ctx);
}


Loading

0 comments on commit 56b0f18

Please sign in to comment.