Skip to content

Commit 52456b4

Browse files
committed
initial commit
1 parent 8b82ea4 commit 52456b4

20 files changed

+2828
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.idea
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
//go:build integration
2+
3+
package rclone
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"io"
9+
"net"
10+
"os"
11+
"path/filepath"
12+
"strconv"
13+
"strings"
14+
"testing"
15+
"time"
16+
17+
filesystemtest "github.com/goforj/filesystem/testutil"
18+
"github.com/goftp/server"
19+
"github.com/rclone/rclone/fs/config/obscure"
20+
)
21+
22+
type rcloneRemotes struct {
23+
inline string
24+
minioRemote string
25+
gcsRemote string
26+
ftpRemote string
27+
sftpRemote string
28+
}
29+
30+
// hostPort extracts host:port from an endpoint URL (http://host:port).
31+
func hostPort(endpoint string) string {
32+
trim := strings.TrimPrefix(endpoint, "http://")
33+
trim = strings.TrimPrefix(trim, "https://")
34+
return trim
35+
}
36+
37+
// ensureRcloneConfig builds a composite inline config once and returns remote names.
38+
func ensureRcloneConfig(t *testing.T) rcloneRemotes {
39+
t.Helper()
40+
var res rcloneRemotes
41+
var sb strings.Builder
42+
ctx := context.Background()
43+
44+
// MinIO/S3
45+
if endpoint, region, access, secret, bucket := filesystemtest.S3Settings(); filesystemtest.Reachable(hostPort(endpoint)) {
46+
if err := filesystemtest.EnsureS3Bucket(ctx, endpoint, region, access, secret, bucket); err == nil {
47+
sb.WriteString(fmt.Sprintf(`
48+
[minio]
49+
type = s3
50+
provider = Minio
51+
access_key_id = %s
52+
secret_access_key = %s
53+
region = %s
54+
endpoint = %s
55+
force_path_style = true
56+
`, access, secret, region, endpoint))
57+
res.minioRemote = fmt.Sprintf("minio:%s", bucket)
58+
}
59+
}
60+
61+
// GCS (requires creds JSON)
62+
if creds := os.Getenv("INTEGRATION_GCS_CREDS_JSON"); creds != "" {
63+
if endpoint, bucket := filesystemtest.GCSSettings(); filesystemtest.Reachable(hostPort(endpoint)) {
64+
if err := filesystemtest.EnsureGCSBucket(ctx, endpoint, bucket); err == nil {
65+
sb.WriteString(fmt.Sprintf(`
66+
[fakegcs]
67+
type = googlecloudstorage
68+
service_account_credentials = %s
69+
endpoint = %s
70+
`, creds, endpoint))
71+
res.gcsRemote = fmt.Sprintf("fakegcs:%s", bucket)
72+
}
73+
}
74+
}
75+
76+
// FTP
77+
host := filesystemtest.GetenvDefault("INTEGRATION_FTP_HOST", "127.0.0.1")
78+
port := filesystemtest.GetenvDefault("INTEGRATION_FTP_PORT", "2121")
79+
user := filesystemtest.GetenvDefault("INTEGRATION_FTP_USER", "ftpuser")
80+
pass := filesystemtest.GetenvDefault("INTEGRATION_FTP_PASS", "ftppass")
81+
if addr := fmt.Sprintf("%s:%s", host, port); !filesystemtest.Reachable(addr) {
82+
host, port, user, pass = startEmbeddedFTP(t)
83+
}
84+
if addr := fmt.Sprintf("%s:%s", host, port); filesystemtest.Reachable(addr) {
85+
sb.WriteString(fmt.Sprintf(`
86+
[ftpbackend]
87+
type = ftp
88+
host = %s
89+
port = %s
90+
user = %s
91+
pass = %s
92+
`, host, port, user, obscure.MustObscure(pass)))
93+
res.ftpRemote = "ftpbackend:/"
94+
}
95+
96+
// SFTP
97+
shost := filesystemtest.GetenvDefault("INTEGRATION_SFTP_HOST", "127.0.0.1")
98+
sport := filesystemtest.GetenvDefault("INTEGRATION_SFTP_PORT", "2222")
99+
suser := filesystemtest.GetenvDefault("INTEGRATION_SFTP_USER", "fsuser")
100+
spass := obscure.MustObscure(filesystemtest.GetenvDefault("INTEGRATION_SFTP_PASS", "fspass"))
101+
if addr := fmt.Sprintf("%s:%s", shost, sport); filesystemtest.Reachable(addr) {
102+
sb.WriteString(fmt.Sprintf(`
103+
[sftpbackend]
104+
type = sftp
105+
host = %s
106+
port = %s
107+
user = %s
108+
pass = %s
109+
md5sum_command =
110+
sha1sum_command =
111+
`, shost, sport, suser, spass))
112+
res.sftpRemote = "sftpbackend:/config"
113+
}
114+
115+
res.inline = sb.String()
116+
if res.inline != "" {
117+
_ = setRcloneConfigData(res.inline)
118+
}
119+
return res
120+
}
121+
122+
func startEmbeddedFTP(t *testing.T) (host, port, user, pass string) {
123+
t.Helper()
124+
root := t.TempDir()
125+
host = "127.0.0.1"
126+
user, pass = "ftpuser", "ftppass"
127+
128+
l, err := net.Listen("tcp", net.JoinHostPort(host, "0"))
129+
if err != nil {
130+
t.Fatalf("ftp listen: %v", err)
131+
}
132+
port = fmt.Sprintf("%d", l.Addr().(*net.TCPAddr).Port)
133+
_ = l.Close()
134+
135+
opts := &server.ServerOpts{
136+
Factory: &memFactory{root: root},
137+
Hostname: host,
138+
Port: atoi(port),
139+
Auth: &server.SimpleAuth{Name: user, Password: pass},
140+
}
141+
s := server.NewServer(opts)
142+
go func() { _ = s.ListenAndServe() }()
143+
t.Cleanup(func() { _ = s.Shutdown() })
144+
time.Sleep(200 * time.Millisecond)
145+
return
146+
}
147+
148+
type memFactory struct {
149+
root string
150+
}
151+
152+
func (f *memFactory) NewDriver() (server.Driver, error) {
153+
return &memDriver{root: f.root, perm: server.NewSimplePerm("user", "group")}, nil
154+
}
155+
156+
type memDriver struct {
157+
root string
158+
perm server.Perm
159+
}
160+
161+
func (d *memDriver) Init(*server.Conn) {}
162+
163+
func (d *memDriver) Stat(p string) (server.FileInfo, error) {
164+
fi, err := os.Stat(d.abs(p))
165+
if err != nil {
166+
return nil, err
167+
}
168+
return fileInfo{FileInfo: fi}, nil
169+
}
170+
171+
func (d *memDriver) ChangeDir(p string) error {
172+
fi, err := os.Stat(d.abs(p))
173+
if err != nil {
174+
return err
175+
}
176+
if !fi.IsDir() {
177+
return os.ErrInvalid
178+
}
179+
return nil
180+
}
181+
182+
func (d *memDriver) ListDir(p string, cb func(server.FileInfo) error) error {
183+
dir := d.abs(p)
184+
entries, err := os.ReadDir(dir)
185+
if err != nil {
186+
return err
187+
}
188+
for _, entry := range entries {
189+
info, err := entry.Info()
190+
if err != nil {
191+
return err
192+
}
193+
if err := cb(fileInfo{FileInfo: info}); err != nil {
194+
return err
195+
}
196+
}
197+
return nil
198+
}
199+
200+
func (d *memDriver) DeleteDir(p string) error { return os.RemoveAll(d.abs(p)) }
201+
func (d *memDriver) DeleteFile(p string) error { return os.Remove(d.abs(p)) }
202+
func (d *memDriver) Rename(from, to string) error {
203+
return os.Rename(d.abs(from), d.abs(to))
204+
}
205+
func (d *memDriver) MakeDir(p string) error {
206+
return os.MkdirAll(d.abs(p), 0o755)
207+
}
208+
func (d *memDriver) GetFile(p string, _ int64) (int64, io.ReadCloser, error) {
209+
f, err := os.Open(d.abs(p))
210+
if err != nil {
211+
return 0, nil, err
212+
}
213+
info, _ := f.Stat()
214+
return info.Size(), f, nil
215+
}
216+
func (d *memDriver) PutFile(p string, r io.Reader, _ bool) (int64, error) {
217+
fp := d.abs(p)
218+
if err := os.MkdirAll(filepath.Dir(fp), 0o755); err != nil {
219+
return 0, err
220+
}
221+
f, err := os.Create(fp)
222+
if err != nil {
223+
return 0, err
224+
}
225+
defer f.Close()
226+
return io.Copy(f, r)
227+
}
228+
229+
func (d *memDriver) abs(p string) string {
230+
if p == "" || p == "." || p == "/" {
231+
return d.root
232+
}
233+
return filepath.Join(d.root, p)
234+
}
235+
236+
type fileInfo struct {
237+
os.FileInfo
238+
}
239+
240+
func (f fileInfo) Owner() string { return "user" }
241+
func (f fileInfo) Group() string { return "group" }
242+
243+
func atoi(s string) int {
244+
i, _ := strconv.Atoi(s)
245+
return i
246+
}

