Skip to content

Commit

Permalink
Merge pull request #74 from fal-ai/batuhan/fea-836-expose-grpc-option…
Browse files Browse the repository at this point in the history
…s-for-internal-isolate

feat: support dynamic grpc reconfiguration on all gRPC channels
  • Loading branch information
isidentical authored Jan 31, 2023
2 parents c0c9e29 + 57198f7 commit 293a42c
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 6 deletions.
7 changes: 6 additions & 1 deletion src/isolate/connections/grpc/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from isolate.connections._local import PythonExecutionBase, agent_startup
from isolate.connections.common import serialize_object
from isolate.connections.grpc import agent, definitions
from isolate.connections.grpc.configuration import get_default_options
from isolate.connections.grpc.interface import from_grpc, to_grpc
from isolate.logs import Log, LogLevel, LogSource

Expand All @@ -38,7 +39,11 @@ def _establish_bridge(
max_wait_timeout: float = 10.0,
) -> Iterator[definitions.AgentStub]:
with self.start_agent() as (address, credentials):
with grpc.secure_channel(address, credentials) as channel:
with grpc.secure_channel(
address,
credentials,
options=get_default_options(),
) as channel:
channel_status = grpc.channel_ready_future(channel)
try:
channel_status.result(timeout=max_wait_timeout)
Expand Down
2 changes: 2 additions & 0 deletions src/isolate/connections/grpc/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from isolate.backends.common import sha256_digest_of
from isolate.connections.common import SerializationError, serialize_object
from isolate.connections.grpc import definitions
from isolate.connections.grpc.configuration import get_default_options
from isolate.connections.grpc.interface import from_grpc, to_grpc
from isolate.logs import Log, LogLevel, LogSource

Expand Down Expand Up @@ -197,6 +198,7 @@ def create_server(address: str) -> grpc.Server:
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
maximum_concurrent_rpcs=1,
options=get_default_options(),
)

# Local server credentials allow us to ensure that the
Expand Down
23 changes: 23 additions & 0 deletions src/isolate/connections/grpc/configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import ast
import os

_GRPC_OPTION_PREFIX = "ISOLATE_GRPC_CALL_"


def get_default_options():
"""Return the default list of GRPC call options (both for
server and client) which are set via environment variables.
Each environment variable starting with `ISOLATE_GRPC_CALL_`
will be converted to a GRPC option. The name of the option
will be the name of the environment variable, with the
`ISOLATE_GRPC_CALL_` prefix removed and converted to lowercase.
"""

options = []
for raw_key, raw_value in os.environ.items():
if raw_key.startswith(_GRPC_OPTION_PREFIX):
field = raw_key[len(_GRPC_OPTION_PREFIX) :].lower()
value = ast.literal_eval(raw_value)
options.append((f"grpc.{field}", value))
return options
6 changes: 5 additions & 1 deletion src/isolate/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from isolate.backends.local import LocalPythonEnvironment
from isolate.backends.virtualenv import VirtualPythonEnvironment
from isolate.connections.grpc import AgentError, LocalPythonGRPC
from isolate.connections.grpc.configuration import get_default_options
from isolate.logs import Log, LogLevel, LogSource
from isolate.server import definitions
from isolate.server.interface import from_grpc, to_grpc
Expand Down Expand Up @@ -288,7 +289,10 @@ def _add_log_to_queue(messages: Queue, log: Log) -> None:


def main() -> None:
server = grpc.server(futures.ThreadPoolExecutor(max_workers=MAX_THREADS))
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=MAX_THREADS),
options=get_default_options(),
)
with BridgeManager() as bridge_manager:
definitions.register_isolate(IsolateServicer(bridge_manager), server)

Expand Down
46 changes: 42 additions & 4 deletions tests/test_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import textwrap
from concurrent import futures
from contextlib import contextmanager
from functools import partial
from pathlib import Path
from typing import Any, List, Optional, cast
Expand All @@ -9,6 +10,7 @@
import pytest

from isolate.backends.settings import IsolateSettings
from isolate.connections.grpc.configuration import get_default_options
from isolate.logs import Log, LogLevel, LogSource
from isolate.server import definitions
from isolate.server.interface import from_grpc, to_grpc, to_serialized_object
Expand All @@ -25,21 +27,34 @@ def inherit_from_local(monkeypatch: Any, value: bool = True) -> None:
monkeypatch.setattr("isolate.server.server.INHERIT_FROM_LOCAL", value)


@pytest.fixture
def stub(tmp_path):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
@contextmanager
def make_server(tmp_path):
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1), options=get_default_options()
)
test_settings = IsolateSettings(cache_dir=tmp_path / "cache")
with BridgeManager() as bridge:
definitions.register_isolate(IsolateServicer(bridge, test_settings), server)
host, port = "localhost", server.add_insecure_port(f"[::]:0")
server.start()

try:
yield definitions.IsolateStub(grpc.insecure_channel(f"{host}:{port}"))
yield definitions.IsolateStub(
grpc.insecure_channel(
f"{host}:{port}",
options=get_default_options(),
)
)
finally:
server.stop(None)


@pytest.fixture
def stub(tmp_path):
with make_server(tmp_path) as stub:
yield stub


def define_environment(kind: str, **kwargs: Any) -> definitions.EnvironmentDefinition:
struct = definitions.Struct()
struct.update(kwargs)
Expand Down Expand Up @@ -448,3 +463,26 @@ def test_receive_complete_logs(
result, logs = run_function(stub, print_logs_no_delay, num_lines, should_flush)
assert result == num_lines
assert logs == [str(i) for i in range(num_lines)]


def take_buffer(buffer):
return buffer


def test_grpc_option_configuration(tmp_path, monkeypatch):
inherit_from_local(monkeypatch)
with monkeypatch.context() as ctx:
ctx.setenv("ISOLATE_GRPC_CALL_MAX_SEND_MESSAGE_LENGTH", "100")
ctx.setenv("ISOLATE_GRPC_CALL_MAX_RECEIVE_MESSAGE_LENGTH", "100")

with pytest.raises(grpc.RpcError, match="Sent message larger than max"):
with make_server(tmp_path) as stub:
run_function(stub, take_buffer, b"0" * 200)

with monkeypatch.context() as ctx:
ctx.setenv("ISOLATE_GRPC_CALL_MAX_SEND_MESSAGE_LENGTH", "5000")
ctx.setenv("ISOLATE_GRPC_CALL_MAX_RECEIVE_MESSAGE_LENGTH", "5000")

with make_server(tmp_path) as stub:
result, _ = run_function(stub, take_buffer, b"0" * 200)
assert result == b"0" * 200

0 comments on commit 293a42c

Please sign in to comment.