Skip to content

Commit

Permalink
change inserted/updated message location
Browse files Browse the repository at this point in the history
  • Loading branch information
h3xitsec committed Feb 2, 2025
1 parent 8dee251 commit 9d5d012
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 58 deletions.
6 changes: 3 additions & 3 deletions docker_swarm/ansible/deploy_h3xrecon_stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
- name: Pull h3xrecon project Docker images
ansible.builtin.shell: docker pull {{ item }}
loop:
- ghcr.io/h3xitsec/h3xrecon/worker:nightly
- ghcr.io/h3xitsec/h3xrecon/worker:latest
when: inventory_hostname in groups['workers']

- {"name": "Clean up drive", "ansible.builtin.include_tasks": "tasks/clean_node_drives.yaml"}
Expand Down Expand Up @@ -37,8 +37,8 @@
- name: Pull h3xrecon project Docker images
ansible.builtin.shell: docker pull {{ item }}
loop:
- ghcr.io/h3xitsec/h3xrecon/server:nightly
- ghcr.io/h3xitsec/h3xrecon/database:nightly
- ghcr.io/h3xitsec/h3xrecon/server:latest
- ghcr.io/h3xitsec/h3xrecon/database:latest

- name: Create screenshot directory
ansible.builtin.file:
Expand Down
47 changes: 40 additions & 7 deletions src/h3xrecon/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,13 @@ async def insert_dns_record(self, domain_id: int, program_id: int, hostname: str
dns_class = EXCLUDED.dns_class
RETURNING (xmax = 0) AS inserted, id
''', domain_id, program_id, hostname, ttl, dns_class, dns_type, value)
logger.debug(f"DNS record inserted: {result}")
if result.success:
return DbResult(success=True, data=result.data[0])
if result.data.get('inserted') is True:
logger.success(f"INSERTED DNS RECORD: {hostname} {dns_type} {value}")
else:
logger.info(f"UPDATED DNS RECORD: {hostname} {dns_type} {value}")
else:
logger.error(f"Failed to insert or update DNS record: {result.error}")
return DbResult(success=False, error=f"Error inserting or updating DNS record in database: {result.error}")
except Exception as e:
logger.error(f"Error inserting DNS record: {str(e)}")
Expand Down Expand Up @@ -471,6 +474,10 @@ async def insert_screenshot(self, program_id: int, url: str, filepath: str, md5_
)
logger.debug(f"Insert result: {result}")
if result.success and isinstance(result.data, list) and len(result.data) > 0:
if result.data[0]['inserted']:
logger.success(f"INSERTED SCREENSHOT: {filepath}")
else:
logger.info(f"UPDATED SCREENSHOT: {filepath}")
return {
'inserted': result.data[0]['inserted'],
'id': result.data[0]['id']
Expand Down Expand Up @@ -512,8 +519,11 @@ async def insert_ip(self, ip: str, ptr: str, cloud_provider: str, program_id: in
"""
try:
result = await self._write_records(query, ip, ptr, cloud_provider, program_id)
logger.debug(f"Insert result: {result}")
if result.success and isinstance(result.data, list) and len(result.data) > 0:
if result.data[0]['inserted']:
logger.success(f"INSERTED IP: {ip}")
else:
logger.info(f"UPDATED IP: {ip}")
return {
'inserted': result.data[0]['inserted'],
'id': result.data[0]['id']
Expand Down Expand Up @@ -572,6 +582,10 @@ async def insert_service(self, ip: str, program_id: int, port: int = None, proto

# Handle nested DbResult for service record
if result.success:
if result.data[0]['inserted']:
logger.success(f"INSERTED SERVICE: {ip} {port} {protocol} {service}")
else:
logger.info(f"UPDATED SERVICE: {ip} {port} {protocol} {service}")
return DbResult(success=True, data=result.data[0])
else:
return DbResult(success=False, error=f"Error inserting or updating service in database: {result.error}")
Expand Down Expand Up @@ -626,13 +640,17 @@ async def insert_domain(self, domain: str, program_id: int, ips: List[str] = Non
logger.debug(f"Insert result: {result}")

if result.success and isinstance(result.data, list) and len(result.data) > 0:
if result.data[0]['inserted']:
logger.success(f"INSERTED DOMAIN: {domain}{f' IPs:{ips}' if ips else ''}{f' Cnames:{cnames}' if cnames else ''}{f' Wildcard:{is_catchall}' if is_catchall else ''}")
else:
logger.info(f"UPDATED DOMAIN: {domain}{f' IPs:{ips}' if ips else ''}{f' Cnames:{cnames}' if cnames else ''}{f' Wildcard:{is_catchall}' if is_catchall else ''}")
return DbResult(success=True, data={
'inserted': result.data[0]['inserted'],
'id': result.data[0]['id']
})
return DbResult(success=False, error=f"Error inserting or updating domain in database: {result.error}")
else:
return DbResult(success=False, error=f"Error inserting or updating domain in database: {result.error}")
else:
logger.error(f"Failed to insert or update domain in database: {result.error}")
return DbResult(success=False, error=f"Error inserting or updating domain in database: {result.error}")
except Exception as e:
logger.error(f"Error inserting or updating domain in database: {str(e)}")
logger.exception(e)
Expand Down Expand Up @@ -709,6 +727,10 @@ async def insert_website(self, url: str, host: str = None, port: int = None, sch
)

if result.success:
if result.data[0]['inserted']:
logger.success(f"INSERTED WEBSITE: {url}")
else:
logger.info(f"UPDATED WEBSITE: {url}")
return DbResult(success=True, data=result.data[0])
return DbResult(success=False, error=f"Error inserting or updating website in database: {result.error}")
except Exception as e:
Expand Down Expand Up @@ -793,12 +815,15 @@ async def insert_website_path(

# Handle nested DbResult objects
if result.success:
if result.data[0]['inserted']:
logger.success(f"INSERTED WEBSITE PATH: {path}")
else:
logger.info(f"UPDATED WEBSITE PATH: {path}")
return DbResult(success=True, data=result.data[0])
else:
return DbResult(success=False, error=f"Error inserting or updating website path in database: {result.error}")
except Exception as e:
logger.error(f"Error inserting or updating website path in database: {e}")
logger.exception(e)
return DbResult(success=False, error=f"Error inserting or updating website path in database: {e}")

async def insert_certificate(self, program_id: int, data: Dict[str, Any]):
Expand Down Expand Up @@ -903,6 +928,10 @@ async def insert_certificate(self, program_id: int, data: Dict[str, Any]):
)

if result.success:
if result.data[0]['inserted']:
logger.success(f"INSERTED CERTIFICATE: {serial}")
else:
logger.info(f"UPDATED CERTIFICATE: {serial}")
return DbResult(success=True, data=result.data[0])
return DbResult(success=False, error=f"Error inserting or updating certificate in database: {result.error}")
except Exception as e:
Expand Down Expand Up @@ -990,6 +1019,10 @@ async def insert_nuclei(self, program_id: int, data: Dict[str, Any]):
)

if result.success:
if result.data[0]['inserted']:
logger.success(f"INSERTED NUCLEI: {url} {template_id}")
else:
logger.info(f"UPDATED NUCLEI: {url} {template_id}")
return DbResult(success=True, data=result.data[0])
return DbResult(success=False, error=f"Error inserting or updating Nuclei hit in database: {result.error}")
except Exception as e:
Expand Down
51 changes: 3 additions & 48 deletions src/h3xrecon/workers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,6 @@ async def process_dns_record(self, msg_data: Dict[str, Any]):
if result.success:
domain_id = result.data.get('id')
else:
logger.error(f"Failed to insert domain {record.get('target_domain')}: {result.error}")
continue

result = await self.db.insert_dns_record(
Expand All @@ -442,14 +441,6 @@ async def process_dns_record(self, msg_data: Dict[str, Any]):
dns_type=record.get('dns_type'),
value=record.get('value')
)
logger.debug(f"result: {result}")
if result.success:
if result.data.get('inserted') is True:
logger.success(f"INSERTED DNS RECORD: {record.get('hostname')} {record.get('dns_type')} {record.get('value')}")
else:
logger.info(f"UPDATED DNS RECORD: {record.get('hostname')} {record.get('dns_type')} {record.get('value')}")
else:
logger.error(f"Failed to insert or update DNS record: {result.error}")

async def process_ip(self, msg_data: Dict[str, Any]):
"""Process IP data."""
Expand Down Expand Up @@ -479,7 +470,6 @@ async def process_ip(self, msg_data: Dict[str, Any]):

# Log operation result
if result.get('inserted'):
logger.success(f"INSERTED IP: {ip_data}")
# await self.db.log_dataworker_operation(
# component_id=self.component_id,
# data_type='ip',
Expand All @@ -494,8 +484,6 @@ async def process_ip(self, msg_data: Dict[str, Any]):
await self.trigger_new_jobs(program_id=msg_data.get('program_id'), data_type="ip", result=ip)
else:
logger.warning(f"JOB TRIGGERING DISABLED: {msg_data.get('data_type')} : {ip} : {msg_data.get('execution_id', 'no execution id')}")
else:
logger.info(f"UPDATED IP: {ip}")
except Exception as e:
# await self.db.log_dataworker_operation(
# component_id=self.component_id,
Expand All @@ -522,10 +510,7 @@ async def process_screenshot(self, msg_data: Dict[str, Any]):
url=screenshot.get('url')
)
if result['inserted']:
logger.success(f"INSERTED SCREENSHOT: {screenshot.get('url')}")
return screenshot.get('url')
else:
logger.info(f"UPDATED SCREENSHOT: {screenshot.get('url')}")

async def process_domain(self, msg_data: Dict[str, Any]):
"""Process domain data."""
Expand Down Expand Up @@ -556,15 +541,10 @@ async def process_domain(self, msg_data: Dict[str, Any]):
)
if result.success:
if result.data.get('inserted'):
logger.success(f"INSERTED DOMAIN: {domain_data}")
if msg_data.get('trigger_new_jobs'):
await self.trigger_new_jobs(program_id=msg_data.get('program_id'), data_type="domain", result=domain)
else:
logger.warning(f"JOB TRIGGERING DISABLED: {msg_data.get('data_type')} : {domain} : {msg_data.get('execution_id', 'no execution id')}")
else:
logger.info(f"UPDATED DOMAIN: {domain_data}")
else:
logger.error(f"Failed to insert or update domain: {result.error}")

async def process_website(self, msg: Dict[str, Any]):
"""Process website data."""
Expand Down Expand Up @@ -604,15 +584,10 @@ async def process_website(self, msg: Dict[str, Any]):
)
if result.success:
if result.data.get('inserted'):
logger.success(f"INSERTED WEBSITE: {url}")
if msg.get('trigger_new_jobs'):
await self.trigger_new_jobs(program_id=msg.get('program_id'), data_type="website", result=url)
else:
logger.warning(f"JOB TRIGGERING DISABLED: {msg.get('data_type')} : {msg.get('data')} : {msg.get('execution_id', 'no execution id')}")
else:
logger.info(f"UPDATED WEBSITE: {url}")
else:
logger.error(f"Failed to insert or update website: {result.error}")

except Exception as e:
logger.error(f"Failed to process website in program {msg.get('program_id')}: {e}")
Expand All @@ -636,7 +611,6 @@ async def process_website_path(self, msg: Dict[str, Any]):
if not website_id.success or not website_id.data:
logger.error(f"Failed to find website {base_url} in database")
result = await self.db.insert_website(url=base_url, program_id=msg.get('program_id'))
logger.debug(f"Inserted website {base_url} in database: {result}")
website_id = await self.db._fetch_value(f"SELECT id FROM websites WHERE url = '{base_url}'")
if website_id.success and website_id.data:
logger.info(f"PROCESSING WEBSITE PATH: {d.get('url')}")
Expand All @@ -663,15 +637,10 @@ async def process_website_path(self, msg: Dict[str, Any]):
)
if result.success:
if result.data.get('inserted'):
logger.success(f"INSERTED WEBSITE PATH: {d.get('url')}")
if msg.get('trigger_new_jobs'):
await self.trigger_new_jobs(program_id=msg.get('program_id'), data_type="website_path", result=d.get('url'))
else:
logger.warning(f"JOB TRIGGERING DISABLED: {msg.get('data_type')} : {msg.get('data')} : {msg.get('execution_id', 'no execution id')}")
else:
logger.info(f"UPDATED WEBSITE PATH: {d.get('url')}")
else:
logger.error(f"Failed to insert or update website path: {result.error}")

except Exception as e:
logger.error(f"Failed to process website path in program {msg.get('program_id')}: {e}")
Expand Down Expand Up @@ -701,16 +670,11 @@ async def process_nuclei(self, msg: Dict[str, Any]):
)
if result.success:
if result.data.get('inserted') is True:
logger.success(f"INSERTED NUCLEI: {d.get('matched_at', {})} | {d.get('template_id', {})} | {d.get('severity', {})}")
else:
logger.info(f"UPDATED NUCLEI: {d.get('matched_at', {})} | {d.get('template_id', {})} | {d.get('severity', {})}")
else:
logger.error(f"Failed to insert or update Nuclei hit: {result.error}")
pass
else:
logger.debug(f"Hostname {hostname} is not in scope for program {msg.get('program_id')}. Skipping.")
except Exception as e:
logger.error(f"Failed to process Nuclei result in program {msg.get('program_id')}: {e}")
logger.exception(e)

async def process_certificate(self, msg: Dict[str, Any]):
"""Process certificate data."""
Expand All @@ -725,14 +689,9 @@ async def process_certificate(self, msg: Dict[str, Any]):
)
if result.success:
if result.data.get('inserted'):
logger.success(f"INSERTED CERTIFICATE: {d.get('cert', {}).get('serial', {})}")
else:
logger.info(f"UPDATED CERTIFICATE: {d.get('cert', {}).get('serial', {})}")
else:
logger.error(f"Failed to insert or update certificate: {result.error}")
pass
except Exception as e:
logger.error(f"Failed to process certificate in program {msg.get('program_id')}: {e}")
logger.exception(e)

async def process_service(self, msg_data: Dict[str, Any]):
"""Process service data."""
Expand All @@ -749,11 +708,7 @@ async def process_service(self, msg_data: Dict[str, Any]):
)
if result.success:
if result.data.get('inserted'):
logger.success(f"INSERTED SERVICE: {i.get('ip')}:{i.get('port')}/{i.get('protocol')}/{i.get('service')}")
else:
logger.info(f"UPDATED SERVICE: {i.get('ip')}:{i.get('port')}/{i.get('protocol')}/{i.get('service')}")
else:
logger.error(f"Failed to insert or update service: {result.error}")
pass
async def _handle_killjob_command(self, msg: Dict[str, Any]):
"""Handle killjob command to cancel the running task."""
pass
Expand Down

0 comments on commit 9d5d012

Please sign in to comment.