Skip to content

Commit

Permalink
Release (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjholm authored Aug 15, 2023
2 parents 5ae9172 + f76b971 commit e82fc4a
Show file tree
Hide file tree
Showing 16 changed files with 600 additions and 7 deletions.
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ test:
@echo Running Tox tests
@tox -e py

NITRIC_VERSION="v0.27.0"
NITRIC_VERSION="v0.32.0"

download:
@curl -L https://github.com/nitrictech/nitric/releases/download/${NITRIC_VERSION}/contracts.tgz -o contracts.tgz
Expand Down
2 changes: 2 additions & 0 deletions nitric/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from nitric.api.storage import Storage
from nitric.api.documents import Documents
from nitric.api.secrets import Secrets
from nitric.api.websocket import Websocket

__all__ = [
"Events",
Expand All @@ -33,4 +34,5 @@
"FailedTask",
"TopicRef",
"Secrets",
"Websocket",
]
28 changes: 28 additions & 0 deletions nitric/api/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import Union
from grpclib.client import Channel
from grpclib import GRPCError
from nitric.exception import exception_from_grpc_error
from nitric.utils import new_default_channel
from nitric.proto.nitric.websocket.v1 import (
WebsocketServiceStub,
WebsocketSendRequest,
)


class Websocket(object):
"""Nitric generic Websocket client."""

def __init__(self):
"""Construct a Nitric Websocket Client."""
self._channel: Union[Channel, None] = new_default_channel()
# Had to make unprotected (publically accessible in order to use as part of bucket reference)
self.websocket_stub = WebsocketServiceStub(channel=self._channel)

async def send(self, socket: str, connection_id: str, data: bytes):
"""Send data to a connection on a socket."""
try:
await self.websocket_stub.send(
websocket_send_request=WebsocketSendRequest(socket=socket, connection_id=connection_id, data=data)
)
except GRPCError as grpc_err:
raise exception_from_grpc_error(grpc_err)
2 changes: 1 addition & 1 deletion nitric/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Nitric:
"secret": {},
"queue": {},
"collection": {},
"websocket": {},
}

