Skip to content

Commit

Permalink
Merge pull request #789 from Ikaros-521/owner
Browse files Browse the repository at this point in the history
【重要更新】修改voice_tmp_path_queue 由Queue改为List,待稳定性测试,为后期音频播放优先级打下基础;修复部分功能中含随机抽文案功能,在文案为空时会异常的bug
  • Loading branch information
Ikaros-521 authored Apr 29, 2024
2 parents 28a7e2b + d542055 commit b00845f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 20 deletions.
55 changes: 38 additions & 17 deletions utils/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ class Audio:
message_queue = []
message_queue_lock = threading.Lock()
message_queue_not_empty = threading.Condition(lock=message_queue_lock)
# 创建音频路径队列
voice_tmp_path_queue = Queue()
# 创建待播放音频路径队列
voice_tmp_path_queue = []
voice_tmp_path_queue_lock = threading.Lock()
voice_tmp_path_queue_not_empty = threading.Condition(lock=voice_tmp_path_queue_lock)
# # 文案单独一个线程排队播放
# only_play_copywriting_thread = None

Expand Down Expand Up @@ -124,7 +126,7 @@ def is_audio_queue_empty(self):
if len(Audio.message_queue) == 0:
flag += 1

if Audio.voice_tmp_path_queue.empty():
if len(Audio.voice_tmp_path_queue) == 0:
flag += 2

# 检查mixer_normal是否正在播放
Expand Down Expand Up @@ -473,7 +475,6 @@ def audio_synthesis(self, message):

# 是否开启了音频播放
if self.config.get("play_audio", "enable"):
# Audio.voice_tmp_path_queue.put(data_json)
self.data_priority_insert(data_json)
return
# 异常报警
Expand All @@ -491,7 +492,6 @@ def audio_synthesis(self, message):

# 是否开启了音频播放
if self.config.get("play_audio", "enable"):
# Audio.voice_tmp_path_queue.put(data_json)
self.data_priority_insert(data_json)
return
# 是否为本地问答音频
Expand Down Expand Up @@ -525,7 +525,6 @@ def audio_synthesis(self, message):

# 是否开启了音频播放
if self.config.get("play_audio", "enable"):
# Audio.voice_tmp_path_queue.put(data_json)
self.data_priority_insert(data_json)
return
# 是否为助播-本地问答音频
Expand All @@ -543,7 +542,6 @@ def audio_synthesis(self, message):

# 是否开启了音频播放
if self.config.get("play_audio", "enable"):
# Audio.voice_tmp_path_queue.put(data_json)
self.data_priority_insert(data_json)
return

Expand Down Expand Up @@ -575,7 +573,6 @@ def audio_synthesis(self, message):
if "insert_index" in data_json:
data_json["insert_index"] = message["insert_index"]

# Audio.voice_tmp_path_queue.put(data_json)
self.data_priority_insert(data_json)

return
Expand Down Expand Up @@ -938,7 +935,7 @@ def send_audio_play_info_to_callback(self, data: dict=None):
"type": "audio_playback_completed",
"data": {
# 待播放音频数量
"wait_play_audio_num": Audio.voice_tmp_path_queue.qsize(),
"wait_play_audio_num": len(Audio.voice_tmp_path_queue),
# 待合成音频的消息数量
"wait_synthesis_msg_num": len(Audio.message_queue),
}
Expand Down Expand Up @@ -966,7 +963,12 @@ async def my_play_voice(self, message):
try:
# 如果是tts类型为none,暂时这类为直接播放音频,所以就丢给路径队列
if message["tts_type"] == "none":
Audio.voice_tmp_path_queue.put(message)
# 获取线程锁,避免同时操作
with Audio.voice_tmp_path_queue_lock:
# 在计算出的位置插入新数据
Audio.voice_tmp_path_queue.insert(len(Audio.voice_tmp_path_queue) , message)
# 生产者通过notify()通知消费者列表中有新的消息
Audio.voice_tmp_path_queue_not_empty.notify()
return
except Exception as e:
logging.error(traceback.format_exc())
Expand Down Expand Up @@ -1003,13 +1005,23 @@ async def voice_change_and_put_to_queue(message, voice_tmp_path):
if message["type"] == "reply" and False == self.config.get("read_username", "voice_change"):
# 是否开启了音频播放,如果没开,则不会传文件路径给播放队列
if self.config.get("play_audio", "enable"):
Audio.voice_tmp_path_queue.put(data_json)
# 获取线程锁,避免同时操作
with Audio.voice_tmp_path_queue_lock:
# 在计算出的位置插入新数据
Audio.voice_tmp_path_queue.insert(len(Audio.voice_tmp_path_queue) , data_json)
# 生产者通过notify()通知消费者列表中有新的消息
Audio.voice_tmp_path_queue_not_empty.notify()
return True
# 区分消息类型是否是 念弹幕 并且 关闭了变声
elif message["type"] == "read_comment" and False == self.config.get("read_comment", "voice_change"):
# 是否开启了音频播放,如果没开,则不会传文件路径给播放队列
if self.config.get("play_audio", "enable"):
Audio.voice_tmp_path_queue.put(data_json)
# 获取线程锁,避免同时操作
with Audio.voice_tmp_path_queue_lock:
# 在计算出的位置插入新数据
Audio.voice_tmp_path_queue.insert(len(Audio.voice_tmp_path_queue) , data_json)
# 生产者通过notify()通知消费者列表中有新的消息
Audio.voice_tmp_path_queue_not_empty.notify()
return True

