Scale up logic #173

Open
suhlrich opened this issue Apr 19, 2024 · 1 comment

Comments

@suhlrich
Member

suhlrich commented Apr 19, 2024

Celery will check the queue length every 20 seconds and update the desired_asg_gpu_instances variable in CloudWatch accordingly.

For counting stopped trials, we can reuse some of the logic at the beginning of the dequeue method (def dequeue(self, request):).

Config variables, set somewhere when Celery starts?

  • autoscale_gpus_on - some way to toggle whether we are using autoscaling
  • queue_length_before_scaling_start - how long we let the queue get before starting ASG machines
  • queue_length_before_new_machine - how many jobs in the queue per ASG machine
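One way these three settings could be loaded when the worker starts is sketched below. This is only a sketch: the environment variable names and the AutoscaleConfig class are hypothetical, and the defaults of 20 and 5 are simply the test constants used in the code further down.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class AutoscaleConfig:
    """Hypothetical container for the three autoscaling settings."""
    autoscale_gpus_on: bool
    queue_length_before_scaling_start: int
    queue_length_before_new_machine: int


def load_autoscale_config(env=None):
    """Read the settings from environment variables (names are assumptions)."""
    env = os.environ if env is None else env
    flag = env.get("AUTOSCALE_GPUS_ON", "false").strip().lower()
    return AutoscaleConfig(
        autoscale_gpus_on=flag in ("1", "true", "yes"),
        queue_length_before_scaling_start=int(
            env.get("QUEUE_LENGTH_BEFORE_SCALING_START", "20")),
        queue_length_before_new_machine=int(
            env.get("QUEUE_LENGTH_BEFORE_NEW_MACHINE", "5")),
    )
```

With autoscale_gpus_on off, the periodic task could return early without touching CloudWatch.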

Logic that gets executed on Celery. This is functional Python code:

import math

# constants from some config
queue_length_before_scaling_start = 20 # we can set this based on the size of on-prem cluster. Maybe 5*n_machines
queue_length_before_new_machine = 5

# TEST VALUES
asg_size_FOR_TESTING = 5
desired_asg_gpu_instances_FOR_TESTING = 3
n_stopped_trials_FOR_TESTING = 26


# find asg size from AWS CLI? Maybe we don't need this, and the ASG logic sets the upper bound if we ask for too many machines.
asg_size = asg_size_FOR_TESTING

# get desired_asg_gpu_instances from CloudWatch. This tells us how many machines are currently running in the ASG.
current_asg_machines = desired_asg_gpu_instances_FOR_TESTING

# get number of stopped trials created in the last 12 hours with all videos uploaded
n_stopped_trials = n_stopped_trials_FOR_TESTING

# compute desired number of machines
change_in_desired_machines = math.trunc((n_stopped_trials - queue_length_before_scaling_start - 
                                current_asg_machines * queue_length_before_new_machine) / queue_length_before_new_machine)

desired_asg_machines = current_asg_machines + change_in_desired_machines

desired_asg_machines_bounded = max(0, min(desired_asg_machines, asg_size))

# push desired_asg_machines_bounded to desired_asg_gpu_instances on CloudWatch
# FOR TESTING
print(desired_asg_machines_bounded)
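
The same computation could be wrapped in a function so it can be exercised with different queue lengths. This is a sketch of the logic above, not a separate implementation; the function name compute_desired_asg_machines is hypothetical, and the defaults mirror the test constants.

```python
import math


def compute_desired_asg_machines(n_stopped_trials, current_asg_machines, asg_size,
                                 queue_length_before_scaling_start=20,
                                 queue_length_before_new_machine=5):
    """Return the bounded number of ASG machines to request."""
    # Jobs left over after the on-prem allowance and the jobs the
    # currently running ASG machines are expected to absorb.
    excess_jobs = (n_stopped_trials
                   - queue_length_before_scaling_start
                   - current_asg_machines * queue_length_before_new_machine)
    # math.trunc rounds toward zero, so a partial machine is never
    # added or removed.
    change = math.trunc(excess_jobs / queue_length_before_new_machine)
    desired = current_asg_machines + change
    # Clamp to the range [0, asg_size].
    return max(0, min(desired, asg_size))


# Test values from above: (26 - 20 - 3*5) / 5 = -1.8, truncated to -1, so 3 - 1 = 2
print(compute_desired_asg_machines(26, 3, 5))  # 2
```

With 60 stopped trials and the same settings, the change is +5, and the result is capped at asg_size = 5.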
@sashasimkin

Hi @suhlrich. Following this comment, I want to suggest how to implement the "scale up logic", which is in fact metric submission logic.

  1. A 20-second resolution is too fine because a) it might put too much strain on Celery and the database, as each task has to go through the database queue, and b) we will be using aggregates over at least 1 minute anyway
  2. The Celery worker needs the cloudwatch:PutMetricData permission to submit this metric to CloudWatch. Since you're using Fargate to host the API project, I'll create a task to enable this permission on the worker process, so you can expect the boto3 client to just work in the sample code that follows
  3. Here's a method, submit_custom_metric, that should be executed every minute in a scheduled task to submit the number of pending trials. Make sure that metric_name is opencap_trials_pending and namespace is Custom/opencap-dev for development and Custom/opencap for production:
import boto3

def submit_custom_metric(namespace, metric_name, value):
    """
    Submit a custom metric to AWS CloudWatch.

    Parameters:
    - namespace (str): The namespace for the metric data.
    - metric_name (str): The name of the metric.
    - value (float): The value associated with the metric.
    """
    client = boto3.client('cloudwatch')
    response = client.put_metric_data(
        Namespace=namespace,
        MetricData=[
            {
                'MetricName': metric_name,
                'Value': value,
                'Unit': 'Count'
            }
        ]
    )
    return response

def submit_number_of_pending_trials_to_cloudwatch(current_queue_length):
    # Submit the current number of pending trials as a CloudWatch metric
    response = submit_custom_metric(
        'Custom/opencap-dev',  # Or 'Custom/opencap' for production
        'opencap_trials_pending',
        current_queue_length,
    )
    print("Metric submitted successfully:", response)
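
To tie the points above together, here is a rough sketch of choosing the namespace per environment and scheduling the submission once a minute. The environment keys, the task path in the schedule, and the helper names are all assumptions; the client is passed in so the function can be tried with a stand-in object before the IAM permission is in place.

```python
# Namespace per environment, as named above; the keys are hypothetical.
NAMESPACES = {"dev": "Custom/opencap-dev", "production": "Custom/opencap"}
METRIC_NAME = "opencap_trials_pending"


def build_metric_datum(metric_name, value):
    """Build one entry for the MetricData list of put_metric_data."""
    return {"MetricName": metric_name, "Value": float(value), "Unit": "Count"}


def submit_pending_trials(client, environment, pending_trials):
    """Submit the pending-trials count using the given CloudWatch client.

    `client` is expected to behave like boto3.client('cloudwatch').
    """
    return client.put_metric_data(
        Namespace=NAMESPACES[environment],
        MetricData=[build_metric_datum(METRIC_NAME, pending_trials)],
    )


# Celery beat entry running the task every 60 seconds. It would be assigned
# to app.conf.beat_schedule; the dotted task path below is a placeholder.
beat_schedule = {
    "submit-pending-trials-metric": {
        "task": "tasks.submit_number_of_pending_trials_to_cloudwatch",
        "schedule": 60.0,
    },
}
```

In the worker, the call would look like submit_pending_trials(boto3.client('cloudwatch'), 'dev', n_pending), where n_pending comes from the trial-counting query.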
