diff --git a/ai_ta_backend/flows.py b/ai_ta_backend/flows.py index aa436b38..64dd3b51 100644 --- a/ai_ta_backend/flows.py +++ b/ai_ta_backend/flows.py @@ -36,21 +36,14 @@ def get_users(self, limit: int = 50, pagination: bool = True, api_key: str = "") return all_users - def execute_flow(self, hook: str, api_key: str = "", post: str = "", data=None) -> None: - if not api_key: - raise ValueError('api_key is required') - headers = {"X-N8N-API-KEY": api_key, "Accept": "application/json"} + def execute_flow(self, hook: str, data={'field-0': ''}) -> None: + if not data: + data = {'field-0': ''} url = hook - if post: - if data: - response = requests.post(url, headers=headers, json=post, timeout=8, data=data) - else: - response = requests.post(url, headers=headers, json=post, timeout=8) - else: - response = requests.get(url, headers=headers, timeout=8) + response = requests.post(url, files=data, timeout=60) body = response.json() if not response.ok: - raise Exception(f"Error: {response.status_code} \n Message: {body['message']} \n Hint: {body['hint']}") + raise Exception(f"Error: {response.status_code} \n Message: {body.get('message')}") pass def get_executions(self, limit, id=None, pagination: bool = True, api_key: str = ""): @@ -85,13 +78,6 @@ def get_executions(self, limit, id=None, pagination: bool = True, api_key: str = else: return all_executions - def get_hook(self, name: str, api_key: str = ""): - work_flow = self.get_workflows(limit=100, api_key=api_key, workflow_name=name) - for node in work_flow.get('nodes'): # type: ignore - if node['name'] == 'Webhook': - return node['webhookId'] - pass - def get_workflows(self, limit, pagination: bool = True, @@ -130,6 +116,40 @@ def get_workflows(self, raise Exception('Workflow not found') return all_workflows + def get_hook(self, name: str, api_key: str = ""): + work_flow = self.get_workflows(limit=100, api_key=api_key, workflow_name=name) + if isinstance(work_flow, dict) and 'nodes' in work_flow: + for node in work_flow['nodes']: + if node['name'] == 'n8n Form Trigger': + return node['parameters']['path'] + else: + raise Exception('No nodes found in the workflow') + + def format_data(self, inputted, api_key: str, workflow_name): + work_flow = self.get_workflows(100, api_key=api_key, workflow_name=workflow_name) + print("Got workflow") + values = [] + if isinstance(work_flow, dict) and 'nodes' in work_flow: + print("passed if") + for node in work_flow['nodes']: + if node['name'] == 'n8n Form Trigger': + values = node['parameters']['formFields']['values'] + print("found value") + data = {} + print(inputted) + inputted = json.loads(inputted) + print("Done with json") + inputted = dict(inputted) + print("Got data") + for i, value in enumerate(values): + field_name = 'field-' + str(i) + data[value['fieldLabel']] = field_name + new_data = {} + for k, v in inputted.items(): + new_data[data[k]] = v + + return new_data + # TODO: activate and disactivate workflows def switch_workflow(self, id, api_key: str = "", activate: 'str' = 'True'): @@ -156,6 +176,8 @@ def get_data(self, id): self.get_executions(20, id) # TODO: NEED to have keyword args for workflows like Pest Detection. + # TODO: make the supabase rpc call to make the transaction + # What if some data takes longer to parse, so the transactional supabase call is wrong. def main_flow(self, name: str, api_key: str = "", data: str = ""): if not api_key: raise ValueError('api_key is required') @@ -163,24 +185,13 @@ def main_flow(self, name: str, api_key: str = "", data: str = ""): execution = self.get_executions(limit=1, api_key=api_key) print("Got executions") hookId = self.get_hook(name, api_key) - hook = self.url + f"/webhook/{hookId}" + hook = self.url + f"/form/{hookId}" print("Hook!!!: ", hook) - print(data) - if data: - json_data = json.loads(data) - print("Data to json") - new_data = dict(json_data) - print("Got data to dictionary") - else: - new_data = None + + new_data = self.format_data(data, api_key, name) response = self.supabase_client.table('n8n_api_keys').select("*").execute() print("Got response") - workflow = self.get_workflows(limit=100, api_key=api_key, workflow_name=name) - print("Got workflow") - print(workflow) - workflow_post = workflow['nodes'][0]['parameters'].get('httpMethod') # type: ignore - print("Got workflow post") ids = [] for row in dict(response)['data']: @@ -199,7 +210,7 @@ def main_flow(self, name: str, api_key: str = "", data: str = ""): try: self.supabase_client.table('n8n_api_keys').insert({"id": id}).execute() - self.execute_flow(hook, api_key, workflow_post, new_data) + self.execute_flow(hook, new_data) print("Executed") executions = self.get_executions(20, id, True, api_key) print("Got executions", executions)