diff --git a/README.md b/README.md index fce553d..d7d1cd1 100644 --- a/README.md +++ b/README.md @@ -272,7 +272,13 @@ Note that transaction-level priority takes precedence over command-level priorit In a read-write transaction, you can add a tag following `BEGIN RW TAG `. spanner-cli adds the tag set in `BEGIN RW TAG` as a transaction tag. -The tag will also be used as request tags within the transaction. +The tag will also be used as the prefix of request tags within the transaction. + +spanner-cli generates request tags in the following format. + +`` ::= `__` + +Because the length of a request_tag is [limited to 50 characters](https://github.com/googleapis/googleapis/blob/150b4079b6441ea8b3d7f9a71d0be7bbacbb4e3a/google/spanner/v1/spanner.proto#L466-L476), request tags, especially SQL Text embedded at the end of a request tag, may be truncated. ``` # Read-write transaction @@ -281,11 +287,11 @@ The tag will also be used as request tags within the transaction. | BEGIN RW TAG tx1; | | | | SELECT val | -| FROM tab1 +-----request_tag = tx1 +| FROM tab1 +-----request_tag = tx1_1_SELECT val... | WHERE id = 1; | | | | UPDATE tab1 | -| SET val = 10 +-----request_tag = tx1 +| SET val = 10 +-----request_tag = tx1_2_UPDATE tab1... | WHERE id = 1; | | | | COMMIT; | @@ -293,7 +299,8 @@ The tag will also be used as request tags within the transaction. ``` In a read-only transaction, you can add a tag following `BEGIN RO TAG `. -Since read-only transaction doesn't support transaction tag, spanner-cli adds the tag set in `BEGIN RO TAG` as request tags. +Since read-only transaction doesn't support transaction tag, spanner-cli adds the tag set in `BEGIN RO TAG` as the prefix of request tags. + ``` # Read-only transaction # transaction_tag = N/A @@ -301,7 +308,7 @@ Since read-only transaction doesn't support transaction tag, spanner-cli adds th | BEGIN RO TAG tx2; | | | | SELECT SUM(val) | -| FROM tab1 +-----request_tag = tx2 +| FROM tab1 +-----request_tag = tx2_1_SELECT SUM(val)... | WHERE id = 1; | | | | CLOSE; | diff --git a/integration_test.go b/integration_test.go index 2202a17..f0a58b4 100644 --- a/integration_test.go +++ b/integration_test.go @@ -285,6 +285,10 @@ func TestReadWriteTransaction(t *testing.T) { IsMutation: true, }) + if session.tc.statementCounter != 2 { + t.Errorf("Check the current statement number, Expect 2, Actual %d", session.tc.statementCounter) + } + // commit stmt, err = BuildStatement("COMMIT") if err != nil { @@ -368,6 +372,10 @@ func TestReadWriteTransaction(t *testing.T) { IsMutation: true, }) + if session.tc.statementCounter != 2 { + t.Errorf("Check the current statement number, Expect 2, Actual %d", session.tc.statementCounter) + } + // rollback stmt, err = BuildStatement("ROLLBACK") if err != nil { @@ -485,6 +493,10 @@ func TestReadOnlyTransaction(t *testing.T) { IsMutation: false, }) + if session.tc.statementCounter != 2 { + t.Errorf("Check the current statement number, Expect 2, Actual %d", session.tc.statementCounter) + } + // close stmt, err = BuildStatement("CLOSE") if err != nil { @@ -556,6 +568,10 @@ func TestReadOnlyTransaction(t *testing.T) { IsMutation: false, }) + if session.tc.statementCounter != 2 { + t.Errorf("Check the current statement number, Expect 2, Actual %d", session.tc.statementCounter) + } + // close stmt, err = BuildStatement("CLOSE") if err != nil { diff --git a/session.go b/session.go index e5a4fb9..d1fab38 100644 --- a/session.go +++ b/session.go @@ -58,11 +58,12 @@ type Session struct { } type transactionContext struct { - tag string - priority pb.RequestOptions_Priority - sendHeartbeat bool // Becomes true only after a user-driven query is executed on the transaction. - rwTxn *spanner.ReadWriteStmtBasedTransaction - roTxn *spanner.ReadOnlyTransaction + statementCounter int64 // Count current statement number in a transaction + tag string + priority pb.RequestOptions_Priority + sendHeartbeat bool // Becomes true only after a user-driven query is executed on the transaction. + rwTxn *spanner.ReadWriteStmtBasedTransaction + roTxn *spanner.ReadOnlyTransaction } func NewSession(projectId string, instanceId string, databaseId string, priority pb.RequestOptions_Priority, opts ...option.ClientOption) (*Session, error) { @@ -129,9 +130,10 @@ func (s *Session) BeginReadWriteTransaction(ctx context.Context, priority pb.Req return err } s.tc = &transactionContext{ - tag: tag, - priority: priority, - rwTxn: txn, + statementCounter: 1, // Reset counter + tag: tag, + priority: priority, + rwTxn: txn, } return nil } @@ -195,9 +197,10 @@ func (s *Session) BeginReadOnlyTransaction(ctx context.Context, typ timestampBou } s.tc = &transactionContext{ - tag: tag, - priority: priority, - roTxn: txn, + statementCounter: 1, // Reset counter + tag: tag, + priority: priority, + roTxn: txn, } return txn.Timestamp() @@ -255,13 +258,13 @@ func (s *Session) RunAnalyzeQuery(ctx context.Context, stmt spanner.Statement) ( func (s *Session) runQueryWithOptions(ctx context.Context, stmt spanner.Statement, opts spanner.QueryOptions) (*spanner.RowIterator, *spanner.ReadOnlyTransaction) { if s.InReadWriteTransaction() { - opts.RequestTag = s.tc.tag + opts.RequestTag = generateRequestTag(s.tc, stmt) iter := s.tc.rwTxn.QueryWithOptions(ctx, stmt, opts) s.tc.sendHeartbeat = true return iter, nil } if s.InReadOnlyTransaction() { - opts.RequestTag = s.tc.tag + opts.RequestTag = generateRequestTag(s.tc, stmt) return s.tc.roTxn.QueryWithOptions(ctx, stmt, opts), s.tc.roTxn } @@ -278,7 +281,7 @@ func (s *Session) RunUpdate(ctx context.Context, stmt spanner.Statement) (int64, opts := spanner.QueryOptions{ Priority: s.currentPriority(), - RequestTag: s.tc.tag, + RequestTag: generateRequestTag(s.tc, stmt), } rowCount, err := s.tc.rwTxn.UpdateWithOptions(ctx, stmt, opts) s.tc.sendHeartbeat = true @@ -374,3 +377,7 @@ func heartbeat(txn *spanner.ReadWriteStmtBasedTransaction, priority pb.RequestOp _, err := iter.Next() return err } + +func generateRequestTag(tc *transactionContext, stmt spanner.Statement) string { + return tc.tag + "_" + fmt.Sprintf("%d", tc.statementCounter) + "_" + stmt.SQL +} diff --git a/statement.go b/statement.go index 2df98d3..17b7841 100644 --- a/statement.go +++ b/statement.go @@ -225,6 +225,11 @@ func (s *SelectStatement) Execute(ctx context.Context, session *Session) (*Resul } return nil, err } + + if session.InReadWriteTransaction() || session.InReadOnlyTransaction() { + session.tc.statementCounter++ + } + result := &Result{ ColumnNames: columnNames, Rows: rows, @@ -749,6 +754,7 @@ func (s *DmlStatement) Execute(ctx context.Context, session *Session) (*Result, rollback.Execute(ctx, session) return nil, fmt.Errorf("transaction was aborted: %v", err) } + session.tc.statementCounter++ } else { // Start implicit transaction. begin := BeginRwStatement{}