diff --git a/Dockerfile b/Dockerfile index dd11283..a38b1e7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.11.4 as build +FROM python:3.12.3 as build MAINTAINER RAMS Project "code@magfest.org" LABEL version.sideboard ="1.0" WORKDIR /app diff --git a/pavement.py b/pavement.py index b3006b6..404242b 100644 --- a/pavement.py +++ b/pavement.py @@ -2,7 +2,6 @@ import os import sys import glob -import pkg_resources from itertools import chain from os.path import abspath, dirname, exists, join @@ -110,28 +109,6 @@ def pull_plugins(): sh('cd "{}";git pull'.format(plugin_dir)) -@task -def assert_all_files_import_unicode_literals(): - """ - error if a python file is found in sideboard or plugins that does not import unicode_literals; \ -this is skipped for Python 3 - """ - if sys.version_info[0] == 2: - all_files_found = [] - cmd = ("find '%s' -name '*.py' ! -size 0 " - "-exec grep -RL 'from __future__ import.*unicode_literals.*$' {} \;") - for test_dir in chain(['sideboard'], collect_plugin_dirs(module=True)): - output = sh(cmd % test_dir, capture=True) - if output: - all_files_found.append(output) - - if all_files_found: - print('the following files did not include "from __future__ import unicode_literals":') - print(''.join(all_files_found)) - raise BuildFailure("there were files that didn't include " - '"from __future__ import unicode_literals"') - - @task def assert_all_projects_correctly_define_a_version(): """ @@ -180,12 +157,6 @@ def run_all_assertions(): def create_plugin(options): """create a plugin skeleton to start a new project""" - # this is actually needed thanks to the skeleton using jinja2 (and six, although that's changeable) - try: - pkg_resources.get_distribution("sideboard") - except pkg_resources.DistributionNotFound: - raise BuildFailure("This command must be run from within a configured virtual environment.") - plugin_name = options.create_plugin.name if getattr(options.create_plugin, 'drop', False) and (PLUGINS_DIR / path(plugin_name.replace('_', '-'))).exists(): diff --git a/requirements.txt b/requirements.txt index 314552f..b701108 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,15 +1,15 @@ -cherrypy==18.8.0 -configobj==5.0.6 -Jinja2==3.1.2 +cherrypy==18.9.0 +configobj==5.0.8 +Jinja2==3.1.3 paver==1.3.4 -pip==23.2.1 -psutil==5.9.5 +pip==24.0 +psutil==5.9.8 python-prctl==1.8.1; 'linux' in sys_platform -redis==4.6.0 +redis==5.0.3 requests==2.31.0 rpctools==0.3.1 -sh==2.0.4 +sh==2.0.6 six==1.16.0 -SQLAlchemy==1.4.49 -wheel==0.41.0 +SQLAlchemy==1.4.52 +wheel==0.43.0 ws4py==0.5.1 diff --git a/sideboard/lib/_cp.py b/sideboard/lib/_cp.py index e683363..d409533 100644 --- a/sideboard/lib/_cp.py +++ b/sideboard/lib/_cp.py @@ -9,12 +9,8 @@ import jinja2 import cherrypy -try: - from sideboard.lib._redissession import RedisSession - cherrypy.lib.sessions.RedisSession = RedisSession -except ImportError: - # cherrys not installed, so redis sessions not supported - pass +from sideboard.lib._redissession import RedisSession +cherrypy.lib.sessions.RedisSession = RedisSession import sideboard.lib from sideboard.lib import log, config, serializer diff --git a/sideboard/lib/_services.py b/sideboard/lib/_services.py index 8df1866..38b3080 100644 --- a/sideboard/lib/_services.py +++ b/sideboard/lib/_services.py @@ -201,8 +201,6 @@ def _register_rpc_services(rpc_services): jservice = getattr(jproxy, service_name) if rpc_services.get(host, {}).get('jsonrpc_only'): service = jservice - else: - service = services._register_websocket(_ws_url(host, rpc_opts), ssl_opts=ssl_opts, connect_immediately=False) services.register(service, service_name, _jsonrpc=jservice, _override=True) diff --git a/sideboard/lib/sa/__init__.py b/sideboard/lib/sa/__init__.py index eb201c5..dd8dce7 100644 --- a/sideboard/lib/sa/__init__.py +++ b/sideboard/lib/sa/__init__.py @@ -179,7 +179,7 @@ def check_constraint_naming_convention(constraint, table): for operator, text in replacements: constraint_name = constraint_name.replace(operator, text) - constraint_name = re.sub('[\\W\\s]+', '_', constraint_name) + constraint_name = re.sub(r'[\W\s]+', '_', constraint_name) if len(constraint_name) > 32: constraint_name = uuid.uuid5(uuid.NAMESPACE_OID, str(constraint_name)).hex return constraint_name diff --git a/sideboard/server.py b/sideboard/server.py index db184be..4606020 100755 --- a/sideboard/server.py +++ b/sideboard/server.py @@ -4,16 +4,8 @@ import six import cherrypy -from cherrypy.lib import cpstats -import sideboard -from sideboard.internal import connection_checker -from sideboard.jsonrpc import _make_jsonrpc_handler -from sideboard.websockets import WebSocketDispatcher, WebSocketRoot, WebSocketAuthError -from sideboard.lib import log, listify, config, render_with_templates, services, threadlocal -from sideboard.lib._cp import auth_registry - -default_auth_checker = auth_registry[config['default_authenticator']]['check'] +from sideboard.lib import config, threadlocal def reset_threadlocal(): @@ -21,111 +13,6 @@ def reset_threadlocal(): cherrypy.tools.reset_threadlocal = cherrypy.Tool('before_handler', reset_threadlocal, priority=51) - -def jsonrpc_reset(body): - reset_threadlocal() - threadlocal.set('client', body.get('websocket_client')) - - -def jsonrpc_auth(body): - jsonrpc_reset(body) - if not default_auth_checker(): - raise cherrypy.HTTPError(401, 'not logged in') - - -@render_with_templates(config['template_dir']) -class Root(object): - def default(self, *args, **kwargs): - raise cherrypy.HTTPRedirect(config['default_url']) - - def logout(self, return_to='/'): - cherrypy.session.pop('username', None) - raise cherrypy.HTTPRedirect('login?return_to=%s' % return_to) - - def login(self, username='', password='', message='', return_to=''): - if not config['debug']: - return 'Login page only available in debug mode.' - - if username: - if config['debug'] and password == config['debug_password']: - cherrypy.session['username'] = username - raise cherrypy.HTTPRedirect(return_to or config['default_url']) - else: - message = 'Invalid credentials' - - return { - 'message': message, - 'username': username, - 'return_to': return_to - } - - def list_plugins(self): - from sideboard.internal.imports import plugins - plugin_info = {} - for plugin, module in plugins.items(): - plugin_info[plugin] = { - 'name': ' '.join(plugin.split('_')).title(), - 'version': getattr(module, '__version__', None), - 'paths': [] - } - for path, app in cherrypy.tree.apps.items(): - # exclude what Sideboard itself mounts and grafted mount points - if path and hasattr(app, 'root'): - plugin = app.root.__module__.split('.')[0] - plugin_info[plugin]['paths'].append(path) - return { - 'plugins': plugin_info, - 'version': getattr(sideboard, '__version__', None) - } - - def connections(self): - return {'connections': connection_checker.check_all()} - - ws = WebSocketRoot() - wsrpc = WebSocketRoot() - - json = _make_jsonrpc_handler(services.get_services(), precall=jsonrpc_auth) - jsonrpc = _make_jsonrpc_handler(services.get_services(), precall=jsonrpc_reset) - - -class SideboardWebSocket(WebSocketDispatcher): - """ - This web socket handler will be used by browsers connecting to Sideboard web - sites. Therefore, the authentication mechanism is the default approach - of checking the session for a username and rejecting unauthenticated users. - """ - services = services.get_services() - - @classmethod - def check_authentication(cls): - host, origin = cherrypy.request.headers['host'], cherrypy.request.headers['origin'] - if ('//' + host.split(':')[0]) not in origin: - log.error('Javascript websocket connections must follow same-origin policy; origin %s does not match host %s', origin, host) - raise WebSocketAuthError('Origin and Host headers do not match') - - if config['ws.auth_required'] and not cherrypy.session.get(config['ws.auth_field']): - log.warning('websocket connections to this address must have a valid session') - raise WebSocketAuthError('You are not logged in') - - return WebSocketDispatcher.check_authentication() - - -app_config = { - '/static': { - 'tools.staticdir.on': True, - 'tools.staticdir.dir': os.path.join(config['module_root'], 'static') - }, - '/ws': { - 'tools.websockets.on': True, - 'tools.websockets.handler_cls': SideboardWebSocket - } -} -if config['debug']: - app_config['/docs'] = { - 'tools.staticdir.on': True, - 'tools.staticdir.dir': os.path.join(config['module_root'], 'docs', 'html'), - 'tools.staticdir.index': 'index.html' - } cherrypy_config = {} for setting, value in config['cherrypy'].items(): if isinstance(value, six.string_types): @@ -137,32 +24,3 @@ def check_authentication(cls): value = value.encode('utf-8') cherrypy_config[setting] = value cherrypy.config.update(cherrypy_config) - - -# on Python 2, we need bytestrings for CherryPy config, see https://bitbucket.org/cherrypy/cherrypy/issue/1184 -def recursive_coerce(d): - if isinstance(d, dict): - for k, v in d.items(): - if sys.version_info[:2] == (2, 7) and isinstance(k, unicode): - del d[k] - d[k.encode('utf-8')] = recursive_coerce(v) - return d - - -def mount(root, script_name='', config=None): - assert script_name not in cherrypy.tree.apps, '{} has already been mounted, probably by another plugin'.format(script_name) - return orig_mount(root, script_name, recursive_coerce(config)) - -orig_mount = cherrypy.tree.mount -cherrypy.tree.mount = mount -root = Root() -if config['cherrypy']['tools.cpstats.on']: - root.stats = cpstats.StatsPage() -cherrypy.tree.mount(root, '', app_config) - -if config['cherrypy']['profiling.on']: - # If profiling is turned on then expose the web UI, otherwise ignore it. - from sideboard.lib import Profiler - cherrypy.tree.mount(Profiler(config['cherrypy']['profiling.path']), '/profiler') - -sys.modules.pop('six.moves.winreg', None) # kludgy workaround for CherryPy's autoreloader erroring on winreg for versions which have this diff --git a/sideboard/tests/test_lib.py b/sideboard/tests/test_lib.py index 4feef1d..4523d55 100644 --- a/sideboard/tests/test_lib.py +++ b/sideboard/tests/test_lib.py @@ -406,14 +406,6 @@ class Foo(object): assert not is_listy(x) -def test_double_mount(request): - class Root(object): - pass - request.addfinalizer(lambda: cherrypy.tree.apps.pop('/test', None)) - cherrypy.tree.mount(Root(), '/test') - pytest.raises(Exception, cherrypy.tree.mount, Root(), '/test') - - def test_ajaz_serialization(): class Root(object): @ajax diff --git a/sideboard/tests/test_sa.py b/sideboard/tests/test_sa.py index 73fa7f8..981d34d 100644 --- a/sideboard/tests/test_sa.py +++ b/sideboard/tests/test_sa.py @@ -91,7 +91,7 @@ class CrudableMixin(object): } ) @text_length_validation('string_model_attr', 2, 100) -@regex_validation('string_model_attr', '^[A-Za-z0-9\\.\\_\\-]+$', 'test thing') +@regex_validation('string_model_attr', r'^[A-Za-z0-9\.\_\-]+$', 'test thing') @text_length_validation('overridden_desc', 1, 100) @text_length_validation('nonexistant_field', 1, 100) class CrudableClass(CrudableMixin, Base): diff --git a/sideboard/tests/test_server.py b/sideboard/tests/test_server.py index 8f9dc80..946d598 100644 --- a/sideboard/tests/test_server.py +++ b/sideboard/tests/test_server.py @@ -85,25 +85,12 @@ def stop_cherrypy(cls): def setUpClass(cls): super(SideboardServerTest, cls).setUpClass() cls.start_cherrypy() - cls.ws = cls.patch_websocket(services.get_websocket()) - cls.ws.connect(max_wait=5) - assert cls.ws.connected @classmethod def tearDownClass(cls): cls.stop_cherrypy() super(SideboardServerTest, cls).tearDownClass() - @staticmethod - def patch_websocket(ws): - ws.q = Queue() - ws.fallback = ws.q.put - return ws - - def tearDown(self): - while not self.ws.q.empty(): - self.ws.q.get_nowait() - def wait_for(self, func, *args, **kwargs): for i in range(50): cherrypy.engine.publish('main') # since our unit tests don't call cherrypy.engine.block, we must publish this event manually @@ -153,426 +140,12 @@ def get(self, path, **params): def get_json(self, path, **params): return self._get(self.rsess, path, **params).json() - def open_ws(self): - return self.patch_websocket(WebSocket(connect_immediately=True, max_wait=5)) - - def next(self, ws=None, timeout=2): - return (ws or self.ws).q.get(timeout=timeout) - - def assert_incoming(self, ws=None, client=None, timeout=1, **params): - data = self.next(ws, timeout) - assert (client or self.client) == data.get('client') - for key, val in params.items(): - assert val == data[key] - def assert_no_response(self): pytest.raises(Empty, self.next) - def assert_error_with(self, *args, **kwargs): - if args: - self.ws.ws.send(str(args[0])) - else: - self.ws._send(**kwargs) - assert 'error' in self.next() - - def call(self, **params): - callback = 'callback{}'.format(randrange(1000000)) - self.ws._send(callback=callback, **params) - result = self.next() - assert callback == result['callback'] - return result - def subscribe(self, **params): params.setdefault('client', self.client) return self.call(**params) def unsubscribe(self, client=None): self.call(action='unsubscribe', client=client or self.client) - - -class JsonrpcTest(SideboardServerTest): - @pytest.fixture(autouse=True) - def override(self, service_patcher): - service_patcher('testservice', self) - - def get_message(self, name): - return 'Hello {}!'.format(name) - - def send_json(self, body, content_type='application/json'): - if isinstance(body, dict): - body['id'] = self._testMethodName - resp = requests.post(self.jsonrpc_url, data=json.dumps(body), - headers={'Content-Type': 'application/json'}) - assert resp.json - return resp.json() - - def test_rpctools(self): - assert 'Hello World!' == self.jsonrpc.testservice.get_message('World') - - def test_content_types(self): - for ct in ['text/html', 'text/plain', 'application/javascript', 'text/javascript', 'image/gif']: - response = self.send_json({ - 'method': 'testservice.get_message', - 'params': ['World'] - }, content_type=ct) - assert 'Hello World!' == response['result'], 'Expected success with valid reqeust using Content-Type {}'.format(ct) - - -class TestWebsocketSubscriptions(SideboardServerTest): - @pytest.fixture(autouse=True) - def override(self, service_patcher, config_patcher): - config_patcher(1, 'ws.call_timeout') - service_patcher('self', self) - - def echo(self, s): - self.echoes.append(s) - return s - - def slow_echo(self, s): - sleep(2) - return s - - @subscribes('names') - def get_names(self): - return self.names - - def change_name(self, name=None): - self.names[-1] = name or uuid4().hex - notify('names', delay=True) - - @notifies('names') - def change_name_then_error(self): - self.names[:] = reversed(self.names) - self.fail() - - def indirectly_change_name(self): - self.change_name(uuid4().hex) - - @subscribes('places') - def get_places(self): - return self.places - - @notifies('places') - def change_place(self): - self.places[0] = uuid4().hex - - @subscribes('names', 'places') - def get_names_and_places(self): - return self.names + self.places - - def setUp(self): - SideboardServerTest.setUp(self) - self.echoes = [] - self.places = ['Here'] - self.names = ['Hello', 'World'] - self.client = self._testMethodName - - def test_echo(self): - self.ws._send(method='self.echo', params='hello') - self.ws._send(method='self.echo', params=['hello']) - self.ws._send(method='self.echo', params={'s': 'hello'}) - self.assert_no_response() - self.ws._send(method='self.echo', params='hello', callback='cb123') - self.next() - assert ['hello'] * 4 == self.echoes - - def test_errors(self): - self.assert_error_with(0) - self.assert_error_with([]) - self.assert_error_with('') - self.assert_error_with('x') - self.assert_error_with(None) - - self.assert_error_with(method='missing') - self.assert_error_with(method='close_all') - self.assert_error_with(method='crud.missing') - self.assert_error_with(method='too.many.dots') - self.assert_error_with(method='self.echo.extra') - - self.assert_error_with(method='self.echo') - self.assert_error_with(method='self.echo', params=['too', 'many']) - self.assert_error_with(method='self.echo', params={'invalid': 'name'}) - self.assertEqual([], self.echoes) - - self.assert_error_with(method='self.fail') - - def test_callback(self): - result = self.call(method='self.echo', params='hello') - assert 'hello' == result['data'] - assert 'client' not in result - - result = self.call(method='crud.echo', params='hello', client='ds123') - assert 'ds123' == result['client'] - - def test_client_and_callback(self): - self.call(method='self.get_name', client=self.client) - self.assert_no_response() - - def test_triggered(self): - self.subscribe(method='self.get_names') - with self.open_ws() as other_ws: - other_ws._send(method='self.change_name', params=['Kitty']) - self.assert_incoming() - - def test_indirect_trigger(self): - self.subscribe(method='self.get_names') - with self.open_ws() as other_ws: - other_ws._send(method='self.indirectly_change_name') - self.assert_incoming() - - def test_unsubscribe(self): - self.test_triggered() - self.unsubscribe() - self.call(method='self.change_name', params=[uuid4().hex]) - self.assert_no_response() - - def test_errors_still_triggers(self): - with self.open_ws() as other_ws: - self.subscribe(method='self.get_names') - other_ws._send(method='self.change_name_then_error') - self.assert_incoming() - - def test_triggered_error(self): - with self.open_ws() as other_ws: - self.subscribe(method='self.get_names') - self.names.append(object()) - other_ws._send(method='self.change_name_then_error') - self.names[:] = ['Hello'] * 2 - other_ws._send(method='self.change_name') - self.assert_incoming() - - def test_multiple_subscriptions(self): - self.subscribe(method='self.get_names') - self.subscribe(method='self.get_places') - self.assert_no_response() - with self.open_ws() as other_ws: - other_ws._send(method='self.change_name') - self.assert_incoming() - other_ws._send(method='self.change_place') - self.assert_incoming() - other_ws._send(method='self.echo', params='Hello') - self.assert_no_response() - - def test_multiple_triggers(self): - self.subscribe(method='self.get_names_and_places') - self.assert_no_response() - with self.open_ws() as other_ws: - other_ws._send(method='self.change_name') - self.assert_incoming() - other_ws._send(method='self.change_place') - self.assert_incoming() - other_ws._send(method='self.echo', params='Hello') - self.assert_no_response() - - def test_multiple_clients(self): - self.subscribe(method='self.get_names', client='client1') - self.subscribe(method='self.get_names', client='client2') - self.assert_no_response() - with self.open_ws() as other_ws: - other_ws._send(method='self.change_name') - assert {'client1', 'client2'} == {self.next()['client'], self.next()['client']} - - def test_nonlocking_echo(self): - self.ws._send(method='self.slow_echo', params=['foo'], - client='client1', callback='cb11') - sleep(1) - self.ws._send(method='self.echo', params=['bar'], client='client2', - callback='cb22') - self.assert_incoming(data='bar', client='client2') - self.assert_incoming(data='foo', client='client1', timeout=2) - - def test_client_locking(self): - self.ws._send(method='self.slow_echo', params=['foo'], - client=self.client, callback='cb1') - sleep(1) - self.ws._send(method='self.echo', params=['bar'], - client=self.client, callback='cb2') - self.assert_incoming(data='foo', timeout=2) - self.assert_incoming(data='bar') - - def test_jsonrpc_notification(self): - self.subscribe(method='self.get_names') - self.jsonrpc.self.change_name() - self.assert_incoming() - - def test_jsonrpc_websocket_client(self): - self.addCleanup(setattr, self.jsonrpc, "_prepare_request", - self.jsonrpc._prepare_request) - self.jsonrpc._prepare_request = lambda data, headers: data.update( - {'websocket_client': self.client}) - self.jsonrpc.self.change_name() - self.assert_no_response() - - -class TestWebsocketCall(SideboardServerTest): - @pytest.fixture(autouse=True) - def override(self, service_patcher, config_patcher): - config_patcher(1, 'ws.call_timeout') - service_patcher('test', self) - - def fast(self): - return 'fast' - - def slow(self): - sleep(2) - return 'slow' - - def test_fast(self): - assert self.ws.call('test.fast') == 'fast' - - def test_slow(self): - pytest.raises(Exception, self.ws.call, 'test.slow') - - -class TestWebsocketsCrudSubscriptions(SideboardServerTest): - @pytest.fixture(autouse=True) - def override(self, service_patcher): - class MockCrud: - pass - mr = self.mr = MockCrud() - for name in ['create', 'update', 'delete']: - setattr(mr, name, Session.crud.crud_notifies(self.make_crud_method(name))) - for name in ['read', 'count']: - setattr(mr, name, Session.crud.crud_subscribes(self.make_crud_method(name))) - service_patcher('crud', mr) - - def setUp(self): - SideboardServerTest.setUp(self) - self.ws.close() - self.ws = self.open_ws() - self.client = self._testMethodName - - def make_crud_method(self, name): - def crud_method(*args, **kwargs): - log.debug('mocked crud.%s'.format(name)) - assert not getattr(self.mr, name + '_error', False) - return uuid4().hex - - crud_method.__name__ = name.encode('utf-8') if six.PY2 else name - return crud_method - - def models(self, *models): - return [{'_model': model} for model in models] - - def read(self, *models): - self.ws._send(method='crud.read', client=self.client, params=self.models(*models)) - self.assert_incoming(trigger='subscribe') - - def update(self, *models, **kwargs): - client = kwargs.get('client', 'unique_client_' + uuid4().hex) - self.ws._send(method='crud.update', client=client, params=self.models(*models)) - self.assert_incoming(client=client) - - def test_read(self): - self.read('User') - self.assert_no_response() - - def test_triggered_read(self): - self.read('User') - self.update('User') - self.assert_incoming(trigger='update') - - def test_unsubscribe(self): - self.test_triggered_read() - self.unsubscribe() - self.update('User') - self.assert_no_response() - - def test_triggered_error(self): - self.mr.update_error = True - with self.open_ws() as other_ws: - other_ws._send(method='crud.read', client='other_tte', params=self.models('User')) - self.assert_incoming(other_ws, client='other_tte') - self.update('User') - self.ws._send(method='crud.update', client=self.client, params=self.models('User')) - assert 'error' in self.next() - self.assert_incoming(other_ws, client='other_tte', trigger='update') - - def test_indirect_trigger(self): - def account(*attrs): - if len(attrs) == 1: - return {'_model': 'Account', 'field': attrs[0]} - else: - return {'_model': 'Account', - 'or': [{'field': attr} for attr in attrs]} - - def call(*attrs): - self.call(method='crud.read', client=self.client, params=account(*attrs)) - - def assert_update_triggers(model): - self.update(model) - self.assert_incoming() - - call('xxx') - assert_update_triggers('Account') - self.unsubscribe() - - call('user.xxx') - assert_update_triggers('User') - assert_update_triggers('Account') - self.unsubscribe() - - call('user.xxx', 'boss.xxx') - assert_update_triggers('Account') - assert_update_triggers('User') - assert_update_triggers('Account') - self.unsubscribe() - - call('user.tags.xxx') - assert_update_triggers('Account') - assert_update_triggers('User') - assert_update_triggers('Tag') - - self.update('Boss') - self.assert_no_response() - - def test_trigger_and_callback(self): - result = self.call(method='crud.read', params=self.models('User'), client='ds_ttac') - self.assert_no_response() - - def test_multiple_triggers(self): - self.read('User', 'Boss') - self.update('User') - self.assert_incoming() - self.update('Boss') - self.assert_incoming() - self.update('Account') - self.assert_no_response() - - def test_trigger_changed(self): - self.read('User') - self.read('Boss') - self.update('User') - self.assert_no_response() - self.update('Boss') - self.assert_incoming() - self.assert_no_response() - - def test_multiple_clients(self): - self.read('Boss') - self.ws._send(method='crud.read', client='other_tmc', params=self.models('Boss')) - self.assert_incoming(client='other_tmc') - self.update('User') - self.assert_no_response() - self.read('Boss') - self.ws._send(method='crud.update', client='unused_client', params=self.models('Boss')) - self.next() - assert {self.client, 'other_tmc'} == {self.next()['client'], self.next()['client']} - - def test_broadcast_error(self): - with self.open_ws() as other_ws: - self.read('User') - other_ws._send(method='crud.count', client='other_tbe', params=self.models('User')) - self.assert_incoming(other_ws, client='other_tbe') - self.mr.count_error = True - self.update('User', client='other_client_so_everything_will_trigger') - self.assert_incoming(trigger='update', timeout=5) - - def test_jsonrpc_notifications(self): - self.read('User') - self.jsonrpc.crud.delete({'_model': 'User', 'field': 'name', 'value': 'Does Not Exist'}) - self.assert_incoming(trigger='delete') - - self.jsonrpc._prepare_request = lambda data, headers: data.update({'websocket_client': self.client}) - self.jsonrpc.crud.delete({'_model': 'User', 'field': 'name', 'value': 'Does Not Exist'}) - self.assert_no_response() diff --git a/test_requirements.txt b/test_requirements.txt index 3c0c982..805dd9c 100644 --- a/test_requirements.txt +++ b/test_requirements.txt @@ -1,5 +1,5 @@ -pytest>=3.0.1 +pytest>=8.1.1 mock>=1.0.1,<1.1 -Sphinx>=1.2.1 -coverage>=3.6 -pep8>=1.7.0 +Sphinx>=7.2.6 +coverage>=7.4.4 +pep8>=1.7.1