Skip to content

Commit c0bc805

Browse files
committed
Implement proposed k8s-stream-file format
Instead of parsing the contents of the data read from the `stdout` and `stderr` pipes, this commit adds support for a "stream" format, named `k8s-stream-file`, which just records what is read from a pipe to disk. It significantly saves on CPU spend processing the buffer read, uses only 2 I/O vectors, and never touches the memory read from the pipe. This is an updated implementation of cri-o/cri-o#1605.
1 parent 3161452 commit c0bc805

File tree

1 file changed

+135
-5
lines changed

1 file changed

+135
-5
lines changed

src/ctr_logging.c

+135-5
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ static inline int sd_journal_sendv(G_GNUC_UNUSED const struct iovec *iov, G_GNUC
2424
/* Different types of container logging */
2525
static gboolean use_journald_logging = FALSE;
2626
static gboolean use_k8s_logging = FALSE;
27+
static gboolean use_k8s_stream_logging = FALSE;
2728

2829
/* Value the user must input for each log driver */
2930
static const char *const K8S_FILE_STRING = "k8s-file";
31+
static const char *const K8S_STREAM_FILE_STRING = "k8s-stream-file";
3032
static const char *const JOURNALD_FILE_STRING = "journald";
3133

3234
/* Max log size for any log file types */
@@ -64,6 +66,7 @@ static void parse_log_path(char *log_config);
6466
static const char *stdpipe_name(stdpipe_t pipe);
6567
static int write_journald(int pipe, char *buf, ssize_t num_read);
6668
static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen);
69+
static int write_k8s_stream_log(stdpipe_t pipe, const char *buf, ssize_t buflen);
6770
static bool get_line_len(ptrdiff_t *line_len, const char *buf, ssize_t buflen);
6871
static ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *data, ssize_t len);
6972
static ssize_t writev_buffer_flush(int fd, writev_buffer_t *buf);
@@ -134,9 +137,10 @@ void configure_log_drivers(gchar **log_drivers, int64_t log_size_max_, char *cuu
134137
* parse_log_path branches on log driver type the user inputted.
135138
* log_config will either be a ':' delimited string containing:
136139
* <DRIVER_NAME>:<PATH_NAME> or <PATH_NAME>
137-
* in the case of no colon, the driver will be kubernetes-log-file,
140+
* in the case the log driver is 'k8s-stream-file', the <PATH_NAME> must be present.
141+
* in the case of no colon, the driver will be k8s-file,
138142
* in the case the log driver is 'journald', the <PATH_NAME> is ignored.
139-
* exits with error if <DRIVER_NAME> isn't 'journald' or 'kubernetes-log-file'
143+
* exits with error if <DRIVER_NAME> isn't 'journald' or 'k8s-file'
140144
*/
141145
static void parse_log_path(char *log_config)
142146
{
@@ -165,6 +169,17 @@ static void parse_log_path(char *log_config)
165169
return;
166170
}
167171

172+
// Driver is k8s-file or empty
173+
if (!strcmp(driver, K8S_STREAM_FILE_STRING)) {
174+
if (path == NULL) {
175+
nexitf("k8s-stream-file requires a filename");
176+
}
177+
use_k8s_logging = TRUE;
178+
use_k8s_stream_logging = TRUE;
179+
k8s_log_path = path;
180+
return;
181+
}
182+
168183
// Driver is k8s-file or empty
169184
if (!strcmp(driver, K8S_FILE_STRING)) {
170185
if (path == NULL) {
@@ -188,9 +203,19 @@ static void parse_log_path(char *log_config)
188203
/* write container output to all logs the user defined */
189204
bool write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read)
190205
{
191-
if (use_k8s_logging && write_k8s_log(pipe, buf, num_read) < 0) {
192-
nwarn("write_k8s_log failed");
193-
return G_SOURCE_CONTINUE;
206+
if (use_k8s_logging) {
207+
if (use_k8s_stream_logging) {
208+
if (write_k8s_log(pipe, buf, num_read) < 0) {
209+
nwarn("write_k8s_log failed");
210+
return G_SOURCE_CONTINUE;
211+
}
212+
}
213+
else {
214+
if (write_k8s_stream_log(pipe, buf, num_read) < 0) {
215+
nwarn("write_k8s_stream_log failed");
216+
return G_SOURCE_CONTINUE;
217+
}
218+
}
194219
}
195220
if (use_journald_logging && write_journald(pipe, buf, num_read) < 0) {
196221
nwarn("write_journald failed");
@@ -540,3 +565,108 @@ void sync_logs(void)
540565
if (fsync(k8s_log_fd) < 0)
541566
pwarn("Failed to sync log file before exit");
542567
}
568+
569+
570+
/* strlen("1997-03-25T13:20:42.999999999+01:00 stdout 9999999999 999999999 ") + 1 */
571+
#define TSSTREAMBUFLEN 128
572+
573+
/*
574+
* PROPOSED: CRI Stream Format, variable length file format
575+
*/
576+
static int set_k8s_stream_timestamp(char *buf, ssize_t bufsiz, ssize_t *tsbuflen, const char *pipename, uint64_t offset, ssize_t buflen, ssize_t *btbw)
577+
{
578+
struct tm *tm;
579+
struct timespec ts;
580+
char off_sign = '+';
581+
int off, len, err = -1;
582+
583+
if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
584+
/* If CLOCK_REALTIME is not supported, we set nano seconds to 0 */
585+
if (errno == EINVAL) {
586+
ts.tv_nsec = 0;
587+
} else {
588+
return err;
589+
}
590+
}
591+
592+
if ((tm = localtime(&ts.tv_sec)) == NULL)
593+
return err;
594+
595+
off = (int)tm->tm_gmtoff;
596+
if (tm->tm_gmtoff < 0) {
597+
off_sign = '-';
598+
off = -off;
599+
}
600+
601+
len = snprintf(buf, bufsiz, "%d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %s %lud %ld ", tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
602+
tm->tm_hour, tm->tm_min, tm->tm_sec, ts.tv_nsec, off_sign, off / 3600, off % 3600, pipename, offset, buflen);
603+
604+
if (len < bufsiz)
605+
err = 0;
606+
607+
*tsbuflen = len;
608+
*btbw = len + buflen;
609+
return err;
610+
}
611+
612+
613+
/*
614+
* PROPOSED: CRI Stream Format, variable length file format
615+
*
616+
* %d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %(stream)s %(offset)lud %(buflen)ld %(buf)s
617+
*
618+
* The CRI stream fromat requires us to write each buffer read with a
619+
* timestamp, stream, length (human readable ascii), and the buffer contents
620+
* read (with a space character separating the buffer length string from the
621+
* buffer.
622+
*/
623+
static int write_k8s_stream_log(stdpipe_t pipe, const char *buf, ssize_t buflen)
624+
{
625+
writev_buffer_t bufv = {0};
626+
char tsbuf[TSSTREAMBUFLEN];
627+
static ssize_t bytes_written = 0;
628+
static uint64_t offset = 0;
629+
ssize_t bytes_to_be_written = 0;
630+
ssize_t tsbuflen = 0;
631+
632+
/*
633+
* Use the same timestamp for every line of the log in this buffer.
634+
* There is no practical difference in the output since write(2) is
635+
* fast.
636+
*/
637+
if (set_k8s_stream_timestamp(tsbuf, sizeof tsbuf, &tsbuflen, stdpipe_name(pipe), buflen, offset, &bytes_to_be_written))
638+
/* TODO: We should handle failures much more cleanly than this. */
639+
return -1;
640+
641+
/*
642+
* We re-open the log file if writing out the bytes will exceed the max
643+
* log size. We also reset the state so that the new file is started with
644+
* a timestamp.
645+
*/
646+
if ((opt_log_size_max > 0) && (bytes_written + bytes_to_be_written) > opt_log_size_max) {
647+
bytes_written = 0;
648+
649+
reopen_k8s_file();
650+
}
651+
652+
/* Output the timestamp, stream, and length */
653+
if (writev_buffer_append_segment(k8s_log_fd, &bufv, tsbuf, tsbuflen) < 0) {
654+
nwarn("failed to write (timestamp, stream) to log");
655+
goto stream_next;
656+
}
657+
658+
/* Output the actual contents. */
659+
if (writev_buffer_append_segment(k8s_log_fd, &bufv, buf, buflen) < 0) {
660+
nwarn("failed to write buffer to log");
661+
}
662+
663+
stream_next:
664+
bytes_written += bytes_to_be_written;
665+
offset += (uint64_t)bytes_to_be_written;
666+
667+
if (writev_buffer_flush(k8s_log_fd, &bufv) < 0) {
668+
nwarn("failed to flush buffer to log");
669+
}
670+
671+
return 0;
672+
}

0 commit comments

Comments
 (0)