Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
31 changes: 31 additions & 0 deletions app-builder/plugins/fit_py_code_node_tools/conf/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Settings describing the user-supplied code.
user:
  function:
    # Name of the function the submitted code must define; the plugin looks it up and calls it.
    entrypoint: 'main'

# Settings for the sandboxed code executor.
code:
  import:
    # Modules the submitted code may import; any name not listed here is rejected.
    whitelist:
      - 'json'
      - 'typing'
      - 'pandas'
      - 'numpy'
      - 're'
      - 'requests'
      - 'httpx'
      - 'datetime'
      - 'time'
      - 'base64'
      - 'hashlib'
    # Modules that are rejected even when they also appear in the whitelist.
    blacklist:
      - 'os'
      - 'sys'
      - 'cmd'
      - 'subprocess'
      - 'multiprocessing'
      - 'timeit'
      - 'platform'
      - 'asyncio'
  # Time limit for one execution; used as the timeout of asyncio.wait_for and AsyncResult.get (seconds).
  timeout: 10
  # Number of single-worker process pools created on first use.
  max_pool: 4
  # Address-space limit (RLIMIT_AS) applied inside the worker process.
  mem_limit: 189792256 # 181*1024*1024
  # When true, CPU/memory usage of the plugin process is logged on every call.
  verbose: False
2 changes: 2 additions & 0 deletions app-builder/plugins/fit_py_code_node_tools/conf/info.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
category: "system"
level: 4
139 changes: 139 additions & 0 deletions app-builder/plugins/fit_py_code_node_tools/python_repl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# -- encoding: utf-8 --
# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved.
# This file is a part of the ModelEngine Project.
# Licensed under the MIT License. See License.txt in the project root for license information.
# ======================================================================================================================
import multiprocessing
import os
from typing import Dict
import threading

from fitframework.api.decorators import fitable, value as FitConfigValue
from fitframework.api.logging import fit_logger
from fitframework.core.exception.fit_exception import FitException, InternalErrorCode
from fitframework.utils.tools import to_list

from .python_repl_impl import execute_node_impl, GLOBAL_CONFIG


@FitConfigValue(key='user.function.entrypoint', default_value='main')
def _read_entrypoint_from_config():
    """Accessor for the 'user.function.entrypoint' setting (default 'main').

    The body is intentionally empty; presumably the decorator supplies the
    configured value when the function is called (confirm against fitframework).
    ``_init_config`` stores the call result in ``GLOBAL_CONFIG["entrypoint"]``.
    """
    pass


@FitConfigValue(key='code.import.whitelist', default_value=['asyncio', 'json', 'numpy', 'typing'], converter=to_list)
def _read_import_whitelist_from_config():
    """Accessor for the 'code.import.whitelist' setting, converted with ``to_list``.

    The body is intentionally empty; presumably the decorator supplies the
    configured value when the function is called (confirm against fitframework).
    ``_init_config`` stores the call result in ``GLOBAL_CONFIG["whitelist"]``.
    """
    pass


@FitConfigValue(key='code.import.blacklist',
                default_value=['os', 'sys', 'cmd', 'subprocess', 'multiprocessing', 'timeit', 'platform'],
                converter=to_list)
def _read_import_blacklist_from_config():
    """Accessor for the 'code.import.blacklist' setting, converted with ``to_list``.

    The body is intentionally empty; presumably the decorator supplies the
    configured value when the function is called (confirm against fitframework).
    ``_init_config`` stores the call result in ``GLOBAL_CONFIG["blacklist"]``.
    """
    pass


@FitConfigValue(key='code.timeout', default_value=10, converter=int)
def _timeout():
    """Accessor for the 'code.timeout' setting (default 10), converted with ``int``.

    The body is intentionally empty; presumably the decorator supplies the
    configured value when the function is called (confirm against fitframework).
    ``_init_config`` stores the call result in ``GLOBAL_CONFIG["timeout"]``.
    """
    pass


@FitConfigValue(key='code.max_pool', default_value=4, converter=int)
def _max_pool():
    """Accessor for the 'code.max_pool' setting (default 4), converted with ``int``.

    The body is intentionally empty; presumably the decorator supplies the
    configured value when the function is called (confirm against fitframework).
    ``_init_config`` stores the call result in ``GLOBAL_CONFIG["max_pool"]``.
    """
    pass


@FitConfigValue(key='code.mem_limit', default_value=181*1024*1024, converter=int)
def _mem_limit():
    """Accessor for the 'code.mem_limit' setting (default 181*1024*1024), converted with ``int``.

    The body is intentionally empty; presumably the decorator supplies the
    configured value when the function is called (confirm against fitframework).
    ``_init_config`` stores the call result in ``GLOBAL_CONFIG["mem_limit"]``.
    """
    pass


# NOTE(review): ``bool`` applied to a non-empty string such as 'False' yields True;
# confirm the framework hands the converter an already-parsed boolean.
@FitConfigValue(key='code.verbose', default_value=False, converter=bool)
def _verbose():
    """Accessor for the 'code.verbose' setting (default False), converted with ``bool``.

    The body is intentionally empty; presumably the decorator supplies the
    configured value when the function is called (confirm against fitframework).
    ``_init_config`` stores the call result in ``GLOBAL_CONFIG["verbose"]``.
    """
    pass


def _init_config():
    """Refresh ``GLOBAL_CONFIG`` with the values returned by the config accessors.

    Each accessor is called and its result stored immediately, in the order listed.
    """
    accessors = (
        ("entrypoint", _read_entrypoint_from_config),
        ("whitelist", _read_import_whitelist_from_config),
        ("blacklist", _read_import_blacklist_from_config),
        ("timeout", _timeout),
        ("max_pool", _max_pool),
        ("mem_limit", _mem_limit),
        ("verbose", _verbose),
    )
    for config_key, read_value in accessors:
        GLOBAL_CONFIG[config_key] = read_value()


class Singleton(type):
    """Metaclass that gives every class using it exactly one shared instance.

    The first call to the class constructs the instance; every later call
    returns that same object and ignores its arguments.
    """

    # One lock shared by all classes that use this metaclass. It is re-entrant so
    # that a singleton whose ``__init__`` constructs a different singleton does
    # not deadlock on itself.
    _lock = threading.RLock()

    def __init__(cls, *args, **kwargs):
        # Each class created by this metaclass gets its own instance slot.
        cls._instance = None
        super().__init__(*args, **kwargs)

    def __call__(cls, *args, **kwargs):
        # Compare against None rather than relying on truthiness: an instance
        # that defines ``__bool__``/``__len__`` may legitimately be falsy.
        if cls._instance is not None:
            return cls._instance

        with cls._lock:
            # Re-check under the lock; another thread may have created it meanwhile.
            if cls._instance is None:
                cls._instance = super().__call__(*args, **kwargs)

        return cls._instance


class CodeExecutor(metaclass=Singleton):
    """Holder of the worker pools used to run user code.

    Construction loads the configuration and creates ``max_pool`` entries, each
    a ``(threading.Lock, single-process multiprocessing.Pool)`` pair.
    """

    def __init__(self):
        _init_config()
        pool_count = GLOBAL_CONFIG["max_pool"]
        # The lock is created before its pool, one pair per slot.
        self.pools = [
            (threading.Lock(), multiprocessing.Pool(processes=1))
            for _ in range(pool_count)
        ]
        self.index = 0
        self.index_lock = threading.Lock()
        self.config = GLOBAL_CONFIG

    def get_and_increment(self) -> int:
        """Return the current slot index and advance it, wrapping to 0 after the last slot."""
        with self.index_lock:
            current = self.index
            following = current + 1
            self.index = following if following < self.config["max_pool"] else 0
        return current


def _print_process_usage():
    """Log CPU and memory usage of the current process and the pid of each descendant.

    CPU usage is sampled with ``interval=1.0``, so this call blocks for about a second.
    """
    import psutil

    bytes_per_mb = 1024 * 1024

    # Inspect the process this function is running in.
    this_process = psutil.Process(os.getpid())
    cpu_percent = this_process.cpu_percent(interval=1.0)
    mem = this_process.memory_info()

    # rss: resident set size; vms: virtual memory size. Both are converted to MB.
    rss_mb = mem.rss / bytes_per_mb
    vms_mb = mem.vms / bytes_per_mb

    fit_logger.info(f"CPU Usage: {cpu_percent}%, Memory Usage (RSS): {rss_mb:.2f} MB, "
                    f"Virtual Memory Usage (VMS): {vms_mb:.2f} MB")

    # List every descendant process (children, grandchildren, ...).
    for descendant in psutil.Process().children(recursive=True):
        fit_logger.info('Child pid is {}'.format(descendant.pid))


@fitable("CodeNode.tool", "Python_REPL")
def execute_code(args: Dict[str, object], code: str) -> object:
    """Run user ``code`` with ``args`` on one of the executor's worker pools.

    Returns:
        The ``value`` of the execution result when it is ok.

    Raises:
        FitException: carrying the result's error code and message otherwise.
    """
    # The plugin is initialised from a daemon process, which cannot start the
    # worker processes of a pool, so the pools are created on the first call instead.
    executor = CodeExecutor()
    if GLOBAL_CONFIG["verbose"]:
        _print_process_usage()
    pool_index = executor.get_and_increment()
    outcome = execute_node_impl(executor.pools, pool_index, args, code, GLOBAL_CONFIG)
    if not outcome.isOk:
        raise FitException(outcome.error_code, outcome.msg)
    return outcome.value
168 changes: 168 additions & 0 deletions app-builder/plugins/fit_py_code_node_tools/python_repl_impl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# -- encoding: utf-8 --
# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved.
# This file is a part of the ModelEngine Project.
# Licensed under the MIT License. See License.txt in the project root for license information.
# ======================================================================================================================
import asyncio
import importlib
import inspect
import json
import multiprocessing
import platform
import re
from typing import Any, Dict, List, Tuple
from pydantic import BaseModel

# On Windows the fitframework error codes are not imported; a local enum with
# the two codes this module uses is defined instead.
if platform.system() == 'Windows':
    from enum import IntEnum

    class InternalErrorCode(IntEnum):
        EXCEPTION_FROM_USER_CODE_OCCURRED = 0x7F000105
        TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED = 0x7F000106 # does not exist on the Java side
else:
    from fitframework.core.exception.fit_exception import InternalErrorCode
# ``resource`` may be unavailable; ``None`` makes the executor skip the memory limit.
try:
    import resource
except ImportError:
    resource = None
# Prefer the package-relative import; fall back to a top-level import
# (presumably for running this module outside the package - confirm).
try:
    from .safe_global import safe_builtins
except ImportError as e:
    from safe_global import safe_builtins

# Prelude placed in front of every piece of user code before it is executed.
_PYTHON_REPL_HEADER = '''
import json
from typing import Any

Output = Any


'''

# Default executor settings; the values may be overwritten at runtime from the
# plugin configuration.
GLOBAL_CONFIG = {
    "header": _PYTHON_REPL_HEADER,
    # Number of lines the header occupies in front of the user code. It is
    # subtracted from reported line numbers, so it must equal the number of
    # newlines in the header (counting the pieces of split('\n') is one too
    # many and would report the user's first line as line 0).
    "header_len": _PYTHON_REPL_HEADER.count('\n'),
    "entrypoint": 'main',
    "whitelist": ['json', 'typing'],
    "blacklist": ['os', 'sys', 'cmd', 'subprocess', 'multiprocessing', 'timeit', 'platform'],
    "timeout": 10,
    "max_pool": 4,
    "mem_limit": 181 * 1024 * 1024,
    "verbose": False,
}


class Result(BaseModel):
    """Outcome of one code execution: a value on success, or an error code and message."""

    # True when the execution succeeded.
    isOk: bool
    # Payload of a successful execution.
    value: Any = None
    # 0 on success, otherwise the error code of the failure.
    error_code: int
    # Description of the failure; left unset on success.
    msg: str = None

    @staticmethod
    def ok(data: Any) -> 'Result':
        """Build a successful result carrying ``data`` with error code 0."""
        return Result(error_code=0, value=data, isOk=True)

    @staticmethod
    def err(err_code: int, err_msg: str) -> 'Result':
        """Build a failed result carrying ``err_code`` and ``err_msg``."""
        return Result(msg=err_msg, error_code=err_code, isOk=False)


# Create a restricted execution environment
def _create_restricted_exec_env(config: Dict[str, object]):
    """Build the globals dict handed to ``exec`` for user code.

    ``__builtins__`` is ``safe_builtins`` plus an ``__import__`` hook that only
    accepts names found in ``config['whitelist']`` and absent from
    ``config['blacklist']``, and the alias ``Args`` for ``typing.Dict``.
    """
    def safer_import(name, my_globals=None, my_locals=None, fromlist=(), level=0):
        # Only the requested module name is checked; the other arguments are not used.
        permitted = name in config['whitelist'] and name not in config['blacklist']
        if not permitted:
            raise NameError(f'model {name} is not valid')
        return importlib.import_module(name)

    restricted_builtins = dict(safe_builtins)
    restricted_builtins['__import__'] = safer_import
    restricted_builtins['Args'] = Dict
    return {'__builtins__': restricted_builtins}


# 获取内存使用(单位:kB)
def _get_current_memory_usage():
with open('/proc/self/status') as f:
mem_usage = f.read().split('VmPeak:')[1].split('\n')[0].strip()
return int(mem_usage.split()[0].strip())


# Execute restricted code
def _execute_code_with_restricted_python(args: Dict[str, object], code: str, config: Dict[str, object]):
    """Execute user ``code`` in the restricted environment and call its entry point.

    The configured header is prepended to ``code``, the result is ``exec``-ed with
    restricted globals, and the function named ``config['entrypoint']`` is awaited
    with ``args`` under ``config['timeout']``.

    Returns:
        ``Result.ok`` with the JSON-serialised return value of the entry point, or
        ``Result.err`` when the entry point is not a coroutine function, the call
        times out, or any other exception is raised while running the code.
    """
    if resource:
        # Use the limit from the config that was passed in, like every other
        # setting here: a worker started by re-importing this module only sees
        # the defaults in GLOBAL_CONFIG. Fall back to the default if the key is absent.
        mem_limit = config.get("mem_limit", GLOBAL_CONFIG["mem_limit"])
        resource.setrlimit(resource.RLIMIT_AS, (mem_limit, mem_limit))
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        full_python_code = (f"{config['header']}"
                            f'{code}\n\n')

        safer_globals = _create_restricted_exec_env(config)
        # NOTE: exec of untrusted code; protection relies on the restricted
        # builtins, the import hook and the caller's pre-validation.
        exec(full_python_code, safer_globals)
        entrypoint = config['entrypoint']
        if (entrypoint not in safer_globals or
                not inspect.isfunction(safer_globals.get(entrypoint))):
            raise NameError("main function not defined")
        entrypoint = safer_globals.get(entrypoint)
        if inspect.iscoroutinefunction(entrypoint):
            ret = loop.run_until_complete(asyncio.wait_for(entrypoint(args), config['timeout']))
            return Result.ok(json.dumps(ret))
        else:
            return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value,
                              "Unable to execute non-asynchronous function")
    except asyncio.TimeoutError:
        return Result.err(InternalErrorCode.TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED.value,
                          "[TimeoutError] Execution timed out")
    except Exception as err:
        # Everything raised by the user code is reported as a failed Result.
        return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value, _get_except_msg(err, config))
    finally:
        loop.close()


def _get_except_msg(error: Any, config: Dict[str, object]) -> str:
if isinstance(error, SyntaxError):
error_msg = f"{error.msg} at line {error.lineno - config['header_len']}, column {error.offset}: {error.text}"
elif isinstance(error, KeyError):
error_msg = f"key {str(error)} do not exist"
else:
error_msg = str(error)
return f"[{error.__class__.__name__}] {error_msg}"


def _get_free_process_pool(pools: List[Tuple[multiprocessing.Lock, multiprocessing.Pool]], index):
lock = pools[index][0]
if lock.acquire():
return lock
raise multiprocessing.TimeoutError()


def execute_node_impl(pools: List[Tuple[multiprocessing.Lock, multiprocessing.Pool]], index: int,
                      args: Dict[str, object], code: str, config: Dict[str, object]):
    """Validate ``code`` and run it on the pool at ``pools[index]``.

    Returns:
        The ``Result`` produced by the worker, or an error ``Result`` when the
        code contains a forbidden name or the worker does not answer within
        ``config['timeout']``.
    """
    forbidden = _validate_escape(code)
    if forbidden is not None:
        return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value,
                          f'{forbidden.group()} is not allowed in code node')

    held_lock = _get_free_process_pool(pools, index)
    worker_pool = pools[index][1]
    try:
        pending = worker_pool.apply_async(_execute_code_with_restricted_python, args=[args, code, config])
        return pending.get(config['timeout'])
    except multiprocessing.TimeoutError:
        # The worker is still busy: terminate its pool and put a fresh
        # single-process pool into the same slot, keeping the same lock.
        slot = pools.index((held_lock, worker_pool))
        worker_pool.terminate()
        pools[slot] = (held_lock, multiprocessing.Pool(processes=1))
        return Result.err(InternalErrorCode.TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED.value,
                          "[TimeoutError] Execution timed out")
    finally:
        held_lock.release()


def _validate_escape(code: str) -> bool:
# 校验代码中是否存在获取栈帧的字段,禁用可能用于沙箱逃逸的端
pattern = r'.gi_frame|.tb_frame|__[a-zA-Z]+__'
return re.search(pattern, code)
Loading