diff --git a/.travis.yml b/.travis.yml
index fae0f9a6..bcf4b8fc 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -42,6 +42,7 @@ addons:
 script:
   - nosetests --with-coverage -v -s hecuba_py/tests/*.py
   - nosetests --with-coverage -v -s hecuba_py/tests/withcassandra
+  - nosetests --with-coverage -v -s storage/cql_iface/tests/api_tests.py
 
 jobs:
   include:
@@ -64,4 +65,4 @@ jobs:
 
 # Push the results back to codecov
 after_success:
-  - codecov
\ No newline at end of file
+  - codecov
diff --git a/hecuba_core/src/py_interface/HCache.cpp b/hecuba_core/src/py_interface/HCache.cpp
index 6f6066e1..2b08ea01 100644
--- a/hecuba_core/src/py_interface/HCache.cpp
+++ b/hecuba_core/src/py_interface/HCache.cpp
@@ -636,8 +636,8 @@ static int hnumpy_store_init(HNumpyStore *self, PyObject *args, PyObject *kwds)
 
 static PyMethodDef hnumpy_store_type_methods[] = {
-        {"get_numpy", (PyCFunction) get_numpy, METH_VARARGS, NULL},
-        {"save_numpy", (PyCFunction) save_numpy, METH_VARARGS, NULL},
+        {"get_row", (PyCFunction) get_numpy, METH_VARARGS, NULL},
+        {"put_row", (PyCFunction) save_numpy, METH_VARARGS, NULL},
         {NULL, NULL, 0, NULL}
 };
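The rename above keeps the get_numpy/save_numpy C functions and only changes the Python-visible method names. A sketch of how the new names would be driven from Python (the constructor arguments are taken from the hnumpy.py code further down in this patch; treat the concrete values as assumptions):

    from hfetch import HNumpyStore

    # hypothetical params mirroring the _hcache_params tuple built in hnumpy.py
    store = HNumpyStore(ksp, table, storage_id, [],
                        ['storage_id', 'cluster_id', 'block_id'],
                        [{'name': 'payload', 'type': 'numpy'}], {})
    store.put_row([storage_id], [ndarray])   # formerly save_numpy()
    rows = store.get_row([storage_id])       # formerly get_numpy()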
diff --git a/hecuba_py/hecuba/IStorage.py b/hecuba_py/hecuba/IStorage.py
index e7e4359f..2928bcf9 100644
--- a/hecuba_py/hecuba/IStorage.py
+++ b/hecuba_py/hecuba/IStorage.py
@@ -1,262 +1,86 @@
-import re
 import uuid
-from bisect import bisect_right
-from collections import namedtuple, defaultdict
-from time import time
-
-from hecuba import config, log
+from collections import namedtuple
+from .tools import storage_id_from_name
 
 
 class AlreadyPersistentError(RuntimeError):
     pass
 
 
-_select_istorage_meta = config.session.prepare("SELECT * FROM hecuba.istorage WHERE storage_id = ?")
-_size_estimates = config.session.prepare(("SELECT mean_partition_size, partitions_count "
-                                          "FROM system.size_estimates WHERE keyspace_name=? and table_name=?"))
-_max_token = int(((2 ** 63) - 1))  # type: int
-_min_token = int(-2 ** 63)  # type: int
-
-_valid_types = ['counter', 'text', 'boolean', 'decimal', 'double', 'int', 'list', 'set', 'map', 'bigint', 'blob',
-                'tuple', 'dict', 'float', 'numpy.ndarray']
-
-_basic_types = _valid_types[:-1]
-_hecuba_valid_types = '(atomicint|str|bool|decimal|float|int|tuple|list|generator|frozenset|set|dict|long|buffer' \
-                      '|counter|double)'
-
-AT = 'int | atomicint | str | bool | decimal | float | long | double | buffer'
-
-ATD = 'int | atomicint | str | bool | decimal | float | long | double | buffer | set'
-
-_python_types = [int, str, bool, float, tuple, set, dict, bytearray]
-_conversions = {'atomicint': 'counter',
-                'str': 'text',
-                'bool': 'boolean',
-                'decimal': 'decimal',
-                'float': 'float',
-                'int': 'int',
-                'tuple': 'tuple',
-                'list': 'list',
-                'generator': 'list',
-                'frozenset': 'set',
-                'set': 'set',
-                'dict': 'map',
-                'long': 'bigint',
-                'buffer': 'blob',
-                'bytearray': 'blob',
-                'counter': 'counter',
-                'double': 'double',
-                'StorageDict': 'dict',
-                'ndarray': 'hecuba.hnumpy.StorageNumpy',
-                'numpy.ndarray': 'hecuba.hnumpy.StorageNumpy'}
-
-args = namedtuple("IStorage", [])
-
-
-def process_path(module_path):
-    """
-    Method to obtain module and class_name from a module path
-    Args:
-        module_path(String): path in the format module.class_name
-    Returns:
-        tuple containing class_name and module
-    """
-
-    if module_path == 'numpy.ndarray':
-        return 'StorageNumpy', 'hecuba.hnumpy'
-    if module_path == 'StorageDict':
-        return 'StorageDict', 'hecuba.hdict'
-    last = 0
-    for key, i in enumerate(module_path):
-        if i == '.' and key > last:
-            last = key
-    module = module_path[:last]
-    class_name = module_path[last + 1:]
-    return class_name, module
+class DataModelNode(object):
+    def __init__(self, name=None, class_name=None, args=None):
+        self.name = name
+        self.class_name = class_name
+        self.args = args
 
 
-def _tokens_partitions(ksp, table, tokens_ranges, splits_per_node, token_range_size, target_token_range_size):
-    """
-    Method that calculates the new token partitions for a given object
-    Args:
-        tokens: current number of tokens of the object
-        min_tokens_per_worker: defined minimum number of tokens
-        number_of_workers: defined
-    Returns:
-        a partition every time it's called
-    :type tokens_ranges: list[(long,long)]
-    """
+class IStorage(object):
+    args_names = ["storage_id"]
+    args = namedtuple("IStorage", args_names)
+    _build_args = args(storage_id="")
 
-    tm = config.cluster.metadata.token_map
-    tmap = tm.tokens_to_hosts_by_ks.get(ksp, None)
-    from cassandra.metadata import Murmur3Token
-    tokens_murmur3 = map(lambda a: (Murmur3Token(a[0]), a[1]), tokens_ranges)
-    if not tmap:
-        tm.rebuild_keyspace(ksp, build_if_absent=True)
-        tmap = tm.tokens_to_hosts_by_ks[ksp]
-
-    tokens_per_node = defaultdict(list)
-    for tmumur, t_to in tokens_murmur3:
-        point = bisect_right(tm.ring, tmumur)
-        if point == len(tm.ring):
-            tokens_per_node[tmap[tm.ring[0]][0]].append((tmumur.value, t_to))
-        else:
-            tokens_per_node[tmap[tm.ring[point]][0]].append((tmumur.value, t_to))
-
-    n_nodes = len(tokens_per_node)
-    step_size = _max_token // (splits_per_node * n_nodes)
-    if token_range_size:
-        step_size = token_range_size
-    elif target_token_range_size:
-        one = config.session.execute(_size_estimates, [ksp, table]).one()
-        if one:
-            (mean_p_size, p_count) = one
-            estimated_size = mean_p_size * p_count
-            if estimated_size > 0:
-                step_size = _max_token // (
-                    max(estimated_size / target_token_range_size,
-                        splits_per_node * n_nodes)
-                )
-
-    for tokens_in_node in tokens_per_node.values():
-        partition = []
-        for fraction, to in tokens_in_node:
-            while fraction < to - step_size:
-                partition.append((fraction, fraction + step_size))
-                fraction += step_size
-            partition.append((fraction, to))
-        group_size = max(len(partition) // splits_per_node, 1)
-        for i in range(0, len(partition), group_size):
-            yield partition[i:i + group_size]
-
-
-def _discrete_token_ranges(tokens):
-    """
-    Makes proper tokens ranges ensuring that in a tuple (a,b) a <= b
-    Args:
-        tokens: a list of tokens [1, 0, 10]
-    Returns:
-        a rationalized list [(-1, 0),(0,10),(10, max)]
-    """
-    tokens.sort()
-    if len(tokens) == 0:
-        return tokens
-    if tokens[0] > _min_token:
-        token_ranges = [(_min_token, tokens[0])]
-    else:
-        token_ranges = []
-    n_tns = len(tokens)
-    for i in range(0, n_tns - 1):
-        token_ranges.append((tokens[i], tokens[i + 1]))
-    token_ranges.append((tokens[n_tns - 1], _max_token))
-    return token_ranges
+    _data_model_def = None
+    _data_model_id = None
 
+    def getID(self):
+        return self.__storage_id
 
-def _extract_ks_tab(name):
-    """
-    Method used to obtain keyspace and table from a given name
-    Args:
-        name: a string containing keyspace name and table name, or only table name
-    Returns:
-        a tuple containing keyspace name and table name
-    """
-    sp = name.split(".")
-    if len(sp) == 2:
-        ksp = sp[0]
-        table = sp[1]
-    else:
-        ksp = config.execution_name
-        table = name
-    return ksp.lower(), table.lower()
+    def setID(self, st_id):
+        if st_id is not None and not isinstance(st_id, uuid.UUID):
+            raise TypeError("Storage ID must be an instance of UUID")
+        self.__storage_id = st_id
 
+    storage_id = property(getID, setID)
 
-class IStorage:
-    args_names = []
-    _build_args = args()
-    _storage_id = None
+    def __new__(cls, *args, **kwargs):
+        toret = super(IStorage, cls).__new__(cls)
+        toret._ksp = ''
+        toret._table = ''
+        toret._is_persistent = False
+        toret.__storage_id = None
+        toret._name = ''
+        return toret
 
-    def split(self):
+    def __eq__(self, other):
         """
-        Method used to divide an object into sub-objects.
+        Method to compare an IStorage object with another one.
+        Args:
+            other: IStorage to be compared with.
         Returns:
-            a subobject everytime is called
+            boolean (true - equals, false - not equals).
         """
-        st = time()
-        tokens = self._build_args.tokens
-
-        for token_split in _tokens_partitions(self._ksp, self._table, tokens,
-                                              config.splits_per_node,
-                                              config.token_range_size,
-                                              config.target_token_range_size):
-            storage_id = uuid.uuid4()
-            log.debug('assigning to %s %d tokens', str(storage_id), len(token_split))
-            new_args = self._build_args._replace(tokens=token_split, storage_id=storage_id)
-            args_dict = new_args._asdict()
-            args_dict["built_remotely"] = False
-            yield self.__class__.build_remotely(args_dict)
-        log.debug('completed split of %s in %f', self.__class__.__name__, time() - st)
-
-    def _get_istorage_attrs(self, storage_id):
-        return list(config.session.execute(_select_istorage_meta, [storage_id]))
-
-    def _count_name_collision(self, attribute):
-        m = re.compile("^%s_%s(_[0-9]+)?$" % (self._table, attribute))
-        q = config.session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s",
-                                   [self._ksp])
-        return sum(1 for elem in q if m.match(elem[0]))
-
-    @staticmethod
-    def build_remotely(args):
-        """
-        Takes the information which consists of at least the type,
-        :raises TypeError if the object class doesn't subclass IStorage
-        :param obj_info: Contains the information to be used to create the IStorage obj
-        :return: An IStorage object
-        """
-        if "built_remotely" not in args.keys():
-            built_remotely = True
-        else:
-            built_remotely = args["built_remotely"]
-
-        obj_type = args.get('class_name', args.get('type', None))
-        if obj_type is None:
-            raise TypeError("Trying to build an IStorage obj without giving the type")
-
-        # Import the class defined by obj_type
-        cname, module = process_path(obj_type)
-
-        try:
-            mod = __import__(module, globals(), locals(), [cname], 0)
-        except ValueError:
-            raise ValueError("Can't import class {} from module {}".format(cname, module))
-
-        imported_class = getattr(mod, cname)
-        if not issubclass(imported_class, IStorage):
-            raise TypeError("Trying to build remotely an object '%s' != IStorage subclass" % cname)
-
-        args = {k: v for k, v in args.items() if k in imported_class.args_names}
-        args.pop('class_name', None)
-        args["built_remotely"] = built_remotely
-
-        return imported_class(**args)
+        return self.__class__ == other.__class__ and self.getID() == other.getID()
 
     @staticmethod
     def _store_meta(storage_args):
-        raise Exception("to be implemented")
+        pass
 
     def make_persistent(self, name):
-        raise Exception("to be implemented")
+        if not self.storage_id:
+            self.storage_id = storage_id_from_name(name)
+        self._is_persistent = True
+        self._name = name
 
     def stop_persistent(self):
-        raise Exception("to be implemented")
+        self.storage_id = None
+        self._is_persistent = False
 
     def delete_persistent(self):
-        raise Exception("to be implemented")
+        self.storage_id = None
+        self._is_persistent = False
 
-    def getID(self):
+    def split(self):
         """
-        Obtains the id of the storage element
+        Method used to divide an object into sub-objects.
         Returns:
-            self._storage_id: id of the object
+            a sub-object every time it is called
         """
-        return str(self._storage_id)
+        raise NotImplementedError("Split not supported yet")
+
+    def set_name(self, name):
+        if not isinstance(name, str):
+            raise TypeError("Name -{}- should be an instance of str".format(str(name)))
+        self._name = name
+
+    def get_name(self):
+        return self._name
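The storage_id_from_name helper imported at the top of IStorage.py lives in hecuba/tools.py, which is not part of this patch. Judging from the uuid3-based naming scheme it replaces elsewhere in this diff, a plausible sketch would be:

    import uuid

    def storage_id_from_name(name):
        # Deterministic UUID derived from the fully qualified object name,
        # matching the old uuid.uuid3(uuid.NAMESPACE_DNS, ksp + '.' + table) calls.
        return uuid.uuid3(uuid.NAMESPACE_DNS, name)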
diff --git a/hecuba_py/hecuba/__init__.py b/hecuba_py/hecuba/__init__.py
index 680bc6bb..9cc199a3 100755
--- a/hecuba_py/hecuba/__init__.py
+++ b/hecuba_py/hecuba/__init__.py
@@ -1,8 +1,6 @@
 import logging
 import os
 
-from cassandra.cluster import Cluster
-from cassandra.policies import RetryPolicy, RoundRobinPolicy, TokenAwarePolicy
 
 # Set default log.handler to avoid "No handler found" warnings.
@@ -10,6 +8,7 @@
 f = '%(filename)s: %(levelname)s: %(funcName)s(): %(lineno)d:\t%(message)s'
 stderrLogger.setFormatter(logging.Formatter(f))
 
+
 log = logging.getLogger('hecuba')
 log.addHandler(stderrLogger)
@@ -21,235 +20,10 @@
 log.setLevel(logging.ERROR)
 
 
-class _NRetry(RetryPolicy):
-    def __init__(self, time_to_retry=5):
-        self.time_to_retry = time_to_retry
-
-    def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num):
-        if retry_num > self.time_to_retry:
-            return self.RETHROW, None
-        else:
-            return self.RETHROW, None
-
-    def on_write_timeout(self, query, consistency, write_type, required_responses, received_responses, retry_num):
-        if retry_num > self.time_to_retry:
-            return self.RETHROW, None
-        else:
-            return self.RETHROW, None
-
-    def on_read_timeout(self, query, consistency, required_responses, received_responses, data_retrieved, retry_num):
-        if retry_num > self.time_to_retry:
-            return self.RETHROW, None
-        else:
-            return self.RETHROW, None
-
-
-class Config:
-    class __Config:
-        def __init__(self):
-            self.configured = False
-
-    instance = __Config()
-
-    def __getattr__(self, item):
-        return getattr(Config.instance, item)
-
-    def __init__(self):
-        singleton = Config.instance
-        if singleton.configured:
-            log.info('setting down')
-            return
-
-        singleton.configured = True
-
-        if 'CREATE_SCHEMA' in os.environ:
-            singleton.id_create_schema = int(os.environ['CREATE_SCHEMA'])
-        else:
-            singleton.id_create_schema = -1
-
-        try:
-            singleton.nodePort = int(os.environ['NODE_PORT'])
-            log.info('NODE_PORT: %d', singleton.nodePort)
-        except KeyError:
-            log.warn('using default NODE_PORT 9042')
-            singleton.nodePort = 9042
-
-        try:
-            singleton.contact_names = os.environ['CONTACT_NAMES'].split(",")
-            log.info('CONTACT_NAMES: %s', str.join(" ", singleton.contact_names))
-        except KeyError:
-            log.warn('using default contact point localhost')
-            singleton.contact_names = ['127.0.0.1']
-
-        if hasattr(singleton, 'session'):
-            log.warn('Shutting down pre-existent sessions and cluster')
-            try:
-                singleton.session.shutdown()
-                singleton.cluster.shutdown()
-            except Exception:
-                log.warn('error shutting down')
-        try:
-            singleton.replication_factor = int(os.environ['REPLICA_FACTOR'])
-            log.info('REPLICA_FACTOR: %d', singleton.replication_factor)
-        except KeyError:
-            singleton.replication_factor = 1
-            log.warn('using default REPLICA_FACTOR: %d', singleton.replication_factor)
-
-        try:
-            user_defined_execution_name = os.environ['EXECUTION_NAME']
-            if user_defined_execution_name == 'hecuba':
-                raise RuntimeError('Error: the application keyspace cannot be \'hecuba\'. '
-                                   'This keyspace is reserved for storing metadata.')
-            singleton.execution_name = user_defined_execution_name
-            log.info('EXECUTION_NAME: %s', singleton.execution_name)
-        except KeyError:
-            singleton.execution_name = 'my_app'
-            log.warn('using default EXECUTION_NAME: %s', singleton.execution_name)
-        try:
-            singleton.splits_per_node = int(os.environ['SPLITS_PER_NODE'])
-            log.info('SPLITS_PER_NODE: %d', singleton.splits_per_node)
-        except KeyError:
-            singleton.splits_per_node = 32
-            log.warn('using default SPLITS_PER_NODE: %d', singleton.splits_per_node)
-
-        try:
-            singleton.token_range_size = int(os.environ['TOKEN_RANGE_SIZE'])
-            log.info('TOKEN_RANGE_SIZE: %d', singleton.token_range_size)
-            singleton.target_token_range_size = None
-        except KeyError:
-            singleton.token_range_size = None
-
-            try:
-                singleton.target_token_range_size = int(os.environ['TARGET_TOKEN_RANGE_SIZE'])
-                log.info('TARGET_TOKEN_RANGE_SIZE: %d', singleton.target_token_range_size)
-            except KeyError:
-                singleton.target_token_range_size = 64 * 1024
-                log.warn('using default TARGET_TOKEN_RANGE_SIZE: %d', singleton.target_token_range_size)
-
-        try:
-            singleton.max_cache_size = int(os.environ['MAX_CACHE_SIZE'])
-            log.info('MAX_CACHE_SIZE: %d', singleton.max_cache_size)
-        except KeyError:
-            singleton.max_cache_size = 1000
-            log.warn('using default MAX_CACHE_SIZE: %d', singleton.max_cache_size)
-
-        try:
-            singleton.replication_strategy = os.environ['REPLICATION_STRATEGY']
-            log.info('REPLICATION_STRATEGY: %s', singleton.replication_strategy)
-        except KeyError:
-            singleton.replication_strategy = "SimpleStrategy"
-            log.warn('using default REPLICATION_STRATEGY: %s', singleton.replication_strategy)
-
-        try:
-            singleton.replication_strategy_options = os.environ['REPLICATION_STRATEGY_OPTIONS']
-            log.info('REPLICATION_STRATEGY_OPTIONS: %s', singleton.replication_strategy_options)
-        except KeyError:
-            singleton.replication_strategy_options = ""
-            log.warn('using default REPLICATION_STRATEGY_OPTIONS: %s', singleton.replication_strategy_options)
-
-        if singleton.replication_strategy is "SimpleStrategy":
-            singleton.replication = "{'class' : 'SimpleStrategy', 'replication_factor': %d}" % \
-                                    singleton.replication_factor
-        else:
-            singleton.replication = "{'class' : '%s', %s}" % (
-                singleton.replication_strategy, singleton.replication_strategy_options)
-        try:
-            singleton.hecuba_print_limit = int(os.environ['HECUBA_PRINT_LIMIT'])
-            log.info('HECUBA_PRINT_LIMIT: %s', singleton.hecuba_print_limit)
-        except KeyError:
-            singleton.hecuba_print_limit = 1000
-            log.warn('using default HECUBA_PRINT_LIMIT: %s', singleton.hecuba_print_limit)
-
-        try:
-            singleton.prefetch_size = int(os.environ['PREFETCH_SIZE'])
-            log.info('PREFETCH_SIZE: %s', singleton.prefetch_size)
-        except KeyError:
-            singleton.prefetch_size = 10000
-            log.warn('using default PREFETCH_SIZE: %s', singleton.prefetch_size)
-
-        try:
-            singleton.write_buffer_size = int(os.environ['WRITE_BUFFER_SIZE'])
-            log.info('WRITE_BUFFER_SIZE: %s', singleton.write_buffer_size)
-        except KeyError:
-            singleton.write_buffer_size = 1000
-            log.warn('using default WRITE_BUFFER_SIZE: %s', singleton.write_buffer_size)
-
-        try:
-            singleton.write_callbacks_number = int(os.environ['WRITE_CALLBACKS_NUMBER'])
-            log.info('WRITE_CALLBACKS_NUMBER: %s', singleton.write_callbacks_number)
-        except KeyError:
-            singleton.write_callbacks_number = 16
-            log.warn('using default WRITE_CALLBACKS_NUMBER: %s', singleton.write_callbacks_number)
-
-        try:
-            env_var = os.environ['TIMESTAMPED_WRITES'].lower()
-            singleton.timestamped_writes = False if env_var == 'no' or env_var == 'false' else True
-            log.info('TIMESTAMPED WRITES ENABLED? {}'.format(singleton.timestamped_writes))
-        except KeyError:
-            singleton.timestamped_writes = True
-            log.warn('using default TIMESTAMPED_WRITES: %s', singleton.timestamped_writes)
-
-        if singleton.max_cache_size < singleton.write_buffer_size:
-            import warnings
-            message = "Defining a MAX_CACHE_SIZE smaller than WRITE_BUFFER_SIZE can result " \
-                      "in reading outdated results from the persistent storage"
-            warnings.warn(message)
-
-        log.info('Initializing global session')
-
-        singleton.cluster = Cluster(contact_points=singleton.contact_names,
-                                    load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
-                                    port=singleton.nodePort,
-                                    default_retry_policy=_NRetry(5))
-        singleton.session = singleton.cluster.connect()
-        singleton.session.encoder.mapping[tuple] = singleton.session.encoder.cql_encode_tuple
-        if singleton.id_create_schema == -1:
-            queries = [
-                "CREATE KEYSPACE IF NOT EXISTS hecuba WITH replication = %s" % singleton.replication,
-                """CREATE TYPE IF NOT EXISTS hecuba.q_meta(
-                mem_filter text,
-                from_point frozen<list<float>>,
-                to_point frozen<list<float>>,
-                precision float);
-                """,
-                'CREATE TYPE IF NOT EXISTS hecuba.np_meta(dims frozen<list<int>>,type int,block_id int);',
-                """CREATE TABLE IF NOT EXISTS hecuba
-                .istorage (storage_id uuid,
-                class_name text,name text,
-                istorage_props map<text,text>,
-                tokens list<frozen<tuple<bigint,bigint>>>,
-                indexed_on list<text>,
-                qbeast_random text,
-                qbeast_meta frozen<q_meta>,
-                numpy_meta frozen<np_meta>,
-                primary_keys list<frozen<tuple<text,text>>>,
-                columns list<frozen<tuple<text,text>>>,
-                PRIMARY KEY(storage_id));
-                """]
-            for query in queries:
-                try:
-                    singleton.session.execute(query)
-                except Exception as e:
-                    log.error("Error executing query %s" % query)
-                    raise e
-
-        from hfetch import connectCassandra
-        # connecting c++ bindings
-        connectCassandra(singleton.contact_names, singleton.nodePort)
-
-
-global config
-config = Config()
-
-from hecuba.parser import Parser
-from hecuba.storageobj import StorageObj
-from hecuba.hdict import StorageDict
-from hecuba.hnumpy import StorageNumpy
-from hecuba.hfilter import hfilter
-
-if not filter == hfilter:
-    import builtins
-    builtins.python_filter = filter
-    builtins.filter = hfilter
+from .IStorage import IStorage
+from .parser import Parser
+from .storageobj import StorageObj
+from .hdict import StorageDict
+from .hnumpy import StorageNumpy
+from .storageiter import StorageIter
 
-__all__ = ['StorageObj', 'StorageDict', 'StorageNumpy', 'Parser']
+__all__ = ['StorageObj', 'StorageDict', 'StorageNumpy', 'Parser', 'StorageIter', 'IStorage']
\ No newline at end of file
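With the Cassandra bootstrap removed, importing the package now only wires up logging and re-exports the public classes; a quick interactive check (assuming the sibling modules import cleanly without a live cluster) could be:

    import logging
    import hecuba

    # the 'hecuba' logger ships with a pre-configured stderr handler
    logging.getLogger('hecuba').setLevel(logging.DEBUG)
    print(hecuba.__all__)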
diff --git a/hecuba_py/hecuba/hdict.py b/hecuba_py/hecuba/hdict.py
index b14c2567..c0255486 100644
--- a/hecuba_py/hecuba/hdict.py
+++ b/hecuba_py/hecuba/hdict.py
@@ -1,349 +1,65 @@
-import uuid
-from collections import Iterable, defaultdict
-from collections import Mapping
-from collections import namedtuple
-
-import numpy as np
-from hecuba import config, log, Parser
-from hecuba.tools import NamedItemsIterator, NamedIterator
-from hecuba.hnumpy import StorageNumpy
-from hfetch import Hcache
-
-from hecuba.IStorage import IStorage, AlreadyPersistentError, _basic_types, _discrete_token_ranges, _extract_ks_tab
-
-
-class EmbeddedSet(set):
-    '''
-    father is the dictionary containing the set
-    keys are the keys names of the set in the dictionary
-    values is the initializing set
-    '''
-
-    def __init__(self, father, keys, values=None):
-        super(EmbeddedSet, self).__init__()
-        self._father = father
-        self._keys = keys
-        if values is not None:
-            if len(self) != 0:
-                self.clear()
-            if isinstance(values, set):
-                for value in values:
-                    self.add(value)
-            else:
-                raise Exception("Set expected.")
-
-    def add(self, value):
-        keys = self._keys[:]
-        if not isinstance(value, Iterable) or isinstance(value, str):
-            keys.append(value)
-        else:
-            keys += list(value)
-        return self._father.__setitem__(keys, [])
-
-    def remove(self, value):
-        if value in self:
-            keys = self._keys[:]
-            if not isinstance(value, Iterable) or isinstance(value, str):
-                keys.append(value)
-            else:
-                keys += list(value)
-            return self._father.__delitem__(keys)
-        else:
-            raise KeyError
-
-    def discard(self, value):
-        try:
-            if value in self:
-                keys = self._keys[:]
-                if not isinstance(value, Iterable) or isinstance(value, str):
-                    keys.append(value)
-                else:
-                    keys += list(value)
-                return self._father.__delitem__(keys)
-        except KeyError as ex:
-            pass
-
-    def __len__(self):
-        query = "SELECT COUNT(*) FROM %s.%s WHERE " % (self._father._ksp, self._father._table)
-        query = ''.join([query, self._join_keys_query()])
+from . import log, Parser
+from .storageiter import StorageIter
 
-        try:
-            result = config.session.execute(query)
-            return result[0][0]
-        except Exception as ir:
-            log.error("Unable to execute %s", query)
-            raise ir
-
-    def __contains__(self, value):
-        keys = self._keys[:]
-        if not isinstance(value, Iterable) or isinstance(value, str):
-            keys.append(value)
-        else:
-            keys += list(value)
-        return self._father.__contains__(keys)
-
-    def __iter__(self):
-        keys_set = ""
-        for key in self._father._get_set_types():
-            keys_set += key[0] + ", "
-        query = "SELECT %s FROM %s.%s WHERE " % (keys_set[:-2], self._father._ksp, self._father._table)
-        query = ''.join([query, self._join_keys_query()])
+from .IStorage import IStorage, AlreadyPersistentError
+from .tools import build_remotely, storage_id_from_name, transform_to_dm
+import storage
+import uuid
+from collections.abc import Mapping
 
-        try:
-            result = config.session.execute(query)
-            if len(self._father._get_set_types()) == 1:
-                result = map(lambda x: x[0], result)
-            else:
-                result = map(lambda x: tuple(x), result)
-            return iter(result)
-        except Exception as ir:
-            log.error("Unable to execute %s", query)
-            raise ir
 
-    def _join_keys_query(self):
-        keys = []
-        for pkey, key in zip(self._father._primary_keys, self._keys):
-            if pkey["type"] == "text":
-                actual_key = "'%s'" % key
-            else:
-                actual_key = "%s" % key
-            keys.append(" = ".join([pkey["name"], actual_key]))
-        all_keys = " and ".join(keys)
-
-        return all_keys
-
-    def union(self, *others):
-        result = set()
-        for value in self:
-            result.add(value)
-        for other in others:
-            for value in other:
-                result.add(value)
-        return result
-
-    def intersection(self, *others):
-        result = set()
-        for value in self:
-            in_all_others = True
-            for other in others:
-                try:
-                    if value not in other:
-                        in_all_others = False
-                        break
-                except KeyError:
-                    in_all_others = False
-                    break
-            if in_all_others:
-                result.add(value)
-        return result
-
-    def difference(self, *others):
-        result = set()
-        for value in self:
-            in_any_other = False
-            for other in others:
-                try:
-                    if value in other:
-                        in_any_other = True
-                        break
-                except KeyError:
-                    pass
-            if not in_any_other:
-                result.add(value)
-        return result
-
-    def update(self, *others):
-        for other in others:
-            for value in other:
-                self.add(value)
-        return self
-
-    def issubset(self, other):
-        if len(self) > len(other):
-            return False
-        for value in self:
-            if value not in other:
-                return False
-        return True
-
-    def issuperset(self, other):
-        if len(self) < len(other):
-            return False
-        for value in other:
-            if value not in self:
-                return False
-        return True
-
-    def __eq__(self, other):
-        return self._father.__eq__(other._father) and self._keys == other._keys
-
-    def __ne__(self, other):
-        return not (self.__eq__(other))
-
-    def __lt__(self, other):
-        return self.__ne__(other) and self.issubset(other)
-
-    def __le__(self, other):
-        return self.issubset(other)
-
-    def __gt__(self, other):
-        return self.__ne__(other) and self.issuperset(other)
-
-    def __ge__(self, other):
-        return self.issuperset(other)
-
-    def clear(self):
-        for value in self._father[tuple(self._keys)]:
-            self.remove(value)
-
-
-class StorageDict(dict, IStorage):
+class StorageDict(IStorage, dict):
     #  """
     #  Object used to access data from workers.
     #  """
 
-    args_names = ["name", "primary_keys", "columns", "tokens", "storage_id", "indexed_on", "class_name",
-                  "built_remotely"]
-    args = namedtuple('StorageDictArgs', args_names)
-    _prepared_store_meta = config.session.prepare('INSERT INTO hecuba.istorage'
-                                                  '(storage_id, class_name, name, tokens, '
-                                                  'primary_keys, columns, indexed_on)'
-                                                  'VALUES (?,?,?,?,?,?,?)')
-
-    @staticmethod
-    def _store_meta(storage_args):
-        """
-        Method to update the info about the StorageDict in the DB metadata table
-        Args:
-            storage_args: structure with all data needed to update the metadata
-        """
-        log.debug("StorageDict: storing metas %s", storage_args)
-
-        try:
-            config.session.execute(StorageDict._prepared_store_meta,
-                                   [storage_args.storage_id, storage_args.class_name,
-                                    storage_args.name,
-                                    storage_args.tokens, storage_args.primary_keys,
-                                    storage_args.columns, storage_args.indexed_on])
-        except Exception as ex:
-            log.error("Error creating the StorageDict metadata: %s %s", storage_args, ex)
-            raise ex
-
-    def __init__(self, name=None, primary_keys=None, columns=None, tokens=None,
-                 storage_id=None, indexed_on=None, built_remotely=False, **kwargs):
+    def __new__(cls, name='', *args, **kwargs):
         """
         Creates a new StorageDict.
         Args:
             name (string): the name of the collection/table (keyspace is optional)
-            primary_keys (list(tuple)): a list of (key,type) primary keys (primary + clustering).
-            columns (list(tuple)): a list of (key,type) columns
-            tokens (list): list of tokens
             storage_id (string): the storage id identifier
-            indexed_on (list): values that will be used as index
-            kwargs: other parameters
+            args: arguments for base constructor
+            kwargs: arguments for base constructor
         """
+        if not cls._data_model_id:
+            # User data model
+            keys = {}
+            try:
+                cls._data_model_def = kwargs['data_model']
+            except KeyError:
+                import typing
+                dms = []
+                for ob in cls.__orig_bases__:
+                    if isinstance(ob, typing.GenericMeta):
+                        dms.append(transform_to_dm(ob))
+                if len(dms) != 1:
+                    raise ValueError("Different orig bases than expected ({})".format(len(dms)))
 
-        super(StorageDict, self).__init__(**kwargs)
-        self._is_persistent = False
-        self._built_remotely = built_remotely
-        log.debug("CREATED StorageDict(%s,%s,%s,%s,%s,%s)", primary_keys, columns, name, tokens, storage_id, kwargs)
-
-        if tokens is None:
-            log.info('using all tokens')
-            tokens = list(map(lambda a: a.value, config.cluster.metadata.token_map.ring))
-            self._tokens = _discrete_token_ranges(tokens)
-        else:
-            self._tokens = tokens
-
-        self._storage_id = storage_id
-
-        if self.__doc__ is not None:
-            self._persistent_props = self._parse_comments(self.__doc__)
-            self._primary_keys = self._persistent_props['primary_keys']
-            self._columns = self._persistent_props['columns']
-            self._indexed_on = self._persistent_props.get('indexed_on', indexed_on)
-        else:
-            self._primary_keys = primary_keys
-            set_pks = []
-            normal_columns = []
-            for column_name, column_type in columns:
-                if column_name.find("_set_") != -1:
-                    set_pks.append((column_name.replace("_set_", ""), column_type))
-                else:
-                    normal_columns.append((column_name, column_type))
-            if set_pks:
-                self._columns = [{"type": "set", "columns": set_pks}]
-            else:
-                self._columns = columns
-            self._indexed_on = indexed_on
-
-        self._has_embedded_set = False
-        build_column = []
-        columns = []
-        for col in self._columns:
-            if isinstance(col, dict):
-                types = col["columns"]
-                if col["type"] == "set":
-                    self._has_embedded_set = True
-                    for t in types:
-                        build_column.append(("_set_" + t[0], t[1]))
-                else:
-                    build_column.append((col["name"], col["type"]))
-                columns.append(col)
-            else:
-                columns.append({"type": col[1], "name": col[0]})
-                build_column.append(col)
-
-        self._columns = columns[:]
-        self._primary_keys = [{"type": key[1], "name": key[0]} if isinstance(key, tuple) else key
-                              for key in self._primary_keys]
-        build_keys = [(key["name"], key["type"]) for key in self._primary_keys]
-
-        key_names = [col["name"] for col in self._primary_keys]
-        column_names = [col["name"] for col in self._columns]
-
-        self._item_builder = namedtuple('row', key_names + column_names)
+            cls._data_model_def = dms[0]
+            cls._data_model_def['type'] = cls
 
-        if len(key_names) > 1:
-            self._key_builder = namedtuple('row', key_names)
-        else:
-            self._key_builder = None
-        if self._has_embedded_set:
-            set_names = [colname for (colname, dt) in self._get_set_types()]
-            self._column_builder = namedtuple('row', set_names)
-        elif len(column_names) > 1:
-            self._column_builder = namedtuple('row', column_names)
-        else:
-            self._column_builder = None
+            # Storage data model
+            #keys = {k: uuid.UUID if issubclass(v, IStorage) else v for k, v in cls._data_model_def["value_id"]}
+            #cols = {k: uuid.UUID if issubclass(v, IStorage) else v for k, v in cls._data_model_def["cols"]}
 
-        self._k_size = len(key_names)
+            cls._data_model_id = storage.StorageAPI.add_data_model(cls._data_model_def)
 
-        class_name = '%s.%s' % (self.__class__.__module__, self.__class__.__name__)
+        toret = super(StorageDict, cls).__new__(cls, kwargs)
+        storage_id = kwargs.get('storage_id', None)
 
-        if build_column is None:
-            build_column = self._columns[:]
+        if storage_id is None and name:
+            storage_id = storage_id_from_name(name)
 
-        self._build_args = self.args(None, build_keys, build_column, self._tokens,
-                                     self._storage_id, self._indexed_on, class_name, built_remotely)
+        if name or storage_id:
+            toret.setID(storage_id)
+            toret.set_name(name)
+            toret._is_persistent = True
+            storage.StorageAPI.register_persistent_object(cls._data_model_id, toret)
+        return toret
 
-        if name:
-            self.make_persistent(name)
-
-    def __eq__(self, other):
-        """
-        Method to compare a StorageDict with another one.
-        Args:
-            other: StorageDict to be compared with.
-        Returns:
-            boolean (true - equals, false - not equals).
-        """
-        return self._storage_id == other._storage_id and self._tokens == other.token_ranges and \
-               self._table == other.table_name and self._ksp == other.keyspace
-
-    @classmethod
-    def _parse_comments(self, comments):
-        parser = Parser("TypeSpec")
-        return parser._parse_comments(comments)
+    def __init__(self, *args, **kwargs):
+        super().__init__(**kwargs)
 
     def __contains__(self, key):
         """
@@ -355,65 +71,53 @@ def __contains__(self, key):
         """
         if not self._is_persistent:
             return dict.__contains__(self, key)
-        else:
-            try:
-                # TODO we should save this value in a cache
-                self._hcache.get_row(self._make_key(key))
-                return True
-            except Exception as ex:
-                log.warn("persistentDict.__contains__ ex %s", ex)
-                return False
-
-    def _make_key(self, key):
+
+        return storage.StorageAPI.get_records(self.storage_id, [key]) != []
+
+    def keys(self):
         """
-        Method used to pass the key data to the StorageDict cache in a proper way.
-        Args:
-            key: the data that needs to get the correct format
+        This method returns a list of all the keys of the StorageDict.
+        Returns:
+            list: a list of keys
         """
-        if isinstance(key, str) or not isinstance(key, Iterable):
-            if len(self._primary_keys) == 1:
-                return [key]
-            else:
-                raise Exception('missing a primary key')
+        iter_cols = self._data_model_def.get('value_id', None)
+        iter_model = {"type": "StorageIter", "name": self.get_name(), "cols": iter_cols}
+        if self.storage_id:
+            return StorageIter(storage_id=self.storage_id, data_model=iter_model, name=self.get_name())
 
-        if isinstance(key, Iterable) and len(key) == len(self._primary_keys):
-            return list(key)
-        elif self._has_embedded_set and isinstance(key, Iterable) and len(key) == (
-                len(self._primary_keys) + len(self._get_set_types())):
-            return list(key)
-        else:
-            raise Exception('wrong primary key')
+        return dict.keys(self)
 
-    @staticmethod
-    def _make_value(value):
+    def values(self):
         """
-        Method used to pass the value data to the StorageDict cache in a proper way.
-        Args:
-            value: the data that needs to get the correct format
+        This method returns a list of all the values of the StorageDict.
+        Returns:
+            list: a list of values
        """
-        if issubclass(value.__class__, IStorage):
-            return [uuid.UUID(value.getID())]
-        elif isinstance(value, str) or not isinstance(value, Iterable) or isinstance(value, np.ndarray):
-            return [value]
-        elif isinstance(value, tuple):
-            return [value]
-        elif isinstance(value, Iterable):
-            val = []
-            for v in value:
-                if isinstance(v, IStorage):
-                    val.append(uuid.UUID(v.getID()))
-                else:
-                    val.append(v)
-            return val
-        else:
-            return list(value)
+
+        iter_cols = self._data_model_def.get('cols', None)
+        iter_model = {"type": "StorageIter", "name": self.get_name(), "cols": iter_cols}
+        if self.storage_id:
+            return StorageIter(storage_id=self.storage_id, data_model=iter_model, name=self.get_name())
+
+        return dict.values(self)
+
+    def items(self):
+        """
+        This method returns a list of all the key-value pairs of the StorageDict.
+        Returns:
+            list: a list of key-value pairs
+        """
+        if self.storage_id:
+            return StorageIter(storage_id=self.storage_id, data_model=self._data_model_def)
+
+        return dict.items(self)
 
     def __iter__(self):
         """
         Method that overloads the python dict basic iteration, which returns
         an iterator over the dictionary keys.
         """
-        return self.keys()
+        return iter(self.keys())
 
     def make_persistent(self, name):
         """
@@ -424,111 +128,54 @@ def make_persistent(self, name):
            name:
         """
         if self._is_persistent:
-            raise AlreadyPersistentError("This StorageDict is already persistent [Before:{}.{}][After:{}]",
-                                         self._ksp, self._table, name)
+            raise AlreadyPersistentError("This StorageDict is already persistent {}", name)
 
         # Update local StorageDict metadata
-        self._is_persistent = True
-        (self._ksp, self._table) = _extract_ks_tab(name)
-
-        if self._storage_id is None:
-            self._storage_id = uuid.uuid3(uuid.NAMESPACE_DNS, self._ksp + '.' + self._table)
-        self._build_args = self._build_args._replace(storage_id=self._storage_id, name=self._ksp + "." + self._table)
-
-        # Prepare data
-        persistent_keys = [(key["name"], "tuple<" + ",".join(key["columns"]) + ">") if key["type"] == "tuple"
-                           else (key["name"], key["type"]) for key in self._primary_keys] + self._get_set_types()
-        persistent_values = []
-        if not self._has_embedded_set:
-            for col in self._columns:
-                if col["type"] == "tuple":
-                    persistent_values.append({"name": col["name"], "type": "tuple<" + ",".join(col["columns"]) + ">"})
-                elif col["type"] not in _basic_types:
-                    persistent_values.append({"name": col["name"], "type": "uuid"})
-                else:
-                    persistent_values.append({"name": col["name"], "type": col["type"]})
-
-        key_names = [col[0] if isinstance(col, tuple) else col["name"] for col in persistent_keys]
-
-        if config.id_create_schema == -1 and not self._built_remotely:
-            query_keyspace = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s" % (self._ksp, config.replication)
-            try:
-                log.debug('MAKE PERSISTENCE: %s', query_keyspace)
-                config.session.execute(query_keyspace)
-            except Exception as ex:
-                log.warn("Error creating the StorageDict keyspace %s, %s", (query_keyspace), ex)
-                raise ex
-
-            persistent_columns = [(col["name"], col["type"]) for col in persistent_values]
-
-            query_table = "CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));" \
-                          % (self._ksp,
-                             self._table,
-                             ",".join("%s %s" % tup for tup in persistent_keys + persistent_columns),
-                             str.join(',', key_names))
-            try:
-                log.debug('MAKE PERSISTENCE: %s', query_table)
-                config.session.execute(query_table)
-            except Exception as ex:
-                log.warn("Error creating the StorageDict table: %s %s", query_table, ex)
-                raise ex
-
-            if hasattr(self, '_indexed_on') and self._indexed_on is not None:
-                index_query = 'CREATE CUSTOM INDEX IF NOT EXISTS ' + self._table + '_idx ON '
-                index_query += self._ksp + '.' + self._table + ' (' + str.join(',', self._indexed_on) + ') '
-                index_query += "using 'es.bsc.qbeast.index.QbeastIndex';"
-                try:
-                    config.session.execute(index_query)
-                except Exception as ex:
-                    log.error("Error creating the Qbeast custom index: %s %s", index_query, ex)
-                    raise ex
-                trigger_query = "CREATE TRIGGER IF NOT EXISTS %s%s_qtr ON %s.%s USING 'es.bsc.qbeast.index.QbeastTrigger';" % \
-                                (self._ksp, self._table, self._ksp, self._table)
-                try:
-                    config.session.execute(trigger_query)
-                except Exception as ex:
-                    log.error("Error creating the Qbeast trigger: %s %s", trigger_query, ex)
-                    raise ex
-
-        self._hcache_params = (self._ksp, self._table,
-                               self._storage_id,
-                               self._tokens, key_names, persistent_values,
-                               {'cache_size': config.max_cache_size,
-                                'writer_par': config.write_callbacks_number,
-                                'writer_buffer': config.write_buffer_size,
-                                'timestamped_writes' : config.timestamped_writes})
-        log.debug("HCACHE params %s", self._hcache_params)
-        self._hcache = Hcache(*self._hcache_params)
+        super().make_persistent(name)
+        storage.StorageAPI.register_persistent_object(self.__class__._data_model_id, self)
+
+        keys = []
+        values = []
 
         # Storing all in-memory values to cassandra
-        for key, value in dict.items(self):
-            if issubclass(value.__class__, IStorage):
-                # new name as ksp.table_valuename, where valuename is either defined by the user or set by hecuba
+        for i, (key, value) in enumerate(dict.items(self)):
+            keys.append(key)
+            if isinstance(value, IStorage):
                 if not value._is_persistent:
-                    val_name = self._ksp + '.' + self._table + '_' + self._columns[0]["name"]
-                    value.make_persistent(val_name)
-                value = value._storage_id
-            self._hcache.put_row(self._make_key(key), self._make_value(value))
+                    sd_name = name + '_' + str(i)
+                    value.make_persistent(sd_name)
+                values.append(value.getID())
+            else:
+                values.append(value)
 
-        super(StorageDict, self).clear()
+        storage.StorageAPI.put_records(self.storage_id, keys, values)
 
-        self._store_meta(self._build_args)
+        super(StorageDict, self).clear()
 
     def stop_persistent(self):
         """
         Method to turn a StorageDict into non-persistent.
         """
-        log.debug('STOP PERSISTENCE: %s', self._table)
-        self._is_persistent = False
-        self._hcache = None
+        log.debug('STOP PERSISTENCE')
+        for obj in self.values():
+            if isinstance(obj, IStorage):
+                obj.stop_persistent()
+
+        super().stop_persistent()
 
     def delete_persistent(self):
         """
         Method to empty all data assigned to a StorageDict.
         """
-        query = "TRUNCATE TABLE %s.%s;" % (self._ksp, self._table)
-        log.debug('DELETE PERSISTENT: %s', query)
-        config.session.execute(query)
+
+        log.debug('DELETE PERSISTENT')
+        for obj in self.values():
+            if isinstance(obj, IStorage):
+                obj.delete_persistent()
+
+        storage.StorageAPI.delete_persistent_object(self.storage_id)
+
+        super().delete_persistent()
 
     def __delitem__(self, key):
         """
@@ -536,18 +183,10 @@ def __delitem__(self, key):
         Args:
            key: position of the entry that we want to delete
        """
-        if not self._is_persistent:
+        if not self.storage_id:
             dict.__delitem__(self, key)
-        elif self._has_embedded_set:
-            self._hcache.delete_row(key)
-        else:
-            self._hcache.delete_row([key])
-
-    def __create_embeddedset(self, key, val=None):
-        if not isinstance(key, Iterable) or isinstance(key, str):
-            return EmbeddedSet(self, [key], val)
         else:
-            return EmbeddedSet(self, list(key), val)
+            storage.StorageAPI.put_records(self.storage_id, [key], [])
 
     def __getitem__(self, key):
         """
@@ -559,53 +198,29 @@ def __getitem__(self, key):
        """
         log.debug('GET ITEM %s', key)
 
-        if not self._is_persistent:
+        if not self.storage_id:
             return dict.__getitem__(self, key)
-        elif self._has_embedded_set:
-            return self.__create_embeddedset(key=key)
-        else:
-            # Returns always a list with a single entry for the key
-            persistent_result = self._hcache.get_row(self._make_key(key))
-            log.debug("GET ITEM %s[%s]", persistent_result, persistent_result.__class__)
-
-            # we need to transform UUIDs belonging to IStorage objects and rebuild them
-            final_results = []
-            for index, col in enumerate(self._columns):
-                name = col["name"]
-                col_type = col["type"]
-                element = persistent_result[index]
-                if col_type not in _basic_types:
-                    # element is not a built-in type
-                    table_name = self._ksp + '.' + self._table + '_' + name
-                    info = {"name": table_name, "tokens": self._build_args.tokens, "storage_id": uuid.UUID(element),
-                            "class_name": col_type}
-                    element = IStorage.build_remotely(info)
-
-                final_results.append(element)
-
-            if self._column_builder is not None:
-                return self._column_builder(*final_results)
-            else:
-                return final_results[0]
-
-    def __make_val_persistent(self, val, col=0):
-        if isinstance(val, StorageDict):
-            for k, element in val.items():
-                val[k] = self.__make_val_persistent(element)
-        elif isinstance(val, list):
-            for index, element in enumerate(val):
-                val[index] = self.__make_val_persistent(element, index)
-        if isinstance(val, IStorage) and not val._is_persistent:
-            val._storage_id = uuid.uuid4()
-            attribute = self._columns[col]["name"]
-            count = self._count_name_collision(attribute)
-            if count == 0:
-                name = self._ksp + "." + self._table + "_" + attribute
-            else:
-                name = self._ksp + "." + self._table + "_" + attribute + "_" + str(count - 1)
-            # new name as ksp+table+obj_class_name
-            val.make_persistent(name)
-        return val
+
+        if not isinstance(key, list):
+            key = [key]
+
+        # Returns always a list with a single entry for the key
+        persistent_result = storage.StorageAPI.get_records(self.storage_id, [key])
+
+        # we need to transform UUIDs belonging to IStorage objects and rebuild them
+        final_results = []
+
+        for i, element in enumerate(persistent_result):
+            col_type = self.__class__._data_model_def['cols'][i]
+            if issubclass(col_type, IStorage):
+                # element is not a built-in type
+                table_name = str(self.storage_id) + '_' + str(key)
+                info = {"name": table_name, "storage_id": element, "class_name": col_type}
+                element = build_remotely(info)
+
+            final_results.append(element)
+
+        return final_results
 
     def __setitem__(self, key, val):
         """
@@ -614,28 +229,18 @@ def __setitem__(self, key, val):
            key: the position of the value that we want to save
            val: the value that we want to save in that position
        """
-        if isinstance(val, list):
-            vals_istorage = []
-            for element in val:
-                if isinstance(element, np.ndarray):
-                    val_istorage = StorageNumpy(element)
-                else:
-                    val_istorage = element
-                vals_istorage.append(val_istorage)
-
-            val = vals_istorage
-        elif isinstance(val, np.ndarray):
-            val = StorageNumpy(val)
-        elif isinstance(val, set):
-            val = self.__create_embeddedset(key=key, val=val)
-
         log.debug('SET ITEM %s->%s', key, val)
+
+        if not isinstance(key, list):
+            key = [key]
+        if not isinstance(val, list):
+            val = [val]
+
+        keys = [def_type(key[i]) for i, def_type in enumerate(self._data_model_def['value_id'].values())]
+        vals = [def_type(val[i]) for i, def_type in enumerate(self._data_model_def['cols'].values())]
+
         if not self._is_persistent:
-            dict.__setitem__(self, key, val)
-        elif not isinstance(val, EmbeddedSet):
-            # Not needed because it is made persistent and inserted to hcache when calling to self.__create_embeddedset
-            val = self.__make_val_persistent(val)
-            self._hcache.put_row(self._make_key(key), self._make_value(val))
+            # dict keys must be hashable, so collapse single-element key lists
+            dict.__setitem__(self, tuple(keys) if len(keys) > 1 else keys[0], vals)
+        else:
+            storage.StorageAPI.put_records(self.storage_id, [keys], [vals])
 
     def __repr__(self):
         """
@@ -646,13 +251,13 @@ def __repr__(self):
         to_return = {}
         for item in self.items():
             to_return[item[0]] = item[1]
-            if len(to_return) == config.hecuba_print_limit:
+            if len(to_return) == 20:
                 return str(to_return)
         if len(to_return) > 0:
             return str(to_return)
         return ""
 
-    def update(self, other=None, **kwargs):
+    def update(self, other=(), **kwargs):
         """
         Updates the current dict with a new dictionary or set of attr,value pairs
         (those must follow the current dict data model).
         Args:
            other: python dictionary or StorageDict with data to be inserted in the current dict
@@ -662,96 +267,15 @@ def update(self, other=None, **kwargs):
            **kwargs: set of attr:val pairs, to be treated as key,val and
            inserted in the current dict.
        """
-        if other is not None:
-            if isinstance(other, StorageDict):
-                for k, v in other.items():
-                    self[k] = v
-            else:
-                for k, v in other.items() if isinstance(other, Mapping) else other:
-                    self[k] = v
-        for k, v in kwargs.items():
-            self[k] = v
 
-    def keys(self):
-        """
-        Obtains the iterator for the keys of the StorageDict
-        Returns:
-            if persistent:
-                iterkeys(self): list of keys
-            if not persistent:
-                dict.keys(self)
-        """
-        if self._is_persistent:
-            ik = self._hcache.iterkeys(config.prefetch_size)
-            iterator = NamedIterator(ik, self._key_builder, self)
-            if self._has_embedded_set:
-                iterator = iter(set(iterator))
-
-            return iterator
-        else:
-            return dict.keys(self)
-
-    def items(self):
-        """
-        Obtains the iterator for the key,val pairs of the StorageDict
-        Returns:
-            if persistent:
-                NamedItemsIterator(self): list of key,val pairs
-            if not persistent:
-                dict.items(self)
-        """
-        if self._is_persistent:
-            ik = self._hcache.iteritems(config.prefetch_size)
-            iterator = NamedItemsIterator(self._key_builder,
-                                          self._column_builder,
-                                          self._k_size,
-                                          ik,
-                                          self)
-            if self._has_embedded_set:
-                d = defaultdict(set)
-                # iteritems has the set values in different rows, this puts all the set values in the same row
-                if len(self._get_set_types()) == 1:
-                    for row in iterator:
-                        d[row.key].add(row.value[0])
-                else:
-                    for row in iterator:
-                        d[row.key].add(tuple(row.value))
-
-                iterator = d.items()
-
-            return iterator
-        else:
-            return dict.items(self)
+        if isinstance(other, Mapping):
+            other = other.items()
+        for k, v in other:
+            self[k] = v
 
-    def values(self):
-        """
-        Obtains the iterator for the values of the StorageDict
-        Returns:
-            if persistent:
-                NamedIterator(self): list of valuesStorageDict
-            if not persistent:
-                dict.values(self)
-        """
-        if self._is_persistent:
-            if self._has_embedded_set:
-                items = self.items()
-                return dict(items).values()
-            else:
-                ik = self._hcache.itervalues(config.prefetch_size)
-                return NamedIterator(ik, self._column_builder, self)
-        else:
-            return dict.values(self)
+        for k, v in kwargs.items():
+            self[k] = v
 
-    def get(self, key, default):
+    def get(self, key, default=None):
         try:
-            value = self.__getitem__(key)
+            return self.__getitem__(key)
         except KeyError:
-            value = default
-        return value
-
-    def _get_set_types(self):
-        if self._has_embedded_set:
-            set_types = [col.get("columns", []) for col in self._columns if isinstance(col, dict)]
-            return sum(set_types, [])
-        else:
-            return []
+            return default
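A sketch of how the new StorageDict is meant to be declared and used, inferred from the __orig_bases__/transform_to_dm handling in __new__ above (the class name, keyspace and typing-based declaration are hypothetical, and the StorageAPI behaviour is assumed, not shown in this patch):

    from typing import Dict
    from hecuba import StorageDict

    class WordCount(StorageDict, Dict[str, int]):
        # the data model is derived from the Dict[str, int] base via transform_to_dm()
        pass

    wc = WordCount('my_app.words')   # registers the object with storage.StorageAPI
    wc['hello'] = 1                  # routed to StorageAPI.put_records()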
""" - if other is not None: - if isinstance(other, StorageDict): - for k, v in other.items(): - self[k] = v - else: - for k, v in other.items() if isinstance(other, Mapping) else other: - self[k] = v - for k, v in kwargs.items(): - self[k] = v - def keys(self): - """ - Obtains the iterator for the keys of the StorageDict - Returns: - if persistent: - iterkeys(self): list of keys - if not persistent: - dict.keys(self) - """ - if self._is_persistent: - ik = self._hcache.iterkeys(config.prefetch_size) - iterator = NamedIterator(ik, self._key_builder, self) - if self._has_embedded_set: - iterator = iter(set(iterator)) - - return iterator - else: - return dict.keys(self) - - def items(self): - """ - Obtains the iterator for the key,val pairs of the StorageDict - Returns: - if persistent: - NamedItemsIterator(self): list of key,val pairs - if not persistent: - dict.items(self) - """ - if self._is_persistent: - ik = self._hcache.iteritems(config.prefetch_size) - iterator = NamedItemsIterator(self._key_builder, - self._column_builder, - self._k_size, - ik, - self) - if self._has_embedded_set: - d = defaultdict(set) - # iteritems has the set values in different rows, this puts all the set values in the same row - if len(self._get_set_types()) == 1: - for row in iterator: - d[row.key].add(row.value[0]) - else: - for row in iterator: - d[row.key].add(tuple(row.value)) - - iterator = d.items() - - return iterator - else: - return dict.items(self) + for k, v in other: + self[k] = v - def values(self): - """ - Obtains the iterator for the values of the StorageDict - Returns: - if persistent: - NamedIterator(self): list of valuesStorageDict - if not persistent: - dict.values(self) - """ - if self._is_persistent: - if self._has_embedded_set: - items = self.items() - return dict(items).values() - else: - ik = self._hcache.itervalues(config.prefetch_size) - return NamedIterator(ik, self._column_builder, self) - else: - return dict.values(self) + for k, v in kwargs.items(): + self[k] = v - def get(self, key, default): + def get(self, key, default=None): try: - value = self.__getitem__(key) + return self.__getitem__(key) except KeyError: - value = default - return value - - def _get_set_types(self): - if self._has_embedded_set: - set_types = [col.get("columns", []) for col in self._columns if isinstance(col, dict)] - return sum(set_types, []) - else: - return [] + return default diff --git a/hecuba_py/hecuba/hnumpy.py b/hecuba_py/hecuba/hnumpy.py index 58adeb56..b7b0b74a 100644 --- a/hecuba_py/hecuba/hnumpy.py +++ b/hecuba_py/hecuba/hnumpy.py @@ -1,103 +1,76 @@ -import uuid from collections import namedtuple - import numpy as np -from hecuba import config, log -from hfetch import HNumpyStore +import uuid + +from .IStorage import IStorage, AlreadyPersistentError -from hecuba.IStorage import IStorage, AlreadyPersistentError, _extract_ks_tab +import storage -class StorageNumpy(np.ndarray, IStorage): +class StorageNumpy(IStorage, np.ndarray): class np_meta(object): def __init__(self, shape, dtype, block_id): self.dims = shape self.type = dtype self.block_id = block_id - _storage_id = None _build_args = None _class_name = None _hcache_params = None _hcache = None - _is_persistent = False _ksp = "" _table = "" _block_id = None - _prepared_store_meta = config.session.prepare('INSERT INTO hecuba.istorage' - '(storage_id, class_name, name, numpy_meta)' - 'VALUES (?,?,?,?)') args_names = ["storage_id", "class_name", "name", "shape", "dtype", "block_id", "built_remotely"] args = namedtuple('StorageNumpyArgs', 
args_names) def __new__(cls, input_array=None, storage_id=None, name=None, built_remotely=False, **kwargs): + if storage_id: + # already exists + pass if input_array is None and name and storage_id is not None: - result = cls.load_array(storage_id, name) + result = cls.load_array(storage_id) input_array = result[0] obj = np.asarray(input_array).view(cls) + obj._name = name + obj.storage_id = storage_id obj._is_persistent = True - (obj._ksp, obj._table) = _extract_ks_tab(name) - obj._hcache = result[1] - obj._hcache_params = result[2] - obj._storage_id = storage_id elif not name and storage_id is not None: raise RuntimeError("hnumpy received storage id but not a name") elif (input_array is not None and name and storage_id is not None) \ or (storage_id is None and name): obj = np.asarray(input_array).view(cls) - obj._storage_id = storage_id - obj._built_remotely = built_remotely + obj.storage_id = storage_id + obj._is_persistent = False obj.make_persistent(name) else: obj = np.asarray(input_array).view(cls) - obj._storage_id = storage_id + obj.storage_id = storage_id + obj._is_persistent = storage_id is not None # Finally, we must return the newly created object: obj._built_remotely = built_remotely obj._class_name = '%s.%s' % (cls.__module__, cls.__name__) return obj + def __init__(self, *args, **kwargs): + super(StorageNumpy, self).__init__() + # used as copy constructor def __array_finalize__(self, obj): if obj is None: return - self._storage_id = getattr(obj, '_storage_id', None) - - @staticmethod - def build_remotely(new_args): - """ - Launches the StorageNumpy.__init__ from the uuid api.getByID - Args: - new_args: a list of all information needed to create again the StorageNumpy - Returns: - so: the created StorageNumpy - """ - log.debug("Building StorageNumpy object with %s", new_args) - return StorageNumpy(name=new_args.name, storage_id=new_args.storage_id) + # self.storage_id = getattr(obj, 'storage_id', None) + # if self.storage_id and not hasattr(self, '_hcache'): + # self.make_persistent(obj.name) @staticmethod - def _store_meta(storage_args): - """ - Saves the information of the object in the istorage table. - Args:. 
- storage_args (object): contains all data needed to restore the object from the workers - """ - log.debug("StorageObj: storing media %s", storage_args) - try: - config.session.execute(StorageNumpy._prepared_store_meta, - [storage_args.storage_id, storage_args.class_name, - storage_args.name, StorageNumpy.np_meta(storage_args.shape, storage_args.dtype, - storage_args.block_id)]) - - except Exception as ex: - log.warn("Error creating the StorageNumpy metadata with args: %s" % str(storage_args)) - raise ex - - @staticmethod - def load_array(storage_id, name): - (ksp, table) = _extract_ks_tab(name) - hcache_params = (ksp, table + '_numpies', + def load_array(storage_id): + storage.StorageAPI.get_records(storage_id) + ''' + (ksp, table) = extract_ks_table(name) + hcache_params = (ksp, table, storage_id, [], ['storage_id', 'cluster_id', 'block_id'], [{'name': "payload", 'type': 'numpy'}], {'cache_size': config.max_cache_size, @@ -110,56 +83,29 @@ def load_array(storage_id, name): return [result[0], hcache, hcache_params] else: raise KeyError + ''' def make_persistent(self, name): if self._is_persistent: - raise AlreadyPersistentError("This StorageNumpy is already persistent [Before:{}.{}][After:{}]", - self._ksp, self._table, name) - self._is_persistent = True - - (self._ksp, self._table) = _extract_ks_tab(name) - if self._storage_id is None: - self._storage_id = uuid.uuid3(uuid.NAMESPACE_DNS, self._ksp + '.' + self._table + '_numpies') - - self._build_args = self.args(self._storage_id, self._class_name, self._ksp + '.' + self._table, - self.shape, self.dtype.num, self._block_id, self._built_remotely) - - if not self._built_remotely: - log.info("PERSISTING DATA INTO %s %s", self._ksp, self._table) - - query_keyspace = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s" % (self._ksp, config.replication) - config.session.execute(query_keyspace) - - config.session.execute('CREATE TABLE IF NOT EXISTS ' + self._ksp + '.' 
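A round-trip sketch under the new API (the record layout is taken from the put_records call in make_persistent above; the object name is hypothetical and loading by name alone is an assumption):

    import numpy as np
    from hecuba import StorageNumpy

    arr = StorageNumpy(np.arange(12).reshape(3, 4), name='my_app.matrix')
    # make_persistent() registered a data model of
    #   keys: storage_id / cluster_id / block_id  ->  cols: payload
    # and stored the ndarray as a single record
    again = StorageNumpy(name='my_app.matrix', storage_id=arr.storage_id)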
diff --git a/hecuba_py/hecuba/parser.py b/hecuba_py/hecuba/parser.py
index 0b221220..00d2685e 100644
--- a/hecuba_py/hecuba/parser.py
+++ b/hecuba_py/hecuba/parser.py
@@ -1,7 +1,31 @@
 import re
 from itertools import count
 
-from hecuba.IStorage import _conversions, process_path
+from .tools import process_path
+
+# User definitions to python types conversions
+_conversions = {'atomicint': 'counter',
+                'str': 'str',
+                'text': 'str',
+                'boolean': 'bool',
+                'decimal': 'decimal',
+                'float': 'float',
+                'int': 'int',
+                'tuple': 'tuple',
+                'list': 'list',
+                'generator': 'list',
+                'frozenset': 'frozenset',
+                'set': 'set',
+                'dict': 'dict',
+                'long': 'int',
+                'buffer': 'buffer',
+                'bytearray': 'bytearray',
+                'counter': 'counter',
+                'double': 'float',
+                'StorageDict': 'dict',
+                'ndarray': 'hecuba.StorageNumpy',
+                'numpy.ndarray': 'hecuba.StorageNumpy',
+                'UUID': 'uuid'}
 
 
 class Parser(object):
@@ -231,7 +255,7 @@ def _input_type(self, line, this):
         if line.count('<') == 1:  # is tuple, set, list
             aux = self._parse_set_tuple_list(line, this)
         elif line.count('<') == 0 and line.count('Index_on') == 0 and line.count('.') == 0 or (
-                line.count('numpy.ndarray') and line.count('dict') == 0):  # is simple type
+                line.count('numpy.ndarray') and line.count('dict') == 0):  # is simple type
             aux = self._parse_simple(line, this)
         elif line.count('Index_on') == 1:
             aux = self._parse_index(line, this)
@@ -270,7 +294,7 @@ def _remove_spaces_from_line(self, line):
             line = line[0:pos].replace('*', ' ') + line[pos:].replace("*", '')
         return line
 
-    def _parse_comments(self, comments):
+    def parse_comments(self, comments):
         '''Def: Parses the comments param to a ClassField or TypeSpec type and checks
         if the comments are in the correct format.
         Returns: an structure with all the parsed comments.'''
diff --git a/hecuba_py/hecuba/qbeast.py b/hecuba_py/hecuba/qbeast.py
deleted file mode 100644
index 63acde05..00000000
--- a/hecuba_py/hecuba/qbeast.py
+++ /dev/null
@@ -1,171 +0,0 @@
-import random
-import string
-import uuid
-from collections import namedtuple
-
-from hecuba import config, log
-from hecuba.tools import NamedItemsIterator
-from hfetch import Hcache
-
-from hecuba.IStorage import IStorage, _discrete_token_ranges, _extract_ks_tab
-
-
-class QbeastMeta(object):
-    def __init__(self, mem_filter, from_point, to_point, precision):
-        self.precision = precision
-        self.from_point = from_point
-        self.to_point = to_point
-        self.mem_filter = mem_filter
-
-
-config.cluster.register_user_type('hecuba', 'q_meta', QbeastMeta)
-
-
-class QbeastIterator(IStorage):
-    """
-    Object used to access data from workers.
-    """
-    args_names = ['primary_keys', 'columns', 'indexed_on', 'name', 'qbeast_meta', 'qbeast_random',
-                  'storage_id', 'tokens', 'class_name', 'built_remotely']
-    _building_args = namedtuple('QbeastArgs', args_names)
-    _prepared_store_meta = config.session.prepare(
-        'INSERT INTO hecuba.istorage'
-        '(primary_keys, columns, indexed_on, name, qbeast_meta,'
-        ' qbeast_random, storage_id, tokens, class_name)'
-        'VALUES (?,?,?,?,?,?,?,?,?)')
-    _prepared_set_qbeast_meta = config.session.prepare(
-        'INSERT INTO hecuba.istorage (storage_id, qbeast_meta)VALUES (?,?)')
-    _row_namedtuple = namedtuple("row", "key,value")
-
-    @staticmethod
-    def _store_meta(storage_args):
-        log.debug("QbeastIterator: storing metas %s", '')
-
-        try:
-            config.session.execute(QbeastIterator._prepared_store_meta,
-                                   [storage_args.primary_keys,
-                                    storage_args.columns,
-                                    storage_args.indexed_on,
-                                    storage_args.name,
-                                    storage_args.qbeast_meta,
-                                    storage_args.qbeast_random,
-                                    storage_args.storage_id,
-                                    storage_args.tokens,
-                                    storage_args.class_name])
-        except Exception as ex:
-            log.error("Error creating the StorageDictIx metadata: %s %s", storage_args, ex)
-            raise ex
-
-    def __init__(self, primary_keys, columns, indexed_on, name, qbeast_meta=None, qbeast_random=None,
-                 storage_id=None, tokens=None, built_remotely=False):
-        """
-        Creates a new block.
-        Args:
-            primary_keys (list(tuple)): a list of (key,type) primary keys (primary + clustering).
-            columns (list(tuple)): a list of (key,type) columns
-            indexed_on (list(str)): a list of the names of the indexed columns
-            name (string): keyspace.table of the Cassandra collection
-            qbeast_random (str): qbeast random string, when selecting in different nodes this must have the same value
-            storage_id (uuid): the storage id identifier
-            tokens (list): list of tokens
-        """
-        log.debug("CREATED QbeastIterator(%s,%s,%s,%s)", storage_id, tokens, )
-        (self._ksp, self._table) = _extract_ks_tab(name)
-        self._indexed_on = indexed_on
-        self._qbeast_meta = qbeast_meta
-        if qbeast_random is None:
-            self._qbeast_random = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
-        else:
-            self._qbeast_random = qbeast_random
-        if tokens is None:
-            log.info('using all tokens')
-            tokens = map(lambda a: a.value, config.cluster.metadata.token_map.ring)
-            self._tokens = _discrete_token_ranges(tokens)
-        else:
-            self._tokens = tokens
-
-        class_name = '%s.%s' % (self.__class__.__module__, self.__class__.__name__)
-
-        key_names = [col[0] if isinstance(col, tuple) else col["name"] for col in primary_keys]
-        column_names = [col[0] if isinstance(col, tuple) else col["name"] for col in columns]
-        if len(key_names) > 1:
-            self._key_builder = namedtuple('row', key_names)
-        else:
-            self._key_builder = None
-        if len(column_names) > 1:
-            self._column_builder = namedtuple('row', column_names)
-        else:
-            self._column_builder = None
-
-        self._k_size = len(primary_keys)
-
-        if storage_id is None:
-            self._storage_id = uuid.uuid4()
-        else:
-            self._storage_id = storage_id
-
-        build_keys = [(key["name"], key["type"]) if isinstance(key, dict) else key for key in primary_keys]
-        build_columns = [(col["name"], col["type"]) if isinstance(col, dict) else col for col in columns]
-
-        self._build_args = self._building_args(
-            build_keys,
-            build_columns,
-            self._indexed_on,
-            self._ksp + "." + self._table,
-            self._qbeast_meta,
-            self._qbeast_random,
-            self._storage_id,
-            self._tokens,
-            class_name,
-            built_remotely)
-
-        persistent_columns = [{"name": col[0], "type": col[1]} if isinstance(col, tuple) else col for col in columns]
-
-        self._hcache_params = (self._ksp, self._table,
-                               self._storage_id,
-                               self._tokens, key_names, persistent_columns,
-                               {'cache_size': config.max_cache_size,
-                                'writer_par': config.write_callbacks_number,
-                                'writer_buffer': config.write_buffer_size,
-                                'timestamped_writes': config.timestamped_writes})
-        log.debug("HCACHE params %s", self._hcache_params)
-        self._hcache = Hcache(*self._hcache_params)
-
-        if not built_remotely:
-            self._store_meta(self._build_args)
-
-    def _set_qbeast_meta(self, qbeast_meta):
-        self._qbeast_meta = qbeast_meta
-        self._build_args = self._build_args._replace(qbeast_meta=qbeast_meta)
-        config.session.execute(QbeastIterator._prepared_set_qbeast_meta, [self._storage_id, qbeast_meta])
-
-    def __eq__(self, other):
-        return self._storage_id == other._storage_id and \
-               self._tokens == other.token_ranges \
-               and self._table == other.table_name and self._ksp == other.keyspace
-
-    def __len__(self):
-        return len([row for row in self.__iter__()])
-
-    def __iter__(self):
-        if hasattr(self, "_qbeast_meta") and self._qbeast_meta is not None:
-            conditions = ""
-            for index, (from_p, to_p) in enumerate(zip(self._qbeast_meta.from_point, self._qbeast_meta.to_point)):
-                conditions += "{0} > {1} AND {0} < {2} AND ".format(self._indexed_on[index], from_p, to_p)
-
-            conditions = conditions[:-5] + self._qbeast_meta.mem_filter
-
-            conditions += " AND expr(%s_idx, 'precision=%s:%s') ALLOW FILTERING" \
-                          % (self._table, self._qbeast_meta.precision, self._qbeast_random)
-
-            hiter = self._hcache.iteritems({'custom_select': conditions, 'prefetch_size': config.prefetch_size})
-        else:
-            hiter = self._hcache.iteritems(config.prefetch_size)
-
-        iterator = NamedItemsIterator(self._key_builder,
-                                      self._column_builder,
-                                      self._k_size,
-                                      hiter,
-                                      self)
-
-        return iterator
diff --git a/hecuba_py/hecuba/storageiter.py b/hecuba_py/hecuba/storageiter.py
new file mode 100644
index 00000000..8b20a59f
--- /dev/null
+++ b/hecuba_py/hecuba/storageiter.py
@@ -0,0 +1,49 @@
+from .IStorage import IStorage
+import storage
+
+
+class ImplStorageIter:
+    def __new__(cls, *args, **kwargs):
+        toret = super().__new__(cls)
+        storage_id = kwargs.get('storage_id', None)
+        # toret.data = storage.StorageAPI.get_records(storage_id, [])
+        toret.data = []  # placeholder until get_records is wired in
+        return toret
+
+    def __next__(self):
+        # return the records one by one; __next__ must not be a generator
+        if not self.data:
+            raise StopIteration()
+        return self.data.pop(0)
+
+
+class StorageIter(IStorage):
+    _persistent_props = None
+    _persistent_attrs = None
+
+    def __new__(cls, *args, **kwargs):
+        toret = super().__new__(cls, *args, **kwargs)
+        storage_id = kwargs.get('storage_id', None)
+        persistent_props = kwargs.pop('data_model', None)
+        name = kwargs.pop('name', '')
+        toret.set_name(name)
+        DataModelId = storage.StorageAPI.add_data_model(persistent_props)
+
+        if storage_id:
+            toret.storage_id = storage_id
+            toret._is_persistent = True
+            toret.myiter = ImplStorageIter(storage_id=storage_id)
+            storage.StorageAPI.register_persistent_object(DataModelId, toret)
+        else:
+            toret.myiter = iter([])
+
+        return toret
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        # and transform to namedtuple
+        return next(self.myiter)
+ self._table, - self._qbeast_meta, - self._qbeast_random, - self._storage_id, - self._tokens, - class_name, - built_remotely) - - persistent_columns = [{"name": col[0], "type": col[1]} if isinstance(col, tuple) else col for col in columns] - - self._hcache_params = (self._ksp, self._table, - self._storage_id, - self._tokens, key_names, persistent_columns, - {'cache_size': config.max_cache_size, - 'writer_par': config.write_callbacks_number, - 'writer_buffer': config.write_buffer_size, - 'timestamped_writes': config.timestamped_writes}) - log.debug("HCACHE params %s", self._hcache_params) - self._hcache = Hcache(*self._hcache_params) - - if not built_remotely: - self._store_meta(self._build_args) - - def _set_qbeast_meta(self, qbeast_meta): - self._qbeast_meta = qbeast_meta - self._build_args = self._build_args._replace(qbeast_meta=qbeast_meta) - config.session.execute(QbeastIterator._prepared_set_qbeast_meta, [self._storage_id, qbeast_meta]) - - def __eq__(self, other): - return self._storage_id == other._storage_id and \ - self._tokens == other.token_ranges \ - and self._table == other.table_name and self._ksp == other.keyspace - - def __len__(self): - return len([row for row in self.__iter__()]) - - def __iter__(self): - if hasattr(self, "_qbeast_meta") and self._qbeast_meta is not None: - conditions = "" - for index, (from_p, to_p) in enumerate(zip(self._qbeast_meta.from_point, self._qbeast_meta.to_point)): - conditions += "{0} > {1} AND {0} < {2} AND ".format(self._indexed_on[index], from_p, to_p) - - conditions = conditions[:-5] + self._qbeast_meta.mem_filter - - conditions += " AND expr(%s_idx, 'precision=%s:%s') ALLOW FILTERING" \ - % (self._table, self._qbeast_meta.precision, self._qbeast_random) - - hiter = self._hcache.iteritems({'custom_select': conditions, 'prefetch_size': config.prefetch_size}) - else: - hiter = self._hcache.iteritems(config.prefetch_size) - - iterator = NamedItemsIterator(self._key_builder, - self._column_builder, - self._k_size, - hiter, - self) - - return iterator diff --git a/hecuba_py/hecuba/storageiter.py b/hecuba_py/hecuba/storageiter.py new file mode 100644 index 00000000..8b20a59f --- /dev/null +++ b/hecuba_py/hecuba/storageiter.py @@ -0,0 +1,49 @@ +from .IStorage import IStorage +import storage + + +class ImplStorageIter: + def __new__(cls, *args, **kwargs): + toret = super().__new__(cls) + storage_id = kwargs.get('storage_id', None) + # toret.data = storage.StorageAPI.get_records(storage_id,[]) + return toret + + def __next__(self): + if self.data is None: + raise StopIteration() + + for elem in self.data: + yield elem + self.data = [] + raise StopIteration() + + +class StorageIter(IStorage): + _persistent_props = None + _persistent_attrs = None + + def __new__(cls, *args, **kwargs): + toret = super().__new__(cls, *args, **kwargs) + storage_id = kwargs.get('storage_id', None) + persistent_props = kwargs.pop('data_model', None) + name = kwargs.pop('name', '') + toret.set_name(name) + DataModelId = storage.StorageAPI.add_data_model(persistent_props) + + if storage_id: + toret.storage_id = storage_id + toret._is_persistent = True + toret.myiter = ImplStorageIter(storage_id=storage_id) + storage.StorageAPI.register_persistent_object(DataModelId, toret) + else: + toret.myiter = iter([]) + + return toret + + def __iter__(self): + return self + + def __next__(self): + # and transform to namedtuple + return next(self.myiter) diff --git a/hecuba_py/hecuba/storageobj.py b/hecuba_py/hecuba/storageobj.py index 9f39e81c..8612c197 100644 --- 
a/hecuba_py/hecuba/storageobj.py +++ b/hecuba_py/hecuba/storageobj.py @@ -1,52 +1,49 @@ -import uuid -from collections import namedtuple - import numpy as np -from hecuba import config, log, Parser +from . import log, Parser -from hecuba.hnumpy import StorageNumpy -from hecuba.IStorage import IStorage, AlreadyPersistentError, _discrete_token_ranges, _basic_types, _valid_types, \ - _extract_ks_tab +from .hnumpy import StorageNumpy +from .IStorage import IStorage, AlreadyPersistentError +from .tools import storage_id_from_name, transform_to_dm +import uuid +import storage -class StorageObj(IStorage): - args_names = ["name", "tokens", "storage_id", "istorage_props", "class_name", "built_remotely"] - args = namedtuple('StorageObjArgs', args_names) - _prepared_store_meta = config.session.prepare('INSERT INTO hecuba' + - '.istorage (storage_id, class_name, name, tokens,istorage_props) ' - ' VALUES (?,?,?,?,?)') +class StorageObj(IStorage): """ This class is where information will be stored in Hecuba. The information can be in memory, stored in a python dictionary or local variables, or saved in a DB(Cassandra), depending on if it's persistent or not. """ - @staticmethod - def _store_meta(storage_args): - """ - Saves the information of the object in the istorage table. - Args: - storage_args (object): contains all data needed to restore the object from the workers - """ - log.debug("StorageObj: storing media %s", storage_args) - try: - - config.session.execute(StorageObj._prepared_store_meta, - [storage_args.storage_id, - storage_args.class_name, - storage_args.name, - storage_args.tokens, - storage_args.istorage_props]) - except Exception as ex: - log.warn("Error creating the StorageDict metadata: %s, %s", str(storage_args), ex) - raise ex - - @classmethod - def _parse_comments(cls, comments): - parser = Parser("ClassField") - return parser._parse_comments(comments) - - def __init__(self, name="", tokens=None, storage_id=None, istorage_props=None, built_remotely=False, **kwargs): + def __new__(cls, name='', *args, **kwargs): + if not cls._data_model_id: + # User data model + keys = {} + try: + cls._data_model_def = kwargs['data_model'] + except KeyError: + cls._data_model_def = dict() + cls._data_model_def["cols"] = transform_to_dm(cls) + cls._data_model_def['value_id'] = {'storage_id': uuid.UUID} + cls._data_model_def['type'] = cls + + # Storage data model + #cols = {k: uuid.UUID if issubclass(v, IStorage) else v for k, v in cls._data_model_def["cols"].items()} + cls._data_model_id = storage.StorageAPI.add_data_model(cls._data_model_def) + + toret = super(StorageObj, cls).__new__(cls) + storage_id = kwargs.get('storage_id', None) + if storage_id is None and name: + storage_id = storage_id_from_name(name) + + if name or storage_id: + toret.setID(storage_id) + toret.set_name(name) + toret._is_persistent = True + storage.StorageAPI.register_persistent_object(cls._data_model_id, toret) + return toret + + def __init__(self, *args, **kwargs): """ Creates a new storageobj. Args: @@ -56,100 +53,10 @@ def __init__(self, name="", tokens=None, storage_id=None, istorage_props=None, b istorage_props dict(string,string): a map with the storage id of each contained istorage object. 
kwargs: more optional parameters """ - log.debug("CREATED StorageObj(%s)", name) # Assign private attributes - self._is_persistent = True if name or storage_id else False - self._built_remotely = built_remotely - self._persistent_props = StorageObj._parse_comments(self.__doc__) - self._persistent_attrs = self._persistent_props.keys() - self._class_name = '%s.%s' % (self.__class__.__module__, self.__class__.__name__) - - if self._is_persistent: - if name: - self._ksp, self._table = _extract_ks_tab(name) - name = self._ksp + '.' + self._table - - if not storage_id: - # Rebuild storage id - storage_id = uuid.uuid3(uuid.NAMESPACE_DNS, name) - - # Retrieve from hecuba istorage the data - metas = self._get_istorage_attrs(storage_id) - - # If found data, replace the constructor data - if len(metas) != 0: - tokens = metas[0].tokens - istorage_props = metas[0].istorage_props - name = metas[0].name - self._ksp, self._table = _extract_ks_tab(name) - - if tokens is None: - # log.info('using all tokens') - tokens = [token.value for token in config.cluster.metadata.token_map.ring] - tokens = _discrete_token_ranges(tokens) - - self._tokens = tokens - self._storage_id = storage_id - self._istorage_props = istorage_props - - # Arguments used to build objects remotely - self._build_args = self.args(name, - self._tokens, - self._storage_id, - self._istorage_props, - self._class_name, - built_remotely) - - if self._is_persistent: - # If never existed, must create the tables and register - if not self._built_remotely: - self._create_tables() - self._store_meta(self._build_args) - - self._load_attributes() - - def _load_attributes(self): - """ - Loads the IStorage objects into memory by creating them or retrieving from the backend. - """ - attrs = [] - for attribute, value_info in self._persistent_props.items(): - if value_info['type'] not in _basic_types: - # The attribute is an IStorage object - attrs.append((attribute, getattr(self, attribute))) - for (attr_name, attr) in attrs: - setattr(self, attr_name, attr) - - def __eq__(self, other): - return self.__class__ == other.__class__ and self.getID() == other.getID() - - def _create_tables(self): - """ - Setups the python structures used to communicate with the backend. - Creates the necessary tables on the backend to store the object data. - """ - - log.info("CREATING KEYSPACE AND TABLE %s %s", self._ksp, self._table) - - query_keyspace = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s" % (self._ksp, config.replication) - config.session.execute(query_keyspace) - - query_simple = 'CREATE TABLE IF NOT EXISTS ' + self._ksp + '.' 
+ self._table + \ - '( storage_id uuid PRIMARY KEY, ' - for key, entry in self._persistent_props.items(): - query_simple += str(key) + ' ' - if entry['type'] != 'dict' and entry['type'] in _valid_types: - if entry['type'] == 'list' or entry['type'] == 'tuple': - query_simple += entry['type'] + '<' + entry['columns'] + '>, ' - else: - query_simple += entry['type'] + ', ' - else: - query_simple += 'uuid, ' - try: - config.session.execute(query_simple[:-2] + ' )') - except Exception as ir: - log.error("Unable to execute %s", query_simple) - raise ir + # if self._is_persistent: + # self._load_attributes() + super(StorageObj, self).__init__() def make_persistent(self, name): """ @@ -162,78 +69,57 @@ def make_persistent(self, name): name (string): name with which the table in the DB will be created """ if self._is_persistent: - raise AlreadyPersistentError("This StorageObj is already persistent [Before:{}.{}][After:{}]", - self._ksp, self._table, name) - - (self._ksp, self._table) = _extract_ks_tab(name) + raise AlreadyPersistentError("This StorageObj is already persistent {}", name) - if not self._storage_id: - # Rebuild storage id - self._storage_id = uuid.uuid3(uuid.NAMESPACE_DNS, self._ksp + '.' + self._table) - self._build_args = self._build_args._replace(name=self._ksp + '.' + self._table, - storage_id=self._storage_id) + super().make_persistent(name) - # Retrieve from hecuba istorage the data - metas = self._get_istorage_attrs(self._storage_id) + storage.StorageAPI.register_persistent_object(self.__class__._data_model_id, self) + # defined_attrs = [attr for attr in self._data_model_def.keys() if attr in list(set(dir(self)))] - # If metadata was found, replace the private attrs - if len(metas) != 0: - # Persisted another - name = metas[0].name - self._tokens = metas[0].tokens - self._istorage_props = metas[0].istorage_props - # Create the interface with the backend to store the object - self._create_tables() - - self._is_persistent = True - if self._build_args.storage_id is None: - self._build_args = self._build_args._replace(name=self._ksp + '.' + self._table, - storage_id=self._storage_id) - self._store_meta(self._build_args) - - # Iterate over the objects the user has requested to be persistent - # retrieve them from memory and make them persistent - for obj_name, obj_info in self._persistent_props.items(): + attrs = [] + values = [] + for obj_name, obj_type in self._data_model_def["cols"].items(): try: pd = object.__getattribute__(self, obj_name) - if isinstance(pd, IStorage) and not pd._is_persistent: - count = self._count_name_collision(obj_name) - sd_name = self._ksp + "." 
+ self._table + "_" + obj_name - if count > 1: - sd_name += '_' + str(count - 2) - pd.make_persistent(sd_name) - # self is persistent so setting the attribute will store the data and create the appropiate binding - setattr(self, obj_name, pd) except AttributeError: # Attribute unset, no action needed - pass + continue + attrs.append(obj_name) + if isinstance(pd, IStorage): + if not pd._is_persistent: + sd_name = name + "_" + obj_name + pd.make_persistent(sd_name) + values.append(pd.getID()) + else: + values.append(pd) + + storage.StorageAPI.put_records(self.storage_id, attrs, values) def stop_persistent(self): """ The StorageObj stops being persistent, but keeps the information already stored in Cassandra """ - for obj_name in self._persistent_attrs: + log.debug("STOP PERSISTENT") + for obj_name in self._data_model_def.keys(): attr = getattr(self, obj_name, None) if isinstance(attr, IStorage): attr.stop_persistent() - log.debug("STOP PERSISTENT") - self._is_persistent = False + super().stop_persistent() def delete_persistent(self): """ Deletes the Cassandra table where the persistent StorageObj stores data """ - for obj_name in self._persistent_attrs: + log.debug('DELETE PERSISTENT') + for obj_name in self._data_model_def.keys(): attr = getattr(self, obj_name, None) if isinstance(attr, IStorage): attr.delete_persistent() - query = "TRUNCATE TABLE %s.%s;" % (self._ksp, self._table) - log.debug("DELETE PERSISTENT: %s", query) - config.session.execute(query) + storage.StorageAPI.delete_persistent_object(self.storage_id) - self._is_persistent = False + super().delete_persistent() def __getattr__(self, attribute): """ @@ -245,67 +131,36 @@ def __getattr__(self, attribute): Returns: value: obtained value """ - if attribute.startswith('_') or attribute not in self._persistent_attrs: + try: return object.__getattribute__(self, attribute) + except AttributeError as ex: + if attribute.startswith('_') or attribute not in self._data_model_def['cols'].keys(): + raise ex + + value_info = self._data_model_def['cols'][attribute] - value_info = self._persistent_props[attribute] - is_istorage_attr = value_info['type'] not in _basic_types - if not self._is_persistent: - if not is_istorage_attr: - return object.__getattribute__(self, attribute) + if not self.storage_id: + if not issubclass(value_info, IStorage): + raise AttributeError else: - # We are not persistent or the attribute hasn't been assigned an IStorage obj, we build one - info = {"name": '', "tokens": self._build_args.tokens, "storage_id": None} - info.update(value_info) - info["built_remotely"] = self._built_remotely - value = IStorage.build_remotely(info) + # We build the object, because Hecuba allows accessing attributes without previous initialization + value = value_info(data_model=self._data_model_def["cols"][attribute], build_remotely=True) object.__setattr__(self, attribute, value) return value - ''' - StorageObj is persistent. - If the attribute is not a built-in object, we might have it in memory. - Since python works using references any modification from another reference will affect this attribute, - which is the expected behaviour. Therefore, is safe to store in-memory the Hecuba objects. 
-        '''
-        try:
-            return object.__getattribute__(self, attribute)
-        except AttributeError as ex:
-            # Not present in memory, we will need to rebuild it
-            pass
+        assert self._is_persistent
 
-        query = "SELECT %s FROM %s.%s WHERE storage_id = %s;" % (attribute, self._ksp, self._table, self._storage_id)
-        log.debug("GETATTR: %s", query)
-        try:
-            result = config.session.execute(query)
-        except Exception as ex:
-            log.warn("GETATTR ex %s", ex)
-            raise ex
+        attr = storage.StorageAPI.get_records(self.storage_id, [attribute])[0]
 
-        try:
-            value = result[0][0]
-            # if exists but is set to None, the current behaviour is raising AttributeError
-            if value is None:
-                raise AttributeError('value not found')
-        except IndexError as ex:
-            if not is_istorage_attr:
-                raise AttributeError('value not found')
-            value = None
-
-        if is_istorage_attr:
-            # If IStorage type, then we rebuild
-            count = self._count_name_collision(attribute)
-            attr_name = self._ksp + '.' + self._table + '_' + attribute
-            if count > 1:
-                attr_name += '_' + str(count - 2)
+        if issubclass(value_info, IStorage):
             # Build the IStorage obj
-            info = {"name": attr_name, "tokens": self._build_args.tokens, "storage_id": value}
-            info.update(value_info)
-            info["built_remotely"] = self._built_remotely
-            value = IStorage.build_remotely(info)
+            attr = value_info(name=self.get_name() + '_' + attribute, storage_id=attr,
+                              data_model=self._data_model_def["cols"][attribute], build_remotely=True)
+        elif not attr:
+            raise AttributeError('Value not found for {}'.format(attribute))
 
-        object.__setattr__(self, attribute, value)
-        return value
+        object.__setattr__(self, attribute, attr)
+        return attr
 
     def __setattr__(self, attribute, value):
         """
@@ -316,7 +171,7 @@ def __setattr__(self, attribute, value):
             attribute: name of the value that we want to set
             value: value that we want to save
         """
-        if attribute[0] is '_' or attribute not in self._persistent_attrs:
+        if attribute[0] == '_' or attribute not in self._data_model_def["cols"].keys():
             object.__setattr__(self, attribute, value)
             return
 
@@ -325,35 +180,21 @@ def __setattr__(self, attribute, value):
         if isinstance(value, np.ndarray):
             value = StorageNumpy(value)
         elif isinstance(value, dict):
-            per_dict = self._persistent_props[attribute]
-            info = {"name": '', "tokens": self._build_args.tokens, "storage_id": None,
-                    "built_remotely": self._built_remotely}
-            info.update(per_dict)
-            new_value = IStorage.build_remotely(info)
-            new_value.update(value)
-            value = new_value
+            obj_class = self._data_model_def["cols"][attribute]
+            value = obj_class(data_model=self._data_model_def["cols"][attribute], build_remotely=False)
 
-        if self._is_persistent:
+        if self.storage_id:
             # Write attribute to the storage
             if isinstance(value, IStorage):
-                if not value._is_persistent:
-                    name_collisions = attribute.lower()
-                    count = self._count_name_collision(name_collisions)
-                    attr_name = self._ksp + '.' + self._table + '_' + name_collisions
-                    if count != 0:
-                        attr_name += '_' + str(count - 1)
+                if not value.storage_id:
+                    attr_name = self._name + '_' + attribute
+                    attr_id = storage_id_from_name(attr_name)
                     value.make_persistent(attr_name)
-                # We store the storage_id when the object belongs to an Hecuba class
-                values = [self._storage_id, value._storage_id]
-                # We store the IStorage object in memory, to avoid rebuilding when it is not necessary
+                    storage.StorageAPI.put_records(self.storage_id, [attribute], [attr_id])
+                else:
+                    storage.StorageAPI.put_records(self.storage_id, [attribute], [value.storage_id])
             else:
-                values = [self._storage_id, value]
-
-            query = "INSERT INTO %s.%s (storage_id,%s)" % (self._ksp, self._table, attribute)
-            query += " VALUES (%s,%s)"
-
-            log.debug("SETATTR: ", query)
-            config.session.execute(query, values)
+                storage.StorageAPI.put_records(self.storage_id, [attribute], [value])
 
         # We store all the attributes in memory
         object.__setattr__(self, attribute, value)
 
@@ -364,8 +205,6 @@ def __delattr__(self, item):
         Args:
             item: the name of the attribute to be deleted
         """
-        if self._is_persistent and item in self._persistent_attrs:
-            query = "UPDATE %s.%s SET %s = null WHERE storage_id = %s" % (
-                self._ksp, self._table, item, self._storage_id)
-            config.session.execute(query)
+        if self.storage_id and item in self._data_model_def["cols"].keys():
+            storage.StorageAPI.put_records(self.storage_id, [item], [])
         object.__delattr__(self, item)
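With the storageobj.py changes above, a persistent attribute round-trip goes entirely through the storage API instead of hand-built CQL. The following sketch is illustrative only: `Particle` and its fields are invented names, and it assumes a configured backend reachable through `storage.StorageAPI`.

```python
from hecuba import StorageObj

class Particle(StorageObj):
    # after this patch the data model is derived from the class annotations
    x: float
    name: str

p = Particle("my_app.particle0")  # __new__ registers the object and its data model
p.x = 1.5                         # __setattr__ -> StorageAPI.put_records(id, ['x'], [1.5])
print(p.x)                        # __getattr__ serves it from memory once set
del p.x                           # __delattr__ -> put_records(id, ['x'], []) clears it
```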
diff --git a/hecuba_py/hecuba/tools.py b/hecuba_py/hecuba/tools.py
index ffdefcc1..aa68ba0e 100644
--- a/hecuba_py/hecuba/tools.py
+++ b/hecuba_py/hecuba/tools.py
@@ -1,49 +1,138 @@
-from collections import namedtuple
-
-
-class NamedIterator:
-    # Class that allows to iterate over the keys or the values of a dict
-    def __init__(self, hiterator, builder, father):
-        self.hiterator = hiterator
-        self.builder = builder
-        self._storage_father = father
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        n = self.hiterator.get_next()
-        if self.builder is not None:
-            if self._storage_father._get_set_types() is not None:
-                nkeys = len(n) - len(self._storage_father._get_set_types())
-                n = n[:nkeys]
-            return self.builder(*n)
-        else:
-            return n[0]
+import uuid
+import typing
+from collections import OrderedDict
 
+import hecuba
 
-class NamedItemsIterator:
-    # Class that allows to iterate over the keys and the values of a dict
-    builder = namedtuple('row', 'key, value')
+def storage_id_from_name(name):
+    return uuid.uuid3(uuid.NAMESPACE_DNS, name)
 
-    def __init__(self, key_builder, column_builder, k_size, hiterator, father):
-        self.key_builder = key_builder
-        self.k_size = k_size
-        self.column_builder = column_builder
-        self.hiterator = hiterator
-        self._storage_father = father
 
-    def __iter__(self):
-        return self
+def process_path(module_path):
+    """
+    Method to obtain module and class_name from a module path
+    Args:
+        module_path(String): path in the format module.class_name
+    Returns:
+        tuple containing class_name and module
+    """
 
-    def __next__(self):
-        n = self.hiterator.get_next()
-        if self.key_builder is None:
-            k = n[0]
-        else:
-            k = self.key_builder(*n[0:self.k_size])
-        if self.column_builder is None:
-            v = n[self.k_size]
+    if module_path == 'numpy.ndarray':
+        return 'StorageNumpy', 'hecuba.hnumpy'
+    if module_path == 'StorageDict':
+        return 'StorageDict', 'hecuba.hdict'
+
+    res = module_path.split('.')
+    if len(res) == 1:
+        mod = "builtins"
+        class_name = module_path
+    else:
+        mod = res[0]
+        class_name = module_path[len(mod) + 1:]
+
+    return class_name, mod
+
+
+def build_remotely(args):
+    """
+    Takes the information, which must include at least the class type, and builds an IStorage object from it.
+    :raises TypeError if the object class doesn't subclass IStorage
+    :param args: Contains the information to be used to create the IStorage obj
+    :return: An IStorage object
+    """
+    if "built_remotely" not in args.keys():
+        built_remotely = True
+    else:
+        built_remotely = args["built_remotely"]
+
+    obj_type = args.get('class_name', args.get('type', None))
+    if obj_type is None:
+        raise TypeError("Trying to build an IStorage obj without giving the type")
+
+    imported_class = import_class(obj_type) if isinstance(obj_type, str) else obj_type
+
+    args = {k: v for k, v in args.items() if k in imported_class.args_names}
+    args.pop('class_name', None)
+    args["built_remotely"] = built_remotely
+
+    return imported_class(**args)
+
+
+def import_class(module_path):
+    """
+    Method to import a class given its module path
+    Args:
+        module_path(String): path in the format module.class_name
+    Returns:
+        the imported class
+    """
+    class_name, mod = process_path(module_path)
+
+    try:
+        mod = __import__(mod, globals(), locals(), [class_name], 0)
+    except ValueError:
+        raise ValueError("Can't import class {} from module {}".format(class_name, mod))
+
+    imported_class = getattr(mod, class_name)
+    return imported_class
+
+
+def build_data_model(description):
+    res = {}
+    for k, v in description.items():
+        dt = update_type(v["type"])
+        try:
+            keys = build_data_model(v["primary_keys"])
+            values = build_data_model(v["columns"])
+            res[k] = {"keys": keys, "cols": values, "type": dt}
+        except KeyError:
+            res[k] = dt
+    return res
+    # {k: update_type(v['type']) for k, v in persistent_props.items()}
+
+
+def update_type(d):
+    if d == 'text':
+        return str
+    res = import_class(d)
+    return res
+
+
+def transform_to_dm(ob, depth=0):
+    """
+    Recursively translates a class or typing annotation into a data model description.
+    :param ob: class or typing construct to translate
+    :return: list, tuple, dict or type describing the data model
+    """
+    if issubclass(ob, hecuba.IStorage.IStorage) and depth > 0:
+        return ob
+    elif issubclass(ob, typing.Dict):
+        fields = {}
+
+        keys = transform_to_dm(ob.__args__[0], depth + 1)  # Keys
+        cols = transform_to_dm(ob.__args__[1], depth + 1)  # Cols
+
+        if isinstance(keys, list):
+            keys = {"key{}".format(i): transform_to_dm(v, depth + 1) for i, v in enumerate(keys)}
+        if isinstance(cols, list):
+            cols = {"col{}".format(i): transform_to_dm(v, depth + 1) for i, v in enumerate(cols)}
+
+        fields["value_id"] = keys
+        fields["cols"] = cols
+        return fields
+
+    elif hasattr(ob, '__annotations__'):
+        annot = ob.__annotations__
+        if isinstance(annot, OrderedDict):
+            return {k: transform_to_dm(v, depth + 1) for k, v in annot.items()}
+        elif isinstance(annot, dict):
+            return {k: transform_to_dm(v, depth + 1) for k, v in annot.items()}
         else:
-            v = self.column_builder(*n[self.k_size:])
-            return self.builder(k, v)
+            raise NotImplementedError
+
+    elif hasattr(ob, '__args__'):
+        if issubclass(ob, typing.Tuple):
+            t = [transform_to_dm(cl, depth + 1) for cl in ob.__args__ if cl != ()]
+            return tuple(t)
+        return [transform_to_dm(cl, depth + 1) for cl in ob.__args__ if cl != ()]
+    return ob
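To make the new tools.py helpers concrete, here is an illustrative sketch. The `Minimal` class is invented for the example, and the exact result of the typing introspection depends on the Python version this helper targets:

```python
import typing
from hecuba.tools import storage_id_from_name, transform_to_dm

class Minimal:
    counter: int                     # plain column
    scores: typing.Dict[str, float]  # expected to become {'value_id': ..., 'cols': ...}

# transform_to_dm walks __annotations__ recursively into a data-model dict
print(transform_to_dm(Minimal))

# storage ids are a pure function of the qualified name, so a worker can
# recompute the id of "my_app.words" without reading hecuba.istorage
assert storage_id_from_name("my_app.words") == storage_id_from_name("my_app.words")
```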
diff --git a/hecuba_py/tests/storage_api_tests.py b/hecuba_py/tests/storage_api_tests.py
index d0d0579f..b3e694da 100644
--- a/hecuba_py/tests/storage_api_tests.py
+++ b/hecuba_py/tests/storage_api_tests.py
@@ -1,7 +1,7 @@
 import unittest
 
 from storage.api import getByID
-from hecuba import config, StorageDict
+from hecuba import StorageDict
 
 
 class ApiTestSDict(StorageDict):
diff --git a/hecuba_py/tests/storagedict_tests.py b/hecuba_py/tests/storagedict_tests.py
index ab24df93..597687af 100644
---
a/hecuba_py/tests/storagedict_tests.py +++ b/hecuba_py/tests/storagedict_tests.py @@ -1,6 +1,5 @@ import unittest -from hecuba import Config from hecuba.hdict import StorageDict diff --git a/hecuba_py/tests/withcassandra/hfetch_tests.py b/hecuba_py/tests/withcassandra/hfetch_tests.py index c9ca5509..d0b9289f 100644 --- a/hecuba_py/tests/withcassandra/hfetch_tests.py +++ b/hecuba_py/tests/withcassandra/hfetch_tests.py @@ -1,6 +1,6 @@ import unittest -from hecuba import config, StorageDict +from hecuba import StorageDict class ConcurrentDict(StorageDict): @@ -11,8 +11,6 @@ class ConcurrentDict(StorageDict): class HfetchTests(unittest.TestCase): def test_timestamped_writes(self): - previous_cfg = config.timestamped_writes - config.timestamped_writes = "True" my_dict = ConcurrentDict("concurrent_dict") last_value = 1000 @@ -24,7 +22,7 @@ def test_timestamped_writes(self): retrieved = my_dict[0] - config.timestamped_writes = previous_cfg + self.assertEqual(retrieved, last_value - 1) diff --git a/hecuba_py/tests/withcassandra/storagedict_tests.py b/hecuba_py/tests/withcassandra/storagedict_tests.py index 6abb1c61..486d22e6 100644 --- a/hecuba_py/tests/withcassandra/storagedict_tests.py +++ b/hecuba_py/tests/withcassandra/storagedict_tests.py @@ -1,7 +1,7 @@ import cassandra import unittest -from hecuba import config, StorageObj, StorageDict +from hecuba import StorageObj, StorageDict from ..app.words import Words import uuid import time @@ -73,7 +73,7 @@ class MultiTuples(StorageDict): class StorageDictTest(unittest.TestCase): def test_init_empty(self): - config.session.execute("DROP TABLE IF EXISTS my_app.tab1") + tablename = "ksp.tab1" tokens = [(1, 2), (2, 3), (3, 4)] nopars = StorageDict(tablename, @@ -83,9 +83,6 @@ def test_init_empty(self): self.assertEqual("tab1", nopars._table) self.assertEqual("ksp", nopars._ksp) - res = config.session.execute( - 'SELECT storage_id, primary_keys, columns, class_name, name, tokens, istorage_props,indexed_on ' + - 'FROM hecuba.istorage WHERE storage_id = %s', [nopars._storage_id])[0] self.assertEqual(uuid.uuid3(uuid.NAMESPACE_DNS, tablename), nopars._storage_id) self.assertEqual(nopars.__class__.__module__, 'hecuba.hdict') @@ -100,7 +97,6 @@ def test_init_empty(self): self.assertEqual(nopars._is_persistent, rebuild._is_persistent) def test_init_empty_def_keyspace(self): - config.session.execute("DROP TABLE IF EXISTS my_app.tab1") tablename = "tab1" tokens = [(1, 2), (2, 3), (3, 4)] nopars = StorageDict(tablename, @@ -108,26 +104,18 @@ def test_init_empty_def_keyspace(self): [('value', 'int')], tokens) self.assertEqual("tab1", nopars._table) - self.assertEqual(config.execution_name, nopars._ksp) - - res = config.session.execute( - 'SELECT storage_id, primary_keys, columns, class_name, name, tokens, istorage_props,indexed_on ' + - 'FROM hecuba.istorage WHERE storage_id = %s', [nopars._storage_id])[0] - self.assertEqual(uuid.uuid3(uuid.NAMESPACE_DNS, config.execution_name + '.' + tablename), nopars._storage_id) self.assertEqual(nopars.__class__.__module__, 'hecuba.hdict') self.assertEqual(nopars.__class__.__name__, 'StorageDict') rebuild = StorageDict.build_remotely(res._asdict()) self.assertEqual(rebuild._built_remotely, True) self.assertEqual('tab1', rebuild._table) - self.assertEqual(config.execution_name, rebuild._ksp) - self.assertEqual(uuid.uuid3(uuid.NAMESPACE_DNS, config.execution_name + '.' 
+ tablename), rebuild._storage_id) self.assertEqual(nopars._is_persistent, rebuild._is_persistent) def test_simple_insertions(self): - config.session.execute("DROP TABLE IF EXISTS my_app.tab10") + tablename = "tab10" tokens = [(1, 2), (2, 3), (3, 4)] pd = StorageDict(tablename, @@ -138,12 +126,9 @@ def test_simple_insertions(self): for i in range(100): pd[i] = 'ciao' + str(i) del pd - count, = config.session.execute('SELECT count(*) FROM my_app.tab10')[0] - self.assertEqual(count, 100) def test_dict_print(self): tablename = "tab10" - config.session.execute("DROP TABLE IF EXISTS my_app." + tablename) pd = StorageDict(tablename, [('position', 'int')], [('value', 'text')]) @@ -162,7 +147,7 @@ def test_dict_print(self): def test_get_strs(self): tablename = "tab10" - config.session.execute("DROP TABLE IF EXISTS my_app." + tablename) + pd = StorageDict(tablename, [('position', 'int')], [('value', 'text')]) @@ -185,7 +170,6 @@ def test_get_strs(self): ''' def test_make_persistent(self): - config.session.execute("DROP TABLE IF EXISTS my_app.t_make_words") nopars = Words() self.assertFalse(nopars._is_persistent) nopars.ciao = 1 @@ -195,36 +179,32 @@ def test_make_persistent(self): for i in range(10): nopars.words[i] = 'ciao' + str(i) - count, = config.session.execute( - "SELECT count(*) FROM system_schema.tables WHERE keyspace_name = 'my_app' and table_name = 't_make_words'")[ - 0] - self.assertEqual(0, count) nopars.make_persistent("t_make") del nopars - count, = config.session.execute('SELECT count(*) FROM my_app.t_make_words')[0] + self.assertEqual(10, count) def test_none_value(self): - config.session.execute("DROP TABLE IF EXISTS my_app.somename") + mydict = MyStorageDict('somename') mydict[0] = None self.assertEqual(mydict[0], None) - config.session.execute("DROP TABLE IF EXISTS my_app.somename") + def test_none_keys(self): - config.session.execute("DROP TABLE IF EXISTS my_app.somename") + mydict = MyStorageDict('somename') def set_none_key(): mydict[None] = 1 self.assertRaises(TypeError, set_none_key) - config.session.execute("DROP TABLE IF EXISTS my_app.somename") + def test_paranoid_setitem_nonpersistent(self): - config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + pd = StorageDict("mydict", [('position', 'int')], [('value', 'text')]) @@ -242,7 +222,7 @@ def set_wrong_val_2(): self.assertRaises(TypeError, set_wrong_val_2) def test_paranoid_setitem_multiple_nonpersistent(self): - config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + pd = StorageDict("mydict", [('position1', 'int'), ('position2', 'text')], [('value1', 'text'), ('value2', 'int')]) @@ -260,14 +240,12 @@ def set_wrong_val_2(): self.assertRaises(TypeError, set_wrong_val_2) def test_paranoid_setitem_persistent(self): - config.session.execute("DROP TABLE IF EXISTS my_app.tab_a1") + pd = StorageDict("tab_a1", [('position', 'int')], [('value', 'text')]) pd[0] = 'bla' - result = config.session.execute('SELECT value FROM my_app.tab_a1 WHERE position = 0') - for row in result: - self.assertEquals(row.value, 'bla') + def set_wrong_val_test(): pd[0] = 1 @@ -275,7 +253,7 @@ def set_wrong_val_test(): self.assertRaises(TypeError, set_wrong_val_test) def test_paranoid_setitem_multiple_persistent(self): - config.session.execute("DROP TABLE IF EXISTS my_app.tab_a2") + pd = StorageDict("tab_a2", [('position1', 'int'), ('position2', 'text')], [('value1', 'text'), ('value2', 'int')]) @@ -295,14 +273,12 @@ def set_wrong_key(): self.assertRaises(TypeError, set_wrong_key) def test_paranoid_setitemdouble_persistent(self): - 
config.session.execute("DROP TABLE IF EXISTS my_app.tab_a3") + pd = StorageDict("tab_a3", [('position', 'int')], [('value', 'double')]) pd[0] = 2.0 - result = config.session.execute('SELECT value FROM my_app.tab_a3 WHERE position = 0') - for row in result: - self.assertEquals(row.value, 2.0) + def set_wrong_val_test(): pd[0] = 1 @@ -310,7 +286,7 @@ def set_wrong_val_test(): set_wrong_val_test() def test_paranoid_setitemdouble_multiple_persistent(self): - config.session.execute("DROP TABLE IF EXISTS my_app.tab_a4") + pd = StorageDict("tab_a4", [('position1', 'int'), ('position2', 'text')], [('value1', 'text'), ('value2', 'double')]) @@ -319,26 +295,23 @@ def test_paranoid_setitemdouble_multiple_persistent(self): self.assertEquals(pd[0, 'pos1'], ('bla', 1.0)) def test_empty_persistent(self): - config.session.execute("DROP TABLE IF EXISTS my_app.wordsso_words") - config.session.execute("DROP TABLE IF EXISTS my_app.wordsso") + so = Words() so.make_persistent("wordsso") so.ciao = "an attribute" so.another = 123 - config.batch_size = 1 - config.cache_activated = False + for i in range(10): so.words[i] = str.join(',', map(lambda a: "ciao", range(i))) del so - count, = config.session.execute('SELECT count(*) FROM my_app.wordsso_words')[0] - self.assertEqual(10, count) + so = Words("wordsso") so.delete_persistent() so.words.delete_persistent() - count, = config.session.execute('SELECT count(*) FROM my_app.wordsso_words')[0] + self.assertEqual(0, count) def test_simple_items_test(self): @@ -832,11 +805,6 @@ def test_iterator_sync(self): config.session.execute("DROP TABLE IF EXISTS my_app.test_iterator_sync") def test_assign_and_replace(self): - config.session.execute("DROP TABLE IF EXISTS my_app.first_name") - config.session.execute("DROP TABLE IF EXISTS my_app.first_name_mona") - config.session.execute("DROP TABLE IF EXISTS my_app.first_name_mona_0") - config.session.execute("DROP TABLE IF EXISTS my_app.first_name_mona_1") - config.session.execute("DROP TABLE IF EXISTS my_app.second_name") first_storagedict = MyStorageDictA() my_storageobj = MyStorageObjC("first_name") @@ -875,12 +843,6 @@ def test_assign_and_replace(self): self.assertEqual(len(last_items), 1) self.assertEqual(my_storagedict[last_key], last_value) - config.session.execute("DROP TABLE IF EXISTS my_app.first_name") - config.session.execute("DROP TABLE IF EXISTS my_app.first_name_mona") - config.session.execute("DROP TABLE IF EXISTS my_app.first_name_mona_0") - config.session.execute("DROP TABLE IF EXISTS my_app.first_name_mona_1") - config.session.execute("DROP TABLE IF EXISTS my_app.second_name") - def test_make_persistent_with_persistent_obj(self): o2 = myobj2("obj") o2.attr1 = 1 diff --git a/storageAPI/storage/__init__.py b/pycompss_api/__init__.py similarity index 100% rename from storageAPI/storage/__init__.py rename to pycompss_api/__init__.py diff --git a/storageAPI/storage/api.py b/pycompss_api/api.py similarity index 94% rename from storageAPI/storage/api.py rename to pycompss_api/api.py index 4b3cf603..74d7b332 100755 --- a/storageAPI/storage/api.py +++ b/pycompss_api/api.py @@ -1,5 +1,6 @@ import uuid + def init(config_file_path=None): """ Function that can be useful when running the application with COMPSs >= 2.0 @@ -92,10 +93,10 @@ def getByID(objid): (Block| Storageobj) """ from hecuba import log - from hecuba.IStorage import IStorage - + from hecuba.IStorage import build_remotely + from hecuba import config + objid = str(objid) try: - from hecuba import config query = "SELECT * FROM hecuba.istorage WHERE storage_id = 
%s" results = config.session.execute(query, [uuid.UUID(objid)])[0] except Exception as e: @@ -103,4 +104,4 @@ def getByID(objid): raise e log.debug("IStorage API:getByID(%s) of class %s", objid, results.class_name) - return IStorage.build_remotely(results._asdict()) + return build_remotely(results._asdict()) diff --git a/storageAPI/storageItf/.gitignore b/pycompss_api/storageItf/.gitignore similarity index 100% rename from storageAPI/storageItf/.gitignore rename to pycompss_api/storageItf/.gitignore diff --git a/storageAPI/storageItf/pom.xml b/pycompss_api/storageItf/pom.xml similarity index 100% rename from storageAPI/storageItf/pom.xml rename to pycompss_api/storageItf/pom.xml diff --git a/storageAPI/storageItf/src/main/java/storage/StorageException.java b/pycompss_api/storageItf/src/main/java/storage/StorageException.java similarity index 100% rename from storageAPI/storageItf/src/main/java/storage/StorageException.java rename to pycompss_api/storageItf/src/main/java/storage/StorageException.java diff --git a/storageAPI/storageItf/src/main/java/storage/StorageItf.java b/pycompss_api/storageItf/src/main/java/storage/StorageItf.java similarity index 100% rename from storageAPI/storageItf/src/main/java/storage/StorageItf.java rename to pycompss_api/storageItf/src/main/java/storage/StorageItf.java diff --git a/storageAPI/storageItf/src/main/java/storage/StubItf.java b/pycompss_api/storageItf/src/main/java/storage/StubItf.java similarity index 100% rename from storageAPI/storageItf/src/main/java/storage/StubItf.java rename to pycompss_api/storageItf/src/main/java/storage/StubItf.java diff --git a/setup.py b/setup.py index 3864f95e..eb4d81a6 100644 --- a/setup.py +++ b/setup.py @@ -63,8 +63,8 @@ def setup_packages(): # compute which libraries were built metadata = dict(name="Hecuba", version="0.1.3", - package_dir={'hecuba': 'hecuba_py/hecuba', 'storage': 'storageAPI/storage'}, - packages=['hecuba', 'storage'], # find_packages(), + package_dir={'hecuba': 'hecuba_py/hecuba', 'storage': 'storage', 'pycompss_api': 'storage'}, + packages=['hecuba', 'storage'] + find_packages(), # find_packages(), install_requires=['cassandra-driver>=3.7.1', 'numpy>=1.16'], zip_safe=False, data_files=[('', extra_files)], diff --git a/storage/__init__.py b/storage/__init__.py new file mode 100644 index 00000000..3707311b --- /dev/null +++ b/storage/__init__.py @@ -0,0 +1,9 @@ +from .storage_iface import StorageIface + + +def select_storage_api(): + from .cql_iface.cql_iface import CQLIface + return CQLIface() + + +StorageAPI = select_storage_api() diff --git a/storage/cql_iface/__init__.py b/storage/cql_iface/__init__.py new file mode 100644 index 00000000..ae2bd0e0 --- /dev/null +++ b/storage/cql_iface/__init__.py @@ -0,0 +1,220 @@ +from cassandra.cluster import Cluster +from cassandra.policies import RetryPolicy, RoundRobinPolicy, TokenAwarePolicy +from hecuba import log +import os + + +class _NRetry(RetryPolicy): + def __init__(self, time_to_retry=5): + self.time_to_retry = time_to_retry + + def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num): + if retry_num > self.time_to_retry: + return self.RETHROW, None + else: + return self.RETHROW, None + + def on_write_timeout(self, query, consistency, write_type, required_responses, received_responses, retry_num): + if retry_num > self.time_to_retry: + return self.RETHROW, None + else: + return self.RETHROW, None + + def on_read_timeout(self, query, consistency, required_responses, received_responses, data_retrieved, retry_num): + 
if retry_num > self.time_to_retry: + return self.RETHROW, None + else: + return self.RETHROW, None + + +class Config(object): + class __Config: + def __init__(self): + self.configured = False + + instance = __Config() + + @staticmethod + def execute(statement, args): + if not Config.instance.configured: + raise RuntimeError("Not configured to contact cassandra on CQL_Comm storage") + + return Config.instance.session.execute(statement, args) + + def __getattr__(self, item): + return getattr(Config.instance, item) + + def __init__(self): + singleton = Config.instance + if singleton.configured: + log.info('setting down') + return + + singleton.configured = True + + if 'CREATE_SCHEMA' in os.environ: + singleton.id_create_schema = int(os.environ['CREATE_SCHEMA']) + else: + singleton.id_create_schema = -1 + + try: + singleton.nodePort = int(os.environ['NODE_PORT']) + log.info('NODE_PORT: %d', singleton.nodePort) + except KeyError: + log.warn('using default NODE_PORT 9042') + singleton.nodePort = 9042 + + try: + singleton.contact_names = os.environ['CONTACT_NAMES'].split(",") + log.info('CONTACT_NAMES: %s', str.join(" ", singleton.contact_names)) + except KeyError: + log.warn('using default contact point localhost') + singleton.contact_names = ['127.0.0.1'] + + if hasattr(singleton, 'session'): + log.warn('Shutting down pre-existent sessions and cluster') + try: + singleton.session.shutdown() + singleton.cluster.shutdown() + except Exception: + log.warn('error shutting down') + try: + singleton.replication_factor = int(os.environ['REPLICA_FACTOR']) + log.info('REPLICA_FACTOR: %d', singleton.replication_factor) + except KeyError: + singleton.replication_factor = 1 + log.warn('using default REPLICA_FACTOR: %d', singleton.replication_factor) + + try: + user_defined_execution_name = os.environ['EXECUTION_NAME'] + if user_defined_execution_name == 'hecuba': + raise RuntimeError('Error: the application keyspace cannot be \'hecuba\'. 
' + 'This keyspace is reserved for storing metadata.')
+            singleton.execution_name = user_defined_execution_name
+            log.info('EXECUTION_NAME: %s', singleton.execution_name)
+        except KeyError:
+            singleton.execution_name = 'my_app'
+            log.warn('using default EXECUTION_NAME: %s', singleton.execution_name)
+        try:
+            singleton.splits_per_node = int(os.environ['SPLITS_PER_NODE'])
+            log.info('SPLITS_PER_NODE: %d', singleton.splits_per_node)
+        except KeyError:
+            singleton.splits_per_node = 32
+            log.warn('using default SPLITS_PER_NODE: %d', singleton.splits_per_node)
+
+        try:
+            singleton.token_range_size = int(os.environ['TOKEN_RANGE_SIZE'])
+            log.info('TOKEN_RANGE_SIZE: %d', singleton.token_range_size)
+            singleton.target_token_range_size = None
+        except KeyError:
+            singleton.token_range_size = None
+
+            try:
+                singleton.target_token_range_size = int(os.environ['TARGET_TOKEN_RANGE_SIZE'])
+                log.info('TARGET_TOKEN_RANGE_SIZE: %d', singleton.target_token_range_size)
+            except KeyError:
+                singleton.target_token_range_size = 64 * 1024
+                log.warn('using default TARGET_TOKEN_RANGE_SIZE: %d', singleton.target_token_range_size)
+
+        try:
+            singleton.max_cache_size = int(os.environ['MAX_CACHE_SIZE'])
+            log.info('MAX_CACHE_SIZE: %d', singleton.max_cache_size)
+        except KeyError:
+            singleton.max_cache_size = 1000
+            log.warn('using default MAX_CACHE_SIZE: %d', singleton.max_cache_size)
+
+        try:
+            singleton.replication_strategy = os.environ['REPLICATION_STRATEGY']
+            log.info('REPLICATION_STRATEGY: %s', singleton.replication_strategy)
+        except KeyError:
+            singleton.replication_strategy = "SimpleStrategy"
+            log.warn('using default REPLICATION_STRATEGY: %s', singleton.replication_strategy)
+
+        try:
+            singleton.replication_strategy_options = os.environ['REPLICATION_STRATEGY_OPTIONS']
+            log.info('REPLICATION_STRATEGY_OPTIONS: %s', singleton.replication_strategy_options)
+        except KeyError:
+            singleton.replication_strategy_options = ""
+            log.warn('using default REPLICATION_STRATEGY_OPTIONS: %s', singleton.replication_strategy_options)
+
+        if singleton.replication_strategy == "SimpleStrategy":
+            singleton.replication = "{'class' : 'SimpleStrategy', 'replication_factor': %d}" % \
+                                    singleton.replication_factor
+        else:
+            singleton.replication = "{'class' : '%s', %s}" % (
+                singleton.replication_strategy, singleton.replication_strategy_options)
+        try:
+            singleton.hecuba_print_limit = int(os.environ['HECUBA_PRINT_LIMIT'])
+            log.info('HECUBA_PRINT_LIMIT: %s', singleton.hecuba_print_limit)
+        except KeyError:
+            singleton.hecuba_print_limit = 1000
+            log.warn('using default HECUBA_PRINT_LIMIT: %s', singleton.hecuba_print_limit)
+
+        try:
+            singleton.prefetch_size = int(os.environ['PREFETCH_SIZE'])
+            log.info('PREFETCH_SIZE: %s', singleton.prefetch_size)
+        except KeyError:
+            singleton.prefetch_size = 10000
+            log.warn('using default PREFETCH_SIZE: %s', singleton.prefetch_size)
+
+        try:
+            singleton.write_buffer_size = int(os.environ['WRITE_BUFFER_SIZE'])
+            log.info('WRITE_BUFFER_SIZE: %s', singleton.write_buffer_size)
+        except KeyError:
+            singleton.write_buffer_size = 1000
+            log.warn('using default WRITE_BUFFER_SIZE: %s', singleton.write_buffer_size)
+
+        try:
+            singleton.write_callbacks_number = int(os.environ['WRITE_CALLBACKS_NUMBER'])
+            log.info('WRITE_CALLBACKS_NUMBER: %s', singleton.write_callbacks_number)
+        except KeyError:
+            singleton.write_callbacks_number = 16
+            log.warn('using default WRITE_CALLBACKS_NUMBER: %s', singleton.write_callbacks_number)
+
+        try:
+            env_var = os.environ['TIMESTAMPED_WRITES'].lower()
+            singleton.timestamped_writes = False if env_var == 'no' or env_var == 'false' else True
+            log.info('TIMESTAMPED WRITES ENABLED? {}'.format(singleton.timestamped_writes))
+        except KeyError:
+            singleton.timestamped_writes = True
+            log.warn('using default TIMESTAMPED_WRITES: %s', singleton.timestamped_writes)
+
+        if singleton.max_cache_size < singleton.write_buffer_size:
+            import warnings
+            message = "Defining a MAX_CACHE_SIZE smaller than WRITE_BUFFER_SIZE can result " \
+                      "in reading outdated results from the persistent storage"
+            warnings.warn(message)
+
+        log.info('Initializing global session')
+
+        singleton.cluster = Cluster(contact_points=singleton.contact_names,
+                                    load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
+                                    port=singleton.nodePort,
+                                    default_retry_policy=_NRetry(5))
+        singleton.session = singleton.cluster.connect()
+        singleton.session.encoder.mapping[tuple] = singleton.session.encoder.cql_encode_tuple
+        if singleton.id_create_schema == -1:
+            queries = [
+                "CREATE KEYSPACE IF NOT EXISTS hecuba WITH replication = %s" % singleton.replication,
+                """CREATE TYPE IF NOT EXISTS hecuba.q_meta(
+                    mem_filter text,
+                    from_point frozen<list<float>>,
+                    to_point frozen<list<float>>,
+                    precision float);
+                """,
+                'CREATE TYPE IF NOT EXISTS hecuba.np_meta(dims frozen<list<int>>,type int,block_id int);',
+                """CREATE TABLE IF NOT EXISTS hecuba.istorage (storage_id uuid,
+                data_model text, name text, PRIMARY KEY(storage_id));
+                """]
+            for query in queries:
+                try:
+                    singleton.session.execute(query)
+                except Exception as e:
+                    log.error("Error executing query %s" % query)
+                    raise e
+
+        from hfetch import connectCassandra
+        # connecting c++ bindings
+        connectCassandra(singleton.contact_names, singleton.nodePort)
+
+config = Config()
\ No newline at end of file
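Since the singleton reads its whole configuration from environment variables on first import, a session can be tuned without touching code. A short usage sketch follows; the addresses are placeholders, and a reachable Cassandra is required because importing the module opens the session:

```python
import os

# configure before the first import of the module
os.environ['CONTACT_NAMES'] = '10.0.0.1,10.0.0.2'  # placeholder contact points
os.environ['NODE_PORT'] = '9042'
os.environ['EXECUTION_NAME'] = 'experiments'       # 'hecuba' itself is rejected

from storage.cql_iface import config               # first import connects the cluster
print(config.contact_names, config.nodePort, config.execution_name)
```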
diff --git a/storage/cql_iface/cql_comm.py b/storage/cql_iface/cql_comm.py
new file mode 100644
index 00000000..91e83704
--- /dev/null
+++ b/storage/cql_iface/cql_comm.py
@@ -0,0 +1,259 @@
+from hecuba import log
+from hfetch import Hcache, HNumpyStore
+import hecuba
+from . import config
+import uuid
+import numpy
+
+"""
+    Cassandra related methods
+"""
+
+_size_estimates = config.session.prepare(("SELECT mean_partition_size, partitions_count "
+                                          "FROM system.size_estimates WHERE keyspace_name=?
and table_name=?")) +_max_token = int(((2 ** 63) - 1)) # type: int +_min_token = int(-2 ** 63) # type: int + +_select_istorage_meta = config.session.prepare("SELECT * FROM hecuba.istorage WHERE storage_id = ?") + +# User class to Cassandra data type +_hecuba2cassandra_typemap = {str: 'text', + bool: 'boolean', + float: 'float', + int: 'int', + tuple: 'tuple', + list: 'list', + set: 'set', + dict: 'map', + bytearray: 'blob', + bytes: 'blob', + 'double': 'double', + numpy.ndarray: 'hecuba.hnumpy.StorageNumpy', + uuid.UUID: 'uuid'} + + +def extract_ksp_table(name): + """ + Method used to obtain keyspace and table from a given name + Args: + name: a string containing keyspace name and table name, or only table name + Returns: + a tuple containing keyspace name and table name + """ + + try: + ksp = name[:name.index('.')] + table = name[len(ksp) + 1:] + except ValueError as ex: + ksp = config.execution_name + table = name + return ksp.lower(), table.lower() + + +def tokens_partitions(ksp, table, tokens_ranges): + """ + Method that calculates the new token partitions for a given object + Args: + tokens: current number of tokens of the object + min_tokens_per_worker: defined minimum number of tokens + number_of_workers: defined + Returns: + a partition every time it's called + :type tokens_ranges: list[(long,long)] + """ + from collections import defaultdict + from bisect import bisect_right + from cassandra.metadata import Murmur3Token + + splits_per_node = config.splits_per_node + token_range_size = config.token_range_size + target_token_range_size = config.target_token_range_size + + tm = config.cluster.metadata.token_map + tmap = tm.tokens_to_hosts_by_ks.get(ksp, None) + + tokens_murmur3 = map(lambda a: (Murmur3Token(a[0]), a[1]), tokens_ranges) + if not tmap: + tm.rebuild_keyspace(ksp, build_if_absent=True) + tmap = tm.tokens_to_hosts_by_ks[ksp] + + tokens_per_node = defaultdict(list) + for tmumur, t_to in tokens_murmur3: + point = bisect_right(tm.ring, tmumur) + if point == len(tm.ring): + tokens_per_node[tmap[tm.ring[0]][0]].append((tmumur.value, t_to)) + else: + tokens_per_node[tmap[tm.ring[point]][0]].append((tmumur.value, t_to)) + + n_nodes = len(tokens_per_node) + step_size = _max_token // (splits_per_node * n_nodes) + if token_range_size: + step_size = token_range_size + elif target_token_range_size: + one = config.session.execute(_size_estimates, [ksp, table]).one() + if one: + (mean_p_size, p_count) = one + estimated_size = mean_p_size * p_count + if estimated_size > 0: + step_size = _max_token // ( + max(estimated_size / target_token_range_size, + splits_per_node * n_nodes) + ) + + for tokens_in_node in tokens_per_node.values(): + partition = [] + for fraction, to in tokens_in_node: + while fraction < to - step_size: + partition.append((fraction, fraction + step_size)) + fraction += step_size + partition.append((fraction, to)) + group_size = max(len(partition) // splits_per_node, 1) + for i in range(0, len(partition), group_size): + yield partition[i:i + group_size] + + +def discrete_token_ranges(tokens): + """ + Makes proper tokens ranges ensuring that in a tuple (a,b) a <= b + Args: + tokens: a list of tokens [1, 0, 10] + Returns: + a rationalized list [(-1, 0),(0,10),(10, max)] + """ + tokens.sort() + if len(tokens) == 0: + return tokens + if tokens[0] > _min_token: + token_ranges = [(_min_token, tokens[0])] + else: + token_ranges = [] + n_tns = len(tokens) + for i in range(0, n_tns - 1): + token_ranges.append((tokens[i], tokens[i + 1])) + token_ranges.append((tokens[n_tns - 1], 
_max_token)) + return token_ranges + + +def count_name_collision(ksp, table, attribute): + import re + m = re.compile("^%s_%s(_[0-9]+)?$" % (table, attribute)) + q = config.session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s", + [ksp]) + return sum(1 for elem in q if m.match(elem[0])) + + +def get_istorage_attrs(storage_id): + return list(config.session.execute(_select_istorage_meta, [storage_id])) + + +class CqlCOMM(object): + istorage_prepared_st = config.session.prepare('INSERT INTO hecuba.istorage' + '(storage_id, name, data_model)' + 'VALUES (?,?,?)') + + istorage_remove_entry = config.session.prepare('DELETE FROM hecuba.istorage WHERE storage_id = ?') + istorage_read_entry = config.session.prepare('SELECT * FROM hecuba.istorage WHERE storage_id = ?') + + @staticmethod + def register_istorage(obj_id, obj_name, data_model): + obj_info = [obj_id, obj_name, str(data_model)] + config.execute(CqlCOMM.istorage_prepared_st, obj_info) + + @staticmethod + def register_data_model(data_model_id, definition): + # extract keys, values and so on + pass + + @staticmethod + def delete_data(object_id): + res = config.execute(CqlCOMM.istorage_read_entry, [object_id]) + if res: + res = res.one() + config.execute(CqlCOMM.istorage_remove_entry, [object_id]) + # TODO Use res to delete the appropriate data, maybe async + + @staticmethod + def create_table(object_id, name, definition): + # StorageObj for now + ksp, table = extract_ksp_table(name) + query_keyspace = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s" % (ksp, config.replication) + config.session.execute(query_keyspace) + + primary_keys = definition['value_id'] + columns = definition['cols'] + + if not primary_keys: + primary_keys = {"storage_id": uuid.UUID} + + pks = str.join(',', primary_keys.keys()) + if definition["type"] is numpy.ndarray: + pks = "(storage_id, cluster_id),block_id" + + all_keys = ",".join("%s %s" % (k, _hecuba2cassandra_typemap[v]) for k, v in primary_keys.items()) + + all_cols = ",".join("%s %s" % (k, _hecuba2cassandra_typemap[v]) for k, v in columns.items()) + + if all_cols: + total_cols = all_keys + ',' + all_cols + else: + total_cols = all_keys + + query_table = "CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));" \ + % (ksp, + table, + total_cols, + pks) + try: + log.debug('MAKE PERSISTENCE: %s', query_table) + config.session.execute(query_table) + except Exception as ex: + log.warn("Error creating the StorageDict table: %s %s", query_table, ex) + raise ex + + @staticmethod + def create_hcache(object_id, name, definition): + ksp, table = extract_ksp_table(name) + + if definition["type"] is numpy.ndarray: + + keys = [k for k in definition["value_id"].keys()] + columns = [k for k in definition["cols"].keys()] + hcache_params = (ksp, table, object_id, [(-2 ** 63, 2 ** 63 - 1)], keys, columns, + {'cache_size': config.max_cache_size, + 'writer_par': config.write_callbacks_number, + 'writer_buffer': config.write_buffer_size, + 'timestamped_writes': config.timestamped_writes}) + return HNumpyStore(*hcache_params) + elif issubclass(definition.get("type", None), hecuba.hdict.StorageDict): + keys = [k for k in definition["value_id"].keys()] + columns = [k for k in definition["cols"].keys()] + + hcache_params = (ksp, table, object_id, [(-2 ** 63, 2 ** 63 - 1)], keys, columns, + {'cache_size': config.max_cache_size, + 'writer_par': config.write_callbacks_number, + 'writer_buffer': config.write_buffer_size, + 'timestamped_writes': config.timestamped_writes}) + return 
Hcache(*hcache_params)
+
+        elif issubclass(definition.get("type", None), hecuba.storageobj.StorageObj):
+
+            class HcacheWrapper(object):
+                def __init__(self, attributes, object_id, ksp, table):
+                    self.internal_caches = {}
+                    self.object_id = object_id
+                    for attr in attributes:
+                        hc = Hcache(ksp, table, object_id, [(-2 ** 63, 2 ** 63 - 1)], ["storage_id"], [attr],
+                                    {'cache_size': config.max_cache_size,
+                                     'writer_par': config.write_callbacks_number,
+                                     'writer_buffer': config.write_buffer_size,
+                                     'timestamped_writes': config.timestamped_writes})
+
+                        self.internal_caches[attr] = hc
+
+                def get_row(self, attr):
+                    return self.internal_caches[attr].get_row([self.object_id])[0]
+
+                def put_row(self, attr, val):
+                    self.internal_caches[attr].put_row([self.object_id], [val])
+
+            return HcacheWrapper(definition["cols"].keys(), object_id, ksp, table)
diff --git a/hecuba_py/hecuba/hfilter.py b/storage/cql_iface/hfilter.py
similarity index 100%
rename from hecuba_py/hecuba/hfilter.py
rename to storage/cql_iface/hfilter.py
diff --git a/storage/cql_iface/cql_iface.py b/storage/cql_iface/cql_iface.py
new file mode 100644
index 00000000..5ad7ca29
--- /dev/null
+++ b/storage/cql_iface/cql_iface.py
@@ -0,0 +1,128 @@
+from ..storage_iface import StorageIface
+from .cql_comm import CqlCOMM
+import uuid
+import numpy
+from hecuba.IStorage import IStorage
+from hecuba.storageiter import StorageIter
+
+"""
+Mockup on how the Cassandra implementation of the interface could work.
+"""
+
+
+class CQLIface(StorageIface):
+    # DataModelID - DataModelDef
+    data_models_cache = {}
+
+    # StorageID - DataModelID
+    object_to_data_model = {}
+    # Object Name - Cache
+    hcache_by_name = {}
+    # Object's class - Cache
+    hcache_by_class = {}
+    # StorageID - Cache
+    hcache_by_id = {}
+
+    def __init__(self):
+        pass
+
+    def add_data_model(self, definition):
+        # datamodel_id
+        dm = list(definition.items())
+        dm.sort()
+        data_model_id = hash(str(dm))
+        self.data_models_cache[data_model_id] = definition
+        CqlCOMM.register_data_model(data_model_id, definition)
+        return data_model_id
+
+    def register_persistent_object(self, datamodel_id, pyobject):
+        if not isinstance(pyobject, IStorage):
+            raise RuntimeError("Class does not inherit IStorage")
+
+        object_id = pyobject.getID()
+        data_model = self.data_models_cache[datamodel_id]
+        # CQLIface.cache[object_id] = (datamodel_id, pyobject)
+        object_name = pyobject.get_name()
+
+        CqlCOMM.register_istorage(object_id, object_name, data_model)
+
+        self.object_to_data_model[object_id] = datamodel_id
+
+        if isinstance(pyobject, StorageIter):
+            return self._replace_iterator(pyobject)
+
+        CqlCOMM.create_table(object_id, object_name, data_model)
+
+        # create one hcache per class and reuse it, but always map the
+        # object's name and id so get_records/put_records can find it
+        obj_class = pyobject.__class__.__name__
+        if obj_class not in self.hcache_by_class:
+            hc = CqlCOMM.create_hcache(object_id, object_name, data_model)
+            self.hcache_by_class[obj_class] = hc
+        else:
+            hc = self.hcache_by_class[obj_class]
+
+        self.hcache_by_name[pyobject.get_name()] = hc
+        self.hcache_by_id[object_id] = hc
+
+    def delete_persistent_object(self, object_id):
+        try:
+            CqlCOMM.delete_data(object_id)
+        except KeyError:
+            return False
+
+        return True
+
+    def add_index(self, datamodel_id):
+        # IndexID
+        raise NotImplementedError("Add index not implemented yet")
+
+    def get_records(self, object_id, key_list):
+        results = []
+        hcache = self.hcache_by_id[object_id]
+        for key in key_list:
+            try:
+                results.append(hcache.get_row(key))
+            except Exception:
+                results.append([])
+
+        return results
+
+    # List < Value >
+
+    def put_records(self, object_id, key_list, value_list):
+
+        if not key_list:
+            return
+
+        for key, value in zip(key_list, value_list):
+            self.hcache_by_id[object_id].put_row(key, value)
+
+    def split(self, object_id):
+        # List < object_id >
+        splits =
diff --git a/storage/cql_iface/cql_iface.py b/storage/cql_iface/cql_iface.py
new file mode 100644
index 00000000..5ad7ca29
--- /dev/null
+++ b/storage/cql_iface/cql_iface.py
@@ -0,0 +1,125 @@
+from ..storage_iface import StorageIface
+from .cql_comm import CqlCOMM
+import uuid
+from hecuba.IStorage import IStorage
+from hecuba.storageiter import StorageIter
+
+"""
+Mockup of how the Cassandra implementation of the interface could work.
+"""
+
+
+class CQLIface(StorageIface):
+    # DataModelID - DataModelDef
+    data_models_cache = {}
+
+    # StorageID - DataModelID
+    object_to_data_model = {}
+    # Object Name - Cache
+    hcache_by_name = {}
+    # Object's class - Cache
+    hcache_by_class = {}
+    # StorageID - Cache
+    hcache_by_id = {}
+
+    def __init__(self):
+        pass
+
+    def add_data_model(self, definition):
+        # Canonicalize the definition (sorted key/value pairs) so that
+        # dictionary ordering does not change the resulting id.
+        dm = list(definition.items())
+        dm.sort()
+        data_model_id = hash(str(dm))
+        self.data_models_cache[data_model_id] = definition
+        CqlCOMM.register_data_model(data_model_id, definition)
+        return data_model_id
+
+    def register_persistent_object(self, datamodel_id, pyobject):
+        if not isinstance(pyobject, IStorage):
+            raise RuntimeError("Class does not inherit IStorage")
+
+        object_id = pyobject.getID()
+        data_model = self.data_models_cache[datamodel_id]
+        object_name = pyobject.get_name()
+
+        CqlCOMM.register_istorage(object_id, object_name, data_model)
+
+        self.object_to_data_model[object_id] = datamodel_id
+
+        if isinstance(pyobject, StorageIter):
+            self._replace_iterator(pyobject)
+            return object_id
+
+        CqlCOMM.create_table(object_id, object_name, data_model)
+
+        # Create the hcache once per class; later instances of the same class
+        # reuse it, but are still registered by name and by id.
+        obj_class = pyobject.__class__.__name__
+        if obj_class not in self.hcache_by_class:
+            self.hcache_by_class[obj_class] = CqlCOMM.create_hcache(object_id, object_name, data_model)
+
+        hc = self.hcache_by_class[obj_class]
+        self.hcache_by_name[object_name] = hc
+        self.hcache_by_id[object_id] = hc
+        return object_id
+
+    def delete_persistent_object(self, object_id):
+        try:
+            CqlCOMM.delete_data(object_id)
+        except KeyError:
+            return False
+
+        return True
+
+    def add_index(self, datamodel_id):
+        # IndexID
+        raise NotImplementedError("add_index is not implemented yet")
+
+    def get_records(self, object_id, key_list):
+        # Returns List < Value >, one entry per key (an empty list on a miss)
+        results = []
+        hcache = self.hcache_by_id[object_id]
+        for key in key_list:
+            try:
+                results.append(hcache.get_row(key))
+            except Exception:
+                results.append([])
+
+        return results
+
+    def put_records(self, object_id, key_list, value_list):
+        if not key_list:
+            return
+
+        hcache = self.hcache_by_id[object_id]
+        for key, value in zip(key_list, value_list):
+            hcache.put_row(key, value)
+
+    def split(self, object_id):
+        # List < object_id >: every partition shares the data model and the
+        # cache of the original object.
+        splits = []
+        for i in range(32):
+            tmp_uid = uuid.uuid4()
+            splits.append(tmp_uid)
+            self.object_to_data_model[tmp_uid] = self.object_to_data_model[object_id]
+            self.hcache_by_id[tmp_uid] = self.hcache_by_id[object_id]
+        return splits
+
+    def get_data_locality(self, object_id):
+        # List < Node >
+        return ['127.0.0.1']
+
+    def _replace_iterator(self, iter_obj):
+        hc = self.hcache_by_name[iter_obj.get_name()]
+        self.hcache_by_id[iter_obj.getID()] = hc
+
+        class HcacheIterWrap:
+            def __init__(self, myiter):
+                self.myiter = myiter
+
+            def __iter__(self):
+                return self
+
+            def __next__(self):
+                return self.myiter.get_next()
+
+        iter_obj.myiter = HcacheIterWrap(hc.iterkeys(100))
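One caveat on add_data_model: since Python 3.3, hash() of a str is salted per interpreter process, so the same definition can yield a different data_model_id on every run, even though the id is persisted through CqlCOMM.register_data_model. A process-stable digest would avoid this; a minimal sketch (hashlib-based, a suggestion rather than part of the patch):

    import hashlib

    def stable_data_model_id(definition):
        # Same canonicalization as add_data_model: sort the key/value pairs
        # so that dictionary ordering does not change the id.
        dm = sorted(definition.items())
        digest = hashlib.sha1(str(dm).encode('utf-8')).hexdigest()
        return int(digest[:16], 16)  # stable across processes and runs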
diff --git a/hecuba_py/hecuba/hfilter.py b/storage/cql_iface/hfilter.py
similarity index 100%
rename from hecuba_py/hecuba/hfilter.py
rename to storage/cql_iface/hfilter.py
diff --git a/storage/cql_iface/tests/api_tests.py b/storage/cql_iface/tests/api_tests.py
new file mode 100644
index 00000000..40412cee
--- /dev/null
+++ b/storage/cql_iface/tests/api_tests.py
@@ -0,0 +1,141 @@
+import unittest
+
+from storage.cql_iface.cql_iface import CQLIface
+from storage.cql_iface.cql_comm import config
+
+from hecuba import StorageDict
+from hecuba.IStorage import IStorage
+from hecuba.tools import storage_id_from_name
+
+
+class ConcurrentDict(StorageDict):
+    '''
+    @TypeSpec dict<<key:int>, value0:int, value1:float>
+    '''
+
+
+class TestClass(IStorage):
+
+    def __new__(cls, name='', *args, **kwargs):
+        toret = super(TestClass, cls).__new__(cls)
+        storage_id = kwargs.get('storage_id', None)
+        if storage_id is None and name:
+            storage_id = storage_id_from_name(name)
+
+        if name or storage_id:
+            toret.setID(storage_id)
+            toret.set_name(name)
+            toret._is_persistent = True
+        return toret
+
+    def __init__(self, *args, **kwargs):
+        super(TestClass, self).__init__()
+
+
+class HfetchTests(unittest.TestCase):
+    def test_instantiate(self):
+        result = CQLIface()
+        self.assertIsNotNone(result)
+
+    def test_register_dict_of_ints(self):
+        given_name = 'storage_test.custom_obj'
+        config.session.execute("DROP TABLE IF EXISTS {}".format(given_name))
+        # Setup object
+        obj = TestClass(given_name)
+        myid = obj.getID()
+        name = obj.get_name()
+        data_model = {"type": TestClass, "keys": {"k": str}, "cols": {"a": int}}
+
+        # Setup persistent storage
+        storage = CQLIface()
+        data_model_id = storage.add_data_model(data_model)
+        storage.register_persistent_object(data_model_id, obj)
+
+        res = config.session.execute("SELECT * FROM hecuba.istorage WHERE storage_id={}".format(myid)).one()
+        self.assertEqual(res.name, name)
+        config.session.execute("DROP TABLE IF EXISTS {}".format(given_name))
+
+    def test_add_data_model(self):
+        data_model_a = {"type": "typeA", "keys": {"k": str}, "cols": {"a": int}}
+        data_model_copy = {"type": "typeA", "cols": {"a": int}, "keys": {"k": str}}
+        data_model_b = {"type": "typeB", "keys": {"k": str}, "cols": {"a": int}}
+
+        # Setup persistent storage
+        storage = CQLIface()
+
+        # Register data models
+        id_a = storage.add_data_model(data_model_a)
+        id_b = storage.add_data_model(data_model_b)
+        id_a_cpy = storage.add_data_model(data_model_copy)
+
+        # Equal definitions must map to the same id, different ones must not
+        self.assertEqual(data_model_a, data_model_copy)
+        self.assertEqual(id_a, id_a_cpy)
+        self.assertNotEqual(id_a, id_b)
+
+    def test_del_persistent_object(self):
+        given_name = 'storage_test.dict'
+        config.session.execute("DROP TABLE IF EXISTS {}".format(given_name))
+
+        # Setup object
+        obj = ConcurrentDict(given_name)
+        myid = obj.getID()
+        name = obj.get_name()
+        data_model_id = obj._data_model_id
+
+        self.assertEqual(given_name, name)
+
+        # Setup persistent storage
+        storage = CQLIface()
+
+        storage.register_persistent_object(data_model_id, obj)
+
+        res = config.session.execute("SELECT * FROM hecuba.istorage WHERE storage_id={}".format(myid)).one()
+        self.assertEqual(res.name, name)
+
+        for i in range(10):
+            obj[i] = [i, float(i) / 10.0]
+
+        del obj
+        storage.delete_persistent_object(myid)
+
+        res = config.session.execute("SELECT * FROM hecuba.istorage WHERE storage_id={}".format(myid)).one()
+        self.assertIsNone(res)
+
+        res = config.session.execute("SELECT count(*) FROM {}".format(name)).one()
+        # TODO: assert the data table is empty (or gone) once the delete semantics are settled
+
+        config.session.execute("DROP TABLE IF EXISTS {}".format(given_name))
+
+    def test_put_records(self):
+        given_name = 'storage_test.complex_obj'
+        config.session.execute("DROP TABLE IF EXISTS {}".format(given_name))
+
+        obj = TestClass(given_name)
+        myid = obj.getID()
+        name = obj.get_name()
+        data_model = {"type": "typeA", "keys": {"k": int}, "cols": {"a": int, "b": str, "c": float}}
+        storage = CQLIface()
+        data_model_id = storage.add_data_model(data_model)
+        storage.register_persistent_object(data_model_id, obj)
+
+        fields_ids = []
+        values = []
+        ninserts = 100
+        for i in range(ninserts):
+            fields_ids.append([i * 100])
+            values.append([i, "someText{}".format(i), i / 10.0])
+
+        storage.put_records(myid, fields_ids, values)
+
+        returned_values = storage.get_records(myid, fields_ids)
+
+        self.assertEqual(len(values), len(returned_values))
+
+        for val, ret_val in zip(values, returned_values):
+            self.assertEqual(val[0], ret_val[0])
+            self.assertEqual(val[1], ret_val[1])
+            self.assertAlmostEqual(val[2], ret_val[2], places=6)
+
+        config.session.execute("DROP TABLE IF EXISTS {}".format(given_name))
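The DROP TABLE housekeeping is repeated at the start and end of every test above; a setUp/tearDown pair could centralize it. A possible refactor (a sketch, reusing the same config.session and the table names from the tests):

    class HfetchTests(unittest.TestCase):
        TABLES = ['storage_test.custom_obj', 'storage_test.dict', 'storage_test.complex_obj']

        def setUp(self):
            self.storage = CQLIface()
            for table in self.TABLES:
                config.session.execute("DROP TABLE IF EXISTS {}".format(table))

        tearDown = setUp  # run the same cleanup after each test as well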
diff --git a/storage/storage_iface.py b/storage/storage_iface.py
new file mode 100644
index 00000000..302227ce
--- /dev/null
+++ b/storage/storage_iface.py
@@ -0,0 +1,88 @@
+from abc import ABCMeta, abstractmethod
+from hecuba.IStorage import IStorage
+from uuid import UUID
+from typing import List, Generator
+
+
+class StorageIface(metaclass=ABCMeta):
+    @abstractmethod
+    def add_data_model(self, definition: dict) -> int:
+        """
+        Registers a data model describing the format of the data that will be passed to and retrieved from storage.
+        :param definition: Describes a data model that will be used to fetch and store data
+        :return: data_model_id: Unique identifier to refer to the data model
+        """
+        pass
+
+    @abstractmethod
+    def register_persistent_object(self, datamodel_id: int, pyobject: IStorage) -> UUID:
+        """
+        Informs the storage that the Hecuba object `pyobject` will store and access data
+        using the data model identified by `datamodel_id`. Returns a unique identifier for the object.
+        If the object has been registered before, nothing happens and its ID is returned.
+        :param datamodel_id: Identifier of a previously registered data model
+        :param pyobject: Hecuba persistent object to register with the persistent storage.
+        :return: object_id: UUID used to reference and identify the pyobject in the future.
+        """
+        pass
+
+    @abstractmethod
+    def delete_persistent_object(self, object_id: UUID) -> bool:
+        """
+        Deletes the data of the Hecuba object identified by `object_id` from the storage layer.
+        :param object_id: Hecuba object identifier
+        :return: True on success, False otherwise
+        """
+        pass
+
+    @abstractmethod
+    def add_index(self, datamodel_id: int):
+        """
+        Creates an index over the data model identified by `datamodel_id`.
+        :param datamodel_id: Identifier of a previously registered data model
+        :return: IndexID
+        :raises NotImplementedError: indexing is not implemented yet
+        """
+        pass
+
+    @abstractmethod
+    def get_records(self, object_id: UUID, key_list: List[object]) -> List[object]:
+        """
+        Returns a list with the records corresponding to `key_list` for the Hecuba object referenced by `object_id`.
+        :param object_id: Hecuba object identifier
+        :param key_list: List with the keys of the records to be retrieved.
+        :return: List of the records corresponding to the keys contained in key_list
+        """
+        pass
+
+    @abstractmethod
+    def put_records(self, object_id: UUID, key_list: List[object], value_list: List[object]):
+        """
+        Stores the records contained in `value_list`, which correspond to the keys in `key_list`,
+        for the Hecuba object referenced by `object_id`.
+        :param object_id: Hecuba object identifier
+        :param key_list: List with the keys of the records to be stored.
+        :param value_list: List with the records to be stored.
+        :return: None
+        """
+        pass
+
+    @abstractmethod
+    def split(self, object_id: UUID) -> Generator[UUID, None, None]:
+        """
+        Partitions the data of the Hecuba object referenced by `object_id` following the same data model.
+        Each partition is assigned a UUID.
+        :param object_id: Hecuba object identifier
+        :return: Yields an `object_id` referencing a subset of the data.
+        """
+        pass
+
+    @abstractmethod
+    def get_data_locality(self, object_id: UUID) -> List[str]:
+        """
+        Returns a list with the IPs of the nodes holding a copy of the given `object_id`.
+        :param object_id: Hecuba object identifier
+        :return: List of IPs
+        """
+        pass
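To make the contract concrete, a minimal in-memory implementation could look as follows (an illustrative sketch only: it keeps everything in process-local dicts, assumes `pyobject` exposes getID() as IStorage does, and its trivial split yields a single alias of the whole object):

    import uuid
    from uuid import UUID
    from typing import Generator

    from storage.storage_iface import StorageIface


    class InMemoryIface(StorageIface):
        """Toy backend: everything lives in process-local dicts."""

        def __init__(self):
            self.models = {}   # data_model_id -> definition
            self.data = {}     # object_id -> {key tuple: value list}

        def add_data_model(self, definition: dict) -> int:
            model_id = hash(str(sorted(definition.items())))
            self.models[model_id] = definition
            return model_id

        def register_persistent_object(self, datamodel_id: int, pyobject) -> UUID:
            object_id = pyobject.getID()
            self.data.setdefault(object_id, {})
            return object_id

        def delete_persistent_object(self, object_id: UUID) -> bool:
            return self.data.pop(object_id, None) is not None

        def add_index(self, datamodel_id: int):
            raise NotImplementedError

        def get_records(self, object_id, key_list):
            table = self.data[object_id]
            return [table.get(tuple(key), []) for key in key_list]

        def put_records(self, object_id, key_list, value_list):
            table = self.data[object_id]
            for key, value in zip(key_list, value_list):
                table[tuple(key)] = value

        def split(self, object_id: UUID) -> Generator[UUID, None, None]:
            # Trivial partitioning: one split aliasing the whole object.
            alias = uuid.uuid4()
            self.data[alias] = self.data[object_id]
            yield alias

        def get_data_locality(self, object_id):
            return ['127.0.0.1']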