From e8f0971aa9945cfc02bfa852eb7ea43f1dc8e53a Mon Sep 17 00:00:00 2001 From: Christian Mladenov Date: Thu, 2 Feb 2023 09:45:45 -0800 Subject: [PATCH] Add a health check endpoint Add the GRPC health check service from https://github.com/grpc/grpc/blob/master/doc/health-checking.md --- src/isolate/server/health/__init__.py | 8 ++ src/isolate/server/health/health.proto | 23 ++++ src/isolate/server/health/health_pb2.py | 32 +++++ src/isolate/server/health/health_pb2.pyi | 75 +++++++++++ src/isolate/server/health/health_pb2_grpc.py | 124 +++++++++++++++++++ src/isolate/server/health_server.py | 40 ++++++ src/isolate/server/server.py | 4 +- tests/test_server.py | 47 +++++-- 8 files changed, 344 insertions(+), 9 deletions(-) create mode 100644 src/isolate/server/health/__init__.py create mode 100644 src/isolate/server/health/health.proto create mode 100644 src/isolate/server/health/health_pb2.py create mode 100644 src/isolate/server/health/health_pb2.pyi create mode 100644 src/isolate/server/health/health_pb2_grpc.py create mode 100644 src/isolate/server/health_server.py diff --git a/src/isolate/server/health/__init__.py b/src/isolate/server/health/__init__.py new file mode 100644 index 0000000..1f66f2e --- /dev/null +++ b/src/isolate/server/health/__init__.py @@ -0,0 +1,8 @@ +from isolate.server.health.health_pb2 import ( + HealthCheckRequest, + HealthCheckResponse, +) +from isolate.server.health.health_pb2_grpc import HealthServicer, HealthStub +from isolate.server.health.health_pb2_grpc import ( + add_HealthServicer_to_server as register_health, +) diff --git a/src/isolate/server/health/health.proto b/src/isolate/server/health/health.proto new file mode 100644 index 0000000..7be24c7 --- /dev/null +++ b/src/isolate/server/health/health.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package grpc.health.v1; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + SERVICE_UNKNOWN = 3; // Used only by the Watch method. + } + ServingStatus status = 1; +} + +service Health { + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); + + rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); +} diff --git a/src/isolate/server/health/health_pb2.py b/src/isolate/server/health/health_pb2.py new file mode 100644 index 0000000..e5ed4e6 --- /dev/null +++ b/src/isolate/server/health/health_pb2.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: health.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x0chealth.proto\x12\x0egrpc.health.v1"%\n\x12HealthCheckRequest\x12\x0f\n\x07service\x18\x01 \x01(\t"\xa9\x01\n\x13HealthCheckResponse\x12\x41\n\x06status\x18\x01 \x01(\x0e\x32\x31.grpc.health.v1.HealthCheckResponse.ServingStatus"O\n\rServingStatus\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SERVING\x10\x01\x12\x0f\n\x0bNOT_SERVING\x10\x02\x12\x13\n\x0fSERVICE_UNKNOWN\x10\x03\x32\xae\x01\n\x06Health\x12P\n\x05\x43heck\x12".grpc.health.v1.HealthCheckRequest\x1a#.grpc.health.v1.HealthCheckResponse\x12R\n\x05Watch\x12".grpc.health.v1.HealthCheckRequest\x1a#.grpc.health.v1.HealthCheckResponse0\x01\x62\x06proto3' +) + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "health_pb2", globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _HEALTHCHECKREQUEST._serialized_start = 32 + _HEALTHCHECKREQUEST._serialized_end = 69 + _HEALTHCHECKRESPONSE._serialized_start = 72 + _HEALTHCHECKRESPONSE._serialized_end = 241 + _HEALTHCHECKRESPONSE_SERVINGSTATUS._serialized_start = 162 + _HEALTHCHECKRESPONSE_SERVINGSTATUS._serialized_end = 241 + _HEALTH._serialized_start = 244 + _HEALTH._serialized_end = 418 +# @@protoc_insertion_point(module_scope) diff --git a/src/isolate/server/health/health_pb2.pyi b/src/isolate/server/health/health_pb2.pyi new file mode 100644 index 0000000..d60d416 --- /dev/null +++ b/src/isolate/server/health/health_pb2.pyi @@ -0,0 +1,75 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" +import builtins +import google.protobuf.descriptor +import google.protobuf.internal.enum_type_wrapper +import google.protobuf.message +import sys +import typing + +if sys.version_info >= (3, 10): + import typing as typing_extensions +else: + import typing_extensions + +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +@typing_extensions.final +class HealthCheckRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + SERVICE_FIELD_NUMBER: builtins.int + service: builtins.str + def __init__( + self, + *, + service: builtins.str = ..., + ) -> None: ... + def ClearField( + self, field_name: typing_extensions.Literal["service", b"service"] + ) -> None: ... + +global___HealthCheckRequest = HealthCheckRequest + +@typing_extensions.final +class HealthCheckResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + class _ServingStatus: + ValueType = typing.NewType("ValueType", builtins.int) + V: typing_extensions.TypeAlias = ValueType + + class _ServingStatusEnumTypeWrapper( + google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[ + HealthCheckResponse._ServingStatus.ValueType + ], + builtins.type, + ): # noqa: F821 + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + UNKNOWN: HealthCheckResponse._ServingStatus.ValueType # 0 + SERVING: HealthCheckResponse._ServingStatus.ValueType # 1 + NOT_SERVING: HealthCheckResponse._ServingStatus.ValueType # 2 + SERVICE_UNKNOWN: HealthCheckResponse._ServingStatus.ValueType # 3 + """Used only by the Watch method.""" + + class ServingStatus(_ServingStatus, metaclass=_ServingStatusEnumTypeWrapper): ... + UNKNOWN: HealthCheckResponse.ServingStatus.ValueType # 0 + SERVING: HealthCheckResponse.ServingStatus.ValueType # 1 + NOT_SERVING: HealthCheckResponse.ServingStatus.ValueType # 2 + SERVICE_UNKNOWN: HealthCheckResponse.ServingStatus.ValueType # 3 + """Used only by the Watch method.""" + + STATUS_FIELD_NUMBER: builtins.int + status: global___HealthCheckResponse.ServingStatus.ValueType + def __init__( + self, + *, + status: global___HealthCheckResponse.ServingStatus.ValueType = ..., + ) -> None: ... + def ClearField( + self, field_name: typing_extensions.Literal["status", b"status"] + ) -> None: ... + +global___HealthCheckResponse = HealthCheckResponse diff --git a/src/isolate/server/health/health_pb2_grpc.py b/src/isolate/server/health/health_pb2_grpc.py new file mode 100644 index 0000000..d9198b6 --- /dev/null +++ b/src/isolate/server/health/health_pb2_grpc.py @@ -0,0 +1,124 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from isolate.server.health import health_pb2 as health__pb2 + + +class HealthStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Check = channel.unary_unary( + "/grpc.health.v1.Health/Check", + request_serializer=health__pb2.HealthCheckRequest.SerializeToString, + response_deserializer=health__pb2.HealthCheckResponse.FromString, + ) + self.Watch = channel.unary_stream( + "/grpc.health.v1.Health/Watch", + request_serializer=health__pb2.HealthCheckRequest.SerializeToString, + response_deserializer=health__pb2.HealthCheckResponse.FromString, + ) + + +class HealthServicer(object): + """Missing associated documentation comment in .proto file.""" + + def Check(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def Watch(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_HealthServicer_to_server(servicer, server): + rpc_method_handlers = { + "Check": grpc.unary_unary_rpc_method_handler( + servicer.Check, + request_deserializer=health__pb2.HealthCheckRequest.FromString, + response_serializer=health__pb2.HealthCheckResponse.SerializeToString, + ), + "Watch": grpc.unary_stream_rpc_method_handler( + servicer.Watch, + request_deserializer=health__pb2.HealthCheckRequest.FromString, + response_serializer=health__pb2.HealthCheckResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "grpc.health.v1.Health", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + + +# This class is part of an EXPERIMENTAL API. +class Health(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def Check( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/grpc.health.v1.Health/Check", + health__pb2.HealthCheckRequest.SerializeToString, + health__pb2.HealthCheckResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def Watch( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_stream( + request, + target, + "/grpc.health.v1.Health/Watch", + health__pb2.HealthCheckRequest.SerializeToString, + health__pb2.HealthCheckResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/src/isolate/server/health_server.py b/src/isolate/server/health_server.py new file mode 100644 index 0000000..a8752e2 --- /dev/null +++ b/src/isolate/server/health_server.py @@ -0,0 +1,40 @@ +import asyncio +from dataclasses import dataclass +from typing import AsyncIterator + +from grpc.aio import ServicerContext + +from isolate.server import health + + +@dataclass +class HealthServicer(health.HealthServicer): + def __post_init__(self): + self._state = { + # Empty refers to the whole server + "": health.HealthCheckResponse.ServingStatus.SERVING, + "isolate": health.HealthCheckResponse.ServingStatus.SERVING, + } + + def _get_status( + self, service: str + ) -> health.HealthCheckResponse.ServingStatus.ValueType: + status = self._state.get( + service, + health.HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN, + ) + return status + + def Check( + self, request: health.HealthCheckRequest, context: ServicerContext + ) -> health.HealthCheckResponse: + return health.HealthCheckResponse(status=self._get_status(request.service)) + + async def Watch( + self, + request: health.HealthCheckRequest, + context: ServicerContext, + ) -> AsyncIterator[health.HealthCheckResponse]: + while True: + yield health.HealthCheckResponse(status=self._get_status(request.service)) + await asyncio.sleep(2) diff --git a/src/isolate/server/server.py b/src/isolate/server/server.py index 4782d9f..e9fce8d 100644 --- a/src/isolate/server/server.py +++ b/src/isolate/server/server.py @@ -27,7 +27,8 @@ from isolate.connections.grpc import AgentError, LocalPythonGRPC from isolate.connections.grpc.configuration import get_default_options from isolate.logs import Log, LogLevel, LogSource -from isolate.server import definitions +from isolate.server import definitions, health +from isolate.server.health_server import HealthServicer from isolate.server.interface import from_grpc, to_grpc # Whether to inherit all the packages from the current environment or not. @@ -292,6 +293,7 @@ def main() -> None: ) with BridgeManager() as bridge_manager: definitions.register_isolate(IsolateServicer(bridge_manager), server) + health.register_health(HealthServicer(), server) server.add_insecure_port(f"[::]:50001") print("Started listening at localhost:50001") diff --git a/tests/test_server.py b/tests/test_server.py index 021df63..bca1b5a 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -2,6 +2,7 @@ import textwrap from concurrent import futures from contextlib import contextmanager +from dataclasses import dataclass from functools import partial from pathlib import Path from typing import Any, List, Optional, cast @@ -12,7 +13,8 @@ from isolate.backends.settings import IsolateSettings from isolate.connections.grpc.configuration import get_default_options from isolate.logs import Log, LogLevel, LogSource -from isolate.server import definitions +from isolate.server import definitions, health +from isolate.server.health_server import HealthServicer from isolate.server.interface import from_grpc, to_grpc, to_serialized_object from isolate.server.server import BridgeManager, IsolateServicer @@ -27,6 +29,12 @@ def inherit_from_local(monkeypatch: Any, value: bool = True) -> None: monkeypatch.setattr("isolate.server.server.INHERIT_FROM_LOCAL", value) +@dataclass +class Stubs: + isolate_stub: definitions.IsolateStub + health_stub: health.HealthStub + + @contextmanager def make_server(tmp_path): server = grpc.server( @@ -35,24 +43,40 @@ def make_server(tmp_path): test_settings = IsolateSettings(cache_dir=tmp_path / "cache") with BridgeManager() as bridge: definitions.register_isolate(IsolateServicer(bridge, test_settings), server) + health.register_health(HealthServicer(), server) host, port = "localhost", server.add_insecure_port(f"[::]:0") server.start() try: - yield definitions.IsolateStub( + isolate_stub = definitions.IsolateStub( + grpc.insecure_channel( + f"{host}:{port}", + options=get_default_options(), + ) + ) + + health_stub = health.HealthStub( grpc.insecure_channel( f"{host}:{port}", options=get_default_options(), ) ) + + yield Stubs(isolate_stub=isolate_stub, health_stub=health_stub) finally: server.stop(None) @pytest.fixture def stub(tmp_path): - with make_server(tmp_path) as stub: - yield stub + with make_server(tmp_path) as stubs: + yield stubs.isolate_stub + + +@pytest.fixture +def health_stub(tmp_path): + with make_server(tmp_path) as stubs: + yield stubs.health_stub def define_environment(kind: str, **kwargs: Any) -> definitions.EnvironmentDefinition: @@ -476,13 +500,20 @@ def test_grpc_option_configuration(tmp_path, monkeypatch): ctx.setenv("ISOLATE_GRPC_CALL_MAX_RECEIVE_MESSAGE_LENGTH", "100") with pytest.raises(grpc.RpcError, match="Sent message larger than max"): - with make_server(tmp_path) as stub: - run_function(stub, take_buffer, b"0" * 200) + with make_server(tmp_path) as stubs: + run_function(stubs.isolate_stub, take_buffer, b"0" * 200) with monkeypatch.context() as ctx: ctx.setenv("ISOLATE_GRPC_CALL_MAX_SEND_MESSAGE_LENGTH", "5000") ctx.setenv("ISOLATE_GRPC_CALL_MAX_RECEIVE_MESSAGE_LENGTH", "5000") - with make_server(tmp_path) as stub: - result, _ = run_function(stub, take_buffer, b"0" * 200) + with make_server(tmp_path) as stubs: + result, _ = run_function(stubs.isolate_stub, take_buffer, b"0" * 200) assert result == b"0" * 200 + + +def test_health_check(health_stub: health.HealthStub) -> None: + resp: health.HealthCheckResponse = health_stub.Check( + health.HealthCheckRequest(service="") + ) + assert resp.status == health.HealthCheckResponse.SERVING