Fixes for Worker-controller 2.1.3 (#943)
* Add missing worker-controller testing

* Read worker count directly from broker -- don't just guess

* Fix CI PR triggers

* backport refactor

* fix tests/test_autoscaler.py

* Fix tests/test_autoscaler_rules.py

* pep

* f

* Check both broker queue and API status for stuck analyses

* PEP
sambles authored Jan 11, 2024
1 parent 2202566 commit 48ce98c
Showing 13 changed files with 162 additions and 110 deletions.
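Two of the fixes listed above ("Read worker count directly from broker" and "Check both broker queue and API status for stuck analyses") come down to one decision in the worker-controller: treat a model queue as loaded if either the API's own bookkeeping or the broker reports pending work. A minimal sketch of that decision, using the field names that appear in the diffs below; the helper name is illustrative, not part of the commit:

def effective_queued_count(entry: dict) -> int:
    """Return how much queued work a model queue should be credited with.

    Takes the larger of the API's recorded sub-task count ('queued_count')
    and the broker's own message count ('queue_message_count'), so an
    analysis that is stuck in either system still triggers a scale-up.
    """
    queue = entry.get('queue', {})
    queued_task_count = queue.get('queued_count', 0)           # API DB status
    queue_message_count = queue.get('queue_message_count', 0)  # messages on the broker
    return max(queued_task_count, queue_message_count)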
2 changes: 0 additions & 2 deletions .github/workflows/build-schema.yml
@@ -5,8 +5,6 @@ on:
branches:
- main
pull_request:
branches:
- main
workflow_dispatch:
inputs:
ods_branch:
4 changes: 0 additions & 4 deletions .github/workflows/scan-external.yml
@@ -6,10 +6,6 @@ on:
- main
- stable**
pull_request:
branches:
- main
- stable**

workflow_dispatch:
inputs:
cve_severity:
3 changes: 0 additions & 3 deletions .github/workflows/scan.yml
@@ -6,9 +6,6 @@ on:
- main
- stable**
pull_request:
branches:
- main
- stable**
schedule:
- cron: '0 */6 * * *' # Run scan every 6 hours

3 changes: 0 additions & 3 deletions .github/workflows/test-images.yml
@@ -6,9 +6,6 @@ on:
- main
- stable**
pull_request:
branches:
- main
- stable**
workflow_dispatch:
inputs:
last_release:
@@ -1,14 +1,11 @@
name: Platform Python Tests
name: Python Tests - Platform API

on:
push:
branches:
- main
- stable**
pull_request:
branches:
- main
- stable**
workflow_dispatch:
inputs:
ods_branch:
47 changes: 47 additions & 0 deletions .github/workflows/test-python_worker-controller.yml
@@ -0,0 +1,47 @@
name: Python Tests - Worker Controller

on:
push:
branches:
- main
pull_request:
workflow_dispatch:
workflow_call:

jobs:
unittest:
env:
JUNIT_REPORT: pytest_worker-controller_report.xml
PLAT_BRANCH: ${{ github.ref }}
runs-on: ubuntu-22.04

steps:
- name: Branch selection (remote trigger)
if: inputs.platform_branch != ''
run: echo "PLAT_BRANCH=${{ inputs.platform_branch }}" >> $GITHUB_ENV

- name: Checkout
uses: actions/checkout@v3
with:
repository: OasisLMF/OasisPlatform
ref: ${{ env.PLAT_BRANCH }}

- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: '3.10'
- run: |
pip install pytest
pip install -r kubernetes/worker-controller/requirements.txt
- name: Run Pytest
run: |
cd kubernetes/worker-controller/src
python -m pytest -v
# - name: Generate Report
# uses: dorny/test-reporter@v1
# if: success() || failure() # run this step even if previous step failed
# with:
# name: Pytest Results # Name of the check run which will be created
# path: ${{ env.JUNIT_REPORT }} # Path to test results
# reporter: java-junit # Format of test results
# fail-on-error: 'false'
36 changes: 12 additions & 24 deletions kubernetes/worker-controller/src/autoscaler.py
@@ -93,7 +93,7 @@ def _aggregate_model_states(self, analyses: []) -> dict:

return model_states

async def _scale_deployment(self, wd: WorkerDeployment, analysis_in_progress: bool, model_state: ModelState, limit: int) -> int:
async def _scale_deployment(self, wd: WorkerDeployment, model_state: ModelState, limit: int) -> int:
"""
Update a worker deployment's number of replicas, based on what autoscaler_rules returns as the desired number
of replicas (if changed since last time).
@@ -105,31 +105,19 @@ async def _scale_deployment(self, wd: WorkerDeployment, analysis_in_progress: bo
:return: Number of replicas set on deployment
"""

desired_replicas = wd.auto_scaling.get('worker_count_min', 0) # Set to min value of workers (always running)
is_fixed_strategy = wd.auto_scaling.get('scaling_strategy') == 'FIXED_WORKERS' and self.never_shutdown_fixed_workers

if analysis_in_progress or is_fixed_strategy:

if analysis_in_progress:
logging.debug('Analysis for model %s is running', wd.name)
if is_fixed_strategy:
logging.debug('Model %s is set to "FIXED_WORKERS"', wd.name)

try:
desired_replicas = autoscaler_rules.get_desired_worker_count(wd.auto_scaling, model_state)

if limit is not None and desired_replicas > limit:
desired_replicas = limit
except ValueError as e:
logging.error('Could not calculate desired replicas count for model %s: %s', wd.id_string(), str(e))
try:
desired_replicas = autoscaler_rules.get_desired_worker_count(wd.auto_scaling, model_state, self.never_shutdown_fixed_workers)
if limit is not None and desired_replicas > limit:
desired_replicas = limit
except ValueError as e:
desired_replicas = 0
logging.error('Could not calculate desired replicas count for model %s: %s', wd.id_string(), str(e))

if desired_replicas > 0 and wd.name in self.cleanup_deployments:

if wd.name in self.cleanup_deployments:
self.cleanup_deployments.remove(wd.name)

if wd.replicas != desired_replicas:

if desired_replicas > 0:
await self.cluster.set_replicas(wd.name, desired_replicas)
else:
@@ -138,10 +126,8 @@ async def _scale_deployment(self, wd: WorkerDeployment, analysis_in_progress: bo
self.cleanup_timer = None

self.cleanup_deployments.add(wd.name)

loop = asyncio.get_event_loop()
self.cleanup_timer = loop.call_later(20, self._cleanup)

return wd.replicas

return desired_replicas
@@ -179,7 +165,9 @@ async def parse_queued_pending(self, msg) -> [RunningAnalysis]:

# Check for pending analyses
if (queue_name not in ['celery', 'task-controller']):
queued_count = entry['queue']['queued_count']
queued_task_count = entry.get('queue', {}).get('queued_count', 0) # sub-task queued (API DB status)
queue_message_count = entry.get('queue', {}).get('queue_message_count', 0) # queue has messages
queued_count = max(queued_task_count, queue_message_count)

if (queued_count > 0) and not analyses_list:
# a task is queued, but no analyses are running.
@@ -249,7 +237,7 @@ async def _scale_models(self, prioritized_models):
analysis_in_progress = state.get('tasks', 0) > 0
replicas_limit = self.limit - workers_total if self.limit else None

workers_created = await self._scale_deployment(wd, analysis_in_progress, state, replicas_limit)
workers_created = await self._scale_deployment(wd, state, replicas_limit)
workers_total += workers_created
else:
logging.warning('No auto scaling setting found for model %s', wd.id_string())
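The `_scale_models` change above also clarifies how the global worker limit is shared: each deployment is scaled with whatever budget remains after the deployments before it. A toy illustration of that arithmetic (model names and desired counts are made up; the real desired counts come from autoscaler_rules):

# Toy numbers: a global limit of 10 workers shared by two models whose
# scaling rules ask for 6 and 8 workers respectively.
limit = 10
desired_by_model = {'model-a': 6, 'model-b': 8}

workers_total = 0
for name, desired in desired_by_model.items():
    replicas_limit = limit - workers_total if limit else None   # remaining budget
    created = min(desired, replicas_limit) if replicas_limit is not None else desired
    workers_total += created
    print(f'{name}: {created} workers')   # model-a: 6 workers, model-b: 4 workers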
84 changes: 52 additions & 32 deletions kubernetes/worker-controller/src/autoscaler_rules.py
@@ -3,7 +3,7 @@
from models import ModelState


def get_desired_worker_count(autoscaling_setting: dict, model_state: ModelState):
def get_desired_worker_count(autoscaling_setting: dict, model_state: ModelState, never_shutdown_fixed_workers: bool = False):
"""
This function is called for each worker deployment (model) having one or more
analyses running.
@@ -14,43 +14,63 @@ def get_desired_worker_count(autoscaling_setting: dict, model_state: ModelState)
:param autoscaling_setting: Auto scaling configuration (see oasis API for more details)
:param model_state: State of this model such as number of running analyses and tasks.
:param never_shutdown_fixed_workers: Debug mode which doesn't spin down workers when in fixed mode
:return: Desired number of workers to scale to.
"""

strategy = autoscaling_setting.get('scaling_strategy')
worker_count_min = int(autoscaling_setting.get('worker_count_min'))
worker_count_max = int(get_req_setting(autoscaling_setting, 'worker_count_max'))

if strategy:
if strategy == 'FIXED_WORKERS':
count = int(get_req_setting(autoscaling_setting, 'worker_count_fixed'))
return max(
min(count, worker_count_max),
worker_count_min,
)

elif strategy == 'QUEUE_LOAD':
analyses = model_state['analyses']
return max(
min(analyses, worker_count_max),
worker_count_min,
)

elif strategy == 'DYNAMIC_TASKS':

chunks_per_worker = autoscaling_setting.get('chunks_per_worker')

workers = math.ceil(int(model_state.get('tasks', 0)) / int(chunks_per_worker))
return max(
min(workers, worker_count_max),
worker_count_min,
)

else:
raise ValueError(f'Unsupported scaling strategy: {strategy}')
else:
worker_count_min = int(autoscaling_setting.get('worker_count_min', 0))
analysis_in_progress = any([
model_state.get('tasks', 0) > 0,
model_state.get('analyses', 0) > 0
])

# Guard for missing options
if not strategy:
raise ValueError(f'No valid auto scaling configuration for model: {autoscaling_setting}')

if strategy in ['QUEUE_LOAD', 'DYNAMIC_TASKS']:
worker_count_max = get_req_setting(autoscaling_setting, 'worker_count_max')

# Debugging mode (keep all fixed workers alive)
if strategy == 'FIXED_WORKERS' and never_shutdown_fixed_workers:
return max(
int(get_req_setting(autoscaling_setting, 'worker_count_fixed')),
worker_count_min,
)

# Scale down to Minimum worker count
if not analysis_in_progress:
return worker_count_min

# Run a fixed set of workers when analysis is on queue
elif strategy == 'FIXED_WORKERS':
count = int(get_req_setting(autoscaling_setting, 'worker_count_fixed'))
return max(
count,
worker_count_min,
)

# Run one worker per analysis in progress
elif strategy == 'QUEUE_LOAD':
analyses = model_state['analyses']
return max(
min(analyses, worker_count_max),
worker_count_min,
)

# Run `n` workers based on number of tasks on queue
elif strategy == 'DYNAMIC_TASKS':
chunks_per_worker = autoscaling_setting.get('chunks_per_worker')
workers = math.ceil(int(model_state.get('tasks', 0)) / int(chunks_per_worker))
return max(
min(workers, worker_count_max),
worker_count_min,
)

else:
raise ValueError(f'Unsupported scaling strategy: {strategy}')


def get_req_setting(autoscaling_setting: dict, name: str):
"""
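To make the refactored rules concrete, here is a hedged usage sketch of get_desired_worker_count (run from kubernetes/worker-controller/src, as the new test workflow does). The configuration dictionaries are illustrative; the key names and behaviour follow the code above:

import autoscaler_rules

idle_state = {'tasks': 0, 'analyses': 0}   # nothing queued or running
busy_state = {'tasks': 10, 'analyses': 2}  # two analyses, ten queued sub-tasks

fixed_conf = {'scaling_strategy': 'FIXED_WORKERS', 'worker_count_fixed': 5, 'worker_count_min': 1}
assert autoscaler_rules.get_desired_worker_count(fixed_conf, idle_state) == 1   # idle -> minimum
assert autoscaler_rules.get_desired_worker_count(fixed_conf, busy_state) == 5   # busy -> fixed pool
# Debug flag keeps the fixed pool alive even when idle.
assert autoscaler_rules.get_desired_worker_count(fixed_conf, idle_state,
                                                 never_shutdown_fixed_workers=True) == 5

queue_conf = {'scaling_strategy': 'QUEUE_LOAD', 'worker_count_min': 0, 'worker_count_max': 4}
assert autoscaler_rules.get_desired_worker_count(queue_conf, busy_state) == 2   # one worker per analysis

dynamic_conf = {'scaling_strategy': 'DYNAMIC_TASKS', 'worker_count_min': 0,
                'worker_count_max': 8, 'chunks_per_worker': 3}
assert autoscaler_rules.get_desired_worker_count(dynamic_conf, busy_state) == 4  # ceil(10 / 3)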
8 changes: 4 additions & 4 deletions kubernetes/worker-controller/src/tests/test_autoscaler.py
@@ -193,8 +193,6 @@ async def get_auto_scaling(oasis_model_id):

def test_never_shutdown_fixed_workers(self):

autoscaler = AutoScaler(None, None, None, None, None, False, True)

wd1 = WorkerDeployment('worker-oasislmf-piwind-1', 'oasislmf', 'piwind', '1')
wd1.auto_scaling = {
'scaling_strategy': 'FIXED_WORKERS',
@@ -203,10 +201,12 @@ def test_never_shutdown_fixed_workers(self):
wd1.replicas = 2
model_state = ModelState(tasks=10, analyses=2, priority=5)

desired_replicas = asyncio.run(autoscaler._scale_deployment(wd1, True, model_state, 10))
autoscaler = AutoScaler(None, None, None, None, None, False, never_shutdown_fixed_workers=True)
desired_replicas = asyncio.run(autoscaler._scale_deployment(wd1, model_state, 10))
self.assertEqual(2, desired_replicas)

desired_replicas = asyncio.run(autoscaler._scale_deployment(wd1, False, model_state, 10))
autoscaler = AutoScaler(None, None, None, None, None, False, never_shutdown_fixed_workers=False)
desired_replicas = asyncio.run(autoscaler._scale_deployment(wd1, model_state, 10))
self.assertEqual(2, desired_replicas)


24 changes: 23 additions & 1 deletion kubernetes/worker-controller/src/tests/test_autoscaler_rules.py
@@ -20,9 +20,31 @@ def test_fixed_correct(self):
'scaling_strategy': 'FIXED_WORKERS',
'worker_count_fixed': 5
}
state = {
'analyses': 3
}
desired_replicas = autoscaler_rules.get_desired_worker_count(as_conf, state)

self.assertEqual(5, desired_replicas)

def test_min_workers_correct(self):
as_conf = {
'scaling_strategy': 'FIXED_WORKERS',
'worker_count_fixed': 5,
'worker_count_min': 3
}
state = {}
desired_replicas = autoscaler_rules.get_desired_worker_count(as_conf, state)
self.assertEqual(3, desired_replicas)

def test_min_workers__scale_up_correct(self):
as_conf = {
'scaling_strategy': 'FIXED_WORKERS',
'worker_count_fixed': 5,
'worker_count_min': 1
}
state = {'analyses': 1}
desired_replicas = autoscaler_rules.get_desired_worker_count(as_conf, state)
self.assertEqual(5, desired_replicas)

def test_fixed_incorrect_missing_size(self):
@@ -31,7 +53,7 @@ def test_fixed_incorrect_missing_size(self):
'scaling_strategy': 'FIXED_WORKERS'
}
state = {}
self.assertRaises(ValueError, lambda: autoscaler_rules.get_desired_worker_count(as_conf, state))
self.assertRaises(ValueError, lambda: autoscaler_rules.get_desired_worker_count(as_conf, state, never_shutdown_fixed_workers=True))

def test_queue_load_correct(self):

8 changes: 4 additions & 4 deletions scripts/minikube-deploy.sh
@@ -25,10 +25,10 @@ if [[ -z $OASIS_MODEL_DATA_DIR ]]; then
fi

## init minikube
minikube delete
minikube config set cpus 12
minikube config set memory 16000
minikube start
#minikube delete
#minikube config set cpus 12
#minikube config set memory 16000
#minikube start

# build images
eval $(minikube docker-env)
1 change: 1 addition & 0 deletions src/server/oasisapi/queues/serializers.py
@@ -12,6 +12,7 @@ class QueueSerializer(serializers.Serializer):
worker_count = serializers.IntegerField()
queued_count = serializers.IntegerField()
running_count = serializers.IntegerField()
queue_message_count = serializers.IntegerField()
models = serializers.SerializerMethodField()

@swagger_serializer_method(serializer_or_field=AnalysisModelSerializer)
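The new queue_message_count field is what lets the worker-controller read broker load directly instead of guessing from the API counts. A hypothetical serialized entry, using the count fields from QueueSerializer above with made-up values, roughly as the controller's parse_queued_pending would see it:

# Illustrative payload only -- field names from QueueSerializer, values made up.
entry = {
    'queue': {
        'worker_count': 2,
        'queued_count': 0,         # API DB sees no queued sub-tasks ...
        'running_count': 1,
        'queue_message_count': 3,  # ... but three messages are still on the broker,
    },                             # so the controller treats the queue as loaded.
}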