diff --git a/src/server/base_restate_server.ts b/src/server/base_restate_server.ts index 3b68ce4e..e8632d30 100644 --- a/src/server/base_restate_server.ts +++ b/src/server/base_restate_server.ts @@ -132,7 +132,7 @@ export abstract class BaseRestateServer { }); } - protected bindService({ descriptor, service, instance }: ServiceOpts) { + bindService({ descriptor, service, instance }: ServiceOpts) { const spec = parseService(descriptor, service, instance); this.addDescriptor(descriptor); diff --git a/src/server/restate_server.ts b/src/server/restate_server.ts index 24a12597..2eb281f8 100644 --- a/src/server/restate_server.ts +++ b/src/server/restate_server.ts @@ -9,10 +9,9 @@ * https://github.com/restatedev/sdk-typescript/blob/main/LICENSE */ -import { on } from "events"; import stream from "stream"; import { pipeline, finished } from "stream/promises"; -import http2 from "http2"; +import http2, { Http2ServerRequest, Http2ServerResponse } from "http2"; import { parse as urlparse, Url } from "url"; import { ProtocolMode, @@ -27,63 +26,9 @@ import { InvocationBuilder } from "../invocation"; import { StateMachine } from "../state_machine"; import { KeyedRouter, UnKeyedRouter } from "../types/router"; -/** - * Creates a Restate entrypoint based on a HTTP2 server. The entrypoint will listen - * for requests to the services at a specified port. - * - * This is the entrypoint to be used in most scenarios (standalone, Docker, Kubernetes, ...); - * any deployments that forwards requests to a network endpoint. The prominent exception is - * AWS Lambda, which uses the {@link restate_lambda_handler#lambdaApiGatewayHandler} - * function to create an entry point. - * - * After creating this endpoint, register services on this entrypoint via {@link RestateServer.bindService } - * and start it via {@link RestateServer.listen }. - * - * @example - * A typical entry point would look like this: - * ``` - * import * as restate from "@restatedev/restate-sdk"; - * - * export const handler = restate - * .createServer() - * .bindService({ - * service: "MyService", - * instance: new myService.MyServiceImpl(), - * descriptor: myService.protoMetadata, - * }) - * .listen(8000); - * ``` - */ -export function createServer(): RestateServer { - return new RestateServer(); -} - -/** - * Restate entrypoint implementation for services. This server receives and - * decodes the requests, streams events between the service and the Restate runtime, - * and drives the durable execution of the service invocations. - */ -export class RestateServer extends BaseRestateServer { - constructor() { - super(ProtocolMode.BIDI_STREAM); - } - - public bindKeyedRouter( - path: string, - router: KeyedRouter - ): RestateServer { - // Implementation note: This override if here mainly to change the return type to the more - // concrete type RestateServer (from BaseRestateServer). - super.bindRpcService(path, router, true); - return this; - } - - public bindRouter(path: string, router: UnKeyedRouter): RestateServer { - // Implementation note: This override if here mainly to change the return type to the more - // concrete type RestateServer (from BaseRestateServer). - super.bindRpcService(path, router, false); - return this; - } +export interface RestateServer { + // RestateServer is a http2 server handler that you can pass to http2.createServer. + (request: Http2ServerRequest, response: Http2ServerResponse): void; /** * Adds a gRPC service to be served from this endpoint. @@ -121,12 +66,11 @@ export class RestateServer extends BaseRestateServer { * @param serviceOpts The options describing the service to be bound. See above for a detailed description. * @returns An instance of this RestateServer */ - public bindService(serviceOpts: ServiceOpts): RestateServer { - // Implementation note: This override if here mainly to change the return type to the more - // concrete type RestateServer (from BaseRestateServer). - super.bindService(serviceOpts); - return this; - } + bindService(serviceOpts: ServiceOpts): RestateServer; + + bindKeyedRouter(path: string, router: KeyedRouter): RestateServer; + + bindRouter(path: string, router: UnKeyedRouter): RestateServer; /** * Starts the Restate server and listens at the given port. @@ -137,23 +81,124 @@ export class RestateServer extends BaseRestateServer { * * This method's result promise never completes. * + * This method is a shorthand for: + * + * @example + * ``` + * const httpServer = http2.createServer(restateServer); + * httpServer.listen(port); + * ``` + * + * If you need to manually control the server lifecycle, we suggest to manually instantiate the http2 server and use this object as request handler. + * * @param port The port to listen at. May be undefined (see above). */ - public async listen(port?: number) { - // Infer the port if not specified, or default it + listen(port?: number): Promise; +} + +/** + * Creates a Restate entrypoint based on a HTTP2 server. The entrypoint will listen + * for requests to the services at a specified port. + * + * This is the entrypoint to be used in most scenarios (standalone, Docker, Kubernetes, ...); + * any deployments that forwards requests to a network endpoint. The prominent exception is + * AWS Lambda, which uses the {@link restate_lambda_handler#lambdaApiGatewayHandler} + * function to create an entry point. + * + * After creating this endpoint, register services on this entrypoint via {@link RestateServer.bindService } + * and start it via {@link RestateServer.listen }. + * + * @example + * A typical entry point would look like this: + * ``` + * import * as restate from "@restatedev/restate-sdk"; + * + * export const handler = restate + * .createServer() + * .bindService({ + * service: "MyService", + * instance: new myService.MyServiceImpl(), + * descriptor: myService.protoMetadata, + * }) + * .listen(8000); + * ``` + */ +export function createServer(): RestateServer { + // See https://stackoverflow.com/questions/16508435/implementing-typescript-interface-with-bare-function-signature-plus-other-fields/16508581#16508581 + // for more details on how we implement the RestateServer interface. + + const restateServerImpl = new RestateServerImpl(); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const instance: any = ( + request: Http2ServerRequest, + response: Http2ServerResponse + ) => { + restateServerImpl.acceptConnection(request, response); + }; + instance.bindKeyedRouter = (path: string, router: UnKeyedRouter) => { + restateServerImpl.bindKeyedRouter(path, router); + return instance; + }; + instance.bindRouter = (path: string, router: UnKeyedRouter) => { + restateServerImpl.bindRouter(path, router); + return instance; + }; + instance.bindService = (serviceOpts: ServiceOpts) => { + restateServerImpl.bindService(serviceOpts); + return instance; + }; + instance.listen = (port?: number) => { const actualPort = port ?? parseInt(process.env.PORT ?? "9080"); rlog.info(`Listening on ${actualPort}...`); - for await (const connection of incomingConnectionAtPort(actualPort)) { - this.handleConnection(connection.url, connection.stream).catch((e) => { - const error = ensureError(e); - rlog.error( - "Error while handling connection: " + (error.stack ?? error.message) - ); - connection.stream.end(); - connection.stream.destroy(); - }); - } + const server = http2.createServer(instance); + server.listen(actualPort); + // eslint-disable-next-line @typescript-eslint/no-empty-function + return new Promise(() => {}); + }; + + return instance; +} + +class RestateServerImpl extends BaseRestateServer { + constructor() { + super(ProtocolMode.BIDI_STREAM); + } + + bindKeyedRouter(path: string, router: KeyedRouter) { + // Implementation note: This override if here mainly to change the return type to the more + // concrete type RestateServer (from BaseRestateServer). + super.bindRpcService(path, router, true); + } + + bindRouter(path: string, router: UnKeyedRouter) { + // Implementation note: This override if here mainly to change the return type to the more + // concrete type RestateServer (from BaseRestateServer). + super.bindRpcService(path, router, false); + } + + bindService(serviceOpts: ServiceOpts) { + // Implementation note: This override if here mainly to change the return type to the more + // concrete type RestateServer (from BaseRestateServer). + super.bindService(serviceOpts); + } + + acceptConnection( + request: Http2ServerRequest, + // eslint-disable-next-line @typescript-eslint/no-unused-vars + _response: Http2ServerResponse + ) { + const stream = request.stream; + const url: Url = urlparse(request.url ?? "/"); + + this.handleConnection(url, stream).catch((e) => { + const error = ensureError(e); + rlog.error( + "Error while handling connection: " + (error.stack ?? error.message) + ); + stream.end(); + stream.destroy(); + }); } private async handleConnection( @@ -190,25 +235,6 @@ export class RestateServer extends BaseRestateServer { } } -async function* incomingConnectionAtPort(port: number) { - const server = http2.createServer(); - - server.on("error", (err) => - rlog.error("Error in Restate service endpoint http2 server: " + err) - ); - server.listen(port); - - let connectionId = 1n; - - for await (const [s, h] of on(server, "stream")) { - const stream = s as http2.ServerHttp2Stream; - const headers = h as http2.IncomingHttpHeaders; - const url: Url = urlparse(headers[":path"] ?? "/"); - connectionId++; - yield { connectionId, url, headers, stream }; - } -} - async function respondDiscovery( response: ServiceDiscoveryResponse, http2Stream: http2.ServerHttp2Stream diff --git a/test/testdriver.ts b/test/testdriver.ts index 5c66013a..31d76ddd 100644 --- a/test/testdriver.ts +++ b/test/testdriver.ts @@ -15,7 +15,6 @@ import { START_MESSAGE_TYPE, StartMessage, } from "../src/types/protocol"; -import * as restate from "../src/public_api"; import { Connection } from "../src/connection/connection"; import { printMessageAsJson } from "../src/utils/utils"; import { Message } from "../src/types/types"; @@ -25,6 +24,7 @@ import { rlog } from "../src/utils/logger"; import { StateMachine } from "../src/state_machine"; import { InvocationBuilder } from "../src/invocation"; import { protoMetadata } from "../src/generated/proto/test"; +import { BaseRestateServer } from "../src/server/base_restate_server"; export class TestDriver implements Connection { private readonly result: Message[] = []; @@ -182,7 +182,11 @@ export class TestDriver implements Connection { * make it simpler for users to understand what methods are relevant for them, * and which ones are not. */ -class TestRestateServer extends restate.RestateServer { +class TestRestateServer extends BaseRestateServer { + constructor() { + super(ProtocolMode.BIDI_STREAM); + } + public methodByUrl( url: string | null | undefined ): HostedGrpcServiceMethod | undefined {