Skip to content

Commit

Permalink
feat(log):add zap library and logging capabilities (#29)
Browse files Browse the repository at this point in the history
- fix write bug, mv don't need to delete temp file.
- add some unit tests.
- fix fs open bug, check and auto create dir.
- refactoring space, add mutex.
  • Loading branch information
loloxwg committed Jul 26, 2023
1 parent c8f0c4e commit 6df90c3
Show file tree
Hide file tree
Showing 15 changed files with 378 additions and 36 deletions.
6 changes: 4 additions & 2 deletions go/common/constant.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package common

const KReadBatchSize = 1024
const KManifestTempFileName = "manifest.tmp"
const KManifestFileName = "manifest"
const KManifestTempFileName = ".manifest.tmp"
const KManifestFileName = ".manifest"
const KManifestDir = "versions"
const KParquetDataFileSuffix = ".parquet"
const KParquetDataDir = "data"
const KOffsetFieldName = "__offset"
56 changes: 56 additions & 0 deletions go/common/log/field.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package log

import "go.uber.org/zap"

var (
// not lint
Skip = zap.Skip
Binary = zap.Binary
Bool = zap.Bool
Boolp = zap.Boolp
ByteString = zap.ByteString
Complex128 = zap.Complex128
Complex128p = zap.Complex128p
Complex64 = zap.Complex64
Complex64p = zap.Complex64p
Float64 = zap.Float64
Float64p = zap.Float64p
Float32 = zap.Float32
Float32p = zap.Float32p
Int = zap.Int
Intp = zap.Intp
Int64 = zap.Int64
Int64p = zap.Int64p
Int32 = zap.Int32
Int32p = zap.Int32p
Int16 = zap.Int16
Int16p = zap.Int16p
Int8 = zap.Int8
Int8p = zap.Int8p
String = zap.String
Stringp = zap.Stringp
Uint = zap.Uint
Uintp = zap.Uintp
Uint64 = zap.Uint64
Uint64p = zap.Uint64p
Uint32 = zap.Uint32
Uint32p = zap.Uint32p
Uint16 = zap.Uint16
Uint16p = zap.Uint16p
Uint8 = zap.Uint8
Uint8p = zap.Uint8p
Uintptr = zap.Uintptr
Uintptrp = zap.Uintptrp
Reflect = zap.Reflect
Namespace = zap.Namespace
Stringer = zap.Stringer
Time = zap.Time
Timep = zap.Timep
Stack = zap.Stack
StackSkip = zap.StackSkip
Duration = zap.Duration
Durationp = zap.Durationp
Object = zap.Object
Inline = zap.Inline
Any = zap.Any
)
92 changes: 92 additions & 0 deletions go/common/log/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package log

import (
"io"
"os"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type Level = zapcore.Level

const (
DebugLevel = zapcore.DebugLevel
InfoLevel = zapcore.InfoLevel
WarnLevel = zapcore.WarnLevel
ErrorLevel = zapcore.ErrorLevel
PanicLevel = zapcore.PanicLevel
FatalLevel = zapcore.FatalLevel
)

type Logger struct {
l *zap.Logger
al *zap.AtomicLevel
}

func New(out io.Writer, level Level) *Logger {
if out == nil {
out = os.Stderr
}

al := zap.NewAtomicLevelAt(level)
cfg := zap.NewDevelopmentEncoderConfig()

core := zapcore.NewCore(
zapcore.NewConsoleEncoder(cfg),
zapcore.AddSync(out),
al,
)
return &Logger{l: zap.New(core, zap.AddCaller(), zap.AddCallerSkip(2)), al: &al}
}

func (l *Logger) SetLevel(level Level) {
if l.al != nil {
l.al.SetLevel(level)
}
}

type Field = zap.Field

func (l *Logger) Debug(msg string, fields ...Field) {
l.l.Debug(msg, fields...)
}

func (l *Logger) Info(msg string, fields ...Field) {
l.l.Info(msg, fields...)
}

func (l *Logger) Warn(msg string, fields ...Field) {
l.l.Warn(msg, fields...)
}

func (l *Logger) Error(msg string, fields ...Field) {
l.l.Error(msg, fields...)
}

func (l *Logger) Panic(msg string, fields ...Field) {
l.l.Panic(msg, fields...)
}

func (l *Logger) Fatal(msg string, fields ...Field) {
l.l.Fatal(msg, fields...)
}

func (l *Logger) Sync() error {
return l.l.Sync()
}

var std = New(os.Stderr, DebugLevel)

func Default() *Logger { return std }
func ReplaceDefault(l *Logger) { std = l }
func SetLevel(level Level) { std.SetLevel(level) }

func Debug(msg string, fields ...Field) { std.Debug(msg, fields...) }
func Info(msg string, fields ...Field) { std.Info(msg, fields...) }
func Warn(msg string, fields ...Field) { std.Warn(msg, fields...) }
func Error(msg string, fields ...Field) { std.Error(msg, fields...) }
func Panic(msg string, fields ...Field) { std.Panic(msg, fields...) }
func Fatal(msg string, fields ...Field) { std.Fatal(msg, fields...) }

func Sync() error { return std.Sync() }
19 changes: 19 additions & 0 deletions go/common/log/log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package log

import (
"testing"
)

func TestLogger(t *testing.T) {
defer Sync()
Info("Testing")
Debug("Testing")
Warn("Testing")
Error("Testing")
defer func() {
if err := recover(); err != nil {
Debug("logPanic recover")
}
}()
Panic("Testing")
}
20 changes: 20 additions & 0 deletions go/common/log/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package log

import "go.uber.org/zap"

type Option = zap.Option

var (
WrapCore = zap.WrapCore
Hooks = zap.Hooks
Fields = zap.Fields
ErrorOutput = zap.ErrorOutput
Development = zap.Development
AddCaller = zap.AddCaller
WithCaller = zap.WithCaller
AddCallerSkip = zap.AddCallerSkip
AddStacktrace = zap.AddStacktrace
IncreaseLevel = zap.IncreaseLevel
WithFatalHook = zap.WithFatalHook
WithClock = zap.WithClock
)
14 changes: 9 additions & 5 deletions go/common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/milvus-io/milvus-storage-format/common/result"
"github.com/milvus-io/milvus-storage-format/common/status"
"github.com/milvus-io/milvus-storage-format/proto/schema_proto"
"path/filepath"
"strconv"
)

Expand Down Expand Up @@ -322,13 +323,16 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow.

func GetNewParquetFilePath(path string) string {
scalarFileId := uuid.New()
return path + scalarFileId.String() + common.KParquetDataFileSuffix
path = filepath.Join(path, common.KParquetDataDir, scalarFileId.String()+common.KParquetDataFileSuffix)
return path
}

func GetManifestFilePath(path string) string {
return path + common.KManifestFileName
func GetManifestFilePath(path string, version int64) string {
path = filepath.Join(path, common.KManifestDir, strconv.FormatInt(version, 10)+common.KManifestFileName)
return path
}

func GetManifestTmpFilePath(path string) string {
return path + common.KManifestTempFileName
func GetManifestTmpFilePath(path string, version int64) string {
path = filepath.Join(path, common.KManifestDir, strconv.FormatInt(version, 10)+common.KManifestTempFileName)
return path
}
3 changes: 3 additions & 0 deletions go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/bits-and-blooms/bitset v1.5.0
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.8.2
go.uber.org/zap v1.24.0
google.golang.org/protobuf v1.28.1
)

Expand All @@ -28,6 +29,8 @@ require (
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.6.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ github.com/apache/arrow/go/v12 v12.0.0-20230223012627-e0e740bd7a24 h1:3klg6Gtrm0
github.com/apache/arrow/go/v12 v12.0.0-20230223012627-e0e740bd7a24/go.mod h1:3JcT3bSZFdc7wLPKSlQXhf3L0GjPz0TOmLlG1YXnBfU=
github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/bits-and-blooms/bitset v1.5.0 h1:NpE8frKRLGHIcEzkR+gZhiioW1+WbYV6fKwD6ZIpQT8=
github.com/bits-and-blooms/bitset v1.5.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down Expand Up @@ -61,6 +62,7 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand All @@ -69,13 +71,21 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down
9 changes: 9 additions & 0 deletions go/io/fs/local_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@ package fs
import (
"github.com/milvus-io/milvus-storage-format/io/fs/file"
"os"
"path/filepath"
)

type LocalFS struct{}

func (l *LocalFS) OpenFile(path string) (file.File, error) {
// Extract the directory from the path
dir := filepath.Dir(path)
// Create the directory (including all necessary parent directories)
err := os.MkdirAll(dir, os.ModePerm)
if err != nil {
return nil, err
}
open, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return nil, err
}
return file.NewLocalFile(open), nil
}

// Rename renames (moves) a file. If newpath already exists and is not a directory, Rename replaces it.
func (l *LocalFS) Rename(src string, dst string) error {
return os.Rename(src, dst)
}
Expand Down
Loading

0 comments on commit 6df90c3

Please sign in to comment.