diff --git a/podcastfy/client.py b/podcastfy/client.py index 8a21f97..00554b2 100644 --- a/podcastfy/client.py +++ b/podcastfy/client.py @@ -94,7 +94,8 @@ def process_content( if generate_audio: api_key = None # edge does not require an API key - if tts_model != "edge": + # google requires application credentials file + if tts_model not in ("edge", "google"): api_key = getattr(config, f"{tts_model.upper()}_API_KEY") text_to_speech = TextToSpeech(model=tts_model, api_key=api_key) @@ -128,7 +129,7 @@ def main( None, "--tts-model", "-tts", - help="TTS model to use (openai, elevenlabs or edge)", + help="TTS model to use (openai, elevenlabs, google or edge)", ), transcript_only: bool = typer.Option( False, "--transcript-only", help="Generate only a transcript without audio" diff --git a/podcastfy/conversation_config.yaml b/podcastfy/conversation_config.yaml index bbee6c5..7b7e769 100644 --- a/podcastfy/conversation_config.yaml +++ b/podcastfy/conversation_config.yaml @@ -35,6 +35,17 @@ text_to_speech: default_voices: question: "en-US-JennyNeural" answer: "en-US-EricNeural" + google: + default_voices: + # REF: https://cloud.google.com/text-to-speech/docs/voices + question: + voice: "en-US-Journey-F" + #rate: 1.1 # not supported in Journey voices + #pitch: -2 # not supported in Journey voices + answer: + voice: "en-US-Journey-D" + #rate: 1.1 # not supported in Journey voices + #pitch: 0 # not supported in Journey voices audio_format: "mp3" temp_audio_dir: "data/audio/tmp/" - ending_message: "Bye Bye!" \ No newline at end of file + ending_message: "Bye Bye!" diff --git a/podcastfy/text_to_speech.py b/podcastfy/text_to_speech.py index 151f0cb..9efd34e 100644 --- a/podcastfy/text_to_speech.py +++ b/podcastfy/text_to_speech.py @@ -6,17 +6,25 @@ including cleaning of input text and merging of audio files. """ +# system imports +from typing import List, Tuple, Optional, Union import logging import asyncio -import edge_tts -from elevenlabs import client as elevenlabs_client -from podcastfy.utils.config import load_config -from podcastfy.utils.config_conversation import load_conversation_config -from pydub import AudioSegment import os import re +import io + +# third party imports +from elevenlabs import client as elevenlabs_client +from google.cloud import texttospeech # INSTALL: pip3 install google-cloud-texttospeech +from pydub import AudioSegment +import edge_tts import openai -from typing import List, Tuple, Optional, Union + +# local imports +from podcastfy.utils.config_conversation import load_conversation_config +from podcastfy.utils.config import load_config + logger = logging.getLogger(__name__) @@ -44,8 +52,13 @@ def __init__(self, model: str = 'openai', api_key: Optional[str] = None): openai.api_key = self.api_key elif self.model == 'edge': pass + elif self.model == 'google': + # GOOGLE_APPLICATION_CREDENTIALS environment variable must be set with + # the path to the service account JSON file. + # REF: https://cloud.google.com/docs/authentication/application-default-credentials + self.client = texttospeech.TextToSpeechClient() else: - raise ValueError("Invalid model. Choose 'elevenlabs', 'openai' or 'edge'.") + raise ValueError("Invalid model. Choose 'elevenlabs', 'openai', 'google' or 'edge'.") self.audio_format = self.tts_config['audio_format'] self.temp_audio_dir = self.tts_config['temp_audio_dir'] @@ -104,6 +117,8 @@ def convert_to_speech(self, text: str, output_file: str) -> None: self.__convert_to_speech_openai(cleaned_text, output_file) elif self.model == 'edge': self.__convert_to_speech_edge(cleaned_text, output_file) + elif self.model == 'google': + self.__convert_to_speech_google(cleaned_text, output_file) def __convert_to_speech_elevenlabs(self, text: str, output_file: str) -> None: try: @@ -181,6 +196,91 @@ def __convert_to_speech_openai(self, text: str, output_file: str) -> None: logger.error(f"Error converting text to speech with OpenAI: {str(e)}") raise + def __convert_to_speech_google(self, text: str, output_file: str) -> None: + try: + qa_pairs = self.split_qa(text) + combined_q = AudioSegment.empty() + combined_a = AudioSegment.empty() + tts_config = self.tts_config['google'] + + # configure the voices + question_voice = texttospeech.VoiceSelectionParams( + language_code=tts_config.get('language_code', 'en-US'), + name=tts_config['default_voices']['question']['voice'], + ) + answer_voice = texttospeech.VoiceSelectionParams( + language_code=tts_config.get('language_code', 'en-US'), + name=tts_config['default_voices']['answer']['voice'], + ) + counter = 0 + last_overlap_duration = 0.0 + for question, answer in qa_pairs: + for kind, content, speaker in [ + ('question', question, question_voice), + ('answer', answer, answer_voice), + ]: + counter += 1 + voice_config = tts_config['default_voices'][kind] + + # configure the audio output + config_kwargs = dict( + audio_encoding=texttospeech.AudioEncoding.OGG_OPUS, + speaking_rate=voice_config.get('rate'), + pitch=voice_config.get('pitch'), + ) + # remove None values from config_kwargs (required by some voices) + audio_config = texttospeech.AudioConfig( + **{k: v for k, v in config_kwargs.items() if v is not None} + ) + + # configure the synthesis input + use_text_input = 'Journey' in voice_config['voice'] # UGLY + if use_text_input: + synth_input_type = 'text' + else: + synth_input_type = 'ssml' + if not content.startswith(''): + content = f"{content}" + synth_input = texttospeech.SynthesisInput(**{synth_input_type: content}) + + # get audio content from the TTS service + response = self.client.synthesize_speech( + input=synth_input, + voice=speaker, + audio_config=audio_config, + ) + + # generate silent audio segment for the other speaker + overlap_duration = 0.1 # TODO: make this configurable + audio_segment = AudioSegment.from_file(io.BytesIO(response.audio_content), format='ogg') + duration_seconds = audio_segment.duration_seconds + silence_duration = duration_seconds - overlap_duration - last_overlap_duration + last_overlap_duration = overlap_duration + silence = AudioSegment.silent(duration=silence_duration * 1000) + + # add audio segments to each speaker + if kind == 'question': + combined_q += audio_segment + combined_a += silence + else: + combined_q += silence + combined_a += audio_segment + + # add silence for the last overlap + combined_q += AudioSegment.silent(duration=last_overlap_duration * 1000) + + # export speaker audio segments to separate files + combined_q.export(output_file.replace('.mp3', '_question.mp3'), format=self.audio_format) + combined_a.export(output_file.replace('.mp3', '_answer.mp3'), format=self.audio_format) + # merge speaker audio segments and save the result + combined_q.overlay(combined_a).export(output_file, format=self.audio_format) + + logger.info(f"Audio saved to {output_file}") + + except Exception as e: + logger.error(f"Error converting text to speech with Google: {str(e)}") + raise + def get_or_create_eventloop(): try: return asyncio.get_event_loop() @@ -322,26 +422,15 @@ def main(seed: int = 42) -> None: config = load_config() # Read input text from file - with open('tests/data/transcript_336aa9f955cd4019bc1287379a5a2820.txt', 'r') as file: + with open('tests/data/transcripts/transcript_336aa9f955cd4019bc1287379a5a2820.txt', 'r') as file: input_text = file.read() - # Test ElevenLabs - tts_elevenlabs = TextToSpeech(model='elevenlabs') - elevenlabs_output_file = 'tests/data/response_elevenlabs.mp3' - tts_elevenlabs.convert_to_speech(input_text, elevenlabs_output_file) - logger.info(f"ElevenLabs TTS completed. Output saved to {elevenlabs_output_file}") - - # Test OpenAI - tts_openai = TextToSpeech(model='openai') - openai_output_file = 'tests/data/response_openai.mp3' - tts_openai.convert_to_speech(input_text, openai_output_file) - logger.info(f"OpenAI TTS completed. Output saved to {openai_output_file}") - - # Test OpenAI - tts_edge = TextToSpeech(model='edge') - edge_output_file = 'tests/data/response_edge.mp3' - tts_edge.convert_to_speech(input_text, edge_output_file) - logger.info(f"Edge TTS completed. Output saved to {edge_output_file}") + # generate audio for each model + for model in ['elevenlabs', 'openai', 'google', 'edge']: + tts = TextToSpeech(model=model) + output_file = f'tests/data/response_{model}.mp3' + tts.convert_to_speech(input_text, output_file) + logger.info(f"{model.capitalize()} TTS completed. Output saved to {output_file}") except Exception as e: logger.error(f"An error occurred during text-to-speech conversion: {str(e)}")