From ad189ac041e9a2f27dc31923bcfa8dc7fb4df9cb Mon Sep 17 00:00:00 2001 From: Reed Johnson Date: Wed, 15 Jan 2025 07:08:58 -0600 Subject: [PATCH 1/5] Modbus TCP Block --- .../workflows/enterprise_blocks/loader.py | 4 + .../sinks/PLC_modbus/__init__.py | 0 .../enterprise_blocks/sinks/PLC_modbus/v1.py | 160 ++++++++++++++++++ requirements/_requirements.txt | 1 + .../test_workflow_with_modbus_tcp.py | 37 ++++ .../core_steps/sinks/test_modbus_tcp.py | 159 +++++++++++++++++ 6 files changed, 361 insertions(+) create mode 100644 inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/__init__.py create mode 100644 inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py create mode 100644 tests/workflows/integration_tests/execution/test_workflow_with_modbus_tcp.py create mode 100644 tests/workflows/unit_tests/core_steps/sinks/test_modbus_tcp.py diff --git a/inference/enterprise/workflows/enterprise_blocks/loader.py b/inference/enterprise/workflows/enterprise_blocks/loader.py index 5aa0e25151..8d3ac3a73a 100644 --- a/inference/enterprise/workflows/enterprise_blocks/loader.py +++ b/inference/enterprise/workflows/enterprise_blocks/loader.py @@ -10,6 +10,9 @@ from inference.enterprise.workflows.enterprise_blocks.sinks.PLCethernetIP.v1 import ( PLCBlockV1, ) +from inference.enterprise.workflows.enterprise_blocks.sinks.PLC_modbus.v1 import ( + ModbusTCPBlockV1, +) def load_enterprise_blocks() -> List[Type[WorkflowBlock]]: @@ -17,4 +20,5 @@ def load_enterprise_blocks() -> List[Type[WorkflowBlock]]: OPCWriterSinkBlockV1, MQTTWriterSinkBlockV1, PLCBlockV1, + ModbusTCPBlockV1, ] diff --git a/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/__init__.py b/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py b/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py new file mode 100644 index 0000000000..0c71f4a9ac --- /dev/null +++ b/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py @@ -0,0 +1,160 @@ +# modbus_tcp_block.py + +from typing import Dict, List, Optional, Type, Union +from pydantic import ConfigDict, Field +from typing_extensions import Literal + +from pymodbus.client import ModbusTcpClient as ModbusClient + +from inference.core.workflows.execution_engine.entities.base import ( + OutputDefinition, + WorkflowImageData, + VideoMetadata, +) +from inference.core.workflows.execution_engine.entities.types import ( + LIST_OF_VALUES_KIND, + STRING_KIND, + WorkflowParameterSelector, + Selector, +) +from inference.core.workflows.prototypes.block import ( + WorkflowBlock, + WorkflowBlockManifest, +) + +LONG_DESCRIPTION = """ +This **Modbus TCP** block integrates a Roboflow Workflow with a PLC using Modbus TCP. +It can: +- Read registers from a PLC if `mode='read'`. +- Write registers to a PLC if `mode='write'`. +- Perform both read and write in a single run if `mode='read_and_write'`. + +**Parameters depending on mode:** +- If `mode='read'` or `mode='read_and_write'`, `tags_to_read` must be provided as a list of register addresses. +- If `mode='write'` or `mode='read_and_write'`, `tags_to_write` must be provided as a dictionary mapping register addresses to values. + +If a read or write operation fails, an error message is printed to the terminal, +and the corresponding entry in the output dictionary is set to "ReadFailure" or "WriteFailure". +""" + +class ModbusTCPBlockManifest(WorkflowBlockManifest): + model_config = ConfigDict( + json_schema_extra={ + "name": "Modbus TCP", + "version": "v1", + "short_description": "Generic Modbus TCP read/write block using pymodbus.", + "long_description": LONG_DESCRIPTION, + "license": "Apache-2.0", + "block_type": "analytics", + } + ) + + type: Literal["roboflow_core/modbus_tcp@v1"] + + plc_ip: Union[str, WorkflowParameterSelector(kind=[STRING_KIND])] = Field( + description="IP address of the target PLC.", + examples=["10.0.1.31"] + ) + plc_port: int = Field( + default=502, + description="Port number for Modbus TCP communication.", + examples=[502] + ) + mode: Literal["read", "write", "read_and_write"] = Field( + description="Mode of operation: 'read', 'write', or 'read_and_write'.", + examples=["read", "write", "read_and_write"] + ) + tags_to_read: Union[List[int], WorkflowParameterSelector(kind=[LIST_OF_VALUES_KIND])] = Field( + default=[], + description="List of register addresses to read. Applicable if mode='read' or 'read_and_write'.", + examples=[[1000, 1001]] + ) + tags_to_write: Union[Dict[int, int], WorkflowParameterSelector(kind=[LIST_OF_VALUES_KIND])] = Field( + default={}, + description="Dictionary mapping register addresses to values to write. Applicable if mode='write' or 'read_and_write'.", + examples=[{1005: 25}] + ) + depends_on: Selector() = Field( + description="Reference to the step output this block depends on.", + examples=["$steps.some_previous_step"] + ) + + @classmethod + def describe_outputs(cls) -> List[OutputDefinition]: + return [OutputDefinition(name="modbus_results", kind=[LIST_OF_VALUES_KIND])] + + @classmethod + def get_execution_engine_compatibility(cls) -> Optional[str]: + return ">=1.0.0,<2.0.0" + +class ModbusTCPBlockV1(WorkflowBlock): + """A Modbus TCP communication block using pymodbus. + + Supports: + - 'read': Reads specified registers. + - 'write': Writes values to specified registers. + - 'read_and_write': Reads and writes in one execution. + + On failures, errors are printed and marked as "ReadFailure" or "WriteFailure". + """ + + @classmethod + def get_manifest(cls) -> Type[WorkflowBlockManifest]: + return ModbusTCPBlockManifest + + def run( + self, + plc_ip: str, + plc_port: int, + mode: str, + tags_to_read: List[int], + tags_to_write: Dict[int, int], + depends_on: any, + image: Optional[WorkflowImageData] = None, + metadata: Optional[VideoMetadata] = None, + ) -> dict: + read_results = {} + write_results = {} + + client = ModbusClient(plc_ip, port=plc_port) + if not client.connect(): + print("Failed to connect to PLC") + return {"modbus_results": [{"error": "ConnectionFailure"}]} + + # If mode involves reading + if mode in ["read", "read_and_write"]: + for address in tags_to_read: + try: + response = client.read_holding_registers(address, 1) + if not response.isError(): + read_results[address] = response.registers[0] if response.registers else None + else: + print(f"Error reading register {address}: {response}") + read_results[address] = "ReadFailure" + except Exception as e: + print(f"Exception reading register {address}: {e}") + read_results[address] = "ReadFailure" + + # If mode involves writing + if mode in ["write", "read_and_write"]: + for address, value in tags_to_write.items(): + try: + response = client.write_register(address, value) + if not response.isError(): + write_results[address] = "WriteSuccess" + else: + print(f"Error writing register {address} with value {value}: {response}") + write_results[address] = "WriteFailure" + except Exception as e: + print(f"Exception writing register {address} with value {value}: {e}") + write_results[address] = "WriteFailure" + + client.close() + + modbus_output = {} + if read_results: + modbus_output["read"] = read_results + if write_results: + modbus_output["write"] = write_results + + return {"modbus_results": [modbus_output]} diff --git a/requirements/_requirements.txt b/requirements/_requirements.txt index c99c7a9f5e..c8fc226259 100644 --- a/requirements/_requirements.txt +++ b/requirements/_requirements.txt @@ -39,3 +39,4 @@ slack-sdk~=3.33.4 twilio~=9.3.7 httpx>=0.25.1,<0.28.0 # must be pinned as bc in 0.28.0 is causing Anthropics to fail pylogix==1.0.5 +pymodbus==3.8.3 \ No newline at end of file diff --git a/tests/workflows/integration_tests/execution/test_workflow_with_modbus_tcp.py b/tests/workflows/integration_tests/execution/test_workflow_with_modbus_tcp.py new file mode 100644 index 0000000000..7a8699325c --- /dev/null +++ b/tests/workflows/integration_tests/execution/test_workflow_with_modbus_tcp.py @@ -0,0 +1,37 @@ +import unittest +import threading +import pytest +from inference.enterprise.workflows.enterprise_blocks.sinks.PLC_modbus.v1 import ModbusTCPBlockV1 + +@pytest.mark.timeout(5) +def test_successful_read_operation(fake_modbus_server): + # given + block = ModbusTCPBlockV1() + expected_value = 123 # Example register value expected from the fake server + + # Configure the fake server to respond with the expected value for register 1000 + fake_modbus_server.set_register(1000, expected_value) + fake_modbus_server.registers_to_expect_read = [1000] + + # Start the fake Modbus server in a separate thread + server_thread = threading.Thread(target=fake_modbus_server.start) + server_thread.start() + + # when + result = block.run( + plc_ip=fake_modbus_server.host, + plc_port=fake_modbus_server.port, + mode='read', + tags_to_read=[1000], + tags_to_write={}, + depends_on=None + ) + + server_thread.join(timeout=2) + + # then + assert 'modbus_results' in result + results = result['modbus_results'][0] + assert 'read' in results + read_results = results['read'] + assert read_results.get(1000) == expected_value diff --git a/tests/workflows/unit_tests/core_steps/sinks/test_modbus_tcp.py b/tests/workflows/unit_tests/core_steps/sinks/test_modbus_tcp.py new file mode 100644 index 0000000000..8481c6c051 --- /dev/null +++ b/tests/workflows/unit_tests/core_steps/sinks/test_modbus_tcp.py @@ -0,0 +1,159 @@ +import unittest +from unittest.mock import patch, MagicMock +from modbus_tcp_block import ModbusTCPBlockV1 + +class TestModbusTCPBlockV1(unittest.TestCase): inference.enterprise.workflows.enterprise_blocks.sinks.mqtt_writer.v1.mqtt.Clien + @patch('inference.enterprise.workflows.enterprise_blocks.PLC_writer.ModbusClient') + def test_successful_read_mode(self, MockModbusClient): + # Arrange + mock_client = MagicMock() + MockModbusClient.return_value = mock_client + mock_client.connect.return_value = True + + # Simulate successful read for each register + def fake_read(address, count): + response = MagicMock() + response.isError.return_value = False + response.registers = [123] # Sample value + return response + mock_client.read_holding_registers.side_effect = fake_read + + block = ModbusTCPBlockV1() + + # Act + result = block.run( + plc_ip='10.0.1.31', + plc_port=502, + mode='read', + tags_to_read=[1000, 1001], + tags_to_write={}, + depends_on=None + ) + + # Assert + self.assertIn('modbus_results', result) + results = result['modbus_results'][0] + self.assertIn('read', results) + self.assertEqual(results['read'][1000], 123) + self.assertEqual(results['read'][1001], 123) + + @patch('modbus_tcp_block.ModbusClient') + def test_successful_write_mode(self, MockModbusClient): + # Arrange + mock_client = MagicMock() + MockModbusClient.return_value = mock_client + mock_client.connect.return_value = True + + def fake_write_register(address, value): + response = MagicMock() + response.isError.return_value = False + return response + mock_client.write_register.side_effect = fake_write_register + + block = ModbusTCPBlockV1() + + # Act + result = block.run( + plc_ip='10.0.1.31', + plc_port=502, + mode='write', + tags_to_read=[], + tags_to_write={1005: 25}, + depends_on=None + ) + + # Assert + self.assertIn('modbus_results', result) + results = result['modbus_results'][0] + self.assertIn('write', results) + self.assertEqual(results['write'][1005], 'WriteSuccess') + + @patch('modbus_tcp_block.ModbusClient') + def test_connection_failure(self, MockModbusClient): + # Arrange + mock_client = MagicMock() + MockModbusClient.return_value = mock_client + mock_client.connect.return_value = False + + block = ModbusTCPBlockV1() + + # Act + result = block.run( + plc_ip='10.0.1.31', + plc_port=502, + mode='read', + tags_to_read=[1000], + tags_to_write={}, + depends_on=None + ) + + # Assert + self.assertIn('modbus_results', result) + results = result['modbus_results'][0] + self.assertIn('error', results) + self.assertEqual(results['error'], 'ConnectionFailure') + + @patch('modbus_tcp_block.ModbusClient') + def test_read_failure(self, MockModbusClient): + # Arrange + mock_client = MagicMock() + MockModbusClient.return_value = mock_client + mock_client.connect.return_value = True + + def fake_read(address, count): + response = MagicMock() + response.isError.return_value = True + return response + mock_client.read_holding_registers.side_effect = fake_read + + block = ModbusTCPBlockV1() + + # Act + result = block.run( + plc_ip='10.0.1.31', + plc_port=502, + mode='read', + tags_to_read=[1000], + tags_to_write={}, + depends_on=None + ) + + # Assert + self.assertIn('modbus_results', result) + results = result['modbus_results'][0] + self.assertIn('read', results) + self.assertEqual(results['read'][1000], 'ReadFailure') + + @patch('modbus_tcp_block.ModbusClient') + def test_write_failure(self, MockModbusClient): + # Arrange + mock_client = MagicMock() + MockModbusClient.return_value = mock_client + mock_client.connect.return_value = True + + def fake_write_register(address, value): + response = MagicMock() + response.isError.return_value = True + return response + mock_client.write_register.side_effect = fake_write_register + + block = ModbusTCPBlockV1() + + # Act + result = block.run( + plc_ip='10.0.1.31', + plc_port=502, + mode='write', + tags_to_read=[], + tags_to_write={1005: 25}, + depends_on=None + ) + + # Assert + self.assertIn('modbus_results', result) + results = result['modbus_results'][0] + self.assertIn('write', results) + self.assertEqual(results['write'][1005], 'WriteFailure') + +if __name__ == '__main__': + unittest.main() From beb201bae7a8f6411b24e72582d4647101ccf7e7 Mon Sep 17 00:00:00 2001 From: Reed Johnson Date: Thu, 16 Jan 2025 07:17:28 -0600 Subject: [PATCH 2/5] Touchups and Unit Test --- .../workflows/enterprise_blocks/loader.py | 6 +- .../enterprise_blocks/sinks/PLC_modbus/v1.py | 61 +++++++++++-------- .../test_workflow_with_modbus_tcp.py | 37 ----------- .../core_steps/sinks/test_modbus_tcp.py | 40 ++++++------ 4 files changed, 58 insertions(+), 86 deletions(-) delete mode 100644 tests/workflows/integration_tests/execution/test_workflow_with_modbus_tcp.py diff --git a/inference/enterprise/workflows/enterprise_blocks/loader.py b/inference/enterprise/workflows/enterprise_blocks/loader.py index 8d3ac3a73a..7fb8c82c58 100644 --- a/inference/enterprise/workflows/enterprise_blocks/loader.py +++ b/inference/enterprise/workflows/enterprise_blocks/loader.py @@ -7,12 +7,12 @@ from inference.enterprise.workflows.enterprise_blocks.sinks.opc_writer.v1 import ( OPCWriterSinkBlockV1, ) -from inference.enterprise.workflows.enterprise_blocks.sinks.PLCethernetIP.v1 import ( - PLCBlockV1, -) from inference.enterprise.workflows.enterprise_blocks.sinks.PLC_modbus.v1 import ( ModbusTCPBlockV1, ) +from inference.enterprise.workflows.enterprise_blocks.sinks.PLCethernetIP.v1 import ( + PLCBlockV1, +) def load_enterprise_blocks() -> List[Type[WorkflowBlock]]: diff --git a/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py b/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py index 0c71f4a9ac..73fb614229 100644 --- a/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py +++ b/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py @@ -1,21 +1,19 @@ -# modbus_tcp_block.py - from typing import Dict, List, Optional, Type, Union -from pydantic import ConfigDict, Field -from typing_extensions import Literal +from pydantic import ConfigDict, Field from pymodbus.client import ModbusTcpClient as ModbusClient +from typing_extensions import Literal from inference.core.workflows.execution_engine.entities.base import ( OutputDefinition, - WorkflowImageData, VideoMetadata, + WorkflowImageData, ) from inference.core.workflows.execution_engine.entities.types import ( LIST_OF_VALUES_KIND, STRING_KIND, - WorkflowParameterSelector, Selector, + WorkflowParameterSelector, ) from inference.core.workflows.prototypes.block import ( WorkflowBlock, @@ -30,17 +28,18 @@ - Perform both read and write in a single run if `mode='read_and_write'`. **Parameters depending on mode:** -- If `mode='read'` or `mode='read_and_write'`, `tags_to_read` must be provided as a list of register addresses. -- If `mode='write'` or `mode='read_and_write'`, `tags_to_write` must be provided as a dictionary mapping register addresses to values. +- If `mode='read'` or `mode='read_and_write'`, `registers_to_read` must be provided as a list of register addresses. +- If `mode='write'` or `mode='read_and_write'`, `registers_to_write` must be provided as a dictionary mapping register addresses to values. If a read or write operation fails, an error message is printed to the terminal, and the corresponding entry in the output dictionary is set to "ReadFailure" or "WriteFailure". """ + class ModbusTCPBlockManifest(WorkflowBlockManifest): model_config = ConfigDict( json_schema_extra={ - "name": "Modbus TCP", + "name": "PLC ModbusTCP", "version": "v1", "short_description": "Generic Modbus TCP read/write block using pymodbus.", "long_description": LONG_DESCRIPTION, @@ -52,31 +51,34 @@ class ModbusTCPBlockManifest(WorkflowBlockManifest): type: Literal["roboflow_core/modbus_tcp@v1"] plc_ip: Union[str, WorkflowParameterSelector(kind=[STRING_KIND])] = Field( - description="IP address of the target PLC.", - examples=["10.0.1.31"] + description="IP address of the target PLC.", examples=["10.0.1.31"] ) plc_port: int = Field( default=502, description="Port number for Modbus TCP communication.", - examples=[502] + examples=[502], ) mode: Literal["read", "write", "read_and_write"] = Field( description="Mode of operation: 'read', 'write', or 'read_and_write'.", - examples=["read", "write", "read_and_write"] + examples=["read", "write", "read_and_write"], ) - tags_to_read: Union[List[int], WorkflowParameterSelector(kind=[LIST_OF_VALUES_KIND])] = Field( + registers_to_read: Union[ + List[int], WorkflowParameterSelector(kind=[LIST_OF_VALUES_KIND]) + ] = Field( default=[], description="List of register addresses to read. Applicable if mode='read' or 'read_and_write'.", - examples=[[1000, 1001]] + examples=[[1000, 1001]], ) - tags_to_write: Union[Dict[int, int], WorkflowParameterSelector(kind=[LIST_OF_VALUES_KIND])] = Field( + registers_to_write: Union[ + Dict[int, int], WorkflowParameterSelector(kind=[LIST_OF_VALUES_KIND]) + ] = Field( default={}, description="Dictionary mapping register addresses to values to write. Applicable if mode='write' or 'read_and_write'.", - examples=[{1005: 25}] + examples=[{1005: 25}], ) depends_on: Selector() = Field( description="Reference to the step output this block depends on.", - examples=["$steps.some_previous_step"] + examples=["$steps.some_previous_step"], ) @classmethod @@ -87,6 +89,7 @@ def describe_outputs(cls) -> List[OutputDefinition]: def get_execution_engine_compatibility(cls) -> Optional[str]: return ">=1.0.0,<2.0.0" + class ModbusTCPBlockV1(WorkflowBlock): """A Modbus TCP communication block using pymodbus. @@ -107,8 +110,8 @@ def run( plc_ip: str, plc_port: int, mode: str, - tags_to_read: List[int], - tags_to_write: Dict[int, int], + registers_to_read: List[int], + registers_to_write: Dict[int, int], depends_on: any, image: Optional[WorkflowImageData] = None, metadata: Optional[VideoMetadata] = None, @@ -123,11 +126,13 @@ def run( # If mode involves reading if mode in ["read", "read_and_write"]: - for address in tags_to_read: + for address in registers_to_read: try: - response = client.read_holding_registers(address, 1) + response = client.read_holding_registers(address) if not response.isError(): - read_results[address] = response.registers[0] if response.registers else None + read_results[address] = ( + response.registers[0] if response.registers else None + ) else: print(f"Error reading register {address}: {response}") read_results[address] = "ReadFailure" @@ -137,16 +142,20 @@ def run( # If mode involves writing if mode in ["write", "read_and_write"]: - for address, value in tags_to_write.items(): + for address, value in registers_to_write.items(): try: response = client.write_register(address, value) if not response.isError(): write_results[address] = "WriteSuccess" else: - print(f"Error writing register {address} with value {value}: {response}") + print( + f"Error writing register {address} with value {value}: {response}" + ) write_results[address] = "WriteFailure" except Exception as e: - print(f"Exception writing register {address} with value {value}: {e}") + print( + f"Exception writing register {address} with value {value}: {e}" + ) write_results[address] = "WriteFailure" client.close() diff --git a/tests/workflows/integration_tests/execution/test_workflow_with_modbus_tcp.py b/tests/workflows/integration_tests/execution/test_workflow_with_modbus_tcp.py deleted file mode 100644 index 7a8699325c..0000000000 --- a/tests/workflows/integration_tests/execution/test_workflow_with_modbus_tcp.py +++ /dev/null @@ -1,37 +0,0 @@ -import unittest -import threading -import pytest -from inference.enterprise.workflows.enterprise_blocks.sinks.PLC_modbus.v1 import ModbusTCPBlockV1 - -@pytest.mark.timeout(5) -def test_successful_read_operation(fake_modbus_server): - # given - block = ModbusTCPBlockV1() - expected_value = 123 # Example register value expected from the fake server - - # Configure the fake server to respond with the expected value for register 1000 - fake_modbus_server.set_register(1000, expected_value) - fake_modbus_server.registers_to_expect_read = [1000] - - # Start the fake Modbus server in a separate thread - server_thread = threading.Thread(target=fake_modbus_server.start) - server_thread.start() - - # when - result = block.run( - plc_ip=fake_modbus_server.host, - plc_port=fake_modbus_server.port, - mode='read', - tags_to_read=[1000], - tags_to_write={}, - depends_on=None - ) - - server_thread.join(timeout=2) - - # then - assert 'modbus_results' in result - results = result['modbus_results'][0] - assert 'read' in results - read_results = results['read'] - assert read_results.get(1000) == expected_value diff --git a/tests/workflows/unit_tests/core_steps/sinks/test_modbus_tcp.py b/tests/workflows/unit_tests/core_steps/sinks/test_modbus_tcp.py index 8481c6c051..7e7a05312d 100644 --- a/tests/workflows/unit_tests/core_steps/sinks/test_modbus_tcp.py +++ b/tests/workflows/unit_tests/core_steps/sinks/test_modbus_tcp.py @@ -1,9 +1,9 @@ import unittest from unittest.mock import patch, MagicMock -from modbus_tcp_block import ModbusTCPBlockV1 +from inference.enterprise.workflows.enterprise_blocks.sinks.PLC_modbus.v1 import ModbusTCPBlockV1 -class TestModbusTCPBlockV1(unittest.TestCase): inference.enterprise.workflows.enterprise_blocks.sinks.mqtt_writer.v1.mqtt.Clien - @patch('inference.enterprise.workflows.enterprise_blocks.PLC_writer.ModbusClient') +class TestModbusTCPBlockV1(unittest.TestCase): + @patch('inference.enterprise.workflows.enterprise_blocks.sinks.PLC_modbus.v1.ModbusClient') def test_successful_read_mode(self, MockModbusClient): # Arrange mock_client = MagicMock() @@ -11,7 +11,7 @@ def test_successful_read_mode(self, MockModbusClient): mock_client.connect.return_value = True # Simulate successful read for each register - def fake_read(address, count): + def fake_read(address): response = MagicMock() response.isError.return_value = False response.registers = [123] # Sample value @@ -25,8 +25,8 @@ def fake_read(address, count): plc_ip='10.0.1.31', plc_port=502, mode='read', - tags_to_read=[1000, 1001], - tags_to_write={}, + registers_to_read=[1000, 1001], + registers_to_write={}, depends_on=None ) @@ -37,7 +37,7 @@ def fake_read(address, count): self.assertEqual(results['read'][1000], 123) self.assertEqual(results['read'][1001], 123) - @patch('modbus_tcp_block.ModbusClient') + @patch('inference.enterprise.workflows.enterprise_blocks.sinks.PLC_modbus.v1.ModbusClient') def test_successful_write_mode(self, MockModbusClient): # Arrange mock_client = MagicMock() @@ -54,11 +54,11 @@ def fake_write_register(address, value): # Act result = block.run( - plc_ip='10.0.1.31', + plc_ip='10.0.0.205', plc_port=502, mode='write', - tags_to_read=[], - tags_to_write={1005: 25}, + registers_to_read=[], + registers_to_write={1005: 25}, depends_on=None ) @@ -68,7 +68,7 @@ def fake_write_register(address, value): self.assertIn('write', results) self.assertEqual(results['write'][1005], 'WriteSuccess') - @patch('modbus_tcp_block.ModbusClient') + @patch('inference.enterprise.workflows.enterprise_blocks.sinks.PLC_modbus.v1.ModbusClient') def test_connection_failure(self, MockModbusClient): # Arrange mock_client = MagicMock() @@ -82,8 +82,8 @@ def test_connection_failure(self, MockModbusClient): plc_ip='10.0.1.31', plc_port=502, mode='read', - tags_to_read=[1000], - tags_to_write={}, + registers_to_read=[1000], + registers_to_write={}, depends_on=None ) @@ -93,14 +93,14 @@ def test_connection_failure(self, MockModbusClient): self.assertIn('error', results) self.assertEqual(results['error'], 'ConnectionFailure') - @patch('modbus_tcp_block.ModbusClient') + @patch('inference.enterprise.workflows.enterprise_blocks.sinks.PLC_modbus.v1.ModbusClient') def test_read_failure(self, MockModbusClient): # Arrange mock_client = MagicMock() MockModbusClient.return_value = mock_client mock_client.connect.return_value = True - def fake_read(address, count): + def fake_read(address): response = MagicMock() response.isError.return_value = True return response @@ -113,8 +113,8 @@ def fake_read(address, count): plc_ip='10.0.1.31', plc_port=502, mode='read', - tags_to_read=[1000], - tags_to_write={}, + registers_to_read=[1000], + registers_to_write={}, depends_on=None ) @@ -124,7 +124,7 @@ def fake_read(address, count): self.assertIn('read', results) self.assertEqual(results['read'][1000], 'ReadFailure') - @patch('modbus_tcp_block.ModbusClient') + @patch('inference.enterprise.workflows.enterprise_blocks.sinks.PLC_modbus.v1.ModbusClient') def test_write_failure(self, MockModbusClient): # Arrange mock_client = MagicMock() @@ -144,8 +144,8 @@ def fake_write_register(address, value): plc_ip='10.0.1.31', plc_port=502, mode='write', - tags_to_read=[], - tags_to_write={1005: 25}, + registers_to_read=[], + registers_to_write={1005: 25}, depends_on=None ) From 4703d129e76b8507b3891f67f0de1399d9313ded Mon Sep 17 00:00:00 2001 From: Reed Johnson Date: Fri, 17 Jan 2025 09:44:30 -0600 Subject: [PATCH 3/5] Update pymodbus version --- requirements/_requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/_requirements.txt b/requirements/_requirements.txt index c8fc226259..fccbc6eb88 100644 --- a/requirements/_requirements.txt +++ b/requirements/_requirements.txt @@ -39,4 +39,4 @@ slack-sdk~=3.33.4 twilio~=9.3.7 httpx>=0.25.1,<0.28.0 # must be pinned as bc in 0.28.0 is causing Anthropics to fail pylogix==1.0.5 -pymodbus==3.8.3 \ No newline at end of file +pymodbus~=3.8.1 \ No newline at end of file From 6a113b608fc4de02d3bccd1e2abae0eb6b551ed1 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Fri, 24 Jan 2025 12:05:15 +0100 Subject: [PATCH 4/5] Add support for python3.8 when installing pymodbus --- requirements/_requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/_requirements.txt b/requirements/_requirements.txt index fccbc6eb88..a1729a5ad6 100644 --- a/requirements/_requirements.txt +++ b/requirements/_requirements.txt @@ -39,4 +39,4 @@ slack-sdk~=3.33.4 twilio~=9.3.7 httpx>=0.25.1,<0.28.0 # must be pinned as bc in 0.28.0 is causing Anthropics to fail pylogix==1.0.5 -pymodbus~=3.8.1 \ No newline at end of file +pymodbus>=3.6.9,<=3.8.3 From e46550e2258ba701572a7f1e654c811cd702ad86 Mon Sep 17 00:00:00 2001 From: Grzegorz Klimaszewski <166530809+grzegorz-roboflow@users.noreply.github.com> Date: Fri, 24 Jan 2025 12:12:55 +0100 Subject: [PATCH 5/5] Reuse modbus client for the duration of process --- .../enterprise_blocks/sinks/PLC_modbus/v1.py | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py b/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py index 73fb614229..51c935581b 100644 --- a/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py +++ b/inference/enterprise/workflows/enterprise_blocks/sinks/PLC_modbus/v1.py @@ -4,6 +4,7 @@ from pymodbus.client import ModbusTcpClient as ModbusClient from typing_extensions import Literal +from inference.core.logger import logger from inference.core.workflows.execution_engine.entities.base import ( OutputDefinition, VideoMetadata, @@ -101,6 +102,16 @@ class ModbusTCPBlockV1(WorkflowBlock): On failures, errors are printed and marked as "ReadFailure" or "WriteFailure". """ + def __init__(self): + self.client: Optional[ModbusClient] = None + + def __del__(self): + if self.client: + try: + self.client.close() + except Exception as exc: + logger.debug("Failed to release modbus client: %s", exc) + @classmethod def get_manifest(cls) -> Type[WorkflowBlockManifest]: return ModbusTCPBlockManifest @@ -119,16 +130,17 @@ def run( read_results = {} write_results = {} - client = ModbusClient(plc_ip, port=plc_port) - if not client.connect(): - print("Failed to connect to PLC") - return {"modbus_results": [{"error": "ConnectionFailure"}]} + if not self.client: + self.client: ModbusClient = ModbusClient(plc_ip, port=plc_port) + if not self.client.connect(): + print("Failed to connect to PLC") + return {"modbus_results": [{"error": "ConnectionFailure"}]} # If mode involves reading if mode in ["read", "read_and_write"]: for address in registers_to_read: try: - response = client.read_holding_registers(address) + response = self.client.read_holding_registers(address) if not response.isError(): read_results[address] = ( response.registers[0] if response.registers else None @@ -144,7 +156,7 @@ def run( if mode in ["write", "read_and_write"]: for address, value in registers_to_write.items(): try: - response = client.write_register(address, value) + response = self.client.write_register(address, value) if not response.isError(): write_results[address] = "WriteSuccess" else: @@ -158,8 +170,6 @@ def run( ) write_results[address] = "WriteFailure" - client.close() - modbus_output = {} if read_results: modbus_output["read"] = read_results