|
24 | 24 | )
|
25 | 25 | from db.python.connect import Connection
|
26 | 26 | from db.python.enum_tables import SequencingTypeTable
|
27 |
| -from db.python.layers.analysis import AnalysisLayer |
| 27 | +from db.python.layers.analysis import AnalysisInternal, AnalysisLayer |
28 | 28 | from db.python.layers.base import BaseLayer
|
29 | 29 | from db.python.layers.family import FamilyLayer
|
30 | 30 | from db.python.layers.participant import ParticipantLayer
|
|
33 | 33 | from db.python.tables.project import Project
|
34 | 34 | from db.python.utils import GenericFilter
|
35 | 35 | from models.enums import AnalysisStatus
|
| 36 | +from models.enums.web import SeqrDatasetType |
36 | 37 |
|
37 | 38 | # literally the most temporary thing ever, but for complete
|
38 | 39 | # automation need to have sample inclusion / exclusion
|
|
43 | 44 |
|
44 | 45 | SEQUENCING_GROUPS_TO_IGNORE = {22735, 22739}
|
45 | 46 |
|
# production-pipelines stage names for each dataset type.
# update_es_index compares these values against each es-index analysis's
# meta['stage'] to select the analyses belonging to a given dataset type.
ES_INDEX_STAGES = {
    SeqrDatasetType.SNV_INDEL: 'MtToEs',
    SeqrDatasetType.SV: 'MtToEsSv',
    SeqrDatasetType.GCNV: 'MtToEsCNV',
    SeqrDatasetType.MITO: 'MtToEsMito',
}
| 54 | + |
46 | 55 | _url_individuals_sync = '/api/project/sa/{projectGuid}/individuals/sync'
|
47 | 56 | _url_individual_meta_sync = '/api/project/sa/{projectGuid}/individuals_metadata/sync'
|
48 | 57 | _url_family_sync = '/api/project/sa/{projectGuid}/families/sync'
|
@@ -114,6 +123,7 @@ async def sync_dataset(
|
114 | 123 | sync_individual_metadata: bool = True,
|
115 | 124 | sync_individuals: bool = True,
|
116 | 125 | sync_es_index: bool = True,
|
| 126 | + es_index_types: list[SeqrDatasetType] = None, |
117 | 127 | sync_saved_variants: bool = True,
|
118 | 128 | sync_cram_map: bool = True,
|
119 | 129 | post_slack_notification: bool = True,
|
@@ -206,6 +216,7 @@ async def sync_dataset(
|
206 | 216 | self.update_es_index(
|
207 | 217 | sequencing_type=sequencing_type,
|
208 | 218 | sequencing_group_ids=sequencing_group_ids,
|
| 219 | + es_index_types=es_index_types, |
209 | 220 | **params,
|
210 | 221 | )
|
211 | 222 | )
|
@@ -356,9 +367,53 @@ async def sync_individual_metadata(
|
356 | 367 | f'Uploaded individual metadata for {len(processed_records)} individuals'
|
357 | 368 | ]
|
358 | 369 |
|
def check_updated_sequencing_group_ids(
    self,
    sequencing_group_ids: set[int],
    es_index_analyses: list[AnalysisInternal],
) -> list[str]:
    """
    Report how the newest ES index differs from the previous one and from
    the sequencing groups that were requested.

    The analyses are ordered by ``timestamp_completed``; the last one is
    treated as the new index and the one before it (when present) as the
    old index.

    Args:
        sequencing_group_ids: sequencing group IDs expected in the index.
        es_index_analyses: es-index analyses to compare, in any order.

    Returns:
        Zero, one or two messages: the sequencing groups present in the new
        index but not the old one, and the requested sequencing groups
        missing from the new index. Empty when either argument is empty.
    """
    # Nothing to compare: no requested IDs, or no index to look at.
    # (Indexing [-1] on an empty list would otherwise raise IndexError.)
    if not sequencing_group_ids or not es_index_analyses:
        return []

    ordered = sorted(es_index_analyses, key=lambda el: el.timestamp_completed)
    newest = ordered[-1]
    in_new_index = set(newest.sequencing_group_ids)

    messages: list[str] = []

    # Only report additions when there is an earlier index to diff against
    if len(ordered) > 1:
        in_old_index = set(ordered[-2].sequencing_group_ids)
        added = sequencing_group_id_format_list(in_new_index - in_old_index)
        if added:
            messages.append(
                'Sequencing groups added to index: ' + ', '.join(added),
            )

    missing = sequencing_group_id_format_list(sequencing_group_ids - in_new_index)
    if missing:
        messages.append(
            f'Sequencing groups missing from {newest.output}: '
            + ', '.join(missing),
        )

    return messages
| 402 | + |
async def post_es_index_update(
    self,
    session: aiohttp.ClientSession,
    url: str,
    post_json: dict,
    headers: dict[str, str],
) -> str:
    """
    Post request to update ES index.

    Args:
        session: client session used to send the request.
        url: endpoint to POST to.
        post_json: payload sent as the JSON request body.
        headers: headers sent with the request.

    Returns:
        The response body as text.

    Raises:
        Whatever ``resp.raise_for_status()`` raises for an error status
        (``aiohttp.ClientResponseError`` per the aiohttp documentation).
    """
    # Context manager releases the connection on success and on error
    async with session.post(url=url, json=post_json, headers=headers) as resp:
        resp.raise_for_status()
        return await resp.text()
| 412 | + |
359 | 413 | async def update_es_index(
|
360 | 414 | self,
|
361 | 415 | session: aiohttp.ClientSession,
|
| 416 | + es_index_types: list[SeqrDatasetType], |
362 | 417 | sequencing_type: str,
|
363 | 418 | project_guid,
|
364 | 419 | headers,
|
@@ -392,72 +447,56 @@ async def update_es_index(
|
392 | 447 | fn_path = os.path.join(SEQR_MAP_LOCATION, filename)
|
393 | 448 | # pylint: disable=no-member
|
394 | 449 |
|
| 450 | + # Only need to write this once, as the POST request will ignore extra samples not in each index synced |
| 451 | + with AnyPath(fn_path).open('w+') as f: # type: ignore |
| 452 | + f.write('\n'.join(rows_to_write)) |
| 453 | + |
395 | 454 | alayer = AnalysisLayer(connection=self.connection)
|
396 | 455 | es_index_analyses = await alayer.query(
|
397 | 456 | AnalysisFilter(
|
398 | 457 | project=GenericFilter(eq=self.connection.project),
|
399 | 458 | type=GenericFilter(eq='es-index'),
|
400 | 459 | status=GenericFilter(eq=AnalysisStatus.COMPLETED),
|
401 |
| - meta={'sequencing_type': GenericFilter(eq=sequencing_type)}, |
| 460 | + meta={ |
| 461 | + 'sequencing_type': GenericFilter(eq=sequencing_type), |
| 462 | + }, |
402 | 463 | )
|
403 | 464 | )
|
404 |
| - |
405 |
| - es_index_analyses = sorted( |
406 |
| - es_index_analyses, |
407 |
| - key=lambda el: el.timestamp_completed, |
408 |
| - ) |
409 |
| - |
410 | 465 | if len(es_index_analyses) == 0:
|
411 | 466 | return ['No ES index to synchronise']
|
412 | 467 |
|
413 |
| - with AnyPath(fn_path).open('w+') as f: # type: ignore |
414 |
| - f.write('\n'.join(rows_to_write)) |
415 |
| - |
416 |
| - es_index = es_index_analyses[-1].output |
417 |
| - |
418 | 468 | messages = []
|
| 469 | + requests = [] # for POST requests to gather |
| 470 | + for es_index_type in es_index_types: |
| 471 | + es_indexes_filtered_by_type: list[AnalysisInternal] = [ |
| 472 | + a |
| 473 | + for a in es_index_analyses |
| 474 | + if a.meta.get('stage') == ES_INDEX_STAGES[es_index_type] |
| 475 | + ] |
| 476 | + if not es_indexes_filtered_by_type: |
| 477 | + messages.append(f'No ES index to synchronise for {es_index_type}') |
| 478 | + continue |
419 | 479 |
|
420 |
| - if sequencing_group_ids: |
421 |
| - sequencing_groups_in_new_index = set( |
422 |
| - es_index_analyses[-1].sequencing_group_ids |
| 480 | + es_indexes_filtered_by_type = sorted( |
| 481 | + es_indexes_filtered_by_type, |
| 482 | + key=lambda el: el.timestamp_completed, |
423 | 483 | )
|
424 | 484 |
|
425 |
| - if len(es_index_analyses) > 1: |
426 |
| - sequencing_groups_in_old_index = set( |
427 |
| - es_index_analyses[-2].sequencing_group_ids |
428 |
| - ) |
429 |
| - sequencing_groups_diff = sequencing_group_id_format_list( |
430 |
| - sequencing_groups_in_new_index - sequencing_groups_in_old_index |
431 |
| - ) |
432 |
| - if sequencing_groups_diff: |
433 |
| - messages.append( |
434 |
| - 'Samples added to index: ' + ', '.join(sequencing_groups_diff), |
435 |
| - ) |
| 485 | + es_index = es_indexes_filtered_by_type[-1].output |
436 | 486 |
|
437 |
| - sg_ids_missing_from_index = sequencing_group_id_format_list( |
438 |
| - sequencing_group_ids - sequencing_groups_in_new_index |
439 |
| - ) |
440 |
| - if sg_ids_missing_from_index: |
441 |
| - messages.append( |
442 |
| - 'Sequencing groups missing from index: ' |
443 |
| - + ', '.join(sg_ids_missing_from_index), |
444 |
| - ) |
| 487 | + messages.extend(self.check_updated_sequencing_group_ids(sequencing_group_ids, es_indexes_filtered_by_type)) |
445 | 488 |
|
446 |
| - req1_url = SEQR_URL + _url_update_es_index.format(projectGuid=project_guid) |
447 |
| - resp_1 = await session.post( |
448 |
| - req1_url, |
449 |
| - json={ |
| 489 | + req1_url = SEQR_URL + _url_update_es_index.format(projectGuid=project_guid) |
| 490 | + post_json = { |
450 | 491 | 'elasticsearchIndex': es_index,
|
451 |
| - 'datasetType': 'VARIANTS', |
| 492 | + 'datasetType': es_index_type.value, |
452 | 493 | 'mappingFilePath': fn_path,
|
453 | 494 | 'ignoreExtraSamplesInCallset': True,
|
454 |
| - }, |
455 |
| - headers=headers, |
456 |
| - ) |
457 |
| - resp_1.raise_for_status() |
458 |
| - |
459 |
| - messages.append(f'Updated ES index {es_index}') |
| 495 | + } |
| 496 | + requests.append(self.post_es_index_update(session, req1_url, post_json, headers)) |
| 497 | + messages.append(f'Updated ES index {es_index}') |
460 | 498 |
|
| 499 | + messages.extend(await asyncio.gather(*requests)) |
461 | 500 | return messages
|
462 | 501 |
|
463 | 502 | async def update_saved_variants(
|
|
0 commit comments