feat: replace Synchronous HTTP Calls with Asynchronous in OPA plugin #2011
| response: Returns a response that's recieved from the OPA server. Raises error in cases the communication is not successful. | ||
| """ | ||
| match = re.match(r"(\d+)s", self.opa_config.opa_client_timeout.strip()) |
Is this something that could be done once in the init function of the Plugin? This would save processing.
e.g.,
# Parse once in __init__:
def __init__(self, config: PluginConfig):
    ...
    # Parse timeout once
    match = re.match(r"(\d+)s", self.opa_config.opa_client_timeout.strip())
    self._timeout_seconds = float(match.group(1)) if match else 30.0
yes that's a good point. I will make that change
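A minimal sketch of the suggestion above, assuming the timeout is configured as a string such as "30s". The helper name `parse_timeout_seconds`, the class `OPAPluginSketch`, and the module-level constants are hypothetical and are not taken from the PR; the 30-second fallback comes from the reviewer's snippet.

```python
import re

# Hypothetical names; only the regex and the 30.0 fallback come from the review.
_TIMEOUT_PATTERN = re.compile(r"(\d+)s")  # compiled once at import time
DEFAULT_TIMEOUT_SECONDS = 30.0


def parse_timeout_seconds(raw: str, default: float = DEFAULT_TIMEOUT_SECONDS) -> float:
    """Convert a string such as "30s" into seconds, falling back to `default`."""
    match = _TIMEOUT_PATTERN.match(raw.strip())
    return float(match.group(1)) if match else default


class OPAPluginSketch:
    """Stand-in for the plugin class: the timeout is parsed once, at construction."""

    def __init__(self, opa_client_timeout: str) -> None:
        # Every later request can reuse this float instead of re-running the regex.
        self._timeout_seconds = parse_timeout_seconds(opa_client_timeout)
```

With this shape, the per-request path only reads `self._timeout_seconds`, so the string parsing cost is paid once per plugin instance.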
araujof left a comment
Hi Shriti, please look into the recommendations we made.
| try: | ||
| rsp = requests.post(url, json=payload) | ||
| logger.info(f"OPA connection response '{rsp}'") | ||
| async with httpx.AsyncClient() as client: |
A new httpx.AsyncClient() is created and destroyed for every OPA policy evaluation. This means:
- Connection establishment overhead on every request
- No connection reuse/pooling
- TCP handshake + TLS negotiation repeated for each policy check
- Under high load with many plugin invocations, this becomes a severe bottleneck
Recommendation:
# In __init__:
self._http_client = httpx.AsyncClient(
    timeout=httpx.Timeout(self._timeout_seconds),
    limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
    http2=True,  # Optional: enable HTTP/2 (requires the h2 package, e.g. pip install "httpx[http2]")
)

# Add cleanup method:
async def cleanup(self):
    await self._http_client.aclose()
Maybe set defaults (those are max numbers) and allow them to be configured, I think.
Done
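One way to read "set defaults and allow configuration" is a small settings object whose fields default to the reviewer's numbers and reject nonsensical values. The sketch below uses a stdlib dataclass as a stand-in; a later commit in this PR mentions Pydantic Field validation instead. The class name `OPAClientLimits`, the field names, and the default of 3 for `retries` are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OPAClientLimits:
    """Hypothetical connection-pool settings with overridable defaults."""

    max_keepalive_connections: int = 20  # default taken from the review suggestion
    max_connections: int = 100           # default taken from the review suggestion
    retries: int = 3                     # assumed default; 1 means a single attempt

    def __post_init__(self) -> None:
        # Reject zero or negative values so a bad config fails loudly at startup.
        for name in ("max_keepalive_connections", "max_connections", "retries"):
            value = getattr(self, name)
            if value < 1:
                raise ValueError(f"{name} must be >= 1, got {value}")
```

A deployment that needs a smaller pool could then construct `OPAClientLimits(max_connections=50)` and pass the values on to the HTTP client, while everyone else gets the defaults.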
| logger.warning(f"Retry attempt to connect to OPA server {attempt + 1}") | ||
| if attempt == max_retries - 1: | ||
| raise | ||
| await asyncio.sleep(2**attempt) |
await asyncio.sleep(2**attempt) grows exponentially without limit:
- Attempt 0: 1 second
- Attempt 1: 2 seconds
- Attempt 2: 4 seconds
- Attempt 3: 8 seconds (if max_retries increased)
This could cause excessive delays if max_retries is increased.
Recommendation:
# Add jitter and cap max delay
import random
delay = min(2**attempt, 10) + random.uniform(0, 1)
await asyncio.sleep(delay)
Sure, makes sense. Will make that change
Done
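For reference, the capped backoff with jitter recommended in this thread can be sketched as a small retry helper. The names `backoff_delay` and `call_with_retry`, the injectable `sleep` parameter, and the broad `except Exception` are assumptions for illustration; the merged code may retry only on specific transport errors.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, cap: float = 10.0, jitter: float = 1.0) -> float:
    """Exponential delay capped at `cap` seconds, plus up to `jitter` seconds of noise."""
    return min(2 ** attempt, cap) + random.uniform(0, jitter)


async def call_with_retry(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Await `operation`, making at most `max_retries` attempts in total."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception:  # assumption: real code would catch narrower errors
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            await sleep(backoff_delay(attempt))
    # Only reached when the loop never ran, i.e. max_retries < 1.
    raise ValueError("max_retries must be >= 1")
```

The cap bounds the worst-case wait per attempt regardless of how large `max_retries` is set, and the jitter spreads out retries from many concurrent callers so they do not hit the OPA server in lockstep.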
Nice job on this PR. As noted in the inlined comments, a few suggestions:
# In __init__:
self.http_client = httpx.AsyncClient(timeout=httpx.Timeout(30.0))
# In _evaluate_opa_policy:
rsp = await self._post_with_retry(client=self.http_client, url=url, payload=payload, ...)
# Add cleanup method:
async def cleanup(self):
    await self.http_client.aclose()
This would improve performance by avoiding connection establishment overhead on each OPA request.
match = re.match(r"(\d+)s", self.opa_config.opa_client_timeout.strip())
Could be compiled once in init:
# In __init__:
self._timeout_pattern = re.compile(r"(\d+)s")
# In _post_with_retry:
match = self._timeout_pattern.match(self.opa_config.opa_client_timeout.strip())
araujof left a comment
LGTM
…to opa Signed-off-by: Shriti Priya <[email protected]>
- Rename cleanup() to shutdown() to match plugin framework interface
- Fix config key from opa_client_max_retries to opa_client_retries

The plugin framework calls shutdown() for cleanup, not cleanup(). The config.yaml key must match the schema field name.

Signed-off-by: Mihai Criveti <[email protected]>
d9ad3d4 to ace14e3
- Pass keepalive_expiry to httpx.Limits (was parsed but unused)
- Add Pydantic Field validation for retries >= 1 (prevents 0/negative)
- Add validation for max_keepalive and max_connections >= 1

Addresses review feedback about config knobs having no effect and retries=0 causing silent failures.

Signed-off-by: Mihai Criveti <[email protected]>
- Add httpx>=0.28.0 to pyproject.toml dependencies (was missing)
- Clarify retries field: 1=single attempt, 3=up to 3 attempts

The httpx library is now explicitly declared since the plugin switched from requests to httpx for async HTTP calls.

Signed-off-by: Mihai Criveti <[email protected]>
📝 This PR addresses the following:
Fix #1931
The previous implementation of the OPA plugin had the following issue:
When the OPA plugin was enabled and evaluated policies, each policy check blocked the entire async worker until the HTTP request completed, which added latency to all concurrent requests.
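The effect described above can be reproduced without OPA or any HTTP library. In this sketch, `time.sleep` stands in for a synchronous `requests.post` call and `asyncio.sleep` stands in for an awaited async HTTP call; all function names are hypothetical and the delays are arbitrary.

```python
import asyncio
import time


async def blocking_check(delay: float) -> None:
    # Stand-in for a synchronous HTTP call: the event loop cannot run other tasks.
    time.sleep(delay)


async def non_blocking_check(delay: float) -> None:
    # Stand-in for an awaited async HTTP call: control returns to the event loop.
    await asyncio.sleep(delay)


async def timed(check, count: int = 3, delay: float = 0.05) -> float:
    """Run `count` checks concurrently and return the elapsed wall-clock seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(check(delay) for _ in range(count)))
    return time.perf_counter() - start


def compare() -> tuple[float, float]:
    """Return (elapsed with blocking checks, elapsed with non-blocking checks)."""
    return asyncio.run(timed(blocking_check)), asyncio.run(timed(non_blocking_check))
```

Calling `compare()` should show the blocking variant taking roughly the sum of the individual delays, because the checks run one after another, while the non-blocking variant takes roughly a single delay because the waits overlap.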
Fix:
Key improvements:
config.yaml