Skip to content

Commit

Permalink
Merge pull request #10 from filedrive-team/0.3.0
Browse files Browse the repository at this point in the history
0.3.0
  • Loading branch information
qianhh authored May 28, 2021
2 parents 9aca3de + e78e5df commit b3143f6
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 37 deletions.
69 changes: 64 additions & 5 deletions chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,70 @@ package graphsplit
import (
"context"
"fmt"
"os"
"path"

ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
)

var log = logging.Logger("graphsplit")

func Chunk(ctx context.Context, sliceSize int64, parentPath, targetPath, carDir, graphName string, parallel int) error {
// GraphBuildCallback receives the outcome of building one graph slice.
// BuildIpldGraph invokes OnError when the build returns an error and
// OnSuccess otherwise.
type GraphBuildCallback interface {
	// OnSuccess is called with the node returned by the build and the name
	// of the graph slice that was built.
	OnSuccess(node ipld.Node, graphName string)
	// OnError is called with the error returned by the build.
	OnError(error)
}

// csvCallback is a GraphBuildCallback that records every successfully built
// slice in a manifest.csv file located in carDir.
type csvCallback struct {
	carDir string // directory that holds manifest.csv
}

// OnSuccess appends one "<cid>,<graphName>" row for node to manifest.csv in
// cc.carDir, creating the file when it does not exist yet.
//
// The CSV header is written whenever the opened file is empty. Checking the
// size of the already-open file (instead of calling os.Stat before opening)
// avoids a check-then-open race and also covers a manifest that exists but
// has no content yet.
//
// Any I/O failure is reported through log.Fatal, matching the other
// callbacks in this file.
func (cc *csvCallback) OnSuccess(node ipld.Node, graphName string) {
	// Add node info to manifest.csv
	manifestPath := path.Join(cc.carDir, "manifest.csv")
	f, err := os.OpenFile(manifestPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		log.Fatal(err)
	}
	if info.Size() == 0 {
		// Header text is kept byte-for-byte: existing manifest consumers
		// may rely on the column names as written.
		if _, err := f.WriteString("playload_cid,filename\n"); err != nil {
			log.Fatal(err)
		}
	}
	if _, err := fmt.Fprintf(f, "%s,%s\n", node.Cid(), graphName); err != nil {
		log.Fatal(err)
	}
}

// OnError reports a failed graph build by passing err to log.Fatal.
// NOTE(review): with the go-log/zap logger Fatal presumably terminates the
// process, so nothing after a failed slice would run — confirm.
func (cc *csvCallback) OnError(err error) {
	log.Fatal(err)
}

// errCallback is a GraphBuildCallback that ignores successful builds and
// only reacts to errors.
type errCallback struct{}

// OnSuccess does nothing; no manifest is written for this callback.
func (cc *errCallback) OnSuccess(ipld.Node, string) {}

// OnError reports a failed graph build by passing err to log.Fatal.
func (cc *errCallback) OnError(err error) {
	log.Fatal(err)
}

// CSVCallback returns a GraphBuildCallback that appends each successfully
// built slice to manifest.csv inside carDir.
func CSVCallback(carDir string) GraphBuildCallback {
	cb := new(csvCallback)
	cb.carDir = carDir
	return cb
}
// ErrCallback returns a GraphBuildCallback that takes no action on success
// and forwards build errors to log.Fatal.
func ErrCallback() GraphBuildCallback {
	var cb errCallback
	return &cb
}

func Chunk(ctx context.Context, sliceSize int64, parentPath, targetPath, carDir, graphName string, parallel int, cb GraphBuildCallback) error {
var cumuSize int64 = 0
graphSliceCount := 0
graphFiles := make([]Finfo, 0)
Expand All @@ -20,6 +76,9 @@ func Chunk(ctx context.Context, sliceSize int64, parentPath, targetPath, carDir,
if parallel <= 0 {
return xerrors.Errorf("Unexpected! Parallel has to be greater than 0")
}
if parentPath == "" {
parentPath = targetPath
}

args := []string{targetPath}
sliceTotal := GetGraphCount(args, sliceSize)
Expand All @@ -38,7 +97,7 @@ func Chunk(ctx context.Context, sliceSize int64, parentPath, targetPath, carDir,
cumuSize += fileSize
graphFiles = append(graphFiles, item)
// todo build ipld from graphFiles
BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel)
BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb)
fmt.Printf("cumu-size: %d\n", cumuSize)
fmt.Printf(GenGraphName(graphName, graphSliceCount, sliceTotal))
fmt.Printf("=================\n")
Expand All @@ -64,7 +123,7 @@ func Chunk(ctx context.Context, sliceSize int64, parentPath, targetPath, carDir,
})
fileSliceCount++
// todo build ipld from graphFiles
BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel)
BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb)
fmt.Printf("cumu-size: %d\n", cumuSize+firstCut)
fmt.Printf(GenGraphName(graphName, graphSliceCount, sliceTotal))
fmt.Printf("=================\n")
Expand All @@ -90,7 +149,7 @@ func Chunk(ctx context.Context, sliceSize int64, parentPath, targetPath, carDir,
fileSliceCount++
if seekEnd-seekStart == sliceSize-1 {
// todo build ipld from graphFiles
BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel)
BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb)
fmt.Printf("cumu-size: %d\n", sliceSize)
fmt.Printf(GenGraphName(graphName, graphSliceCount, sliceTotal))
fmt.Printf("=================\n")
Expand All @@ -104,7 +163,7 @@ func Chunk(ctx context.Context, sliceSize int64, parentPath, targetPath, carDir,
}
if cumuSize > 0 {
// todo build ipld from graphFiles
BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel)
BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb)
fmt.Printf("cumu-size: %d\n", cumuSize)
fmt.Printf(GenGraphName(graphName, graphSliceCount, sliceTotal))
fmt.Printf("=================\n")
Expand Down
21 changes: 16 additions & 5 deletions cmd/graphsplit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,20 @@ var chunkCmd = &cli.Command{
Required: true,
Usage: "specify graph name",
},
&cli.StringFlag{
Name: "car-dir",
Required: true,
Usage: "specify output CAR directory",
},
&cli.StringFlag{
Name: "parent-path",
Value: "",
Usage: "specify graph parent path",
},
&cli.StringFlag{
Name: "car-dir",
Required: true,
Usage: "specify output CAR directory",
&cli.BoolFlag{
Name: "save-manifest",
Value: true,
Usage: "create a mainfest.csv in car-dir to save mapping of data-cids and slice names",
},
},
Action: func(c *cli.Context) error {
Expand All @@ -74,7 +79,13 @@ var chunkCmd = &cli.Command{
}

targetPath := c.Args().First()
return graphsplit.Chunk(ctx, int64(sliceSize), parentPath, targetPath, carDir, graphName, int(parallel))
var cb graphsplit.GraphBuildCallback
if c.Bool("save-manifest") {
cb = graphsplit.CSVCallback(carDir)
} else {
cb = graphsplit.ErrCallback()
}
return graphsplit.Chunk(ctx, int64(sliceSize), parentPath, targetPath, carDir, graphName, int(parallel), cb)
},
}

