Skip to content

Commit 11cd4f8

Browse files
committed
feat: Add new component loki.source.string
Update documentation and changelog
1 parent 139e5b7 commit 11cd4f8

File tree

5 files changed

+256
-0
lines changed

5 files changed

+256
-0
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ Main (unreleased)
3737

3838
- Add `truncate` stage for `loki.process` to truncate log entries, label values, and structured_metadata values. (@dehaansa)
3939

40+
- Add a new component `loki.source.string` (@ravishankar15)
41+
4042
### Enhancements
4143

4244
- Add support of `tls` in components `loki.source.(awsfirehose|gcplog|heroku|api)` and `prometheus.receive_http` and `pyroscope.receive_http`. (@fgouteroux)
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
---
2+
canonical: https://grafana.com/docs/alloy/latest/reference/components/loki/loki.source.string/
3+
aliases:
4+
- ../loki.source.string/ # /docs/alloy/latest/reference/components/loki.source.string/
5+
description: Learn about loki.string
6+
labels:
7+
stage: general-availability
8+
products:
9+
- oss
10+
title: loki.source.string
11+
---
12+
13+
# `loki.source.string`
14+
15+
`loki.source.string` receives log entries as string from other components and can be ingested to any loki component.
16+
17+
You can specify multiple `loki.source.string` components by giving them different labels.
18+
19+
## Usage
20+
21+
```alloy
22+
loki.source.string "<LABEL>" {
23+
source = <TARGET>
24+
forward_to = <RECEIVER>
25+
}
26+
```
27+
28+
## Arguments
29+
30+
The components consumes the value from the source and converts them to log entries passing them to other components receivers in `forward_to`
31+
32+
| Name | Type | Description | Default | Required |
33+
| ----------------------- | -------------------- | ---------------------------------------------------------- | ------- | -------- |
34+
| `source` | `string` | A value pointing to a string source. | | yes |
35+
| `forward_to` | `LogsReceiver` | Receiver to send log entries to. | | yes |
36+
37+
## Blocks
38+
39+
The `loki.source.string` component doesn't support any blocks.
40+
41+
## Exported fields
42+
43+
`loki.source.string` doesn't export any fields.
44+
45+
## Component health
46+
47+
`loki.source.string` is only reported as unhealthy if given an invalid configuration.
48+
49+
## Debug information
50+
51+
`loki.source.string` doesn't expose any component-specific debug information.
52+
53+
## Example
54+
55+
This example creates a pipeline that reads string response from remote `/data` endpoint passes it to `loki.source.string` which converts it to [Loki `LogsReceiver`](../../../compatibility/#loki-logsreceiver-exporters) and then echo the value to stdout:
56+
57+
```alloy
58+
remote.http "server" {
59+
url = "http://localhost:2112/data"
60+
}
61+
loki.source.string "stringer" {
62+
source = remote.http.server.content
63+
forward_to = loki.echo.print.receiver
64+
}
65+
loki.echo "print" { }
66+
```
67+
68+
<!-- START GENERATED COMPATIBLE COMPONENTS -->
69+
70+
## Compatible components
71+
72+
`loki.source.string` can accept arguments from the following components:
73+
74+
- Components that export string
75+
76+
{{< admonition type="note" >}}
77+
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
78+
Refer to the linked documentation for more details.
79+
{{< /admonition >}}
80+
81+
<!-- END GENERATED COMPATIBLE COMPONENTS -->

internal/component/all/all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import (
5858
_ "github.com/grafana/alloy/internal/component/loki/source/kubernetes" // Import loki.source.kubernetes
5959
_ "github.com/grafana/alloy/internal/component/loki/source/kubernetes_events" // Import loki.source.kubernetes_events
6060
_ "github.com/grafana/alloy/internal/component/loki/source/podlogs" // Import loki.source.podlogs
61+
_ "github.com/grafana/alloy/internal/component/loki/source/string" // Import loki.source.string
6162
_ "github.com/grafana/alloy/internal/component/loki/source/syslog" // Import loki.source.syslog
6263
_ "github.com/grafana/alloy/internal/component/loki/source/windowsevent" // Import loki.source.windowsevent
6364
_ "github.com/grafana/alloy/internal/component/loki/write" // Import loki.write
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package string
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/grafana/alloy/internal/component"
9+
"github.com/grafana/alloy/internal/component/common/loki"
10+
"github.com/grafana/alloy/internal/featuregate"
11+
"github.com/grafana/loki/pkg/push"
12+
)
13+
14+
func init() {
15+
component.Register(component.Registration{
16+
Name: "loki.source.string",
17+
Stability: featuregate.StabilityGenerallyAvailable,
18+
Args: Arguments{},
19+
20+
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
21+
return New(opts, args.(Arguments))
22+
},
23+
})
24+
}
25+
26+
type Arguments struct {
27+
Source string `alloy:"source,attr"`
28+
ForwardTo loki.LogsReceiver `alloy:"forward_to,attr"`
29+
}
30+
31+
var _ component.Component = (*Component)(nil)
32+
33+
type Component struct {
34+
mut sync.RWMutex
35+
36+
opts component.Options
37+
args Arguments
38+
stringInActive chan bool
39+
stringIn chan string
40+
receiver loki.LogsReceiver
41+
}
42+
43+
func New(o component.Options, args Arguments) (*Component, error) {
44+
c := &Component{
45+
opts: o,
46+
receiver: args.ForwardTo,
47+
stringInActive: make(chan bool, 1),
48+
stringIn: make(chan string),
49+
}
50+
51+
go c.run()
52+
53+
<-c.stringInActive
54+
if err := c.Update(args); err != nil {
55+
return nil, err
56+
}
57+
58+
return c, nil
59+
}
60+
61+
func (c *Component) Run(ctx context.Context) error {
62+
<-ctx.Done()
63+
close(c.stringIn)
64+
close(c.stringInActive)
65+
return nil
66+
}
67+
68+
func (c *Component) run() {
69+
c.stringInActive <- true
70+
for value := range c.stringIn {
71+
entry := loki.Entry{
72+
Entry: push.Entry{
73+
Timestamp: time.Now(),
74+
Line: value,
75+
},
76+
}
77+
c.receiver.Chan() <- entry
78+
}
79+
}
80+
81+
func (c *Component) Update(args component.Arguments) error {
82+
newArgs := args.(Arguments)
83+
84+
c.mut.Lock()
85+
defer c.mut.Unlock()
86+
c.args = newArgs
87+
88+
select {
89+
case c.stringIn <- c.args.Source:
90+
default:
91+
}
92+
93+
return nil
94+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package string
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/go-kit/log"
9+
"github.com/grafana/alloy/internal/component"
10+
"github.com/grafana/alloy/internal/component/common/loki"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestString(t *testing.T) {
15+
opts := component.Options{
16+
Logger: log.NewNopLogger(),
17+
}
18+
19+
type arguments struct {
20+
expected string
21+
args Arguments
22+
}
23+
receiver := loki.NewLogsReceiver()
24+
argsArray := []arguments{
25+
{
26+
expected: "{\"data\":\"pass1\"}",
27+
args: Arguments{
28+
Source: "{\"data\":\"pass1\"}",
29+
ForwardTo: receiver,
30+
},
31+
},
32+
{
33+
expected: "{\"key\":\"pass\",\"nestedData\":{\"nestedKey\":\"pass\"}}",
34+
args: Arguments{
35+
Source: "{\"key\":\"pass\",\"nestedData\":{\"nestedKey\":\"pass\"}}",
36+
ForwardTo: receiver,
37+
},
38+
},
39+
}
40+
41+
initArgs := arguments{
42+
expected: "{\"data\":\"init\"}",
43+
args: Arguments{
44+
Source: "{\"data\":\"init\"}",
45+
ForwardTo: receiver,
46+
},
47+
}
48+
49+
comp, err := New(opts, initArgs.args)
50+
require.NoError(t, err)
51+
52+
ctx, cancel := context.WithCancel(context.Background())
53+
defer cancel()
54+
55+
go func() {
56+
err = comp.Run(ctx)
57+
}()
58+
59+
// Make sure the first argument is received
60+
select {
61+
case received := <-receiver.Chan():
62+
require.Equal(t, initArgs.expected, received.Line)
63+
case <-time.After(time.Second):
64+
t.Fatalf("timeout waiting for log entry")
65+
}
66+
67+
// Subsequent update should be received
68+
for i, testArgs := range argsArray {
69+
comp.Update(testArgs.args)
70+
71+
select {
72+
case received := <-receiver.Chan():
73+
require.Equal(t, testArgs.expected, received.Line)
74+
case <-time.After(time.Second):
75+
t.Fatalf("timeout waiting for log entry %d", i)
76+
}
77+
}
78+
}

0 commit comments

Comments
 (0)