Skip to content
This repository has been archived by the owner on Aug 4, 2021. It is now read-only.

Commit

Permalink
Merge pull request #15 from nebulaai/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
bq1024 authored Mar 5, 2021
2 parents 50cd458 + d8ffaea commit 38130c1
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 29 deletions.
16 changes: 2 additions & 14 deletions client/task_sender/swan_task_sender.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import csv
import logging
import os
import time
import uuid
from os import listdir
from os.path import isfile, join
Expand All @@ -22,7 +21,6 @@ def read_file_path_in_dir(dir_path: str) -> List[str]:

def generate_csv_and_send(_task: SwanTask, deal_list: List[OfflineDeal], _output_dir: str, _client: SwanClient,
_uuid: str):

_csv_name = _task.task_name + ".csv"
_csv_path = os.path.join(_output_dir, _csv_name)

Expand All @@ -48,7 +46,6 @@ def generate_csv_and_send(_task: SwanTask, deal_list: List[OfflineDeal], _output


def generate_car(_deal_list: List[OfflineDeal], target_dir) -> List[OfflineDeal]:

csv_path = os.path.join(target_dir, "car.csv")

with open(csv_path, "w") as csv_file:
Expand Down Expand Up @@ -104,22 +101,13 @@ def generate_metadata_csv(_deal_list: List[OfflineDeal], _task: SwanTask, _out_d
for _deal in _deal_list:
csv_writer.writerow(_deal.__dict__)

def update_task_by_uuid(config_path, task_uuid, miner_fid, csv):
config = read_config(config_path)
api_url = config['main']['api_url']
api_key = config['main']['api_key']
access_token = config['main']['access_token']
client = SwanClient(api_url, api_key, access_token)
client.update_task_by_uuid(task_uuid, miner_fid, csv)


def update_task_by_uuid(config_path, task_uuid, miner_fid, csv):
config = read_config(config_path)
api_url = config['main']['api_url']
api_key = config['main']['api_key']
access_token = config['main']['access_token']
client = SwanClient(api_url, api_key, access_token)
print(client.api_token)
client.update_task_by_uuid(task_uuid, miner_fid, csv)


Expand All @@ -146,6 +134,7 @@ def generate_car_files(input_dir, config_path):

generate_car(deal_list, output_dir)


def create_new_task(input_dir, config_path, task_name, miner_id=None):
# todo move config reading to cli level
config = read_config(config_path)
Expand Down Expand Up @@ -214,7 +203,6 @@ def create_new_task(input_dir, config_path, task_name, miner_id=None):
deal.car_file_url = os.path.join(download_url_prefix, deal.car_file_name)

if not public_deal:

final_csv_path = send_deals(config_path, miner_id, task_name, deal_list=deal_list, task_uuid=task_uuid)

if offline_mode:
Expand All @@ -234,4 +222,4 @@ def create_new_task(input_dir, config_path, task_name, miner_id=None):
task.miner_id = miner_id

generate_metadata_csv(deal_list, task, output_dir, task_uuid)
generate_csv_and_send(task, deal_list, output_dir, client, task_uuid)
generate_csv_and_send(task, deal_list, output_dir, client, task_uuid)
52 changes: 37 additions & 15 deletions common/swan_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import logging
import time

import jwt
import requests

from common.Miner import Miner
Expand Down Expand Up @@ -28,15 +30,31 @@ def to_request_dict(self):


class SwanClient:
api_token = None
jwt_token = None
jwt_token_expiration = None

def __init__(self, api_url, api_key, access_token):
self.api_url = api_url
self.api_key = api_key
self.access_token = access_token
self.refresh_token()

def refresh_token(self):
self.get_jwt_token()

class SwanTool:
@staticmethod
def refresh_token(decorated):
# the function that is used to check
# the JWT and refresh if necessary
def wrapper(cli, *args, **kwargs):
# refresh token 1 minute before expiration
if not cli.jwt_token \
or not cli.jwt_token_expiration \
or time.time() > cli.jwt_token_expiration - 60:
cli.get_jwt_token()
return decorated(cli, *args, **kwargs)

return wrapper

def get_jwt_token(self):
logging.info('Refreshing token')
refresh_api_token_suffix = "/user/api_keys/jwt"
refresh_api_token_method = 'POST'
Expand All @@ -48,21 +66,24 @@ def refresh_token(self):
}
try:
resp_data = send_http_request(refresh_token_url, refresh_api_token_method, None, json.dumps(data))
self.api_token = resp_data['jwt']
self.jwt_token = resp_data['jwt']
payload = jwt.decode(jwt=self.jwt_token, verify=False, algorithm='HS256')
self.jwt_token_expiration = payload['exp']
except Exception as e:
logging.info(str(e))

@SwanTool.refresh_token
def update_task_by_uuid(self, task_uuid: str, miner_fid: str, csv):
logging.info('Updating Swan task.')
update_task_url_suffix = '/uuid_tasks/'
update_task_method = 'PUT'
update_task_url = self.api_url + update_task_url_suffix + task_uuid
payload_data = {"miner_fid": miner_fid}

send_http_request(update_task_url, update_task_method, self.api_token, payload_data, file=csv)
send_http_request(update_task_url, update_task_method, self.jwt_token, payload_data, file=csv)
logging.info('Swan task updated.')


@SwanTool.refresh_token
def post_task(self, task: SwanTask, csv):
logging.info('Creating new Swan task: %s' % task.task_name)
create_task_url_suffix = '/tasks'
Expand All @@ -71,44 +92,45 @@ def post_task(self, task: SwanTask, csv):
create_task_url = self.api_url + create_task_url_suffix
payload_data = task.to_request_dict()

send_http_request(create_task_url, create_task_method, self.api_token, payload_data, file=csv)
send_http_request(create_task_url, create_task_method, self.jwt_token, payload_data, file=csv)
logging.info('New Swan task Generated.')




@SwanTool.refresh_token
def update_miner(self, miner: Miner):
update_miner_url_suffix = '/miners/%s/status' % miner.miner_id
update_miner_method = 'PUT'

update_miner_url = self.api_url + update_miner_url_suffix
payload_data = json.dumps(miner.to_request_dict())

return send_http_request(update_miner_url, update_miner_method, self.api_token, payload_data)
return send_http_request(update_miner_url, update_miner_method, self.jwt_token, payload_data)

@SwanTool.refresh_token
def get_offline_deals(self, miner_fid: str, status: str, limit: int):
url = self.api_url + "/offline_deals/" + miner_fid + "?deal_status=" + status + "&limit=" + limit + "&offset=0"

get_offline_deals_method = "GET"
try:
response = send_http_request(url, get_offline_deals_method, self.api_token, None)
response = send_http_request(url, get_offline_deals_method, self.jwt_token, None)
return response["deal"]
except Exception as e:
return e

@SwanTool.refresh_token
def update_offline_deal_status(self, status: str, note: str, task_id: str, deal_cid: str):
url = self.api_url + "/my_miner/tasks/" + task_id + "/deals/" + deal_cid
update_offline_deal_status_method = "PUT"
body = {"status": status, "note": note}

send_http_request(url, update_offline_deal_status_method, self.api_token, body)
send_http_request(url, update_offline_deal_status_method, self.jwt_token, body)

@SwanTool.refresh_token
def update_offline_deal_details(self, status: str, note: str, deal_id, file_path=None, file_size=None):
url = self.api_url + "/my_miner/deals/" + str(deal_id)
update_offline_deal_details_method = "PUT"
body = {"status": status, "note": note, "file_path": file_path, "file_size": file_size}

send_http_request(url, update_offline_deal_details_method, self.api_token, body)
send_http_request(url, update_offline_deal_details_method, self.jwt_token, body)


def send_http_request(url, method, token, payload, file=None):
Expand Down

0 comments on commit 38130c1

Please sign in to comment.