Expanded context functionality into its own package, applied linter suggestions, made some of the logging interfaces look more like Python's vanilla logging, got shared Redis communicators working, expanded READMEs, fixed some GUI issues for the evaluation service, enhanced error tracking for the evaluation service's runner, and abstracted out object creation functions to make it easier to slide in new functionality.
christophertubbs committed Jun 7, 2024
1 parent 900f868 commit c1d52b2
Showing 38 changed files with 3,288 additions and 885 deletions.
12 changes: 2 additions & 10 deletions python/gui/maas_experiment/settings.py
@@ -1,13 +1,5 @@
"""
Django settings for maas_experiment project.
Generated by 'django-admin startproject' using Django 2.2.5.
For more information on this file, see
https://docs.djangoproject.com/en/2.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/2.2/ref/settings/
Django settings for maas_experiment project
"""

from .application_values import *
@@ -20,7 +12,7 @@
# See https://docs.djangoproject.com/en/2.2/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = os.environ.get("SECRET_KEY",'cm_v*vc*8s048%f46*@t7)hb9rtaa@%)#b!s(+$4+iw^tjt=s6')
SECRET_KEY = os.environ.get("SECRET_KEY", 'cm_v*vc*8s048%f46*@t7)hb9rtaa@%)#b!s(+$4+iw^tjt=s6')

# Must be set in production!
ALLOWED_HOSTS = ['*']
166 changes: 166 additions & 0 deletions python/lib/core/README.md
@@ -2,3 +2,169 @@
Python package for core DMOD types, both concrete and abstract, that are depended upon by other DMOD Python packages and themselves have no dependencies outside of Python and its standard library.

Classes belong here if placing them in a more specialized package would cause undesired consequences, such as circular dependencies or transitive dependency on otherwise unnecessary packages.

## `common`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common` package

### `collection`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.collection` module

### `failure`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.failure` module

### `helper_functions`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.helper_functions` module

### `protocols`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.protocols` module

### `reader`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.reader` module

### `tasks`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.tasks` module

### `types`

<b style="color: red">TODO:</b> Write information about the `dmod.core.common.types` module

## `decorators`

<b style="color: red">TODO:</b> Write information about the `dmod.core.decorators` package

### `decorator_constants`

<b style="color: red">TODO:</b> Write information about the `dmod.core.decorators.decorator_constants` module

### `decorator_functions`

<b style="color: red">TODO:</b> Write information about the `dmod.core.decorators.decorator_functions` module

### `message_handlers`

<b style="color: red">TODO:</b> Write information about the `dmod.core.decorators.message_handlers` module

## `context`

The `dmod.core.context` module provides the functionality needed to create automatic proxies for remote objects,
a DMOD-specific multiprocessing object manager, and a custom implementation of the object manager's server that
overcomes issues with the base functionality as of Python 3.8. If
`dmod.core.context.DMODObjectManager.register_class(NewClass)` is called after the class has been defined, a proxy
type for it is generated dynamically, and an instance of that type (`NewClass` in this example) may be constructed through code such as:

```python
from dmod.core import context

with context.get_object_manager() as manager:
    class_instance = manager.create_object('NewClass', 'one', 2, other_parameter=9)
```

where <code style="color: green">'NewClass'</code> is the name of the desired class and
<code style="color: green">'one'</code>, <code style="color: blue">2</code>, and <code>other_parameter</code>
are the parameters for `NewClass`'s constructor.
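
A slightly fuller sketch of the registration step described above (the `NewClass` definition and its constructor
signature are illustrative assumptions; `register_class`, `get_object_manager`, and `create_object` are the calls
described in this section):

```python
from dmod.core import context

class NewClass:
    """An illustrative class; any picklable constructor arguments will do."""
    def __init__(self, first: str, second: int, other_parameter: int = 0):
        self.first = first
        self.second = second
        self.other_parameter = other_parameter

# Register the class so that a proxy type for it is generated dynamically
context.DMODObjectManager.register_class(NewClass)

with context.get_object_manager() as manager:
    # The real instance lives on the manager's server; `class_instance` is its proxy
    class_instance = manager.create_object('NewClass', 'one', 2, other_parameter=9)
```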

Scopes may be created on the manager to track objects that are passed from one process to another. If a
proxy is instantiated within a called function, passed to a new process, and the function then returns, the
server's `decref` function will be called before its `incref` function, destroying the object before it can
be used. Creating the object through a scope keeps it alive, and assigning the spawned operation to that
scope allows the object manager to destroy the scope's objects when the operation completes.

For example:

```python
from dmod.core import context
from concurrent import futures

def do_something(new_class: NewClass):
    ...

def start_process(manager: context.DMODObjectManager, pool: futures.ProcessPoolExecutor):
    scope = manager.establish_scope("example")
    example_object = scope.create_object('NewClass', 'one', 2, other_parameter=9)
    task = pool.submit(do_something, example_object)

    # The scope and everything within it will be deleted once `task.done()` is True
    manager.monitor_operation(scope, task)

# Tell the object manager to monitor scopes when creating it
with futures.ProcessPoolExecutor() as pool, context.get_object_manager(monitor_scope=True) as manager:
    start_process(manager, pool)
```

### <span style="color: red">Common Errors</span>

#### Remote Error in `Server.incref`

Sometimes you might encounter an error that reads like:

```shell
Traceback (most recent call last):
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/queues.py", line 358, in get
    return _ForkingPickler.loads(res)
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 959, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 809, in __init__
    self._incref()
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 864, in _incref
    dispatch(conn, None, 'incref', (self._id,))
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 91, in dispatch
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 210, in handle_request
    result = func(c, *args, **kwds)
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 456, in incref
    raise ke
  File "/path/to/python/3.8/lib/python3.8/multiprocessing/managers.py", line 443, in incref
    self.id_to_refcount[ident] += 1
KeyError: '3067171c0'
```

This sort of error occurs when an instantiated object has fallen out of scope _before_ another process has had
a chance to use it. The server (in this case the `dmod.core.context.DMODObjectServer`) that the manager (in this case
the `dmod.core.context.DMODObjectManager`) communicates with keeps track of objects via reference counters. When a
proxy is created, the real object is created on the instantiated server and its reference count increases. When the
created proxy leaves scope, that reference count decreases. When that count reaches 0, the real object that the proxy
refers to is removed. If a proxy is created in the scope of one function and passed to another process, the reference
count will be decremented when that function exits unless the proxy is created within a scope that does not end when
the function does.
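
To make the failure mode concrete, here is a rough sketch of the pattern that triggers the error and a scoped
variant that avoids it (the function and scope names are hypothetical; `establish_scope`, `create_object`, and
`monitor_operation` are used as in the example above):

```python
from concurrent import futures

from dmod.core import context

def use_proxy(proxy) -> None:
    """Hypothetical work performed in another process."""
    ...

def risky(manager: context.DMODObjectManager, pool: futures.ProcessPoolExecutor) -> futures.Future:
    # The proxy created here is the only reference to the remote object. When this
    # function returns, the server may decref it to zero before the worker process
    # gets a chance to incref it, producing the KeyError shown above.
    proxy = manager.create_object('NewClass', 'one', 2, other_parameter=9)
    return pool.submit(use_proxy, proxy)

def safer(manager: context.DMODObjectManager, pool: futures.ProcessPoolExecutor) -> futures.Future:
    # The scope holds its own reference, so the remote object stays alive until the
    # monitored operation completes, even though this function returns immediately.
    scope = manager.establish_scope("incref_example")
    proxy = scope.create_object('NewClass', 'one', 2, other_parameter=9)
    task = pool.submit(use_proxy, proxy)
    manager.monitor_operation(scope, task)
    return task
```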

## `dataset`

<b style="color: red">TODO:</b> Write information about the `dmod.core.dataset` module

## `enum`

<b style="color: red">TODO:</b> Write information about the `dmod.core.enum` module

## `exception`

<b style="color: red">TODO:</b> Write information about the `dmod.core.exception` module

## `execution`

<b style="color: red">TODO:</b> Write information about the `dmod.core.execution` module

## `meta_data`

<b style="color: red">TODO:</b> Write information about the `dmod.core.meta_data` module

## `serializable`

<b style="color: red">TODO:</b> Write information about the `dmod.core.serializable` module
2 changes: 1 addition & 1 deletion python/lib/core/dmod/core/_version.py
@@ -1 +1 @@
__version__ = '0.17.0'
__version__ = '0.18.0'
4 changes: 4 additions & 0 deletions python/lib/core/dmod/core/common/__init__.py
@@ -4,6 +4,7 @@
from __future__ import annotations

from .failure import Failure

from .helper_functions import get_current_function_name
from .helper_functions import is_sequence_type
from .helper_functions import is_iterable_type
@@ -21,9 +22,12 @@
from .helper_functions import humanize_text
from .helper_functions import generate_identifier
from .helper_functions import generate_key
from .helper_functions import format_stack_trace

from .tasks import wait_on_task
from .tasks import cancel_task
from .tasks import cancel_tasks

from .collection import Bag
from .protocols import DBAPIConnection
from .protocols import DBAPICursor
162 changes: 162 additions & 0 deletions python/lib/core/dmod/core/common/collection.py
@@ -7,6 +7,9 @@
import enum
import inspect
import typing
import uuid
from datetime import datetime
from datetime import timedelta

import pydantic
from pydantic import PrivateAttr
@@ -235,6 +238,165 @@ def __contains__(self, obj: object) -> bool:
        return obj in self.__data


class _OccurrenceTracker(typing.Generic[_T]):
    """
    Keeps track of occurrences of a type of value that have been encountered within a duration
    """
    def __init__(self, key: _T, duration: timedelta, threshold: int, on_filled: typing.Callable[[_T], typing.Any]):
        self.__key = key
        self.__duration = duration
        self.__threshold = threshold
        self.__occurences: typing.List[datetime] = []
        self.__on_filled = on_filled

    def value_encountered(self):
        """
        Inform the tracker that the value has been encountered again
        """
        self.update_occurrences()
        self.__occurences.append(datetime.now())
        if len(self.__occurences) >= self.__threshold:
            self.__on_filled(self.__key)

    def update_occurrences(self) -> int:
        """
        Update the list of occurrences to include only those within the current duration

        Returns:
            The number of occurrences still being tracked
        """
        cutoff: datetime = datetime.now() - self.__duration
        self.__occurences = sorted([
            occurrence
            for occurrence in self.__occurences
            if occurrence > cutoff
        ])
        return len(self.__occurences)

    @property
    def key(self):
        """
        The identifier that is being tracked
        """
        return self.__key

    def __len__(self):
        return len(self.__occurences)

    def __str__(self):
        if len(self.__occurences) == 0:
            occurrences_details = f"No occurrences within the last {self.__duration.total_seconds()} seconds."
        else:
            occurrences_details = (f"{len(self.__occurences)} occurrences since "
                                   f"{self.__occurences[0].strftime('%Y-%m-%d %H:%M:%S')}")
        return f"{self.key}: {occurrences_details}"


class TimedOccurrenceWatcher:
    """
    Keeps track of the amount of occurrences of items within a range of time
    """
    @staticmethod
    def default_key_function(obj: object) -> type:
        """
        The function used to find a common identifier for an object if one is not provided
        """
        return type(obj)

    def __init__(
        self,
        duration: timedelta,
        threshold: int,
        on_filled: typing.Callable[[_T], typing.Any],
        key_function: typing.Callable[[_VT], _KT] = None
    ):
        if not isinstance(duration, timedelta):
            raise ValueError(f"Cannot create a {self.__class__.__name__} - {duration} is not a timedelta object")

        if duration.total_seconds() < 0.1:
            raise ValueError(
                f"Cannot create a {self.__class__.__name__} - the duration is too short ({duration.total_seconds()}s)"
            )

        self.__duration = duration

        if not isinstance(key_function, typing.Callable):
            key_function = self.default_key_function

        self.__key_function = key_function
        self.__entries: typing.Dict[uuid.UUID, _OccurrenceTracker] = {}
        self.__threshold = threshold
        self.__on_filled = on_filled

    def value_encountered(self, value: _T):
        """
        Add an occurrence of an object to track

        Args:
            value: The item to track
        """
        self.__update_trackers()
        self._get_tracker(value).value_encountered()

    def _get_tracker(self, value: _T) -> _OccurrenceTracker[_T]:
        """
        Get an occurrence tracker for the given value

        Args:
            value: The value to track

        Returns:
            A tracker for the value
        """
        key = self.__key_function(value)

        for tracker in self.__entries.values():
            if tracker.key == key:
                return tracker

        new_tracker = _OccurrenceTracker(
            key=key,
            duration=self.__duration,
            threshold=self.__threshold,
            on_filled=self.__on_filled
        )
        self.__entries[uuid.uuid1()] = new_tracker
        return new_tracker

    def __update_trackers(self):
        """
        Update the amount of items in each tracker

        If a tracker becomes empty it will be removed
        """
        # Collect the empty trackers first so the dictionary is not mutated while iterating over it
        empty_tracker_ids = [
            tracker_id
            for tracker_id, tracker in self.__entries.items()
            if tracker.update_occurrences() == 0
        ]

        for tracker_id in empty_tracker_ids:
            del self.__entries[tracker_id]

    @property
    def size(self) -> int:
        """
        The number of items encountered within the duration
        """
        self.__update_trackers()
        return sum(len(tracker) for tracker in self.__entries.values())

    @property
    def duration(self) -> timedelta:
        """
        The amount of time to track items for
        """
        return self.__duration

    def __str__(self):
        return f"{self.__class__.__name__}: {self.size} items within the last {self.duration.total_seconds()} seconds"

    def __repr__(self):
        return self.__str__()
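
# Illustrative usage sketch (not part of this module): with the arguments below, `warn`
# would be called with the offending key once values of the same type are encountered
# five times within one minute.
#
#     def warn(key: type) -> None:
#         print(f"{key.__name__} keeps occurring")
#
#     watcher = TimedOccurrenceWatcher(duration=timedelta(minutes=1), threshold=5, on_filled=warn)
#     watcher.value_encountered(ValueError("something went wrong"))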



class EventfulMap(abc.ABC, typing.MutableMapping[_KT, _VT], typing.Generic[_KT, _VT]):
    @abc.abstractmethod
    def get_handlers(self) -> typing.Dict[CollectionEvent, typing.MutableSequence[typing.Callable]]:
