diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 0e73b29..0ce6492 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -19,7 +19,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: "1.20" + go-version: "1.24.0" - name: Build run: go build -v ./... diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e2c6e82..3228b50 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -21,7 +21,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: '1.20' + go-version: '1.24.0' - name: Define File Name diff --git a/config.yaml b/config.yaml index df00b35..003987e 100644 --- a/config.yaml +++ b/config.yaml @@ -1,5 +1,5 @@ port: 2668 # 服务 HTTP 协议端口 -mode: "std" # 默认为 std 标准库,另外可以设置 mmap 模式(本功能待完善) +mode: "std" # 默认 active 为 std 标准库,旧数据是 mmap 模式,这个 std 配置可以移除了 path: "/tmp/urnadb" # 数据库文件存储目录 auth: "Are we wide open to the world?" # 访问 HTTP 协议的秘密 logpath: "/tmp/urnadb/out.log" # urnadb 在运行时程序产生的日志存储文件 @@ -7,7 +7,7 @@ debug: false # 是否开启 debug 模式 region: # 数据区 enable: true # 是否开启数据压缩功能 cron: "0 0 3 * *" # 垃圾回收器执行周期改为 cron 的格式 - threshold: 2 # 默认个数据文件大小,单位 GB + threshold: 1 # 默认个数据文件大小,单位 GB encryptor: # 是否开启静态数据加密功能 enable: false secret: "your-static-data-secret!" diff --git a/go.mod b/go.mod index 1a65d70..ad4691c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/auula/urnadb -go 1.20 +go 1.24.0 + +toolchain go1.24.10 require ( github.com/fatih/color v1.13.0 @@ -60,6 +62,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect golang.org/x/arch v0.8.0 // indirect golang.org/x/crypto v0.23.0 // indirect + golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sys v0.30.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/go.sum b/go.sum index 713579d..92ae905 100644 --- a/go.sum +++ b/go.sum @@ -286,6 +286,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6 h1:zfMcR1Cs4KNuomFFgGefv5N0czO2XZpUbxGUy8i8ug0= +golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git a/vfs/lfs.go b/vfs/lfs.go index fedcc40..172a36e 100644 --- a/vfs/lfs.go +++ b/vfs/lfs.go @@ -36,6 +36,7 @@ import ( "github.com/auula/urnadb/utils" "github.com/robfig/cron/v3" "github.com/spaolacci/murmur3" + "golang.org/x/exp/mmap" ) const ( @@ -86,6 +87,11 @@ type indexMap struct { index map[uint64]*inode } +type Region struct { + Fd *os.File + *mmap.ReaderAt +} + // LogStructuredFS represents the virtual file storage system. type LogStructuredFS struct { mu sync.RWMutex @@ -95,10 +101,10 @@ type LogStructuredFS struct { fsPerm os.FileMode indexs []*indexMap active *os.File - regions map[int64]*os.File + regions map[int64]*Region gcstate _GC_STATE compactTask *cron.Cron - dirtyRegions []*os.File + dirtyRegions []*Region regionThreshold int64 checkpointWorker *time.Ticker expireLoopWorker *time.Ticker @@ -230,14 +236,30 @@ func (lfs *LogStructuredFS) FetchSegment(key string) (uint64, *Segment, error) { return 0, nil, fmt.Errorf("inode index for %d has expired", inum) } - fd, ok := lfs.regions[atomic.LoadInt64(&inode.RegionID)] + region, ok := lfs.regions[atomic.LoadInt64(&inode.RegionID)] if !ok { return 0, nil, fmt.Errorf("data region with ID %d not found", inode.RegionID) } - _, segment, err := readSegment(fd, atomic.LoadInt64(&inode.Position), _SEGMENT_PADDING) + // 如果是 Active Region 它的 ReaderAt 为 nil,直接读取不需要使用 mmap + if region.ReaderAt == nil { + lfs.mu.RLock() + isActive := (atomic.LoadInt64(&inode.RegionID) == lfs.regionID) + lfs.mu.RUnlock() + + // 从 Active Region 直接读取 + if isActive { + _, segment, err := readSegment(region.Fd, atomic.LoadInt64(&inode.Position), _SEGMENT_PADDING) + if err != nil { + return 0, nil, fmt.Errorf("failed to read segment from active region: %w", err) + } + return atomic.LoadUint64(&inode.mvcc), segment, nil + } + } + + _, segment, err := readSegmentMmap(region.ReaderAt, atomic.LoadInt64(&inode.Position), _SEGMENT_PADDING) if err != nil { - return 0, nil, fmt.Errorf("failed to read segment: %w", err) + return 0, nil, fmt.Errorf("failed to read segment from mmap: %w", err) } // Return the fetched segment and multi-version concurrency ID @@ -381,7 +403,19 @@ func (lfs *LogStructuredFS) changeRegions() error { return fmt.Errorf("failed to change active regions: %w", err) } - lfs.regions[lfs.regionID] = lfs.active + // 重新以只读的方式打开这个文件,并且开发 mmap 映射 + fileName, err := toStringFileName(lfs.regionID) + if err != nil { + return fmt.Errorf("failed to active region name to string: %w", err) + } + + // 写满了就映射为 mmap 的方式读取 + reader, err := mmap.Open(filepath.Join(lfs.directory, fileName)) + if err != nil { + return fmt.Errorf("failed to mmap data file: %w", err) + } + + lfs.regions[lfs.regionID].ReaderAt = reader err = lfs.createActiveRegion() if err != nil { @@ -393,7 +427,7 @@ func (lfs *LogStructuredFS) changeRegions() error { func (lfs *LogStructuredFS) createActiveRegion() error { lfs.regionID += 1 - fileName, err := generateFileName(lfs.regionID) + fileName, err := toStringFileName(lfs.regionID) if err != nil { return fmt.Errorf("failed to new active region name: %w", err) } @@ -414,7 +448,8 @@ func (lfs *LogStructuredFS) createActiveRegion() error { lfs.active = active lfs.offset = int64(len(dataFileMetadata)) - lfs.regions[lfs.regionID] = lfs.active + // Active region 不立即 mmap,只存储 Fd + lfs.regions[lfs.regionID] = &Region{Fd: lfs.active, ReaderAt: nil} return nil } @@ -434,11 +469,17 @@ func (lfs *LogStructuredFS) scanAndRecoverRegions() error { return fmt.Errorf("failed to open data file: %w", err) } + reader, err := mmap.Open(filepath.Join(lfs.directory, file.Name())) + if err != nil { + return fmt.Errorf("failed to mmap data file: %w", err) + } + regionID, err := parseDataFileName(file.Name()) if err != nil { return fmt.Errorf("failed to get region id: %w", err) } - lfs.regions[regionID] = fd + + lfs.regions[regionID] = &Region{Fd: fd, ReaderAt: reader} } } } @@ -462,7 +503,7 @@ func (lfs *LogStructuredFS) scanAndRecoverRegions() error { if !ok { return fmt.Errorf("region file not found for region id: %d", lfs.regionID) } - stat, err := active.Stat() + stat, err := active.Fd.Stat() if err != nil { return fmt.Errorf("failed to get region file info: %w", err) } @@ -470,12 +511,17 @@ func (lfs *LogStructuredFS) scanAndRecoverRegions() error { if stat.Size() >= lfs.regionThreshold { return lfs.createActiveRegion() } else { - offset, err := active.Seek(0, io.SeekEnd) + offset, err := active.Fd.Seek(0, io.SeekEnd) if err != nil { return fmt.Errorf("failed to get region file offset: %w", err) } - lfs.active = active + lfs.active = active.Fd lfs.offset = offset + // Active region 不使用 mmap,只有当它成为旧的 region 文件时才 mmap + if lfs.regions[lfs.regionID].ReaderAt != nil { + lfs.regions[lfs.regionID].ReaderAt.Close() + lfs.regions[lfs.regionID].ReaderAt = nil + } } } else { // If it is an empty directory, create a writable data file @@ -723,7 +769,7 @@ func OpenFS(opt *Options) (*LogStructuredFS, error) { instance := &LogStructuredFS{ indexs: make([]*indexMap, shard), - regions: make(map[int64]*os.File, 10), + regions: make(map[int64]*Region, 10), offset: int64(len(dataFileMetadata)), regionID: 0, directory: opt.Path, @@ -764,8 +810,14 @@ func OpenFS(opt *Options) (*LogStructuredFS, error) { func (lfs *LogStructuredFS) CloseFS() error { lfs.mu.Lock() defer lfs.mu.Unlock() - for _, file := range lfs.regions { - err := utils.FlushToDisk(file) + for _, reg := range lfs.regions { + if reg.ReaderAt != nil { + err := reg.ReaderAt.Close() + if err != nil { + return fmt.Errorf("failed to close mmap reader: %w", err) + } + } + err := utils.FlushToDisk(reg.Fd) if err != nil { // In-memory indexes must be persisted inner := lfs.ExportSnapshotIndex() @@ -927,7 +979,7 @@ func recoveryIndex(fd *os.File, indexs []*indexMap) error { // 4. If DEL is 1, the corresponding entry is deleted from the in-memory index. // 5. Otherwise, the disk metadata is reconstructed into the index. // | DEL 1 | KIND 1 | EAT 8 | CAT 8 | KLEN 4 | VLEN 4 | KEY ? | VALUE ? | CRC32 4 | -func crashRecoveryAllIndex(regions map[int64]*os.File, indexs []*indexMap) error { +func crashRecoveryAllIndex(regions map[int64]*Region, indexs []*indexMap) error { var regionIds []int64 for v := range regions { regionIds = append(regionIds, v) @@ -938,12 +990,12 @@ func crashRecoveryAllIndex(regions map[int64]*os.File, indexs []*indexMap) error }) for _, regionId := range regionIds { - fd, ok := regions[regionId] + reg, ok := regions[regionId] if !ok { return fmt.Errorf("data file does not exist regions id: %d", regionId) } - finfo, err := fd.Stat() + finfo, err := reg.Fd.Stat() if err != nil { return err } @@ -951,7 +1003,7 @@ func crashRecoveryAllIndex(regions map[int64]*os.File, indexs []*indexMap) error offset := int64(len(dataFileMetadata)) for offset < finfo.Size() { - inum, segment, err := readSegment(fd, offset, _SEGMENT_PADDING) + inum, segment, err := readSegment(reg.Fd, offset, _SEGMENT_PADDING) if err != nil { return fmt.Errorf("failed to parse data file segment: %w", err) } @@ -1054,8 +1106,7 @@ func checkFileSystem(path string, fsPerm fs.FileMode) error { return nil } -// | DEL 1 | KIND 1 | EAT 8 | CAT 8 | KLEN 4 | VLEN 4 | KEY ? | VALUE ? | CRC32 4 | -func readSegment(fd *os.File, offset int64, bufsize int64) (uint64, *Segment, error) { +func readSegment(fd *os.File, offset, bufsize int64) (uint64, *Segment, error) { buf := make([]byte, bufsize) _, err := fd.ReadAt(buf, offset) @@ -1066,6 +1117,79 @@ func readSegment(fd *os.File, offset int64, bufsize int64) (uint64, *Segment, er var seg Segment readOffset := 0 + seg.Tombstone = int8(buf[readOffset]) + readOffset++ + + seg.Type = kind(buf[readOffset]) + readOffset++ + + seg.ExpiredAt = int64(binary.LittleEndian.Uint64(buf[readOffset : readOffset+8])) + readOffset += 8 + + seg.CreatedAt = int64(binary.LittleEndian.Uint64(buf[readOffset : readOffset+8])) + readOffset += 8 + + seg.KeySize = int32(binary.LittleEndian.Uint32(buf[readOffset : readOffset+4])) + readOffset += 4 + + seg.ValueSize = int32(binary.LittleEndian.Uint32(buf[readOffset : readOffset+4])) + readOffset += 4 + + if seg.KeySize < 0 || seg.ValueSize < 0 { + return 0, nil, fmt.Errorf("invalid segment size: keySize=%d, valueSize=%d", seg.KeySize, seg.ValueSize) + } + + keybuf := make([]byte, seg.KeySize) + _, err = fd.ReadAt(keybuf, int64(offset)+int64(readOffset)) + if err != nil { + return 0, nil, fmt.Errorf("failed to parse key in segment: %w", err) + } + readOffset += int(seg.KeySize) + + valuebuf := make([]byte, seg.ValueSize) + _, err = fd.ReadAt(valuebuf, int64(offset)+int64(readOffset)) + if err != nil { + return 0, nil, fmt.Errorf("failed to parse value in segment: %w", err) + } + readOffset += int(seg.ValueSize) + + checksumBuf := make([]byte, 4) + _, err = fd.ReadAt(checksumBuf, int64(offset)+int64(readOffset)) + if err != nil { + return 0, nil, fmt.Errorf("failed to read checksum in segment: %w", err) + } + + checksum := binary.LittleEndian.Uint32(checksumBuf) + buf = append(buf, keybuf...) + buf = append(buf, valuebuf...) + + if checksum != crc32.ChecksumIEEE(buf) { + return 0, nil, fmt.Errorf("crc32 checksum mismatch: %d", checksum) + } + + decodedData, err := transformer.Decode(valuebuf) + if err != nil { + return 0, nil, fmt.Errorf("failed to transformer decode value in segment: %w", err) + } + + seg.Key = keybuf + seg.Value = decodedData + + return inodeNum(string(keybuf)), &seg, nil +} + +// | DEL 1 | KIND 1 | EAT 8 | CAT 8 | KLEN 4 | VLEN 4 | KEY ? | VALUE ? | CRC32 4 | +func readSegmentMmap(reader *mmap.ReaderAt, offset, bufsize int64) (uint64, *Segment, error) { + buf := make([]byte, bufsize) + + _, err := reader.ReadAt(buf, offset) + if err != nil { + return 0, nil, err + } + + var seg Segment + readOffset := 0 + // Parse Tombstone (1 byte) seg.Tombstone = int8(buf[readOffset]) readOffset++ @@ -1094,7 +1218,7 @@ func readSegment(fd *os.File, offset int64, bufsize int64) (uint64, *Segment, er // Read Key data keybuf := make([]byte, seg.KeySize) - _, err = fd.ReadAt(keybuf, int64(offset)+int64(readOffset)) + _, err = reader.ReadAt(keybuf, int64(offset)+int64(readOffset)) if err != nil { return 0, nil, fmt.Errorf("failed to parse key in segment: %w", err) } @@ -1102,7 +1226,7 @@ func readSegment(fd *os.File, offset int64, bufsize int64) (uint64, *Segment, er // Read Value data valuebuf := make([]byte, seg.ValueSize) - _, err = fd.ReadAt(valuebuf, int64(offset)+int64(readOffset)) + _, err = reader.ReadAt(valuebuf, int64(offset)+int64(readOffset)) if err != nil { return 0, nil, fmt.Errorf("failed to parse value in segment: %w", err) } @@ -1110,7 +1234,7 @@ func readSegment(fd *os.File, offset int64, bufsize int64) (uint64, *Segment, er // Read checksum (4 bytes) checksumBuf := make([]byte, 4) - _, err = fd.ReadAt(checksumBuf, int64(offset)+int64(readOffset)) + _, err = reader.ReadAt(checksumBuf, int64(offset)+int64(readOffset)) if err != nil { return 0, nil, fmt.Errorf("failed to read checksum in segment: %w", err) } @@ -1137,7 +1261,7 @@ func readSegment(fd *os.File, offset int64, bufsize int64) (uint64, *Segment, er return inodeNum(string(keybuf)), &seg, nil } -func generateFileName(regionID int64) (string, error) { +func toStringFileName(regionID int64) (string, error) { fileName := formatDataFileName(regionID) // Verify if regionID starts with 0 (valid only for 8 digits) if strings.HasPrefix(fileName, "0") { @@ -1330,6 +1454,15 @@ func (lfs *LogStructuredFS) cleanupDirtyRegions() error { // find 40% dirty regions for i := 0; i < 4 && i < len(regionIds); i++ { + lfs.mu.RLock() + exclude := regionIds[i] == lfs.regionID + lfs.mu.RUnlock() + + // 排除活跃的文件 + if exclude { + continue + } + dirtyIds = append(dirtyIds, regionIds[i]) lfs.dirtyRegions = append(lfs.dirtyRegions, lfs.regions[regionIds[i]]) } @@ -1340,8 +1473,8 @@ func (lfs *LogStructuredFS) cleanupDirtyRegions() error { lfs.dirtyRegions = nil }() - for _, fd := range lfs.dirtyRegions { - finfo, err := fd.Stat() + for _, reg := range lfs.dirtyRegions { + finfo, err := reg.Fd.Stat() if err != nil { return err } @@ -1349,7 +1482,7 @@ func (lfs *LogStructuredFS) cleanupDirtyRegions() error { readOffset := int64(len(dataFileMetadata)) for readOffset < finfo.Size() { - inum, segment, err := readSegment(fd, readOffset, _SEGMENT_PADDING) + inum, segment, err := readSegmentMmap(reg.ReaderAt, readOffset, _SEGMENT_PADDING) if err != nil { return err } @@ -1418,9 +1551,9 @@ func (lfs *LogStructuredFS) cleanupDirtyRegions() error { func(reg_id int64) { lfs.mu.Lock() defer lfs.mu.Unlock() - fd, ok := lfs.regions[reg_id] + reg, ok := lfs.regions[reg_id] if ok { - _ = os.Remove(filepath.Join(lfs.directory, fd.Name())) + _ = os.Remove(filepath.Join(lfs.directory, reg.Fd.Name())) delete(lfs.regions, reg_id) } }(reg_id) @@ -1485,7 +1618,7 @@ func cleanupDirtyCheckpoint(directory, newCheckpoint string) error { return nil } -func scanAndRecoverCheckpoint(files []string, regions map[int64]*os.File, indexs []*indexMap) error { +func scanAndRecoverCheckpoint(files []string, regions map[int64]*Region, indexs []*indexMap) error { var ( ckpt int path string @@ -1537,12 +1670,12 @@ func scanAndRecoverCheckpoint(files []string, regions map[int64]*os.File, indexs }) for _, regionId := range regionIds { - fd, ok := regions[regionId] + reg, ok := regions[regionId] if !ok { return fmt.Errorf("data file does not exist regions id: %d", regionId) } - finfo, err := fd.Stat() + finfo, err := reg.Fd.Stat() if err != nil { return err } @@ -1550,7 +1683,7 @@ func scanAndRecoverCheckpoint(files []string, regions map[int64]*os.File, indexs offset := int64(len(dataFileMetadata)) for offset < finfo.Size() { - inum, segment, err := readSegment(fd, offset, _SEGMENT_PADDING) + inum, segment, err := readSegment(reg.Fd, offset, _SEGMENT_PADDING) if err != nil { return fmt.Errorf("failed to parse data file segment: %w", err) }