Skip to content

Conversation

Soulter
Copy link
Member

@Soulter Soulter commented Sep 16, 2025

closes: #2768


Motivation / 动机

优化请求队列,防止当并发量高时消息历史记录被互相覆盖、无法保证顺序。

Modifications / 改动点

在 process_stage 加上了 session lock

Verification Steps / 验证步骤

可以同时在同一个会话中发送多条消息来测试顺序。

Screenshots or Test Results / 运行截图或测试结果

Compatibility & Breaking Changes / 兼容性与破坏性变更

  • 这是一个破坏性变更 (Breaking Change)。/ This is a breaking change.
  • 这不是一个破坏性变更。/ This is NOT a breaking change.

Checklist / 检查清单

  • 😊 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。/ If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
  • 👀 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。/ My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到了 requirements.txtpyproject.toml 文件相应位置。/ I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 我的更改没有引入恶意代码。/ My changes do not introduce malicious code.

Sourcery 总结

通过获取会话锁和验证处理程序元数据来优化事件处理,以确保在高并发下有序处理

错误修复:

  • 通过按会话序列化处理,防止消息历史记录被覆盖,并确保高负载下的事件顺序

功能增强:

  • 在 ProcessStage 的事件处理周围获取会话锁,以防止并发排序问题
  • 验证 activated_handlers 是一个列表,如果不是则记录错误
Original summary in English

Summary by Sourcery

Streamline event processing by acquiring session locks and validating handler metadata to ensure ordered handling under high concurrency

Bug Fixes:

  • Prevent message history from being overwritten and ensure event order under high load by serializing processing per session

Enhancements:

  • Acquire a session lock around event processing in ProcessStage to prevent concurrent ordering issues
  • Validate that activated_handlers is a list and log an error if not

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你好 - 我已经审阅了你的修改,它们看起来很棒!

AI 代理提示
请解决此代码审查中的注释:

## 单独注释

### 注释 1
<location> `astrbot/core/pipeline/process_stage/stage.py:44-47` </location>
<code_context>
-                if isinstance(resp, ProviderRequest):
-                    # Handler 的 LLM 请求
-                    event.set_extra("provider_request", resp)
-                    _t = False
-                    async for _ in self.llm_request_sub_stage.process(event):
-                        _t = True
-                        yield
</code_context>

<issue_to_address>
**suggestion:** 使用 '_t' 作为标志不明确,可以简化。

考虑将 '_t' 重命名为更具描述性的名称,例如 'has_yielded',以阐明其目的。

```suggestion
                    has_yielded = False
                    async for _ in self.llm_request_sub_stage.process(event):
                        has_yielded = True
                        yield
```
</issue_to_address>

### 注释 2
<location> `astrbot/core/pipeline/process_stage/stage.py:57-74` </location>
<code_context>
            if (
                not event._has_send_oper
                and event.is_at_or_wake_command
                and not event.call_llm
            ):
                # 是否有过发送操作 and 是否是被 @ 或者通过唤醒前缀
                if (
                    event.get_result() and not event.get_result().is_stopped()
                ) or not event.get_result():
                    # 事件没有终止传播
                    provider = self.ctx.plugin_manager.context.get_using_provider()

                    if not provider:
                        logger.info("未找到可用的 LLM 提供商,请先前往配置服务提供商。")
                        return

                    async for _ in self.llm_request_sub_stage.process(event):
                        yield

</code_context>

<issue_to_address>
**suggestion (code-quality):** 合并嵌套的 if 条件 ([`merge-nested-ifs`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/merge-nested-ifs))

```suggestion
            if (
                            not event._has_send_oper
                            and event.is_at_or_wake_command
                            and not event.call_llm
                        ) and ((
                                event.get_result() and not event.get_result().is_stopped()
                            ) or not event.get_result()):
                provider = self.ctx.plugin_manager.context.get_using_provider()

                if not provider:
                    logger.info("未找到可用的 LLM 提供商,请先前往配置服务提供商。")
                    return

                async for _ in self.llm_request_sub_stage.process(event):
                    yield

```

<br/><details><summary>解释</summary>过多的嵌套会使代码难以理解,在 Python 中尤其如此,因为没有括号来帮助区分不同的嵌套级别。阅读深度嵌套的代码令人困惑,因为你必须记住哪些条件与哪些级别相关。因此,我们力求在可能的情况下减少嵌套,并且使用 `and` 组合两个 `if` 条件的情况是一个简单的胜利。
</details>
</issue_to_address>

Sourcery 对开源项目免费 - 如果你喜欢我们的评论,请考虑分享它们 ✨
帮助我更有用!请点击每个评论上的 👍 或 👎,我将利用这些反馈来改进你的评论。
Original comment in English

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `astrbot/core/pipeline/process_stage/stage.py:44-47` </location>
<code_context>
-                if isinstance(resp, ProviderRequest):
-                    # Handler 的 LLM 请求
-                    event.set_extra("provider_request", resp)
-                    _t = False
-                    async for _ in self.llm_request_sub_stage.process(event):
-                        _t = True
-                        yield
</code_context>

<issue_to_address>
**suggestion:** The use of '_t' as a flag is unclear and could be simplified.

Consider renaming '_t' to something more descriptive, such as 'has_yielded', to clarify its purpose.

```suggestion
                    has_yielded = False
                    async for _ in self.llm_request_sub_stage.process(event):
                        has_yielded = True
                        yield
```
</issue_to_address>

### Comment 2
<location> `astrbot/core/pipeline/process_stage/stage.py:57-74` </location>
<code_context>
            if (
                not event._has_send_oper
                and event.is_at_or_wake_command
                and not event.call_llm
            ):
                # 是否有过发送操作 and 是否是被 @ 或者通过唤醒前缀
                if (
                    event.get_result() and not event.get_result().is_stopped()
                ) or not event.get_result():
                    # 事件没有终止传播
                    provider = self.ctx.plugin_manager.context.get_using_provider()

                    if not provider:
                        logger.info("未找到可用的 LLM 提供商,请先前往配置服务提供商。")
                        return

                    async for _ in self.llm_request_sub_stage.process(event):
                        yield

</code_context>

<issue_to_address>
**suggestion (code-quality):** Merge nested if conditions ([`merge-nested-ifs`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/merge-nested-ifs))

```suggestion
            if (
                            not event._has_send_oper
                            and event.is_at_or_wake_command
                            and not event.call_llm
                        ) and ((
                                event.get_result() and not event.get_result().is_stopped()
                            ) or not event.get_result()):
                provider = self.ctx.plugin_manager.context.get_using_provider()

                if not provider:
                    logger.info("未找到可用的 LLM 提供商,请先前往配置服务提供商。")
                    return

                async for _ in self.llm_request_sub_stage.process(event):
                    yield

```

<br/><details><summary>Explanation</summary>Too much nesting can make code difficult to understand, and this is especially
true in Python, where there are no brackets to help out with the delineation of
different nesting levels.

Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two `if` conditions can be combined using
`and` is an easy win.
</details>
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Copy link
Member

@anka-afk anka-afk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5fe1022c004593aed44688a5ac4f0809

@anka-afk anka-afk mentioned this pull request Sep 16, 2025
6 tasks
@Soulter
Copy link
Member Author

Soulter commented Sep 16, 2025

先不要合并,刚想到可能会存在死锁的问题

@Soulter Soulter marked this pull request as draft September 16, 2025 06:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature]增加请求队列选项
2 participants