Skip to content

Commit

Permalink
Merge pull request #106 from codex-team/master
Browse files Browse the repository at this point in the history
Update prod
  • Loading branch information
n0str authored Dec 13, 2024
2 parents e40f633 + b487670 commit 9ea1d92
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 0 deletions.
6 changes: 6 additions & 0 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ func (x *RunCommand) Execute(args []string) error {
// connect to accounts MongoDB
doneAccountsContext := make(chan struct{})
accountsClient := accounts.New(cfg.AccountsMongoDBURI)

err = accountsClient.UpdateTokenCache()
if err != nil {
log.Errorf("failed to update token cache: %s", err)
}

go periodic.RunPeriodically(accountsClient.UpdateTokenCache, cfg.TokenUpdatePeriod, doneAccountsContext)
defer close(doneAccountsContext)

Expand Down
88 changes: 88 additions & 0 deletions pkg/server/errorshandler/handler_sentry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package errorshandler

import (
"encoding/json"
"fmt"

"github.com/codex-team/hawk.collector/pkg/broker"
log "github.com/sirupsen/logrus"
"github.com/valyala/fasthttp"
)

const SentryQueueName = "errors/sentry"
const CatcherType = "sentry"

// HandleHTTP processes HTTP requests with JSON body
func (handler *Handler) HandleSentry(ctx *fasthttp.RequestCtx) {
if ctx.Request.Header.ContentLength() > handler.MaxErrorCatcherMessageSize {
log.Warnf("Incoming request with size %d", ctx.Request.Header.ContentLength())
sendAnswerHTTP(ctx, ResponseMessage{Code: 400, Error: true, Message: "Request is too large"})
return
}

// check that X-Sentry-Auth header is available
auth := ctx.Request.Header.Peek("X-Sentry-Auth")
if auth == nil {
log.Warnf("Incoming request without X-Sentry-Auth header")
sendAnswerHTTP(ctx, ResponseMessage{Code: 400, Error: true, Message: "X-Sentry-Auth header is missing"})
return
}

hawkToken, err := getSentryKeyFromAuth(string(auth))
if err != nil {
log.Warnf("Incoming request with invalid X-Sentry-Auth header: %s", err)
sendAnswerHTTP(ctx, ResponseMessage{Code: 400, Error: true, Message: err.Error()})
return
}

log.Debugf("Incoming request with hawk integration token: %s", hawkToken)

body := ctx.PostBody()

sentryEnvelopeBody, err := decompressGzipString(body)
if err != nil {
log.Warnf("Failed to decompress gzip body: %s", err)
sendAnswerHTTP(ctx, ResponseMessage{Code: 400, Error: true, Message: "Failed to decompress gzip body"})
return
}
log.Debugf("Decompressed body: %s", sentryEnvelopeBody)

projectId, ok := handler.AccountsMongoDBClient.ValidTokens[hawkToken]
if !ok {
log.Debugf("Token %s is not in the accounts cache", hawkToken)
sendAnswerHTTP(ctx, ResponseMessage{400, true, fmt.Sprintf("Integration token invalid: %s", hawkToken)})
return
}
log.Debugf("Found project with ID %s for integration token %s", projectId, hawkToken)

if handler.RedisClient.IsBlocked(projectId) {
handler.ErrorsBlockedByLimit.Inc()
sendAnswerHTTP(ctx, ResponseMessage{402, true, "Project has exceeded the events limit"})
return
}

// convert message to JSON format
rawMessage := RawSentryMessage{Envelope: sentryEnvelopeBody}
jsonMessage, err := json.Marshal(rawMessage)
if err != nil {
log.Errorf("Message marshalling error: %v", err)
sendAnswerHTTP(ctx, ResponseMessage{400, true, "Cannot serialize envelope"})
}

messageToSend := BrokerMessage{ProjectId: projectId, Payload: json.RawMessage(jsonMessage), CatcherType: CatcherType}
payloadToSend, err := json.Marshal(messageToSend)
if err != nil {
log.Errorf("Message marshalling error: %v", err)
sendAnswerHTTP(ctx, ResponseMessage{400, true, "Cannot serialize envelope"})
}

// send serialized message to a broker
brokerMessage := broker.Message{Payload: payloadToSend, Route: SentryQueueName}
log.Debugf("Send to queue: %s", brokerMessage)
handler.Broker.Chan <- brokerMessage

// increment processed errors counter
handler.ErrorsProcessed.Inc()

sendAnswerHTTP(ctx, ResponseMessage{200, false, "OK"})
}
4 changes: 4 additions & 0 deletions pkg/server/errorshandler/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ type BrokerMessage struct {
Payload json.RawMessage `json:"payload"`
CatcherType string `json:"catcherType"`
}

type RawSentryMessage struct {
Envelope []byte `json:"envelope"`
}
39 changes: 39 additions & 0 deletions pkg/server/errorshandler/sentry_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package errorshandler

import (
"bytes"
"compress/gzip"
"errors"
"fmt"
"io"
"strings"
)

func decompressGzipString(gzipString []byte) ([]byte, error) {
reader, err := gzip.NewReader(bytes.NewReader(gzipString))
if err != nil {
return []byte(""), fmt.Errorf("failed to create gzip reader: %w", err)
}
defer reader.Close()

var result bytes.Buffer
_, err = io.Copy(&result, reader)
if err != nil {
return []byte(""), fmt.Errorf("failed to decompress data: %w", err)
}

return result.Bytes(), nil
}

func getSentryKeyFromAuth(auth string) (string, error) {
auth = strings.TrimPrefix(auth, "Sentry ")
pairs := strings.Split(auth, ", ")
for _, pair := range pairs {
kv := strings.SplitN(pair, "=", 2)
if len(kv) == 2 && kv[0] == "sentry_key" {
return kv[1], nil
}
}

return "", errors.New("sentry_key not found")
}
7 changes: 7 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ func (s *Server) handler(ctx *fasthttp.RequestCtx) {
s.ErrorsHandler.HandleWebsocket(ctx)
case "/release":
s.ReleaseHandler.HandleHTTP(ctx)
case "/api/0/envelope/":
auth := ctx.Request.Header.Peek("X-Sentry-Auth")
if auth != nil {
s.ErrorsHandler.HandleSentry(ctx)
} else {
ctx.Error("X-Sentry-Auth not found", fasthttp.StatusBadRequest)
}
default:
ctx.Error("Not found", fasthttp.StatusNotFound)
}
Expand Down

0 comments on commit 9ea1d92

Please sign in to comment.