Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@ module github.com/ray-project/kuberay
go 1.24.0

require (
github.com/alibabacloud-go/tea v1.3.10
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'd better have a new go.mod in the historyserver directory.

github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/aliyun/credentials-go v1.4.7
github.com/dustinkirkland/golang-petname v0.0.0-20240428194347-eebcea082ee0
github.com/elazarl/go-bindata-assetfs v1.0.1
github.com/fsnotify/fsnotify v1.8.0
github.com/go-logr/logr v1.4.3
github.com/go-logr/zerologr v1.2.3
github.com/golang/mock v1.6.0
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3
github.com/hpcloud/tail v1.0.0
github.com/novln/docker-parser v1.0.0
github.com/onsi/ginkgo/v2 v2.23.4
github.com/onsi/gomega v1.37.0
Expand All @@ -20,6 +25,7 @@ require (
github.com/ray-project/kuberay/ray-operator v0.0.0
github.com/rs/cors v1.11.1
github.com/rs/zerolog v1.34.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.9.1
github.com/spf13/pflag v1.0.6
github.com/stretchr/testify v1.10.0
Expand All @@ -42,6 +48,7 @@ require (
require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/MakeNowJust/heredoc v1.0.0 // indirect
github.com/alibabacloud-go/debug v1.0.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand All @@ -51,7 +58,6 @@ require (
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect
github.com/fatih/camelcase v1.0.0 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-errors/errors v1.4.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
Expand Down Expand Up @@ -105,7 +111,10 @@ require (
golang.org/x/tools v0.31.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.33.1 // indirect
k8s.io/apiserver v0.33.1 // indirect
Expand Down
81 changes: 81 additions & 0 deletions go.sum

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions historyserver/backend/collector/runtime/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package runtime

// RayLogCollector is the interface returned by NewCollector.
type RayLogCollector interface {
// Start takes a receive-only int channel.
// NOTE(review): the visible implementation names the parameter "stop" but has
// an empty body, so the intended stop semantics cannot be confirmed from here.
Start(<-chan int)
}
251 changes: 251 additions & 0 deletions historyserver/backend/collector/runtime/logcollector/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package logcollector

import (
"bytes"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/fsnotify/fsnotify"
"github.com/hpcloud/tail"
"github.com/ray-project/kuberay/historyserver/backend/collector/storage"
"github.com/sirupsen/logrus"
)

// RayLogHandler watches LogDir for log files, tails each of them, and appends
// their content to a storage backend through Writter.
type RayLogHandler struct {
// EnableMeta is checked by Run, but the metadata branch it guards is
// currently commented out.
EnableMeta bool

// RootDir is the destination prefix: PushLog joins it with a file's path
// relative to LogDir to build the destination object name.
RootDir string
// LogDir is the local directory that Run watches for log files.
LogDir string
// LogFiles carries paths of discovered log files from AddLogFile to
// PushLogsLoops.
LogFiles chan string

// LogBatching is the buffered line count at which PushLog flushes.
LogBatching int
// PushInterval is the period of PushLog's time-based flush.
PushInterval time.Duration

// Writter is the storage backend used to create directories and append data.
Writter storage.StorageWritter
}

// Start is currently a no-op: it ignores stop and returns immediately.
// The collection pipeline itself is started by Run.
func (h *RayLogHandler) Start(stop <-chan int) {
}

// Run starts the collection pipeline and blocks until stop fires.
//
// It creates an fsnotify watcher, launches WatchLogsLoops (on h.LogDir) and
// PushLogsLoops in their own goroutines, then waits for a value on, or the
// closing of, stop. The watcher is closed when Run returns.
//
// It returns a non-nil error only when the watcher cannot be created.
func (h *RayLogHandler) Run(stop chan struct{}) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		// Report the failure to the caller instead of terminating the whole
		// process; it is still logged so it stays visible if the caller
		// ignores the returned error.
		logrus.Errorf("Create fsnotify NewWatcher error %v", err)
		return fmt.Errorf("create fsnotify watcher: %w", err)
	}
	defer watcher.Close()

	go h.WatchLogsLoops(watcher, h.LogDir, stop)
	go h.PushLogsLoops(stop)

	// TODO: persist metadata when h.EnableMeta is set, e.g.
	// go h.PersistMetaLoop(stop)

	<-stop
	logrus.Warnf("Receive stop signal, so stop ray collector ")
	return nil
}

// AddLogFile queues absoluteLogPathName on h.LogFiles so that PushLogsLoops
// can pick it up. The call blocks until the channel send completes.
func (h *RayLogHandler) AddLogFile(absoluteLogPathName string) {
	queue := h.LogFiles
	queue <- absoluteLogPathName
}

func (h *RayLogHandler) PushLog(absoluteLogPathName string) error {
// 判断文件是否存在
// 计算相对路径
absoluteLogPathName = strings.TrimSpace(absoluteLogPathName)
absoluteLogPathName = filepath.Clean(absoluteLogPathName)

relativePath := strings.TrimPrefix(absoluteLogPathName, fmt.Sprintf("%s/", h.LogDir))
// 分割相对路径为子目录和文件名
// subdir events/aa/
// filename a.txt
subdir, filename := filepath.Split(relativePath)

if len(subdir) != 0 {
dirName := path.Join(h.RootDir, subdir)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
dirName := path.Join(h.RootDir, subdir)
dirName := filepath.Join(h.RootDir, subdir)

if err := h.Writter.CreateDirectory(dirName); err != nil {
logrus.Errorf("Failed to create directory '%s': %v", dirName, err)
return err
}
}

objectName := path.Join(h.RootDir, subdir, filename)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
objectName := path.Join(h.RootDir, subdir, filename)
objectName := filepath.Join(h.RootDir, subdir, filename)

logrus.Infof("Begin to create and append oss object %s by absoluteLogPathName %s, oss subdir [%s] filename [%s] relativePath[%s]",
objectName, absoluteLogPathName, subdir, filename, relativePath)

// utils.DeleteObject(h.OssBucket, objectName)

// 从文件开始读,
// Go 1.20推荐使用 io.SeekEnd, 老版本可能需要改为os.SEEK_END
seek := &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}
t, err := tail.TailFile(absoluteLogPathName, tail.Config{
Follow: true,
Location: seek,
})
if err != nil {
logrus.Errorf("Create TailFile %s error %v", absoluteLogPathName, err)
return err
}

var nextPos int64 = 0

logrus.Infof("Begin to first append oss object %s ...", objectName)
nextPos, err = h.Writter.Append(objectName, strings.NewReader(""), nextPos)
if err != nil {
logrus.Errorf("First append object %s error %v",
objectName,
err)
return err
}

lines := 0
lastPush := time.Now()
buf := bytes.NewBufferString("")
for {
select {
case <-time.Tick(h.PushInterval):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The time.Tick will create a ticker underneath. Even though it might not cause a leakage after 1.23. Did you mean ?

for {
    select {
    case <-time.After(h.PushInterval):
...

or

ticker := time.Tick(h.PushInterval)
for {
    select {
    case <-ticker:
...

if lines > 0 {
nextPos, err = h.Writter.Append(objectName, buf, nextPos)
if err != nil {
logrus.Errorf("Tail file %s to object %s error, append value: %v, nextPos %d error [%v]",
absoluteLogPathName,
objectName,
buf.String(),
nextPos,
err)
return err
}

now := time.Now()
logrus.Infof("Tail file %s to object %s success, by interval, append value %s, lines %v, interval %v",
absoluteLogPathName, objectName, buf.String(), lines, now.Sub(lastPush).Seconds())

lastPush = now
lines = 0
buf = bytes.NewBufferString("")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it use buf.Reset() after the buffer has been successful written to writer?

}
case line, ok := <-t.Lines:
if !ok {
logrus.Infof("channel for path %v is closed", absoluteLogPathName)
return nil
}
lines++
buf.WriteString(line.Text + "\n")
if lines >= h.LogBatching {
nextPos, err = h.Writter.Append(objectName, buf, nextPos)
if err != nil {
logrus.Errorf("Tail file %s to object %s error, append value: %v, nextPos %d error [%v]",
absoluteLogPathName,
objectName,
buf.String(),
nextPos,
err)
return err
}

now := time.Now()
logrus.Infof("Tail file %s to object %s success, by line count, append value %s, lines %v, interval %v",
absoluteLogPathName, objectName, buf.String(), lines, now.Sub(lastPush).Seconds())

lastPush = now
lines = 0
buf = bytes.NewBufferString("")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it possible to put this part of code into a function? It looks similar to the code in the branch above if the lines is greater than zero.

}
}
}
}

// PushLogsLoops creates the root directory in the storage backend and then
// starts one PushLog goroutine for every path received on h.LogFiles.
// It returns when the root directory cannot be created or when stop fires.
func (h *RayLogHandler) PushLogsLoops(stop chan struct{}) {
	err := h.Writter.CreateDirectory(h.RootDir)
	if err != nil {
		logrus.Errorf("Failed to create oss root directory %s: %v", h.RootDir, err)
		return
	}

	for {
		select {
		case <-stop:
			logrus.Warnf("Receive stop signal, so return PushLogsLoop")
			return
		case pending := <-h.LogFiles:
			// Each file is pushed by its own goroutine.
			go h.PushLog(pending)
		}
	}
}

func (h *RayLogHandler) WatchLogsLoops(watcher *fsnotify.Watcher, walkPath string, stop chan struct{}) {
// 监听当前目录
if err := watcher.Add(walkPath); err != nil {
logrus.Fatalf("Watcher rootpath %s error %v", h.LogDir, err)
}

err := filepath.Walk(walkPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
logrus.Errorf("Walk path %s error %v", walkPath, err)
return err // 返回错误
}
// 检查是否是文件
if !info.IsDir() {
logrus.Infof("Walk find new file %s", path) // 输出文件路径
go h.AddLogFile(path)
} else {
logrus.Infof("Walk find new dir %s", path) // 输出dir路径
if err := watcher.Add(path); err != nil {
logrus.Fatalf("Watcher add %s error %v", h.LogDir, err)
}
}
return nil
})

if err != nil {
logrus.Errorf("Walk path %s error %++v", walkPath, err)
return
}
logrus.Infof("Walk path %s success", walkPath)

for {
select {
case <-stop:
logrus.Warnf("Receive stop signal, so return watchFileLoops")
return
case event, ok := <-watcher.Events:
if !ok {
logrus.Warnf("Receive watcher events not ok")
return
}
if event.Op == fsnotify.Create {
name := event.Name
info, _ := os.Stat(name)

// 判断是文件还是目录
if !info.IsDir() {
logrus.Infof("Watch find: create a new file %s", name)
h.AddLogFile(name)
} else {
if err := watcher.Add(name); err != nil {
logrus.Fatalf("Watch add file %s error %v", name, err)
}
}
}
case _, ok := <-watcher.Errors:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it skip the error? Is it meaningless?

if !ok {
close(stop)
logrus.Warnf("Watcher error, so return watchFileLoops")
return
}
}
}
}
20 changes: 20 additions & 0 deletions historyserver/backend/collector/runtime/runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package runtime

import (
"github.com/ray-project/kuberay/historyserver/backend/collector/runtime/logcollector"
"github.com/ray-project/kuberay/historyserver/backend/collector/storage"
"github.com/ray-project/kuberay/historyserver/backend/types"
)

// NewCollector builds a log collector backed by writter. It copies
// LogBatching and PushInterval from config and gives the handler a fresh
// unbuffered LogFiles channel; the handler's other fields keep their zero
// values.
func NewCollector(config *types.RayCollectorConfig, writter storage.StorageWritter) RayLogCollector {
	return &logcollector.RayLogHandler{
		Writter:      writter,
		LogBatching:  config.LogBatching,
		PushInterval: config.PushInterval,
		LogFiles:     make(chan string),
	}
}
59 changes: 59 additions & 0 deletions historyserver/backend/collector/storage/aliyunoss/config/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Package config defines the configuration types for the Aliyun OSS storage backend.
/*
Copyright 2024 by the bingyu [email protected] Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package config

import (
"k8s.io/apimachinery/pkg/util/sets"
)

// EnterConfig groups entry-point settings.
// NOTE(review): none of these fields are read in the code visible here, so the
// field notes below are inferred from the names — confirm against the callers.
type EnterConfig struct {
// Module is presumably the name of the module to run.
Module string
// ModuleExecBinary is presumably the binary executed for Module.
ModuleExecBinary string
// EnterOssExpireHour is presumably an OSS expiry expressed in hours.
EnterOssExpireHour int
// WatchLogDir is presumably the log directory to watch.
WatchLogDir string
// EnableMeta presumably toggles metadata handling.
EnableMeta bool
}

// GlobalConfig holds the OSS settings that RayMetaHanderConfig and
// RayHistoryServerConfig both embed.
// NOTE(review): field meanings (endpoint, region, bucket, history-server
// directory) are inferred from the names — confirm against the OSS client code.
type GlobalConfig struct {
OSSEndpoint string
OSSRegion string
OSSBucket string
OSSHistoryServerDir string
}

// RayMetaHanderConfig embeds GlobalConfig and adds the Ray cluster name, the
// Ray cluster ID and an OSS expiry value.
// NOTE(review): "Hander" looks like a typo for "Handler"; renaming would change
// the exported API, so it is left as is.
type RayMetaHanderConfig struct {
GlobalConfig
RayClusterName string
RayClusterID string
// OSSExpireHour is presumably expressed in hours — confirm against its use.
OSSExpireHour int
}

// RayHistoryServerConfig embeds GlobalConfig and adds the history server's own
// settings.
// NOTE(review): none of these fields are read in the code visible here; the
// notes below are inferred from names and types — confirm against the server.
type RayHistoryServerConfig struct {
GlobalConfig
// DashBoardDir is presumably the directory holding dashboard assets.
DashBoardDir string

ArmsRegionId string
ACKClusterId string
WebAppId string
WebAppSecret string
LocalServiceName string
// AllowedUID and AllowedAID are sets of strings; presumably allow-lists of
// user and account IDs.
AllowedUID sets.Set[string]
AllowedAID sets.Set[string]

// MaxOssListLimit is presumably an upper bound for OSS list requests.
MaxOssListLimit int
}
Loading
Loading