Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local: implement Put in an atomic, isolated, and durable way #220

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions local/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package local
import (
"errors"
"io"
"io/ioutil"
"net/url"
"os"
"path/filepath"
Expand All @@ -16,6 +17,8 @@ type container struct {
path string
}

const tmpPrefix = ".stow."

func (c *container) ID() string {
return c.path
}
Expand Down Expand Up @@ -58,22 +61,45 @@ func (c *container) Put(name string, r io.Reader, size int64, metadata map[strin
path: path,
contPrefixLen: len(c.path) + 1,
}
err := os.MkdirAll(filepath.Dir(path), 0777)
fileDir := filepath.Dir(path)
err := os.MkdirAll(fileDir, 0777)
if err != nil {
return nil, err
}
f, err := os.Create(path)
tmpFile, err := ioutil.TempFile(fileDir, tmpPrefix)
if err != nil {
return nil, err
}
defer f.Close()
n, err := io.Copy(f, r)
tmpName := tmpFile.Name()
defer func() {
if tmpFile != nil {
tmpFile.Close()
}
if tmpName != "" {
os.Remove(tmpName)
}
}()
n, err := io.Copy(tmpFile, r)
if err != nil {
return nil, err
}
if n != size {
return nil, errors.New("bad size")
}
err = tmpFile.Sync()
if err != nil {
return nil, err
}
err = tmpFile.Close()
tmpFile = nil
if err != nil {
return nil, err
}
err = os.Rename(tmpName, path)
if err != nil {
return nil, err
}
tmpName = ""
return item, nil
}

Expand Down Expand Up @@ -108,6 +134,10 @@ func (c *container) Items(prefix, cursor string, count int) ([]stow.Item, string
if f.IsDir() {
continue
}
base := filepath.Base(f.Name())
if strings.HasPrefix(base, tmpPrefix) {
continue
}
path, err := filepath.Abs(filepath.Join(c.path, f.Name()))
if err != nil {
return nil, "", err
Expand Down Expand Up @@ -152,6 +182,11 @@ func (c *container) Item(id string) (stow.Item, error) {
func flatdirs(path string) ([]os.FileInfo, error) {
var list []os.FileInfo
err := filepath.Walk(path, func(p string, info os.FileInfo, err error) error {
// On Unix-like OS, file might have disappeared between calling readdir
// and calling lstat - this is not a global error
if os.IsNotExist(err) && p != path {
return nil
}
if err != nil {
return err
}
Expand Down
80 changes: 80 additions & 0 deletions local/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package local_test

import (
"fmt"
"io"
"strings"
"testing"

Expand Down Expand Up @@ -60,3 +61,82 @@ func TestItemsPaging(t *testing.T) {
is.Equal(err, stow.ErrBadCursor)

}

type chunkOfZero int

func (c *chunkOfZero) Read(b []byte) (n int, err error) {
if *c <= 0 {
return 0, io.EOF
}
toFill := len(b)
if toFill > int(*c) {
toFill = int(*c)
}
leftBytes := int(*c) - toFill
*c = chunkOfZero(leftBytes)
for i := 0; i < toFill; i++ {
b[i] = 0
}
return toFill, nil
}

func TestAtomicWrites(t *testing.T) {
is := is.New(t)
testDir, teardown, err := setup()
is.NoErr(err)
defer teardown()
cfg := stow.ConfigMap{"path": testDir}
l, err := stow.Dial(local.Kind, cfg)
is.NoErr(err)
is.OK(l)

container, err := l.Container("one")
is.NoErr(err)
is.OK(container)

initialVer := chunkOfZero(1000000)
initialItem, err := container.Put("test-file", &initialVer, 1000000, nil)
is.NoErr(err)
sz, err := initialItem.Size()
is.NoErr(err)
is.True(sz == 1000000)

end := make(chan error)
go func() {
for i := 0; i < 10; i++ {
c := chunkOfZero(1000000)
_, err := container.Put("test-file", &c, 1000000, nil)
if err != nil {
end <- err
}
}

end <- nil
}()

loop:
for {
items, _, err := container.Items("", "", 10)
is.NoErr(err)
is.True(len(items) < 10)
found := false
for _, i := range items {
if i.Name() != "test-file" {
continue
}

found = true
sz, err := i.Size()
is.NoErr(err)
is.True(sz == 1000000)
}
is.True(found)

select {
case err := <-end:
is.NoErr(err)
break loop
default:
}
}
}