Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 320 additions & 0 deletions unilabos/devices/liquid_handling/prcxi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
prcxi9300.py – 完整版 PRCXI 9300 Python SDK(支持新增方案)
"""

import socket, json, contextlib
from typing import Any, List, Dict, Optional


class PRCXIError(RuntimeError):
"""Lilith 返回 Success=false 时抛出的业务异常"""


class PRCXI9300:
# ---------------------------------------------------- 基础初始化
def __init__(self, host: str = "127.0.0.1", port: int = 9999,
timeout: float = 10.0) -> None:
self.host, self.port, self.timeout = host, port, timeout

# ---------------------------------------------------- 公共底座
@staticmethod
def _len_prefix(n: int) -> bytes:
return bytes.fromhex(format(n, "016x"))

def _raw_request(self, payload: str) -> str:
with contextlib.closing(socket.socket()) as sock:
sock.settimeout(self.timeout)
sock.connect((self.host, self.port))
data = payload.encode()
sock.sendall(self._len_prefix(len(data)) + data)

chunks, first = [], True
while True:
chunk = sock.recv(4096)
if not chunk:
break
if first:
chunk, first = chunk[8:], False # 去掉首8字节长度
chunks.append(chunk)
return b"".join(chunks).decode()

def _call(self, service: str, method: str,
params: Optional[list] = None) -> Any:
payload = json.dumps(
{"ServiceName": service,
"MethodName": method,
"Paramters": params or []},
separators=(",", ":")
)
resp = json.loads(self._raw_request(payload))
if not resp.get("Success", False):
raise PRCXIError(resp.get("Msg", "Unknown error"))
data = resp.get("Data")
try:
return json.loads(data)
except (TypeError, json.JSONDecodeError):
return data

# ---------------------------------------------------- 方案相关(ISolution)
def list_solutions(self) -> List[Dict[str, Any]]:
"""GetSolutionList"""
return self._call("ISolution", "GetSolutionList")

def load_solution(self, solution_id: str) -> bool:
"""LoadSolution"""
return self._call("ISolution", "LoadSolution", [solution_id])

def add_solution(self, name: str, matrix_id: str,
steps: List[Dict[str, Any]]) -> str:
"""AddSolution → 返回新方案 GUID"""
return self._call("ISolution", "AddSolution",
[name, matrix_id, steps])

# ---------------------------------------------------- 自动化控制(IAutomation)
def start(self) -> bool:
return self._call("IAutomation", "Start")

def stop(self) -> bool:
"""Stop"""
return self._call("IAutomation", "Stop")

def reset(self) -> bool:
"""Reset"""
return self._call("IAutomation", "Reset")

def pause(self) -> bool:
"""Pause"""
return self._call("IAutomation", "Pause")

def resume(self) -> bool:
"""Resume"""
return self._call("IAutomation", "Resume")

def get_error_code(self) -> Optional[str]:
"""GetErrorCode"""
return self._call("IAutomation", "GetErrorCode")

def clear_error_code(self) -> bool:
"""RemoveErrorCodet"""
return self._call("IAutomation", "RemoveErrorCodet")

# ---------------------------------------------------- 运行状态(IMachineState)
def step_state_list(self) -> List[Dict[str, Any]]:
"""GetStepStateList"""
return self._call("IMachineState", "GetStepStateList")

def step_status(self, seq_num: int) -> Dict[str, Any]:
"""GetStepStatus"""
return self._call("IMachineState", "GetStepStatus", [seq_num])

def step_state(self, seq_num: int) -> Dict[str, Any]:
"""GetStepState"""
return self._call("IMachineState", "GetStepState", [seq_num])

def axis_location(self, axis_num: int = 1) -> Dict[str, Any]:
"""GetLocation"""
return self._call("IMachineState", "GetLocation", [axis_num])

# ---------------------------------------------------- 版位矩阵(IMatrix)
def list_matrices(self) -> List[Dict[str, Any]]:
"""GetWorkTabletMatrices"""
return self._call("IMatrix", "GetWorkTabletMatrices")

def matrix_by_id(self, matrix_id: str) -> Dict[str, Any]:
"""GetWorkTabletMatrixById"""
return self._call("IMatrix", "GetWorkTabletMatrixById", [matrix_id])

# ---------------------------------------------------- 辅助:一键运行
def run_solution(self, solution_id: str, channel_idx: int = 1) -> None:
self.load_solution(solution_id)
self.start(channel_idx)


# ---------------------------------------------------- 辅助类 StepData 工具
def build_step(
axis: str,
function: str,
dosage: int,
plate_no: int,
is_whole_plate: bool,
hole_row: int,
hole_col: int,
blending_times: int,
balance_height: int,
plate_or_hole: str,
hole_numbers: str,
assist_fun1: str = "",
assist_fun2: str = "",
assist_fun3: str = "",
assist_fun4: str = "",
assist_fun5: str = "",
liquid_method: str = "NormalDispense"
) -> Dict[str, Any]:
return {
"StepAxis": axis,
"Function": function,
"DosageNum": dosage,
"PlateNo": plate_no,
"IsWholePlate": is_whole_plate,
"HoleRow": hole_row,
"HoleCol": hole_col,
"BlendingTimes": blending_times,
"BalanceHeight": balance_height,
"PlateOrHoleNum": plate_or_hole,
"AssistFun1": assist_fun1,
"AssistFun2": assist_fun2,
"AssistFun3": assist_fun3,
"AssistFun4": assist_fun4,
"AssistFun5": assist_fun5,
"HoleNumbers": hole_numbers,
"LiquidDispensingMethod": liquid_method
}

if __name__ == "__main__":
# 连接
# client = PRCXI9300("192.168.43.236", 9999)
client = PRCXI9300("10.10.35.80", 9999)


# 获取方案
solutions = client.list_solutions()
print(solutions)

# client.reset()

solutions = client.list_solutions()
if not solutions:
raise RuntimeError("未获取到任何方案")

solution_id = solutions[0]["Id"] # ✅ 这是方案 Id
client.load_solution(solution_id)
client.start() # 默认通道 1
#
# #
# # # 查询状态
# # steps = client.step_state_list()
# # for step in steps:
# # print(f"Step {step['SequenceNumber']} status: {step['ProcessState']}")
# #
# # # 暂停/继续/停止
# # client.pause()
# # client.resume()
# # client.stop()
# # client.reset()
# #
# # # 错误码处理
# # error = client.get_error_code()
# # print(f"Error code: {error}")
# # client.clear_error_code()
# #
# # # 新增方案
# # steps = [
# # build_step("Left", "Load", 0, 1, False, 1, 1, 0, 0, "H1-8,T1", "1,2,3,4,5,6,7,8"),
# # build_step("Left", "Imbibing", 10, 2, True, 1, 1, 0, 0, "T2", "1"),
# # build_step("Left", "Tapping", 10, 4, False, 1, 1, 0, 0, "H1-8,T4", "1,2,3,4,5,6,7,8"),
# # build_step("Left", "Blending", 10, 3, True, 1, 1, 5, 0, "T3", "1"),
# # build_step("Left", "UnLoad", 0, 1, False, 1, 1, 0, 0, "H1-8,T1", "1,2,3,4,5,6,7,8")
# # ]
# # matrix_list = client.list_matrices()
# # matrix_id = matrix_list[0]['MatrixId']
# # #
# # new_solution_id = client.add_solution(f"test_solution", matrix_id, steps)
# # print(f"New solution created: {new_solution_id}")
#
# import time
#
# # 建立连接
# client = PRCXI9300("192.168.43.236", 9999)
#
# # 定义步骤列表
# steps = [
# build_step("Left", "Load", 0, 1, False, 1, 1, 0, 0, "H1-8,T1", "1,2,3,4,5,6,7,8"),
# build_step("Left", "Imbibing", 10, 2, True, 1, 1, 0, 0, "T2", "1"),
# build_step("Left", "Tapping", 10, 4, False, 1, 1, 0, 0, "H1-8,T4", "1,2,3,4,5,6,7,8"),
# build_step("Left", "Blending", 10, 3, True, 1, 1, 5, 0, "T3", "1"),
# build_step("Left", "UnLoad", 0, 1, False, 1, 1, 0, 0, "H1-8,T1", "1,2,3,4,5,6,7,8")
# ]
#
# # 获取可用矩阵 ID
# matrix_list = client.list_matrices()
# if not matrix_list:
# raise RuntimeError("未获取到任何矩阵 MatrixId")
# matrix_id = matrix_list[0]['MatrixId']
#
# # 创建方案
# plan_name = f"test_solution_{int(time.time())}" # 确保唯一名称
# new_solution_id = client.add_solution(plan_name, matrix_id, steps)
# print(f"✅ 新增方案: {new_solution_id}(名称: {plan_name})")
#
# # 加载并运行方案
# print("🚀 加载并启动方案 ...")
# client.load_solution(new_solution_id)
# client.start()
#
# # 可选:延时几秒后查看状态
# time.sleep(3)
# print("📋 步骤运行状态:")
# for step in client.step_state_list():
# print(f"Step {step['SequenceNumber']} 状态: {step['ProcessState']}")

# if __name__ == "__main__":
# import time
#
# # 建立连接
# client = PRCXI9300("10.10.35.80", 9999)
#
# # 定义步骤列表
# steps = [
# build_step("Left", "Load", 0, 1, False, 1, 1, 0, 0, "H1-8,T1", "1,2,3,4,5,6,7,8"),
# build_step("Left", "Imbibing", 10, 2, True, 1, 1, 0, 0, "T2", "1"),
# build_step("Left", "Tapping", 10, 4, False, 1, 1, 0, 0, "H1-8,T4", "1,2,3,4,5,6,7,8"),
# build_step("Left", "Blending", 10, 3, True, 1, 1, 5, 0, "T3", "1"),
# build_step("Left", "UnLoad", 0, 1, False, 1, 1, 0, 0, "H1-8,T1", "1,2,3,4,5,6,7,8")
# ]
#
# # 获取可用矩阵 ID
# matrix_list = client.list_matrices()
# if not matrix_list:
# raise RuntimeError("❌ 未获取到任何矩阵 MatrixId")
# matrix_id = matrix_list[0].get("MatrixId")
# print(f"✅ 获取矩阵 ID: {matrix_id}")
#
# # 创建方案
# plan_name = f"test_solution_{int(time.time())}"
# try:
# new_solution_id = client.add_solution(plan_name, matrix_id, steps)
# assert new_solution_id, "方案创建失败,返回空 ID"
# print(f"✅ 新增方案成功: {plan_name}, ID: {new_solution_id}")
# except Exception as e:
# print(f"❌ 方案创建失败: {e}")
# raise
#
# # 加载方案
# try:
# load_ok = client.load_solution(new_solution_id)
# assert load_ok, "加载方案失败"
# print("✅ 加载方案成功")
# except Exception as e:
# print(f"❌ 加载方案失败: {e}")
# raise
#
# # 启动方案
# try:
# start_ok = client.start()
# assert start_ok, "启动方案失败"
# print("✅ 方案已启动")
# except Exception as e:
# print(f"❌ 启动失败: {e}")
# raise
#
# # 等待设备反馈后获取状态
# time.sleep(3)
# try:
# steps_status = client.step_state_list()
# print(f"📋 共加载步骤: {len(steps_status)} 个")
# for step in steps_status:
# print(f"→ Step {step['SequenceNumber']} 状态: {step['ProcessState']}")
# except Exception as e:
# print(f"⚠️ 获取步骤状态失败: {e}")