Expand Down
5 changes: 5 additions & 0 deletions restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
pa "path"
"path/filepath"
"strings"
"sync"
Expand Down Expand Up @@ -110,6 +111,10 @@ func CarTo(carPath, outputDir string, parallel int) {
if fi.IsDir() {
return nil
}
if strings.ToLower(pa.Ext(fi.Name())) != ".car" {
log.Warn(path, ", it's not a CAR file, skip it")
return nil
}
workerCh <- func() {
log.Info(path)
root, err := Import(path, bs2)
Expand Down
32 changes: 5 additions & 27 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,36 +45,14 @@ type Finfo struct {
SeekEnd int64
}

func BuildIpldGraph(ctx context.Context, fileList []Finfo, graphName, parentPath, carDir string, parallel int) {
func BuildIpldGraph(ctx context.Context, fileList []Finfo, graphName, parentPath, carDir string, parallel int, cb GraphBuildCallback) {
node, err := buildIpldGraph(ctx, fileList, parentPath, carDir, parallel)
if err != nil {
log.Fatal(err)
}
// TODO: save node info
// Add node inof to manifest.csv
manifestPath := path.Join(carDir, "manifest.csv")
_, err = os.Stat(manifestPath)
if err != nil && !os.IsNotExist(err) {
log.Fatal(err)
}
var isCreateAction bool
if err != nil && os.IsNotExist(err) {
isCreateAction = true
}
f, err := os.OpenFile(manifestPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
defer f.Close()
if isCreateAction {
if _, err := f.Write([]byte("playload_cid,filename\n")); err != nil {
log.Fatal(err)
}
}
if _, err := f.Write([]byte(fmt.Sprintf("%s,%s\n", node.Cid(), graphName))); err != nil {
log.Fatal(err)
//log.Fatal(err)
cb.OnError(err)
return
}

cb.OnSuccess(node, graphName)
}

func buildIpldGraph(ctx context.Context, fileList []Finfo, parentPath, carDir string, parallel int) (ipld.Node, error) {
Expand Down

0 comments on commit b3143f6

Please sign in to comment.