@classmethod
Expand Down Expand Up @@ -95,7 +96,6 @@ def run(cls) -> None:
This will execute in an existing event loop if there is one, otherwise it will attempt to create its own.
"""
provider = cls._create_tracer()
print(cls._workers)
try:
try:
loop = asyncio.get_running_loop()
Expand Down
117 changes: 114 additions & 3 deletions nitric/faas.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import functools
import json
import traceback
from typing import Dict, Generic, Protocol, Union, List, TypeVar, Any, Optional, Sequence
from typing import Dict, Generic, Literal, Protocol, Union, List, TypeVar, Any, Optional, Sequence
from opentelemetry import context, propagate

import betterproto
Expand All @@ -48,6 +48,9 @@
BucketNotificationConfig,
BucketNotificationType,
NotificationResponseContext,
WebsocketResponseContext,
WebsocketEvent,
WebsocketWorker,
)
import grpclib
import asyncio
Expand Down Expand Up @@ -101,6 +104,10 @@ def bucket_notification(self) -> Union[BucketNotificationContext, None]:
"""Return this context as a BucketNotificationContext if it is one, otherwise returns None."""
return None

def websocket(self) -> Union[WebsocketContext, None]:
"""Return this context as a WebsocketContext if it is one, otherwise returns None."""
return None


def _ctx_from_grpc_trigger_request(trigger_request: TriggerRequest, options: Optional[FaasClientOptions] = None):
"""Return a TriggerContext from a TriggerRequest."""
Expand All @@ -114,6 +121,8 @@ def _ctx_from_grpc_trigger_request(trigger_request: TriggerRequest, options: Opt
return FileNotificationContext.from_grpc_trigger_request_and_options(trigger_request, options)
else:
return BucketNotificationContext.from_grpc_trigger_request(trigger_request)
elif context_type == "websocket":
return WebsocketContext.from_grpc_trigger_request(trigger_request)
else:
print(f"Trigger with unknown context received, context type: {context_type}")
raise Exception(f"Unknown trigger context, type: {context_type}")
Expand Down Expand Up @@ -154,6 +163,10 @@ def _grpc_response_from_ctx(ctx: TriggerContext) -> TriggerResponse:
if bucket_context is not None:
return TriggerResponse(notification=NotificationResponseContext(success=bucket_context.res.success))

websocket_context = ctx.websocket()
if websocket_context is not None:
return TriggerResponse(websocket=WebsocketResponseContext(success=websocket_context.res.success))

raise Exception("Unknown Trigger Context type, unable to return valid response")


Expand Down Expand Up @@ -302,6 +315,54 @@ def from_grpc_trigger_request(trigger_request: TriggerRequest):
)


class WebsocketRequest(Request):
"""Represents an incoming websocket event."""

def __init__(
self, connection_id: str, data: bytes, query: Dict[str, str | List[str]], trace_context: Dict[str, str]
):
"""Construct a new WebsocketRequest."""
super().__init__(data, trace_context)

self.connection_id = connection_id
self.query = query


class WebsocketResponse(Response):
"""Represents a response to a websocket event."""

def __init__(self, success: bool = True):
"""Construct a new WebsocketResponse."""
self.success = success


class WebsocketContext(TriggerContext):
"""Represents the full request/response context for a Websocket based trigger."""

def __init__(self, request: WebsocketRequest, response: Optional[WebsocketResponse] = None):
"""Construct a new WebsocketContext."""
super().__init__()
self.req = request
self.res = response if response else WebsocketResponse()

def websocket(self) -> WebsocketContext:
"""Return this WebsocketContext, used when determining the context type of a trigger."""
return self

@staticmethod
def from_grpc_trigger_request(trigger_request: TriggerRequest) -> WebsocketContext:
"""Construct a new WebsocketContext from a Websocket trigger from the Nitric Membrane."""
query: Record = {k: v.value for (k, v) in trigger_request.websocket.query_params.items()}
return WebsocketContext(
request=WebsocketRequest(
data=trigger_request.data,
connection_id=trigger_request.websocket.connection_id,
query=query,
trace_context=trigger_request.trace_context.values,
)
)


class BucketNotificationRequest(Request):
"""Represents a translated Event, from a subscribed bucket notification, forwarded from the Nitric Membrane."""

Expand Down Expand Up @@ -424,6 +485,26 @@ def _to_grpc_event_type(event_type: str) -> BucketNotificationType:
raise ValueError(f"Event type {event_type} is unsupported")


class WebsocketWorkerOptions:
"""Options for websocket workers."""

def __init__(self, socket_name: str, event_type: Literal["connect", "disconnect", "message"]):
"""Construct new websocket worker options."""
self.socket_name = socket_name
self.event_type = WebsocketWorkerOptions._to_grpc_event_type(event_type)

@staticmethod
def _to_grpc_event_type(event_type: Literal["connect", "disconnect", "message"]) -> WebsocketEvent:
if event_type == "connect":
return WebsocketEvent.Connect
elif event_type == "disconnect":
return WebsocketEvent.Disconnect
elif event_type == "message":
return WebsocketEvent.Message
else:
raise ValueError(f"Event type {event_type} is unsupported")


class FileNotificationWorkerOptions(BucketNotificationWorkerOptions):
"""Options for bucket notification workers with file references."""

Expand Down Expand Up @@ -499,13 +580,16 @@ class FaasWorkerOptions:
SubscriptionWorkerOptions,
BucketNotificationWorkerOptions,
FileNotificationWorkerOptions,
WebsocketWorkerOptions,
FaasWorkerOptions,
]

# class Context(Protocol):
# ...

C = TypeVar("C", TriggerContext, HttpContext, EventContext, FileNotificationContext, BucketNotificationContext)
C = TypeVar(
"C", TriggerContext, HttpContext, EventContext, FileNotificationContext, BucketNotificationContext, WebsocketContext
)


class Middleware(Protocol, Generic[C]):
Expand All @@ -528,11 +612,13 @@ async def __call__(self, ctx: C) -> C | None:
EventMiddleware = Middleware[EventContext]
BucketNotificationMiddleware = Middleware[BucketNotificationContext]
FileNotificationMiddleware = Middleware[FileNotificationContext]
WebsocketMiddleware = Middleware[WebsocketContext]

HttpHandler = Handler[HttpContext]
EventHandler = Handler[EventContext]
BucketNotificationHandler = Handler[BucketNotificationContext]
FileNotificationHandler = Handler[FileNotificationContext]
WebsocketHandler = Handler[WebsocketContext]


def _convert_to_middleware(handler: Handler[C] | Middleware[C]) -> Middleware[C]:
Expand Down Expand Up @@ -615,6 +701,7 @@ def __init__(self, opts: FaasClientOptions):
self.__bucket_notification_handler: Optional[
Union[BucketNotificationMiddleware, FileNotificationMiddleware]
] = None
self.__websocket_handler: Optional[WebsocketMiddleware] = None
self._opts = opts

def http(self, *handlers: HttpMiddleware | HttpHandler) -> FunctionServer:
Expand Down Expand Up @@ -646,9 +733,23 @@ def bucket_notification(
self.__bucket_notification_handler = compose_middleware(*handlers)
return self

def websocket(self, *handlers: WebsocketHandler | WebsocketMiddleware) -> FunctionServer:
"""
Register one or more Websocket Trigger Handlers or Middleware.
When multiple handlers are provided, they will be called in order.
"""
self.__websocket_handler = compose_middleware(*handlers)
return self

async def start(self):
"""Start the function server using the previously provided middleware."""
if not self._http_handler and not self._event_handler and not self.__bucket_notification_handler:
if (
not self._http_handler
and not self._event_handler
and not self.__bucket_notification_handler
and not self.__websocket_handler
):
raise Exception("At least one handler function must be provided.")

await self._run()
Expand All @@ -665,6 +766,10 @@ def _event_handler(self):
def _bucket_notification_handler(self):
return self.__bucket_notification_handler

@property
def _websocket_handler(self):
return self.__websocket_handler

async def _run(self):
"""Register a new FaaS worker with the Membrane, using the provided function as the handler."""
channel = new_default_channel()
Expand Down Expand Up @@ -697,6 +802,10 @@ async def _run(self):
init_request = InitRequest(
bucket_notification=BucketNotificationWorker(bucket=self._opts.bucket_name, config=config)
)
elif isinstance(self._opts, WebsocketWorkerOptions):
init_request = InitRequest(
websocket=WebsocketWorker(socket=self._opts.socket_name, event=self._opts.event_type)
)

# let the membrane server know we're ready to start
await request_channel.send(ClientMessage(init_request=init_request))
Expand All @@ -722,6 +831,8 @@ async def _run(self):
func = self._event_handler
elif ctx.bucket_notification():
func = self._bucket_notification_handler
elif ctx.websocket():
func = self._websocket_handler

assert func is not None

Expand Down
35 changes: 35 additions & 0 deletions nitric/proto/nitric/deploy/v1/__init__.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e82fc4a

Please sign in to comment.