Skip to content

Commit

Permalink
move loki client into separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed Aug 15, 2023
1 parent 91366c2 commit 1d026ac
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 23 deletions.
3 changes: 2 additions & 1 deletion internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"time"

"github.com/canonical/pebble/internals/overlord/logstate/loki"

Check failure on line 22 in internals/overlord/logstate/gatherer.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s Prefix(github.com/canonical/pebble) (gci)
"gopkg.in/tomb.v2"

"github.com/canonical/pebble/internals/logger"

Check failure on line 25 in internals/overlord/logstate/gatherer.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s Prefix(github.com/canonical/pebble) (gci)
Expand Down Expand Up @@ -312,7 +313,7 @@ type logClient interface {
func newLogClient(target *plan.LogTarget) (logClient, error) {
switch target.Type {
case plan.LokiTarget:
return newLokiClient(target), nil
return loki.NewClient(target), nil
//case plan.SyslogTarget: TODO
default:
return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate
package loki

import (
"bytes"
Expand All @@ -31,11 +31,11 @@ import (
)

const (
maxLokiRequestSize = 10
lokiRequestTimeout = 10 * time.Second
maxRequestSize = 10
requestTimeout = 10 * time.Second
)

type lokiClient struct {
type Client struct {
target *plan.LogTarget
remoteURL string
// buffered entries are "sharded" by service name
Expand All @@ -45,32 +45,32 @@ type lokiClient struct {
httpClient http.Client
}

func newLokiClient(target *plan.LogTarget) logClient {
return &lokiClient{
func NewClient(target *plan.LogTarget) *Client {
return &Client{
target: target,
remoteURL: target.Location,
entries: map[string][]lokiEntry{},
httpClient: http.Client{Timeout: lokiRequestTimeout},
httpClient: http.Client{Timeout: requestTimeout},
}
}

func (c *lokiClient) Write(ctx context.Context, entry servicelog.Entry) error {
c.entries[entry.Service] = append(c.entries[entry.Service], asLokiEntry(entry))
func (c *Client) Write(ctx context.Context, entry servicelog.Entry) error {
c.entries[entry.Service] = append(c.entries[entry.Service], encodeEntry(entry))
c.numEntries++
if c.numEntries >= maxLokiRequestSize {
if c.numEntries >= maxRequestSize {
return c.Flush(ctx)
}
return nil
}

func asLokiEntry(entry servicelog.Entry) lokiEntry {
func encodeEntry(entry servicelog.Entry) lokiEntry {
return lokiEntry{
strconv.FormatInt(entry.Time.UnixNano(), 10),
strings.TrimSuffix(entry.Message, "\n"),
}
}

func (c *lokiClient) Flush(ctx context.Context) error {
func (c *Client) Flush(ctx context.Context) error {
if c.numEntries == 0 {
return nil // no-op
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func (c *lokiClient) Flush(ctx context.Context) error {
return nil
}

func (c *lokiClient) buildRequest() lokiRequest {
func (c *Client) buildRequest() request {
// Sort keys to guarantee deterministic output
var services []string
for svc, entries := range c.entries {
Expand All @@ -124,10 +124,10 @@ func (c *lokiClient) buildRequest() lokiRequest {
}
sort.Strings(services)

var req lokiRequest
var req request
for _, service := range services {
entries := c.entries[service]
stream := lokiStream{
stream := stream{
Labels: map[string]string{
"pebble_service": service,
},
Expand All @@ -138,18 +138,18 @@ func (c *lokiClient) buildRequest() lokiRequest {
return req
}

func (c *lokiClient) emptyBuffer() {
func (c *Client) emptyBuffer() {
for svc := range c.entries {
c.entries[svc] = c.entries[svc][:0]
}
c.numEntries = 0
}

type lokiRequest struct {
Streams []lokiStream `json:"streams"`
type request struct {
Streams []stream `json:"streams"`
}

type lokiStream struct {
type stream struct {
Labels map[string]string `json:"stream"`
Entries []lokiEntry `json:"values"`
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package logstate
package loki

import (
"context"
Expand Down Expand Up @@ -113,7 +113,7 @@ func (*lokiSuite) TestLokiRequest(c *C) {
}))
defer server.Close()

client := newLokiClient(&plan.LogTarget{Location: server.URL})
client := NewClient(&plan.LogTarget{Location: server.URL})
for _, entry := range input {
err := client.Write(context.Background(), entry)
c.Assert(err, IsNil)
Expand All @@ -135,7 +135,7 @@ func (*lokiSuite) TestLokiFlushCancelContext(c *C) {
defer server.Close()
defer killServer()

client := newLokiClient(&plan.LogTarget{Location: server.URL})
client := NewClient(&plan.LogTarget{Location: server.URL})
err := client.Write(context.Background(), servicelog.Entry{
Time: time.Now(),
Service: "svc1",
Expand Down

0 comments on commit 1d026ac

Please sign in to comment.