Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] custom: Go plugin support for custom plugins to support OTel extensions #9470

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build_go_extensions.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash

# !! TODO: move to a Makefile

CGO_CFLAGS="-I${HOME}/src/fluent-bit/include -I${HOME}/src/fluent-bit/lib/monkey/include -I${HOME}/src/fluent-bit/build/lib/monkey/include/monkey -I${HOME}/src/fluent-bit/lib/cfl/include -I${HOME}/src/fluent-bit/lib/cfl/lib/xxhash -I${HOME}/src/fluent-bit/lib/cmetrics/include -I${HOME}/src/fluent-bit/lib/flb_libco -I${HOME}/src/fluent-bit/lib/c-ares-1.33.1/include -I${HOME}/src/fluent-bit/lib/msgpack-c/include -I${HOME}/src/fluent-bit/lib/ctraces/include -I${HOME}/src/fluent-bit/lib/ctraces/lib/mpack/src" CGO_LDFLAGS="-L${HOME}/src/fluent-bit/build/lib -lfluent-bit" sh -c 'echo CGO_CFLAGS=${CGO_CFLAGS}; echo CGO_LDFLAGS=${CGO_LDFLAGS}; go build -buildmode=c-shared -o build/bin/custom_extensions_go.so github.com/fluent/fluent-bit/plugins/custom_extensions_go'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's avoid adding extra dev files to the repo.

3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/fluent/fluent-bit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's avoid adding extra dev files to the repo.


go 1.23.0
11 changes: 11 additions & 0 deletions include/fluent-bit/flb_custom.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,20 @@
#include <fluent-bit/flb_metrics.h>
#endif

/* Custom plugin types */
#define FLB_CUSTOM_PLUGIN_CORE 0
#define FLB_CUSTOM_PLUGIN_PROXY 1

struct flb_custom_instance;

