Skip to content

Commit a3676ba

Browse files
authored
Merge pull request #6630 from The-K-R-O-K/illia-malachyn/6617-new-ws-handler
[Access] Add new websocket handler and skeleton for its deps
2 parents c5bde97 + c38f6ce commit a3676ba

32 files changed

+737
-43
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,10 @@ generate-mocks: install-mock-generators
203203
mockery --name 'API' --dir="./engine/protocol" --case=underscore --output="./engine/protocol/mock" --outpkg="mock"
204204
mockery --name '.*' --dir="./engine/access/state_stream" --case=underscore --output="./engine/access/state_stream/mock" --outpkg="mock"
205205
mockery --name 'BlockTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
206+
mockery --name 'DataProvider' --dir="./engine/access/rest/websockets/data_provider" --case=underscore --output="./engine/access/rest/websockets/data_provider/mock" --outpkg="mock"
206207
mockery --name 'ExecutionDataTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
207208
mockery --name 'ConnectionFactory' --dir="./engine/access/rpc/connection" --case=underscore --output="./engine/access/rpc/connection/mock" --outpkg="mock"
208209
mockery --name 'Communicator' --dir="./engine/access/rpc/backend" --case=underscore --output="./engine/access/rpc/backend/mock" --outpkg="mock"
209-
210210
mockery --name '.*' --dir=model/fingerprint --case=underscore --output="./model/fingerprint/mock" --outpkg="mock"
211211
mockery --name 'ExecForkActor' --structname 'ExecForkActorMock' --dir=module/mempool/consensus/mock/ --case=underscore --output="./module/mempool/consensus/mock/" --outpkg="mock"
212212
mockery --name '.*' --dir=engine/verification/fetcher/ --case=underscore --output="./engine/verification/fetcher/mock" --outpkg="mockfetcher"

cmd/observer/node_builder/observer_builder.go

+3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
restapiproxy "github.com/onflow/flow-go/engine/access/rest/apiproxy"
4646
commonrest "github.com/onflow/flow-go/engine/access/rest/common"
4747
"github.com/onflow/flow-go/engine/access/rest/router"
48+
"github.com/onflow/flow-go/engine/access/rest/websockets"
4849
"github.com/onflow/flow-go/engine/access/rpc"
4950
"github.com/onflow/flow-go/engine/access/rpc/backend"
5051
rpcConnection "github.com/onflow/flow-go/engine/access/rpc/connection"
@@ -168,6 +169,7 @@ type ObserverServiceConfig struct {
168169
registerCacheSize uint
169170
programCacheSize uint
170171
registerDBPruneThreshold uint64
172+
websocketConfig websockets.Config
171173
}
172174

173175
// DefaultObserverServiceConfig defines all the default values for the ObserverServiceConfig
@@ -252,6 +254,7 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
252254
registerCacheSize: 0,
253255
programCacheSize: 0,
254256
registerDBPruneThreshold: pruner.DefaultThreshold,
257+
websocketConfig: websockets.NewDefaultWebsocketConfig(),
255258
}
256259
}
257260

cmd/util/cmd/run-script/cmd.go

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/onflow/flow-go/cmd/util/ledger/util"
1717
"github.com/onflow/flow-go/cmd/util/ledger/util/registers"
1818
"github.com/onflow/flow-go/engine/access/rest"
19+
"github.com/onflow/flow-go/engine/access/rest/websockets"
1920
"github.com/onflow/flow-go/engine/access/state_stream/backend"
2021
"github.com/onflow/flow-go/engine/access/subscription"
2122
"github.com/onflow/flow-go/engine/execution/computation"
@@ -169,6 +170,7 @@ func run(*cobra.Command, []string) {
169170
metrics.NewNoopCollector(),
170171
nil,
171172
backend.Config{},
173+
websockets.NewDefaultWebsocketConfig(),
172174
)
173175
if err != nil {
174176
log.Fatal().Err(err).Msg("failed to create server")

engine/access/handle_irrecoverable_state_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
accessmock "github.com/onflow/flow-go/engine/access/mock"
2424
"github.com/onflow/flow-go/engine/access/rest"
2525
"github.com/onflow/flow-go/engine/access/rest/router"
26+
"github.com/onflow/flow-go/engine/access/rest/websockets"
2627
"github.com/onflow/flow-go/engine/access/rpc"
2728
"github.com/onflow/flow-go/engine/access/rpc/backend"
2829
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
@@ -109,6 +110,7 @@ func (suite *IrrecoverableStateTestSuite) SetupTest() {
109110
RestConfig: rest.Config{
110111
ListenAddress: unittest.DefaultAddress,
111112
},
113+
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
112114
}
113115

114116
// generate a server certificate that will be served by the GRPC server

engine/access/integration_unsecure_grpc_server_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/onflow/flow-go/engine"
2222
"github.com/onflow/flow-go/engine/access/index"
2323
accessmock "github.com/onflow/flow-go/engine/access/mock"
24+
"github.com/onflow/flow-go/engine/access/rest/websockets"
2425
"github.com/onflow/flow-go/engine/access/rpc"
2526
"github.com/onflow/flow-go/engine/access/rpc/backend"
2627
"github.com/onflow/flow-go/engine/access/state_stream"
@@ -138,6 +139,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
138139
UnsecureGRPCListenAddr: unittest.DefaultAddress,
139140
SecureGRPCListenAddr: unittest.DefaultAddress,
140141
HTTPListenAddr: unittest.DefaultAddress,
142+
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
141143
}
142144

143145
blockCount := 5

engine/access/rest/router/router.go

+23-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package router
22

