Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flagd-rpc)!: add events for rpc mode, some breaking config fixes #108

Merged
merged 10 commits into from
Nov 28, 2024
83 changes: 76 additions & 7 deletions providers/openfeature-provider-flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ pip install openfeature-provider-flagd

## Configuration and Usage

The flagd provider can operate in two modes: [RPC](#remote-resolver-rpc) (evaluation takes place in flagd, via gRPC calls) or [in-process](#in-process-resolver) (evaluation takes place in-process, with the provider getting a ruleset from a compliant sync-source).

### Remote resolver (RPC)

This is the default mode of operation of the provider.
In this mode, `FlagdProvider` communicates with [flagd](https://github.com/open-feature/flagd) via the gRPC protocol.
Flag evaluations take place remotely at the connected flagd instance.

Instantiate a new FlagdProvider instance and configure the OpenFeature SDK to use it:

```python
Expand All @@ -19,7 +27,9 @@ from openfeature.contrib.provider.flagd import FlagdProvider
api.set_provider(FlagdProvider())
```

To use in-process evaluation in offline mode with a file as source:
### In-process resolver

This mode performs flag evaluations locally (in-process).

```python
from openfeature import api
Expand All @@ -36,12 +46,71 @@ api.set_provider(FlagdProvider(

The default options can be defined in the FlagdProvider constructor.

| Option name | Type & Values | Default |
|----------------|---------------|-----------|
| host | str | localhost |
| port | int | 8013 |
| schema | str | http |
| timeout | int | 2 |
| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
| ------------------------ | ------------------------------ | -------------------------- | ----------------------------- | ------------------- |
| resolver_type | FLAGD_RESOLVER | enum - `rpc`, `in-process` | rpc | |
| host | FLAGD_HOST | str | localhost | rpc & in-process |
| port | FLAGD_PORT | int | 8013 (rpc), 8015 (in-process) | rpc & in-process |
| tls | FLAGD_TLS | bool | false | rpc & in-process |
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process |
| stream_deadline_ms | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
| keep_alive_time | FLAGD_KEEP_ALIVE_TIME_MS | int | 0 | rpc & in-process |
| selector | FLAGD_SOURCE_SELECTOR | str | null | in-process |
| cache_type | FLAGD_CACHE | enum - `lru`, `disabled` | lru | rpc |
| max_cache_size | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc |
| retry_backoff_ms | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc |
| offline_flag_source_path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | str | null | in-process |

<!-- not implemented
| target_uri | FLAGD_TARGET_URI | alternative to host/port, supporting custom name resolution | string | null | rpc & in-process |
| socket_path | FLAGD_SOCKET_PATH | alternative to host port, unix socket | String | null | rpc & in-process |
| cert_path | FLAGD_SERVER_CERT_PATH | tls cert path | String | null | rpc & in-process |
| max_event_stream_retries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
| context_enricher | - | sync-metadata to evaluation context mapping function | function | identity function | in-process |
| offline_pollIntervalMs | FLAGD_OFFLINE_POLL_MS | poll interval for reading offlineFlagSourcePath | int | 5000 | in-process |
-->

> [!NOTE]
> Some configurations are only applicable for RPC resolver.

<!--
### Unix socket support
Unix socket communication with flagd is facilitated by usaging of the linux-native `epoll` library on `linux-x86_64`
only (ARM support is pending the release of `netty-transport-native-epoll` v5).
Unix sockets are not supported on other platforms or architectures.
-->

### Reconnection

Reconnection is supported by the underlying gRPC connections.
If the connection to flagd is lost, it will reconnect automatically.
A failure to connect will result in an [error event](https://openfeature.dev/docs/reference/concepts/events#provider_error) from the provider, though it will attempt to reconnect indefinitely.

### Deadlines

Deadlines are used to define how long the provider waits to complete initialization or flag evaluations.
They behave differently based on the resolver type.

#### Deadlines with Remote resolver (RPC)

If the remote evaluation call is not completed within this deadline, the gRPC call is terminated with the error `DEADLINE_EXCEEDED`
and the evaluation will default.

### TLS

TLS is available in situations where flagd is running on another host.

<!--
You may optionally supply an X.509 certificate in PEM format. Otherwise, the default certificate store will be used.
```java
FlagdProvider flagdProvider = new FlagdProvider(
FlagdOptions.builder()
.host("myflagdhost")
.tls(true) // use TLS
.certPath("etc/cert/ca.crt") // PEM cert
.build());
```
-->

## License

Expand Down
2 changes: 1 addition & 1 deletion providers/openfeature-provider-flagd/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ classifiers = [
keywords = []
dependencies = [
"openfeature-sdk>=0.6.0",
"grpcio>=1.60.0",
"grpcio>=1.68.0",
"protobuf>=4.25.2",
"mmh3>=4.1.0",
"panzi-json-logic>=1.0.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,33 @@
import typing
from enum import Enum


class ResolverType(Enum):
RPC = "rpc"
IN_PROCESS = "in-process"


DEFAULT_DEADLINE = 500
DEFAULT_HOST = "localhost"
DEFAULT_KEEP_ALIVE = 0
DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None
DEFAULT_PORT_IN_PROCESS = 8015
DEFAULT_PORT_RPC = 8013
DEFAULT_RESOLVER_TYPE = ResolverType.RPC
DEFAULT_RETRY_BACKOFF = 1000
DEFAULT_STREAM_DEADLINE = 600000
DEFAULT_TLS = False

ENV_VAR_DEADLINE_MS = "FLAGD_DEADLINE_MS"
ENV_VAR_HOST = "FLAGD_HOST"
ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS"
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
ENV_VAR_PORT = "FLAGD_PORT"
ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER_TYPE"
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
ENV_VAR_TLS = "FLAGD_TLS"

T = typing.TypeVar("T")


Expand All @@ -18,42 +45,83 @@ def env_or_default(
return val if cast is None else cast(val)


class ResolverType(Enum):
GRPC = "grpc"
IN_PROCESS = "in-process"


class Config:
def __init__( # noqa: PLR0913
self,
host: typing.Optional[str] = None,
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
timeout: typing.Optional[int] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_interval_seconds: typing.Optional[float] = None,
retry_backoff_ms: typing.Optional[int] = None,
deadline: typing.Optional[int] = None,
stream_deadline_ms: typing.Optional[int] = None,
keep_alive_time: typing.Optional[int] = None,
):
self.host = env_or_default("FLAGD_HOST", "localhost") if host is None else host
self.port = (
env_or_default("FLAGD_PORT", 8013, cast=int) if port is None else port
)
self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host

self.tls = (
env_or_default("FLAGD_TLS", False, cast=str_to_bool) if tls is None else tls
env_or_default(ENV_VAR_TLS, DEFAULT_TLS, cast=str_to_bool)
if tls is None
else tls
)
self.timeout = 5 if timeout is None else timeout

self.retry_backoff_ms: int = (
int(
env_or_default(
ENV_VAR_RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF, cast=int
)
)
if retry_backoff_ms is None
else retry_backoff_ms
)

self.resolver_type = (
ResolverType(env_or_default("FLAGD_RESOLVER_TYPE", "grpc"))
ResolverType(env_or_default(ENV_VAR_RESOLVER_TYPE, DEFAULT_RESOLVER_TYPE))
if resolver_type is None
else resolver_type
)

default_port = (
DEFAULT_PORT_RPC
if self.resolver_type is ResolverType.RPC
else DEFAULT_PORT_IN_PROCESS
)

self.port: int = (
int(env_or_default(ENV_VAR_PORT, default_port, cast=int))
if port is None
else port
)

self.offline_flag_source_path = (
env_or_default("FLAGD_OFFLINE_FLAG_SOURCE_PATH", None)
env_or_default(
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH, DEFAULT_OFFLINE_SOURCE_PATH
)
if offline_flag_source_path is None
else offline_flag_source_path
)
self.offline_poll_interval_seconds = (
float(env_or_default("FLAGD_OFFLINE_POLL_INTERVAL_SECONDS", 1.0))
if offline_poll_interval_seconds is None
else offline_poll_interval_seconds

self.deadline: int = (
int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int))
if deadline is None
else deadline
)

self.stream_deadline_ms: int = (
int(
env_or_default(
ENV_VAR_STREAM_DEADLINE_MS, DEFAULT_STREAM_DEADLINE, cast=int
)
)
if stream_deadline_ms is None
else stream_deadline_ms
)

self.keep_alive_time: int = (
int(
env_or_default(ENV_VAR_KEEP_ALIVE_TIME_MS, DEFAULT_KEEP_ALIVE, cast=int)
)
if keep_alive_time is None
else keep_alive_time
)
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# provider.initialise(schema="https",endpoint="example.com",port=1234,timeout=10)
"""

import logging
import typing

from openfeature.evaluation_context import EvaluationContext
Expand All @@ -42,41 +43,66 @@ def __init__( # noqa: PLR0913
host: typing.Optional[str] = None,
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
deadline: typing.Optional[int] = None,
timeout: typing.Optional[int] = None,
retry_backoff_ms: typing.Optional[int] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_interval_seconds: typing.Optional[float] = None,
stream_deadline_ms: typing.Optional[int] = None,
keep_alive_time: typing.Optional[int] = None,
):
"""
Create an instance of the FlagdProvider

:param host: the host to make requests to
:param port: the port the flagd service is available on
:param tls: enable/disable secure TLS connectivity
:param timeout: the maximum to wait before a request times out
:param deadline: the maximum to wait before a request times out
:param timeout: the maximum time to wait before a request times out
:param retry_backoff_ms: the number of milliseconds to backoff
:param offline_flag_source_path: the path to the flag source file
:param stream_deadline_ms: the maximum time to wait before a request times out
:param keep_alive_time: the number of milliseconds to keep alive
:param resolver_type: the type of resolver to use
"""
if deadline is None and timeout is not None:
deadline = timeout * 1000
logging.warn(
"'timeout' property is deprecated, please use 'deadline' instead, be aware that 'deadline' is in milliseconds"
)

self.config = Config(
host=host,
port=port,
tls=tls,
timeout=timeout,
deadline=deadline,
retry_backoff_ms=retry_backoff_ms,
resolver_type=resolver_type,
offline_flag_source_path=offline_flag_source_path,
offline_poll_interval_seconds=offline_poll_interval_seconds,
stream_deadline_ms=stream_deadline_ms,
keep_alive_time=keep_alive_time,
)

self.resolver = self.setup_resolver()

def setup_resolver(self) -> AbstractResolver:
if self.config.resolver_type == ResolverType.GRPC:
return GrpcResolver(self.config)
if self.config.resolver_type == ResolverType.RPC:
return GrpcResolver(
self.config,
self.emit_provider_ready,
self.emit_provider_error,
self.emit_provider_configuration_changed,
)
elif self.config.resolver_type == ResolverType.IN_PROCESS:
return InProcessResolver(self.config, self)
else:
raise ValueError(
f"`resolver_type` parameter invalid: {self.config.resolver_type}"
)

def initialize(self, evaluation_context: EvaluationContext) -> None:
self.resolver.initialize(evaluation_context)

def shutdown(self) -> None:
if self.resolver:
self.resolver.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@


class AbstractResolver(typing.Protocol):
def initialize(self, evaluation_context: EvaluationContext) -> None: ...

def shutdown(self) -> None: ...

def resolve_boolean_details(
Expand Down
Loading