Skip to content

Commit

Permalink
Fix issue after merging PRO code
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed Oct 25, 2024
1 parent d3c95f1 commit cca448b
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions output_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,16 @@ type httpWorker struct {
output *HTTPOutput
client *HTTPClient
lastActivity time.Time
queue chan []byte
queue chan *Message
stop chan bool
}

func newHTTPWorker(output *HTTPOutput, queue chan []byte) *httpWorker {
func newHTTPWorker(output *HTTPOutput, queue chan *Message) *httpWorker {
client := NewHTTPClient(output.config)

w := &httpWorker{client: client, output: output}
if queue == nil {
w.queue = make(chan []byte, 100)
w.queue = make(chan *Message, 100)
} else {
w.queue = queue
}
Expand All @@ -110,8 +110,8 @@ func newHTTPWorker(output *HTTPOutput, queue chan []byte) *httpWorker {
go func() {
for {
select {
case payload := <-w.queue:
output.sendRequest(client, payload)
case msg := <-w.queue:
output.sendRequest(client, msg)
case <-w.stop:
return
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func NewHTTPOutput(address string, config *HTTPOutputConfig) PluginReadWriter {
newConfig.WorkersMin = 1000
}
if newConfig.WorkersMax <= 0 {
newConfig.WorkersMax = math.MaxInt32 // idealy so large
newConfig.WorkersMax = math.MaxInt32 // ideally so large
}
if newConfig.WorkersMax < newConfig.WorkersMin {
newConfig.WorkersMax = newConfig.WorkersMin
Expand Down Expand Up @@ -238,7 +238,7 @@ func (o *HTTPOutput) sessionWorkerMaster() {
o.workerSessions[sessionID] = worker
}

worker.queue <- msg.Data
worker.queue <- msg
worker.lastActivity = time.Now()
case <-gc:
now := time.Now()
Expand All @@ -260,7 +260,7 @@ func (o *HTTPOutput) startWorker() {
case <-o.stopWorker:
return
case msg := <-o.queue:
o.sendRequest(o.client, msg.Data)
o.sendRequest(o.client, msg)
}
}
}
Expand Down Expand Up @@ -321,14 +321,14 @@ func (o *HTTPOutput) PluginRead() (*Message, error) {
return &msg, nil
}

func (o *HTTPOutput) sendRequest(client *HTTPClient, data []byte) {
if !isRequestPayload(data) {
func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
if !isRequestPayload(msg.Meta) {
return
}

uuid := payloadID(data)
uuid := payloadID(msg.Meta)
start := time.Now()
resp, err := client.Send(data)
resp, err := client.Send(msg.Data)
stop := time.Now()

if err != nil {
Expand All @@ -344,7 +344,7 @@ func (o *HTTPOutput) sendRequest(client *HTTPClient, data []byte) {
}

if o.elasticSearch != nil {
o.elasticSearch.ResponseAnalyze(data, resp, start, stop)
o.elasticSearch.ResponseAnalyze(msg.Data, resp, start, stop)
}
}

Expand Down Expand Up @@ -384,7 +384,7 @@ func NewHTTPClient(config *HTTPOutputConfig) *HTTPClient {
},
}
if config.SkipVerify {
// clone to avoid modying global default RoundTripper
// clone to avoid modifying global default RoundTripper
transport = http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
client.Client.Transport = transport
Expand All @@ -393,7 +393,7 @@ func NewHTTPClient(config *HTTPOutputConfig) *HTTPClient {
return client
}

// Send sends an http request using client create by NewHTTPClient
// Send sends an http request using client created by NewHTTPClient
func (c *HTTPClient) Send(data []byte) ([]byte, error) {
var req *http.Request
var resp *http.Response
Expand Down

0 comments on commit cca448b

Please sign in to comment.