Skip to content

Commit

Permalink
Server API ergonomics
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Jan 9, 2024
1 parent a5c8aaf commit 83d7df7
Showing 1 changed file with 114 additions and 96 deletions.
210 changes: 114 additions & 96 deletions src/server/restate_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
* https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
*/

import { on } from "events";
import stream from "stream";
import { pipeline, finished } from "stream/promises";
import http2 from "http2";
import http2, { Http2ServerRequest, Http2ServerResponse } from "http2";
import { parse as urlparse, Url } from "url";
import {
ProtocolMode,
Expand All @@ -27,63 +26,9 @@ import { InvocationBuilder } from "../invocation";
import { StateMachine } from "../state_machine";
import { KeyedRouter, UnKeyedRouter } from "../types/router";

/**
* Creates a Restate entrypoint based on a HTTP2 server. The entrypoint will listen
* for requests to the services at a specified port.
*
* This is the entrypoint to be used in most scenarios (standalone, Docker, Kubernetes, ...);
* any deployments that forwards requests to a network endpoint. The prominent exception is
* AWS Lambda, which uses the {@link restate_lambda_handler#lambdaApiGatewayHandler}
* function to create an entry point.
*
* After creating this endpoint, register services on this entrypoint via {@link RestateServer.bindService }
* and start it via {@link RestateServer.listen }.
*
* @example
* A typical entry point would look like this:
* ```
* import * as restate from "@restatedev/restate-sdk";
*
* export const handler = restate
* .createServer()
* .bindService({
* service: "MyService",
* instance: new myService.MyServiceImpl(),
* descriptor: myService.protoMetadata,
* })
* .listen(8000);
* ```
*/
export function createServer(): RestateServer {
return new RestateServer();
}

/**
* Restate entrypoint implementation for services. This server receives and
* decodes the requests, streams events between the service and the Restate runtime,
* and drives the durable execution of the service invocations.
*/
export class RestateServer extends BaseRestateServer {
constructor() {
super(ProtocolMode.BIDI_STREAM);
}

public bindKeyedRouter<M>(
path: string,
router: KeyedRouter<M>
): RestateServer {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindRpcService(path, router, true);
return this;
}

public bindRouter<M>(path: string, router: UnKeyedRouter<M>): RestateServer {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindRpcService(path, router, false);
return this;
}
export interface RestateServer {
// RestateServer is a http2 server handler that you can pass to http2.createServer.
(request: Http2ServerRequest, response: Http2ServerResponse): void;

/**
* Adds a gRPC service to be served from this endpoint.
Expand Down Expand Up @@ -121,12 +66,11 @@ export class RestateServer extends BaseRestateServer {
* @param serviceOpts The options describing the service to be bound. See above for a detailed description.
* @returns An instance of this RestateServer
*/
public bindService(serviceOpts: ServiceOpts): RestateServer {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindService(serviceOpts);
return this;
}
bindService(serviceOpts: ServiceOpts): RestateServer;

bindKeyedRouter<M>(path: string, router: KeyedRouter<M>): RestateServer;

bindRouter<M>(path: string, router: UnKeyedRouter<M>): RestateServer;

/**
* Starts the Restate server and listens at the given port.
Expand All @@ -137,23 +81,116 @@ export class RestateServer extends BaseRestateServer {
*
* This method's result promise never completes.
*
* This method is a shorthand for:
*
* @example
* ```
* const httpServer = http2.createServer(restateServer);
* httpServer.listen(port);
* ```
*
* If you need to manually control the server lifecycle, we suggest to manually instantiate the http2 server and use this object as request handler.
*
* @param port The port to listen at. May be undefined (see above).
*/
public async listen(port?: number) {
// Infer the port if not specified, or default it
listen(port?: number): Promise<void>;
}

/**
* Creates a Restate entrypoint based on a HTTP2 server. The entrypoint will listen
* for requests to the services at a specified port.
*
* This is the entrypoint to be used in most scenarios (standalone, Docker, Kubernetes, ...);
* any deployments that forwards requests to a network endpoint. The prominent exception is
* AWS Lambda, which uses the {@link restate_lambda_handler#lambdaApiGatewayHandler}
* function to create an entry point.
*
* After creating this endpoint, register services on this entrypoint via {@link RestateServer.bindService }
* and start it via {@link RestateServer.listen }.
*
* @example
* A typical entry point would look like this:
* ```
* import * as restate from "@restatedev/restate-sdk";
*
* export const handler = restate
* .createServer()
* .bindService({
* service: "MyService",
* instance: new myService.MyServiceImpl(),
* descriptor: myService.protoMetadata,
* })
* .listen(8000);
* ```
*/
export function createServer(): RestateServer {
// See https://stackoverflow.com/questions/16508435/implementing-typescript-interface-with-bare-function-signature-plus-other-fields/16508581#16508581
// for more details on how we implement the RestateServer interface.

const restateServerImpl = new RestateServerImpl();
const instance: any = (request: Http2ServerRequest, response: Http2ServerResponse) => {
restateServerImpl.acceptConnection(request, response);
}
instance.bindKeyedRouter = <M>(path: string, router: UnKeyedRouter<M>) => {
restateServerImpl.bindKeyedRouter(path, router);
return instance;
};
instance.bindRouter = <M>(path: string, router: UnKeyedRouter<M>) => {
restateServerImpl.bindRouter(path, router);
return instance;
};
instance.bindService = (serviceOpts: ServiceOpts) => {
restateServerImpl.bindService(serviceOpts);
return instance;
};
instance.listen = (port?: number) => {
const actualPort = port ?? parseInt(process.env.PORT ?? "9080");
rlog.info(`Listening on ${actualPort}...`);

for await (const connection of incomingConnectionAtPort(actualPort)) {
this.handleConnection(connection.url, connection.stream).catch((e) => {
const error = ensureError(e);
rlog.error(
"Error while handling connection: " + (error.stack ?? error.message)
);
connection.stream.end();
connection.stream.destroy();
});
}
const server = http2.createServer(instance);
server.listen(actualPort);
return new Promise(() => {});
};

return <RestateServer>instance;
}

class RestateServerImpl extends BaseRestateServer {
constructor() {
super(ProtocolMode.BIDI_STREAM);
}

bindKeyedRouter<M>(path: string, router: KeyedRouter<M>) {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindRpcService(path, router, true);
}

bindRouter<M>(path: string, router: UnKeyedRouter<M>) {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindRpcService(path, router, false);
}

bindService(serviceOpts: ServiceOpts) {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindService(serviceOpts);
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
acceptConnection(request: Http2ServerRequest, _response: Http2ServerResponse) {
const stream = request.stream;
const url: Url = urlparse(request.url ?? "/");

this.handleConnection(url, stream).catch((e) => {
const error = ensureError(e);
rlog.error(
"Error while handling connection: " + (error.stack ?? error.message)
);
stream.end();
stream.destroy();
});
}

private async handleConnection(
Expand Down Expand Up @@ -190,25 +227,6 @@ export class RestateServer extends BaseRestateServer {
}
}

async function* incomingConnectionAtPort(port: number) {
const server = http2.createServer();

server.on("error", (err) =>
rlog.error("Error in Restate service endpoint http2 server: " + err)
);
server.listen(port);

let connectionId = 1n;

for await (const [s, h] of on(server, "stream")) {
const stream = s as http2.ServerHttp2Stream;
const headers = h as http2.IncomingHttpHeaders;
const url: Url = urlparse(headers[":path"] ?? "/");
connectionId++;
yield { connectionId, url, headers, stream };
}
}

async function respondDiscovery(
response: ServiceDiscoveryResponse,
http2Stream: http2.ServerHttp2Stream
Expand Down

0 comments on commit 83d7df7

Please sign in to comment.