
Commit

Fixes
cryptobench committed Sep 26, 2024
1 parent 38a078e commit 25e21e1
Showing 4 changed files with 35 additions and 69 deletions.
3 changes: 1 addition & 2 deletions stats-backend/api2/scanner.py
@@ -25,7 +25,6 @@
from .utils import identify_network_by_offer, identify_wallet_and_network
from django.db import transaction
from django.db.models import OuterRef, Subquery

from concurrent.futures import ThreadPoolExecutor

pool = redis.ConnectionPool(host="redis", port=6379, db=0)
@@ -230,6 +229,6 @@ async def monitor_nodes_status(subnet_tag: str = "public"):
except asyncio.TimeoutError:
print("Scan timeout reached")
print(f"In the current scan, we found {len(current_scan_providers)} providers")
# Delay update_nodes_data call using Celery

update_providers_info.delay(node_props)
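
A note on the dispatch at the end of the scan: .delay(...) is Celery shorthand for .apply_async(args=[...]); it enqueues the task on the broker and returns immediately, so the asyncio scan loop is never blocked by database work. A minimal sketch of the pattern, assuming a Redis broker like the one configured above (the app wiring is illustrative, not taken from this repo):

from celery import Celery

app = Celery("stats", broker="redis://redis:6379/0")  # assumed broker URL

@app.task
def update_providers_info(node_props):
    # The real task in api2/tasks.py parses the offers and writes them to
    # the database; this stand-in only reports how much work arrived.
    print(f"received {len(node_props)} provider offers")

# Equivalent ways to enqueue; .delay() is sugar for .apply_async(args=[...]).
# update_providers_info.delay(node_props)
# update_providers_info.apply_async(args=[node_props])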
57 changes: 20 additions & 37 deletions stats-backend/api2/tasks.py
@@ -643,12 +643,11 @@ def get_current_glm_price():

import asyncio
from .scanner import monitor_nodes_status


@app.task
def v2_offer_scraper(subnet_tag="public"):
# Run the asyncio function using asyncio.run()
asyncio.run(monitor_nodes_status(subnet_tag))
v2_offer_scraper.apply_async(args=[subnet_tag], countdown=5, queue='yagna', routing_key='yagna')


@app.task(queue="yagna")
@@ -1829,55 +1828,39 @@ def extract_wallets_and_ids():

@app.task
def bulk_update_node_statuses(nodes_data):
node_ids = [node_id for node_id, _ in nodes_data]
is_online_dict = dict(nodes_data)

# Fetch latest statuses from Redis in bulk
redis_keys = [f"provider:{node_id}:status" for node_id in node_ids]
redis_values = r.mget(redis_keys)
redis_statuses = dict(zip(node_ids, redis_values))

# Fetch latest database statuses in bulk
latest_db_statuses = dict(
NodeStatusHistory.objects.filter(node_id__in=node_ids)
.order_by('node_id', '-timestamp')
.distinct('node_id')
.values_list('node_id', 'is_online')
)

status_history_to_create = []
redis_updates = {}
node_updates = []

for node_id, is_online in nodes_data:
redis_status = redis_statuses.get(node_id)
db_status = latest_db_statuses.get(node_id)

if redis_status is None or redis_status.decode() != str(is_online) or db_status != is_online:
latest_status = r.get(f"provider:{node_id}:status")

if latest_status is None or latest_status.decode() != str(is_online):
status_history_to_create.append(
NodeStatusHistory(node_id=node_id, is_online=is_online)
)
redis_updates[f"provider:{node_id}:status"] = str(is_online)
node_updates.append(node_id)


if status_history_to_create:
# Bulk create NodeStatusHistory objects
NodeStatusHistory.objects.bulk_create(status_history_to_create)

if node_updates:
# Bulk update Node objects
Node.objects.filter(node_id__in=node_updates).update(
online=Case(
*[When(node_id=node_id, then=Value(is_online_dict[node_id])) for node_id in node_updates],
default=F('online'),
output_field=BooleanField()
)
)
with transaction.atomic():
NodeStatusHistory.objects.bulk_create(status_history_to_create)

if redis_updates:
# Update Redis in bulk
r.mset(redis_updates)

# Clean up duplicate consecutive statuses
with transaction.atomic():
subquery = NodeStatusHistory.objects.filter(
node_id=OuterRef('node_id'),
timestamp__lt=OuterRef('timestamp')
).order_by('-timestamp')

duplicate_records = NodeStatusHistory.objects.annotate(
prev_status=Subquery(subquery.values('is_online')[:1])
).filter(is_online=F('prev_status'))

duplicate_records.delete()

from .utils import check_node_status
@app.task
def fetch_and_update_relay_nodes_online_status():
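
The rewritten bulk_update_node_statuses drops the per-batch database lookups in favor of a per-node Redis check, and finishes by collapsing consecutive NodeStatusHistory rows that repeat the same is_online value, so the table keeps only real online/offline transitions. A self-contained sketch of that cleanup query; the model definition is an assumption for illustration, while the ORM calls mirror the ones in the diff:

from django.db.models import F, OuterRef, Subquery

# Assumed shape of the model, for illustration only:
#   class NodeStatusHistory(models.Model):
#       node_id = models.CharField(max_length=64)
#       is_online = models.BooleanField()
#       timestamp = models.DateTimeField(auto_now_add=True)

def drop_consecutive_duplicates():
    # Correlated subquery: for each row, look up the is_online value of the
    # most recent earlier row belonging to the same node.
    prev = (
        NodeStatusHistory.objects
        .filter(node_id=OuterRef("node_id"), timestamp__lt=OuterRef("timestamp"))
        .order_by("-timestamp")
    )
    # Delete rows that merely repeat the previous status, leaving only
    # actual transitions in the history table.
    (
        NodeStatusHistory.objects
        .annotate(prev_status=Subquery(prev.values("is_online")[:1]))
        .filter(is_online=F("prev_status"))
        .delete()
    )

Wrapping the cleanup in transaction.atomic(), as the diff does, keeps the delete from exposing a half-pruned history table to concurrent readers.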
29 changes: 11 additions & 18 deletions stats-backend/api2/views.py
@@ -193,6 +193,15 @@ def node_uptime(request, yagna_id):
response_data = []
last_offline_timestamp = None

# Get the last known status before the 30-day period
last_known_status_before_period = NodeStatusHistory.objects.filter(
node_id=yagna_id,
timestamp__lt=thirty_days_ago
).order_by('-timestamp').first()

# Initialize the default status based on the last known status or the node's current status
default_status = "online" if (last_known_status_before_period and last_known_status_before_period.is_online) or node.online else "offline"

for day_offset in range(30):
day = (current_time - timedelta(days=day_offset)).date()
day_start = timezone.make_aware(datetime.combine(day, datetime.min.time()))
@@ -232,18 +241,11 @@
}
)
else:
# If the node was created after this day, mark as "unregistered"
if day < node.uptime_created_at.date():
status = "unregistered"
else:
# Infer status from last known status
last_known_status = statuses.filter(timestamp__lt=day_start).last()
status = "online" if last_known_status and last_known_status.is_online else "offline"

# If no data points for the day, use the default status
response_data.append(
{
"date": day.strftime("%d %B, %Y"),
"status": status,
"status": default_status,
"downtimes": [],
}
)
@@ -317,15 +319,6 @@ def process_downtime(start_time, end_time):
}


def calculate_time_diff(check_time, granularity, node):
if granularity >= 86400:
return f"{(check_time - node.uptime_created_at).days} days ago"
elif granularity >= 3600:
hours_ago = int((timezone.now() - check_time).total_seconds() / 3600)
return f"{hours_ago} hours ago" if hours_ago > 1 else "1 hour ago"
else:
minutes_ago = int((timezone.now() - check_time).total_seconds() / 60)
return f"{minutes_ago} minutes ago" if minutes_ago > 1 else "1 minute ago"


def globe_data(request):
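
The uptime endpoint now derives one fallback status for the entire 30-day window up front (the last recorded status older than the window, with the node's current online flag as a fallback) instead of re-querying the history per empty day. Condensed, the rule for a day with no samples looks like this (a sketch; the names are stand-ins for the queryset results in the view):

def default_day_status(last_known, currently_online):
    # last_known: newest NodeStatusHistory row older than the 30-day window,
    # or None. currently_online: the node's current online flag.
    online = (last_known is not None and last_known.is_online) or currently_online
    return "online" if online else "offline"

Note the or: a node whose last pre-window sample was offline but which is online right now still defaults to "online", which matches the expression in the diff.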
15 changes: 3 additions & 12 deletions stats-backend/core/celery.py
@@ -77,6 +77,8 @@ def setup_periodic_tasks(sender, **kwargs):
fetch_and_update_relay_nodes_online_status,
)
fetch_and_update_relay_nodes_online_status.delay()
v2_offer_scraper.apply_async(args=["ray-on-golem-heads"], queue="yagna", routing_key="yagna")
v2_offer_scraper.apply_async(queue="yagna", routing_key="yagna")
sender.add_periodic_task(
60,
computing_total_over_time.s(),
@@ -230,18 +232,7 @@ def setup_periodic_tasks(sender, **kwargs):
queue="default",
options={"queue": "default", "routing_key": "default"},
)
sender.add_periodic_task(
60.0,
v2_offer_scraper.s(),
queue="yagna",
options={"queue": "yagna", "routing_key": "yagna"},
)
sender.add_periodic_task(
60.0,
v2_offer_scraper.s(subnet_tag="ray-on-golem-heads"),
queue="yagna",
options={"queue": "yagna", "routing_key": "yagna"},
)

sender.add_periodic_task(
20.0,
v2_network_online_to_redis.s(),
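
Taken together with the tasks.py change, this replaces two fixed 60-second beat entries with a self-rescheduling task: setup_periodic_tasks enqueues the first run for each subnet once, and every run re-enqueues itself five seconds after its scan finishes. The next scan therefore starts only after the previous one completes, whereas a fixed-interval schedule can stack up overlapping scans whenever a run exceeds the interval. A condensed sketch of the pattern (queue names as in the diff; the app wiring and scanner stub are illustrative):

import asyncio
from celery import Celery

app = Celery("stats", broker="redis://redis:6379/0")  # assumed broker URL

async def monitor_nodes_status(subnet_tag: str = "public"):
    ...  # stands in for the real scanner in api2/scanner.py

@app.task
def v2_offer_scraper(subnet_tag="public"):
    asyncio.run(monitor_nodes_status(subnet_tag))  # blocking scan
    # Schedule the next run 5 s after this one finishes.
    v2_offer_scraper.apply_async(
        args=[subnet_tag], countdown=5, queue="yagna", routing_key="yagna"
    )

# Kicked off once at startup, one chain per subnet, as in the diff:
# v2_offer_scraper.apply_async(queue="yagna", routing_key="yagna")
# v2_offer_scraper.apply_async(args=["ray-on-golem-heads"], queue="yagna", routing_key="yagna")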
