diff --git a/hecuba_py/hecuba/IStorage.py b/hecuba_py/hecuba/IStorage.py
index ff8e9eb7..44ee756c 100644
--- a/hecuba_py/hecuba/IStorage.py
+++ b/hecuba_py/hecuba/IStorage.py
@@ -1,6 +1,7 @@
 import uuid
 from . import log
 from .tools import extract_ks_tab, build_remotely, storage_id_from_name, get_istorage_attrs, generate_token_ring_ranges
+from .partitioner import Partitioner


 class AlreadyPersistentError(RuntimeError):
@@ -102,14 +103,16 @@ def split(self):
         Returns:
             a subobject everytime is called
         """
-        from .tools import tokens_partitions
+        from . import config
+        p = Partitioner(self, config.partition_strategy)
+
         try:
             tokens = self._build_args.tokens
         except AttributeError as ex:
             raise RuntimeError("Object {} does not have tokens".format(self._get_name()))

-        for token_split in tokens_partitions(self._ksp, self._table, tokens):
-            storage_id = uuid.uuid4()
+        for storage_id, token_split in p.tokens_partitions(self._ksp, self._table, config.token_range_size,
+                                                           config.target_token_range_size):
             log.debug('assigning to {} num tokens {}'.format(str(storage_id), len(token_split)))
             new_args = self._build_args._replace(tokens=token_split, storage_id=storage_id)
             args_dict = new_args._asdict()
diff --git a/hecuba_py/hecuba/__init__.py b/hecuba_py/hecuba/__init__.py
index adad849d..435b7edf 100755
--- a/hecuba_py/hecuba/__init__.py
+++ b/hecuba_py/hecuba/__init__.py
@@ -119,12 +119,19 @@ def __init__(self):
         except KeyError:
             singleton.token_range_size = None

-        try:
-            singleton.target_token_range_size = int(os.environ['TARGET_TOKEN_RANGE_SIZE'])
-            log.info('TARGET_TOKEN_RANGE_SIZE: %d', singleton.target_token_range_size)
-        except KeyError:
-            singleton.target_token_range_size = 64 * 1024
-            log.warn('using default TARGET_TOKEN_RANGE_SIZE: %d', singleton.target_token_range_size)
+        try:
+            singleton.target_token_range_size = int(os.environ['TARGET_TOKEN_RANGE_SIZE'])
+            log.info('TARGET_TOKEN_RANGE_SIZE: %d', singleton.target_token_range_size)
+        except KeyError:
+            singleton.target_token_range_size = 64 * 1024
+            log.warn('using default TARGET_TOKEN_RANGE_SIZE: %d', singleton.target_token_range_size)
+
+        try:
+            singleton.partition_strategy = os.environ['PARTITION_STRATEGY']
+            log.info('PARTITION STRATEGY: %s', singleton.partition_strategy)
+        except KeyError:
+            singleton.partition_strategy = "SIMPLE"
+            log.warn('using default PARTITION STRATEGY: %s', singleton.partition_strategy)

         try:
             singleton.max_cache_size = int(os.environ['MAX_CACHE_SIZE'])
diff --git a/hecuba_py/hecuba/partitioner.py b/hecuba_py/hecuba/partitioner.py
new file mode 100644
index 00000000..c4ad5404
--- /dev/null
+++ b/hecuba_py/hecuba/partitioner.py
@@ -0,0 +1,323 @@
+import os
+import time
+import uuid
+from collections import defaultdict, deque
+
+from hecuba import config, log
+
+_select_istorage_meta = config.session.prepare("SELECT * FROM hecuba.istorage WHERE storage_id = ?")
+_size_estimates = config.session.prepare(("SELECT mean_partition_size, partitions_count "
+                                          "FROM system.size_estimates WHERE keyspace_name=? and table_name=?"))
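+# system.size_estimates exposes Cassandra's per-range mean partition size and partition
+# count; _compute_partitions_per_node queries it to derive a token step size whenever a
+# target token range size is requested instead of an explicit TOKEN_RANGE_SIZE.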
+
+_dynamic_part_table_cql = """CREATE TABLE IF NOT EXISTS hecuba.partitioning(
+    partitioning_uuid uuid,
+    storage_id uuid,
+    number_of_partitions int,
+    start_time double,
+    end_time double,
+    PRIMARY KEY (storage_id))
+    WITH default_time_to_live = 86400"""
+
+_max_token = int(((2 ** 63) - 1))  # type: int
+_min_token = int(-2 ** 63)  # type: int
+
+
+class Partitioner:
+
+    def __init__(self, father, strategy):
+        self._father = father
+        self._strategy = strategy
+        self._rebuild_token_ring(self._father._ksp, self._father._build_args.tokens)
+        if strategy == "DYNAMIC":
+            self._setup_dynamic_structures()
+
+    def _rebuild_token_ring(self, ksp, tokens_ranges):
+        from bisect import bisect_right
+        from cassandra.metadata import Murmur3Token
+
+        tm = config.cluster.metadata.token_map
+        tmap = tm.tokens_to_hosts_by_ks.get(ksp, None)
+        tokens_murmur3 = map(lambda a: (Murmur3Token(a[0]), a[1]), tokens_ranges)
+        if not tmap:
+            tm.rebuild_keyspace(ksp, build_if_absent=True)
+            tmap = tm.tokens_to_hosts_by_ks[ksp]
+
+        self._tokens_per_node = defaultdict(list)
+        for tmumur, t_to in tokens_murmur3:
+            point = bisect_right(tm.ring, tmumur)
+            if point == len(tm.ring):
+                self._tokens_per_node[tmap[tm.ring[0]][0]].append((tmumur.value, t_to))
+            else:
+                self._tokens_per_node[tmap[tm.ring[point]][0]].append((tmumur.value, t_to))
+
+        self._nodes_number = len(self._tokens_per_node)
+
+    def _setup_dynamic_structures(self):
+        try:
+            config.session.execute(_dynamic_part_table_cql)
+        except Exception as ex:
+            print("Could not create table hecuba.partitioning.")
+            raise ex
+
+        self._prepared_store_id = \
+            config.session.prepare("""INSERT INTO hecuba.partitioning
+                                      (partitioning_uuid, storage_id, number_of_partitions)
+                                      VALUES (?, ?, ?)""")
+        self._partitioning_uuid = uuid.uuid4()
+        self._partitions_time = defaultdict(list)
+        self._partitions_nodes = dict()
+        self._idle_cassandra_nodes = deque()
+        self._partitions_size = dict()
+        self._best_granularity = None
+
+        self._select_partitions_times = \
+            config.session.prepare("""SELECT storage_id, number_of_partitions, start_time, end_time
+                                      FROM hecuba.partitioning
+                                      WHERE partitioning_uuid = ? ALLOW FILTERING""")
+
+        try:
+            self._nodes_number = len(os.environ["PYCOMPSS_NODES"].split(",")) - 1
+        except KeyError:
+            try:
+                self._nodes_number = int(os.environ["NODES_NUMBER"]) - 1  # master and worker
+            except KeyError:
+                raise RuntimeError("You must set the environment variable PYCOMPSS_NODES|NODES_NUMBER before using "
+                                   "the dynamic task granularity scheduler."
+                                   "\nPYCOMPSS_NODES should be a list of nodes separated by commas."
+ "\nNODES_NUMBER should be an integer representing the number of hosts.") + + self._n_idle_nodes = self._nodes_number + self._initial_send = self._nodes_number + # generate list of basic number of partitions + partitions = [(int(2 ** (x / 2)) // len(self._tokens_per_node)) for x in range(10, 21)] + # as many basic number of partitions as the number of nodes + # 11 basic number of partitions, repeating them when more than 11 nodes + self._basic_partitions = (partitions * (self._nodes_number // len(partitions) + 1))[0:self._nodes_number] + + def tokens_partitions(self, ksp, table, token_range_size, target_token_range_size): + """ + Method that calculates the new token partitions for a given object + Returns: + a tuple (node, partition) every time it's called + """ + partitions_per_node = self._compute_partitions_per_node(ksp, table, token_range_size, target_token_range_size) + + if self._strategy == "DYNAMIC": + for node, partition_tokens in self._dynamic_tokens_partitions(partitions_per_node): + storage_id = uuid.uuid4() + config.session.execute(self._prepared_store_id, + [self._partitioning_uuid, storage_id, config.splits_per_node]) + self._n_idle_nodes -= 1 + self._partitions_nodes[storage_id] = node + yield storage_id, partition_tokens + else: + for _, final_tokens in self._send_final_tasks(partitions_per_node): + yield uuid.uuid4(), final_tokens + + def _compute_partitions_per_node(self, ksp, table, token_range_size, target_token_range_size): + """ + Compute all the partitions per node. If the strategy is simple partitioning, each node will have + (config.splits_per_node (default 32) * self._nodes_number) partitions. If the strategy is dynamic partitioning, + each node will have 1024 partitions, because if there is only one Cassandra node this will be the minimum + granularity. If there are more nodes, some partitions will be grouped. + Returns: + a dictionary with hosts as keys and partitions of tokens as values + """ + splits_per_node = config.splits_per_node + step_size = _max_token // (config.splits_per_node * self._nodes_number) + + if token_range_size: + step_size = token_range_size + elif target_token_range_size: + res = config.session.execute(_size_estimates, [ksp, table]) + if res: + one = res.one() + else: + one = 0 + if one: + (mean_p_size, p_count) = one + estimated_size = mean_p_size * p_count + if estimated_size > 0: + step_size = _max_token // ( + max(estimated_size / target_token_range_size, + splits_per_node * self._nodes_number) + ) + + if self._strategy == "DYNAMIC": + # 1024 because it is the maximum number of splits per node, in the case of only one Cassandra node + step_size = _max_token // 1024 + + partitions_per_node = defaultdict(list) + for node, tokens_in_node in self._tokens_per_node.items(): + for fraction, to in tokens_in_node: + while fraction < to - step_size: + partitions_per_node[node].append((fraction, fraction + step_size)) + fraction += step_size + partitions_per_node[node].append((fraction, to)) + + return partitions_per_node + + def _dynamic_tokens_partitions(self, partitions_per_node): + """ + Main loop of the dynamic partitioning strategy. 
There are 3 stages: + Sending of initial tasks: it returns as much initial tasks as pycompss nodes + Sending of intermediate tasks: until a best granularity is chosen, it return partitions using the granularity + with the best performance + Sending of final tasks: when a best granularity is chosen, it returns all the remaining partitions using the + best granularity + Returns: + a tuple (node, partition) every time it's called + """ + + while self._initial_send > 0: + for initial_tokens in self._send_initial_tasks(partitions_per_node): + yield initial_tokens + + while self._best_granularity is None: + for intermediate_tokens in self._send_intermediate_tasks(partitions_per_node): + yield intermediate_tokens + if sum([len(partitions) for partitions in partitions_per_node.values()]) == 0: + self._best_granularity = config.splits_per_node + break + else: + for final_tokens in self._send_final_tasks(partitions_per_node): + yield final_tokens + + def _send_initial_tasks(self, partitions_per_node): + for node in self._tokens_per_node.keys(): + config.splits_per_node = self._basic_partitions[self._initial_send * -1] + group_size = max(len(partitions_per_node[node]) // config.splits_per_node, 1) + if config.splits_per_node not in self._partitions_size: + self._partitions_size[config.splits_per_node] = group_size + + yield node, partitions_per_node[node][0:group_size] + del partitions_per_node[node][0:group_size] + + self._partitions_time[config.splits_per_node] = [] + self._initial_send -= 1 + if self._initial_send == 0: + break + + def _send_intermediate_tasks(self, partitions_per_node): + self._update_partitions_time() + while not self._at_least_each_granularity_finished(): + if self._n_idle_nodes > 0: + # if there is an idle node, send a new task without choosing the best granularity + config.splits_per_node, set_best = self._best_time_per_token() + if [] not in self._partitions_time.values() and set_best: + self._best_granularity = config.splits_per_node + break + time.sleep(1) + self._update_partitions_time() + else: + self._best_granularity, _ = self._best_time_per_token() + config.splits_per_node = self._best_granularity + + node = self._idle_cassandra_nodes.popleft() + group_size = max(len(partitions_per_node[node]) // config.splits_per_node, 1) + if config.splits_per_node not in self._partitions_size: + self._partitions_size[config.splits_per_node] = group_size + + yield node, partitions_per_node[node][0:group_size] + del partitions_per_node[node][0:group_size] + + def _send_final_tasks(self, partitions_per_node): + for partition in partitions_per_node.values(): + group_size = max(len(partition) // config.splits_per_node, 1) + for i in range(0, len(partition), group_size): + yield -1, partition[i:i + group_size] + + def _at_least_each_granularity_finished(self): + """ + Checks that there is at least one end_time set for all the granularities + """ + if [] in self._partitions_time.values(): + return False + + for _, partition_times in self._partitions_time.items(): + if not any(times["end_time"] for times in partition_times): + return False + return True + + @staticmethod + def _set_best_granularity(best, unfinished): + for _, time_per_token in unfinished.items(): + if time_per_token < best: + return False + return True + + def _best_time_per_token(self): + """ + The time is not a good measure, because the smaller tasks will be the shortest. 
+ We use a time / tokens proportion + """ + times_per_token = dict() + unfinished_tasks = dict() + actual_time = time.time() + + not_empty_partitions = ((splits, partition_times) for splits, partition_times + in self._partitions_time.items() if partition_times) + + for splits_per_node, partition_times in not_empty_partitions: + group_size = self._partitions_size[splits_per_node] + partition_time = 0.0 + if not any(times["end_time"] for times in partition_times): + """ + If there isn't at least one end_time set for this granularity, takes the actual time as the + finishing time. If there is already a granularity with better performance, it is selected as the + best granularity. + A granularity with this condition cannot be set as the best granularity. + """ + for t in partition_times: + partition_time += actual_time - t["start_time"] + + partition_time = partition_time / float(len(partition_times)) + try: + unfinished_tasks[splits_per_node] = partition_time / group_size + except ZeroDivisionError: + pass + else: + # at least one task finished + for t in partition_times: + if t["end_time"] is not None: + partition_time += t["end_time"] - t["start_time"] + + partition_time = partition_time / float(len(partition_times)) + if partition_time >= 2.0: + # to avoid having too much overhead, granularities lasting less than two seconds are discarded + try: + times_per_token[splits_per_node] = partition_time / group_size + except ZeroDivisionError: + pass + + sorted_times = sorted(times_per_token.items(), key=lambda item: item[1]) + + if len(sorted_times) > 0: + best_granularity, best_time = sorted_times[0] + set_best = self._set_best_granularity(best_time, unfinished_tasks) + else: + # if no task lasted at least two seconds, pick the biggest granularity + best_granularity = min(set(self._partitions_time.keys()) - set(unfinished_tasks.keys())) + set_best = False + + return best_granularity, set_best + + def _update_partitions_time(self): + partitions_times = config.session.execute(self._select_partitions_times, [self._partitioning_uuid]) + + for storage_id, partitions, start, end in partitions_times: + if start is not None: + for i, times in enumerate(self._partitions_time[partitions]): + if start == times["start_time"]: + if end is not None and times["end_time"] != end: + self._partitions_time[partitions][i]["end_time"] = end + self._n_idle_nodes += 1 + self._idle_cassandra_nodes.append(self._partitions_nodes[storage_id]) + break + else: + total_time = {"start_time": start, "end_time": end} + self._partitions_time[partitions].append(total_time) + if end is not None: + self._n_idle_nodes += 1 + self._idle_cassandra_nodes.append(self._partitions_nodes[storage_id]) diff --git a/hecuba_py/tests/withcassandra/istorage_split_locality.py b/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py similarity index 77% rename from hecuba_py/tests/withcassandra/istorage_split_locality.py rename to hecuba_py/tests/withcassandra/istorage_split_locality_tests.py index c85d4a35..40ce9922 100644 --- a/hecuba_py/tests/withcassandra/istorage_split_locality.py +++ b/hecuba_py/tests/withcassandra/istorage_split_locality_tests.py @@ -4,11 +4,18 @@ from hecuba import config from hecuba.tools import tokens_partitions, discrete_token_ranges +from hecuba.partitioner import Partitioner +from hecuba import config, StorageDict from .. 
import test_config -@unittest.skip("Disabled until token partition gets fixed") +class SimpleObj(StorageDict): + ''' + @TypeSpec dict<, v:int> + ''' + + class IStorageSplitLocalityTest(unittest.TestCase): @classmethod def setUpClass(cls): @@ -20,16 +27,25 @@ def setUpClass(cls): def test_enough_token(self): original_cfg = config.__dict__ config.__dict__.update(splits_per_node=10, token_range_size=None, target_token_range_size=64 * 1024 * 1024) - all_tokens = discrete_token_ranges(list(map(lambda a: a.value, config.cluster.metadata.token_map.ring))) - tkns_p = list(tokens_partitions("test_ksp", "tab", all_tokens)) + + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 10 + tkns_p = list(partitioner.tokens_partitions("test_ksp", "tab", token_range_size=None, + target_token_range_size=64 * 1024 * 1024)) + tkns_p = [i[1] for i in tkns_p] self.check_all(tkns_p, 10, 20) config.__dict__ = original_cfg def test_too_little_tokens(self): original_cfg = config.__dict__ config.__dict__.update(splits_per_node=1000, token_range_size=None, target_token_range_size=64 * 1024) - all_tokens = discrete_token_ranges(list(map(lambda a: a.value, config.cluster.metadata.token_map.ring))) - tkns_p = list(tokens_partitions("test_ksp", "tab", all_tokens)) + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 1000 + tkns_p = list(partitioner.tokens_partitions("test_ksp", "tab", token_range_size=None, + target_token_range_size=64 * 1024)) + tkns_p = [i[1] for i in tkns_p] self.check_all(tkns_p, 1000, 1000) config.__dict__ = original_cfg @@ -38,8 +54,12 @@ def test_splitting_tokens(self): config.__dict__.update(splits_per_node=1, token_range_size=int((2 ** 64) / 1000), target_token_range_size=None) - all_tokens = discrete_token_ranges(list(map(lambda a: a.value, config.cluster.metadata.token_map.ring))) - tkns_p = list(tokens_partitions("test_ksp", "tab", all_tokens)) + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 1 + tkns_p = list(partitioner.tokens_partitions("test_ksp", "tab", token_range_size=int((2 ** 64) / 1000), + target_token_range_size=None)) + tkns_p = [i[1] for i in tkns_p] self.check_all(tkns_p, 1, 1000) config.__dict__ = original_cfg @@ -52,8 +72,12 @@ def test_using_size_estimates(self): target_token_range_size=64) test_config.ccm_cluster.flush() test_config.ccm_cluster.compact() - all_tokens = discrete_token_ranges(list(map(lambda a: a.value, config.cluster.metadata.token_map.ring))) - tkns_p = list(tokens_partitions("test_ksp", "tab", all_tokens)) + + obj = SimpleObj("test_ksp.tab") + partitioner = Partitioner(obj, "SIMPLE") + config.splits_per_node = 1 + tkns_p = list( + partitioner.tokens_partitions("test_ksp", "tab", token_range_size=None, target_token_range_size=64)) config.__dict__ = original_cfg # self.check_all(tkns_p, 1, 1000) @@ -103,6 +127,10 @@ def checkToken(self, tokens): # type : (List[Long]) -> Host from cassandra.metadata import Token tm = config.cluster.metadata.token_map + + # only the first token of each partition is not assigned correctly + tokens = [(tok[0] + 1, tok[1]) for tok in tokens] + hosts = set(map(lambda token: tm.get_replicas("test_ksp", token)[0], map(lambda a: Token(a[0]), tokens))) self.assertEqual(len(hosts), 1, "A token range is local in 2 nodes") diff --git a/hecuba_py/tests/withcassandra/partitioner_tests.py b/hecuba_py/tests/withcassandra/partitioner_tests.py new file mode 100644 index 00000000..e57cc581 --- 
/dev/null +++ b/hecuba_py/tests/withcassandra/partitioner_tests.py @@ -0,0 +1,314 @@ +import os +import time +import unittest +from random import randint + +from hecuba import config, StorageDict + + +class MyDict(StorageDict): + ''' + @TypeSpec dict<, val0:str> + ''' + + +set_start_time = """UPDATE hecuba.partitioning + SET start_time = %s + WHERE storage_id = %s""" + +set_time = """UPDATE hecuba.partitioning + SET start_time = %s, end_time = %s + WHERE storage_id = %s""" + +set_end_time = """UPDATE hecuba.partitioning + SET end_time = %s + WHERE storage_id = %s""" + +number_nodes = 2 + +_dynamic_part_table_cql = """CREATE TABLE IF NOT EXISTS hecuba.partitioning( + partitioning_uuid uuid, + storage_id uuid, + number_of_partitions int, + start_time double, + end_time double, + PRIMARY KEY (storage_id)) + WITH default_time_to_live = 86400""" + +config.token_range_size = None +config.target_token_range_size = None + + +class PartitionerTest(unittest.TestCase): + + def computeItems(self, SDict): + counter = 0 + for _ in SDict.keys(): + counter = counter + 1 + return counter + + def test_simple(self): + config.splits_per_node = 32 + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + nsplits = 0 + config.partition_strategy = "SIMPLE" + for partition in d.split(): + nsplits += 1 + acc += self.computeItems(partition) + + nodes_number = 2 + print("number of splits: %s, best is %s" % (nsplits, config.splits_per_node * nodes_number)) + self.assertEqual(nitems, acc) + + def test_dynamic_simple(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "3" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45] + times = [(0, 80), (0, 5)] + nsplits = 0 + for partition in d.split(): + if nsplits <= 1: + # this will be done by the compss api + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) + nsplits += 1 + + acc += self.computeItems(partition) + + print("number of splits: %s, best is %s" % (nsplits, 45)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 45 // number_nodes) + + def test_dynamic_simple_other(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "3" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45] + times = [(0, 10), (0, 80)] + nsplits = 0 + for partition in d.split(): + if nsplits <= 1: + # this will be done by the compss api + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) + nsplits += 1 + + acc += self.computeItems(partition) + + print("number of 
splits: %s, best is %s" % (nsplits, 32)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 32 // number_nodes) + + def test_dynamic_different_nodes(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "5" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45, 64, 90] + times = [(0, 1000), (0, 1000), (0, 20), (0, 1000)] + nsplits = 0 + for partition in d.split(): + if nsplits <= 3: + # this will be done by the compss api + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) + nsplits += 1 + + acc += self.computeItems(partition) + + print("number of splits: %s, best is %s\n" % (nsplits, 64)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 64 // number_nodes) + + def test_dynamic_best_without_finishing(self): + """ + Test if the best granularity is set without finishing all the initial granularities tasks. + This happens when all the unfinished tasks are worse than the best granularity with at least one finished task + """ + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "3" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45] + times = [(0, 80), (0, 40)] + nsplits = 0 + for partition in d.split(): + # pretending that task with gran=32 is taking a lot of time + if nsplits == 0: + # this will be done by the compss api + config.session.execute(set_start_time, [0, partition.storage_id]) + elif nsplits == 1: + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) + else: + self.assertEqual(config.splits_per_node, 45 // number_nodes) + + nsplits += 1 + acc += self.computeItems(partition) + + print("number of splits: %s, best is %s" % (nsplits, 45)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 45 // number_nodes) + + def test_dynamic_best_idle_nodes(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "3" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45] + times = [(0, 120), (0, 60)] + nsplits = 0 + for partition in d.split(): + # pretending that task with gran=32 is taking a lot of time + if nsplits == 0: + id_partition0 = partition.storage_id + # this will be done by the compss api + config.session.execute(set_start_time, [time.time(), partition.storage_id]) + elif nsplits == 1: + config.session.execute(set_time, [times[nsplits][0], 
times[nsplits][1], partition.storage_id]) + elif nsplits == 5: + config.session.execute(set_end_time, [time.time() + 150, id_partition0]) + elif 1 < nsplits < 5: + start = randint(0, 200) + config.session.execute(set_time, [start, start + 60, partition.storage_id]) + + if nsplits > 1: + self.assertEqual(config.splits_per_node, 45 // number_nodes) + + nsplits += 1 + acc += self.computeItems(partition) + + print("number of splits: %s, best is %s" % (nsplits, 45)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 45 // number_nodes) + + def test_dynamic_idle_nodes_new_best(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + d = MyDict("my_app.mydict") + nitems = 10000 + for i in range(0, nitems): + d[i] = "RandomText" + str(i) + + time.sleep(2) + # assert all the data has been written + self.assertEqual(len(list(d.keys())), nitems) + + acc = 0 + os.environ["NODES_NUMBER"] = "3" + config.partition_strategy = "DYNAMIC" + granularity = [32, 45] + times = [(0, 80), (0, 60)] + nsplits = 0 + for partition in d.split(): + if nsplits == 0: + id_partition0 = partition.storage_id + # this will be done by the compss api + # time.time() to avoid choosing gran=64 when task with gran=32 taking a lot of time + # dynamic partitioning mode will use time.time() to check how much is taking + config.session.execute(set_start_time, [time.time(), partition.storage_id]) + elif nsplits == 1: + config.session.execute(set_time, [times[nsplits][0], times[nsplits][1], partition.storage_id]) + elif nsplits == 5: + last_time = config.session.execute("""SELECT start_time FROM hecuba.partitioning + WHERE storage_id = %s""" % id_partition0)[0][0] + config.session.execute(set_end_time, [last_time + 80, id_partition0]) + else: + start = randint(0, 200) + config.session.execute(set_time, [start, start + 60, partition.storage_id]) + + if 5 >= nsplits >= 2: + self.assertEqual(config.splits_per_node, 45 // number_nodes) + elif nsplits > 5: + self.assertEqual(config.splits_per_node, 32 // number_nodes) + nsplits += 1 + acc += self.computeItems(partition) + + print("number of splits: %s, best is %s" % (nsplits, 32)) + self.assertEqual(nitems, acc) + self.assertEqual(config.splits_per_node, 32 // number_nodes) + + def test_check_nodes_not_set(self): + config.session.execute("DROP TABLE IF EXISTS my_app.mydict") + d = MyDict("my_app.mydict") + + def raise_exception(): + return [partition for partition in d.split()] + + config.partition_strategy = "DYNAMIC" + + if "NODES_NUMBER" in os.environ: + del os.environ["NODES_NUMBER"] + if "PYCOMPSS_NODES" in os.environ: + del os.environ["PYCOMPSS_NODES"] + + self.assertRaises(RuntimeError, raise_exception) + + +if __name__ == "__main__": + unittest.main() diff --git a/hecuba_py/tests/withcassandra/storage_api_tests.py b/hecuba_py/tests/withcassandra/storage_api_tests.py index 9e8bd375..2a147c6c 100644 --- a/hecuba_py/tests/withcassandra/storage_api_tests.py +++ b/hecuba_py/tests/withcassandra/storage_api_tests.py @@ -1,9 +1,14 @@ import unittest +import uuid +import logging -from storage.api import getByID -from ..app.words import Words +from storage.api import getByID, TaskContext, start_task, end_task from hecuba import config, StorageDict +from hecuba.partitioner import _dynamic_part_table_cql + +from ..app.words import Words + class ApiTestSDict(StorageDict): ''' @@ -11,6 +16,14 @@ class ApiTestSDict(StorageDict): ''' 
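+# Helpers for the task-timing tests below: select_time reads back the rows written to
+# hecuba.partitioning, and PyCOMPSsArg mimics the parameter wrapper that PyCOMPSs hands
+# to start_task/end_task (the storage id travels as a string in its .key attribute).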
+select_time = "SELECT * FROM hecuba.partitioning" + + +class PyCOMPSsArg: + def __init__(self, storage_id): + self.key = str(storage_id) + + class StorageApi_Tests(unittest.TestCase): def class_type_test(self): base_dict = ApiTestSDict('test.api_sdict') @@ -47,3 +60,114 @@ def test_getByID_storage_obj(self): b = Words('testspace.tt') new_block = getByID(b.storage_id) self.assertEqual(b, new_block) + + def test_get_by_id_uuid(self): + base_dict = ApiTestSDict('test.api_sdict') + storage_id = base_dict.storage_id + del base_dict + + rebuild_dict = getByID(storage_id) + self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) + + def test_get_by_id_str(self): + base_dict = ApiTestSDict('test.api_sdict') + storage_id = str(base_dict.storage_id) + del base_dict + + rebuild_dict = getByID(storage_id) + self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) + + def test_get_by_id_getID(self): + base_dict = ApiTestSDict('test.api_sdict') + storage_id = base_dict.getID() + del base_dict + + rebuild_dict = getByID(storage_id) + self.assertTrue(isinstance(rebuild_dict, ApiTestSDict)) + + def test_start_task_uuid(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + + storage_id = uuid.uuid4() + + start_task([PyCOMPSsArg(storage_id)]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + + def test_end_task_uuid(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + + storage_id = uuid.uuid4() + + end_task([PyCOMPSsArg(storage_id)]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].end_time, None) + + def test_task_context_uuid(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + + storage_id = uuid.uuid4() + + task_context = TaskContext(logger=logging, values=[PyCOMPSsArg(storage_id)]) + task_context.__enter__() + task_context.__exit__(type=None, value=None, traceback=None) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + self.assertNotEqual(inserted[0].end_time, None) + + def test_start_task_key(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + storage_id = uuid.uuid4() + + start_task([PyCOMPSsArg(storage_id)]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + + def test_end_task_key(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + + storage_id = uuid.uuid4() + + end_task([PyCOMPSsArg(storage_id)]) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].end_time, None) + + def test_task_context_key(self): + config.session.execute("DROP TABLE IF EXISTS hecuba.partitioning") + config.session.execute(_dynamic_part_table_cql) + 
+ storage_id = uuid.uuid4() + + task_context = TaskContext(logger=logging, values=[PyCOMPSsArg(storage_id)]) + task_context.__enter__() + task_context.__exit__(type=None, value=None, traceback=None) + + inserted = list(config.session.execute(select_time)) + self.assertEqual(len(inserted), 1) + self.assertEqual(inserted[0].storage_id, storage_id) + self.assertNotEqual(inserted[0].start_time, None) + self.assertNotEqual(inserted[0].end_time, None) + + +if __name__ == "__main__": + unittest.main() diff --git a/storageAPI/storage/api.py b/storageAPI/storage/api.py index 4ecd4fbf..954d171e 100755 --- a/storageAPI/storage/api.py +++ b/storageAPI/storage/api.py @@ -39,7 +39,28 @@ def start_task(params): Args: params: a list of objects (Blocks, StorageObjs, strings, ints, ...) """ - pass + from hecuba import config + import time + for param in params: + uid = None + try: + uid = uuid.UUID(param.__dict__["key"]) + except KeyError: + pass + try: + uid = param._storage_id + except AttributeError: + pass + + if uid is not None: + try: + prepare = config.session.prepare("""UPDATE hecuba.partitioning + SET start_time = ? + WHERE storage_id = ?""") + config.session.execute(prepare, [time.time(), uid]) + except: + pass + break def end_task(params): @@ -49,7 +70,28 @@ def end_task(params): Args: params: a list of objects (Blocks, StorageObjs, strings, ints, ...) """ - pass + from hecuba import config + import time + for param in params: + uid = None + try: + uid = uuid.UUID(param.__dict__["key"]) + except KeyError: + pass + try: + uid = param._storage_id + except AttributeError: + pass + + if uid is not None: + try: + prepare = config.session.prepare("""UPDATE hecuba.partitioning + SET end_time = ? + WHERE storage_id = ?""") + config.session.execute(prepare, [time.time(), uid]) + except: + pass + break class TaskContext(object):
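
For reference, a minimal sketch (not part of the patch) of how the pieces added in this diff fit together: it assumes an already populated StorageDict `d`, a deployment with one master and two workers, and that the partition objects expose `_storage_id` as expected by the `start_task`/`end_task` lookup above; `process()` is a placeholder for the per-partition work that PyCOMPSs would normally run as a task.

```python
import os

from hecuba import config
from storage.api import start_task, end_task

os.environ["NODES_NUMBER"] = "3"       # master + two workers
config.partition_strategy = "DYNAMIC"  # "SIMPLE" is the default strategy

for partition in d.split():            # d is an already persistent StorageDict (assumed)
    start_task([partition])            # records start_time for partition.storage_id
    process(partition)                 # placeholder for the real per-partition task
    end_task([partition])              # records end_time; the Partitioner compares the
                                       # time-per-tokens ratios to pick the best granularity
```

In a real PyCOMPSs run, `start_task`/`end_task` are invoked by the runtime around each task, so the hecuba.partitioning table is filled without any explicit calls in user code.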