33
import (
44
"fmt"
5+
"net/http"
56
"regexp"
67
"strings"
78

@@ -10,8 +11,9 @@ import (
1011

1112
"github.com/onflow/flow-go/access"
1213
"github.com/onflow/flow-go/engine/access/rest/common/middleware"
13-
"github.com/onflow/flow-go/engine/access/rest/http"
14+
flowhttp "github.com/onflow/flow-go/engine/access/rest/http"
1415
"github.com/onflow/flow-go/engine/access/rest/http/models"
16+
"github.com/onflow/flow-go/engine/access/rest/websockets"
1517
legacyws "github.com/onflow/flow-go/engine/access/rest/websockets/legacy"
1618
"github.com/onflow/flow-go/engine/access/state_stream"
1719
"github.com/onflow/flow-go/engine/access/state_stream/backend"
@@ -54,7 +56,7 @@ func (b *RouterBuilder) AddRestRoutes(
5456
) *RouterBuilder {
5557
linkGenerator := models.NewLinkGeneratorImpl(b.v1SubRouter)
5658
for _, r := range Routes {
57-
h := http.NewHandler(b.logger, backend, r.Handler, linkGenerator, chain, maxRequestSize)
59+
h := flowhttp.NewHandler(b.logger, backend, r.Handler, linkGenerator, chain, maxRequestSize)
5860
b.v1SubRouter.
5961
Methods(r.Method).
6062
Path(r.Pattern).
@@ -64,8 +66,8 @@ func (b *RouterBuilder) AddRestRoutes(
6466
return b
6567
}
6668

67-
// AddWsLegacyRoutes adds WebSocket routes to the router.
68-
func (b *RouterBuilder) AddWsLegacyRoutes(
69+
// AddLegacyWebsocketsRoutes adds WebSocket routes to the router.
70+
func (b *RouterBuilder) AddLegacyWebsocketsRoutes(
6971
stateStreamApi state_stream.API,
7072
chain flow.Chain,
7173
stateStreamConfig backend.Config,
@@ -84,6 +86,23 @@ func (b *RouterBuilder) AddWsLegacyRoutes(
8486
return b
8587
}
8688

89+
func (b *RouterBuilder) AddWebsocketsRoute(
90+
chain flow.Chain,
91+
config websockets.Config,
92+
streamApi state_stream.API,
93+
streamConfig backend.Config,
94+
maxRequestSize int64,
95+
) *RouterBuilder {
96+
handler := websockets.NewWebSocketHandler(b.logger, config, chain, streamApi, streamConfig, maxRequestSize)
97+
b.v1SubRouter.
98+
Methods(http.MethodGet).
99+
Path("/ws").
100+
Name("ws").
101+
Handler(handler)
102+
103+
return b
104+
}
105+
87106
func (b *RouterBuilder) Build() *mux.Router {
88107
return b.router
89108
}

engine/access/rest/router/router_test_helpers.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func ExecuteRequest(req *http.Request, backend access.API) *httptest.ResponseRec
135135
return rr
136136
}
137137

138-
func ExecuteWsRequest(req *http.Request, stateStreamApi state_stream.API, responseRecorder *TestHijackResponseRecorder, chain flow.Chain) {
138+
func ExecuteLegacyWsRequest(req *http.Request, stateStreamApi state_stream.API, responseRecorder *TestHijackResponseRecorder, chain flow.Chain) {
139139
restCollector := metrics.NewNoopCollector()
140140

141141
config := backend.Config{
@@ -147,7 +147,7 @@ func ExecuteWsRequest(req *http.Request, stateStreamApi state_stream.API, respon
147147
router := NewRouterBuilder(
148148
unittest.Logger(),
149149
restCollector,
150-
).AddWsLegacyRoutes(
150+
).AddLegacyWebsocketsRoutes(
151151
stateStreamApi,
152152
chain, config, common.DefaultMaxRequestSize,
153153
).Build()

engine/access/rest/server.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/onflow/flow-go/access"
1111
"github.com/onflow/flow-go/engine/access/rest/router"
12+
"github.com/onflow/flow-go/engine/access/rest/websockets"
1213
"github.com/onflow/flow-go/engine/access/state_stream"
1314
"github.com/onflow/flow-go/engine/access/state_stream/backend"
1415
"github.com/onflow/flow-go/model/flow"
@@ -42,12 +43,15 @@ func NewServer(serverAPI access.API,
4243
restCollector module.RestMetrics,
4344
stateStreamApi state_stream.API,
4445
stateStreamConfig backend.Config,
46+
wsConfig websockets.Config,
4547
) (*http.Server, error) {
4648
builder := router.NewRouterBuilder(logger, restCollector).AddRestRoutes(serverAPI, chain, config.MaxRequestSize)
4749
if stateStreamApi != nil {
48-
builder.AddWsLegacyRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize)
50+
builder.AddLegacyWebsocketsRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize)
4951
}
5052

53+
builder.AddWebsocketsRoute(chain, wsConfig, stateStreamApi, stateStreamConfig, config.MaxRequestSize)
54+
5155
c := cors.New(cors.Options{
5256
AllowedOrigins: []string{"*"},
5357
AllowedHeaders: []string{"*"},
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package websockets
2+
3+
import (
4+
"time"
5+
)
6+
7+
type Config struct {
8+
MaxSubscriptionsPerConnection uint64
9+
MaxResponsesPerSecond uint64
10+
SendMessageTimeout time.Duration
11+
MaxRequestSize int64
12+
}
13+
14+
func NewDefaultWebsocketConfig() Config {
15+
return Config{
16+
MaxSubscriptionsPerConnection: 1000,
17+
MaxResponsesPerSecond: 1000,
18+
SendMessageTimeout: 10 * time.Second,
19+
MaxRequestSize: 1024,
20+
}
21+
}

0 commit comments

Comments
 (0)