Skip to content

Commit

Permalink
Server API ergonomics (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Jan 11, 2024
1 parent 799761a commit 31e2c9e
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 99 deletions.
2 changes: 1 addition & 1 deletion src/server/base_restate_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export abstract class BaseRestateServer {
});
}

protected bindService({ descriptor, service, instance }: ServiceOpts) {
bindService({ descriptor, service, instance }: ServiceOpts) {
const spec = parseService(descriptor, service, instance);
this.addDescriptor(descriptor);

Expand Down
218 changes: 122 additions & 96 deletions src/server/restate_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
* https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
*/

import { on } from "events";
import stream from "stream";
import { pipeline, finished } from "stream/promises";
import http2 from "http2";
import http2, { Http2ServerRequest, Http2ServerResponse } from "http2";
import { parse as urlparse, Url } from "url";
import {
ProtocolMode,
Expand All @@ -27,63 +26,9 @@ import { InvocationBuilder } from "../invocation";
import { StateMachine } from "../state_machine";
import { KeyedRouter, UnKeyedRouter } from "../types/router";

/**
* Creates a Restate entrypoint based on a HTTP2 server. The entrypoint will listen
* for requests to the services at a specified port.
*
* This is the entrypoint to be used in most scenarios (standalone, Docker, Kubernetes, ...);
* any deployments that forwards requests to a network endpoint. The prominent exception is
* AWS Lambda, which uses the {@link restate_lambda_handler#lambdaApiGatewayHandler}
* function to create an entry point.
*
* After creating this endpoint, register services on this entrypoint via {@link RestateServer.bindService }
* and start it via {@link RestateServer.listen }.
*
* @example
* A typical entry point would look like this:
* ```
* import * as restate from "@restatedev/restate-sdk";
*
* export const handler = restate
* .createServer()
* .bindService({
* service: "MyService",
* instance: new myService.MyServiceImpl(),
* descriptor: myService.protoMetadata,
* })
* .listen(8000);
* ```
*/
export function createServer(): RestateServer {
return new RestateServer();
}

/**
* Restate entrypoint implementation for services. This server receives and
* decodes the requests, streams events between the service and the Restate runtime,
* and drives the durable execution of the service invocations.
*/
export class RestateServer extends BaseRestateServer {
constructor() {
super(ProtocolMode.BIDI_STREAM);
}

public bindKeyedRouter<M>(
path: string,
router: KeyedRouter<M>
): RestateServer {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindRpcService(path, router, true);
return this;
}

public bindRouter<M>(path: string, router: UnKeyedRouter<M>): RestateServer {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindRpcService(path, router, false);
return this;
}
export interface RestateServer {
// RestateServer is a http2 server handler that you can pass to http2.createServer.
(request: Http2ServerRequest, response: Http2ServerResponse): void;

/**
* Adds a gRPC service to be served from this endpoint.
Expand Down Expand Up @@ -121,12 +66,11 @@ export class RestateServer extends BaseRestateServer {
* @param serviceOpts The options describing the service to be bound. See above for a detailed description.
* @returns An instance of this RestateServer
*/
public bindService(serviceOpts: ServiceOpts): RestateServer {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindService(serviceOpts);
return this;
}
bindService(serviceOpts: ServiceOpts): RestateServer;

bindKeyedRouter<M>(path: string, router: KeyedRouter<M>): RestateServer;

bindRouter<M>(path: string, router: UnKeyedRouter<M>): RestateServer;

/**
* Starts the Restate server and listens at the given port.
Expand All @@ -137,23 +81,124 @@ export class RestateServer extends BaseRestateServer {
*
* This method's result promise never completes.
*
* This method is a shorthand for:
*
* @example
* ```
* const httpServer = http2.createServer(restateServer);
* httpServer.listen(port);
* ```
*
* If you need to manually control the server lifecycle, we suggest to manually instantiate the http2 server and use this object as request handler.
*
* @param port The port to listen at. May be undefined (see above).
*/
public async listen(port?: number) {
// Infer the port if not specified, or default it
listen(port?: number): Promise<void>;
}

/**
* Creates a Restate entrypoint based on a HTTP2 server. The entrypoint will listen
* for requests to the services at a specified port.
*
* This is the entrypoint to be used in most scenarios (standalone, Docker, Kubernetes, ...);
* any deployments that forwards requests to a network endpoint. The prominent exception is
* AWS Lambda, which uses the {@link restate_lambda_handler#lambdaApiGatewayHandler}
* function to create an entry point.
*
* After creating this endpoint, register services on this entrypoint via {@link RestateServer.bindService }
* and start it via {@link RestateServer.listen }.
*
* @example
* A typical entry point would look like this:
* ```
* import * as restate from "@restatedev/restate-sdk";
*
* export const handler = restate
* .createServer()
* .bindService({
* service: "MyService",
* instance: new myService.MyServiceImpl(),
* descriptor: myService.protoMetadata,
* })
* .listen(8000);
* ```
*/
export function createServer(): RestateServer {
// See https://stackoverflow.com/questions/16508435/implementing-typescript-interface-with-bare-function-signature-plus-other-fields/16508581#16508581
// for more details on how we implement the RestateServer interface.

const restateServerImpl = new RestateServerImpl();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const instance: any = (
request: Http2ServerRequest,
response: Http2ServerResponse
) => {
restateServerImpl.acceptConnection(request, response);
};
instance.bindKeyedRouter = <M>(path: string, router: UnKeyedRouter<M>) => {
restateServerImpl.bindKeyedRouter(path, router);
return instance;
};
instance.bindRouter = <M>(path: string, router: UnKeyedRouter<M>) => {
restateServerImpl.bindRouter(path, router);
return instance;
};
instance.bindService = (serviceOpts: ServiceOpts) => {
restateServerImpl.bindService(serviceOpts);
return instance;
};
instance.listen = (port?: number) => {
const actualPort = port ?? parseInt(process.env.PORT ?? "9080");
rlog.info(`Listening on ${actualPort}...`);

for await (const connection of incomingConnectionAtPort(actualPort)) {
this.handleConnection(connection.url, connection.stream).catch((e) => {
const error = ensureError(e);
rlog.error(
"Error while handling connection: " + (error.stack ?? error.message)
);
connection.stream.end();
connection.stream.destroy();
});
}
const server = http2.createServer(instance);
server.listen(actualPort);
// eslint-disable-next-line @typescript-eslint/no-empty-function
return new Promise(() => {});
};

return <RestateServer>instance;
}

class RestateServerImpl extends BaseRestateServer {
constructor() {
super(ProtocolMode.BIDI_STREAM);
}

bindKeyedRouter<M>(path: string, router: KeyedRouter<M>) {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindRpcService(path, router, true);
}

bindRouter<M>(path: string, router: UnKeyedRouter<M>) {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindRpcService(path, router, false);
}

bindService(serviceOpts: ServiceOpts) {
// Implementation note: This override if here mainly to change the return type to the more
// concrete type RestateServer (from BaseRestateServer).
super.bindService(serviceOpts);
}

acceptConnection(
request: Http2ServerRequest,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_response: Http2ServerResponse
) {
const stream = request.stream;
const url: Url = urlparse(request.url ?? "/");

this.handleConnection(url, stream).catch((e) => {
const error = ensureError(e);
rlog.error(
"Error while handling connection: " + (error.stack ?? error.message)
);
stream.end();
stream.destroy();
});
}

private async handleConnection(
Expand Down Expand Up @@ -190,25 +235,6 @@ export class RestateServer extends BaseRestateServer {
}
}

async function* incomingConnectionAtPort(port: number) {
const server = http2.createServer();

server.on("error", (err) =>
rlog.error("Error in Restate service endpoint http2 server: " + err)
);
server.listen(port);

let connectionId = 1n;

for await (const [s, h] of on(server, "stream")) {
const stream = s as http2.ServerHttp2Stream;
const headers = h as http2.IncomingHttpHeaders;
const url: Url = urlparse(headers[":path"] ?? "/");
connectionId++;
yield { connectionId, url, headers, stream };
}
}

async function respondDiscovery(
response: ServiceDiscoveryResponse,
http2Stream: http2.ServerHttp2Stream
Expand Down
8 changes: 6 additions & 2 deletions test/testdriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
START_MESSAGE_TYPE,
StartMessage,
} from "../src/types/protocol";
import * as restate from "../src/public_api";
import { Connection } from "../src/connection/connection";
import { printMessageAsJson } from "../src/utils/utils";
import { Message } from "../src/types/types";
Expand All @@ -25,6 +24,7 @@ import { rlog } from "../src/utils/logger";
import { StateMachine } from "../src/state_machine";
import { InvocationBuilder } from "../src/invocation";
import { protoMetadata } from "../src/generated/proto/test";
import { BaseRestateServer } from "../src/server/base_restate_server";

export class TestDriver<I, O> implements Connection {
private readonly result: Message[] = [];
Expand Down Expand Up @@ -182,7 +182,11 @@ export class TestDriver<I, O> implements Connection {
* make it simpler for users to understand what methods are relevant for them,
* and which ones are not.
*/
class TestRestateServer extends restate.RestateServer {
class TestRestateServer extends BaseRestateServer {
constructor() {
super(ProtocolMode.BIDI_STREAM);
}

public methodByUrl<I, O>(
url: string | null | undefined
): HostedGrpcServiceMethod<I, O> | undefined {
Expand Down

0 comments on commit 31e2c9e

Please sign in to comment.