Skip to content

Commit

Permalink
abci: Add relaxed local client synchronization models (cometbft#1141)
Browse files Browse the repository at this point in the history
* proxy: Remove "unsynchronized" local client creator

Signed-off-by: Thane Thomson <[email protected]>

* proxy: Expand client creator interface

Expand the `ClientCreator` interface to allow the caller to explicitly
specify the "connection" whose client they are creating. This
potentially gives greater control over the concurrency model employed in
each type of connection.

Signed-off-by: Thane Thomson <[email protected]>

* abci/client: Clarify NewLocalClient description

Signed-off-by: Thane Thomson <[email protected]>

* proxy: Add connection-synchronized local client creator

Analogous to the old "unsynchronized" local client creator.

Signed-off-by: Thane Thomson <[email protected]>

* abci/client: Add unsynchronized local client

Signed-off-by: Thane Thomson <[email protected]>

* proxy: Add consensus-synchronized local client creator

Signed-off-by: Thane Thomson <[email protected]>

* proxy: Fix mock configuration in test

Signed-off-by: Thane Thomson <[email protected]>

* Add changelog entries

Signed-off-by: Thane Thomson <[email protected]>

* Remove changelog entry - no longer necessary

Signed-off-by: Thane Thomson <[email protected]>

* proxy: Add unsynchronized local client creator

Signed-off-by: Thane Thomson <[email protected]>

* changelog: Add entry for unsync local client creator

Signed-off-by: Thane Thomson <[email protected]>

* Update 1141-abci-unsync-proxy.md

Co-authored-by: Adi Seredinschi <[email protected]>

---------

Signed-off-by: Thane Thomson <[email protected]>
Co-authored-by: Adi Seredinschi <[email protected]>
  • Loading branch information
2 people authored and yihuang committed Oct 23, 2024
1 parent 53dcd9b commit 628b4a0
Show file tree
Hide file tree
Showing 14 changed files with 484 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[proxy]` Expand `ClientCreator` interface to allow
for per-"connection" control of client creation
([\#1141](https://github.com/cometbft/cometbft/pull/1141))
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[abci/client]` Add consensus-synchronized local client creator,
which only imposes a mutex on the consensus "connection", leaving
the concurrency of all other "connections" up to the application
([\#1141](https://github.com/cometbft/cometbft/pull/1141))
3 changes: 3 additions & 0 deletions .changelog/v0.38.0/improvements/1141-abci-unsync-proxy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[abci/client]` Add fully unsynchronized local client creator, which
imposes no mutexes on the application, leaving all handling of concurrency up
to the application ([\#1141](https://github.com/cometbft/cometbft/pull/1141))
16 changes: 10 additions & 6 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ type localClient struct {

var _ Client = (*localClient)(nil)

// NewLocalClient creates a local client, which wraps the application interface that
// Tendermint as the client will call to the application as the server. The only
// difference, is that the local client has a global mutex which enforces serialization
// of all the ABCI calls from Tendermint to the Application.
// NewLocalClient creates a local client, which wraps the application interface
// that Comet as the client will call to the application as the server.
//
// Concurrency control in each client instance is enforced by way of a single
// mutex. If a mutex is not supplied (i.e. if mtx is nil), then one will be
// created.
func NewLocalClient(mtx *cmtsync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = new(cmtsync.Mutex)
Expand Down Expand Up @@ -135,15 +137,17 @@ func (app *localClient) OfferSnapshot(ctx context.Context, req *types.RequestOff
}

func (app *localClient) LoadSnapshotChunk(ctx context.Context,
req *types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
req *types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.LoadSnapshotChunk(ctx, req)
}

func (app *localClient) ApplySnapshotChunk(ctx context.Context,
req *types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
req *types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

Expand Down
136 changes: 136 additions & 0 deletions abci/client/unsync_local_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package abcicli

import (
"context"
"sync"

types "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/service"
)

type unsyncLocalClient struct {
service.BaseService

types.Application

mtx sync.Mutex
Callback
}

var _ Client = (*unsyncLocalClient)(nil)

// NewUnsyncLocalClient creates a local client, which wraps the application
// interface that Comet as the client will call to the application as the
// server.
//
// This differs from [NewLocalClient] in that it returns a client that only
// maintains a mutex over the callback used by CheckTxAsync and not over the
// application, leaving it up to the proxy to handle all concurrency. If the
// proxy does not impose any concurrency restrictions, it is then left up to
// the application to implement its own concurrency for the relevant group of
// calls.
func NewUnsyncLocalClient(app types.Application) Client {
cli := &unsyncLocalClient{
Application: app,
}
cli.BaseService = *service.NewBaseService(nil, "unsyncLocalClient", cli)
return cli
}

func (app *unsyncLocalClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
app.Callback = cb
app.mtx.Unlock()
}

func (app *unsyncLocalClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := app.Application.CheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
), nil
}

func (app *unsyncLocalClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
rr := newLocalReqRes(req, res)
rr.callbackInvoked = true
return rr
}

//-------------------------------------------------------

func (app *unsyncLocalClient) Error() error {
return nil
}

func (app *unsyncLocalClient) Flush(context.Context) error {
return nil
}

func (app *unsyncLocalClient) Echo(_ context.Context, msg string) (*types.ResponseEcho, error) {
return &types.ResponseEcho{Message: msg}, nil
}

func (app *unsyncLocalClient) Info(ctx context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) {
return app.Application.Info(ctx, req)
}

func (app *unsyncLocalClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return app.Application.CheckTx(ctx, req)
}

func (app *unsyncLocalClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
return app.Application.Query(ctx, req)
}

func (app *unsyncLocalClient) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) {
return app.Application.Commit(ctx, req)
}

func (app *unsyncLocalClient) InitChain(ctx context.Context, req *types.RequestInitChain) (*types.ResponseInitChain, error) {
return app.Application.InitChain(ctx, req)
}

func (app *unsyncLocalClient) ListSnapshots(ctx context.Context, req *types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
return app.Application.ListSnapshots(ctx, req)
}

func (app *unsyncLocalClient) OfferSnapshot(ctx context.Context, req *types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
return app.Application.OfferSnapshot(ctx, req)
}

func (app *unsyncLocalClient) LoadSnapshotChunk(ctx context.Context,
req *types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
return app.Application.LoadSnapshotChunk(ctx, req)
}

func (app *unsyncLocalClient) ApplySnapshotChunk(ctx context.Context,
req *types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
return app.Application.ApplySnapshotChunk(ctx, req)
}

func (app *unsyncLocalClient) PrepareProposal(ctx context.Context, req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
return app.Application.PrepareProposal(ctx, req)
}

func (app *unsyncLocalClient) ProcessProposal(ctx context.Context, req *types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
return app.Application.ProcessProposal(ctx, req)
}

func (app *unsyncLocalClient) ExtendVote(ctx context.Context, req *types.RequestExtendVote) (*types.ResponseExtendVote, error) {
return app.Application.ExtendVote(ctx, req)
}

func (app *unsyncLocalClient) VerifyVoteExtension(ctx context.Context, req *types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
return app.Application.VerifyVoteExtension(ctx, req)
}

func (app *unsyncLocalClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
return app.Application.FinalizeBlock(ctx, req)
}
2 changes: 1 addition & 1 deletion consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func newMockProxyApp(finalizeBlockResponse *abci.ResponseFinalizeBlock) proxy.Ap
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{
finalizeBlockResponse: finalizeBlockResponse,
})
cli, _ := clientCreator.NewABCIClient()
cli, _ := clientCreator.NewABCIConsensusClient()
err := cli.Start()
if err != nil {
panic(err)
Expand Down
6 changes: 3 additions & 3 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func newMempoolWithApp(cc proxy.ClientCreator) (*CListMempool, cleanupFunc) {
}

func newMempoolWithAppAndConfig(cc proxy.ClientCreator, cfg *config.Config) (*CListMempool, cleanupFunc) {
appConnMem, _ := cc.NewABCIClient()
appConnMem, _ := cc.NewABCIMempoolClient()
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
err := appConnMem.Start()
if err != nil {
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestSerialReap(t *testing.T) {
mp, cleanup := newMempoolWithApp(cc)
defer cleanup()

appConnCon, _ := cc.NewABCIClient()
appConnCon, _ := cc.NewABCIConsensusClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
err := appConnCon.Start()
require.Nil(t, err)
Expand Down Expand Up @@ -629,7 +629,7 @@ func TestMempoolTxsBytes(t *testing.T) {
require.NoError(t, err)
assert.EqualValues(t, 10, mp.SizeBytes())

appConnCon, _ := cc.NewABCIClient()
appConnCon, _ := cc.NewABCIConsensusClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
err = appConnCon.Start()
require.Nil(t, err)
Expand Down
4 changes: 2 additions & 2 deletions proxy/app_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestEcho(t *testing.T) {
})

// Start client
cli, err := clientCreator.NewABCIClient()
cli, err := clientCreator.NewABCIMempoolClient()
if err != nil {
t.Fatalf("Error creating ABCI client: %v", err.Error())
}
Expand Down Expand Up @@ -72,7 +72,7 @@ func BenchmarkEcho(b *testing.B) {
})

// Start client
cli, err := clientCreator.NewABCIClient()
cli, err := clientCreator.NewABCIMempoolClient()
if err != nil {
b.Fatalf("Error creating ABCI client: %v", err.Error())
}
Expand Down
Loading

0 comments on commit 628b4a0

Please sign in to comment.