Skip to content

Commit

Permalink
sample additions, review comments fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
reebhub committed Jul 9, 2024
1 parent c1b1e17 commit 39a036d
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ The `Run` function receives the client-side code as a delegate that will process

| Member | Type | Description |
|--------|:-----|-------------|
| **Items** | `List<SubscriptionBatch&lt;T&gt;.Item>` | Batch's items list. |
| **Items** | `List<SubscriptionBatch<T>.Item>` | Batch's items list. |
| **NumberOfItemsInBatch** | `int` | Amount of items in the batch. |

| Method Signature | Return value | Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Create a subscription worker using `get_subscription_worker` or `get_subscriptio
| Member | Type | Description |
|--------|:-----|-------------|
| **subscription_name** | `str` | Returns the subscription name passed to the constructor. This name will be used by the server side to identify the subscription. |
| **time_to_wait_before_connection_retry** | `DateTime` | Time to wait before reconnecting, in the case of non-aborting failure during the subscription processing.<br>Default: 5 seconds. |
| **time_to_wait_before_connection_retry** | `timedelta` | Time to wait before reconnecting, in the case of non-aborting failure during the subscription processing.<br>Default: 5 seconds. |
| **ignore_subscriber_errors** | `bool` | If `True`, will not abort subscription processing if client code, passed to the `run` function, throws an unhandled exception.<br>Default: `False` |
| **strategy** | `SubscriptionOpeningStrategy`<br>(enum) | Sets the way the server will treat current and/or other clients when they try to connect. See [Workers interplay](../../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription#worker-interplay). <br>Default: `OPEN_IF_FREE`. |
| **max_docs_per_batch** | `int` | Max number of documents the server will try to send in a batch. If the server doesn't find as many documents as specified, it will not wait and send those it did find.<br>Default: 4096. |
Expand All @@ -59,7 +59,7 @@ Create a subscription worker using `get_subscription_worker` or `get_subscriptio
After [generating](../../../client-api/data-subscriptions/consumption/api-overview#subscription-worker-generation)
a subscription worker, the worker doesn't process any documents yet.
Processing starts when the `run` function is called.
The `run` function receives the client-side code as a delegate that will process the retrieved batches:
The `run` function receives the client-side code as a function that will process the retrieved batches.
{CODE:python subscriptionWorkerRunning@ClientApi\DataSubscriptions\DataSubscriptions.py /}

| Parameters | | |
Expand All @@ -73,9 +73,12 @@ The `run` function receives the client-side code as a delegate that will process

| Member | Type | Description |
|--------|:-----|-------------|
| **items** | `SubscriptionBatch[_T].item` array | Batch items list |
| **items** | `SubscriptionBatch[_T].Item` array | Batch items list |
| **number_of_items_in_batch** | `int` | Number of items in the batch |

{CODE:python number_of_items_in_batch_definition@ClientApi\DataSubscriptions\DataSubscriptions.py /}


{NOTE:Subscription Worker Connectivity}
As long as there is no exception, the worker will continue addressing the same
server that the first batch was received from.
Expand All @@ -86,18 +89,22 @@ The node that the worker succeeds connecting to, will inform the worker which
node is currently responsible for data subscriptions.
{NOTE/}

{CODE:python Item_definition@ClientApi\DataSubscriptions\DataSubscriptions.py /}
{CODE:python SubscriptionBatch_definition@ClientApi\DataSubscriptions\DataSubscriptions.py /}

| `SubscriptionBatch[_T].item` Member | Type | Description |
|--------|:-----|-------------|
| **result** | `T` | Current batch item |
| **exception_message** | `str` | Message of the exception thrown during current document processing in the server side |
| **id** | `str` | Current batch item's underlying document ID |
| **change_vector** | `str` | Current batch item's underlying document change vector of the current document |
| **raw_result** | | Current batch item before serialization to `T` |
| **raw_metadata** | | Current batch item's underlying document metadata |
| **metadata** | | Current batch item's underlying metadata values |
| **\_result** (Optional) | `_T_Item` | Current batch item |
| **\_exception_message** (Optional) | `str` | Message of the exception thrown during current document processing in the server side |
| **\_key** (Optional) | `str` | Current batch item underlying document ID |
| **\_change_vector** (Optional) | `str` | Current batch item underlying document change vector of the current document |
| **\_projection** (Optional) | `bool` | indicates whether the value id a projection |
| **raw_result** (Optional) | `Dict` | Current batch item before serialization to `T` |
| **raw_metadata** (Optional) | `Dict` | Current batch item underlying document metadata |
| **\_metadata** (Optional) | `MetadataAsDictionary` | Current batch item underlying metadata values |

{WARNING: }
Usage of `raw_result`, `raw_metadata`, and `metadata` values outside of the document processing delegate
Usage of `raw_result`, `raw_metadata`, and `_metadata` values outside of the document processing delegate
is not supported.
{WARNING/}

Expand All @@ -111,25 +118,24 @@ is not supported.

| Method | Return Type | Description |
|--------|:-----|-------------|
| `dispose` | `void` | Aborts subscription worker operation ungracefully by waiting for the task returned by the `run` function to finish running. |
| `dispose(bool waitForSubscriptionTask)` | `void` | Aborts the subscription worker, but allows deciding whether to wait for the `run` function task or not. |
| `run` | `Task` | Starts the subscription worker work of processing batches, receiving the batch processing delegates (see [above](../../../client-api/data-subscriptions/consumption/api-overview#running-subscription-worker)). |
| `close(bool wait_for_subscription_task = True)` | `void` | Aborts subscription worker operation ungracefully by waiting for the task returned by the `run` function to finish running. |
| `run` | `Future[None]` | Starts the subscription worker work of processing batches, receiving the batch processing delegates (see [above](../../../client-api/data-subscriptions/consumption/api-overview#running-subscription-worker)). |

---

### Events:

| Event | Type\Return type | Description |
|--------|:-----|-------------|
| **after\_acknowledgment** | `AfterAcknowledgmentAction` (event) | Event invoked after each time the server acknowledges batch processing progress. |
| **after\_acknowledgment** | `Callable[[SubscriptionBatch[_T]], None]` | Event invoked after each time the server acknowledges batch processing progress. |

| `AfterAcknowledgmentAction` Parameters | | |
| `after_acknowledgment` Parameters | | |
| ------------- | ------------- | ----- |
| **batch** | `SubscriptionBatch[_T]` | The batch process which was acknowledged |

| Return value | |
| ------------- | ----- |
| `Task` | The worker waits for the task to finish the event processing |
| `Future[None]` | The worker waits for the task to finish the event processing |

### Properties:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,21 @@ A worker can be configured to treat the thrown exception in either of the follow
The task returned by the `run` function will be terminated with an erroneous state, throwing
a `SubscriberErrorException` exception.

* If `SubscriptionWorkerOptions`'s value `IgnoreSubscriberErrors` is set to True, the erroneous
* If `SubscriptionWorkerOptions`'s value `ignore_subscriber_errors` is set to True, the erroneous
batch will get acknowledged without retrying and the next batches will continue processing.
{INFO/}

{INFO: Reconnecting}
The above cases describe situations in which a worker tries to reconnect with the server.
There are two key `SubscriptionWorkerOptions` fields controlling this state:

* `TimeToWaitBeforeConnectionRetry` - Worker 'sleep' period before trying to reconnect.
* `MaxErroneousPeriod` - Maximum time that the worker is allowed to be in an erroneous state.
* `time_to_wait_before_connection_retry` - Worker 'sleep' period before trying to reconnect.
* `max_erroneous_period` - Maximum time that the worker is allowed to be in an erroneous state.
When this time elapses, the worker will stop trying to reconnect.
{INFO/}

{INFO: OnUnexpectedSubscriptionError}
`OnUnexpectedSubscriptionError` is the event raised when a connection failure occurs
{INFO: `on_unexpected_subscription_error`}
`on_unexpected_subscription_error` is the event raised when a connection failure occurs
between the subscription worker and the server and it throws an unexpected exception.
When this occurs, the worker will automatically try to reconnect again. This event is
useful for logging these unexpected exceptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,35 @@ def _transfer_order_callback(batch: SubscriptionBatch[Order]):
# endregion
"""

# region Item_definition
class Item(Generic[_T_Item]):
"""
Represents a single item in a subscription batch results.
This class should be used only inside the subscription's run delegate,
using it outside this scope might cause unexpected behavior.
"""
# endregion

# region number_of_items_in_batch_definition
def number_of_items_in_batch(self) -> int:
return 0 if self.items is None else len(self.items)
# endregion

# region SubscriptionBatch_definition
class SubscriptionBatch(Generic[_T]):

def __init__(self):
self._result: Optional[_T_Item] = None
self._exception_message: Optional[str] = None
self._key: Optional[str] = None
self._change_vector: Optional[str] = None
self._projection: Optional[bool] = None
self._revision: Optional[bool] = None
self.raw_result: Optional[Dict] = None
self.raw_metadata: Optional[Dict] = None
self._metadata: Optional[MetadataAsDictionary] = None
# endregion

# region get_subscriptions_1
def get_subscriptions(
self, start: int, take: int, database: Optional[str] = None
Expand Down

0 comments on commit 39a036d

Please sign in to comment.