@@ -8,43 +8,43 @@ import (
88 "time"
99
1010 "github.com/go-kit/log"
11+ "github.com/grafana/dskit/backoff"
1112 "github.com/prometheus/client_golang/prometheus"
1213
1314 "github.com/grafana/alloy/internal/component/common/loki"
1415 "github.com/grafana/alloy/internal/component/common/loki/client/internal"
1516 "github.com/grafana/alloy/internal/useragent"
16- "github.com/grafana/dskit/backoff"
1717)
1818
19- func NewFanoutConsumer (logger log.Logger , reg prometheus.Registerer , clientCfgs ... Config ) (* FanoutConsumer , error ) {
20- if len (clientCfgs ) == 0 {
19+ func NewFanoutConsumer (logger log.Logger , reg prometheus.Registerer , cfgs ... Config ) (* FanoutConsumer , error ) {
20+ if len (cfgs ) == 0 {
2121 return nil , fmt .Errorf ("at least one client config must be provided" )
2222 }
2323
2424 m := & FanoutConsumer {
25- clients : make ([]* client , 0 , len (clientCfgs )),
26- recv : make (chan loki.Entry ),
25+ endpoints : make ([]* endpoint , 0 , len (cfgs )),
26+ recv : make (chan loki.Entry ),
2727 }
2828
2929 var (
30- metrics = NewMetrics (reg )
31- clientsCheck = make (map [string ]struct {})
30+ metrics = NewMetrics (reg )
31+ endpointsCheck = make (map [string ]struct {})
3232 )
3333
34- for _ , cfg := range clientCfgs {
35- // Don't allow duplicate clients , we have client specific metrics that need at least one unique label value (name).
36- clientName := getClientName (cfg )
37- if _ , ok := clientsCheck [ clientName ]; ok {
38- return nil , fmt .Errorf ("duplicate client configs are not allowed, found duplicate for name: %s" , cfg .Name )
34+ for _ , cfg := range cfgs {
35+ // Don't allow duplicate endpoints , we have endpoint specific metrics that need at least one unique label value (name).
36+ name := getEndpointName (cfg )
37+ if _ , ok := endpointsCheck [ name ]; ok {
38+ return nil , fmt .Errorf ("duplicate endpoint configs are not allowed, found duplicate for name: %s" , cfg .Name )
3939 }
4040
41- clientsCheck [ clientName ] = struct {}{}
42- client , err := newClient (metrics , cfg , logger )
41+ endpointsCheck [ name ] = struct {}{}
42+ endpoint , err := newEndpoint (metrics , cfg , logger )
4343 if err != nil {
4444 return nil , fmt .Errorf ("error starting client: %w" , err )
4545 }
4646
47- m .clients = append (m .clients , client )
47+ m .endpoints = append (m .endpoints , endpoint )
4848 }
4949
5050 m .wg .Go (m .run )
@@ -54,15 +54,15 @@ func NewFanoutConsumer(logger log.Logger, reg prometheus.Registerer, clientCfgs
5454var _ Consumer = (* FanoutConsumer )(nil )
5555
5656type FanoutConsumer struct {
57- clients []* client
58- wg sync.WaitGroup
59- once sync.Once
60- recv chan loki.Entry
57+ endpoints []* endpoint
58+ wg sync.WaitGroup
59+ once sync.Once
60+ recv chan loki.Entry
6161}
6262
6363func (c * FanoutConsumer ) run () {
6464 for e := range c .recv {
65- for _ , c := range c .clients {
65+ for _ , c := range c .endpoints {
6666 c .Chan () <- e
6767 }
6868 }
@@ -78,20 +78,20 @@ func (c *FanoutConsumer) Stop() {
7878 c .wg .Wait ()
7979
8080 var stopWG sync.WaitGroup
81- // Stop all clients .
82- for _ , c := range c .clients {
81+ // Stop all endpoints .
82+ for _ , c := range c .endpoints {
8383 stopWG .Go (func () {
8484 c .Stop ()
8585 })
8686 }
8787
88- // Wait for all clients to stop.
88+ // Wait for all endpoints to stop.
8989 stopWG .Wait ()
9090}
9191
92- // getClientName computes the specific name for each client config. The name is either the configured Name setting in Config,
92+ // getEndpointName computes the specific name for each endpoint config. The name is either the configured Name setting in Config,
9393// or a hash of the config as whole, this allows us to detect repeated configs.
94- func getClientName (cfg Config ) string {
94+ func getEndpointName (cfg Config ) string {
9595 if cfg .Name != "" {
9696 return cfg .Name
9797 }
@@ -108,8 +108,7 @@ func asSha256(o any) string {
108108
109109var userAgent = useragent .Get ()
110110
111- // Client for pushing logs in snappy-compressed protos over HTTP.
112- type client struct {
111+ type endpoint struct {
113112 cfg Config
114113 entries chan loki.Entry
115114
@@ -121,8 +120,8 @@ type client struct {
121120 shards * shards
122121}
123122
124- func newClient (metrics * Metrics , cfg Config , logger log.Logger ) (* client , error ) {
125- logger = log .With (logger , "component" , "client " , "host" , cfg .URL .Host )
123+ func newEndpoint (metrics * Metrics , cfg Config , logger log.Logger ) (* endpoint , error ) {
124+ logger = log .With (logger , "component" , "endpoint " , "host" , cfg .URL .Host )
126125
127126 shards , err := newShards (metrics , logger , internal .NewNopMarkerHandler (), cfg )
128127 if err != nil {
@@ -131,7 +130,7 @@ func newClient(metrics *Metrics, cfg Config, logger log.Logger) (*client, error)
131130
132131 ctx , cancel := context .WithCancel (context .Background ())
133132
134- c := & client {
133+ c := & endpoint {
135134 cfg : cfg ,
136135 entries : make (chan loki.Entry ),
137136 shards : shards ,
@@ -145,7 +144,7 @@ func newClient(metrics *Metrics, cfg Config, logger log.Logger) (*client, error)
145144 return c , nil
146145}
147146
148- func (c * client ) run () {
147+ func (c * endpoint ) run () {
149148 for {
150149 select {
151150 case <- c .ctx .Done ():
@@ -164,11 +163,11 @@ func (c *client) run() {
164163 }
165164}
166165
167- func (c * client ) Chan () chan <- loki.Entry {
166+ func (c * endpoint ) Chan () chan <- loki.Entry {
168167 return c .entries
169168}
170169
171- func (c * client ) Stop () {
170+ func (c * endpoint ) Stop () {
172171 c .shards .stop ()
173172 c .cancel ()
174173 c .wg .Wait ()
0 commit comments