driver/rclone/localstack_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package rclone
2+
3+
import (
4+
"context"
5+
"errors"
6+
filesystemtest "github.com/goforj/filesystem/testutil"
7+
"net/http"
8+
"os"
9+
"testing"
10+
"time"
11+
12+
"github.com/aws/aws-sdk-go-v2/aws"
13+
"github.com/aws/aws-sdk-go-v2/credentials"
14+
"github.com/aws/aws-sdk-go-v2/service/s3"
15+
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
16+
17+
"github.com/goforj/filesystem"
18+
"github.com/goforj/filesystem-rclone/rcloneconfig"
19+
)
20+
21+
// Optional integration against localstack S3.
22+
// Set LOCALSTACK_S3_ENDPOINT (e.g., http://localhost:4566) to enable.
23+
func TestRcloneWithLocalstackS3(t *testing.T) {
24+
if os.Getenv("RUN_LOCALSTACK_S3") == "" {
25+
t.Skip("RUN_LOCALSTACK_S3 not set; skipping localstack integration test")
26+
}
27+
28+
endpoint := os.Getenv("LOCALSTACK_S3_ENDPOINT")
29+
if endpoint == "" {
30+
endpoint = "http://localhost:4566"
31+
}
32+
33+
if !isEndpointReachable(endpoint) {
34+
t.Skipf("localstack endpoint %s not reachable; skipping", endpoint)
35+
}
36+
37+
bucket := "filesystemtest"
38+
awsCfg := aws.Config{
39+
Region: "us-east-1",
40+
Credentials: credentials.NewStaticCredentialsProvider("access", "secret", ""),
41+
EndpointResolverWithOptions: aws.EndpointResolverWithOptionsFunc(
42+
func(service, region string, options ...interface{}) (aws.Endpoint, error) {
43+
return aws.Endpoint{
44+
URL: endpoint,
45+
HostnameImmutable: true,
46+
}, nil
47+
}),
48+
}
49+
awsS3 := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
50+
o.UsePathStyle = true
51+
})
52+
53+
ctx := context.Background()
54+
55+
if _, err := awsS3.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: &bucket}); err != nil {
56+
// If already exists, proceed.
57+
var apiErr *s3types.BucketAlreadyOwnedByYou
58+
if !errors.As(err, &apiErr) {
59+
t.Skipf("create bucket failed (likely no localstack or path-style mismatch): %v", err)
60+
}
61+
}
62+
// No cleanup to allow inspection in localstack; disable if you need isolated runs.
63+
64+
conf := rcloneconfig.MustRenderS3(rcloneconfig.S3Remote{
65+
Name: "s3stack",
66+
Endpoint: endpoint,
67+
Region: "us-east-1",
68+
Provider: "Minio",
69+
AccessKeyID: "access",
70+
SecretAccessKey: "secret",
71+
PathStyle: true,
72+
UseUnsignedPayload: true,
73+
})
74+
75+
cfg := filesystem.Config{
76+
Default: "s3",
77+
RcloneConfigData: conf,
78+
Disks: map[filesystem.DiskName]filesystem.DiskConfig{
79+
"s3": {
80+
Driver: "rclone",
81+
Remote: "s3stack:" + bucket,
82+
Prefix: "sandbox",
83+
},
84+
},
85+
}
86+
87+
mgr, err := filesystem.New(cfg)
88+
if err != nil {
89+
t.Skipf("New manager failed (likely due to existing rclone init or endpoint issues): %v", err)
90+
}
91+
fsys, err := mgr.Disk("s3")
92+
if err != nil {
93+
t.Fatalf("disk: %v", err)
94+
}
95+
96+
err = fsys.Put(ctx, "foo2.txt", []byte("hello localstack"))
97+
if err != nil {
98+
t.Fatalf("Put: %v", err)
99+
}
100+
101+
filesystemtest.RunFilesystemContractTests(t, fsys)
102+
}
103+
104+
func isEndpointReachable(url string) bool {
105+
client := &http.Client{Timeout: 2 * time.Second}
106+
resp, err := client.Get(url)
107+
if err != nil {
108+
return false
109+
}
110+
_ = resp.Body.Close()
111+
return true
112+
}

0 commit comments

Comments
 (0)