-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathWSClient.py
261 lines (233 loc) · 10.2 KB
/
WSClient.py
1
import requestsimport jsonimport timeimport csvimport osclass WSClient(object): def __init__(self, task_config, ws_addr, logfile): """ self.State displays state of runner, possible states: Free, TaskSent, ResultsGot :param task_name: name of task that will be sent to Worker Service :param task_params: dict of parameters for specific task :param ws_addr: remote slave address(ip or domain) including port "127.0.0.1:8089" """ # Worker service connection initialization self.ws_addr = ws_addr self.path = "http://%s" % self.ws_addr while not self.ping(): print("WS does not available at %s, retrying after 5 seconds.." % self.path) time.sleep(5) # Task initialization self.task_name = task_config["ExperimentsConfiguration"]["TaskName"] self.features_names = task_config["DomainDescription"]["FeatureNames"] self.results_structure = task_config["ExperimentsConfiguration"]["ResultStructure"] self.results_data_types = task_config["ExperimentsConfiguration"]["ResultDataTypes"] self.FileToRead = task_config["ExperimentsConfiguration"]["FileToRead"] self.State = "Free" self.task_body = {} self.results = None self.logfile = logfile def send_task(self, task): """ :param task: list of points for sending to the WS :return: None """ if self.State != "Free": print("Runner is already busy.") return 1 else: # { # "task_name": "random_1", # "request_type": "send_task", # "params_names": ["param1", "param2", "paramN"], # "param_values": [ # [123.0, 123.0, 123.0], # [123.0, 123.0, 123.0] # ], # "worker_config": { # "ws_file": "name of file that will be read" # "b": "3" # } # } if len(task) == 0 or len(task[0]) != len(self.features_names): print("%s: Invalid task length in send_task" % self.__module__) return 1 # self.task_body is a dictionary self.task_body["task_name"] = self.task_name self.task_body["request_type"] = "send_task" self.task_body["params_names"] = self.features_names self.task_body["param_values"] = task self.task_body["worker_config"] = { "ws_file": self.FileToRead } # print("From module %s:\n%s" % (self.__module__, self.task_body)) headers = { 'Type': "Send_task", 'Content-Type': 'application/json' } try: response = requests.post(self.path + "/task/add", data=json.dumps(self.task_body), headers=headers) if response.status_code != 201: print("Incorrect response code from server: %s\nBody:%s" % (response.status_code, response.content)) exit() #print(response.content) response = response.content.decode() response = response.replace("\n", "") response = json.loads(response) # In responce Worker Service will send IDs for each task. # Response body will be like: # { # "task_name": "random_1", # "response_type": "send_task", # "id": [123123123, 1231231234, 1231231235] # } # Saving IDs for further polling results and etc. self.task_body['id'] = response['id'] except requests.RequestException or json.JSONDecodeError as e: print("Failed to send task to %s: %s" % (self.ws_addr, e)) return None except json.JSONDecodeError as e: print("Failed to decode response for send task request:" % e) return None self.State = "TaskSent" return self.task_body['id'] def get_results(self): """ Send one time poll request to get results from the slave :return: list of data points with requested structure """ if self.State != "TaskSent": print("Task was not send.") return 1 headers = { 'Content-Type': 'application/json' } # Results polling will be in view of: # { # "task_name": "random_1", # "request_type": "get_results", # "response_struct": ["param1", "param2", "paramN"], # "id": [123123123, 123123124, 123123125] # } data = { "task_name": self.task_body["task_name"], "request_type": "get_results", "response_struct": self.results_structure, "id": self.task_body["id"] } try: #print(json.dumps(data)) response = requests.put(self.path + '/result/format', data=json.dumps(data), headers=headers) #print(response.content) if response.status_code != 200: print("Incorrect response code from server on getting results: %s\nBody:%s" % (response.status_code, response.content)) return 1 try: response = response.content.decode() response = response.replace("\n", "") # Response will be in structure: # { # "task_name": "random_1", # "request_type": "get_results", # "params_names": ["param1", "param2", "paramN"], # "param_values": [ # [123, "value_for_param_2", 123.1], # [112313253, "value2_for_param_2", 123123.1], # [123, None, None] # ] # } results = json.loads(response) self.results = results["param_values"] if "in progress" not in results["statuses"] and "wrong id" not in results["statuses"]: self.State = "ResultsGot" for index, point in enumerate(self.results): try: # Casting each value to needed data type, e.g.: # ResultDataTypes = ["float", "int", "str"] # + # point = [123,123,123] # results -> [123.0, 123, "123"] self.results[index] = [eval(self.results_data_types[dtIndex])(x) for dtIndex, x in enumerate(point)] except: self.results[index] = point return self.results except Exception as e: print("Unable to decode responce: %s\nError: %s" % (response, e)) return None except requests.RequestException as e: print("Failed to get results from WS %s, error: %s" % (self.ws_addr, e)) return None def poll_while_not_get(self, interval=0.1, timeout=90, terminate=False): """ Start polling results from Worker Service (ws_addr) with specified time interval and before timeout elapsed. :param interval: interval between each poll request :param timeout: timeout before terminating task :return: list of data points with requested structure """ if self.State != "TaskSent": print("Task was not send.") return 1 headers = { 'Content-Type': 'application/json' } time_start = time.time() self.results = self.get_results() while self.State != "ResultsGot": self.get_results() if time.time() - time_start > timeout: if terminate: response = requests.delete(self.path + '/terminate/'+'id', headers=headers) else: break time.sleep(interval) return self.results def write_results_to_csv(self): """ Appends results to CSV file. Filename based on task config - Task name and experiment number. In case if file exists - data will be appended, if not - file will be created. Structure based on self.results_structure :return: """ if self.State != "ResultsGot": print("Results are not ready to be written.") return 1 # Creating file if not exists file_exists = os.path.isfile(self.logfile) if not file_exists: with open(self.logfile, 'w') as f: legend = '' for column_name in self.results_structure: legend += column_name + ", " f.write(legend[:legend.rfind(', ')] + '\n') # Writing the results try: with open(self.logfile, 'a', newline="") as csvfile: writer = csv.writer(csvfile, delimiter=',') for result in self.results: writer.writerow(result) self.State = "Free" return 0 except Exception as e: print("Failed to write the results: %s" % e) return 1 def ping(self): """ Method checks availability of Worker Service. :return: True if service available, False if not or error occurred (IP does not available or etc). """ try: resp = requests.get(self.path + "/ping") if "pong" not in resp.content.decode(): print("Ping does not have OK response.") return False else: return True except Exception as e: print("Error in ping method: %s" % e) return False def work(self, task): print("Sending task: %s" % str(task)) self.send_task(task) print("Polling results..") self.poll_while_not_get() self.write_results_to_csv() return self.results