Skip to content
This repository has been archived by the owner on Jan 22, 2024. It is now read-only.

File formatting: client-example/main.go #7

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.idea
go.mod
go.sum
27 changes: 14 additions & 13 deletions examples/client-example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"fmt"
"log"
"os"
"promtail/promtail"
"time"
"github.com/afiskon/promtail-client/promtail"
)

func displayUsage() {
Expand All @@ -20,13 +20,13 @@ func displayInvalidName(arg string) {

func nameIsValid(name string) bool {
for _, c := range name {
if !((c >= 'a' && c <= 'z') ||
(c >= 'A' && c <= 'Z') ||
(c >= '0' && c <= '9') ||
(c == '-') || (c == '_')) {
return false
}
}
if !((c >= 'a' && c <= 'z') ||
(c >= 'A' && c <= 'Z') ||
(c >= '0' && c <= '9') ||
(c == '-') || (c == '_')) {
return false
}
}
return true
}

Expand All @@ -50,19 +50,19 @@ func main() {
displayInvalidName("job-name")
}

labels := "{source=\""+source_name+"\",job=\""+job_name+"\"}"
labels := "{source=\"" + source_name + "\",job=\"" + job_name + "\"}"
conf := promtail.ClientConfig{
PushURL: "http://localhost:3100/api/prom/push",
Labels: labels,
BatchWait: 5 * time.Second,
BatchEntriesNumber: 10000,
SendLevel: promtail.INFO,
PrintLevel: promtail.ERROR,
BatchEntriesNumber: 100,
SendLevel: promtail.INFO,
PrintLevel: promtail.ERROR,
}

var (
loki promtail.Client
err error
err error
)

if format == "proto" {
Expand All @@ -82,6 +82,7 @@ func main() {
loki.Infof("source = %s, time = %s, i = %d\n", source_name, tstamp, i)
loki.Warnf("source = %s, time = %s, i = %d\n", source_name, tstamp, i)
loki.Errorf("source = %s, time = %s, i = %d\n", source_name, tstamp, i)
loki.LogfWithLabel("{source=\"new_"+source_name+"\"}", "source = %s, time = %s, i = %d\n", source_name, tstamp, i)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this line shouldn't be in this patch.

time.Sleep(1 * time.Second)
}

Expand Down
7 changes: 6 additions & 1 deletion promtail/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type Client interface {
Infof(format string, args ...interface{})
Warnf(format string, args ...interface{})
Errorf(format string, args ...interface{})
DebugfWithLabel(label, format string, args ...interface{})
InfofWithLabel(label, format string, args ...interface{})
WarnfWithLabel(label, format string, args ...interface{})
ErrorfWithLabel(label, format string, args ...interface{})
LogfWithLabel(label, format string, args ...interface{})
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Shutdown()
}

Expand Down Expand Up @@ -67,4 +72,4 @@ func (client *httpClient) sendJsonReq(method, url string, ctype string, reqBody
}

return resp, resBody, nil
}
}
79 changes: 60 additions & 19 deletions promtail/jsonclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import (
"time"
)

type jsonLogEntryLabel struct {
Label string `json:"label"`
Ts time.Time `json:"ts"`
Line string `json:"line"`
level LogLevel // not used in JSON
}
type jsonLogEntry struct {
Ts time.Time `json:"ts"`
Line string `json:"line"`
level LogLevel // not used in JSON
level LogLevel // not used in JSON
}

type promtailStream struct {
Labels string `json:"labels"`
Entries []*jsonLogEntry `json:"entries"`
Expand All @@ -26,7 +31,7 @@ type promtailMsg struct {
type clientJson struct {
config *ClientConfig
quit chan struct{}
entries chan *jsonLogEntry
entries chan *jsonLogEntryLabel
waitGroup sync.WaitGroup
client httpClient
}
Expand All @@ -35,7 +40,7 @@ func NewClientJson(conf ClientConfig) (Client, error) {
client := clientJson{
config: &conf,
quit: make(chan struct{}),
entries: make(chan *jsonLogEntry, LOG_ENTRIES_CHAN_SIZE),
entries: make(chan *jsonLogEntryLabel, LOG_ENTRIES_CHAN_SIZE),
client: httpClient{},
}

Expand All @@ -46,24 +51,47 @@ func NewClientJson(conf ClientConfig) (Client, error) {
}

func (c *clientJson) Debugf(format string, args ...interface{}) {
c.log(format, DEBUG, "Debug: ", args...)
c.log("", format, DEBUG, "Debug: ", args...)
}

func (c *clientJson) Infof(format string, args ...interface{}) {
c.log(format, INFO, "Info: ", args...)
c.log("", format, INFO, "Info: ", args...)
}

func (c *clientJson) Warnf(format string, args ...interface{}) {
c.log(format, WARN, "Warn: ", args...)
c.log("", format, WARN, "Warn: ", args...)
}

func (c *clientJson) Logf(format string, args ...interface{}) {
c.log("", format, INFO, "", args...)
}
func (c *clientJson) Errorf(format string, args ...interface{}) {
c.log(format, ERROR, "Error: ", args...)
c.log("", format, ERROR, "Error: ", args...)
}

func (c *clientJson) log(format string, level LogLevel, prefix string, args ...interface{}) {
func (c *clientJson) DebugfWithLabel(label, format string, args ...interface{}) {
c.log(label, format, DEBUG, "Debug: ", args...)
}

func (c *clientJson) InfofWithLabel(label, format string, args ...interface{}) {
c.log(label, format, INFO, "Info: ", args...)
}

func (c *clientJson) WarnfWithLabel(label, format string, args ...interface{}) {
c.log(label, format, WARN, "Warn: ", args...)
}

func (c *clientJson) ErrorfWithLabel(label, format string, args ...interface{}) {
c.log(label, format, ERROR, "Error: ", args...)
}
func (c *clientJson) LogfWithLabel(label, format string, args ...interface{}) {
c.log(label, format, INFO, "", args...)
}

func (c *clientJson) log(label, format string, level LogLevel, prefix string, args ...interface{}) {
if (level >= c.config.SendLevel) || (level >= c.config.PrintLevel) {
c.entries <- &jsonLogEntry{
c.entries <- &jsonLogEntryLabel{
Label: label,
Ts: time.Now(),
Line: fmt.Sprintf(prefix+format, args...),
level: level,
Expand All @@ -77,7 +105,8 @@ func (c *clientJson) Shutdown() {
}

func (c *clientJson) run() {
var batch []*jsonLogEntry
var batch map[string][]*jsonLogEntry
batch = make(map[string][]*jsonLogEntry)
batchSize := 0
maxWait := time.NewTimer(c.config.BatchWait)

Expand All @@ -99,33 +128,45 @@ func (c *clientJson) run() {
}

if entry.level >= c.config.SendLevel {
batch = append(batch, entry)
if entry.Label == "" {
entry.Label = c.config.Labels
}
batch[entry.Label] = append(batch[entry.Label], &jsonLogEntry{
Ts: entry.Ts,
level: entry.level,
Line: entry.Line,
})

batchSize++
if batchSize >= c.config.BatchEntriesNumber {
c.send(batch)
batch = []*jsonLogEntry{}
batch = make(map[string][]*jsonLogEntry)

batchSize = 0
maxWait.Reset(c.config.BatchWait)
}
}
case <-maxWait.C:
if batchSize > 0 {
c.send(batch)
batch = []*jsonLogEntry{}
batch = make(map[string][]*jsonLogEntry)

batchSize = 0
}
maxWait.Reset(c.config.BatchWait)
}
}
}

func (c *clientJson) send(entries []*jsonLogEntry) {
func (c *clientJson) send(entries map[string][]*jsonLogEntry) {
var streams []promtailStream
streams = append(streams, promtailStream{
Labels: c.config.Labels,
Entries: entries,
})

for k, v := range entries {
streams = append(streams, promtailStream{
Labels: k,
Entries: v,
})
}
msg := promtailMsg{Streams: streams}
jsonMsg, err := json.Marshal(msg)
if err != nil {
Expand Down
Loading