diff --git a/client/task_sender/swan_task_sender.py b/client/task_sender/swan_task_sender.py index a607308..310c7eb 100644 --- a/client/task_sender/swan_task_sender.py +++ b/client/task_sender/swan_task_sender.py @@ -1,7 +1,6 @@ import csv import logging import os -import time import uuid from os import listdir from os.path import isfile, join @@ -22,7 +21,6 @@ def read_file_path_in_dir(dir_path: str) -> List[str]: def generate_csv_and_send(_task: SwanTask, deal_list: List[OfflineDeal], _output_dir: str, _client: SwanClient, _uuid: str): - _csv_name = _task.task_name + ".csv" _csv_path = os.path.join(_output_dir, _csv_name) @@ -48,7 +46,6 @@ def generate_csv_and_send(_task: SwanTask, deal_list: List[OfflineDeal], _output def generate_car(_deal_list: List[OfflineDeal], target_dir) -> List[OfflineDeal]: - csv_path = os.path.join(target_dir, "car.csv") with open(csv_path, "w") as csv_file: @@ -104,22 +101,13 @@ def generate_metadata_csv(_deal_list: List[OfflineDeal], _task: SwanTask, _out_d for _deal in _deal_list: csv_writer.writerow(_deal.__dict__) -def update_task_by_uuid(config_path, task_uuid, miner_fid, csv): - config = read_config(config_path) - api_url = config['main']['api_url'] - api_key = config['main']['api_key'] - access_token = config['main']['access_token'] - client = SwanClient(api_url, api_key, access_token) - client.update_task_by_uuid(task_uuid, miner_fid, csv) - def update_task_by_uuid(config_path, task_uuid, miner_fid, csv): config = read_config(config_path) api_url = config['main']['api_url'] api_key = config['main']['api_key'] access_token = config['main']['access_token'] client = SwanClient(api_url, api_key, access_token) - print(client.api_token) client.update_task_by_uuid(task_uuid, miner_fid, csv) @@ -146,6 +134,7 @@ def generate_car_files(input_dir, config_path): generate_car(deal_list, output_dir) + def create_new_task(input_dir, config_path, task_name, miner_id=None): # todo move config reading to cli level config = read_config(config_path) @@ -214,7 +203,6 @@ def create_new_task(input_dir, config_path, task_name, miner_id=None): deal.car_file_url = os.path.join(download_url_prefix, deal.car_file_name) if not public_deal: - final_csv_path = send_deals(config_path, miner_id, task_name, deal_list=deal_list, task_uuid=task_uuid) if offline_mode: @@ -234,4 +222,4 @@ def create_new_task(input_dir, config_path, task_name, miner_id=None): task.miner_id = miner_id generate_metadata_csv(deal_list, task, output_dir, task_uuid) - generate_csv_and_send(task, deal_list, output_dir, client, task_uuid) \ No newline at end of file + generate_csv_and_send(task, deal_list, output_dir, client, task_uuid) diff --git a/common/swan_client.py b/common/swan_client.py index 28603bd..547cd99 100644 --- a/common/swan_client.py +++ b/common/swan_client.py @@ -1,6 +1,8 @@ import json import logging +import time +import jwt import requests from common.Miner import Miner @@ -28,15 +30,31 @@ def to_request_dict(self): class SwanClient: - api_token = None + jwt_token = None + jwt_token_expiration = None def __init__(self, api_url, api_key, access_token): self.api_url = api_url self.api_key = api_key self.access_token = access_token - self.refresh_token() - - def refresh_token(self): + self.get_jwt_token() + + class SwanTool: + @staticmethod + def refresh_token(decorated): + # the function that is used to check + # the JWT and refresh if necessary + def wrapper(cli, *args, **kwargs): + # refresh token 1 minute before expiration + if not cli.jwt_token \ + or not cli.jwt_token_expiration \ + or time.time() > cli.jwt_token_expiration - 60: + cli.get_jwt_token() + return decorated(cli, *args, **kwargs) + + return wrapper + + def get_jwt_token(self): logging.info('Refreshing token') refresh_api_token_suffix = "/user/api_keys/jwt" refresh_api_token_method = 'POST' @@ -48,10 +66,13 @@ def refresh_token(self): } try: resp_data = send_http_request(refresh_token_url, refresh_api_token_method, None, json.dumps(data)) - self.api_token = resp_data['jwt'] + self.jwt_token = resp_data['jwt'] + payload = jwt.decode(jwt=self.jwt_token, verify=False, algorithm='HS256') + self.jwt_token_expiration = payload['exp'] except Exception as e: logging.info(str(e)) + @SwanTool.refresh_token def update_task_by_uuid(self, task_uuid: str, miner_fid: str, csv): logging.info('Updating Swan task.') update_task_url_suffix = '/uuid_tasks/' @@ -59,10 +80,10 @@ def update_task_by_uuid(self, task_uuid: str, miner_fid: str, csv): update_task_url = self.api_url + update_task_url_suffix + task_uuid payload_data = {"miner_fid": miner_fid} - send_http_request(update_task_url, update_task_method, self.api_token, payload_data, file=csv) + send_http_request(update_task_url, update_task_method, self.jwt_token, payload_data, file=csv) logging.info('Swan task updated.') - + @SwanTool.refresh_token def post_task(self, task: SwanTask, csv): logging.info('Creating new Swan task: %s' % task.task_name) create_task_url_suffix = '/tasks' @@ -71,12 +92,10 @@ def post_task(self, task: SwanTask, csv): create_task_url = self.api_url + create_task_url_suffix payload_data = task.to_request_dict() - send_http_request(create_task_url, create_task_method, self.api_token, payload_data, file=csv) + send_http_request(create_task_url, create_task_method, self.jwt_token, payload_data, file=csv) logging.info('New Swan task Generated.') - - - + @SwanTool.refresh_token def update_miner(self, miner: Miner): update_miner_url_suffix = '/miners/%s/status' % miner.miner_id update_miner_method = 'PUT' @@ -84,31 +103,34 @@ def update_miner(self, miner: Miner): update_miner_url = self.api_url + update_miner_url_suffix payload_data = json.dumps(miner.to_request_dict()) - return send_http_request(update_miner_url, update_miner_method, self.api_token, payload_data) + return send_http_request(update_miner_url, update_miner_method, self.jwt_token, payload_data) + @SwanTool.refresh_token def get_offline_deals(self, miner_fid: str, status: str, limit: int): url = self.api_url + "/offline_deals/" + miner_fid + "?deal_status=" + status + "&limit=" + limit + "&offset=0" get_offline_deals_method = "GET" try: - response = send_http_request(url, get_offline_deals_method, self.api_token, None) + response = send_http_request(url, get_offline_deals_method, self.jwt_token, None) return response["deal"] except Exception as e: return e + @SwanTool.refresh_token def update_offline_deal_status(self, status: str, note: str, task_id: str, deal_cid: str): url = self.api_url + "/my_miner/tasks/" + task_id + "/deals/" + deal_cid update_offline_deal_status_method = "PUT" body = {"status": status, "note": note} - send_http_request(url, update_offline_deal_status_method, self.api_token, body) + send_http_request(url, update_offline_deal_status_method, self.jwt_token, body) + @SwanTool.refresh_token def update_offline_deal_details(self, status: str, note: str, deal_id, file_path=None, file_size=None): url = self.api_url + "/my_miner/deals/" + str(deal_id) update_offline_deal_details_method = "PUT" body = {"status": status, "note": note, "file_path": file_path, "file_size": file_size} - send_http_request(url, update_offline_deal_details_method, self.api_token, body) + send_http_request(url, update_offline_deal_details_method, self.jwt_token, body) def send_http_request(url, method, token, payload, file=None):