Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Storage interface #271

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ addons:
script:
- nosetests --with-coverage -v -s hecuba_py/tests/*.py
- nosetests --with-coverage -v -s hecuba_py/tests/withcassandra
- nosetests --with-coverage -v -s storage/cql_iface/tests/api_tests.py

jobs:
include:
Expand All @@ -64,4 +65,4 @@ jobs:

# Push the results back to codecov
after_success:
- codecov
- codecov
4 changes: 2 additions & 2 deletions hecuba_core/src/py_interface/HCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,8 @@ static int hnumpy_store_init(HNumpyStore *self, PyObject *args, PyObject *kwds)


static PyMethodDef hnumpy_store_type_methods[] = {
{"get_numpy", (PyCFunction) get_numpy, METH_VARARGS, NULL},
{"save_numpy", (PyCFunction) save_numpy, METH_VARARGS, NULL},
{"get_row", (PyCFunction) get_numpy, METH_VARARGS, NULL},
{"put_row", (PyCFunction) save_numpy, METH_VARARGS, NULL},
{NULL, NULL, 0, NULL}
};

Expand Down
286 changes: 55 additions & 231 deletions hecuba_py/hecuba/IStorage.py
Original file line number Diff line number Diff line change
@@ -1,262 +1,86 @@
import re
import uuid
from bisect import bisect_right
from collections import namedtuple, defaultdict
from time import time

from hecuba import config, log
from collections import namedtuple
from .tools import storage_id_from_name


class AlreadyPersistentError(RuntimeError):
pass


_select_istorage_meta = config.session.prepare("SELECT * FROM hecuba.istorage WHERE storage_id = ?")
_size_estimates = config.session.prepare(("SELECT mean_partition_size, partitions_count "
"FROM system.size_estimates WHERE keyspace_name=? and table_name=?"))
_max_token = int(((2 ** 63) - 1)) # type: int
_min_token = int(-2 ** 63) # type: int

_valid_types = ['counter', 'text', 'boolean', 'decimal', 'double', 'int', 'list', 'set', 'map', 'bigint', 'blob',
'tuple', 'dict', 'float', 'numpy.ndarray']

_basic_types = _valid_types[:-1]
_hecuba_valid_types = '(atomicint|str|bool|decimal|float|int|tuple|list|generator|frozenset|set|dict|long|buffer' \
'|counter|double)'

AT = 'int | atomicint | str | bool | decimal | float | long | double | buffer'

ATD = 'int | atomicint | str | bool | decimal | float | long | double | buffer | set'

_python_types = [int, str, bool, float, tuple, set, dict, bytearray]
_conversions = {'atomicint': 'counter',
'str': 'text',
'bool': 'boolean',
'decimal': 'decimal',
'float': 'float',
'int': 'int',
'tuple': 'tuple',
'list': 'list',
'generator': 'list',
'frozenset': 'set',
'set': 'set',
'dict': 'map',
'long': 'bigint',
'buffer': 'blob',
'bytearray': 'blob',
'counter': 'counter',
'double': 'double',
'StorageDict': 'dict',
'ndarray': 'hecuba.hnumpy.StorageNumpy',
'numpy.ndarray': 'hecuba.hnumpy.StorageNumpy'}

args = namedtuple("IStorage", [])


def process_path(module_path):
    """
    Method to obtain module and class_name from a module path
    Args:
        module_path(String): path in the format module.class_name
    Returns:
        tuple containing class_name and module (in that order). When the
        path contains no dot the whole path is returned as class_name and
        module is the empty string.
    """
    # Two aliases are mapped onto their Hecuba implementations directly.
    if module_path == 'numpy.ndarray':
        return 'StorageNumpy', 'hecuba.hnumpy'
    if module_path == 'StorageDict':
        return 'StorageDict', 'hecuba.hdict'

    # Split on the LAST dot. rpartition returns ('', '', path) when there is
    # no dot at all, so a bare class name is kept intact instead of losing
    # its first character as the previous index arithmetic did.
    module, _, class_name = module_path.rpartition('.')
    return class_name, module
class DataModelNode(object):
    """Simple holder for the three values it is constructed with.

    Attributes:
        name: value passed as ``name`` (defaults to None).
        class_name: value passed as ``class_name`` (defaults to None).
        args: value passed as ``args`` (defaults to None).
    """

    def __init__(self, name=None, class_name=None, args=None):
        # Store the constructor arguments unchanged.
        self.name, self.class_name, self.args = name, class_name, args


def _tokens_partitions(ksp, table, tokens_ranges, splits_per_node, token_range_size, target_token_range_size):
"""
Method that calculates the new token partitions for a given object
Args:
tokens: current number of tokens of the object
min_tokens_per_worker: defined minimum number of tokens
number_of_workers: defined
Returns:
a partition every time it's called
:type tokens_ranges: list[(long,long)]
"""
class IStorage(object):
args_names = ["storage_id"]
args = namedtuple("IStorage", args_names)
_build_args = args(storage_id="")

tm = config.cluster.metadata.token_map
tmap = tm.tokens_to_hosts_by_ks.get(ksp, None)
from cassandra.metadata import Murmur3Token
tokens_murmur3 = map(lambda a: (Murmur3Token(a[0]), a[1]), tokens_ranges)
if not tmap:
tm.rebuild_keyspace(ksp, build_if_absent=True)
tmap = tm.tokens_to_hosts_by_ks[ksp]

tokens_per_node = defaultdict(list)
for tmumur, t_to in tokens_murmur3:
point = bisect_right(tm.ring, tmumur)
if point == len(tm.ring):
tokens_per_node[tmap[tm.ring[0]][0]].append((tmumur.value, t_to))
else:
tokens_per_node[tmap[tm.ring[point]][0]].append((tmumur.value, t_to))

n_nodes = len(tokens_per_node)
step_size = _max_token // (splits_per_node * n_nodes)
if token_range_size:
step_size = token_range_size
elif target_token_range_size:
one = config.session.execute(_size_estimates, [ksp, table]).one()
if one:
(mean_p_size, p_count) = one
estimated_size = mean_p_size * p_count
if estimated_size > 0:
step_size = _max_token // (
max(estimated_size / target_token_range_size,
splits_per_node * n_nodes)
)

for tokens_in_node in tokens_per_node.values():
partition = []
for fraction, to in tokens_in_node:
while fraction < to - step_size:
partition.append((fraction, fraction + step_size))
fraction += step_size
partition.append((fraction, to))
group_size = max(len(partition) // splits_per_node, 1)
for i in range(0, len(partition), group_size):
yield partition[i:i + group_size]


def _discrete_token_ranges(tokens):
    """
    Makes proper tokens ranges ensuring that in a tuple (a,b) a <= b
    Args:
        tokens: a list of tokens, e.g. [1, 0, 10]. The list is not modified.
    Returns:
        a list of consecutive (start, end) ranges built from the sorted
        tokens, e.g. [(min, 0), (0, 1), (1, 10), (10, max)] where min and
        max are _min_token and _max_token. The leading (min, first) range is
        omitted when the smallest token is not greater than _min_token.
        An empty input yields an empty list.
    """
    # Work on a sorted copy so the caller's list is left untouched.
    ordered = sorted(tokens)
    if not ordered:
        return []

    token_ranges = []
    if ordered[0] > _min_token:
        token_ranges.append((_min_token, ordered[0]))
    # Pair every token with its successor: (t0, t1), (t1, t2), ...
    token_ranges.extend(zip(ordered, ordered[1:]))
    # The last range always runs up to the maximum token.
    token_ranges.append((ordered[-1], _max_token))
    return token_ranges
_data_model_def = None
_data_model_id = None

def getID(self):
return self.__storage_id

def _extract_ks_tab(name):
"""
Method used to obtain keyspace and table from a given name
Args:
name: a string containing keyspace name and table name, or only table name
Returns:
a tuple containing keyspace name and table name
"""
sp = name.split(".")
if len(sp) == 2:
ksp = sp[0]
table = sp[1]
else:
ksp = config.execution_name
table = name
return ksp.lower(), table.lower()
def setID(self, st_id):
if st_id is not None and not isinstance(st_id, uuid.UUID):
raise TypeError("Storage ID must be an instance of UUID")
self.__storage_id = st_id

storage_id = property(getID, setID)

class IStorage:
args_names = []
_build_args = args()
_storage_id = None
def __new__(cls, *args, **kwargs):
toret = super(IStorage, cls).__new__(cls)
toret._ksp = ''
toret._table = ''
toret._is_persistent = False
toret.__storage_id = None
toret._name = ''
return toret

def split(self):
def __eq__(self, other):
"""
Method used to divide an object into sub-objects.
Method to compare a IStorage object with another one.
Args:
other: IStorage to be compared with.
Returns:
a subobject everytime is called
boolean (true - equals, false - not equals).
"""
st = time()
tokens = self._build_args.tokens

for token_split in _tokens_partitions(self._ksp, self._table, tokens,
config.splits_per_node,
config.token_range_size,
config.target_token_range_size):
storage_id = uuid.uuid4()
log.debug('assigning to %s %d tokens', str(storage_id), len(token_split))
new_args = self._build_args._replace(tokens=token_split, storage_id=storage_id)
args_dict = new_args._asdict()
args_dict["built_remotely"] = False
yield self.__class__.build_remotely(args_dict)
log.debug('completed split of %s in %f', self.__class__.__name__, time() - st)

def _get_istorage_attrs(self, storage_id):
return list(config.session.execute(_select_istorage_meta, [storage_id]))

def _count_name_collision(self, attribute):
m = re.compile("^%s_%s(_[0-9]+)?$" % (self._table, attribute))
q = config.session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s",
[self._ksp])
return sum(1 for elem in q if m.match(elem[0]))

@staticmethod
def build_remotely(args):
    """
    Instantiate the IStorage subclass described by a dictionary.

    The class is looked up under the 'class_name' key, falling back to
    'type'. Only the entries whose keys appear in the class' ``args_names``
    are forwarded to its constructor, together with ``built_remotely``.

    :param args: dictionary describing the object to build
    :raises TypeError: if no type is given, or the resolved class doesn't
        subclass IStorage
    :raises ValueError: if importing the module raises ValueError
    :return: An instance of the resolved class
    """
    # A missing flag means the object is being built remotely.
    built_remotely = args.get("built_remotely", True)

    obj_type = args.get('class_name', args.get('type'))
    if obj_type is None:
        raise TypeError("Trying to build an IStorage obj without giving the type")

    # Resolve and import the class named by obj_type
    cname, module = process_path(obj_type)
    try:
        mod = __import__(module, globals(), locals(), [cname], 0)
    except ValueError:
        raise ValueError("Can't import class {} from module {}".format(cname, module))

    imported_class = getattr(mod, cname)
    if not issubclass(imported_class, IStorage):
        raise TypeError("Trying to build remotely an object '%s' != IStorage subclass" % cname)

    # Keep only the arguments the target class declares.
    ctor_kwargs = {key: value for key, value in args.items()
                   if key in imported_class.args_names}
    ctor_kwargs.pop('class_name', None)
    ctor_kwargs["built_remotely"] = built_remotely

    return imported_class(**ctor_kwargs)
return self.__class__ == other.__class__ and self.getID() == other.getID()

@staticmethod
def _store_meta(storage_args):
raise Exception("to be implemented")
pass

def make_persistent(self, name):
raise Exception("to be implemented")
if not self.storage_id:
self.storage_id = storage_id_from_name(name)
self._is_persistent = True
self._name = name

def stop_persistent(self):
raise Exception("to be implemented")
self.storage_id = None
self._is_persistent = False

def delete_persistent(self):
raise Exception("to be implemented")
self.storage_id = None
self._is_persistent = False

def getID(self):
def split(self):
"""
Obtains the id of the storage element
Method used to divide an object into sub-objects.
Returns:
self._storage_id: id of the object
a subobject everytime is called
"""
return str(self._storage_id)
raise NotImplemented("Split not supported yet")

def set_name(self, name):
    """
    Store the given name on the object.
    Args:
        name: the new name, must be a str
    Raises:
        TypeError: if name is not an instance of str
    """
    if isinstance(name, str):
        self._name = name
        return
    raise TypeError("Name -{}- should be an instance of str".format(str(name)))

def get_name(self):
return self._name
Loading