Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepvinayak committed Jan 22, 2025
1 parent c95f790 commit c8a11bd
Show file tree
Hide file tree
Showing 9 changed files with 5,438 additions and 32 deletions.
6 changes: 5 additions & 1 deletion docstore/awsdynamodb/dynamo.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, o
go func() { defer close(ch2); c.transactWrite(ctx, writesTx, errs, opts) }()
c.runGets(ctx, gets, errs, opts)
<-ch
<-ch2
c.runGets(ctx, afterGets, errs, opts)
return driver.NewActionListError(errs)
}
Expand Down Expand Up @@ -615,10 +616,12 @@ func revisionPrecondition(doc driver.Document, revField string) (*expression.Con
return &cb, nil
}

// transactWrite execute the write actions in an atomic manner, either they all succeed or they all fail together.
func (c *collection) transactWrite(ctx context.Context, actions []*driver.Action, errs []error, opts *driver.RunActionsOptions) {
if len(actions) == 0 {
return
}
// all actions will fail atomically even if a single action fails
setErr := func(err error) {
for _, a := range actions {
errs[a.Index] = err
Expand All @@ -630,7 +633,8 @@ func (c *collection) transactWrite(ctx context.Context, actions []*driver.Action
for _, w := range actions {
op, err := c.newWriteOp(w, opts)
if err != nil {
errs[w.Index] = err
setErr(err)
return
} else {
ops = append(ops, op)
tws = append(tws, op.writeItem)
Expand Down
Loading

0 comments on commit c8a11bd

Please sign in to comment.