Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 13 additions & 26 deletions vfs/lfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (
_GC_ACTIVE
_GC_INACTIVE
_SEGMENT_PADDING = 26
_INDEX_SEGMENT_SIZE = 49
_INDEX_SEGMENT_SIZE = 48
)

var (
Expand All @@ -79,7 +79,6 @@ type inode struct {
CreatedAt int64 // Creation time of the inode (UNIX timestamp in nano seconds)
mvcc uint64 // Multi-version concurrency ID
Length int32 // Data record length
Type kind // Data record type
}

type indexMap struct {
Expand Down Expand Up @@ -140,7 +139,6 @@ func (lfs *LogStructuredFS) PutSegment(key string, seg *Segment) error {
CreatedAt: seg.CreatedAt,
ExpiredAt: seg.ExpiredAt,
mvcc: 0,
Type: seg.Type,
}
imap.mu.Unlock()

Expand Down Expand Up @@ -548,13 +546,13 @@ func (lfs *LogStructuredFS) scanAndRecoverIndexs() error {
filePath := filepath.Join(lfs.directory, indexFileName)
if utils.IsExist(filePath) {
// If the index file exists, restore it
file, err := os.Open(filePath)
reader, err := mmap.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open index file: %w", err)
return fmt.Errorf("failed to mmap index file: %w", err)
}
defer file.Close()
defer reader.Close()

err = recoveryIndex(file, lfs.indexs)
err = recoveryIndex(reader, lfs.indexs)
if err != nil {
return fmt.Errorf("failed to recover index mapping: %w", err)
}
Expand Down Expand Up @@ -892,20 +890,15 @@ func (lfs *LogStructuredFS) ExportSnapshotIndex() error {
return nil
}

func recoveryIndex(fd *os.File, indexs []*indexMap) error {
func recoveryIndex(reader *mmap.ReaderAt, indexs []*indexMap) error {
offset := int64(len(dataFileMetadata))

finfo, err := fd.Stat()
if err != nil {
return err
}

type index struct {
inum uint64
inode *inode
}

nqueue := make(chan index, (finfo.Size()-offset)/_INDEX_SEGMENT_SIZE)
nqueue := make(chan index, (int64(reader.Len())-offset)/_INDEX_SEGMENT_SIZE)
equeue := make(chan error, 1)

var wg sync.WaitGroup
Expand All @@ -916,8 +909,8 @@ func recoveryIndex(fd *os.File, indexs []*indexMap) error {
defer close(nqueue)

buf := make([]byte, _INDEX_SEGMENT_SIZE)
for offset < finfo.Size() && len(equeue) == 0 {
_, err := fd.ReadAt(buf, offset)
for offset < int64(reader.Len()) && len(equeue) == 0 {
_, err := reader.ReadAt(buf, offset)
if err != nil {
equeue <- fmt.Errorf("failed to read index node: %w", err)
return
Expand Down Expand Up @@ -1309,7 +1302,6 @@ func serializedIndex(buf *bytes.Buffer, inum uint64, inode *inode) ([]byte, erro
binary.Write(buf, binary.LittleEndian, inode.Length)
binary.Write(buf, binary.LittleEndian, inode.ExpiredAt)
binary.Write(buf, binary.LittleEndian, inode.CreatedAt)
binary.Write(buf, binary.LittleEndian, inode.Type)

// Calculate CRC32 checksum
checksum := crc32.ChecksumIEEE(buf.Bytes())
Expand Down Expand Up @@ -1358,11 +1350,6 @@ func deserializedIndex(data []byte) (uint64, *inode, error) {
return 0, nil, err
}

err = binary.Read(buf, binary.LittleEndian, &inode.Type)
if err != nil {
return 0, nil, err
}

// Deserialize and verify CRC32 checksum
var checksum uint32
err = binary.Read(buf, binary.LittleEndian, &checksum)
Expand Down Expand Up @@ -1641,13 +1628,13 @@ func scanAndRecoverCheckpoint(files []string, regions map[int64]*Region, indexs
}
}

file, err := os.Open(path)
reader, err := mmap.Open(path)
if err != nil {
return fmt.Errorf("failed to open checkpoint file: %w", err)
return fmt.Errorf("failed to mmap checkpoint file: %w", err)
}
defer file.Close()
defer reader.Close()

err = recoveryIndex(file, indexs)
err = recoveryIndex(reader, indexs)
if err != nil {
return fmt.Errorf("failed to recover data from checkpoint: %w", err)
}
Expand Down
6 changes: 1 addition & 5 deletions vfs/lfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func TestSerializedIndex(t *testing.T) {
Length: 100,
ExpiredAt: 1617181723,
CreatedAt: 1617181623,
Type: table,
}

buf := new(bytes.Buffer)
Expand All @@ -51,7 +50,7 @@ func TestSerializedIndex(t *testing.T) {
}

// 检查返回的字节切片长度
assert.Equal(t, len(result), 49)
assert.Equal(t, len(result), 48)

// 验证内容字段进行反序列化并检查
inum, dnode, err := deserializedIndex(result)
Expand All @@ -78,9 +77,6 @@ func TestSerializedIndex(t *testing.T) {
if dnode.CreatedAt != in.CreatedAt {
t.Errorf("expected CreatedAt %d, got %d", in.CreatedAt, dnode.CreatedAt)
}
if dnode.Type != in.Type {
t.Errorf("expected Type %d, got %d", in.Type, dnode.Type)
}

}

Expand Down