Skip to content

Commit 75d0163

Browse files
Merge pull request #63 from dirname/main
feat: add aliyun oss store
2 parents 8e0e8f1 + ae6c6e5 commit 75d0163

File tree

5 files changed

+204
-4
lines changed

5 files changed

+204
-4
lines changed

arseeding.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,18 +48,26 @@ func New(
4848
boltDirPath, mySqlDsn string, sqliteDir string, useSqlite bool,
4949
arWalletKeyPath string, arNode, payUrl string, noFee bool, enableManifest bool,
5050
useS3 bool, s3AccKey, s3SecretKey, s3BucketPrefix, s3Region, s3Endpoint string,
51-
use4EVER bool, port string, customTags []types.Tag,
51+
use4EVER bool, useAliyun bool, aliyunEndpoint, aliyunAccKey, aliyunSecretKey, aliyunPrefix string,
52+
port string, customTags []types.Tag,
5253
) *Arseeding {
5354
var err error
5455
KVDb := &Store{}
55-
if useS3 {
56+
57+
switch {
58+
case useS3 && useAliyun:
59+
panic("can not use both s3 and aliyun")
60+
case useS3:
5661
if use4EVER {
5762
s3Endpoint = rawdb.ForeverLandEndpoint // inject 4everland endpoint
5863
}
5964
KVDb, err = NewS3Store(s3AccKey, s3SecretKey, s3Region, s3BucketPrefix, s3Endpoint)
60-
} else {
65+
case useAliyun:
66+
KVDb, err = NewAliyunStore(aliyunEndpoint, aliyunAccKey, aliyunSecretKey, aliyunPrefix)
67+
default:
6168
KVDb, err = NewBoltStore(boltDirPath)
6269
}
70+
6371
if err != nil {
6472
panic(err)
6573
}

cmd/main.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ func main() {
3737
&cli.StringFlag{Name: "s3_endpoint", Value: "", Usage: "s3 bucket endpoint", EnvVars: []string{"S3_ENDPOINT"}},
3838
&cli.BoolFlag{Name: "use_4ever", Value: false, Usage: "run with 4everland s3 service", EnvVars: []string{"USE_4EVER"}},
3939

40+
&cli.BoolFlag{Name: "use_aliyun", Value: false, Usage: "run with aliyun oss store", EnvVars: []string{"USE_ALIYUN"}},
41+
&cli.StringFlag{Name: "aliyun_endpoint", Value: "oss-cn-shanghai.aliyuncs.com", Usage: "aliyun oss endpoint", EnvVars: []string{"ALIYUN_ENDPOINT"}},
42+
&cli.StringFlag{Name: "aliyun_acc_key", Value: "your oss access key", Usage: "aliyun oss access key", EnvVars: []string{"ALIYUN_ACC_KEY"}},
43+
&cli.StringFlag{Name: "aliyun_secret_key", Value: "your oss secret key", Usage: "aliyun oss secret key", EnvVars: []string{"ALIYUN_SECRET_KEY"}},
44+
&cli.StringFlag{Name: "aliyun_prefix", Value: "arseed", Usage: "aliyun oss bucket name prefix", EnvVars: []string{"ALIYUN_PREFIX"}},
45+
4046
&cli.StringFlag{Name: "port", Value: ":8080", EnvVars: []string{"PORT"}},
4147
&cli.StringFlag{Name: "tags", Value: `{"Community":"PermaDAO","Website":"permadao.com"}`, EnvVars: []string{"TAGS"}},
4248
},
@@ -70,7 +76,8 @@ func run(c *cli.Context) error {
7076
c.String("db_dir"), c.String("mysql"), c.String("sqlite_dir"), c.Bool("use_sqlite"),
7177
c.String("key_path"), c.String("ar_node"), c.String("pay"), c.Bool("no_fee"), c.Bool("manifest"),
7278
c.Bool("use_s3"), c.String("s3_acc_key"), c.String("s3_secret_key"), c.String("s3_prefix"), c.String("s3_region"), c.String("s3_endpoint"),
73-
c.Bool("use_4ever"), c.String("port"), customTags)
79+
c.Bool("use_4ever"), c.Bool("use_aliyun"), c.String("aliyun_endpoint"), c.String("aliyun_acc_key"), c.String("aliyun_secret_key"),
80+
c.String("aliyun_prefix"), c.String("port"), customTags)
7481
s.Run(c.String("port"), c.Int("bundle_interval"))
7582

7683
common.NewMetricServer()

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/everFinance/arseeding
33
go 1.17
44

55
require (
6+
github.com/aliyun/aliyun-oss-go-sdk v2.2.6+incompatible
67
github.com/aws/aws-sdk-go v1.27.0
78
github.com/everFinance/everpay-go v0.0.2
89
github.com/everFinance/goar v1.4.7
@@ -75,6 +76,7 @@ require (
7576
golang.org/x/net v0.2.0 // indirect
7677
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
7778
golang.org/x/sys v0.2.0 // indirect
79+
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
7880
google.golang.org/protobuf v1.26.0 // indirect
7981
gopkg.in/yaml.v2 v2.4.0 // indirect
8082
gopkg.in/yaml.v3 v3.0.1 // indirect

rawdb/aliyun.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package rawdb
2+
3+
import (
4+
"bytes"
5+
"github.com/aliyun/aliyun-oss-go-sdk/oss"
6+
"github.com/everFinance/arseeding/schema"
7+
"io"
8+
)
9+
10+
// refer https://help.aliyun.com/document_detail/32157.html?spm=a2c4g.11186623.0.0.1a4b32bcxaC4kR
11+
const (
12+
ossErrorNoSuchKey = "NoSuchKey"
13+
)
14+
15+
type AliyunDB struct {
16+
bucketPrefix string
17+
client *oss.Client
18+
}
19+
20+
func NewAliyunDB(endpoint, accKey, accessKeySecret, bktPrefix string) (*AliyunDB, error) {
21+
client, err := oss.New(endpoint, accKey, accessKeySecret)
22+
if err != nil {
23+
return nil, err
24+
}
25+
26+
err = createAliyunBucket(client, bktPrefix)
27+
if err != nil {
28+
return nil, err
29+
}
30+
31+
log.Info("run with aliyun oss success")
32+
33+
return &AliyunDB{
34+
bucketPrefix: bktPrefix,
35+
client: client,
36+
}, nil
37+
}
38+
39+
func (a *AliyunDB) Put(bucket, key string, value []byte) (err error) {
40+
bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket))
41+
if err != nil {
42+
return err
43+
}
44+
45+
return bkt.PutObject(key, bytes.NewReader(value))
46+
}
47+
48+
func (a *AliyunDB) Get(bucket, key string) (data []byte, err error) {
49+
bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket))
50+
if err != nil {
51+
return
52+
}
53+
54+
body, err := bkt.GetObject(key)
55+
if err != nil {
56+
// handleOSSErr make file non-existent errors converted to schema.ErrNotFound
57+
return nil, handleOSSErr(err)
58+
}
59+
60+
defer func(body io.ReadCloser) {
61+
_ = body.Close()
62+
}(body)
63+
64+
data, err = io.ReadAll(body)
65+
return
66+
}
67+
68+
func (a *AliyunDB) GetAllKey(bucket string) (keys []string, err error) {
69+
bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket))
70+
if err != nil {
71+
return
72+
}
73+
74+
keys = make([]string, 0)
75+
76+
startAfter := ""
77+
continueToken := ""
78+
var lsRes oss.ListObjectsResultV2
79+
80+
for {
81+
lsRes, err = bkt.ListObjectsV2(oss.StartAfter(startAfter), oss.ContinuationToken(continueToken))
82+
if err != nil {
83+
break
84+
}
85+
for _, object := range lsRes.Objects {
86+
keys = append(keys, object.Key)
87+
}
88+
if lsRes.IsTruncated {
89+
startAfter = lsRes.StartAfter
90+
continueToken = lsRes.NextContinuationToken
91+
} else {
92+
break
93+
}
94+
}
95+
96+
if len(keys) == 0 {
97+
err = schema.ErrNotExist
98+
}
99+
100+
return
101+
}
102+
103+
func (a *AliyunDB) Delete(bucket, key string) (err error) {
104+
bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket))
105+
if err != nil {
106+
return
107+
}
108+
109+
return bkt.DeleteObject(key)
110+
}
111+
112+
func (a *AliyunDB) Close() (err error) {
113+
return
114+
}
115+
116+
func createAliyunBucket(svc *oss.Client, prefix string) error {
117+
bucketNames := []string{
118+
schema.ChunkBucket,
119+
schema.TxDataEndOffSetBucket,
120+
schema.TxMetaBucket,
121+
schema.ConstantsBucket,
122+
schema.TaskIdPendingPoolBucket,
123+
schema.TaskBucket,
124+
schema.BundleItemBinary,
125+
schema.BundleItemMeta,
126+
schema.BundleWaitParseArIdBucket,
127+
schema.BundleArIdToItemIdsBucket,
128+
}
129+
130+
ownBuckets, err := getBucketWithPrefix(svc, prefix)
131+
if err != nil {
132+
return err
133+
}
134+
135+
for _, bucketName := range bucketNames {
136+
s3Bkt := getS3Bucket(prefix, bucketName) // s3 bucket name only accept lower case
137+
if !ownBuckets[s3Bkt] {
138+
err := svc.CreateBucket(s3Bkt)
139+
if err != nil {
140+
return err
141+
}
142+
}
143+
}
144+
return nil
145+
}
146+
147+
func getBucketWithPrefix(svc *oss.Client, prefix string) (map[string]bool, error) {
148+
res := make(map[string]bool)
149+
150+
lsRes, err := svc.ListBuckets(oss.Prefix(prefix))
151+
if err != nil {
152+
return nil, err
153+
}
154+
155+
for _, bucket := range lsRes.Buckets {
156+
res[bucket.Name] = true
157+
}
158+
159+
return res, nil
160+
}
161+
162+
func handleOSSErr(ossErr error) (err error) {
163+
switch ossErr.(type) {
164+
case oss.ServiceError:
165+
if ossErr.(oss.ServiceError).Code == ossErrorNoSuchKey {
166+
err = schema.ErrNotExist
167+
}
168+
default:
169+
err = ossErr
170+
}
171+
172+
return
173+
}

store.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ func NewBoltStore(boltDirPath string) (*Store, error) {
3333
return &Store{KVDb: Db}, nil
3434
}
3535

36+
func NewAliyunStore(endpoint, accKey, secretKey, bucketPrefix string) (*Store, error) {
37+
Db, err := rawdb.NewAliyunDB(endpoint, accKey, secretKey, bucketPrefix)
38+
if err != nil {
39+
return nil, err
40+
}
41+
return &Store{
42+
KVDb: Db,
43+
}, nil
44+
}
45+
3646
func (s *Store) Close() error {
3747
return s.KVDb.Close()
3848
}

0 commit comments

Comments
 (0)