Skip to content

Commit

Permalink
feat: PRT-get-best-endpoint-csm (lavanet#1396)
Browse files Browse the repository at this point in the history
* finally fetching all endpoints and also sorting them by best latency so later we get best endpoint first

* adding sort mechanism

* changing func name

* changing func name

* adding testServer that responds to Probing, with a builtin Delay. also adding another test to test the Endpoint sorting mechanism.

* renaming lint

* improving swap skills

* I wonder if anyone will ever see this commit message.

* changing error for not implemented

* adding error protection
  • Loading branch information
ranlavanet authored May 2, 2024
1 parent 7e399a3 commit 41b3b3a
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 95 deletions.
2 changes: 1 addition & 1 deletion protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMain(m *testing.M) {
func isGrpcServerUp(url string) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
conn, err := lavasession.ConnectgRPCClient(context.Background(), url, true)
conn, err := lavasession.ConnectGRPCClient(context.Background(), url, true)
if err != nil {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func IsSessionSyncLoss(err error) bool {
return code == codes.Code(SessionOutOfSyncError.ABCICode())
}

func ConnectgRPCClient(ctx context.Context, address string, allowInsecure bool) (*grpc.ClientConn, error) {
func ConnectGRPCClient(ctx context.Context, address string, allowInsecure bool) (*grpc.ClientConn, error) {
var tlsConf tls.Config
if allowInsecure {
tlsConf.InsecureSkipVerify = true // this will allow us to use self signed certificates in development.
Expand Down
101 changes: 63 additions & 38 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lavasession

import (
"context"
"fmt"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -196,52 +197,73 @@ func (csm *ConsumerSessionManager) probeProviders(ctx context.Context, pairingLi

// this code needs to be thread safe
func (csm *ConsumerSessionManager) probeProvider(ctx context.Context, consumerSessionsWithProvider *ConsumerSessionsWithProvider, epoch uint64, tryReconnectToDisabledEndpoints bool) (latency time.Duration, providerAddress string, err error) {
// TODO: fetch all endpoints not just one
connected, endpoint, providerAddress, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, tryReconnectToDisabledEndpoints)
connected, endpoints, providerAddress, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, tryReconnectToDisabledEndpoints, true)
if err != nil || !connected {
if AllProviderEndpointsDisabledError.Is(err) {
csm.blockProvider(providerAddress, true, epoch, MaxConsecutiveConnectionAttempts, 0, csm.GenerateReconnectCallback(consumerSessionsWithProvider)) // reporting and blocking provider this epoch
}
return 0, providerAddress, err
}

relaySentTime := time.Now()
connectCtx, cancel := context.WithTimeout(ctx, common.AverageWorldLatency)
defer cancel()
guid, found := utils.GetUniqueIdentifier(connectCtx)
if !found {
return 0, providerAddress, utils.LavaFormatError("probeProvider failed fetching unique identifier from context when it's set", nil)
}
if endpoint.Client == nil {
consumerSessionsWithProvider.Lock.Lock()
defer consumerSessionsWithProvider.Lock.Unlock()
return 0, providerAddress, utils.LavaFormatError("returned nil client in endpoint", nil, utils.Attribute{Key: "consumerSessionWithProvider", Value: consumerSessionsWithProvider})
}
client := *endpoint.Client
probeReq := &pairingtypes.ProbeRequest{
Guid: guid,
SpecId: csm.rpcEndpoint.ChainID,
ApiInterface: csm.rpcEndpoint.ApiInterface,
}
var trailer metadata.MD
probeResp, err := client.Probe(connectCtx, probeReq, grpc.Trailer(&trailer))
versions := trailer.Get(common.VersionMetadataKey)
relayLatency := time.Since(relaySentTime)
if err != nil {
return 0, providerAddress, utils.LavaFormatError("probe call error", err, utils.Attribute{Key: "provider", Value: providerAddress})
}
providerGuid := probeResp.GetGuid()
if providerGuid != guid {
return 0, providerAddress, utils.LavaFormatWarning("mismatch probe response", nil, utils.Attribute{Key: "provider", Value: providerAddress}, utils.Attribute{Key: "provider Guid", Value: providerGuid}, utils.Attribute{Key: "sent guid", Value: guid})
}
if probeResp.LatestBlock == 0 {
return 0, providerAddress, utils.LavaFormatWarning("provider returned 0 latest block", nil, utils.Attribute{Key: "provider", Value: providerAddress}, utils.Attribute{Key: "sent guid", Value: guid})
var endpointInfos []EndpointInfo
lastError := fmt.Errorf("endpoints list is empty") // this error will happen if we had 0 endpoints
for _, endpoint := range endpoints {
err := func() error {
connectCtx, cancel := context.WithTimeout(ctx, common.AverageWorldLatency)
defer cancel()
guid, found := utils.GetUniqueIdentifier(connectCtx)
if !found {
return utils.LavaFormatError("probeProvider failed fetching unique identifier from context when it's set", nil)
}
if endpoint.Client == nil {
consumerSessionsWithProvider.Lock.Lock()
defer consumerSessionsWithProvider.Lock.Unlock()
return utils.LavaFormatError("returned nil client in endpoint", nil, utils.Attribute{Key: "consumerSessionWithProvider", Value: consumerSessionsWithProvider})
}
client := *endpoint.Client
probeReq := &pairingtypes.ProbeRequest{
Guid: guid,
SpecId: csm.rpcEndpoint.ChainID,
ApiInterface: csm.rpcEndpoint.ApiInterface,
}
var trailer metadata.MD
relaySentTime := time.Now()
probeResp, err := client.Probe(connectCtx, probeReq, grpc.Trailer(&trailer))
relayLatency := time.Since(relaySentTime)
versions := trailer.Get(common.VersionMetadataKey)
if err != nil {
return utils.LavaFormatError("probe call error", err, utils.Attribute{Key: "provider", Value: providerAddress})
}
providerGuid := probeResp.GetGuid()
if providerGuid != guid {
return utils.LavaFormatWarning("mismatch probe response", nil, utils.Attribute{Key: "provider", Value: providerAddress}, utils.Attribute{Key: "provider Guid", Value: providerGuid}, utils.Attribute{Key: "sent guid", Value: guid})
}
if probeResp.LatestBlock == 0 {
return utils.LavaFormatWarning("provider returned 0 latest block", nil, utils.Attribute{Key: "provider", Value: providerAddress}, utils.Attribute{Key: "sent guid", Value: guid})
}

endpointInfos = append(endpointInfos, EndpointInfo{
Latency: relayLatency,
Endpoint: endpoint,
})
// public lava address is a value that is not changing, so it's thread safe
if DebugProbes {
utils.LavaFormatDebug("Probed provider successfully", utils.Attribute{Key: "latency", Value: relayLatency}, utils.Attribute{Key: "provider", Value: consumerSessionsWithProvider.PublicLavaAddress}, utils.LogAttr("version", strings.Join(versions, ",")))
}
return nil
}()
if err != nil {
lastError = err
}
}
// public lava address is a value that is not changing, so it's thread safe
if DebugProbes {
utils.LavaFormatDebug("Probed provider successfully", utils.Attribute{Key: "latency", Value: relayLatency}, utils.Attribute{Key: "provider", Value: consumerSessionsWithProvider.PublicLavaAddress}, utils.LogAttr("version", strings.Join(versions, ",")))

if len(endpointInfos) == 0 {
// no endpoints.
return 0, providerAddress, lastError
}
return relayLatency, providerAddress, nil
sort.Sort(EndpointInfoList(endpointInfos))
consumerSessionsWithProvider.sortEndpointsByLatency(endpointInfos)
return endpointInfos[0].Latency, providerAddress, nil
}

// csm needs to be locked here
Expand Down Expand Up @@ -375,7 +397,7 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
sessionEpoch := sessionWithProvider.CurrentEpoch

// Get a valid Endpoint from the provider chosen
connected, endpoint, _, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, false)
connected, endpoints, _, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, false, false)
if err != nil {
// verify err is AllProviderEndpointsDisabled and report.
if AllProviderEndpointsDisabledError.Is(err) {
Expand All @@ -397,6 +419,9 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
continue
}

// get the endpoint we got, as its the only one returned when asking fetchEndpointConnectionFromConsumerSessionWithProvider with false value
endpoint := endpoints[0]

// we get the reported providers here after we try to connect, so if any provider didn't respond he will already be added to the list.
reportedProviders := csm.GetReportedProviders(sessionEpoch)

Expand Down
175 changes: 128 additions & 47 deletions protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,63 +40,167 @@ const (
maxCuForVirtualEpoch = uint64(200)
)

type testServer struct {
delay time.Duration
}

func (rpcps *testServer) Probe(ctx context.Context, probeReq *pairingtypes.ProbeRequest) (*pairingtypes.ProbeReply, error) {
utils.LavaFormatDebug("Debug probe called")
probeReply := &pairingtypes.ProbeReply{
Guid: probeReq.GetGuid(),
LatestBlock: 1,
FinalizedBlocksHashes: []byte{},
LavaEpoch: 1,
LavaLatestBlock: 1,
}
time.Sleep(rpcps.delay)
return probeReply, nil
}

func (rpcps *testServer) Relay(context.Context, *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) {
return nil, utils.LavaFormatError("not Implemented", nil)
}

func (rpcps *testServer) RelaySubscribe(*pairingtypes.RelayRequest, pairingtypes.Relayer_RelaySubscribeServer) error {
return utils.LavaFormatError("not implemented", nil)
}

// Test the basic functionality of the consumerSessionManager
func TestHappyFlow(t *testing.T) {
ctx := context.Background()
csm := CreateConsumerSessionManager()
pairingList := createPairingList("", true)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
require.Equal(t, cs.Session.RelayNum, relayNumberAfterFirstCall)
require.Equal(t, cs.Session.LatestBlock, servicedBlockNumber)
}
}

func getDelayedAddress() string {
delayedServerAddress := "127.0.0.1:3335"
// because grpcListener is random we might have overlap. in that case just change the port.
if grpcListener == delayedServerAddress {
delayedServerAddress = "127.0.0.1:3336"
}
return delayedServerAddress
}

func TestEndpointSortingFlow(t *testing.T) {
delayedAddress := getDelayedAddress()
err := createGRPCServer(delayedAddress, time.Millisecond)
csp := &ConsumerSessionsWithProvider{}
for {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, _, err := csp.ConnectRawClientWithTimeout(ctx, delayedAddress)
if err != nil {
utils.LavaFormatDebug("waiting for grpc server to launch")
continue
}
cancel()
break
}
require.NoError(t, err)
csm := CreateConsumerSessionManager()
pairingList := createPairingList("", true)
pairingList[0].Endpoints = append(pairingList[0].Endpoints, &Endpoint{NetworkAddress: delayedAddress, Enabled: true, Client: nil, ConnectionRefusals: 0})
// swap locations so that the endpoint of the delayed will be first
pairingList[0].Endpoints[0], pairingList[0].Endpoints[1] = pairingList[0].Endpoints[1], pairingList[0].Endpoints[0]

// update the pairing and wait for the routine to send all requests
err = csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)

_, ok := csm.pairing[pairingList[0].PublicLavaAddress]
require.True(t, ok)

// because probing is in a routine we need to wait for the sorting and probing to end asynchronously
swapped := false
for i := 0; i < 10; i++ {
if pairingList[0].Endpoints[0].NetworkAddress == grpcListener {
fmt.Println("Endpoints Are Sorted!", i)
swapped = true
break
}
time.Sleep(1 * time.Second)
fmt.Println("Endpoints did not swap yet, attempt:", i)
}
require.True(t, swapped)
// after creating all the sessions
}

// This variable will hold grpc server address
var grpcListener = "localhost:0"

func CreateConsumerSessionManager() *ConsumerSessionManager {
AllowInsecureConnectionToProviders = true // set to allow insecure for tests purposes
rand.InitRandomSeed()
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil)
}

var grpcServer *grpc.Server

func TestMain(m *testing.M) {
serverStarted := make(chan struct{})

go func() {
err := createGRPCServer(serverStarted)
AllowInsecureConnectionToProviders = true
err := createGRPCServer("", time.Microsecond)
if err != nil {
fmt.Println("Failed create server", err)
os.Exit(-1)
}
csp := &ConsumerSessionsWithProvider{}
for {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, _, err := csp.ConnectRawClientWithTimeout(ctx, grpcListener)
if err != nil {
fmt.Printf("Failed to start server: %v\n", err)
os.Exit(1)
utils.LavaFormatDebug("waiting for grpc server to launch")
continue
}
}()

// Wait for server to start
<-serverStarted
cancel()
break
}

// Start running tests.
code := m.Run()

os.Exit(code)
}

func createGRPCServer(serverStarted chan struct{}) error {
if grpcServer != nil {
close(serverStarted)
return nil
func createGRPCServer(changeListener string, probeDelay time.Duration) error {
listenAddress := grpcListener
if changeListener != "" {
listenAddress = changeListener
}
lis, err := net.Listen("tcp", grpcListener)
lis, err := net.Listen("tcp", listenAddress)
if err != nil {
return err
}

// Update the grpcListener with the actual address
grpcListener = lis.Addr().String()
if changeListener == "" {
grpcListener = lis.Addr().String()
}

// Create a new server with insecure credentials
tlsConfig := GetTlsConfig(NetworkAddressData{})
s := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))

s2 := &testServer{delay: probeDelay}
pairingtypes.RegisterRelayerServer(s, s2)

go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Failed to serve: %v", err)
os.Exit(-1)
}
}()

grpcServer = s
close(serverStarted) // Signal that the server has started
return nil
}

Expand Down Expand Up @@ -134,29 +238,6 @@ func createPairingList(providerPrefixAddress string, enabled bool) map[uint64]*C
return cswpList
}

// Test the basic functionality of the consumerSessionManager
func TestHappyFlow(t *testing.T) {
ctx := context.Background()
csm := CreateConsumerSessionManager()
pairingList := createPairingList("", true)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
require.Equal(t, cs.Session.RelayNum, relayNumberAfterFirstCall)
require.Equal(t, cs.Session.LatestBlock, servicedBlockNumber)
}
}

func TestNoPairingAvailableFlow(t *testing.T) {
ctx := context.Background()
csm := CreateConsumerSessionManager()
Expand Down Expand Up @@ -718,7 +799,7 @@ func TestContext(t *testing.T) {

func TestGrpcClientHang(t *testing.T) {
ctx := context.Background()
conn, err := ConnectgRPCClient(ctx, grpcListener, true)
conn, err := ConnectGRPCClient(ctx, grpcListener, true)
require.NoError(t, err)
client := pairingtypes.NewRelayerClient(conn)
err = conn.Close()
Expand Down
Loading

0 comments on commit 41b3b3a

Please sign in to comment.