struct flb_custom_plugin {
/*
* The type defines if this is a core-based plugin or it's handled by
* some specific proxy.
*/
int type;
void *proxy;

int flags; /* Flags (not available at the moment */
char *name; /* Custom plugin short name */
char *description; /* Description */
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_plugin_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
/* Plugin Types */
#define FLB_PROXY_INPUT_PLUGIN 1
#define FLB_PROXY_OUTPUT_PLUGIN 2
#define FLB_PROXY_CUSTOM_PLUGIN 3

/* Proxies available */
#define FLB_PROXY_GOLANG 11
Expand Down
103 changes: 103 additions & 0 deletions plugins/custom_extensions_go/extensions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Fluent Bit Go!
// ==============
// Copyright (C) 2024 The Fluent Bit Go Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package main

// #include <stdlib.h>
// #include "fluent-bit/flb_plugin.h"
// #include "fluent-bit/flb_plugin_proxy.h"
// #include "fluent-bit/flb_custom.h"
import "C"

import (
"fmt"
"time"
"unsafe"
)

// Define constants matching Fluent Bit core
const (
FLB_ERROR = C.FLB_ERROR
FLB_OK = C.FLB_OK
FLB_RETRY = C.FLB_RETRY

FLB_PROXY_CUSTOM_PLUGIN = C.FLB_CF_CUSTOM
FLB_PROXY_GOLANG = C.FLB_PROXY_GOLANG
)

// Local type to define a plugin definition
type (
FLBPluginProxyDef C.struct_flb_plugin_proxy_def
FLBCustomInstance C.struct_flb_custom_instance
)

// When the FLBPluginInit is triggered by Fluent Bit, a plugin context
// is passed and the next step is to invoke this FLBPluginRegister() function
// to fill the required information: type, proxy type, flags name and
// description.
//
//export FLBPluginRegister
func FLBPluginRegister(def unsafe.Pointer, name, desc string) int {
p := (*FLBPluginProxyDef)(def)
p._type = FLB_PROXY_CUSTOM_PLUGIN
p.proxy = FLB_PROXY_GOLANG
p.flags = 0
p.name = C.CString(name)
p.description = C.CString(desc)
return 0
}

// (fluentbit will call this)
// plugin (context) pointer to fluentbit context (state/ c code)
//
//export FLBPluginInit
func FLBPluginInit(plugin unsafe.Pointer) int {
extensions := FLBPluginConfigKey(plugin, "extensions")
fmt.Printf("[flb-go] extensions = '%s'\n", extensions)
go func() {
for {
fmt.Printf("[flb-go] Go extensions alive %v\n", time.Now())
time.Sleep(10 * time.Second)
}
}()
return FLB_OK
}

// Release resources allocated by the plugin initialization
//
//export FLBPluginUnregister
func FLBPluginUnregister(def unsafe.Pointer) {
p := (*FLBPluginProxyDef)(def)
C.free(unsafe.Pointer(p.name))
C.free(unsafe.Pointer(p.description))
}

//export FLBPluginExit
func FLBPluginExit() int {
return FLB_OK
}

func FLBPluginConfigKey(plugin unsafe.Pointer, key string) string {
k := C.CString(key)
p := plugin
v := C.GoString(C.flb_custom_get_property(k, (*C.struct_flb_custom_instance)(p)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to implement flb_custom_get_property as an API here if we create custom plugin interface like as input and output:
https://github.com/fluent/fluent-bit/blob/master/src/flb_api.c#L47-L48

C.free(unsafe.Pointer(k))
return v
}

func main() {
}
82 changes: 82 additions & 0 deletions src/flb_plugin_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_plugin_proxy.h>
#include <fluent-bit/flb_input_log.h>
#include <fluent-bit/flb_custom.h>

/* Proxies */
#include "proxy/go/go.h"
Expand Down Expand Up @@ -418,6 +419,42 @@ static int flb_proxy_register_input(struct flb_plugin_proxy *proxy,
return 0;
}

int flb_proxy_custom_cb_init(struct flb_custom_instance *ins,
struct flb_config *config, void *data);

static int flb_proxy_custom_cb_exit(void *custom_context,
struct flb_config *config);

static int flb_proxy_register_custom(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def,
struct flb_config *config)
{
struct flb_custom_plugin *custom;

custom = flb_calloc(1, sizeof(struct flb_custom_plugin));
if (!custom) {
flb_errno();
return -1;
}

/* Plugin registration */
custom->type = FLB_CUSTOM_PLUGIN_PROXY;
custom->proxy = proxy;
custom->flags = def->flags;
custom->name = flb_strdup(def->name);
custom->description = def->description;
mk_list_add(&custom->_head, &config->custom_plugins);

/*
* Set proxy callbacks: external plugins which are not following
* the core plugins specs, have a different callback approach, so
* we put our proxy-middle callbacks to do the translation properly.
*/
custom->cb_init = flb_proxy_custom_cb_init;
custom->cb_exit = flb_proxy_custom_cb_exit;
return 0;
}

void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy,
const char *symbol)
{
Expand Down Expand Up @@ -483,6 +520,9 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
}
else if (def->type == FLB_PROXY_INPUT_PLUGIN) {
ret = proxy_go_input_register(proxy, def);
}
else if (def->type == FLB_PROXY_CUSTOM_PLUGIN) {
ret = proxy_go_custom_register(proxy, def);
}
#endif
}
Expand All @@ -497,6 +537,9 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
else if (def->type == FLB_PROXY_INPUT_PLUGIN) {
flb_proxy_register_input(proxy, def, config);
}
else if (def->type == FLB_PROXY_CUSTOM_PLUGIN) {
flb_proxy_register_custom(proxy, def, config);
}
}

return 0;
Expand Down Expand Up @@ -612,3 +655,42 @@ int flb_plugin_proxy_set(struct flb_plugin_proxy_def *def, int type,

return 0;
}

int flb_proxy_custom_cb_init(struct flb_custom_instance *ins,
struct flb_config *config, void *data)
{
int ret = -1;
struct flb_plugin_proxy_context *pc;
struct flb_plugin_proxy *proxy;

pc = (struct flb_plugin_proxy_context *)(ins->context);
proxy = pc->proxy;

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
ret = proxy_go_custom_init(proxy);
#endif
}

return ret;
}

