Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 Stability Refactor + Scheduled timer coroutines => AWS Distro PR #27

Open
wants to merge 22 commits into
base: 2.31.12-all-cherrypicks
Choose a base branch
from

Conversation

PettitWesley
Copy link
Owner


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

This commit combines many changes designed to
improve S3 stability:
- code clean up and refactoring for readability
- re-implement preserve_data_ordering code to
  fix several bugs and simplify logic.
- all uploads now happen outside of cb_flush
- uploads performed with new scheduled timer
  jobs with coroutines
- S3 always uses async IO, with mutex lock
  to protect it from concurrency issues
- remove trailing slash in store_dir to
  prevent double // in final path

Signed-off-by: Wesley Pettit <[email protected]>
@PettitWesley PettitWesley changed the base branch from 2_31_10_all_cherry_picks to 2_31_12_all_cherry_picks September 26, 2023 22:48
@PettitWesley PettitWesley changed the base branch from 2_31_12_all_cherry_picks to 2.31.12-all-cherrypicks September 26, 2023 22:49
Copy link

@matthewfala matthewfala left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass just observing your code. I left some comments for my understanding which will be deleted when I understand the code better and can make more helpful comments.

So far it's looking good in terms log logic. In terms of organization, I suppose Leonardo would need to weigh into that. I'm not fully understanding the single thread vs multi workers since I haven't read that part closely. Also need to still read the timer coro deletion logic section once added to the delete list.

int ret;
void *payload_buf = NULL;
size_t payload_size = 0;
size_t preCompress_size = 0;
time_t file_first_log_time = time(NULL);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clay's PR is back from the dead? This looks familiar. I think we previously removed this commit since it was causing instability right?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and the goal of the S3 refactor is to bring back all the S3 changes/features and redo and retest them.

@@ -1017,7 +1077,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,

if (m_upload == NULL) {
if (chunk != NULL && time(NULL) >
(chunk->create_time + ctx->upload_timeout + ctx->retry_time)) {
(chunk->create_time + ctx->upload_timeout)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the retry timer logic changing?

@@ -478,6 +480,11 @@ void flb_output_exit(struct flb_config *config)
if (params) {
flb_free(params);
}
params = FLB_TLS_GET(timer_coro_params);
if (params) {
flb_free(params);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to set the FLB_TLS timer_coro_params to NULL to avoid a double free?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This only occurs once during shutdown.

@@ -2122,88 +2068,76 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
event_chunk->tag,
flb_sds_len(event_chunk->tag));

if (upload_file == NULL) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this section get's the file_first_log_time from (1) a log with a timestamp, (2) archived timestamp (3) current time.

Comment on lines -2132 to -2134
/* Discard upload_file if it has failed to upload MAX_UPLOAD_ERRORS times */
if (upload_file != NULL && upload_file->failures >= MAX_UPLOAD_ERRORS) {
flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not "

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic change, does this explain it correctly?:

  1. Unify upload failure deletion logic to the callback function
  2. Just buffer the chunk here.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Callback handles all uploads and failure handling
  2. Flush just buffers

Comment on lines 2013 to 2014
ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->timer_ms, flb_output_coro_timer_cb, timer_data, NULL);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Todo: Check if this function is run multiple times if timer_data will be overwritten.

if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) {
s3_store_file_lock(upload_file);
/* sends only happen from upload daemon coroutine which iterates over queue */
mk_list_add(&upload_file->_head, &ctx->upload_queue);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is threadsafe right? Like, what if two workers add to the list at the same time?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OH I was thinking we'd still only support 1 worker for S3... but yea if I add a lock here then I think we can probably support multiple... the timer coros have a lock for the ready to upload chunk list...

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the code that interacts with any list in cb_s3_flush needs to be protected.

The chunk list, the upload list, these can't be iterated over if another thread is modifying them.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added pthread_mutex_lock(&ctx->cb_flush_mutex); at beginning of cb_s3_flush

@@ -393,6 +393,37 @@ static inline int handle_output_event(flb_pipefd_t fd, uint64_t ts,
return 0;
}

static int flb_running_count(struct flb_config *config)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used exclusively for graceful shutdowns

th_ins = flb_output_thread_instance_get();
pthread_mutex_lock(&th_ins->timer_mutex);
mk_list_del(&timer_coro->_head);
mk_list_add(&timer_coro->_head, &th_ins->timer_coro_list_destroy);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Todo: understand destroy list logic better

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yea... I forgot to implement deletion, its in the design: fluent#7466


FLB_TLS_SET(timer_coro_params, params);
coro->caller = co_active();
flb_coro_resume(coro);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that this will begin the callback from the start. From this point on the network events will take over control

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that's the intention

Copy link

@matthewfala matthewfala left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review 2/3 - This is a closer review of the timer logic (did not review s3 changes in detail): Looks good for the most part in terms of logic! Missing cleanup/timer delete logic. Wrote some organizational and data structure recommendations.

Comment on lines 2008 to 2014
timer_data->ins = ctx->ins;
timer_data->job_name = job_name;
timer_data->cb = timer_coro_cb;
timer_data->data = ctx;

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->timer_ms, flb_output_coro_timer_cb, timer_data, NULL);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing that this works. Pretty complicated logic.
This is slightly akward. Most of the timer is exposed in the fluent bit core lib. However an integral part of the timer functionality (creating the async timer) is implemented at the plugin level.

I would recommend extracting to a function such as:
flb_sched_async_timer_cb_create(ins, job_name, async_cb, data, etc)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you please follow up on this comment?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I can do this

FLB_TLS_SET(timer_coro_params, params);
coro->caller = co_active();
flb_coro_resume(coro);
return;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you have a return at the bottom of your function? If it's best practices then that's good, but I'll have to adjust how I write void functions in the future.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I think its not best practice?

}

