diff --git a/app-builder/plugins/fit_py_code_node_tools/conf/__init__.py b/app-builder/plugins/fit_py_code_node_tools/conf/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/app-builder/plugins/fit_py_code_node_tools/conf/application.yml b/app-builder/plugins/fit_py_code_node_tools/conf/application.yml new file mode 100644 index 0000000000..95928426f8 --- /dev/null +++ b/app-builder/plugins/fit_py_code_node_tools/conf/application.yml @@ -0,0 +1,31 @@ +user: + function: + entrypoint: 'main' + +code: + import: + whitelist: + - 'json' + - 'typing' + - 'pandas' + - 'numpy' + - 're' + - 'requests' + - 'httpx' + - 'datetime' + - 'time' + - 'base64' + - 'hashlib' + blacklist: + - 'os' + - 'sys' + - 'cmd' + - 'subprocess' + - 'multiprocessing' + - 'timeit' + - 'platform' + - 'asyncio' + timeout: 10 + max_pool: 4 + mem_limit: 189792256 # 181*1024*1024 + verbose: False diff --git a/app-builder/plugins/fit_py_code_node_tools/conf/info.yml b/app-builder/plugins/fit_py_code_node_tools/conf/info.yml new file mode 100644 index 0000000000..eb1e2d37c5 --- /dev/null +++ b/app-builder/plugins/fit_py_code_node_tools/conf/info.yml @@ -0,0 +1,2 @@ +category: "system" +level: 4 diff --git a/app-builder/plugins/fit_py_code_node_tools/python_repl.py b/app-builder/plugins/fit_py_code_node_tools/python_repl.py new file mode 100644 index 0000000000..dd4faf6ada --- /dev/null +++ b/app-builder/plugins/fit_py_code_node_tools/python_repl.py @@ -0,0 +1,139 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. 
+# ====================================================================================================================== +import multiprocessing +import os +from typing import Dict +import threading + +from fitframework.api.decorators import fitable, value as FitConfigValue +from fitframework.api.logging import fit_logger +from fitframework.core.exception.fit_exception import FitException, InternalErrorCode +from fitframework.utils.tools import to_list + +from .python_repl_impl import execute_node_impl, GLOBAL_CONFIG + + +@FitConfigValue(key='user.function.entrypoint', default_value='main') +def _read_entrypoint_from_config(): + pass + + +@FitConfigValue(key='code.import.whitelist', default_value=['asyncio', 'json', 'numpy', 'typing'], converter=to_list) +def _read_import_whitelist_from_config(): + pass + + +@FitConfigValue(key='code.import.blacklist', + default_value=['os', 'sys', 'cmd', 'subprocess', 'multiprocessing', 'timeit', 'platform'], + converter=to_list) +def _read_import_blacklist_from_config(): + pass + + +@FitConfigValue(key='code.timeout', default_value=10, converter=int) +def _timeout(): + pass + + +@FitConfigValue(key='code.max_pool', default_value=4, converter=int) +def _max_pool(): + pass + + +@FitConfigValue(key='code.mem_limit', default_value=181*1024*1024, converter=int) +def _mem_limit(): + pass + + +@FitConfigValue(key='code.verbose', default_value=False, converter=bool) +def _verbose(): + pass + + +def _init_config(): + GLOBAL_CONFIG["entrypoint"] = _read_entrypoint_from_config() + GLOBAL_CONFIG["whitelist"] = _read_import_whitelist_from_config() + GLOBAL_CONFIG["blacklist"] = _read_import_blacklist_from_config() + GLOBAL_CONFIG["timeout"] = _timeout() + GLOBAL_CONFIG["max_pool"] = _max_pool() + GLOBAL_CONFIG["mem_limit"] = _mem_limit() + GLOBAL_CONFIG["verbose"] = _verbose() + + +class Singleton(type): + _lock = threading.Lock() + + def __init__(cls, *args, **kwargs): + cls._instance = None + super().__init__(*args, **kwargs) + + def 
__call__(cls, *args, **kwargs): + if cls._instance: + return cls._instance + + with cls._lock: + if not cls._instance: + cls._instance = super().__call__(*args, **kwargs) + + return cls._instance + + +class CodeExecutor(metaclass=Singleton): + def __init__(self): + _init_config() + self.pools = [] + for _ in range(GLOBAL_CONFIG["max_pool"]): + lock = threading.Lock() + pool = multiprocessing.Pool(processes=1) + self.pools.append((lock, pool)) + self.index = 0 + self.index_lock = threading.Lock() + self.config = GLOBAL_CONFIG + + def get_and_increment(self) -> int: + with self.index_lock: + i = self.index + self.index = i + 1 if i < self.config["max_pool"] - 1 else 0 + return i + + +def _print_process_usage(): + import psutil + # Get the current process ID + pid = os.getpid() + + # Create a Process object for the current process + process = psutil.Process(pid) + + # Get the CPU and memory usage of the current process + cpu_usage = process.cpu_percent(interval=1.0) # This returns the CPU usage as a percentage + memory_info = process.memory_info() # Returns memory usage as a named tuple (rss, vms) + + # rss (Resident Set Size) - the non-swapped physical memory the process has used + # vms (Virtual Memory Size) - the total memory the process can access + memory_usage = memory_info.rss / (1024 * 1024) # Convert to MB + virtual_memory = memory_info.vms / (1024 * 1024) # Convert to MB + + # Print CPU and memory usage + fit_logger.info(f"CPU Usage: {cpu_usage}%, Memory Usage (RSS): {memory_usage:.2f} MB, " + f"Virtual Memory Usage (VMS): {virtual_memory:.2f} MB") + + current_process = psutil.Process() + children = current_process.children(recursive=True) + for child in children: + fit_logger.info('Child pid is {}'.format(child.pid)) + + +@fitable("CodeNode.tool", "Python_REPL") +def execute_code(args: Dict[str, object], code: str) -> object: + # 由于插件初始化时使用守护进程,无法拉起进程池中的进程,选择在初次调用时初始化进程池 + executor = CodeExecutor() + if GLOBAL_CONFIG["verbose"]: + _print_process_usage() + 
res = execute_node_impl(executor.pools, executor.get_and_increment(), args, code, GLOBAL_CONFIG) + if res.isOk: + return res.value + raise FitException(res.error_code, res.msg) \ No newline at end of file diff --git a/app-builder/plugins/fit_py_code_node_tools/python_repl_impl.py b/app-builder/plugins/fit_py_code_node_tools/python_repl_impl.py new file mode 100644 index 0000000000..7402264ff2 --- /dev/null +++ b/app-builder/plugins/fit_py_code_node_tools/python_repl_impl.py @@ -0,0 +1,168 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +import asyncio +import importlib +import inspect +import json +import multiprocessing +import platform +import re +from typing import Any, Dict, List, Tuple +from pydantic import BaseModel + +if platform.system() == 'Windows': + from enum import IntEnum + + class InternalErrorCode(IntEnum): + EXCEPTION_FROM_USER_CODE_OCCURRED = 0x7F000105 + TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED = 0x7F000106 # java 不存在 +else: + from fitframework.core.exception.fit_exception import InternalErrorCode +try: + import resource +except ImportError: + resource = None +try: + from .safe_global import safe_builtins +except ImportError as e: + from safe_global import safe_builtins + +_PYTHON_REPL_HEADER = ''' +import json +from typing import Any + +Output = Any + + +''' + +GLOBAL_CONFIG = \ + { + "header": _PYTHON_REPL_HEADER, + "header_len": len(_PYTHON_REPL_HEADER.split('\n')), + "entrypoint": 'main', + "whitelist": ['json', 'typing'], + "blacklist": ['os', 'sys', 'cmd', 'subprocess', 'multiprocessing', 'timeit', 'platform'], + "timeout": 10, + "max_pool": 4, + "mem_limit": 181 * 1024 * 1024, + "verbose": False + } + + +class 
Result(BaseModel): + isOk: bool + value: Any = None + error_code: int + msg: str = None + + @staticmethod + def ok(data: Any) -> 'Result': + return Result(isOk=True, value=data, error_code=0) + + @staticmethod + def err(err_code: int, err_msg: str) -> 'Result': + return Result(isOk=False, error_code=err_code, msg=err_msg) + + +# Build the globals dict handed to exec for user code: the safe builtins plus an import hook checked against the whitelist and blacklist +def _create_restricted_exec_env(config: Dict[str, object]): + def safer_import(name, my_globals=None, my_locals=None, fromlist=(), level=0): + if name not in config['whitelist'] or name in config['blacklist']: + raise NameError(f'model {name} is not valid') + return importlib.import_module(name) + + safe_globals = { + '__builtins__': { + **safe_builtins, + '__import__': safer_import, + 'Args': Dict + } + } + return safe_globals + + +# Read the peak virtual memory size of this process (the VmPeak field of /proc/self/status), in kB +def _get_current_memory_usage(): + with open('/proc/self/status') as f: + mem_usage = f.read().split('VmPeak:')[1].split('\n')[0].strip() + return int(mem_usage.split()[0].strip()) + + +# Run user code in the restricted environment; the address-space limit comes from the config argument, falling back to GLOBAL_CONFIG when the key is absent +def _execute_code_with_restricted_python(args: Dict[str, object], code: str, config: Dict[str, object]): + if resource: + resource.setrlimit(resource.RLIMIT_AS, (config.get("mem_limit", GLOBAL_CONFIG["mem_limit"]),) * 2) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + full_python_code = (f"{config['header']}" + f'{code}\n\n') + + safer_globals = _create_restricted_exec_env(config) + exec(full_python_code, safer_globals) + entrypoint = config['entrypoint'] + if (entrypoint not in safer_globals or + not inspect.isfunction(safer_globals.get(entrypoint))): + raise NameError("main function not defined") + entrypoint = safer_globals.get(entrypoint) + if inspect.iscoroutinefunction(entrypoint): + ret = loop.run_until_complete(asyncio.wait_for(entrypoint(args), config['timeout'])) + return Result.ok(json.dumps(ret)) + else: + return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value, + "Unable to execute non-asynchronous function") + except
asyncio.TimeoutError: + return Result.err(InternalErrorCode.TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED.value, + "[TimeoutError] Execution timed out") + except Exception as err: + return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value, _get_except_msg(err, config)) + finally: + loop.close() + + +def _get_except_msg(error: Any, config: Dict[str, object]) -> str: + if isinstance(error, SyntaxError): + error_msg = f"{error.msg} at line {error.lineno - config['header_len']}, column {error.offset}: {error.text}" + elif isinstance(error, KeyError): + error_msg = f"key {str(error)} do not exist" + else: + error_msg = str(error) + return f"[{error.__class__.__name__}] {error_msg}" + + +def _get_free_process_pool(pools: List[Tuple[multiprocessing.Lock, multiprocessing.Pool]], index): + lock = pools[index][0] + if lock.acquire(): + return lock + raise multiprocessing.TimeoutError() + + +def execute_node_impl(pools: List[Tuple[multiprocessing.Lock, multiprocessing.Pool]], index: int, + args: Dict[str, object], code: str, config: Dict[str, object]): + match = _validate_escape(code) + if match is not None: + return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value, + f'{match.group()} is not allowed in code node') + lock = _get_free_process_pool(pools, index) + pool = pools[index][1] + try: + result = pool.apply_async(_execute_code_with_restricted_python, args=[args, code, config]) + return result.get(config['timeout']) + except multiprocessing.TimeoutError: + index = pools.index((lock, pool)) + pool.terminate() + pools[index] = (lock, multiprocessing.Pool(processes=1)) + return Result.err(InternalErrorCode.TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED.value, + "[TimeoutError] Execution timed out") + finally: + lock.release() + + +def _validate_escape(code: str) -> 're.Match | None': + import unicodedata  # Python NFKC-normalizes identifiers, so the scan runs on normalized text to catch look-alike spellings; returns the first offending match or None + pattern = r'(?s).(?:gi|cr|ag|tb)_frame|__\w+__'  # denylist (not exhaustive): generator/coroutine/async-generator/traceback frame attributes and any dunder name + return re.search(pattern, unicodedata.normalize('NFKC', code)) \ No newline at end of file diff --git
a/app-builder/plugins/fit_py_code_node_tools/safe_global.py b/app-builder/plugins/fit_py_code_node_tools/safe_global.py new file mode 100644 index 0000000000..95181a3d3d --- /dev/null +++ b/app-builder/plugins/fit_py_code_node_tools/safe_global.py @@ -0,0 +1,103 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +import builtins + + +safe_builtins = {} + +_safe_names = [ + '__build_class__', + 'None', + 'False', + 'True', + 'abs', + 'bool', + 'bytes', + 'callable', + 'chr', + 'complex', + 'dict', + 'divmod', + 'float', + 'hash', + 'hex', + 'id', + 'int', + 'isinstance', + 'issubclass', + 'len', + 'list', + 'oct', + 'ord', + 'pow', + 'range', + 'repr', + 'round', + 'set', + 'slice', + 'sorted', + 'str', + 'tuple', + 'zip' +] + +_safe_exceptions = [ + 'ArithmeticError', + 'AssertionError', + 'AttributeError', + 'BaseException', + 'BufferError', + 'BytesWarning', + 'DeprecationWarning', + 'EOFError', + 'EnvironmentError', + 'Exception', + 'FloatingPointError', + 'FutureWarning', + 'GeneratorExit', + 'IOError', + 'ImportError', + 'ImportWarning', + 'IndentationError', + 'IndexError', + 'KeyError', + 'KeyboardInterrupt', + 'LookupError', + 'MemoryError', + 'NameError', + 'NotImplementedError', + 'OSError', + 'OverflowError', + 'PendingDeprecationWarning', + 'ReferenceError', + 'RuntimeError', + 'RuntimeWarning', + 'StopIteration', + 'SyntaxError', + 'SyntaxWarning', + 'SystemError', + 'SystemExit', + 'TabError', + 'TypeError', + 'UnboundLocalError', + 'UnicodeDecodeError', + 'UnicodeEncodeError', + 'UnicodeError', + 'UnicodeTranslateError', + 'UnicodeWarning', + 'UserWarning', + 'ValueError', + 'Warning', + 'ZeroDivisionError', +] + +for safe_name in 
_safe_names: + safe_builtins[safe_name] = getattr(builtins, safe_name) + +for safe_exception in _safe_exceptions: + safe_builtins[safe_exception] = getattr(builtins, safe_exception) + +safe_globals = {'__builtins__': safe_builtins} diff --git a/app-builder/plugins/fit_py_code_node_tools/test_python_repl_impl.py b/app-builder/plugins/fit_py_code_node_tools/test_python_repl_impl.py new file mode 100644 index 0000000000..d9bcfc122f --- /dev/null +++ b/app-builder/plugins/fit_py_code_node_tools/test_python_repl_impl.py @@ -0,0 +1,201 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +import json +import multiprocessing +import threading +import unittest + +try: + from python_repl_impl import execute_node_impl, GLOBAL_CONFIG +except ImportError as e: + from .python_repl_impl import execute_node_impl, GLOBAL_CONFIG + +WITH_ARGS_CODE = """ +async def main(args): + return args['n'] +""" + +WITHOUT_ARGS_CODE = """ +async def main(): + return args['n'] +""" + +INVALID_MODULE_CODE = """ +import time +def main(args): + return 1 +""" + +NO_ENTRYPOINT_CODE = """ +async def mine(args): + return 1 +""" + +SYNTAX_ERROR_CODE = """ +async def main(args): + return 1////2 +""" + +ASYNC_CODE = """ +import asyncio +async def main(args): + await asyncio.sleep(10) + return 1 +""" + +INF_LOOP_CODE = """ +async def main(args): + while True: + continue + return 1 +""" + +NON_ASYNC_CODE = """ +def main(args): + return 1 +""" + +WARN_CODE = ''' +async def exception_frame(): + try: + import os + except Exception as e: + return e.__traceback__.tb_frame.f_back.f_back.f_globals['_IMPORT_WHITELIST'] +b=exception_frame() +b.add('os') +''' + +WARN_CODE1 = ''' +async def 
exception_frame(): + ret = ().__class__.__bases__[0].__subclasses__()[133].__globals__["mkdir"]("tmp_file") +''' + +WARN_CODE2 = ''' +async def exception_frame(): + try: + import os + except Exception as e: + return e.tb_frame.f_back.f_back.f_globals['_IMPORT_WHITELIST'] +b=exception_frame() +b.add('os') +''' + +ERROR_CODE = """ +async def main(args): + return 1/0 +""" + +UNSERIALIZABLE_CODE = """ +async def main(args): + return {"n" : 1}.keys() +""" + +MEMORY_ERROR_CODE = """ +async def main(args): + k = [i for i in range(10**7)] + return k +""" + + +class CodeExecutor: + def __init__(self): + self.pools = [] + for _ in range(GLOBAL_CONFIG["max_pool"]): + lock = threading.Lock() + pool = multiprocessing.Pool(processes=1) + self.pools.append((lock, pool)) + self.index = 0 + self.index_lock = threading.Lock() + self.config = GLOBAL_CONFIG + + def get_and_increment(self) -> int: + with self.index_lock: + i = self.index + self.index = i + 1 if i < self.config["max_pool"] - 1 else 0 + return i + + +executor = CodeExecutor() +user_args = dict() + + +class TestStringMethods(unittest.TestCase): + def test_with_args(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), {"n": 1}, WITH_ARGS_CODE, executor.config) + self.assertTrue(res.isOk) + self.assertEqual(json.loads(res.value), 1) + + def test_without_args(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), {"n": 1}, WITHOUT_ARGS_CODE, + executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, "[TypeError] main() takes 0 positional arguments but 1 was given") + + def test_key_not_exist(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, WITH_ARGS_CODE, + executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, "[KeyError] key 'n' do not exist") + + def test_invalid_module(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, INVALID_MODULE_CODE, + executor.config) 
+ self.assertFalse(res.isOk) + self.assertEqual(res.msg, '[NameError] model time is not valid') + + def test_no_entrypoint(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, NO_ENTRYPOINT_CODE, + executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, '[NameError] main function not defined') + + def test_syntax_error(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, SYNTAX_ERROR_CODE, + executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, '[SyntaxError] invalid syntax at line 2, column 15: return 1////2') + + def test_async_timeout(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, ASYNC_CODE, executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, '[TimeoutError] Execution timed out') + + def test_non_async_code(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, NON_ASYNC_CODE, + executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, "Unable to execute non-asynchronous function") + + def test_warn_code(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, WARN_CODE, executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, '__traceback__ is not allowed in code node') + + def test_warn_code_double_under_score(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, WARN_CODE1, executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, '__class__ is not allowed in code node') + + def test_warn_code_frame_escape(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, WARN_CODE2, executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, '.tb_frame is not allowed in code node') + + def test_code_error(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, 
ERROR_CODE, executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, '[ZeroDivisionError] division by zero') + + def test_unserializable_code(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), user_args, UNSERIALIZABLE_CODE, + executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, "[TypeError] Object of type dict_keys is not JSON serializable") + + def test_inf_loop(self): + res = execute_node_impl(executor.pools, executor.get_and_increment(), {"n": 1}, INF_LOOP_CODE, executor.config) + self.assertFalse(res.isOk) + self.assertEqual(res.msg, '[TimeoutError] Execution timed out') diff --git a/app-builder/plugins/fit_py_internet_search/conf/__init__.py b/app-builder/plugins/fit_py_internet_search/conf/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/app-builder/plugins/fit_py_internet_search/conf/application.yml b/app-builder/plugins/fit_py_internet_search/conf/application.yml new file mode 100644 index 0000000000..8f8f2116c8 --- /dev/null +++ b/app-builder/plugins/fit_py_internet_search/conf/application.yml @@ -0,0 +1,6 @@ +internet-search: + max_results_per_provider: 5 + api-key: + exa: "https://dashboard.exa.ai/home -- 登录获取api key" + tavily: "https://app.tavily.com/home -- 登录获取api key" + linkup: "https://app.linkup.so/api-keys -- 登录获取api key" diff --git a/app-builder/plugins/fit_py_internet_search/conf/info.yml b/app-builder/plugins/fit_py_internet_search/conf/info.yml new file mode 100644 index 0000000000..eb1e2d37c5 --- /dev/null +++ b/app-builder/plugins/fit_py_internet_search/conf/info.yml @@ -0,0 +1,2 @@ +category: "system" +level: 4 diff --git a/app-builder/plugins/fit_py_internet_search/schema/search-online-plugin.sql b/app-builder/plugins/fit_py_internet_search/schema/search-online-plugin.sql new file mode 100644 index 0000000000..c73a3eb430 --- /dev/null +++ b/app-builder/plugins/fit_py_internet_search/schema/search-online-plugin.sql @@ -0,0 +1,22 @@ +-- 1. 
导出 store_plugin 表数据 +INSERT INTO store_plugin (plugin_id, plugin_name, extension, deploy_status, is_builtin, source, icon) VALUES ('54e9a5117a5e3806c32fcefbec3368e90fdb57fe592415d47c553f62fc14f6c4', '联网搜索插件', '{"pluginFullName":"demo-1759045119331.zip","checksum":"d2c24dcd8436a0d7c7f7beb3717c0da1f3b0593b7cab2b2666d2542dd64a1db9","name":"联网搜索插件","description":"联网搜索插件","type":"python","uniqueness.name":"abbb-20250926103953-ea11a707"}', 'DEPLOYED', true, '', NULL) ON CONFLICT (plugin_id) DO NOTHING; + +-- 2. 导出 store_plugin_tool 表数据 +INSERT INTO store_plugin_tool (tool_name, plugin_id, tool_unique_name, source, icon) VALUES ('联网搜索工具', '54e9a5117a5e3806c32fcefbec3368e90fdb57fe592415d47c553f62fc14f6c4', '4b2ad1e7-5019-40c3-9de9-cbbb7cb7dea6', '', NULL) ON CONFLICT(plugin_id, tool_unique_name) DO NOTHING; + +-- 3. 导出 store_tool 表数据 +INSERT INTO store_tool (name, schema, runnables, extensions, unique_name, version, is_latest, group_name, definition_name, definition_group_name) VALUES ('联网搜索工具', '{"name":"联网搜索工具","description":"联网搜索工具","parameters":{"type":"object","properties":{"query":{"examples":"","defaultValue":"","name":"query","description":"问题","type":"string","required":false}},"required":["query"]},"return":{"convertor":"","examples":"","name":"","description":"联网搜索的结果","type":"array","items":{"type":"object","properties":{"score":{"type":"number"},"metadata":{"type":"object"},"id":{"type":"string"},"text":{"type":"string"}}}},"order":["query"]}', '{"FIT":{"genericableId":"Search.Online.tool","fitableId":"Python_REPL"}}', '{"tags":["FIT"]}', '4b2ad1e7-5019-40c3-9de9-cbbb7cb7dea6', '1.0.0', true, 'Search-Online-tool-Impl', '联网搜索工具', 'Search-Online-tool') ON CONFLICT(unique_name, version) DO NOTHING; + +-- 4. 
导出 store_definition 表数据 +INSERT INTO store_definition (name, schema, definition_group_name) VALUES ('联网搜索工具', '{"name":"联网搜索工具","description":"联网搜索工具","parameters":{"type":"object","properties":{"query":{"defaultValue":"","description":"问题","name":"query","type":"string","examples":"","required":true}},"required":["query"]},"order":["query"],"return":{"type":"array","items":{"type":"object","properties":{"id":{"type":"string"},"text":{"type":"string"},"score":{"type":"number"},"metadata":{"type":"object"}}},"convertor":""}}', 'Search-Online-tool') ON CONFLICT (definition_group_name, name) DO NOTHING; + +-- 5. 导出 store_definition_group 表数据(去重 name) +INSERT INTO store_definition_group (name, summary, description, extensions) VALUES ('Search-Online-tool', '', '', '{}') ON CONFLICT(name) DO NOTHING; + +-- 6. 导出 store_tag 表数据 +INSERT INTO store_tag (tool_unique_name, name) VALUES ('4b2ad1e7-5019-40c3-9de9-cbbb7cb7dea6', 'FIT') ON CONFLICT(tool_unique_name, name) DO NOTHING; +INSERT INTO store_tag (tool_unique_name, name) VALUES ('4b2ad1e7-5019-40c3-9de9-cbbb7cb7dea6', 'SEARCHONLINENODESTATE') ON CONFLICT(tool_unique_name, name) DO NOTHING; + +-- 7. 导出 store_tool_group 表数据(去重 name) +INSERT INTO store_tool_group (name, definition_group_name, summary, description, extensions) VALUES ('Search-Online-tool-Impl', 'Search-Online-tool', '', '', '{}') ON CONFLICT(name) DO NOTHING; + diff --git a/app-builder/plugins/fit_py_internet_search/src/__init__.py b/app-builder/plugins/fit_py_internet_search/src/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/app-builder/plugins/fit_py_internet_search/src/internet_search.py b/app-builder/plugins/fit_py_internet_search/src/internet_search.py new file mode 100644 index 0000000000..02038f66d0 --- /dev/null +++ b/app-builder/plugins/fit_py_internet_search/src/internet_search.py @@ -0,0 +1,239 @@ +# -- encoding: utf-8 -- +# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved. 
+# This file is a part of the ModelEngine Project. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ====================================================================================================================== +import json +from dataclasses import dataclass +from typing import Dict, List, Optional, Sequence +from linkup import LinkupClient +from tavily import TavilyClient +from exa_py import Exa + +from fitframework.api.decorators import fitable, value +from fitframework.api.logging import sys_plugin_logger +from fitframework.core.exception.fit_exception import FitException, InternalErrorCode + + +@dataclass +class SearchItem: + id: str + text: str + score: float + metadata: Dict[str, object] + + def to_dict(self) -> dict: + """转换为字典,确保所有字段都可序列化""" + return { + "id": self.id, + "text": self.text, + "score": self.score, + "metadata": { + k: (v.isoformat() if hasattr(v, 'isoformat') else v) # 处理日期等特殊类型 + for k, v in self.metadata.items() + } + } + + def to_json(self) -> str: + """转换为JSON字符串""" + return json.dumps(self.to_dict(), ensure_ascii=False) + + @classmethod + def from_dict(cls, data: dict) -> 'SearchItem': + """从字典创建SearchItem""" + return cls( + id=data["id"], + text=data["text"], + score=data["score"], + metadata=data["metadata"] + ) + + +@value('internet-search.api-key.exa') +def _get_exa_api_key() -> str: + pass + + +@value('internet-search.api-key.tavily') +def _get_tavily_api_key() -> str: + pass + + +@value('internet-search.api-key.linkup') +def _get_linkup_api_key() -> str: + pass + + +@value('internet-search.max_results_per_provider') +def _get_max_results_per_provider() -> int: + pass + + +def _truncate(text: str, max_chars: int) -> str: + if len(text) <= max_chars: + return text + return text[: max_chars - 1].rstrip() + "…" + + +def _internet_search( + query: str, + api_keys: Dict[str, str], + providers: Optional[Sequence[str]] = None, + max_results_per_provider: int = 5, + 
max_snippet_chars: int = 500, +) -> List[SearchItem]: + """Run internet search via selected providers and return unified items with individual summaries.""" + selected = list(providers) if providers is not None else [] + if not selected: + for name in ("exa", "tavily", "linkup"): + if api_keys.get(name): + selected.append(name) + items: List[SearchItem] = [] + errors = [] # 记录失败的搜索工具 + + # Exa + if "exa" in selected and api_keys.get("exa"): + try: + exa_client = Exa(api_key=api_keys["exa"]) + res = exa_client.search_and_contents( + query, + text={"max_characters": 2000}, + livecrawl="always", + num_results=max_results_per_provider, + ) + for i, r in enumerate(getattr(res, "results", [])[:max_results_per_provider]): + text = _truncate(getattr(r, "text", "") or getattr(r, "content", "") or "", max_snippet_chars) + items.append( + SearchItem( + id=getattr(r, "id", "") or f"exa_{i}", + text=text, + score=12.0, # 使用float确保序列化 + metadata={ + "fileName": getattr(r, "title", "") or "", + "url": getattr(r, "url", "") or "", + "source": "exa", + "published_date": getattr(r, "published_date", None), + "summary": text, + } + ) + ) + except Exception as e: + sys_plugin_logger.warning(f'Failed to search in Exa tool: {str(e)}') + errors.append("exa") + + # Tavily + if "tavily" in selected and api_keys.get("tavily"): + try: + tavily_client = TavilyClient(api_key=api_keys["tavily"]) + res = tavily_client.search( + query=query, + max_results=max_results_per_provider, + include_images=False, + ) + for i, r in enumerate(res.get("results", [])[:max_results_per_provider]): + text = _truncate(r.get("content", "") or "", max_snippet_chars) + items.append( + SearchItem( + id=r.get("id", "") or f"tavily_{i}", + text=text, + score=12.0, + metadata={ + "fileName": r.get("title", "") or "", + "url": r.get("url", "") or "", + "source": "tavily", + "published_date": r.get("published_date"), + "summary": text, + } + ) + ) + except Exception as e: + sys_plugin_logger.warning(f'Failed to search in 
Tavily tool: {str(e)}') + errors.append("tavily") + + # Linkup + if "linkup" in selected and api_keys.get("linkup"): + try: + linkup_client = LinkupClient(api_key=api_keys["linkup"]) + resp = linkup_client.search( + query=query, + depth="standard", + output_type="searchResults", + include_images=False, + ) + for i, r in enumerate(getattr(resp, "results", [])[:max_results_per_provider]): + text = _truncate(getattr(r, "content", "") or getattr(r, "text", "") or "", max_snippet_chars) + items.append( + SearchItem( + id=getattr(r, "id", "") or f"linkup_{i}", + text=text, + score=12.0, + metadata={ + "fileName": getattr(r, "name", None) or getattr(r, "title", "") or "", + "url": getattr(r, "url", "") or "", + "source": "linkup", + "published_date": None, + "summary": text, + } + ) + ) + except Exception as e: + sys_plugin_logger.warning(f'Failed to search in Linkup tool: {str(e)}') + errors.append("linkup") + + # 如果所有搜索都失败了,才抛出异常 + if not items and errors: + raise FitException( + InternalErrorCode.CLIENT_ERROR, + f'All search tools failed: {", ".join(errors)}' + ) + + # 去重逻辑 + seen = set() + deduped: List[SearchItem] = [] + for it in items: + key = (it.metadata.get("url") or it.metadata.get("fileName") or it.id).strip() + if not key or key in seen: + continue + seen.add(key) + deduped.append(it) + + return deduped + + +# 序列化工具函数 +def serialize_search_items(items: List[SearchItem]) -> List[dict]: + """将SearchItem列表序列化为字典列表""" + return [item.to_dict() for item in items] + + +def deserialize_search_items(data: List[dict]) -> List[SearchItem]: + """从字典列表反序列化为SearchItem列表""" + return [SearchItem.from_dict(item) for item in data] + + +def search_items_to_json(items: List[SearchItem]) -> str: + """将SearchItem列表转换为JSON字符串""" + return json.dumps(serialize_search_items(items), ensure_ascii=False, indent=2) + + +@fitable("Search.Online.tool", "Python_REPL") +def search_online(query: str) -> List[SearchItem]: + try: + return _internet_search( + query=query, + api_keys={ + "exa": 
_get_exa_api_key(), + "tavily": _get_tavily_api_key(), + "linkup": _get_linkup_api_key(), + }, + providers=["exa", "tavily", "linkup"], + max_results_per_provider=_get_max_results_per_provider(), + ) + except Exception as err: + raise FitException(InternalErrorCode.CLIENT_ERROR, 'Failed to search for node information on the network') from err + + +def search_items_from_json(json_str: str) -> List[SearchItem]: + """Parse a JSON string into a list of SearchItem.""" + data = json.loads(json_str) + return deserialize_search_items(data) diff --git a/app-builder/plugins/requirements.txt b/app-builder/plugins/requirements.txt new file mode 100644 index 0000000000..56cebacb5b --- /dev/null +++ b/app-builder/plugins/requirements.txt @@ -0,0 +1,9 @@ +pydantic==2.7.4 +psutil==6.1.1 +httpx==0.28.1 +pandas==2.1.3 +linkup-sdk +exa-py +openai +python-dotenv +tavily-python \ No newline at end of file diff --git a/shell/sql_build.sh b/shell/sql_build.sh index c16daaaf62..d2b2921c26 100644 --- a/shell/sql_build.sh +++ b/shell/sql_build.sh @@ -102,6 +102,9 @@ do cp "$i" "$directory/schema" done +# 联网搜索相关 sql 脚本 +cp ../app-builder/plugins/fit_py_internet_search/schema/search-online-plugin.sql "$directory/schema" + # 自定义模型相关 sql 脚本 app_model_center_schema_sql_list=$(find ../app-builder/plugins/aipp-custom-model-center/src/main/resources/sql/schema -name "*.sql") echo "${app_model_center_schema_sql_list}"