int flb_proxy_custom_cb_exit(void *custom_context,
struct flb_config *config)
{
int ret = -1;
struct flb_plugin_proxy_context *ctx = custom_context;
struct flb_plugin_proxy *proxy = (ctx->proxy);
if (!custom_context) {
return ret;
}

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
ret = proxy_go_custom_destroy(ctx);
#endif
}

flb_free(ctx);
return ret;
}
87 changes: 86 additions & 1 deletion src/proxy/go/go.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_plugin_proxy.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_custom.h>
#include "./go.h"

/*
Expand All @@ -41,7 +42,7 @@
*
* - name: shortname of the plugin.
* - description: plugin description.
* - type: input, output, filter, whatever.
* - type: input, output, filter, processor, custom.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if processors can be written in Go today cc: @cosmo0920

Copy link
Contributor

@cosmo0920 cosmo0920 Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we can't do that. If processors can be written in Go, there is huge opportunity to handle various type of payloads including otel payloads which are belongs to traces type of events.

* - proxy: type of proxy e.g. GOLANG
* - flags: optional flags, not used by Go plugins at the moment.
*
Expand Down Expand Up @@ -286,3 +287,87 @@ void proxy_go_input_unregister(void *data) {
flb_free(plugin->name);
flb_free(plugin);
}

int proxy_go_custom_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def)
{
struct flbgo_custom_plugin *plugin;

plugin = flb_malloc(sizeof(struct flbgo_custom_plugin));
if (!plugin) {
return -1;
}

/*
* Lookup the entry point function:
*
* - FLBPluginInit
* - FLBPluginExit
*
* note: registration callback FLBPluginRegister() is resolved by the
* parent proxy interface.
*/

plugin->cb_init = flb_plugin_proxy_symbol(proxy, "FLBPluginInit");
if (!plugin->cb_init) {
flb_error("[go proxy]: could not load FLBPluginInit symbol");
flb_free(plugin);
return -1;
}

plugin->cb_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginExit");

plugin->name = flb_strdup(def->name);

/* This Go plugin context is an opaque data for the parent proxy */
proxy->data = plugin;

return 0;
}

int proxy_go_custom_init(struct flb_plugin_proxy *proxy)
{
int ret = 0;
struct flbgo_custom_plugin *plugin = proxy->data;

/* set the API */
plugin->api = proxy->api;
plugin->i_ins = proxy->instance;
// In order to avoid having the whole instance as part of the ABI we
// copy the context pointer into the plugin.
plugin->context = ((struct flb_custom_instance *)proxy->instance)->context;

ret = plugin->cb_init(plugin);
if (ret <= 0) {
flb_error("[go proxy]: plugin '%s' failed to initialize",
plugin->name);
flb_free(plugin);
return -1;
}

return ret;
}

int proxy_go_custom_destroy(struct flb_plugin_proxy_context *ctx)
{
int ret = 0;
struct flbgo_custom_plugin *plugin;

plugin = (struct flbgo_custom_plugin *) ctx->proxy->data;
flb_debug("[GO] running exit callback");

if (plugin->cb_exit) {
ret = plugin->cb_exit();
}

return ret;
}

void proxy_go_custom_unregister(void *data) {
struct flbgo_custom_plugin *plugin;

plugin = (struct flbgo_custom_plugin *) data;
flb_free(plugin->name);
flb_free(plugin);
}

18 changes: 18 additions & 0 deletions src/proxy/go/go.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ struct flbgo_input_plugin {
int (*cb_exit)();
};

struct flbgo_custom_plugin {
char *name;
void *api;
void *i_ins;
struct flb_plugin_proxy_context *context;

int (*cb_init)();
int (*cb_exit)();
};

int proxy_go_output_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def);

Expand All @@ -69,4 +79,12 @@ int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx,
void *allocated_data);
int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx);
void proxy_go_input_unregister(void *data);

int proxy_go_custom_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def);

int proxy_go_custom_init(struct flb_plugin_proxy *proxy);

int proxy_go_custom_destroy(struct flb_plugin_proxy_context *ctx);
void proxy_go_custom_unregister(void *data);
#endif
Loading