/* timer coro is complete; yield back to caller/control code */
flb_coro_yield(coro, FLB_TRUE);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete logic still to be implemented likely in engine and output_thread event loop clean up section.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yea... I forgot to implement deletion, its in the design: fluent#7466

* but modifying the pending chunk and upload lists, and deleting S3 store files needs
* to be concurrent safe
*/
pthread_mutex_t flush_mutex;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is slightly confusing since it shares the name of the core flush_mutex on the output thread. Consider renaming to upload_queue_mutex

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

/*
* stores timer coros on the timer_coro_list, if the output uses them
*/
struct flb_output_timer_coro {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data structures look good! But in terms of organization, would you consider renaming all flb_output_timer_... to flb_async_timer_... and moving it to a new core file called flb_async_timer.x.

This will highlight that your functions operate as a layer on top of the existing coroutine logic, and does not have interweaved logic. (calls existing coro functions, existing coro and flush functions do not call the new functions)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I think output is still needed in the name since in the future we could have timer coros for other plugin types as well I think.

So I think I'd do flb_async_timer.c and flb_out_async_timer...

Copy link

@matthewfala matthewfala Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think generally the convention is to prefix with flb_<file_name>_<function_name>. So the file should either be called flb_out_async_timer.c/h or the function should be called flb_async_timer_x (see flb_regex.h)

Is "out" used to indicate that the timer is only intended to be used in the output plugin? I believe the timer can be scheduled anywhere? That might be incorrect actually, or is it?

Comment on lines 92 to 94
struct mk_list timer_coro_list; /* flush context list */
struct mk_list timer_coro_list_destroy; /* flust context destroy list */
pthread_mutex_t timer_mutex; /* mutex for 'flush_list' */

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This review isn't for style but incase we forget later, the comments here may be copied from flush_... and pends update

/* flush context list */

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

* If the output uses timer coros, then this is used as the callback data
* passed to flb_sched_timer_cb_create
*/
struct flb_output_coro_timer_data {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommended rename to flb_output_coro_timer_initialization_data or better flb_async_timer_initialization_data

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about flb_out_async_timer_cb_data since its for the callback..

Copy link

@matthewfala matthewfala Oct 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great for this function name to include the word initialization to differentiate it from the user callback and user provided data void *data;. This datastructure is explicitly used to initialize the async timer, but is not for the user provided callback. Also to differentiate from the coro cb async_timer_cb

Name should align with:
https://github.com/PettitWesley/fluent-bit/pull/27/files#r1345008308

These two pointers go together - they are responsible for initializing an async_timer:

  • flb_async_timer_initialization_data
  • flb_async_timer_initialization_cb

Likewise we have for the user provided callback the same convention:

  • cb
  • data

struct flb_output_coro_timer_data {
struct flb_output_instance *ins; /* associate coro with this output instance */
flb_sds_t job_name; /* used on engine shutdown, print pending "custom" jobs */
void (*cb) (struct flb_config *config, void *data); /* call this output callback in the coro */

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming to async_cb to differentiate from the standard scheduled timer cb

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

src/flb_engine.c Outdated
Comment on lines 403 to 407
mk_list_foreach_safe(head, tmp, &config->outputs) {
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
n = flb_output_timer_coros_size(o_ins);
timers = timers + n;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider extracting function to flb_async_timer_running_count(config) for style. No strong convictions on this one.

src/flb_engine.c Outdated
Comment on lines 421 to 424
mk_list_foreach_safe(head, tmp, &config->outputs) {
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
flb_output_timer_coros_print(o_ins);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider extracting function to flb_async_timer_running_print(config) for style. No strong convictions on this one.

@PettitWesley
Copy link
Owner Author

@matthewfala So I like the idea of storing the async timer lists on the scheduler struct, which could make it generic and not associated with outputs.

However, I am seeing a problem this idea.

I have methods flb_thread_pool_async_timers_size and flb_thread_pool_async_timers_print : https://github.com/PettitWesley/fluent-bit/pull/27/files#diff-27b244cc7913bcd78c3395ca61c3f13822acfdf90e384a171b7eee0c73a01f6aR124

These need to collect all active timers. This is used to print timers on shutdown that are still running, and to wait the graceful shutdown until they are done.

The sched struct right now is one per event loop AFAICT. SO one in the engine, one in each output thread. But there's no reference between the output thread and the sched instance. There's no centralized list of all sched instances. I need something like that in order for these functions to work. Basically, this has to be tied to the fact that its running on some output worker event loop. I think the simplest solution would be to link the thread that the sched is for together- by adding a reference on the thread to its sched.

@PettitWesley
Copy link
Owner Author

There are two key ways I need to track async timer coros:

  1. On shutdown we must track how many pending coros there are, and right now there’s no way to grab all of the sched structures to count these new coros. A sched is associated with an event loop, so both the engine and each worker has one.
  2. S3 output needs to create one scheduled timer in each of its output threads, so that it can get a coro every interval N in each worker thread so each worker can perform upload work (including doing gzip compression, which takes a lot of CPU and is the main reason customers ask for multi-workers).

For Problem 1, I want to propose:

  1. The output thread has a reference to its sched. This means you can iterate over the outputs and each's output threads and then get each sched and get each timer coro. Like this: 6d3ab25

@PettitWesley
Copy link
Owner Author

S3 output needs to create one scheduled timer in each of its output threads, so that it can get a coro every interval N in each worker thread so each worker can perform upload work (including doing gzip compression, which takes a lot of CPU and is the main reason customers ask for multi-workers).

With flb_output_thread_instance_get the output can get the pointer to the current worker thread that its running in. It can store these addresses in an array same length as number of workers. Each flush, it checks if the worker ID is new and not in its list, if so, it starts a new scheduled timer in that worker.

@PettitWesley
Copy link
Owner Author

Eduardo, wants it to be called flb_coro_timer


mk_list_foreach(head, &tp->list_threads) {
th = mk_list_entry(head, struct flb_tp_thread, _head);
if (th->status != FLB_THREAD_POOL_RUNNING) {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One interesting thing, FLB_THREAD_POOL_RUNNING just tells you the thread has been started, the FLB_THREAD_POOL_STOPPED value is never set.

Copy link

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants