Skip to content

Commit

Permalink
Updated indexing of nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptobench committed Sep 24, 2024
1 parent 94af712 commit 75948bb
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 169 deletions.
35 changes: 35 additions & 0 deletions stats-backend/api2/migrations/0031_update_nodestatushistory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from django.db import migrations, models

def update_nodestatushistory(apps, schema_editor):
    """Backfill NodeStatusHistory.node_id from the provider FK.

    Runs after the nullable node_id column is added and before the
    provider FK is dropped.  select_related avoids one extra query per
    row when reading provider.node_id, and bulk_update writes in
    batches instead of issuing one UPDATE per row.
    """
    NodeStatusHistory = apps.get_model('api2', 'NodeStatusHistory')

    batch = []
    histories = NodeStatusHistory.objects.select_related('provider').iterator()
    for history in histories:
        if history.provider:
            history.node_id = history.provider.node_id
            batch.append(history)
        # Flush in chunks to keep memory and statement size bounded.
        if len(batch) >= 1000:
            NodeStatusHistory.objects.bulk_update(batch, ['node_id'])
            batch = []
    if batch:
        NodeStatusHistory.objects.bulk_update(batch, ['node_id'])

class Migration(migrations.Migration):
    """Replace NodeStatusHistory.provider FK with a flat node_id CharField.

    Operation order is load-bearing: the column is added as nullable,
    backfilled from the FK, the FK is dropped, and only then is node_id
    made NOT NULL.
    """

    dependencies = [
        ('api2', '0030_relaynodes_ip_address_relaynodes_port'),
    ]

    operations = [
        # 1. Add the new column as nullable so existing rows stay valid.
        migrations.AddField(
            model_name='nodestatushistory',
            name='node_id',
            field=models.CharField(max_length=42, null=True),
        ),
        # 2. Copy node_id over from the provider FK.
        # NOTE(review): no reverse_code is supplied, so this migration
        # cannot be unapplied.
        migrations.RunPython(update_nodestatushistory),
        # 3. Drop the old FK column.
        migrations.RemoveField(
            model_name='nodestatushistory',
            name='provider',
        ),
        # 4. Tighten node_id to NOT NULL now that it is populated.
        migrations.AlterField(
            model_name='nodestatushistory',
            name='node_id',
            field=models.CharField(max_length=42),
        ),

    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Generated by Django 4.1.7 on 2024-09-24 11:13

from django.db import migrations, models


class Migration(migrations.Migration):
    """Strip db_index from is_online and timestamp on NodeStatusHistory.

    The per-column indexes are re-added with db_index=True in the next
    migration (0033); this step only drops them.
    """

    dependencies = [
        ('api2', '0031_update_nodestatushistory'),
    ]

    operations = [

        migrations.AlterField(
            model_name='nodestatushistory',
            name='is_online',
            field=models.BooleanField(),
        ),
        migrations.AlterField(
            model_name='nodestatushistory',
            name='timestamp',
            field=models.DateTimeField(auto_now_add=True),
        ),
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 4.1.7 on 2024-09-24 11:14

from django.db import migrations, models


class Migration(migrations.Migration):
    """Re-add single-column indexes on is_online and timestamp.

    Restores the db_index=True flags dropped by migration 0032.
    """

    dependencies = [
        ('api2', '0032_remove_nodestatushistory_api2_nodest_provide_64b3a2_idx_and_more'),
    ]

    operations = [
        migrations.AlterField(
            model_name='nodestatushistory',
            name='is_online',
            field=models.BooleanField(db_index=True),
        ),
        migrations.AlterField(
            model_name='nodestatushistory',
            name='timestamp',
            field=models.DateTimeField(auto_now_add=True, db_index=True),
        ),
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 4.1.7 on 2024-09-24 11:14

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the composite (node_id, timestamp) index on NodeStatusHistory.

    Replaces the old (provider, timestamp) index, which is removed in
    migration 0035, to match the new flat node_id column.
    """

    dependencies = [
        ('api2', '0033_alter_nodestatushistory_is_online_and_more'),
    ]

    operations = [
        migrations.AddIndex(
            model_name='nodestatushistory',
            index=models.Index(fields=['node_id', 'timestamp'], name='api2_nodest_node_id_220bdf_idx'),
        ),
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 4.1.7 on 2024-09-24 11:14

from django.db import migrations


class Migration(migrations.Migration):
    """Drop the obsolete (provider, timestamp) index.

    The provider FK was removed in 0031 and the replacement
    (node_id, timestamp) index was created in 0034, so the old
    auto-named index is no longer needed.
    """

    dependencies = [
        ('api2', '0034_nodestatushistory_api2_nodest_node_id_220bdf_idx'),
    ]

    operations = [
        migrations.RemoveIndex(
            model_name='nodestatushistory',
            name='api2_nodest_provide_64b3a2_idx',
        ),
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Generated by Django 4.1.7 on 2024-09-24 12:00

from django.db import migrations
from django.db.models import F

def cleanup_nodestatushistory(apps, schema_editor):
    """Collapse consecutive duplicate status rows per node.

    For each node_id, walks the history in timestamp order and deletes
    every row whose is_online flag equals the previous row's, so only
    actual online/offline transitions remain.
    """
    NodeStatusHistory = apps.get_model('api2', 'NodeStatusHistory')

    # Distinct node ids present in the history table.
    node_ids = NodeStatusHistory.objects.values_list('node_id', flat=True).distinct()

    for node_id in node_ids:
        # values_list avoids instantiating full model objects; only the
        # pk and the online flag are needed to decide what to delete.
        entries = (
            NodeStatusHistory.objects
            .filter(node_id=node_id)
            .order_by('timestamp')
            .values_list('id', 'is_online')
        )

        previous_status = None
        to_delete = []
        for entry_id, is_online in entries:
            if previous_status is not None and is_online == previous_status:
                to_delete.append(entry_id)
            else:
                previous_status = is_online

        # Skip the DELETE round-trip entirely when this node has no
        # duplicate entries (the original issued an empty-IN delete per node).
        if to_delete:
            NodeStatusHistory.objects.filter(id__in=to_delete).delete()

class Migration(migrations.Migration):
    """One-off data cleanup: de-duplicate NodeStatusHistory rows.

    NOTE(review): no reverse_code is supplied (and the deletion is not
    recoverable anyway), so this migration cannot be unapplied.
    """

    dependencies = [
        ('api2', '0035_remove_nodestatushistory_api2_nodest_provide_64b3a2_idx'),
    ]

    operations = [
        migrations.RunPython(cleanup_nodestatushistory),
    ]
7 changes: 4 additions & 3 deletions stats-backend/api2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,20 @@ class GLM(models.Model):


class NodeStatusHistory(models.Model):
    """Point-in-time online/offline record for a node.

    Identified by a flat node_id string (42 chars, matching the
    CharField added in migration 0031) rather than a provider FK;
    the FK, the unreachable duplicate __str__ return, and the stale
    (provider, timestamp) index left over from the diff are removed.
    """

    # 0x-prefixed hex node identifier; max_length matches migration 0031.
    node_id = models.CharField(max_length=42)
    is_online = models.BooleanField(db_index=True)
    # Set once on INSERT; rows are immutable status snapshots.
    timestamp = models.DateTimeField(auto_now_add=True, db_index=True)

    def __str__(self):
        return f"{self.node_id} - {'Online' if self.is_online else 'Offline'} at {self.timestamp}"

    class Meta:
        indexes = [
            # Composite index backing per-node, time-ordered history queries.
            models.Index(fields=["node_id", "timestamp"]),
        ]



class ProviderWithTask(models.Model):
instance = models.ForeignKey(
Node, on_delete=models.CASCADE, related_name="tasks_received"
Expand Down
165 changes: 8 additions & 157 deletions stats-backend/api2/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ def update_providers_info(node_props):

# Get existing Offers
existing_offers = Offer.objects.filter(
provider__node_id__in=provider_ids, runtime__in=[data["golem.runtime.name"] for _, data in provider_data_list]
provider__node_id__in=provider_ids,
runtime__in=[data["golem.runtime.name"] for _, data in provider_data_list]
).select_related('provider')

existing_offers_dict = {(offer.provider.node_id, offer.runtime): offer for offer in existing_offers}
Expand All @@ -98,7 +99,7 @@ def update_providers_info(node_props):
Offer.objects.bulk_create(new_offers)

# Update existing_offers_dict with newly created offers
updated_offers = Offer.objects.filter(provider__node_id__in=provider_ids)
updated_offers = Offer.objects.filter(provider__node_id__in=provider_ids).select_related('provider')
existing_offers_dict.update({(offer.provider.node_id, offer.runtime): offer for offer in updated_offers})

# Now process and update offers
Expand Down Expand Up @@ -150,8 +151,11 @@ def update_providers_info(node_props):
else:
print("EC2 monthly price is zero, cannot compare offer prices.")

offer.properties = data
offers_to_update.append(offer)
# Always update the offer if any properties have changed
# Compare existing properties with new data
if offer.properties != data:
offer.properties = data
offers_to_update.append(offer)

# Bulk update offers if any
if offers_to_update:
Expand Down Expand Up @@ -182,159 +186,7 @@ def update_providers_info(node_props):
sys.path.append(str(examples_dir))
from .yapapi_utils import build_parser, print_env_info, format_usage # noqa: E402

@app.task
def update_nodes_data(node_props):
    """Celery task: probe node liveness and persist status transitions.

    For every provider in *node_props* (JSON strings carrying "node_id"),
    plus any provider whose latest NodeStatusHistory row says online but
    which is absent from this scan, checks current status over HTTP,
    records a NodeStatusHistory row when the status changed, mirrors the
    latest status in Redis under "node_status:<id>", and bulk-updates
    Node.online flags.
    """
    # Collect issuer_ids (de-duplicated, order-preserving)
    is_online_checked_providers = set()
    issuer_ids = []
    for props in node_props:
        props = json.loads(props)
        issuer_id = props["node_id"]
        if issuer_id not in is_online_checked_providers:
            issuer_ids.append(issuer_id)
            is_online_checked_providers.add(issuer_id)

    # Check node status in parallel using ThreadPoolExecutor
    def check_status(issuer_id):
        # Pair the id with its probe result so the map output builds a dict.
        return issuer_id, check_node_status(issuer_id)

    with ThreadPoolExecutor() as executor:
        results = list(executor.map(check_status, issuer_ids))
    nodes_status_to_update = dict(results)

    # Get previously online providers not in scan
    deserialized_node_props = [json.loads(props) for props in node_props]
    provider_ids_in_props = {props["node_id"] for props in deserialized_node_props}

    # Correlated subquery: each Node's most recent is_online flag.
    latest_online_status = (
        NodeStatusHistory.objects.filter(provider=OuterRef("pk"))
        .order_by("-timestamp")
        .values("is_online")[:1]
    )
    previously_online_providers_ids = set(
        Node.objects.annotate(latest_online=Subquery(latest_online_status))
        .filter(latest_online=True)
        .values_list("node_id", flat=True)
    )

    # Re-probe nodes that were online before but missing from this scan,
    # so their going offline is detected.
    provider_ids_not_in_scan = previously_online_providers_ids - provider_ids_in_props
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(check_status, provider_ids_not_in_scan))
    nodes_status_to_update_not_in_scan = dict(results)
    nodes_status_to_update.update(nodes_status_to_update_not_in_scan)

    print(
        f"Finished checking statuses of {len(nodes_status_to_update)} providers."
    )

    # Now, update node statuses in bulk
    # Fetch Nodes
    provider_ids = list(nodes_status_to_update.keys())

    # Get existing Nodes
    existing_nodes = Node.objects.filter(node_id__in=provider_ids)
    existing_nodes_dict = {node.node_id: node for node in existing_nodes}

    # Find which nodes are new
    existing_provider_ids = set(existing_nodes_dict.keys())
    new_provider_ids = set(provider_ids) - existing_provider_ids

    # Create new Node instances if any
    new_nodes = [Node(node_id=provider_id) for provider_id in new_provider_ids]
    if new_nodes:
        Node.objects.bulk_create(new_nodes)

    # Update existing_nodes_dict with newly created nodes
    # (re-fetched so the dict holds rows with primary keys assigned)
    updated_nodes = Node.objects.filter(node_id__in=new_provider_ids)
    for node in updated_nodes:
        existing_nodes_dict[node.node_id] = node

    # Get latest statuses from Redis in batch
    redis_keys = [f"node_status:{provider_id}" for provider_id in provider_ids]
    latest_statuses = r.mget(redis_keys)
    latest_status_dict = dict(zip(provider_ids, latest_statuses))

    providers_to_update_online_status = {}
    node_status_history_to_create = []
    for provider_id in provider_ids:
        is_online_now = nodes_status_to_update[provider_id]
        provider = existing_nodes_dict[provider_id]
        latest_status = latest_status_dict.get(provider_id)

        if latest_status is None:
            # Redis cache miss: fetch latest status from database
            latest_status_from_db = NodeStatusHistory.objects.filter(
                provider=provider
            ).order_by("-timestamp").first()
            if latest_status_from_db:
                if latest_status_from_db.is_online != is_online_now:
                    node_status_history_to_create.append(
                        NodeStatusHistory(provider=provider, is_online=is_online_now)
                    )
                    providers_to_update_online_status[provider_id] = is_online_now
            else:
                # First sighting of this node: always record a row.
                node_status_history_to_create.append(
                    NodeStatusHistory(provider=provider, is_online=is_online_now)
                )
                providers_to_update_online_status[provider_id] = is_online_now
            # Seed the Redis cache regardless of whether the status changed.
            r.set(f"node_status:{provider_id}", str(is_online_now))
        else:
            # Redis stores "True"/"False" strings; compare textually.
            if latest_status.decode() != str(is_online_now):
                node_status_history_to_create.append(
                    NodeStatusHistory(provider=provider, is_online=is_online_now)
                )
                providers_to_update_online_status[provider_id] = is_online_now
                r.set(f"node_status:{provider_id}", str(is_online_now))

    # Bulk create NodeStatusHistory entries if any
    if node_status_history_to_create:
        NodeStatusHistory.objects.bulk_create(node_status_history_to_create)

    # Bulk update the online status of providers
    if providers_to_update_online_status:
        # Build a list of Nodes to update
        nodes_to_update = []
        for provider_id, is_online_now in providers_to_update_online_status.items():
            provider = existing_nodes_dict[provider_id]
            provider.online = is_online_now
            nodes_to_update.append(provider)
        Node.objects.bulk_update(nodes_to_update, ['online'])


def check_node_status(issuer_id):
    """Return True if the relay reports the node as recently seen.

    Queries http://yacn2.dev.golem.network:9000/nodes/<id> (without the
    0x prefix) and looks the node up in the JSON response under the
    lower-cased issuer id.  Any HTTP failure, timeout, malformed payload,
    or unexpected shape is treated as offline (False).
    """
    node_id_no_prefix = issuer_id[2:] if issuer_id.startswith('0x') else issuer_id
    url = f"http://yacn2.dev.golem.network:9000/nodes/{node_id_no_prefix}"
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        data = response.json()
        node_info = data.get(issuer_id.lower())

        # Online only when the relay returns a list with at least one
        # entry carrying a 'seen' key.  This collapses the original
        # branch tree: the `node_info == []` check was unreachable (an
        # empty list is falsy and was filtered out earlier), and the
        # explicit `[None]` case falls out of the truthiness test below.
        if isinstance(node_info, list):
            return any(item and 'seen' in item for item in node_info)
        # Missing key, empty value, or a non-list payload: offline.
        return False
    except requests.exceptions.RequestException as e:
        print(f"HTTP request exception when checking node status for {issuer_id}: {e}")
        return False
    except Exception as e:
        print(f"Unexpected error checking node status for {issuer_id}: {e}")
        return False


async def list_offers(
Expand Down Expand Up @@ -381,4 +233,3 @@ async def monitor_nodes_status(subnet_tag: str = "public"):
# Delay update_nodes_data call using Celery

update_providers_info.delay(node_props)
update_nodes_data.delay(node_props)
4 changes: 1 addition & 3 deletions stats-backend/api2/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@


def calculate_uptime_percentage(node_id, node=None):
if node is None:
node = Node.objects.get(node_id=node_id)
statuses = NodeStatusHistory.objects.filter(provider=node).order_by("timestamp")
statuses = NodeStatusHistory.objects.filter(node_id=node_id).order_by("timestamp")

online_duration = timedelta(0)
last_online_time = None
Expand Down
Loading

0 comments on commit 75948bb

Please sign in to comment.