Skip to content

Commit 12d3420

Browse files
authored
Merge pull request #259 from filipecosta90/core.and.redis.improvements
Redis and DynamoDB further improvements/tuning parameters
2 parents 5f28bb1 + 1fd2b71 commit 12d3420

File tree

4 files changed

+56
-18
lines changed

4 files changed

+56
-18
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,9 @@ Common configurations:
330330
|dynamodb.ensure.clean.table|true|On load mode ensure that the table is clean at the begining. In case of true and if the table previously exists it will be deleted and recreated|
331331
|dynamodb.endpoint|""|Used endpoint for connection. If empty will use the default loaded configs|
332332
|dynamodb.region|""|Used region for connection ( should match endpoint ). If empty will use the default loaded configs|
333+
|dynamodb.consistent.reads|false|Reads on DynamoDB provide an eventually consistent read by default. If your benchmark/use-case requires a strongly consistent read, set this option to true|
334+
|dynamodb.delete.after.run.stage|false|Detele the database table after the run stage|
335+
333336

334337

335338
## TODO

db/dynamodb/db.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,23 @@ type dynamodbWrapper struct {
2525
primarykeyPtr *string
2626
readCapacityUnits int64
2727
writeCapacityUnits int64
28+
consistentRead bool
29+
deleteAfterRun bool
30+
command string
2831
}
2932

3033
func (r *dynamodbWrapper) Close() error {
31-
return nil
34+
var err error = nil
35+
if strings.Compare("run", r.command) == 0 {
36+
log.Printf("Ensuring that the table is deleted after the run stage...\n")
37+
if r.deleteAfterRun {
38+
err = r.deleteTable()
39+
if err != nil {
40+
log.Printf("Couldn't delete table after run. Here's why: %v\n", err)
41+
}
42+
}
43+
}
44+
return err
3245
}
3346

