Skip to content

Commit

Permalink
feat: add healthcheck
Browse files Browse the repository at this point in the history
  • Loading branch information
keivanipchihagh committed Mar 19, 2024
1 parent 764a9b6 commit 01d0699
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 14 deletions.
7 changes: 5 additions & 2 deletions src/rpc/athena/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
##########################################################

import grpc
from grpc_health.v1 import health_pb2_grpc

# Third-party imports
from src.rpc.base_client import BaseClient
Expand All @@ -32,10 +33,12 @@ def __init__(
"""
super().__init__(host, port)

self.service = 'Athena'
self.channel = grpc.insecure_channel(self.target)
self.stub = athena_pb2_grpc.AthenaStub(self.channel)

self.is_server_ready() # Check server rediness
# Stubs
self.stub = athena_pb2_grpc.AthenaStub(self.channel)
self.health_stub = health_pb2_grpc.HealthStub(self.channel) # Healthcheck


def get_backtest(self) -> dict:
Expand Down
9 changes: 6 additions & 3 deletions src/rpc/athena/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import grpc

# Third-party imports
from src.rpc.base_server import BaseServer
from protos.athena import athena_pb2
from protos.athena import athena_pb2_grpc
from src.rpc.base_server import BaseServer


class GetBacktestService(athena_pb2_grpc.AthenaServicer):
Expand Down Expand Up @@ -43,6 +43,7 @@ def __init__(
host: str = 'localhost',
port: int = 50051,
n_workers: int = 1,
healthcheck_n_workers: int = 1,
) -> 'AthenaServer':
"""
AthenaServer Constructor.
Expand All @@ -51,8 +52,10 @@ def __init__(
- host (str): Server hostname. Defaults to `localhost`.
- port (int): Server port number. Defaults to `50051`.
- n_workers (int): Number of threads. Defaults to `1`.
- healthcheck_n_workers (int): Number of threads for healthcheck. Defaults to `1`.
"""
super().__init__(host, port, n_workers)
super().__init__(host, port, n_workers, healthcheck_n_workers)

# Register servicers
# Register GetBacktest service
athena_pb2_grpc.add_AthenaServicer_to_server(GetBacktestService(), self.server)
self.set_service_to_serving("Athena.GetBacktest")
38 changes: 31 additions & 7 deletions src/rpc/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
##########################################################

import grpc

# Third-party imports
from src.utils import logger
from grpc import _channel
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc


class BaseClient(object):
Expand All @@ -31,12 +31,15 @@ def __init__(
self.port = port
self.target = f'{host}:{port}'

self.channel = None
self.service: str = None
self.sub = None
self.channel: _channel = None
self.health_stub: health_pb2_grpc.HealthStub = None


def is_server_ready(self, timeout: int = 1) -> bool:
def is_server_serving(self, timeout: int = 1) -> bool:
"""
Returns whether server is connected and ready.
Returns whether server is up and serving.
Parameters:
- timeout (int): Timeout for future request. Defaults to `1`.
Expand All @@ -48,7 +51,28 @@ def is_server_ready(self, timeout: int = 1) -> bool:
grpc.channel_ready_future(self.channel).result(timeout)
return True
except grpc.FutureTimeoutError:
logger.warning(f"Server not ready on {self.target}!")
return False
except Exception as ex:
raise ex


def is_service_serving(self, rpc: str, timeout: float = None) -> bool:
    """
    Returns whether a specific RPC of this client's service is serving.

    Sends a `HealthCheckRequest` for `<service>.<rpc>` through the health
    stub and reports whether the returned status is `SERVING`.

    Parameters:
        - rpc (str): RPC name of the service.
        - timeout (float): Deadline in seconds for the health check call.
            Defaults to `None` (no deadline).

    Returns:
        - bool: True only if the reported status is `SERVING`, False for
            every other status (e.g. `NOT_SERVING`, `UNKNOWN`).

    Errors raised by the health stub call are not caught here; they
    propagate to the caller.
    """
    service_rpc = f'{self.service}.{rpc}'
    request = health_pb2.HealthCheckRequest(service=service_rpc)
    response = self.health_stub.Check(request, timeout=timeout)

    # Compare against SERVING rather than indexing a two-entry mapping:
    # any other status value must yield False instead of a KeyError.
    return response.status == health_pb2.HealthCheckResponse.SERVING
66 changes: 64 additions & 2 deletions src/rpc/base_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

import grpc
from concurrent import futures
from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc

# Third-party imports
from src.utils import logger
Expand All @@ -21,6 +24,7 @@ def __init__(
host: str = 'localhost',
port: int = 50051,
n_workers: int = 1,
healthcheck_n_workers: int = 1,
) -> 'BaseServer':
"""
BaseServer Constructor.
Expand All @@ -29,16 +33,20 @@ def __init__(
- host (str): Server hostname. Defaults to `localhost`.
- port (int): Server port number. Defaults to `50051`.
- n_workers (int): Number of threads. Defaults to `1`.
- healthcheck_n_workers (int): Number of threads for healthcheck. Defaults to `1`.
"""
self.host = host
self.port = port
self.n_workers = n_workers
self.target = f'{host}:{port}'

# Server
self.thread_pool = futures.ThreadPoolExecutor(n_workers)
self.server = grpc.server(self.thread_pool)
self.server.add_insecure_port(self.target)

# Healthcheck service
self.health_servicer: health.HealthServicer
self._configure_health_server(healthcheck_n_workers)


def start(self) -> None:
"""
Expand All @@ -47,3 +55,57 @@ def start(self) -> None:
logger.debug(f"Listening on '{self.target}'")
self.server.start()
self.server.wait_for_termination()


def stop(self, grace: float = None) -> None:
    """
    Stops the server permanently.

    The health servicer is switched to graceful shutdown first, then the
    gRPC server is stopped.

    Parameters:
        - grace (float): Grace period in seconds handed to the gRPC server's
            `stop` call. Defaults to `None`.

    Returns:
        - None
    """
    self.health_servicer.enter_graceful_shutdown()  # Mark services as `NOT_SERVING`
    # `grpc.Server.stop` requires the grace argument; calling it with no
    # arguments raises a TypeError.
    self.server.stop(grace)


def _configure_health_server(self, n_threads: int = 1):
    """
    Builds the healthcheck servicer and registers it on the gRPC server.

    A `HealthServicer` is created with the experimental non-blocking flag
    enabled and a dedicated thread pool of `n_threads` workers. The servicer
    is stored on `self.health_servicer` and added to `self.server`.

    Parameters:
        - n_threads (int): Number of thread workers. Defaults to `1`.

    Returns:
        - None
    """
    # Dedicated executor for the health servicer, separate from the server pool
    health_pool = futures.ThreadPoolExecutor(max_workers=n_threads)
    servicer = health.HealthServicer(
        experimental_non_blocking=True,
        experimental_thread_pool=health_pool,
    )

    self.health_servicer = servicer
    health_pb2_grpc.add_HealthServicer_to_server(servicer, self.server)


def set_service_to_serving(self, service: str) -> None:
    """
    Marks the given service as `SERVING` on the health servicer.

    Parameters:
        - service (str): Name of the service whose health status is set to `SERVING`.

    Returns:
        - None
    """
    serving = health_pb2.HealthCheckResponse.SERVING
    self.health_servicer.set(service, serving)


def set_service_to_not_serving(self, service: str) -> None:
    """
    Marks the given service as `NOT_SERVING` on the health servicer.

    Parameters:
        - service (str): Name of the service whose health status is set to `NOT_SERVING`.

    Returns:
        - None
    """
    not_serving = health_pb2.HealthCheckResponse.NOT_SERVING
    self.health_servicer.set(service, not_serving)

0 comments on commit 01d0699

Please sign in to comment.