Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch recent logs via logcache #45

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/jetstream/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ replace (
github.com/cloudfoundry-incubator/stratos/src/jetstream/crypto => ./crypto
github.com/cloudfoundry-incubator/stratos/src/jetstream/docs => ./docs
github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/cfapppush => ./plugins/cfapppush
github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/cloudfoundry => ./plugins/cloudfoundry
github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes => ./plugins/kubernetes
github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes/auth => ./plugins/kubernetes/auth
github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes/terminal => ./plugins/kubernetes/terminal
Expand All @@ -23,6 +24,8 @@ replace (
)

require (
code.cloudfoundry.org/go-log-cache v1.0.1-0.20230224210401-5e305670b626
code.cloudfoundry.org/go-loggregator v7.4.0+incompatible
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
github.com/antonlindstrom/pgstore v0.0.0-20220421113606-e3a6e3fed12a
github.com/cf-stratos/mysqlstore v0.0.0-20170822100912-304308519d13
Expand Down Expand Up @@ -64,10 +67,10 @@ require (
code.cloudfoundry.org/cli v0.0.0-20230912192837-efd1d03e7292 // indirect
code.cloudfoundry.org/cli-plugin-repo v0.0.0-20230912184324-f005268561a6 // indirect
code.cloudfoundry.org/clock v1.1.0 // indirect
code.cloudfoundry.org/go-log-cache v1.0.1-0.20230224210401-5e305670b626 // indirect
code.cloudfoundry.org/go-loggregator/v8 v8.0.5 // indirect
code.cloudfoundry.org/gofileutils v0.0.0-20170111115228-4d0c80011a0f // indirect
code.cloudfoundry.org/jsonry v1.1.4 // indirect
code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78 // indirect
code.cloudfoundry.org/tlsconfig v0.0.0-20230929201433-6cd2b78aba25 // indirect
code.cloudfoundry.org/ykk v0.0.0-20170424192843-e4df4ce2fd4d // indirect
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230106234847-43070de90fa1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions src/jetstream/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ code.cloudfoundry.org/go-diodes v0.0.0-20180905200951-72629b5276e3/go.mod h1:Jzi
code.cloudfoundry.org/go-envstruct v1.5.0/go.mod h1:E2S/gzRZpZ51PZnIv7Bo7QvcgH18yio19upkrRk0xLU=
code.cloudfoundry.org/go-log-cache v1.0.1-0.20211011162012-ede82a99d3cc h1:8gj5Z08i9ZvoIGi1A/E2CEQTbvJjogYQgBQUI2/DyNE=
code.cloudfoundry.org/go-log-cache v1.0.1-0.20211011162012-ede82a99d3cc/go.mod h1:8thG6lrstlbeI44hc7QgSnX8eau68+mNt9Pp33/TEcg=
code.cloudfoundry.org/go-loggregator v7.4.0+incompatible h1:KqZYloMQWM5Zg/BQKunOIA4OODh7djZbk48qqbowNFI=
code.cloudfoundry.org/go-loggregator v7.4.0+incompatible/go.mod h1:KPBTRqj+y738Nhf1+g4JHFaBU8j7dedirR5ETNHvMXU=
code.cloudfoundry.org/go-loggregator/v8 v8.0.2-0.20200722201844-b5130958b65d/go.mod h1:Or3cWTXwK6d3caPRBTUJv/suT+47jOltB7hYC/3ECCo=
code.cloudfoundry.org/go-loggregator/v8 v8.0.5 h1:p1rrGxTwUqLjlUVtbjTAvKOSGNmPuBja8LeQOQgRrBc=
code.cloudfoundry.org/go-loggregator/v8 v8.0.5/go.mod h1:mLlJ1ZyG6gVvBEtYypvbztRvFeCtBsTxE9tt+85tS6Y=
Expand All @@ -65,6 +67,8 @@ code.cloudfoundry.org/lager v2.0.0+incompatible/go.mod h1:O2sS7gKP3HM2iemG+Enwvy
code.cloudfoundry.org/lager/v3 v3.0.2 h1:H0dcQY+814G1Ea0e5K/AMaMpcr+Pe5Iv+AALJEwrP9U=
code.cloudfoundry.org/lager/v3 v3.0.2/go.mod h1:zA6tOIWhr5uZUez+PGpdfBHDWQOfhOrr0cgKDagZPwk=
code.cloudfoundry.org/rfc5424 v0.0.0-20180905210152-236a6d29298a/go.mod h1:tkZo8GtzBjySJ7USvxm4E36lNQw1D3xM6oKHGqdaAJ4=
code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78 h1:mrZQaZmuDIPhSp6b96b+CRKC2uH44ifa5cjDV2epKis=
code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78/go.mod h1:tkZo8GtzBjySJ7USvxm4E36lNQw1D3xM6oKHGqdaAJ4=
code.cloudfoundry.org/tlsconfig v0.0.0-20200131000646-bbe0f8da39b3/go.mod h1:eTbFJpyXRGuFVyg5+oaj9B2eIbIc+0/kZjH8ftbtdew=
code.cloudfoundry.org/tlsconfig v0.0.0-20230929201433-6cd2b78aba25 h1:vfCOuqnZi86Jg1EXMWtdCb9ieko4hna/CLzI6ECTgFA=
code.cloudfoundry.org/tlsconfig v0.0.0-20230929201433-6cd2b78aba25/go.mod h1:C8SxvGRSutmgzV2FxH8Zwqz2Q8HsaAITQRQFKhlDzPw=
Expand Down
122 changes: 85 additions & 37 deletions src/jetstream/plugins/cloudfoundry/cf_websocket_streams.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package cloudfoundry

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"

logcache "code.cloudfoundry.org/go-log-cache"
"code.cloudfoundry.org/go-log-cache/rpc/logcache_v1"
"code.cloudfoundry.org/go-loggregator/v8/rpc/loggregator_v2"
"github.com/cloudfoundry-incubator/stratos/src/jetstream/api"
"github.com/cloudfoundry/noaa"
"github.com/cloudfoundry/noaa/consumer"
noaa_errors "github.com/cloudfoundry/noaa/errors"
"github.com/cloudfoundry/sonde-go/events"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
Expand All @@ -31,19 +34,19 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

func (c CloudFoundrySpecification) appStream(echoContext echo.Context) error {
func (c *CloudFoundrySpecification) appStream(echoContext echo.Context) error {
return c.commonStreamHandler(echoContext, appStreamHandler)
}

func (c CloudFoundrySpecification) firehose(echoContext echo.Context) error {
func (c *CloudFoundrySpecification) firehose(echoContext echo.Context) error {
return c.commonStreamHandler(echoContext, firehoseStreamHandler)
}

func (c CloudFoundrySpecification) appFirehose(echoContext echo.Context) error {
func (c *CloudFoundrySpecification) appFirehose(echoContext echo.Context) error {
return c.commonStreamHandler(echoContext, appFirehoseStreamHandler)
}

func (c CloudFoundrySpecification) commonStreamHandler(echoContext echo.Context, bespokeStreamHandler func(echo.Context, *AuthorizedConsumer, *websocket.Conn) error) error {
func (c *CloudFoundrySpecification) commonStreamHandler(echoContext echo.Context, bespokeStreamHandler func(echo.Context, *AuthorizedConsumer, *websocket.Conn) error) error {
ac, err := c.openNoaaConsumer(echoContext)
if err != nil {
return err
Expand All @@ -67,13 +70,14 @@ func (c CloudFoundrySpecification) commonStreamHandler(echoContext echo.Context,
}

type AuthorizedConsumer struct {
consumer *consumer.Consumer
authToken string
refreshToken func() error
consumer *consumer.Consumer
logCacheClient *logcache.Client
authToken string
refreshToken func() error
}

// Refresh the Authorization token if needed and create a new Noaa consumer
func (c CloudFoundrySpecification) openNoaaConsumer(echoContext echo.Context) (*AuthorizedConsumer, error) {
func (c *CloudFoundrySpecification) openNoaaConsumer(echoContext echo.Context) (*AuthorizedConsumer, error) {

ac := &AuthorizedConsumer{}

Expand Down Expand Up @@ -118,32 +122,75 @@ func (c CloudFoundrySpecification) openNoaaConsumer(echoContext echo.Context) (*
log.Debugf("Creating Noaa consumer for Doppler endpoint %s", dopplerAddress)
ac.consumer = consumer.New(dopplerAddress, &tls.Config{InsecureSkipVerify: true}, http.ProxyFromEnvironment)

//Open a LogCache client to the log cache endpoint
logCacheUrl := strings.Replace(cnsiRecord.APIEndpoint.String(), "api.sys.", "log-cache.sys.", 1)
log.Debugf("Creating LogCache client for endpoint %s", logCacheUrl)
ac.logCacheClient = logcache.NewClient(logCacheUrl, logcache.WithHTTPClient(
NewLogCacheHttpClient(func() string {
return ac.authToken
})),
)

return ac, nil
}

// Attempts to get the recent logs, if we get an unauthorized error we will refresh the auth token and retry once
func getRecentLogs(ac *AuthorizedConsumer, cnsiGUID, appGUID string) ([]*events.LogMessage, error) {
log.Debug("getRecentLogs")
messages, err := ac.consumer.RecentLogs(appGUID, ac.authToken)
if err != nil {
errorPattern := "Failed to get recent messages for App %s on CNSI %s [%v]"
if _, ok := err.(*noaa_errors.UnauthorizedError); ok {
// If unauthorized, we may need to refresh our Auth token
// Note: annoyingly, older versions of CF also send back "401 - Unauthorized" when the app doesn't exist...
// This means we sometimes end up here even when our token is legit
if err := ac.refreshToken(); err != nil {
return nil, fmt.Errorf(errorPattern, appGUID, cnsiGUID, err)
}
messages, err = ac.consumer.RecentLogs(appGUID, ac.authToken)
// Attempts to relay the recent logs, if we get an unauthorized error we will refresh the auth token and retry once
func relayRecentLogsFromCache(relay func(msg *events.LogMessage), ac *AuthorizedConsumer, appGUID string) error {
logLineRequestCount := 1000
var envelopes []*loggregator_v2.Envelope
var err error

for logLineRequestCount >= 1 {
envelopes, err = ac.logCacheClient.Read(
context.Background(),
appGUID,
time.Time{},
logcache.WithEnvelopeTypes(logcache_v1.EnvelopeType_LOG),
logcache.WithLimit(logLineRequestCount),
)
if err != nil && err.Error() == "unexpected status code 429" {
err = ac.refreshToken()
if err != nil {
msg := fmt.Sprintf(errorPattern, appGUID, cnsiGUID, err)
return nil, echo.NewHTTPError(http.StatusUnauthorized, msg)
return fmt.Errorf("cannot refresh token when reading from cache again cause %v", err)
}
} else {
return nil, fmt.Errorf(errorPattern, appGUID, cnsiGUID, err)
err = nil
continue
}
if err == nil || err.Error() != "unexpected status code 429" {
break
}
logLineRequestCount /= 2
}
if err != nil {
return fmt.Errorf("failed to retrieve logs from Log Cache: %s", err)
}
return messages, nil

for _, envelope := range envelopes {
logEnvelope, ok := envelope.GetMessage().(*loggregator_v2.Envelope_Log)
if !ok {
continue
}
log := logEnvelope.Log
relay(&events.LogMessage{
Message: log.Payload,
MessageType: func(t loggregator_v2.Log_Type) *events.LogMessage_MessageType {
var r events.LogMessage_MessageType
switch t {
case loggregator_v2.Log_OUT:
r = events.LogMessage_OUT
case loggregator_v2.Log_ERR:
r = events.LogMessage_ERR
}
return &r
}(log.Type),
Timestamp: func(i int64) *int64 { return &i }(envelope.GetTimestamp()),
AppId: &appGUID,
SourceType: func(s string) *string { return &s }(envelope.GetTags()["source_type"]),
SourceInstance: &envelope.InstanceId,
})
}

return err
}

func drainErrors(errorChan <-chan error) {
Expand Down Expand Up @@ -184,11 +231,6 @@ func appStreamHandler(echoContext echo.Context, ac *AuthorizedConsumer, clientWe
appGUID := echoContext.Param("appGuid")

log.Infof("Received request for log stream for App ID: %s - in CNSI: %s", appGUID, cnsiGUID)

messages, err := getRecentLogs(ac, cnsiGUID, appGUID)
if err != nil {
return err
}
// Reusable closure to pump messages from Noaa to the client WebSocket
// N.B. We convert protobuf messages to JSON for ease of use in the frontend
relayLogMsg := func(msg *events.LogMessage) {
Expand All @@ -202,9 +244,15 @@ func appStreamHandler(echoContext echo.Context, ac *AuthorizedConsumer, clientWe
}
}

// Send the recent messages, sorted in Chronological order
for _, msg := range noaa.SortRecent(messages) {
relayLogMsg(msg)
/*
* Split into two parts…
* 1. LogCache Read for recent logs - inspired by CF CLI in order to replace noaa RecentLogs
* https://github.com/cloudfoundry/stratos/issues/5037
* 2. Stream subsequent logs as before
*/
err := relayRecentLogsFromCache(relayLogMsg, ac, appGUID)
if err != nil {
log.Errorf("Cannot relay recent logs via cache cause %v", err)
}

msgChan, errorChan := ac.consumer.TailingLogs(appGUID, ac.authToken)
Expand Down
20 changes: 20 additions & 0 deletions src/jetstream/plugins/cloudfoundry/log_cache_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cloudfoundry

import "net/http"

type LogCacheHttpClient struct {
httpClient *http.Client
accessToken func() string
}

func NewLogCacheHttpClient(accessToken func() string) *LogCacheHttpClient {
return &LogCacheHttpClient{
httpClient: http.DefaultClient,
accessToken: accessToken,
}
}

func (c *LogCacheHttpClient) Do(req *http.Request) (*http.Response, error) {
req.Header.Set("Authorization", c.accessToken())
return c.httpClient.Do(req)
}