diff --git a/kobo/apps/subsequences/constants.py b/kobo/apps/subsequences/constants.py index d04774ad04..0d2dec5f05 100644 --- a/kobo/apps/subsequences/constants.py +++ b/kobo/apps/subsequences/constants.py @@ -1,5 +1,6 @@ GOOGLETX = 'googletx' GOOGLETS = 'googlets' +GOOGLE_CODE = 'goog' ASYNC_TRANSLATION_DELAY_INTERVAL = 5 diff --git a/kobo/apps/subsequences/exceptions.py b/kobo/apps/subsequences/exceptions.py index aeab813eeb..b87b73aa67 100644 --- a/kobo/apps/subsequences/exceptions.py +++ b/kobo/apps/subsequences/exceptions.py @@ -10,3 +10,11 @@ class TranscriptionResultsNotFound(Exception): """ No results returned by specified transcription service """ + + +class TranslationResultsNotFound(Exception): + pass + + +class TranslationAsyncResultAvailable(Exception): + pass diff --git a/kobo/apps/subsequences/integrations/google/base.py b/kobo/apps/subsequences/integrations/google/base.py index cc37f49bdc..cee45190c5 100644 --- a/kobo/apps/subsequences/integrations/google/base.py +++ b/kobo/apps/subsequences/integrations/google/base.py @@ -1,33 +1,58 @@ import constance from abc import ABC, abstractmethod +from concurrent.futures import TimeoutError from google.cloud import storage +from google.api_core.operation import Operation from googleapiclient import discovery from django.conf import settings +from django.contrib.auth.models import User from django.core.cache import cache from kobo.apps.trackers.utils import update_nlp_counter +from kpi.utils.log import logging from .utils import google_credentials_from_constance_config +from ...models import SubmissionExtras from ...constants import GOOGLE_CACHE_TIMEOUT, make_async_cache_key from ...exceptions import SubsequenceTimeoutError +REQUEST_TIMEOUT = 10 # seconds -class GoogleTask(ABC): + +class GoogleService(ABC): """ Base class for Google transcription/translation task Contains common functions for returning async responses using the Operations API """ - def __init__(self): + # These constants must be set by the 
inherited service class + API_NAME = None + API_VERSION = None + API_RESOURCE = None + + def __init__(self, submission: SubmissionExtras): super().__init__() - self.asset = None - self.destination_path = None + self.submission = submission + self.asset = submission.asset + self.user = submission.asset.owner self.credentials = google_credentials_from_constance_config() self.storage_client = storage.Client(credentials=self.credentials) - self.bucket = self.storage_client.bucket(bucket_name=settings.GS_BUCKET_NAME) + self.bucket = self.storage_client.bucket( + bucket_name=settings.GS_BUCKET_NAME + ) + + @abstractmethod + def adapt_response(self, results, *args) -> [object]: + pass @abstractmethod - def begin_async_google_operation(self, *args: str) -> (object, int): - return ({}, 0) + def begin_google_operation( + self, + xpath: str, + source_lang: str, + target_lang: str, + content: str, + ) -> (object, int): + pass @property @abstractmethod @@ -37,49 +62,65 @@ def counter_name(self): """ return 'google_' - def update_counters(self, amount) -> None: - update_nlp_counter( - self.counter_name, - amount, - self.asset.owner_id, - self.asset.id, + def handle_google_operation( + self, xpath: str, source_lang: str, target_lang: str, content=None + ) -> str: + submission_id = self.submission.submission_uuid + cache_key = make_async_cache_key( + self.user.pk, submission_id, xpath, source_lang, target_lang ) - - @abstractmethod - def append_operations_response(self, results, *args) -> [object]: - pass - - @abstractmethod - def append_api_response(self, results, *args) -> [object]: - pass - - def handle_google_task_asynchronously(self, api_name, api_version, resource, *args): - cache_key = make_async_cache_key(*args) - # Stop Me If You Think You've Heard This One Before if operation_name := cache.get(cache_key): - google_service = discovery.build(api_name, api_version, credentials=self.credentials) - resource_path = resource.split('.') + google_service = discovery.build( + 
self.API_NAME, self.API_VERSION, credentials=self.credentials + ) + resource_path = self.API_RESOURCE.split('.') for subresource in resource_path: google_service = getattr(google_service, subresource)() - operation = google_service.get(name=operation_name) - operation = operation.execute() - if not (operation.get('done') or operation.get('state') == 'SUCCEEDED'): + operation = google_service.get(name=operation_name).execute() + if not ( + operation.get('done') or operation.get('state') == 'SUCCEEDED' + ): raise SubsequenceTimeoutError - transcript = self.append_operations_response(operation, *args) + cache.delete(cache_key) + return self.adapt_response(operation) else: - print(f'--couldn\'t find key {cache_key}') - (results, amount) = self.begin_async_google_operation(*args) - print(results.operation) - cache.set(cache_key, results.operation.name, GOOGLE_CACHE_TIMEOUT) - print(cache.get(cache_key)) - self.update_counters(amount) + (response, amount) = self.begin_google_operation( + xpath, source_lang, target_lang, content + ) + if isinstance(response, Operation): + cache.set( + cache_key, response.operation.name, GOOGLE_CACHE_TIMEOUT + ) + self.update_counters(amount) + try: + result = response.result(timeout=REQUEST_TIMEOUT) + except TimeoutError as err: + raise SubsequenceTimeoutError from err + + cache.delete(cache_key) + return self.adapt_response(result) + if isinstance(response, str): + return response + + @abstractmethod + def process_data(self, qpath: str, options: dict) -> dict: + pass - try: - result = results.result(timeout=REQUEST_TIMEOUT) - except TimeoutError as err: - raise SubsequenceTimeoutError from err - transcript = self.append_api_response(result, *args) + def qpath_to_xpath(self, qpath: str) -> str: + xpath = None + for row in self.asset.content['survey']: + if '$qpath' in row and '$xpath' in row and row['$qpath'] == qpath: + xpath = row['$xpath'] + break + if xpath is None: + raise KeyError(f'xpath for {qpath=} not found') + return xpath - 
cache.delete(cache_key) - return transcript + def update_counters(self, amount) -> None: + update_nlp_counter( + self.counter_name, + amount, + self.asset.owner_id, + self.asset.id, + ) diff --git a/kobo/apps/subsequences/integrations/google/google_transcribe.py b/kobo/apps/subsequences/integrations/google/google_transcribe.py index 34a986ee9f..a1c5d4af75 100644 --- a/kobo/apps/subsequences/integrations/google/google_transcribe.py +++ b/kobo/apps/subsequences/integrations/google/google_transcribe.py @@ -6,141 +6,163 @@ import constance from django.conf import settings -from django.contrib.auth.models import User from google.cloud import speech, storage -from .base import GoogleTask +from kobo.apps.languages.models.transcription import TranscriptionService +from kpi.utils.log import logging +from .base import GoogleService +from ...constants import GOOGLE_CODE, GOOGLETS from ...exceptions import ( AudioTooLongError, SubsequenceTimeoutError, TranscriptionResultsNotFound, ) -REQUEST_TIMEOUT = 10 # seconds # https://cloud.google.com/speech-to-text/quotas#content ASYNC_MAX_LENGTH = timedelta(minutes=479) SYNC_MAX_LENGTH = timedelta(seconds=59) SYNC_MAX_BYTES = 10000000 # 10MB -class AutoTranscription: - """ - The engine for transcribing audio files - """ - def store_transcript(self, transcript, asset, submission_id): - pass +class GoogleTranscriptionService(GoogleService): + API_NAME = 'speech' + API_VERSION = 'v1' + API_RESOURCE = 'operations' - -class GoogleTranscribeEngine(AutoTranscription, GoogleTask): - def __init__(self): - super().__init__() + def __init__(self, *args): + super().__init__(*args) self.destination_path = None - self.storage_client = storage.Client(credentials=self.credentials) - self.bucket = self.storage_client.bucket(bucket_name=settings.GS_BUCKET_NAME) - - def get_converted_audio( - self, - xpath: str, - submission_id: int, - user: object - ): - attachment = self.asset.deployment.get_attachment( - submission_id, user, xpath=xpath - ) - return 
attachment.get_transcoded_audio('flac', include_duration=True) - - def store_file(self, content): - # Store temporary file. Needed to avoid limits. - # Set Life cycle expiration to delete after 1 day - # https://cloud.google.com/storage/docs/lifecycle - self.destination_path = posixpath.join( - constance.config.ASR_MT_GOOGLE_STORAGE_BUCKET_PREFIX, - f'{uuid.uuid4()}.flac' - ) - - # send the audio file to google storage - destination = self.bucket.blob(self.destination_path) - destination.upload_from_string( - content, - content_type='audio/flac', - ) - return self.destination_path - def transcribe_file( - self, - asset, - xpath: str, - # note: this works with a uuid string ontop of cdd172b - submission_id: int, - source: str, - user: User, - ): - """ - Transcribe file with cache layer around Google operations - When speech api times out, rerun function with same params - to check if operation is finished and return results - """ - self.asset = asset - self.user = user - - return self.handle_google_task_asynchronously('speech', 'v1', 'operations', user.pk, submission_id, xpath, source) - - def begin_async_google_operation(self, submission_id, xpath, source): - # get the audio file in a Google supported format - flac_content, duration = self.get_converted_audio( - xpath=xpath, - submission_id=submission_id, - user=self.user, - ) + def adapt_response(self, response: dict | list) -> str: + transcript = [] + if isinstance(response, dict): + try: + results = response['response']['results'] + except KeyError: + return '' + + for result in results: + alternatives = result['alternatives'] + transcript.append(alternatives[0]['transcript']) + else: + for result in response.results: + alternatives = result.alternatives + transcript.append(alternatives[0].transcript) + result_string = ' '.join(transcript) + return result_string + + def begin_google_operation(self, xpath, source_lang, target_lang, content): + submission_uuid = self.submission.submission_uuid + flac_content, 
duration = content total_seconds = int(duration.total_seconds()) # Create the parameters required for the transcription - speech_client = speech.SpeechClient( - credentials=self.credentials - ) + speech_client = speech.SpeechClient(credentials=self.credentials) config = speech.RecognitionConfig( - language_code=source, + language_code=source_lang, enable_automatic_punctuation=True, ) if duration < SYNC_MAX_LENGTH and len(flac_content) < SYNC_MAX_BYTES: + logging.info( + f"Synchronous transcription for {submission_uuid=}, {xpath=}" + ) # Performance optimization, it's faster directly audio = speech.RecognitionAudio(content=flac_content) elif duration < ASYNC_MAX_LENGTH: + logging.info( + f"Starting async transcription for {submission_uuid=}, {xpath=}" + ) # Store larger files on gcloud gcs_path = self.store_file(flac_content) - audio = speech.RecognitionAudio(uri=f'gs://{settings.GS_BUCKET_NAME}/{gcs_path}') + audio = speech.RecognitionAudio( + uri=f'gs://{settings.GS_BUCKET_NAME}/{gcs_path}' + ) else: - raise AudioTooLongError('Audio file of duration %s is too long.' % duration) + raise AudioTooLongError( + 'Audio file of duration %s is too long.' 
% duration + ) - speech_results = speech_client.long_running_recognize(audio=audio, config=config) + speech_results = speech_client.long_running_recognize( + audio=audio, config=config + ) return (speech_results, total_seconds) @property def counter_name(self): return 'google_asr_seconds' - def append_operations_response(self, operation, *args): - # operations api uses a dict, while speech api uses objects + def get_converted_audio( + self, xpath: str, submission_uuid: int, user: object + ): + attachment = self.asset.deployment.get_attachment( + submission_uuid, user, xpath=xpath + ) + return attachment.get_transcoded_audio('flac', include_duration=True) + + def process_data(self, qpath, vals) -> dict: + autoparams = vals[GOOGLETS] + language_code = autoparams.get('languageCode') + region_code = autoparams.get('regionCode') + vals[GOOGLETS] = { + 'status': 'in_progress', + 'languageCode': language_code, + 'regionCode': region_code, + } + xpath = self.qpath_to_xpath(qpath) + region_or_language_code = region_code or language_code + result_string = '' + results = [] try: - results = operation['response']['results'] - except KeyError: - raise TranscriptionResultsNotFound - transcript = [] - for result in results: - alternatives = result['alternatives'] - transcript.append({ - 'transcript': alternatives[0]['transcript'], - 'confidence': alternatives[0]['confidence'], - }) - return transcript - - def append_api_response(self, results, *args): - # ensure this object based version matches operations api version - transcript = [] - for result in results: - alternatives = result.alternatives - transcript.append({ - 'transcript': alternatives[0].transcript, - 'confidence': alternatives[0].confidence, - }) - return transcript + flac_content, duration = self.get_converted_audio( + xpath, + self.submission.submission_uuid, + self.user, + ) + value = self.transcribe_file( + xpath, region_or_language_code, (flac_content, duration) + ) + except SubsequenceTimeoutError: + 
logging.error( + f'Timeout error; async processing triggered for xpath={xpath}' + ) + return { + 'status': 'in_progress', + 'languageCode': language_code, + 'regionCode': region_code, + } + except TranscriptionResultsNotFound: + logging.error(f'No transcriptions found for xpath={xpath}') + + return { + 'status': 'complete', + 'value': value, + 'languageCode': language_code, + 'regionCode': region_code, + } + + def transcribe_file( + self, xpath: str, source_lang: str, content: (object, int) + ) -> str: + """ + Transcribe file with cache layer around Google operations + When speech api times out, rerun function with same params + to check if operation is finished and return results + """ + return self.handle_google_operation(xpath, source_lang, None, content) + + def store_file(self, content): + # Store temporary file. Needed to avoid limits. + # Set Life cycle expiration to delete after 1 day + # https://cloud.google.com/storage/docs/lifecycle + self.destination_path = posixpath.join( + constance.config.ASR_MT_GOOGLE_STORAGE_BUCKET_PREFIX, + f'{uuid.uuid4()}.flac', + ) + + # send the audio file to google storage + destination = self.bucket.blob(self.destination_path) + destination.upload_from_string( + content, + content_type='audio/flac', + ) + return self.destination_path diff --git a/kobo/apps/subsequences/integrations/google/google_translate.py b/kobo/apps/subsequences/integrations/google/google_translate.py index 7d1787020d..eb24accac8 100644 --- a/kobo/apps/subsequences/integrations/google/google_translate.py +++ b/kobo/apps/subsequences/integrations/google/google_translate.py @@ -8,12 +8,16 @@ from google.api_core.exceptions import InvalidArgument from google.cloud import translate_v3 as translate, storage -from .base import GoogleTask +from kobo.apps.languages.models.translation import TranslationService +from kpi.utils.log import logging +from .base import GoogleService, REQUEST_TIMEOUT from .utils import google_credentials_from_constance_config -from 
..misc import ( - TranslationException, +from ...constants import GOOGLETX, GOOGLE_CODE +from ...exceptions import ( + SubsequenceTimeoutError, + TranslationResultsNotFound, + TranslationAsyncResultAvailable, ) -from ...constants import GOOGLETX MAX_SYNC_CHARS = 30720 @@ -22,14 +26,17 @@ def _hashed_strings(self, *strings): return md5(''.join(strings).encode()).hexdigest()[0:10] -class GoogleTranslationEngine(GoogleTask): - def __init__(self): +class GoogleTranslationService(GoogleService): + API_NAME = 'translate' + API_VERSION = 'v3' + API_RESOURCE = 'projects.locations.operations' + + def __init__(self, *args): + super().__init__(*args) + self.translate_client = translate.TranslationServiceClient( credentials=google_credentials_from_constance_config() ) - self.storage_client = storage.Client( - credentials=google_credentials_from_constance_config() - ) self.translate_parent = ( f'projects/{constance.config.ASR_MT_GOOGLE_PROJECT_ID}' ) @@ -40,81 +47,63 @@ def __init__(self): f'projects/{constance.config.ASR_MT_GOOGLE_PROJECT_ID}/' f'locations/{constance.config.ASR_MT_GOOGLE_TRANSLATION_LOCATION}' ) - self.bucket = self.storage_client.bucket(bucket_name=settings.GS_BUCKET_NAME) - self.bucket_prefix = constance.config.ASR_MT_GOOGLE_STORAGE_BUCKET_PREFIX - - super().__init__() + self.bucket_prefix = ( + constance.config.ASR_MT_GOOGLE_STORAGE_BUCKET_PREFIX + ) self.date_string = date.today().isoformat() - @property - def counter_name(self): - return 'google_mt_characters' + def adapt_response(self, response): + if isinstance( + response, translate.types.translation_service.TranslateTextResponse + ): + return response.translations[0].translated_text + elif isinstance(response, str): + return response + elif isinstance(response, dict) and response.get('done'): + raise TranslationAsyncResultAvailable() - def translation_must_be_async(self, content): - return len(content) > MAX_SYNC_CHARS + return '' - def translate_async( - self, - asset, - submission_uuid: str, - 
username: str, - xpath: str, - content: str, - source_lang: str, - target_lang: str, - ): - self.source_lang = source_lang - self.target_lang = target_lang - self.content = content - self.asset = asset - return self.handle_google_task_asynchronously( - 'translate', - 'v3', - 'projects.locations.operations', - submission_uuid, - username, - xpath, - source_lang, - target_lang, + def begin_google_operation( + self, xpath, source_lang, target_lang, content + ) -> (object, int): + source_path, output_path = self.get_unique_paths( + xpath, source_lang, target_lang ) - def begin_async_google_operation( - self, - submission_uuid, - username, - xpath, - source_lang, - target_lang, - ) -> (object, int): - _uniq_path = _hashed_strings(submission_uuid, xpath, username) - _uniq_dir = f'{self.date_string}/{_uniq_path}/{source_lang}/{target_lang}/' - source_path = posixpath.join(self.bucket_prefix, _uniq_dir, 'source.txt') - self.output_dir = posixpath.join(self.bucket_prefix, _uniq_dir, 'completed/') + # check if directory is not empty + if stored_result := self.get_stored_result(target_lang, output_path): + logging.info(f'Found stored results in {output_path=}') + return (stored_result, len(content)) + logging.info( + f'Starting async translation for {self.submission.submission_uuid=} {xpath=}' + ) dest = self.bucket.blob(source_path) - if not dest.exists(): - dest.upload_from_string(self.content) + dest.upload_from_string(content) req_params = { 'parent': self.translate_async_parent, - 'source_language_code': self.source_lang, - 'target_language_codes': [self.target_lang], - 'input_configs': [{ - 'gcs_source': { - 'input_uri': f'gs://{settings.GS_BUCKET_NAME}/{source_path}' - }, - 'mime_type': 'text/plain', - }], + 'source_language_code': source_lang, + 'target_language_codes': [target_lang], + 'input_configs': [ + { + 'gcs_source': { + 'input_uri': f'gs://{settings.GS_BUCKET_NAME}/{source_path}' + }, + 'mime_type': 'text/plain', + } + ], 'output_config': { 'gcs_destination': 
{ 'output_uri_prefix': ( - f'gs://{settings.GS_BUCKET_NAME}/{self.output_dir}' + f'gs://{settings.GS_BUCKET_NAME}/{output_path}' ) } }, 'labels': { - 'username': username, - 'submission': submission_uuid, + 'username': self.user.username, + 'submission': self.submission.submission_uuid, # this needs to be lowercased to comply with google's API 'xpath': xpath.lower(), }, @@ -123,77 +112,99 @@ def begin_async_google_operation( response = self.translate_client.batch_translate_text( request=req_params ) - return (response, len(self.content)) - - # return { - # 'operation_name': operation_name, - # 'operation_dir': output_dir, - # 'blob_name_includes': f'_{target_lang}_translations', - # 'submission_uuid': submission_uuid, - # 'xpath': xpath, - # 'target_lang': target_lang, - # } - - # operation_client = operations_v1.OperationsClient( - # channel=translate_client.transport.grpc_channel, - # ) - # operation = operation_client.get_operation(name=operation_name) - # if operation.done: - # for blob in bucket.list_blobs(prefix=operation_dir): - # if blob_name_includes in blob.name: - # text = blob.download_as_text(), - # blob.delete() - # else: - # blob.delete() - # save_async_translation(text, submission_uuid, xpath, target_lang) - - def save_async_translation(self, text, submission_uuid, xpath, target_lang): - from kobo.apps.subsequences.models import SubmissionExtras - submission = SubmissionExtras.objects.get(submission_uuid=submission_uuid) - submission.content[xpath][GOOGLETX] = { - 'status': 'complete', - 'languageCode': target_lang, - 'value': text, - } - submission.save() + return (response, len(content)) - def append_operations_response( - self, - results, - submission_uuid, - username, - xpath, - source_lang, - target_lang, - ): - _uniq_path = _hashed_strings(submission_uuid, xpath, username) - _uniq_dir = f'{self.date_string}/{_uniq_path}/{source_lang}/{target_lang}/' - output_dir = posixpath.join(self.bucket_prefix, _uniq_dir, 'completed/') - text = None - 
print(results) - print(self.bucket.__dict__) + @property + def counter_name(self): + return 'google_mt_characters' + + def get_stored_result(self, target_lang, output_dir: str): + text = '' for blob in self.bucket.list_blobs(prefix=output_dir): if f'_{target_lang}_translations' in blob.name: - text = blob.download_as_text(), + text = blob.download_as_text() blob.delete() else: blob.delete() - if not text: - raise TranslationException - self.save_async_translation(text, submission_uuid, xpath, target_lang) - return (results, len(self.content)) - def append_api_response(self, results, *args): - return self.append_operations_response(results, *args) + return text + + def get_unique_paths(self, xpath, source_lang, target_lang): + submission_uuid = self.submission.submission_uuid + _hash = _hashed_strings( + self.submission.submission_uuid, xpath, self.user.username + ) + _uniq_dir = f'{self.date_string}/{_hash}/{source_lang}/{target_lang}/' + source_path = posixpath.join( + self.bucket_prefix, _uniq_dir, 'source.txt' + ) + output_path = posixpath.join( + self.bucket_prefix, _uniq_dir, 'completed/' + ) + return source_path, output_path + + def process_data(self, qpath: str, vals: dict) -> dict: + autoparams = vals[GOOGLETX] + xpath = self.qpath_to_xpath(qpath) + try: + content = vals['transcript']['value'] + source_lang = vals['transcript']['languageCode'] + target_lang = autoparams.get('languageCode') + except KeyError: + logging.exception('Error while setting up translation processing') + return {'status': 'error'} + + lang_service = TranslationService.objects.get(code=GOOGLE_CODE) + try: + value = self.translate_content( + xpath, + lang_service.get_language_code(source_lang), + lang_service.get_language_code(target_lang), + content, + ) + except SubsequenceTimeoutError: + return { + 'status': 'in_progress', + 'source': source_lang, + 'languageCode': target_lang, + 'value': None, + } + except (TranslationResultsNotFound, InvalidArgument): + logging.exception('Error 
when processing translation') + return { + 'status': 'error', + 'source': source_lang, + 'languageCode': target_lang, + 'value': None, + } + except TranslationAsyncResultAvailable: + _, output_path = self.get_unique_paths( + xpath, source_lang, target_lang + ) + logging.info( + f'Fetching stored results for {self.submission.submission_uuid=} {xpath=}, {output_path=}' + ) + value = self.get_stored_result(target_lang, output_path) + + return { + 'status': 'complete', + 'source': source_lang, + 'languageCode': target_lang, + 'value': value, + } - def translate_sync( + def translate_content( self, - content: str, - username: str, - target_lang: str, + xpath: str, source_lang: str, - ) -> str: - try: + target_lang: str, + content: str, + ): + content_size = len(content) + if content_size <= MAX_SYNC_CHARS: + logging.info( + f'Starting sync translation for {self.submission.submission_uuid=} {xpath=}' + ) response = self.translate_client.translate_text( request={ 'contents': [content], @@ -201,9 +212,12 @@ def translate_sync( 'target_language_code': target_lang, 'parent': self.translate_parent, 'mime_type': 'text/plain', - 'labels': {'username': username}, + 'labels': {'username': self.user.username}, } ) - except InvalidArgument as e: - raise TranslationException(e.message) - return response.translations[0].translated_text + self.update_counters(content_size) + return self.adapt_response(response) + else: + response = self.handle_google_operation( + xpath, source_lang, target_lang, content + ) diff --git a/kobo/apps/subsequences/integrations/misc.py b/kobo/apps/subsequences/integrations/misc.py index 8b491c3aea..7cc3ba5949 100644 --- a/kobo/apps/subsequences/integrations/misc.py +++ b/kobo/apps/subsequences/integrations/misc.py @@ -32,52 +32,3 @@ def translate( self, content: str, target_lang: str, source_lang: Optional[str] = None ) -> str: pass - -class GoogleTranslationEngineAsyncResult: - def __init__( - self, - username: str, - submission_uuid: str, - 
operation_name: str, - output_filename: str, - *args, - **kwargs - ): - super().__init__() - self.operation_client = operations_v1.OperationsClient( - channel=self.translate_client.transport.grpc_channel - ) - self.username = username - self.submission_uuid = submission_uuid - self.output_filename = output_filename - self.operation_name = operation_name - self.state = 'BARDO' - - def _wait_for_result(self): - duration = 0 - wait = 10 - done = False - while not done and duration < TIMEOUT: - done = self.operation_client.get_operation( - name=self.operation_name - ).done - time.sleep(wait) - duration += wait - self.state = 'SUCCEEDED' - return True - - def _cleanup(self) -> None: - for blob in self.bucket.list_blobs( - prefix=f'{self.username}/{self.submission_uuid}' - ): - blob.delete() - return True - - def _get_content(self): - return self.bucket.get_blob(self.output_filename).download_as_text() - - def result(self): - self._wait_for_result() - _result = self._get_content() - self._cleanup() - return _result diff --git a/kobo/apps/subsequences/integrations/translate.py b/kobo/apps/subsequences/integrations/translate.py index 81426b0ac6..68c33743c1 100644 --- a/kobo/apps/subsequences/integrations/translate.py +++ b/kobo/apps/subsequences/integrations/translate.py @@ -7,7 +7,6 @@ Union, ) -from .google.google_translate import GoogleTranslationEngine from .misc import TranslationException @@ -25,7 +24,7 @@ class Engine: class Translate: SERVICES = { 'google': Engine( - engine=GoogleTranslationEngine, + engine=None, label='Google Transcription Services', description='Translation services provided by Google.', enabled=True, diff --git a/kobo/apps/subsequences/models.py b/kobo/apps/subsequences/models.py index b7cf09e948..c13ee92491 100644 --- a/kobo/apps/subsequences/models.py +++ b/kobo/apps/subsequences/models.py @@ -1,17 +1,11 @@ # coding: utf-8 from django.db import models -from kobo.apps.languages.models.transcription import TranscriptionService -from 
kobo.apps.languages.models.translation import TranslationService from kobo.apps.trackers.utils import update_nlp_counter from kpi.models import Asset from kpi.models.abstract_models import AbstractTimeStampedModel from kpi.utils.log import logging from .constants import GOOGLETS, GOOGLETX, ASYNC_TRANSLATION_DELAY_INTERVAL -from .exceptions import SubsequenceTimeoutError, TranscriptionResultsNotFound -from .integrations.google.google_transcribe import GoogleTranscribeEngine -from .integrations.google.google_translate import GoogleTranslationEngine -from .tasks import handle_google_translation_operation from .utils.determine_export_cols_with_values import ( determine_export_cols_indiv, ) @@ -34,119 +28,21 @@ class Meta: unique_together = (('asset', 'submission_uuid'),) def save(self, *args, **kwargs): - features = self.asset.advanced_features - if 'transcript' in features: - for qpath, vals in self.content.items(): - try: - autoparams = vals[GOOGLETS] - status = autoparams['status'] - if status == 'requested': - engine = GoogleTranscribeEngine() - service = TranscriptionService.objects.get(code='goog') - language_code = autoparams.get('languageCode') - region_code = autoparams.get('regionCode') - vals[GOOGLETS] = { - 'status': 'in_progress', - 'languageCode': language_code, - 'regionCode': region_code, - } - for row in self.asset.content['survey']: - if '$qpath' in row and '$xpath' in row: - if row['$qpath'] == qpath: - xpath = row['$xpath'] - break - region_or_language_code = region_code or language_code - try: - results = engine.transcribe_file( - asset=self.asset, - xpath=xpath, - source=region_or_language_code, - submission_id=self.submission_uuid, - user=self.asset.owner, - ) - except SubsequenceTimeoutError: - continue - except TranscriptionResultsNotFound: - logging.error(f'No transcriptions found for {xpath}') - result_string = '' - results = [] - else: - result_string = ' '.join( - [r['transcript'] for r in results] - ) - - vals[GOOGLETS] = { - 'status': 
'complete', - 'value': result_string, - 'fullResponse': results, - 'languageCode': language_code, - 'regionCode': region_code, - } - else: - continue - except (KeyError, TypeError) as err: - continue + from .integrations.google.google_transcribe import GoogleTranscriptionService + from .integrations.google.google_translate import GoogleTranslationService - if 'translation' in features: - for key, vals in self.content.items(): - try: - autoparams = vals[GOOGLETX] - status = autoparams['status'] - if status == 'requested': - content = vals['transcript']['value'] - source_lang = vals['transcript']['languageCode'] - target_lang = autoparams.get('languageCode') - elif status == 'complete': - content = False - except KeyError as err: - content = False - if not content: - continue - tx_engine = GoogleTranslationEngine() - # FIXME Code is hardcoded and should be dynamic - service = TranslationService.objects.get(code='goog') - if tx_engine.translation_must_be_async(content): - # must queue - try: - (response, _) = tx_engine.translate_async( - asset=self.asset, - # the string to translate - content=content, - # field IDs to tell us where to save results - submission_uuid=self.submission_uuid, - xpath=key, - # username is used in the label of the request - username=self.asset.owner.username, - # the rest - source_lang=service.get_language_code(source_lang), - target_lang=service.get_language_code(target_lang), - ) - except SubsequenceTimeoutError: - continue - # FIXME: clobbers previous translations; we want a record - # of what Google returned, and another async translation - # could be in progress - vals[GOOGLETX] = { - 'status': 'complete', - 'source': source_lang, - 'languageCode': target_lang, - 'value': response, - } - else: - results = tx_engine.translate_sync( - content=content, - source_lang=service.get_language_code(source_lang), - target_lang=service.get_language_code(target_lang), - username=self.asset.owner.username, - ) - # FIXME: clobbers previous 
translations; we want a record - # of what Google returned - vals[GOOGLETX] = { - 'status': 'complete', - 'source': source_lang, - 'languageCode': target_lang, - 'value': results, - } + features = self.asset.advanced_features + for qpath, vals in self.content.items(): + if 'transcript' in features: + options = vals.get(GOOGLETS, {}) + if options.get('status') == 'requested': + service = GoogleTranscriptionService(self) + vals[GOOGLETS] = service.process_data(qpath, vals) + if 'translation' in features: + options = vals.get(GOOGLETX, {}) + if options.get('status') == 'requested': + service = GoogleTranslationService(self) + vals[GOOGLETX] = service.process_data(qpath, vals) asset_changes = False asset_known_cols = self.asset.known_cols diff --git a/kobo/apps/subsequences/tasks/__init__.py b/kobo/apps/subsequences/tasks/__init__.py index 4b51df526c..e69de29bb2 100644 --- a/kobo/apps/subsequences/tasks/__init__.py +++ b/kobo/apps/subsequences/tasks/__init__.py @@ -1 +0,0 @@ -from .handle_translation_operation import handle_google_translation_operation diff --git a/kobo/apps/subsequences/tasks/handle_translation_operation.py b/kobo/apps/subsequences/tasks/handle_translation_operation.py deleted file mode 100644 index 58083d3416..0000000000 --- a/kobo/apps/subsequences/tasks/handle_translation_operation.py +++ /dev/null @@ -1,70 +0,0 @@ -from celery import shared_task -from django.conf import settings -from google.api_core import operations_v1 -from google.cloud import translate_v3 as translate, storage - -from ..constants import ( - GOOGLETX, - ASYNC_TRANSLATION_DELAY_INTERVAL, -) -from ..integrations.google.utils import google_credentials_from_constance_config - -# TODO: Transcriptions take a different approach that has the benefit of -# avoiding Celery, by contacting Google only when the browser polls for -# updates: -# https://github.com/kobotoolbox/kpi/blob/d3b81c6d1a647c3676c0cc0affada8979d8f5112/kobo/apps/subsequences/integrations/google/google_transcribe.py#L89 
from celery import shared_task
from django.conf import settings
from google.api_core import operations_v1
from google.cloud import translate_v3 as translate, storage

from ..constants import (
    GOOGLETX,
    ASYNC_TRANSLATION_DELAY_INTERVAL,
)
from ..integrations.google.utils import google_credentials_from_constance_config

# TODO: Transcriptions take a different approach that has the benefit of
# avoiding Celery, by contacting Google only when the browser polls for
# updates. However, this means that someone starting an async job, closing
# their browser, and coming back too late (1+ day later?) would never get
# their results. The pros and cons of each approach need to be evaluated, and
# a standard approach needs to be adopted for both transcriptions and
# translations —jnm 20240403


@shared_task
def handle_google_translation_operation(
    operation_name: str,
    operation_dir: str,
    blob_name_includes: str,
    submission_uuid: str,
    xpath: str,
    target_lang: str,
) -> None:
    """
    Poll the Google long-running translation operation `operation_name`.

    When the operation has finished, download the translated text from the
    GCS output directory `operation_dir` (the blob whose name contains
    `blob_name_includes`), delete every output blob, and persist the result
    on the matching `SubmissionExtras`. Otherwise, re-enqueue this task to
    poll again after `ASYNC_TRANSLATION_DELAY_INTERVAL` seconds.
    """
    credentials = google_credentials_from_constance_config()
    translate_client = translate.TranslationServiceClient(
        credentials=credentials
    )
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(bucket_name=settings.GS_BUCKET_NAME)
    operation_client = operations_v1.OperationsClient(
        channel=translate_client.transport.grpc_channel,
    )
    operation = operation_client.get_operation(name=operation_name)

    if not operation.done:
        # Not finished yet: check back again later
        handle_google_translation_operation.apply_async(
            (
                operation_name,
                operation_dir,
                blob_name_includes,
                submission_uuid,
                xpath,
                target_lang,
            ),
            countdown=ASYNC_TRANSLATION_DELAY_INTERVAL,
        )
        return

    text = None
    for blob in bucket.list_blobs(prefix=operation_dir):
        if text is None and blob_name_includes in blob.name:
            # BUG FIX: the original line ended with a trailing comma
            # (`text = blob.download_as_text(),`), which stored a 1-tuple
            # instead of the translated string
            text = blob.download_as_text()
        # every output blob is deleted, matching or not (same as before)
        blob.delete()

    if text is None:
        # BUG FIX: the original raised NameError here when no blob matched
        # `blob_name_includes`; bail out instead of saving an unbound value
        return
    save_async_translation(text, submission_uuid, xpath, target_lang)


def save_async_translation(text, submission_uuid, xpath, target_lang):
    """
    Persist a completed Google translation under `xpath` on the
    `SubmissionExtras` record identified by `submission_uuid`.
    """
    # imported here to avoid a circular import at module load time
    from kobo.apps.subsequences.models import SubmissionExtras

    submission = SubmissionExtras.objects.get(submission_uuid=submission_uuid)
    submission.content[xpath][GOOGLETX] = {
        'status': 'complete',
        'languageCode': target_lang,
        'value': text,
    }
    submission.save()
import json
from celery import shared_task


# note this is not currently imported from tasks/__init__.py
# jnm: assume this is dead code after #4002? unless we return to a Celery-based
# approach for async transcriptions and translations
@shared_task
def queue_transcript(**params):
    """
    Queue an automatic transcription of one submission's audio response.

    Expected `params`: `asset_uid`, `submission_id` (or `submission_uuid`),
    `service` and `lang_code`.
    """
    from kobo.apps.subsequences.integrations.google.google_transcribe import (
        GoogleTranscribeEngine,
    )
    from kpi.models import Asset

    asset_uid = params['asset_uid']
    submission_uuid = params.get('submission_uuid')
    submission_id = params.get('submission_id')
    service = params['service']
    lang_code = params['lang_code']

    engine = GoogleTranscribeEngine()

    # TODO: derive the real xpath of the audio question from the asset
    # content instead of this placeholder
    xpath = 'path/to/question'
    # BUG FIX: `user` was referenced below but never defined (NameError);
    # use the asset owner, as the original `# user = asset.owner` comment
    # intended
    user = Asset.objects.get(uid=asset_uid).owner
    # BUG FIX: the caller-supplied `submission_id` was clobbered with a
    # hard-coded `1`, and the language was hard-coded to 'en-US' even though
    # `lang_code` was read from params
    transcript = engine.transcribe_file(
        asset_uid, xpath, submission_id, lang_code, user
    )
    return transcript
def test_instantiate_action_with_params():
    # Params built from the survey must yield a usable action instance
    survey, _ = _survey_and_submission()
    params = AutomaticTranscriptionAction.build_params({}, survey)
    assert AutomaticTranscriptionAction(params) is not None


@pytest.mark.skip(reason='transcription currently does not depend on this working')
def test_submission_status_before_change():
    survey, submission = _survey_and_submission()
    params = AutomaticTranscriptionAction.build_params({}, survey)
    action = AutomaticTranscriptionAction(params)

    # before anything has run, changes ARE needed
    assert action.check_submission_status(submission) == ACTION_NEEDED

    # after running the change, nothing further is needed
    submission = action.run_change(submission)
    assert action.check_submission_status(submission) == PASSES

    # a user asking for an automatic transcription flips the value in
    # submission._supplementalDetails from NOT_REQUESTED to REQUESTED_BY_USER
    details = submission['_supplementalDetails']
    first_key = next(iter(details))
    details[first_key] = REQUESTED_BY_USER
    assert action.check_submission_status(submission) == ACTION_NEEDED

    submission = action.run_change(submission)
    assert action.check_submission_status(submission) == PASSES
    details = submission['_supplementalDetails']
    first_key = next(iter(details))
    assert details[first_key] == PENDING
import pytest

from unittest.mock import patch, Mock

from django.conf import settings
from django.test import TestCase
from django.utils import timezone
from model_bakery import baker

from kpi.models import Asset
from ..actions.base import ACTION_NEEDED, PASSES
from ..actions.automatic_transcription import (
    AutomaticTranscriptionAction,
    REQUESTED_BY_USER,
    PENDING,
)
from ..constants import GOOGLETS, GOOGLETX
from ..models import SubmissionExtras

TEST_TRANSCRIPTION_SERVICES = [
    'acme_1_speech2text',
    'optimus_transcribers',
    'wonka_stenographers',
]


class NLPIntegrationTestCase(TestCase):
    """
    Exercises the NLP (transcription/translation) integration: building
    action params from a survey, and dispatching the Google services when a
    `SubmissionExtras` is saved with a 'requested' status.
    """

    fixtures = ['test_data']

    def setUp(self):
        # BUG FIX: the original passed `timezone.now().today` — a bound
        # method object, not a datetime — as `date_joined`
        user = baker.make(
            settings.AUTH_USER_MODEL,
            username='johndoe',
            date_joined=timezone.now(),
        )
        self.asset = Asset.objects.create(
            owner=user,
            content={
                'survey': [
                    {'type': 'audio', 'name': 'ask_a_question'}
                ]
            },
        )
        self.asset.advanced_features = {
            'transcript': {'languages': ['en']},
            'translation': {'languages': ['en', 'es']},
        }

    def test_param_builder(self):
        AutomaticTranscriptionAction.TRANSCRIPTION_SERVICES = (
            TEST_TRANSCRIPTION_SERVICES
        )
        survey = self.asset.content
        built_params = AutomaticTranscriptionAction.build_params({}, survey)
        assert built_params['values'] == ['ask_a_question']
        assert 'services' in built_params

    def test_instantiate_action_with_params(self):
        survey = self.asset.content
        action_params = AutomaticTranscriptionAction.build_params({}, survey)
        action_instance = AutomaticTranscriptionAction(action_params)
        assert action_instance is not None

    @pytest.mark.skip(reason='transcription currently does not depend on this working')
    def test_submission_status_before_change(self):
        # BUG FIX: the original omitted `self`, so this method would raise
        # TypeError if the skip marker were ever removed
        survey = self.asset.content
        submission = {
            'ask_a_question': 'blah.mp3',
            '_attachments': [{'filename': 'blah.mp3'}],
        }
        action_params = AutomaticTranscriptionAction.build_params({}, survey)
        action_instance = AutomaticTranscriptionAction(action_params)

        # check that the changes ARE needed
        assert action_instance.check_submission_status(submission) == ACTION_NEEDED

        # run the change on the submission
        submission = action_instance.run_change(submission)
        # check that the changes are NO LONGER needed
        assert action_instance.check_submission_status(submission) == PASSES

        # a user indicating they want an automatic transcription changes the
        # value in submission._supplementalDetails from NOT_REQUESTED to
        # REQUESTED_BY_USER
        sdeets = submission['_supplementalDetails']
        example_fs_key = [*sdeets.keys()][0]
        sdeets[example_fs_key] = REQUESTED_BY_USER
        assert action_instance.check_submission_status(submission) == ACTION_NEEDED

        submission = action_instance.run_change(submission)
        assert action_instance.check_submission_status(submission) == PASSES
        sdeets = submission['_supplementalDetails']
        example_fs_key = [*sdeets.keys()][0]
        assert sdeets[example_fs_key] == PENDING

    # NOTE(review): these patch the service classes in the modules where they
    # are *defined*; if models.py imports the names directly
    # (`from ... import GoogleTranscriptionService`), the patches must target
    # the `models` namespace instead — confirm against models.py's imports
    @patch(
        'kobo.apps.subsequences.integrations.google.google_transcribe'
        '.GoogleTranscriptionService'
    )
    @patch(
        'kobo.apps.subsequences.integrations.google.google_translate'
        '.GoogleTranslationService'
    )
    def test_transcription_requested(
        self,
        mock_TranslationService,
        mock_TranscriptionService,
    ):
        # Saving a SubmissionExtras with a 'requested' transcription must
        # invoke the transcription service's process_data exactly once
        mock_transcript_object = Mock(process_data=Mock(return_value={}))
        mock_TranscriptionService.return_value = mock_transcript_object

        SubmissionExtras.objects.create(
            asset=self.asset,
            submission_uuid='123abc',
            content={
                'ask_a_question': {
                    GOOGLETS: {'status': 'requested', 'languageCode': 'en'},
                }
            },
        )
        assert mock_transcript_object.process_data.call_count == 1

        # ... and likewise for a 'requested' translation
        mock_translation_object = Mock(process_data=Mock(return_value={}))
        mock_TranslationService.return_value = mock_translation_object

        SubmissionExtras.objects.create(
            asset=self.asset,
            submission_uuid='1234abcd',
            content={
                'ask_a_question': {
                    GOOGLETX: {'status': 'requested', 'languageCode': 'en'},
                }
            },
        )
        assert mock_translation_object.process_data.call_count == 1