3447
func (r *dynamodbWrapper) InitThread(ctx context.Context, _ int, _ int) context.Context {
@@ -42,8 +55,9 @@ func (r *dynamodbWrapper) Read(ctx context.Context, table string, key string, fi
4255
data = make(map[string][]byte, len(fields))
4356

4457
response, err := r.client.GetItem(context.TODO(), &dynamodb.GetItemInput{
45-
Key: r.GetKey(key),
46-
TableName: r.tablename,
58+
Key: r.GetKey(key),
59+
TableName: r.tablename,
60+
ConsistentRead: aws.Bool(r.consistentRead),
4761
})
4862
if err != nil {
4963
log.Printf("Couldn't get info about %v. Here's why: %v\n", key, err)
@@ -180,9 +194,11 @@ func (r dynamoDbCreator) Create(p *properties.Properties) (ycsb.DB, error) {
180194
rds.primarykeyPtr = aws.String(rds.primarykey)
181195
rds.readCapacityUnits = p.GetInt64(readCapacityUnitsFieldName, readCapacityUnitsFieldNameDefault)
182196
rds.writeCapacityUnits = p.GetInt64(writeCapacityUnitsFieldName, writeCapacityUnitsFieldNameDefault)
197+
rds.consistentRead = p.GetBool(consistentReadFieldName, consistentReadFieldNameDefault)
198+
rds.deleteAfterRun = p.GetBool(deleteTableAfterRunFieldName, deleteTableAfterRunFieldNameDefault)
183199
endpoint := p.GetString(endpointField, endpointFieldDefault)
184200
region := p.GetString(regionField, regionFieldDefault)
185-
command, _ := p.Get(prop.Command)
201+
rds.command, _ = p.Get(prop.Command)
186202
var err error = nil
187203
var cfg aws.Config
188204
if strings.Contains(endpoint, "localhost") && strings.Compare(region, "localhost") != 0 {
@@ -213,7 +229,7 @@ func (r dynamoDbCreator) Create(p *properties.Properties) (ycsb.DB, error) {
213229
rds.client = dynamodb.NewFromConfig(cfg)
214230
exists, err := rds.tableExists()
215231

216-
if strings.Compare("load", command) == 0 {
232+
if strings.Compare("load", rds.command) == 0 {
217233
if !exists {
218234
_, err = rds.createTable()
219235
} else {
@@ -265,6 +281,13 @@ const (
265281
endpointFieldDefault = ""
266282
regionField = "dynamodb.region"
267283
regionFieldDefault = ""
284+
// GetItem provides an eventually consistent read by default.
285+
// If your application requires a strongly consistent read, set ConsistentRead to true.
286+
// Although a strongly consistent read might take more time than an eventually consistent read, it always returns the last updated value.
287+
consistentReadFieldName = "dynamodb.consistent.reads"
288+
consistentReadFieldNameDefault = false
289+
deleteTableAfterRunFieldName = "dynamodb.delete.after.run.stage"
290+
deleteTableAfterRunFieldNameDefault = false
268291
)
269292

270293
func init() {

db/redis/db.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ const (
264264
redisReadTimeout = "redis.read_timeout"
265265
redisWriteTimeout = "redis.write_timeout"
266266
redisPoolSize = "redis.pool_size"
267+
redisPoolSizeDefault = 0
267268
redisMinIdleConns = "redis.min_idle_conns"
268269
redisMaxConnAge = "redis.max_conn_age"
269270
redisPoolTimeout = "redis.pool_timeout"
@@ -292,23 +293,28 @@ func parseTLS(p *properties.Properties) *tls.Config {
292293

293294
func getOptionsSingle(p *properties.Properties) *goredis.Options {
294295
opts := &goredis.Options{}
295-
opts.Network = p.GetString(redisNetwork, redisNetworkDefault)
296+
296297
opts.Addr = p.GetString(redisAddr, redisAddrDefault)
297-
opts.Password, _ = p.Get(redisPassword)
298298
opts.DB = p.GetInt(redisDB, 0)
299+
opts.Network = p.GetString(redisNetwork, redisNetworkDefault)
300+
opts.Password, _ = p.Get(redisPassword)
299301
opts.MaxRetries = p.GetInt(redisMaxRetries, 0)
300302
opts.MinRetryBackoff = p.GetDuration(redisMinRetryBackoff, time.Millisecond*8)
301303
opts.MaxRetryBackoff = p.GetDuration(redisMaxRetryBackoff, time.Millisecond*512)
302304
opts.DialTimeout = p.GetDuration(redisDialTimeout, time.Second*5)
303305
opts.ReadTimeout = p.GetDuration(redisReadTimeout, time.Second*3)
304306
opts.WriteTimeout = p.GetDuration(redisWriteTimeout, opts.ReadTimeout)
305-
opts.PoolSize = p.GetInt(redisPoolSize, 10)
307+
opts.PoolSize = p.GetInt(redisPoolSize, redisPoolSizeDefault)
308+
threadCount := p.MustGetInt("threadcount")
309+
if opts.PoolSize == 0 {
310+
opts.PoolSize = threadCount
311+
fmt.Println(fmt.Sprintf("Setting %s=%d (from <threadcount>) given you haven't specified a value.", redisPoolSize, opts.PoolSize))
312+
}
306313
opts.MinIdleConns = p.GetInt(redisMinIdleConns, 0)
307314
opts.MaxConnAge = p.GetDuration(redisMaxConnAge, 0)
308315
opts.PoolTimeout = p.GetDuration(redisPoolTimeout, time.Second+opts.ReadTimeout)
309316
opts.IdleTimeout = p.GetDuration(redisIdleTimeout, time.Minute*5)
310317
opts.IdleCheckFrequency = p.GetDuration(redisIdleCheckFreq, time.Minute)
311-
312318
opts.TLSConfig = parseTLS(p)
313319

314320
return opts
@@ -330,7 +336,12 @@ func getOptionsCluster(p *properties.Properties) *goredis.ClusterOptions {
330336
opts.DialTimeout = p.GetDuration(redisDialTimeout, time.Second*5)
331337
opts.ReadTimeout = p.GetDuration(redisReadTimeout, time.Second*3)
332338
opts.WriteTimeout = p.GetDuration(redisWriteTimeout, opts.ReadTimeout)
333-
opts.PoolSize = p.GetInt(redisPoolSize, 10)
339+
opts.PoolSize = p.GetInt(redisPoolSize, redisPoolSizeDefault)
340+
threadCount := p.MustGetInt("threadcount")
341+
if opts.PoolSize == 0 {
342+
opts.PoolSize = threadCount
343+
fmt.Println(fmt.Sprintf("Setting %s=%d (from <threadcount>) given you haven't specified a value.", redisPoolSize, opts.PoolSize))
344+
}
334345
opts.MinIdleConns = p.GetInt(redisMinIdleConns, 0)
335346
opts.MaxConnAge = p.GetDuration(redisMaxConnAge, 0)
336347
opts.PoolTimeout = p.GetDuration(redisPoolTimeout, time.Second+opts.ReadTimeout)

pkg/workload/core.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -408,10 +408,7 @@ func (c *core) nextKeyNum(state *coreState) int64 {
408408
keyNum = c.transactionInsertKeySequence.Last() - c.keyChooser.Next(r)
409409
}
410410
} else {
411-
keyNum = math.MaxInt64
412-
for keyNum > c.transactionInsertKeySequence.Last() {
413-
keyNum = c.keyChooser.Next(r)
414-
}
411+
keyNum = c.keyChooser.Next(r)
415412
}
416413
return keyNum
417414
}
@@ -650,31 +647,35 @@ func (coreCreator) Create(p *properties.Properties) (ycsb.Workload, error) {
650647

651648
c.keySequence = generator.NewCounter(insertStart)
652649
c.operationChooser = createOperationGenerator(p)
650+
var keyrangeLowerBound int64 = insertStart
651+
var keyrangeUpperBound int64 = insertStart + insertCount - 1
653652

654653
c.transactionInsertKeySequence = generator.NewAcknowledgedCounter(c.recordCount)
655654
switch requestDistrib {
656655
case "uniform":
657-
c.keyChooser = generator.NewUniform(insertStart, insertStart+insertCount-1)
656+
c.keyChooser = generator.NewUniform(keyrangeLowerBound, keyrangeUpperBound)
658657
case "sequential":
659-
c.keyChooser = generator.NewSequential(insertStart, insertStart+insertCount-1)
658+
c.keyChooser = generator.NewSequential(keyrangeLowerBound, keyrangeUpperBound)
660659
case "zipfian":
661660
insertProportion := p.GetFloat64(prop.InsertProportion, prop.InsertProportionDefault)
662661
opCount := p.GetInt64(prop.OperationCount, 0)
663662
expectedNewKeys := int64(float64(opCount) * insertProportion * 2.0)
664-
c.keyChooser = generator.NewScrambledZipfian(insertStart, insertStart+insertCount+expectedNewKeys, generator.ZipfianConstant)
663+
keyrangeUpperBound = insertStart + insertCount + expectedNewKeys
664+
c.keyChooser = generator.NewScrambledZipfian(keyrangeLowerBound, keyrangeUpperBound, generator.ZipfianConstant)
665665
case "latest":
666666
c.keyChooser = generator.NewSkewedLatest(c.transactionInsertKeySequence)
667667
case "hotspot":
668668
hotsetFraction := p.GetFloat64(prop.HotspotDataFraction, prop.HotspotDataFractionDefault)
669669
hotopnFraction := p.GetFloat64(prop.HotspotOpnFraction, prop.HotspotOpnFractionDefault)
670-
c.keyChooser = generator.NewHotspot(insertStart, insertStart+insertCount-1, hotsetFraction, hotopnFraction)
670+
c.keyChooser = generator.NewHotspot(keyrangeLowerBound, keyrangeUpperBound, hotsetFraction, hotopnFraction)
671671
case "exponential":
672672
percentile := p.GetFloat64(prop.ExponentialPercentile, prop.ExponentialPercentileDefault)
673673
frac := p.GetFloat64(prop.ExponentialFrac, prop.ExponentialFracDefault)
674674
c.keyChooser = generator.NewExponential(percentile, float64(c.recordCount)*frac)
675675
default:
676676
util.Fatalf("unknown request distribution %s", requestDistrib)
677677
}
678+
fmt.Println(fmt.Sprintf("Using request distribution '%s' a keyrange of [%d %d]", requestDistrib, keyrangeLowerBound, keyrangeUpperBound))
678679

679680
c.fieldChooser = generator.NewUniform(0, c.fieldCount-1)
680681
switch scanLengthDistrib {

0 commit comments

Comments
 (0)