voice_tmp_path = await self.voice_change(voice_tmp_path)
Expand All @@ -1019,7 +1031,12 @@ async def voice_change_and_put_to_queue(message, voice_tmp_path):

# 是否开启了音频播放,如果没开,则不会传文件路径给播放队列
if self.config.get("play_audio", "enable"):
Audio.voice_tmp_path_queue.put(data_json)
# 获取线程锁,避免同时操作
with Audio.voice_tmp_path_queue_lock:
# 在计算出的位置插入新数据
Audio.voice_tmp_path_queue.insert(len(Audio.voice_tmp_path_queue) , data_json)
# 生产者通过notify()通知消费者列表中有新的消息
Audio.voice_tmp_path_queue_not_empty.notify()

return True

Expand Down Expand Up @@ -1112,9 +1129,13 @@ async def only_play_audio(self):
Audio.mixer_normal.init()
while True:
try:
# 从队列中获取音频文件路径 队列为空时阻塞等待
data_json = Audio.voice_tmp_path_queue.get(block=True)

# 获取线程锁,避免同时操作
with Audio.voice_tmp_path_queue_lock:
while not Audio.voice_tmp_path_queue:
# 消费者在消费完一个消息后,如果列表为空,则调用wait()方法阻塞自己,直到有新消息到来
Audio.voice_tmp_path_queue_not_empty.wait() # 阻塞直到列表非空
data_json = Audio.voice_tmp_path_queue.pop(0)

logging.debug(f"普通音频播放队列 data_json={data_json}")

voice_tmp_path = data_json["voice_path"]
Expand Down Expand Up @@ -1882,7 +1903,7 @@ async def voice_change_and_put_to_queue(voice_tmp_path):
raise Exception(f"{audio_synthesis_type}合成失败")

break
# Audio.voice_tmp_path_queue.put(voice_tmp_path)

except Exception as e:
logging.error(f"尝试失败,剩余重试次数:{retry_count - 1}")
logging.error(traceback.format_exc())
Expand Down
12 changes: 9 additions & 3 deletions utils/my_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,10 @@ def get_copywriting_and_audio_synthesis(view_num):
if int(integral_entrance_copywriting["entrance_num_interval"].split("-")[0]) <= \
view_num <= \
int(integral_entrance_copywriting["entrance_num_interval"].split("-")[1]):

if len(integral_entrance_copywriting["copywriting"]) <= 0:
return False

# 匹配文案
resp_content = random.choice(integral_entrance_copywriting["copywriting"])

Expand Down Expand Up @@ -2053,9 +2057,11 @@ def comment_handle(self, data):
# 将用户名中特殊字符替换为空
message['username'] = self.common.replace_special_characters(message['username'], "!!@#¥$%^&*_-+/——=()()【】}|{:;<>~`\\")
message['username'] = message['username'][:self.config.get("read_comment", "username_max_len")]
tmp_content = random.choice(self.config.get("read_comment", "read_username_copywriting"))
if "{username}" in tmp_content:
message['content'] = tmp_content.format(username=message['username']) + message['content']

if len(self.config.get("read_comment", "read_username_copywriting")) > 0:
tmp_content = random.choice(self.config.get("read_comment", "read_username_copywriting"))
if "{username}" in tmp_content:
message['content'] = tmp_content.format(username=message['username']) + message['content']


self.audio_synthesis_handle(message)
Expand Down

0 comments on commit b00845f

Please sign in to comment.