Skip to content

Commit 3ab7119

Browse files
authored
feat: Introduce MultiIsolateRelicServer (#216)
- Implements multi-isolate support for `RelicServer` to enable concurrent request handling across multiple CPU cores. - Refactors `RelicServer` into a sealed interface with the existing implementation moved to `_RelicServer`. - Adds `IsolatedObject<T>`, a generic wrapper that spawns an object in a separate isolate and communicates via message passing. - Introduces `_IsolatedRelicServer` and `_MultiIsolateRelicServer` implementations of `RelicServer`. - Adds `noOfIsolates` optional named parameter to `RelicServer` constructor (which is now a factory, delegating to either _RelicServer or _MultiIsolateRelicServer), and `RelicApp.run`.
1 parent 75c4d55 commit 3ab7119

17 files changed

+698
-79
lines changed

doc/history.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ typedef Handler = FutureOr<HandledContext> Function(NewContext ctx);
3232
```
3333
This seems more complex, why the added complexity? The problem is that not everything follows the request response model. Today we have web-sockets, server-side events, etc. that hijacks the underlying socket of a request, and hands lifetime handling to the developer.
3434

35-
Shelf does support hijacking, but it does so by using exceptions for control flow. It will throw an exception `HijackException` that is only supposed to be caught and handled by a Shelf adaptor.
35+
Shelf does support hijacking, but it does so by using exceptions for control flow. It will throw an exception `HijackException` that is only supposed to be caught and handled by a Shelf adapter.
3636

3737
As we consider using exceptions for control flow an anti-pattern, we have opted for introducing a `RequestContext` and an associated state machine instead.
3838

example/basic/as_handle.dart

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ Future<void> main() async {
88
final router = RelicRouter()
99
..use('/', logRequests()) // log all request from / and down
1010
..any(
11-
'/**',
12-
respondWith(
13-
(final request) => Response.ok(
14-
body: Body.fromString('Hello, Relic!'),
15-
),
16-
));
11+
'/**',
12+
respondWith(
13+
(final request) => Response.ok(body: Body.fromString('Hello, Relic!')),
14+
),
15+
);
1716

1817
// Start a server that forward request to the handler
19-
final adapter = await IOAdapter.bind(InternetAddress.anyIPv4, port: 8080);
20-
final server = RelicServer(adapter);
18+
final server = RelicServer(
19+
() => IOAdapter.bind(InternetAddress.anyIPv4, port: 8080),
20+
);
2121
await server.mountAndStart(router.asHandler); // use asHandler to
2222
}

example/routing/pipeline.dart

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,16 @@ Future<void> main() async {
99
final handler = const Pipeline()
1010
// Pipelines allows middleware to run before routing
1111
.addMiddleware(logRequests())
12-
.addHandler(respondWith(
13-
(final request) => Response.ok(
14-
body: Body.fromString('Hello, Relic!'),
12+
.addHandler(
13+
respondWith(
14+
(final request) =>
15+
Response.ok(body: Body.fromString('Hello, Relic!')),
1516
),
16-
)); // handles any verb, and any path
17+
); // handles any verb, and any path
1718

1819
// Start a server that forward request to the handler
19-
final adapter = await IOAdapter.bind(InternetAddress.anyIPv4, port: 8080);
20-
final server = RelicServer(adapter);
20+
final server = RelicServer(
21+
() => IOAdapter.bind(InternetAddress.anyIPv4, port: 8080),
22+
);
2123
await server.mountAndStart(handler);
2224
}

lib/src/adapter/adapter.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ abstract class AdapterRequest {
3838
/// [AdapterRequest] format, and then handing them off to the Relic core.
3939
/// They also handle sending back responses or managing hijacked connections.
4040
abstract class Adapter {
41+
/// The port this adapter is bound to.
42+
int get port;
43+
4144
/// A stream of incoming requests from the underlying source.
4245
///
4346
/// Each event in the stream is an [AdapterRequest] representing a new

lib/src/adapter/io/io_adapter.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class IOAdapter extends Adapter {
6060
/// The [io.InternetAddress] the underlying server is listening on.
6161
io.InternetAddress get address => _server.address;
6262

63-
/// The port number the underlying server is listening on.
63+
@override
6464
int get port => _server.port;
6565

6666
@override

lib/src/isolated_object.dart

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import 'dart:async';
2+
import 'dart:isolate';
3+
4+
import 'util/util.dart';
5+
6+
typedef Factory<T> = FutureOr<T> Function();
7+
8+
typedef _Action<T> = ({int id, dynamic Function(T) function});
9+
typedef _Response = ({int id, dynamic result});
10+
typedef _Inflight = Map<int, Completer>;
11+
typedef _Setup = (SendPort, ReceivePort, _Inflight);
12+
13+
class IsolatedObject<T> {
14+
final Future<_Setup> _connected;
15+
16+
IsolatedObject(final Factory<T> create) : _connected = _connect(create);
17+
18+
static Future<_Setup> _connect<T>(
19+
final Factory<T> create,
20+
) async {
21+
final parentPort = RawReceivePort();
22+
final setupDone = Completer<_Setup>();
23+
24+
parentPort.handler = (final dynamic message) async {
25+
if (message case final RemoteError e) {
26+
setupDone.completeError(e, e.stackTrace);
27+
} else {
28+
final toChild = message as SendPort;
29+
final fromChild = ReceivePort.fromRawReceivePort(parentPort);
30+
final inflight = _Inflight();
31+
setupDone.complete((toChild, fromChild, inflight));
32+
}
33+
};
34+
35+
try {
36+
await _spawn(create, parentPort.sendPort);
37+
} catch (_) {
38+
parentPort.close();
39+
rethrow;
40+
}
41+
42+
final result = await setupDone.future;
43+
final (toChild, fromChild, inflight) = result;
44+
45+
fromChild.asyncListen((final message) async {
46+
if (message case final _Response response) {
47+
final completer = inflight.remove(response.id);
48+
assert(completer != null, 'PROTOCOL BUG. No such ID ${response.id}');
49+
if (completer == null) return; // coverage: ignore-line
50+
switch (response.result) {
51+
case final RemoteError e:
52+
completer.completeError(e, e.stackTrace);
53+
default:
54+
completer.complete(await response.result);
55+
}
56+
}
57+
}, onDone: () {
58+
// ReceivePort closed. Fail any pending requests to avoid hangs.
59+
for (final c in inflight.values) {
60+
if (!c.isCompleted) {
61+
c.completeError(StateError('IsolatedObject<$T> channel closed'));
62+
}
63+
}
64+
inflight.clear();
65+
});
66+
67+
return result;
68+
}
69+
70+
static Future<Isolate> _spawn<T>(
71+
final Factory<T> create,
72+
final SendPort toParent,
73+
) {
74+
return Isolate.spawn(
75+
(final toParent) async {
76+
final childPort = ReceivePort();
77+
final T isolatedObject;
78+
try {
79+
isolatedObject = await create();
80+
toParent.send(childPort.sendPort); // complete handshake
81+
} catch (e, st) {
82+
toParent.send(RemoteError('$e', '$st'));
83+
return;
84+
}
85+
86+
// process inbound actions
87+
await for (final message in childPort) {
88+
if (message == null) {
89+
// shutdown signal received
90+
childPort.close();
91+
break;
92+
} else if (message case final _Action<T> action) {
93+
try {
94+
final result = await action.function(isolatedObject);
95+
toParent.send((id: action.id, result: result)); // return result
96+
} catch (e, st) {
97+
toParent.send((id: action.id, result: RemoteError('$e', '$st')));
98+
}
99+
}
100+
}
101+
},
102+
toParent,
103+
);
104+
}
105+
106+
int _nextId = 0;
107+
Future<U> evaluate<U>(final FutureOr<U> Function(T) function) async {
108+
final (toChild, _, inflight) = await _connected;
109+
110+
final id = _nextId++;
111+
final completer = Completer<U>();
112+
inflight[id] = completer;
113+
114+
toChild.send((id: id, function: function));
115+
return await completer.future;
116+
}
117+
118+
Future<void> close() async {
119+
final (toChild, fromChild, _) = await _connected;
120+
toChild.send(null); // shutdown signal
121+
fromChild.close();
122+
}
123+
}

lib/src/relic_server.dart

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,45 +6,80 @@ import 'body/body.dart';
66
import 'handler/handler.dart';
77
import 'headers/exception/header_exception.dart';
88
import 'headers/standard_headers_extensions.dart';
9+
import 'isolated_object.dart';
910
import 'logger/logger.dart';
1011
import 'message/request.dart';
1112
import 'message/response.dart';
1213
import 'util/util.dart';
1314

14-
/// A server that uses a [Adapter] to handle HTTP requests.
15-
class RelicServer {
16-
/// The underlying adapter.
17-
final Adapter adapter;
15+
sealed class RelicServer {
16+
/// Mounts a [handler] to the server and starts listening for requests.
17+
///
18+
/// Only one [handler] can be mounted at a time, but it will be replaced
19+
/// on each call.
20+
Future<void> mountAndStart(final Handler handler);
1821

19-
/// Whether [mountAndStart] has been called.
20-
Handler? _handler;
22+
/// Close the server
23+
Future<void> close();
2124

25+
/// The port this server is bound to.
26+
///
27+
/// This will throw a [LateInitializationError], if called before [mountAndStart].
28+
int get port;
29+
30+
factory RelicServer(
31+
final Factory<Adapter> adapterFactory, {
32+
final int noOfIsolates = 1,
33+
}) {
34+
return switch (noOfIsolates) {
35+
< 1 => throw RangeError.value(
36+
noOfIsolates,
37+
'noOfIsolates',
38+
'Must be larger than 0',
39+
),
40+
== 1 => _RelicServer(adapterFactory),
41+
_ => _MultiIsolateRelicServer(adapterFactory, noOfIsolates),
42+
};
43+
}
44+
}
45+
46+
/// A server that uses a [Adapter] to handle HTTP requests.
47+
final class _RelicServer implements RelicServer {
48+
final FutureOr<Adapter> _adapter;
49+
Handler? _handler;
2250
StreamSubscription<AdapterRequest>? _subscription;
2351

2452
/// Creates a server with the given parameters.
25-
RelicServer(this.adapter);
53+
_RelicServer(final Factory<Adapter> adapterFactory)
54+
: _adapter = adapterFactory();
2655

2756
/// Mounts a handler to the server and starts listening for requests.
2857
///
2958
/// Only one handler can be mounted at a time.
59+
@override
3060
Future<void> mountAndStart(final Handler handler) async {
61+
port = (await _adapter).port;
3162
_handler = _wrapHandlerWithMiddleware(handler);
3263
if (_subscription == null) await _startListening();
3364
}
3465

35-
/// Close the server
66+
@override
3667
Future<void> close() async {
3768
await _stopListening();
38-
await adapter.close();
69+
await (await _adapter).close();
3970
}
4071

72+
@override
73+
late final int port;
74+
4175
Future<void> _stopListening() async {
4276
await _subscription?.cancel();
4377
_handler = null;
4478
}
4579

4680
/// Starts listening for requests.
4781
Future<void> _startListening() async {
82+
final adapter = await _adapter;
4883
catchTopLevelErrors(() {
4984
_subscription = adapter.requests.listen(_handleRequest);
5085
}, (final error, final stackTrace) {
@@ -60,6 +95,8 @@ class RelicServer {
6095
final handler = _handler;
6196
if (handler == null) return; // if close has been called
6297

98+
final adapter = await _adapter;
99+
63100
// Wrap the handler with our middleware
64101
late Request request;
65102
try {
@@ -143,3 +180,47 @@ void _logError(
143180
type: LoggerType.error,
144181
);
145182
}
183+
184+
final class _IsolatedRelicServer extends IsolatedObject<RelicServer>
185+
implements RelicServer {
186+
_IsolatedRelicServer(final Factory<Adapter> adapterFactory)
187+
: super(() => RelicServer(adapterFactory));
188+
189+
@override
190+
Future<void> close() async {
191+
await evaluate((final r) => r.close());
192+
await super.close();
193+
}
194+
195+
@override
196+
Future<void> mountAndStart(final Handler handler) async {
197+
await evaluate((final r) => r.mountAndStart(handler));
198+
port = await evaluate((final r) => r.port);
199+
}
200+
201+
@override
202+
late final int port;
203+
}
204+
205+
final class _MultiIsolateRelicServer implements RelicServer {
206+
final List<RelicServer> _children;
207+
208+
_MultiIsolateRelicServer(
209+
final Factory<Adapter> adapterFactory,
210+
final int noOfIsolates,
211+
) : _children = List.generate(
212+
noOfIsolates, (final _) => _IsolatedRelicServer(adapterFactory));
213+
214+
@override
215+
Future<void> close() async {
216+
await _children.map((final c) => c.close()).wait;
217+
}
218+
219+
@override
220+
Future<void> mountAndStart(final Handler handler) async {
221+
await _children.map((final c) => c.mountAndStart(handler)).wait;
222+
}
223+
224+
@override
225+
int get port => _children.first.port;
226+
}

lib/src/router/relic_app.dart

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ final class RelicApp implements RelicRouter {
1717
/// The adapter factory pattern allows for deferred initialization of server
1818
/// resources.
1919
///
20+
/// If [noOfIsolates] equals 1, the server is started on the current isolate.
21+
/// Otherwise [noOfIsolates] isolates are spun up. When using multiple isolates make
22+
/// sure that any handler and middleware configured are sendable.
23+
/// (see https://api.dart.dev/stable/dart-isolate/SendPort/send.html).
24+
///
2025
/// Returns a [Future] that completes with the running [RelicServer] instance.
2126
///
2227
/// [RelicApp] instances supports hot-reload. They will re-configure the internal
@@ -28,7 +33,7 @@ final class RelicApp implements RelicRouter {
2833
/// final app = RelicApp()
2934
/// ..get('/', (ctx) => ctx.ok('Hello!'));
3035
///
31-
/// final adaptorFactory = () => IOAdapter.bind(InternetAddress.loopbackIPv4, port: 8080);
36+
/// final adapterFactory = () => IOAdapter.bind(InternetAddress.loopbackIPv4, port: 8080);
3237
/// final server = await app.run(adapterFactory);
3338
///
3439
/// // later .. when done
@@ -39,10 +44,11 @@ final class RelicApp implements RelicRouter {
3944
/// Check out [RelicAppIOServeEx.serve] if you are using `dart:io` to avoid
4045
/// specifying [adapterFactory] explicitly.
4146
Future<RelicServer> run(
42-
final FutureOr<Adapter> Function() adapterFactory,
43-
) async {
47+
final FutureOr<Adapter> Function() adapterFactory, {
48+
final int noOfIsolates = 1,
49+
}) async {
4450
if (_server != null) throw StateError('Cannot call run twice');
45-
_server = RelicServer(await adapterFactory());
51+
_server = RelicServer(adapterFactory, noOfIsolates: noOfIsolates);
4652
await _init();
4753
_reloadSubscription = await _hotReloader.register(this);
4854
return _server!;
@@ -136,9 +142,7 @@ class _HotReloader {
136142
Future<StreamSubscription?> register(final RelicApp app) async {
137143
final reloadStream = await _reloadStream;
138144
if (reloadStream != null) {
139-
return reloadStream
140-
.asyncMap((final _) => app._reload())
141-
.listen((final _) {});
145+
return reloadStream.asyncListen((final _) => app._reload());
142146
}
143147
return null;
144148
}

0 commit comments

Comments
 (0)