From 355e5134b9e18dcbd6d5eaa4d93c60d1e25c3959 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 9 May 2024 11:03:08 +0700 Subject: [PATCH 01/32] [syft/network] getting online networks for python gateway node --- packages/syft/src/syft/client/registry.py | 61 +++++++++++-------- tests/integration/local/gateway_local_test.py | 50 +++++++++++++-- 2 files changed, 79 insertions(+), 32 deletions(-) diff --git a/packages/syft/src/syft/client/registry.py b/packages/syft/src/syft/client/registry.py index 018c101de36..5c7f32c2449 100644 --- a/packages/syft/src/syft/client/registry.py +++ b/packages/syft/src/syft/client/registry.py @@ -73,39 +73,45 @@ def online_networks(self) -> list[dict]: networks = self.all_networks def check_network(network: dict) -> dict[Any, Any] | None: - url = "http://" + network["host_or_ip"] + ":" + str(network["port"]) + "/" - try: - res = requests.get(url, timeout=DEFAULT_TIMEOUT) # nosec - online = "This is a PyGrid Network node." in res.text - except Exception: - online = False - - # networks without frontend have a /ping route in 0.7.0 - if not online: + if network["protocol"] == "http": + url = ( + "http://" + network["host_or_ip"] + ":" + str(network["port"]) + "/" + ) try: - ping_url = url + "ping" - res = requests.get(ping_url, timeout=DEFAULT_TIMEOUT) # nosec - online = res.status_code == 200 + res = requests.get(url, timeout=DEFAULT_TIMEOUT) # nosec + online = "This is a PyGrid Network node." in res.text except Exception: online = False - if online: - version = network.get("version", None) - # Check if syft version was described in NetworkRegistry - # If it's unknown, try to update it to an available version. - if not version or version == "unknown": - # If not defined, try to ask in /syft/version endpoint (supported by 0.7.0) + # networks without frontend have a /ping route in 0.7.0 + if not online: try: - version_url = url + "api/v2/metadata" - res = requests.get(version_url, timeout=DEFAULT_TIMEOUT) # nosec - if res.status_code == 200: - network["version"] = res.json()["syft_version"] - else: - network["version"] = "unknown" + ping_url = url + "ping" + res = requests.get(ping_url, timeout=DEFAULT_TIMEOUT) # nosec + online = res.status_code == 200 except Exception: - network["version"] = "unknown" + online = False + + if online: + version = network.get("version", None) + # Check if syft version was described in NetworkRegistry + # If it's unknown, try to update it to an available version. + if not version or version == "unknown": + # If not defined, try to ask in /syft/version endpoint (supported by 0.7.0) + try: + version_url = url + "api/v2/metadata" + res = requests.get(version_url, timeout=DEFAULT_TIMEOUT) # nosec + if res.status_code == 200: + network["version"] = res.json()["syft_version"] + else: + network["version"] = "unknown" + except Exception: + network["version"] = "unknown" + return network + return None + + else: return network - return None # We can use a with statement to ensure threads are cleaned up promptly with futures.ThreadPoolExecutor(max_workers=20) as executor: @@ -128,6 +134,9 @@ def __repr__(self) -> str: return "(no gateways online - try syft.gateways.all_networks to see offline gateways)" return pd.DataFrame(on).to_string() + def __len__(self) -> int: + return len(self.all_networks) + @staticmethod def create_client(network: dict[str, Any]) -> Client: # relative diff --git a/tests/integration/local/gateway_local_test.py b/tests/integration/local/gateway_local_test.py index c01052aecb0..8fa775242d3 100644 --- a/tests/integration/local/gateway_local_test.py +++ b/tests/integration/local/gateway_local_test.py @@ -1,4 +1,5 @@ # stdlib +import os from secrets import token_hex import time @@ -21,7 +22,7 @@ from syft.service.user.user_roles import ServiceRole -def launch(node_type: NodeType, association_request_auto_approval: bool = True): +def _launch(node_type: NodeType, association_request_auto_approval: bool = True): return sy.orchestra.launch( name=token_hex(8), node_type=node_type, @@ -34,7 +35,7 @@ def launch(node_type: NodeType, association_request_auto_approval: bool = True): @pytest.fixture def gateway(): - node = launch(NodeType.GATEWAY) + node = _launch(NodeType.GATEWAY) yield node node.python_node.cleanup() node.land() @@ -42,7 +43,7 @@ def gateway(): @pytest.fixture(params=[True, False]) def gateway_association_request_auto_approval(request: pytest.FixtureRequest): - node = launch(NodeType.GATEWAY, association_request_auto_approval=request.param) + node = _launch(NodeType.GATEWAY, association_request_auto_approval=request.param) yield (request.param, node) node.python_node.cleanup() node.land() @@ -50,7 +51,7 @@ def gateway_association_request_auto_approval(request: pytest.FixtureRequest): @pytest.fixture def domain(): - node = launch(NodeType.DOMAIN) + node = _launch(NodeType.DOMAIN) yield node node.python_node.cleanup() node.land() @@ -58,7 +59,7 @@ def domain(): @pytest.fixture def domain_2(): - node = launch(NodeType.DOMAIN) + node = _launch(NodeType.DOMAIN) yield node node.python_node.cleanup() node.land() @@ -66,12 +67,49 @@ def domain_2(): @pytest.fixture def enclave(): - node = launch(NodeType.ENCLAVE) + node = _launch(NodeType.ENCLAVE) yield node node.python_node.cleanup() node.land() +@pytest.fixture(scope="function") +def set_network_json_env_var(gateway): + """Set the environment variable for the network registry JSON string.""" + json_string = f""" + {{ + "2.0.0": {{ + "gateways": [ + {{ + "name": "{gateway.name}", + "host_or_ip": "localhost (in-memory)", + "protocol": "{gateway.deployment_type.value}", + "port": "{gateway.port}", + "admin_email": "support@openmined.org", + "website": "https://www.openmined.org/", + "slack": "https://slack.openmined.org/", + "slack_channel": "#support" + }} + ] + }} + }} + """ + os.environ["NETWORK_REGISTRY_JSON"] = json_string + yield + # Clean up the environment variable after all tests in the module have run + del os.environ["NETWORK_REGISTRY_JSON"] + + +@pytest.mark.local_node +def test_create_gateway(set_network_json_env_var, gateway): + assert isinstance(sy.gateways, sy.NetworkRegistry) + assert len(sy.gateways) == 1 + assert len(sy.gateways.all_networks) == 1 + assert len(sy.gateways.online_networks) == 1 + assert sy.gateways.all_networks[0]["name"] == gateway.name + assert sy.gateways.all_networks[0]["protocol"] == gateway.deployment_type.value + + @pytest.mark.local_node def test_create_gateway_client(gateway): client = gateway.client From 9316f69d873650c3fe9cc7163fe590aa7fa407c1 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 9 May 2024 14:25:22 +0700 Subject: [PATCH 02/32] [syft/network] when checking for online domains, if not able to create a guess client at a gateway, just ignore it and move on to another one Co-authored-by: Shubham Gupta --- packages/syft/src/syft/client/registry.py | 5 ++++- packages/syft/src/syft/store/sqlite_document_store.py | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/syft/src/syft/client/registry.py b/packages/syft/src/syft/client/registry.py index 5c7f32c2449..0ef8c323905 100644 --- a/packages/syft/src/syft/client/registry.py +++ b/packages/syft/src/syft/client/registry.py @@ -255,7 +255,10 @@ def check_domain( # map _all_online_domains = [] for network in networks: - network_client = NetworkRegistry.create_client(network) + try: + network_client = NetworkRegistry.create_client(network) + except Exception: + continue domains: list[NodePeer] = network_client.domains.retrieve_nodes() for domain in domains: self.all_domains[str(domain.id)] = domain diff --git a/packages/syft/src/syft/store/sqlite_document_store.py b/packages/syft/src/syft/store/sqlite_document_store.py index f07df398dd5..3db187c35cc 100644 --- a/packages/syft/src/syft/store/sqlite_document_store.py +++ b/packages/syft/src/syft/store/sqlite_document_store.py @@ -351,9 +351,9 @@ def __iter__(self) -> Any: def __del__(self) -> None: try: self._close() - except BaseException: - print("Could not close connection") - pass + except Exception as e: + print(f"Could not close connection. Error: {e}") + raise e @serializable() From 7fe68f994f7c33d319cbcd179c5a4f54be4f943d Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 9 May 2024 17:26:09 +0700 Subject: [PATCH 03/32] [syft/network] on getting online domains by checking `domain.ping_status` for `sy.domains` --- packages/syft/src/syft/client/registry.py | 96 +++++++++---------- tests/integration/local/gateway_local_test.py | 71 ++++++++++++-- 2 files changed, 107 insertions(+), 60 deletions(-) diff --git a/packages/syft/src/syft/client/registry.py b/packages/syft/src/syft/client/registry.py index 0ef8c323905..bf35a24fd8c 100644 --- a/packages/syft/src/syft/client/registry.py +++ b/packages/syft/src/syft/client/registry.py @@ -13,7 +13,8 @@ # relative from ..service.metadata.node_metadata import NodeMetadataJSON -from ..service.network.network_service import NodePeer +from ..service.network.node_peer import NodePeer +from ..service.network.node_peer import NodePeerConnectionStatus from ..service.response import SyftException from ..types.grid_url import GridURL from ..util.constants import DEFAULT_TIMEOUT @@ -73,45 +74,39 @@ def online_networks(self) -> list[dict]: networks = self.all_networks def check_network(network: dict) -> dict[Any, Any] | None: - if network["protocol"] == "http": - url = ( - "http://" + network["host_or_ip"] + ":" + str(network["port"]) + "/" - ) + url = "http://" + network["host_or_ip"] + ":" + str(network["port"]) + "/" + try: + res = requests.get(url, timeout=DEFAULT_TIMEOUT) # nosec + online = "This is a PyGrid Network node." in res.text + except Exception: + online = False + + # networks without frontend have a /ping route in 0.7.0 + if not online: try: - res = requests.get(url, timeout=DEFAULT_TIMEOUT) # nosec - online = "This is a PyGrid Network node." in res.text + ping_url = url + "ping" + res = requests.get(ping_url, timeout=DEFAULT_TIMEOUT) # nosec + online = res.status_code == 200 except Exception: online = False - # networks without frontend have a /ping route in 0.7.0 - if not online: + if online: + version = network.get("version", None) + # Check if syft version was described in NetworkRegistry + # If it's unknown, try to update it to an available version. + if not version or version == "unknown": + # If not defined, try to ask in /syft/version endpoint (supported by 0.7.0) try: - ping_url = url + "ping" - res = requests.get(ping_url, timeout=DEFAULT_TIMEOUT) # nosec - online = res.status_code == 200 - except Exception: - online = False - - if online: - version = network.get("version", None) - # Check if syft version was described in NetworkRegistry - # If it's unknown, try to update it to an available version. - if not version or version == "unknown": - # If not defined, try to ask in /syft/version endpoint (supported by 0.7.0) - try: - version_url = url + "api/v2/metadata" - res = requests.get(version_url, timeout=DEFAULT_TIMEOUT) # nosec - if res.status_code == 200: - network["version"] = res.json()["syft_version"] - else: - network["version"] = "unknown" - except Exception: + version_url = url + "api/v2/metadata" + res = requests.get(version_url, timeout=DEFAULT_TIMEOUT) # nosec + if res.status_code == 200: + network["version"] = res.json()["syft_version"] + else: network["version"] = "unknown" - return network - return None - - else: + except Exception: + network["version"] = "unknown" return network + return None # We can use a with statement to ensure threads are cleaned up promptly with futures.ThreadPoolExecutor(max_workers=20) as executor: @@ -248,24 +243,25 @@ def check_domain( print(f"Error in checking domain with exception {e}") return None - networks = self.online_networks + # networks = self.online_networks + networks = self.all_networks - # We can use a with statement to ensure threads are cleaned up promptly - with futures.ThreadPoolExecutor(max_workers=20) as executor: - # map - _all_online_domains = [] - for network in networks: - try: - network_client = NetworkRegistry.create_client(network) - except Exception: - continue - domains: list[NodePeer] = network_client.domains.retrieve_nodes() - for domain in domains: - self.all_domains[str(domain.id)] = domain - _online_domains = list( - executor.map(lambda domain: check_domain(domain), domains) - ) - _all_online_domains += _online_domains + _all_online_domains = [] + for network in networks: + try: + network_client = NetworkRegistry.create_client(network) + except Exception as e: + print(f"Error in creating network client with exception {e}") + continue + domains: list[NodePeer] = network_client.domains.retrieve_nodes() + for domain in domains: + self.all_domains[str(domain.id)] = domain + _online_domains = [ + (domain, domain.guest_client.metadata) + for domain in domains + if domain.ping_status == NodePeerConnectionStatus.ACTIVE + ] + _all_online_domains += _online_domains return [domain for domain in _all_online_domains if domain is not None] diff --git a/tests/integration/local/gateway_local_test.py b/tests/integration/local/gateway_local_test.py index 8fa775242d3..4036ef2366f 100644 --- a/tests/integration/local/gateway_local_test.py +++ b/tests/integration/local/gateway_local_test.py @@ -22,7 +22,11 @@ from syft.service.user.user_roles import ServiceRole -def _launch(node_type: NodeType, association_request_auto_approval: bool = True): +def _launch( + node_type: NodeType, + association_request_auto_approval: bool = True, + port: int | str | None = None, +): return sy.orchestra.launch( name=token_hex(8), node_type=node_type, @@ -30,6 +34,7 @@ def _launch(node_type: NodeType, association_request_auto_approval: bool = True) reset=True, local_db=True, association_request_auto_approval=association_request_auto_approval, + port=port, ) @@ -73,18 +78,39 @@ def enclave(): node.land() +@pytest.fixture +def gateway_webserver(): + node = _launch(node_type=NodeType.GATEWAY, port="auto") + yield node + node.land() + + +@pytest.fixture +def domain_webserver(): + node = _launch(NodeType.DOMAIN, port="auto") + yield node + node.land() + + +@pytest.fixture +def domain_2_webserver(): + node = _launch(NodeType.DOMAIN, port="auto") + yield node + node.land() + + @pytest.fixture(scope="function") -def set_network_json_env_var(gateway): +def set_network_json_env_var(gateway_webserver): """Set the environment variable for the network registry JSON string.""" json_string = f""" {{ "2.0.0": {{ "gateways": [ {{ - "name": "{gateway.name}", - "host_or_ip": "localhost (in-memory)", - "protocol": "{gateway.deployment_type.value}", - "port": "{gateway.port}", + "name": "{gateway_webserver.name}", + "host_or_ip": "localhost", + "protocol": "http", + "port": "{gateway_webserver.port}", "admin_email": "support@openmined.org", "website": "https://www.openmined.org/", "slack": "https://slack.openmined.org/", @@ -101,13 +127,38 @@ def set_network_json_env_var(gateway): @pytest.mark.local_node -def test_create_gateway(set_network_json_env_var, gateway): +def test_create_gateway( + set_network_json_env_var, gateway_webserver, domain_webserver, domain_2_webserver +): assert isinstance(sy.gateways, sy.NetworkRegistry) assert len(sy.gateways) == 1 assert len(sy.gateways.all_networks) == 1 - assert len(sy.gateways.online_networks) == 1 - assert sy.gateways.all_networks[0]["name"] == gateway.name - assert sy.gateways.all_networks[0]["protocol"] == gateway.deployment_type.value + assert sy.gateways.all_networks[0]["name"] == gateway_webserver.name + # assert len(sy.gateways.online_networks) == 1 + # assert sy.gateways.online_networks[0]["name"] == gateway_webserver.name + + gateway_client: GatewayClient = gateway_webserver.login( + email="info@openmined.org", + password="changethis", + ) + res = gateway_client.settings.allow_association_request_auto_approval(enable=True) + assert isinstance(res, SyftSuccess) + + domain_client: DomainClient = domain_webserver.login( + email="info@openmined.org", + password="changethis", + ) + domain_client_2: DomainClient = domain_2_webserver.login( + email="info@openmined.org", + password="changethis", + ) + result = domain_client.connect_to_gateway(handle=gateway_webserver) + assert isinstance(result, SyftSuccess) + result = domain_client_2.connect_to_gateway(handle=gateway_webserver) + assert isinstance(result, SyftSuccess) + + assert len(sy.domains.all_domains) == 2 + assert len(sy.domains.online_domains) == 2 @pytest.mark.local_node From 25eaaee2db55cfa79ec2735648ee09f7bf654c6b Mon Sep 17 00:00:00 2001 From: khoaguin Date: Fri, 10 May 2024 12:28:55 +0700 Subject: [PATCH 04/32] [syft/network] add total line to show number of online networks / all networks for `sy.gateways` --- packages/syft/src/syft/client/registry.py | 40 +++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/client/registry.py b/packages/syft/src/syft/client/registry.py index bf35a24fd8c..84b1a6cd3b6 100644 --- a/packages/syft/src/syft/client/registry.py +++ b/packages/syft/src/syft/client/registry.py @@ -121,13 +121,49 @@ def _repr_html_(self) -> str: on = self.online_networks if len(on) == 0: return "(no gateways online - try syft.gateways.all_networks to see offline gateways)" - return pd.DataFrame(on)._repr_html_() # type: ignore + df = pd.DataFrame(on) + total_df = pd.DataFrame( + [ + [ + "", + "", + "", + "", + f"{len(on)} / {len(self.all_networks)} (online networks / all networks)", + "", + "", + "", + ] + ], + columns=df.columns, + index=["Total"], + ) + df = pd.concat([df, total_df]) + return df._repr_html_() # type: ignore def __repr__(self) -> str: on = self.online_networks if len(on) == 0: return "(no gateways online - try syft.gateways.all_networks to see offline gateways)" - return pd.DataFrame(on).to_string() + df = pd.DataFrame(on) + total_df = pd.DataFrame( + [ + [ + "", + "", + "", + "", + f"{len(on)} / {len(self.all_networks)} (online networks / all networks)", + "", + "", + "", + ] + ], + columns=df.columns, + index=["Total"], + ) + df = pd.concat([df, total_df]) + return df.to_string() def __len__(self) -> int: return len(self.all_networks) From eddc66febccae67366dc9ad21f4b47365226d8c0 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Fri, 10 May 2024 15:40:14 +0700 Subject: [PATCH 05/32] [syft/network] - update ping url for networks without frontend - improve repr for domain and network registry to show online domains and networks --- packages/syft/src/syft/client/registry.py | 71 +++++++++---------- tests/integration/local/gateway_local_test.py | 4 +- 2 files changed, 36 insertions(+), 39 deletions(-) diff --git a/packages/syft/src/syft/client/registry.py b/packages/syft/src/syft/client/registry.py index 84b1a6cd3b6..ee57b642f53 100644 --- a/packages/syft/src/syft/client/registry.py +++ b/packages/syft/src/syft/client/registry.py @@ -81,10 +81,10 @@ def check_network(network: dict) -> dict[Any, Any] | None: except Exception: online = False - # networks without frontend have a /ping route in 0.7.0 + # networks without frontend if not online: try: - ping_url = url + "ping" + ping_url = url + "api/v2/" res = requests.get(ping_url, timeout=DEFAULT_TIMEOUT) # nosec online = res.status_code == 200 except Exception: @@ -125,15 +125,9 @@ def _repr_html_(self) -> str: total_df = pd.DataFrame( [ [ - "", - "", - "", - "", - f"{len(on)} / {len(self.all_networks)} (online networks / all networks)", - "", - "", - "", + f"{len(on)} / {len(self.all_networks)} (online networks / all networks)" ] + + [""] * (len(df.columns) - 1) ], columns=df.columns, index=["Total"], @@ -149,15 +143,9 @@ def __repr__(self) -> str: total_df = pd.DataFrame( [ [ - "", - "", - "", - "", - f"{len(on)} / {len(self.all_networks)} (online networks / all networks)", - "", - "", - "", + f"{len(on)} / {len(self.all_networks)} (online networks / all networks)" ] + + [""] * (len(df.columns) - 1) ], columns=df.columns, index=["Total"], @@ -229,10 +217,10 @@ def check_network(network: dict) -> dict[Any, Any] | None: except Exception: online = False - # networks without frontend have a /ping route in 0.7.0 + # networks without frontend if not online: try: - ping_url = url + "ping" + ping_url = url + "api/v2/" res = requests.get(ping_url, timeout=DEFAULT_TIMEOUT) online = res.status_code == 200 except Exception: @@ -268,19 +256,7 @@ def check_network(network: dict) -> dict[Any, Any] | None: @property def online_domains(self) -> list[tuple[NodePeer, NodeMetadataJSON | None]]: - def check_domain( - peer: NodePeer, - ) -> tuple[NodePeer, NodeMetadataJSON | None] | None: - try: - guest_client = peer.guest_client - metadata = guest_client.metadata - return peer, metadata - except Exception as e: # nosec - print(f"Error in checking domain with exception {e}") - return None - - # networks = self.online_networks - networks = self.all_networks + networks = self.online_networks _all_online_domains = [] for network in networks: @@ -289,15 +265,16 @@ def check_domain( except Exception as e: print(f"Error in creating network client with exception {e}") continue + domains: list[NodePeer] = network_client.domains.retrieve_nodes() for domain in domains: self.all_domains[str(domain.id)] = domain - _online_domains = [ + + _all_online_domains += [ (domain, domain.guest_client.metadata) for domain in domains if domain.ping_status == NodePeerConnectionStatus.ACTIVE ] - _all_online_domains += _online_domains return [domain for domain in _all_online_domains if domain is not None] @@ -325,13 +302,33 @@ def _repr_html_(self) -> str: on: list[dict[str, Any]] = self.__make_dict__() if len(on) == 0: return "(no domains online - try syft.domains.all_domains to see offline domains)" - return pd.DataFrame(on)._repr_html_() # type: ignore + df = pd.DataFrame(on) + total_df = pd.DataFrame( + [ + [f"{len(on)} / {len(self.all_domains)} (online domains / all domains)"] + + [""] * (len(df.columns) - 1) + ], + columns=df.columns, + index=["Total"], + ) + df = pd.concat([df, total_df]) + return df._repr_html_() # type: ignore def __repr__(self) -> str: on: list[dict[str, Any]] = self.__make_dict__() if len(on) == 0: return "(no domains online - try syft.domains.all_domains to see offline domains)" - return pd.DataFrame(on).to_string() + df = pd.DataFrame(on) + total_df = pd.DataFrame( + [ + [f"{len(on)} / {len(self.all_domains)} (online domains / all domains)"] + + [""] * (len(df.columns) - 1) + ], + columns=df.columns, + index=["Total"], + ) + df = pd.concat([df, total_df]) + return df._repr_html_() # type: ignore def create_client(self, peer: NodePeer) -> Client: try: diff --git a/tests/integration/local/gateway_local_test.py b/tests/integration/local/gateway_local_test.py index 4036ef2366f..93420696e75 100644 --- a/tests/integration/local/gateway_local_test.py +++ b/tests/integration/local/gateway_local_test.py @@ -134,8 +134,8 @@ def test_create_gateway( assert len(sy.gateways) == 1 assert len(sy.gateways.all_networks) == 1 assert sy.gateways.all_networks[0]["name"] == gateway_webserver.name - # assert len(sy.gateways.online_networks) == 1 - # assert sy.gateways.online_networks[0]["name"] == gateway_webserver.name + assert len(sy.gateways.online_networks) == 1 + assert sy.gateways.online_networks[0]["name"] == gateway_webserver.name gateway_client: GatewayClient = gateway_webserver.login( email="info@openmined.org", From 8182a1480a6dfcf3f56568428b35aa644e475807 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Fri, 10 May 2024 16:47:14 +0700 Subject: [PATCH 06/32] [syft/network] integrating `background_tasks` into `Orchestra.launch` - `sy.gateways` and `sy.domains` work with python webserver nodes --- packages/hagrid/hagrid/orchestra.py | 5 ++++- packages/syft/src/syft/node/server.py | 5 +++++ packages/syft/src/syft/service/network/utils.py | 2 +- tests/integration/local/gateway_local_test.py | 1 + tests/integration/network/gateway_test.py | 10 ++++++---- 5 files changed, 17 insertions(+), 6 deletions(-) diff --git a/packages/hagrid/hagrid/orchestra.py b/packages/hagrid/hagrid/orchestra.py index dcf0c597995..8826c073841 100644 --- a/packages/hagrid/hagrid/orchestra.py +++ b/packages/hagrid/hagrid/orchestra.py @@ -238,6 +238,7 @@ def deploy_to_python( create_producer: bool = False, queue_port: int | None = None, association_request_auto_approval: bool = False, + background_tasks: bool = False, ) -> NodeHandle | None: stage_protocol_changes = ImportFromSyft.import_stage_protocol_changes() NodeType = ImportFromSyft.import_node_type() @@ -272,7 +273,7 @@ def deploy_to_python( "n_consumers": n_consumers, "create_producer": create_producer, "association_request_auto_approval": association_request_auto_approval, - "background_tasks": True, + "background_tasks": background_tasks, } if port: @@ -493,6 +494,7 @@ def launch( queue_port: int | None = None, in_memory_workers: bool = True, association_request_auto_approval: bool = False, + background_tasks: bool = False, ) -> NodeHandle | None: NodeType = ImportFromSyft.import_node_type() os.environ["DEV_MODE"] = str(dev_mode) @@ -540,6 +542,7 @@ def launch( create_producer=create_producer, queue_port=queue_port, association_request_auto_approval=association_request_auto_approval, + background_tasks=background_tasks, ) elif deployment_type_enum == DeploymentType.K8S: diff --git a/packages/syft/src/syft/node/server.py b/packages/syft/src/syft/node/server.py index 8c9b71559cb..f5f05bf35ac 100644 --- a/packages/syft/src/syft/node/server.py +++ b/packages/syft/src/syft/node/server.py @@ -79,6 +79,7 @@ def run_uvicorn( create_producer: bool, association_request_auto_approval: bool, n_consumers: int, + background_tasks: bool, ) -> None: async def _run_uvicorn( name: str, @@ -112,6 +113,7 @@ async def _run_uvicorn( create_producer=create_producer, n_consumers=n_consumers, association_request_auto_approval=association_request_auto_approval, + background_tasks=background_tasks, ) else: worker = worker_class( @@ -127,6 +129,7 @@ async def _run_uvicorn( create_producer=create_producer, n_consumers=n_consumers, association_request_auto_approval=association_request_auto_approval, + background_tasks=background_tasks, ) router = make_routes(worker=worker) app = make_app(worker.name, router=router) @@ -186,6 +189,7 @@ def serve_node( create_producer: bool = False, n_consumers: int = 0, association_request_auto_approval: bool = False, + background_tasks: bool = False, ) -> tuple[Callable, Callable]: server_process = multiprocessing.Process( target=run_uvicorn, @@ -204,6 +208,7 @@ def serve_node( "create_producer": create_producer, "n_consumers": n_consumers, "association_request_auto_approval": association_request_auto_approval, + "background_tasks": background_tasks, }, ) diff --git a/packages/syft/src/syft/service/network/utils.py b/packages/syft/src/syft/service/network/utils.py index 437d2521f42..c9e98da6179 100644 --- a/packages/syft/src/syft/service/network/utils.py +++ b/packages/syft/src/syft/service/network/utils.py @@ -19,7 +19,7 @@ @serializable(without=["thread"]) class PeerHealthCheckTask: - repeat_time = 60 # in seconds + repeat_time = 10 # in seconds def __init__(self) -> None: self.thread: threading.Thread | None = None diff --git a/tests/integration/local/gateway_local_test.py b/tests/integration/local/gateway_local_test.py index 93420696e75..4b385562c47 100644 --- a/tests/integration/local/gateway_local_test.py +++ b/tests/integration/local/gateway_local_test.py @@ -35,6 +35,7 @@ def _launch( local_db=True, association_request_auto_approval=association_request_auto_approval, port=port, + background_tasks=True, ) diff --git a/tests/integration/network/gateway_test.py b/tests/integration/network/gateway_test.py index 2aebd270f7d..be72aae81e6 100644 --- a/tests/integration/network/gateway_test.py +++ b/tests/integration/network/gateway_test.py @@ -1,6 +1,7 @@ # stdlib import itertools import os +import time import uuid # third party @@ -20,8 +21,10 @@ from syft.service.network.association_request import AssociationRequestChange from syft.service.network.network_service import NodePeerAssociationStatus from syft.service.network.node_peer import NodePeer +from syft.service.network.node_peer import NodePeerConnectionStatus from syft.service.network.routes import HTTPNodeRoute from syft.service.network.routes import NodeRouteType +from syft.service.network.utils import PeerHealthCheckTask from syft.service.request.request import Request from syft.service.response import SyftError from syft.service.response import SyftSuccess @@ -899,10 +902,9 @@ def test_peer_health_check(set_env_var, gateway_port: int, domain_1_port: int) - assert isinstance(res, NodePeerAssociationStatus) assert res.value == "PEER_ASSOCIATED" - # TODO: check for peer connection status (now it fails) - # time.sleep(PeerHealthCheckTask.repeat_time + 1) - # domain_peer = gateway_client.api.services.network.get_all_peers()[0] - # assert domain_peer.ping_status == NodePeerConnectionStatus.ACTIVE + time.sleep(PeerHealthCheckTask.repeat_time + 1) + domain_peer = gateway_client.api.services.network.get_all_peers()[0] + assert domain_peer.ping_status == NodePeerConnectionStatus.ACTIVE # Remove existing peers assert isinstance(_remove_existing_peers(domain_client), SyftSuccess) From da596ffcddc11c7a55f7a7b33590580b68751c19 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Mon, 13 May 2024 13:43:36 +0700 Subject: [PATCH 07/32] [syft/network] pass `background_tasks=True` to `worker_class` - add a wait in `gateway_local_test` before checking for online domains Co-authored-by: Shubham Gupta --- packages/grid/backend/grid/core/node.py | 1 + tests/integration/local/gateway_local_test.py | 1 + 2 files changed, 2 insertions(+) diff --git a/packages/grid/backend/grid/core/node.py b/packages/grid/backend/grid/core/node.py index 12e083ed602..cde36f8c5fe 100644 --- a/packages/grid/backend/grid/core/node.py +++ b/packages/grid/backend/grid/core/node.py @@ -105,4 +105,5 @@ def seaweedfs_config() -> SeaweedFSConfig: smtp_port=settings.SMTP_PORT, smtp_host=settings.SMTP_HOST, association_request_auto_approval=settings.ASSOCIATION_REQUEST_AUTO_APPROVAL, + background_tasks=True, ) diff --git a/tests/integration/local/gateway_local_test.py b/tests/integration/local/gateway_local_test.py index 4b385562c47..428cc3113b9 100644 --- a/tests/integration/local/gateway_local_test.py +++ b/tests/integration/local/gateway_local_test.py @@ -158,6 +158,7 @@ def test_create_gateway( result = domain_client_2.connect_to_gateway(handle=gateway_webserver) assert isinstance(result, SyftSuccess) + time.sleep(PeerHealthCheckTask.repeat_time + 1) assert len(sy.domains.all_domains) == 2 assert len(sy.domains.online_domains) == 2 From 71e4a349fa3c12f36f4c979a15ef7d6c87b19326 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Mon, 13 May 2024 20:51:28 +0700 Subject: [PATCH 08/32] [test/integration] add removing peers to the beginning of some tests - add some wait times before checking node connection status --- tests/integration/network/gateway_test.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/tests/integration/network/gateway_test.py b/tests/integration/network/gateway_test.py index be72aae81e6..2cbaf34681f 100644 --- a/tests/integration/network/gateway_test.py +++ b/tests/integration/network/gateway_test.py @@ -106,6 +106,10 @@ def test_domain_connect_to_gateway( port=domain_1_port, email="info@openmined.org", password="changethis" ) + # Try removing existing peers just to make sure + _remove_existing_peers(domain_client) + _remove_existing_peers(gateway_client) + # connecting the domain to the gateway result = domain_client.connect_to_gateway(gateway_client) assert isinstance(result, Request) @@ -122,6 +126,7 @@ def test_domain_connect_to_gateway( assert len(gateway_client.peers) == 1 + time.sleep(PeerHealthCheckTask.repeat_time + 1) # check that the domain is online on the network assert len(sy.domains.all_domains) == 1 assert len(sy.domains.online_domains) == 1 @@ -177,12 +182,17 @@ def test_dataset_search(set_env_var, gateway_port: int, domain_1_port: int) -> N port=domain_1_port, email="info@openmined.org", password="changethis" ) + # Try removing existing peers just to make sure + _remove_existing_peers(domain_client) + _remove_existing_peers(gateway_client) + res = gateway_client.settings.allow_association_request_auto_approval(enable=True) assert isinstance(res, SyftSuccess) # connect the domain to the gateway result = domain_client.connect_to_gateway(gateway_client) assert isinstance(result, SyftSuccess) + time.sleep(PeerHealthCheckTask.repeat_time + 1) assert len(sy.gateways.all_networks) == len(sy.gateways.online_networks) == 1 assert len(sy.domains.all_domains) == len(sy.domains.online_domains) == 1 @@ -229,6 +239,10 @@ def test_domain_gateway_user_code( port=domain_1_port, email="info@openmined.org", password="changethis" ) + # Try removing existing peers just to make sure + _remove_existing_peers(domain_client) + _remove_existing_peers(gateway_client) + # the domain client uploads a dataset input_data = np.array([1, 2, 3]) mock_data = np.array([4, 5, 6]) @@ -313,6 +327,7 @@ def test_deleting_peers(set_env_var, domain_1_port: int, gateway_port: int) -> N assert len(domain_client.peers) == 1 assert len(gateway_client.peers) == 1 # check that the domain is online on the network + time.sleep(PeerHealthCheckTask.repeat_time + 1) assert len(sy.domains.all_domains) == 1 assert len(sy.domains.online_domains) == 1 @@ -320,6 +335,7 @@ def test_deleting_peers(set_env_var, domain_1_port: int, gateway_port: int) -> N assert isinstance(_remove_existing_peers(domain_client), SyftSuccess) assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) # check that removing peers work as expected + time.sleep(PeerHealthCheckTask.repeat_time + 1) assert len(sy.gateways.all_networks) == 1 assert len(sy.domains.all_domains) == 0 assert len(sy.domains.all_domains) == 0 @@ -332,7 +348,8 @@ def test_deleting_peers(set_env_var, domain_1_port: int, gateway_port: int) -> N assert isinstance(result, SyftSuccess) assert len(domain_client.peers) == 1 assert len(gateway_client.peers) == 1 - # check that the domain + # check online domains + time.sleep(PeerHealthCheckTask.repeat_time + 1) assert len(sy.domains.all_domains) == 1 assert len(sy.domains.online_domains) == 1 @@ -340,6 +357,7 @@ def test_deleting_peers(set_env_var, domain_1_port: int, gateway_port: int) -> N assert isinstance(_remove_existing_peers(domain_client), SyftSuccess) assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) # check that removing peers work as expected + time.sleep(PeerHealthCheckTask.repeat_time + 1) assert len(sy.domains.all_domains) == 0 assert len(sy.domains.all_domains) == 0 assert len(sy.domains.online_domains) == 0 @@ -800,6 +818,8 @@ def test_dataset_stream(set_env_var, gateway_port: int, domain_1_port: int) -> N # connect the domain to the gateway result = domain_client.connect_to_gateway(gateway_client) assert isinstance(result, SyftSuccess) + + time.sleep(PeerHealthCheckTask.repeat_time + 1) assert len(sy.gateways.all_networks) == len(sy.gateways.online_networks) == 1 assert len(sy.domains.all_domains) == len(sy.domains.online_domains) == 1 From 0f89b8b1fb0dd383a23aed5cbfdca07334a032ef Mon Sep 17 00:00:00 2001 From: khoaguin Date: Tue, 14 May 2024 09:40:20 +0700 Subject: [PATCH 09/32] [syft/network] revert back to use `ping_status` instead of `ping_status.value` for `NodePeer` repr --- packages/syft/src/syft/service/network/node_peer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/syft/src/syft/service/network/node_peer.py b/packages/syft/src/syft/service/network/node_peer.py index 35292dd89dd..6e90ead3ddd 100644 --- a/packages/syft/src/syft/service/network/node_peer.py +++ b/packages/syft/src/syft/service/network/node_peer.py @@ -71,7 +71,7 @@ class NodePeer(SyftObject): "name", "node_type", "admin_email", - "ping_status.value", + "ping_status", "ping_status_message", "pinged_timestamp", ] From 744a289069c53729dc26a390bbe0f001486f216f Mon Sep 17 00:00:00 2001 From: khoaguin Date: Tue, 14 May 2024 11:18:02 +0700 Subject: [PATCH 10/32] [test/integration] add remove existing peers before testing for `test_delete_route_on_peer` --- tests/integration/network/gateway_test.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/integration/network/gateway_test.py b/tests/integration/network/gateway_test.py index 2cbaf34681f..a4e953c1135 100644 --- a/tests/integration/network/gateway_test.py +++ b/tests/integration/network/gateway_test.py @@ -603,6 +603,10 @@ def test_delete_route_on_peer( port=domain_1_port, email="info@openmined.org", password="changethis" ) + # Remove existing peers + _remove_existing_peers(domain_client) + _remove_existing_peers(gateway_client) + # Enable automatic acceptance of association requests res = gateway_client.settings.allow_association_request_auto_approval(enable=True) assert isinstance(res, SyftSuccess) From c3e0e620e219cf0154e5c2aee5c6006b6b8ed2af Mon Sep 17 00:00:00 2001 From: khoaguin Date: Tue, 14 May 2024 14:25:43 +0700 Subject: [PATCH 11/32] [test/integration] trimming down `network/gateway_test` --- tests/integration/network/gateway_test.py | 38 ++++++----------------- 1 file changed, 9 insertions(+), 29 deletions(-) diff --git a/tests/integration/network/gateway_test.py b/tests/integration/network/gateway_test.py index a4e953c1135..e3a87f6d97a 100644 --- a/tests/integration/network/gateway_test.py +++ b/tests/integration/network/gateway_test.py @@ -126,7 +126,7 @@ def test_domain_connect_to_gateway( assert len(gateway_client.peers) == 1 - time.sleep(PeerHealthCheckTask.repeat_time + 1) + time.sleep(PeerHealthCheckTask.repeat_time * 2 + 1) # check that the domain is online on the network assert len(sy.domains.all_domains) == 1 assert len(sy.domains.online_domains) == 1 @@ -192,9 +192,6 @@ def test_dataset_search(set_env_var, gateway_port: int, domain_1_port: int) -> N # connect the domain to the gateway result = domain_client.connect_to_gateway(gateway_client) assert isinstance(result, SyftSuccess) - time.sleep(PeerHealthCheckTask.repeat_time + 1) - assert len(sy.gateways.all_networks) == len(sy.gateways.online_networks) == 1 - assert len(sy.domains.all_domains) == len(sy.domains.online_domains) == 1 # the domain client uploads a dataset input_data = np.array([1, 2, 3]) @@ -206,6 +203,9 @@ def test_dataset_search(set_env_var, gateway_port: int, domain_1_port: int) -> N dataset_res = domain_client.upload_dataset(dataset) assert isinstance(dataset_res, SyftSuccess) + # since dataset search is done by checking from the online domains, + # we need to wait to make sure peers health check is done + time.sleep(PeerHealthCheckTask.repeat_time * 2 + 1) # test if the dataset can be searched by the syft network right_search = sy.search(dataset_name) assert isinstance(right_search, SearchResults) @@ -326,41 +326,29 @@ def test_deleting_peers(set_env_var, domain_1_port: int, gateway_port: int) -> N assert isinstance(result, SyftSuccess) assert len(domain_client.peers) == 1 assert len(gateway_client.peers) == 1 - # check that the domain is online on the network - time.sleep(PeerHealthCheckTask.repeat_time + 1) - assert len(sy.domains.all_domains) == 1 - assert len(sy.domains.online_domains) == 1 # Remove existing peers assert isinstance(_remove_existing_peers(domain_client), SyftSuccess) assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) # check that removing peers work as expected - time.sleep(PeerHealthCheckTask.repeat_time + 1) + assert len(domain_client.peers) == 0 + assert len(gateway_client.peers) == 0 + # check that the online domains and gateways are updated + time.sleep(PeerHealthCheckTask.repeat_time * 2 + 1) assert len(sy.gateways.all_networks) == 1 assert len(sy.domains.all_domains) == 0 - assert len(sy.domains.all_domains) == 0 assert len(sy.domains.online_domains) == 0 - assert len(domain_client.peers) == 0 - assert len(gateway_client.peers) == 0 # reconnect the domain to the gateway result = domain_client.connect_to_gateway(gateway_client) assert isinstance(result, SyftSuccess) assert len(domain_client.peers) == 1 assert len(gateway_client.peers) == 1 - # check online domains - time.sleep(PeerHealthCheckTask.repeat_time + 1) - assert len(sy.domains.all_domains) == 1 - assert len(sy.domains.online_domains) == 1 # Remove existing peers assert isinstance(_remove_existing_peers(domain_client), SyftSuccess) assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) # check that removing peers work as expected - time.sleep(PeerHealthCheckTask.repeat_time + 1) - assert len(sy.domains.all_domains) == 0 - assert len(sy.domains.all_domains) == 0 - assert len(sy.domains.online_domains) == 0 assert len(domain_client.peers) == 0 assert len(gateway_client.peers) == 0 @@ -823,10 +811,6 @@ def test_dataset_stream(set_env_var, gateway_port: int, domain_1_port: int) -> N result = domain_client.connect_to_gateway(gateway_client) assert isinstance(result, SyftSuccess) - time.sleep(PeerHealthCheckTask.repeat_time + 1) - assert len(sy.gateways.all_networks) == len(sy.gateways.online_networks) == 1 - assert len(sy.domains.all_domains) == len(sy.domains.online_domains) == 1 - # the domain client uploads a dataset input_data = np.array([1, 2, 3]) mock_data = np.array([4, 5, 6]) @@ -862,10 +846,6 @@ def test_peer_health_check(set_env_var, gateway_port: int, domain_1_port: int) - Scenario: Connecting a domain node to a gateway node. The gateway client approves the association request. The gateway client checks that the domain peer is associated - TODO: check for peer connection status through NodePeer.pingstatus - TODO: check that the domain is online with `DomainRegistry.online_domains` - Then make the domain go offline, which should be reflected when calling - `DomainRegistry.online_domains` """ # login to the domain and gateway gateway_client: GatewayClient = sy.login( @@ -926,7 +906,7 @@ def test_peer_health_check(set_env_var, gateway_port: int, domain_1_port: int) - assert isinstance(res, NodePeerAssociationStatus) assert res.value == "PEER_ASSOCIATED" - time.sleep(PeerHealthCheckTask.repeat_time + 1) + time.sleep(PeerHealthCheckTask.repeat_time * 2 + 1) domain_peer = gateway_client.api.services.network.get_all_peers()[0] assert domain_peer.ping_status == NodePeerConnectionStatus.ACTIVE From c89d6713c76926ad3b9cd8220e5f90e25194b75b Mon Sep 17 00:00:00 2001 From: khoaguin Date: Tue, 14 May 2024 16:38:44 +0700 Subject: [PATCH 12/32] [test/integration] add some waiting times for local gateway tests before checking for peer connection status --- tests/integration/local/gateway_local_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/local/gateway_local_test.py b/tests/integration/local/gateway_local_test.py index 428cc3113b9..b49c62cfdb5 100644 --- a/tests/integration/local/gateway_local_test.py +++ b/tests/integration/local/gateway_local_test.py @@ -158,7 +158,7 @@ def test_create_gateway( result = domain_client_2.connect_to_gateway(handle=gateway_webserver) assert isinstance(result, SyftSuccess) - time.sleep(PeerHealthCheckTask.repeat_time + 1) + time.sleep(PeerHealthCheckTask.repeat_time * 2 + 1) assert len(sy.domains.all_domains) == 2 assert len(sy.domains.online_domains) == 2 @@ -378,6 +378,6 @@ def test_repeated_association_requests_peers_health_check( assert res.value == "PEER_ASSOCIATED" # check for peer connection status - time.sleep(PeerHealthCheckTask.repeat_time + 1) + time.sleep(PeerHealthCheckTask.repeat_time * 2 + 1) domain_peer = gateway_client.api.services.network.get_all_peers()[0] assert domain_peer.ping_status == NodePeerConnectionStatus.ACTIVE From 37820331a6c83d63aeed5efa2b961ed7ee420038 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Tue, 14 May 2024 17:10:14 +0700 Subject: [PATCH 13/32] [test/integration] update gateway tests --- tests/integration/local/gateway_local_test.py | 8 +++----- tests/integration/network/gateway_test.py | 4 ++++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/integration/local/gateway_local_test.py b/tests/integration/local/gateway_local_test.py index b49c62cfdb5..8da1d535d2c 100644 --- a/tests/integration/local/gateway_local_test.py +++ b/tests/integration/local/gateway_local_test.py @@ -161,6 +161,9 @@ def test_create_gateway( time.sleep(PeerHealthCheckTask.repeat_time * 2 + 1) assert len(sy.domains.all_domains) == 2 assert len(sy.domains.online_domains) == 2 + # check for peer connection status + for peer in gateway_client.api.services.network.get_all_peers(): + assert peer.ping_status == NodePeerConnectionStatus.ACTIVE @pytest.mark.local_node @@ -376,8 +379,3 @@ def test_repeated_association_requests_peers_health_check( ) assert isinstance(res, NodePeerAssociationStatus) assert res.value == "PEER_ASSOCIATED" - - # check for peer connection status - time.sleep(PeerHealthCheckTask.repeat_time * 2 + 1) - domain_peer = gateway_client.api.services.network.get_all_peers()[0] - assert domain_peer.ping_status == NodePeerConnectionStatus.ACTIVE diff --git a/tests/integration/network/gateway_test.py b/tests/integration/network/gateway_test.py index e3a87f6d97a..9a35b7a76e9 100644 --- a/tests/integration/network/gateway_test.py +++ b/tests/integration/network/gateway_test.py @@ -461,6 +461,10 @@ def test_delete_route(set_env_var, gateway_port: int, domain_1_port: int) -> Non port=domain_1_port, email="info@openmined.org", password="changethis" ) + # Try removing existing peers just to make sure + _remove_existing_peers(domain_client) + _remove_existing_peers(gateway_client) + # Enable automatic acceptance of association requests res = gateway_client.settings.allow_association_request_auto_approval(enable=True) assert isinstance(res, SyftSuccess) From 1545ec35a86632bc71cb1ed4732654102185285b Mon Sep 17 00:00:00 2001 From: khoaguin Date: Tue, 14 May 2024 17:29:09 +0700 Subject: [PATCH 14/32] [test/integration] update gateway k8s tests --- tests/integration/network/gateway_test.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/integration/network/gateway_test.py b/tests/integration/network/gateway_test.py index 9a35b7a76e9..695b0f98332 100644 --- a/tests/integration/network/gateway_test.py +++ b/tests/integration/network/gateway_test.py @@ -421,9 +421,9 @@ def test_add_update_route_priority( # getting the proxy client using the current highest priority route should # give back an error since it is a route with a random port (10001) - proxy_domain_client = gateway_client.peers[0] - assert isinstance(proxy_domain_client, SyftError) - assert "Failed to establish a connection with" in proxy_domain_client.message + # proxy_domain_client = gateway_client.peers[0] + # assert isinstance(proxy_domain_client, SyftError) + # assert "Failed to establish a connection with" in proxy_domain_client.message # update the valid route to have the highest priority res = gateway_client.api.services.network.update_route_priority( @@ -542,7 +542,7 @@ def test_add_update_route_priority_on_peer( peer=domain_peer, route=new_route ) assert isinstance(res, SyftSuccess) - gateway_peer = domain_client.peers[0] + gateway_peer = domain_client.api.services.network.get_all_peers()[0] assert len(gateway_peer.node_routes) == 2 assert gateway_peer.node_routes[-1].port == new_route.port assert gateway_peer.node_routes[-1].priority == 2 @@ -553,7 +553,7 @@ def test_add_update_route_priority_on_peer( peer=domain_peer, route=new_route2 ) assert isinstance(res, SyftSuccess) - gateway_peer = domain_client.peers[0] + gateway_peer = domain_client.api.services.network.get_all_peers()[0] assert len(gateway_peer.node_routes) == 3 assert gateway_peer.node_routes[-1].port == new_route2.port assert gateway_peer.node_routes[-1].priority == 3 From 2c45fba74ed43be68304877a5b07837d53fe9000 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Wed, 15 May 2024 10:25:51 +0700 Subject: [PATCH 15/32] [syft/network] - pick the highest priority route to be the oldest by default - update k8s integration tests accordingly - move skip test to the test file rather than in `tox.ini` --- .../src/syft/service/network/node_peer.py | 49 +++++++++++------ tests/integration/network/gateway_test.py | 52 ++++++------------- tox.ini | 2 +- 3 files changed, 51 insertions(+), 52 deletions(-) diff --git a/packages/syft/src/syft/service/network/node_peer.py b/packages/syft/src/syft/service/network/node_peer.py index 6e90ead3ddd..011df08c447 100644 --- a/packages/syft/src/syft/service/network/node_peer.py +++ b/packages/syft/src/syft/service/network/node_peer.py @@ -117,9 +117,9 @@ def existed_route( return (False, None) - def assign_highest_priority(self, route: NodeRoute) -> NodeRoute: + def update_route_priority(self, route: NodeRoute) -> NodeRoute: """ - Assign the new_route's to have the highest priority + Assign the new_route's priority to be current max + 1 Args: route (NodeRoute): The new route whose priority is to be updated. @@ -131,15 +131,39 @@ def assign_highest_priority(self, route: NodeRoute) -> NodeRoute: route.priority = current_max_priority + 1 return route + def pick_highest_priority_route(self, oldest: bool = True) -> NodeRoute: + """ + Picks the route with the highest priority from the list of node routes. + + Args: + oldest (bool): + If True, picks the oldest route to have the highest priority, + meaning the route with min priority value. + If False, picks the most recent route with the highest priority, + meaning the route with max priority value. + + Returns: + NodeRoute: The route with the highest priority. + + """ + highest_priority_route: NodeRoute = self.node_routes[-1] + for route in self.node_routes[:-1]: + if oldest: + if route.priority < highest_priority_route.priority: + highest_priority_route = route + else: + if route.priority > highest_priority_route.priority: + highest_priority_route = route + return highest_priority_route + def update_route(self, route: NodeRoute) -> NodeRoute | None: """ Update the route for the node. If the route already exists, return it. - If the route is new, assign it to have the highest priority - before appending it to the peer's list of node routes. + If the route is new, assign it to have the priority of (current_max + 1) Args: - route (NodeRoute): The new route to be added to the peer. + route (NodeRoute): The new route to be added to the peer's node route list Returns: NodeRoute | None: if the route already exists, return it, else returns None @@ -148,7 +172,7 @@ def update_route(self, route: NodeRoute) -> NodeRoute | None: if existed: return route else: - new_route = self.assign_highest_priority(route) + new_route = self.update_route_priority(route) self.node_routes.append(new_route) return None @@ -199,7 +223,7 @@ def update_existed_route_priority( if priority is not None: self.node_routes[index].priority = priority else: - self.node_routes[index].priority = self.assign_highest_priority( + self.node_routes[index].priority = self.update_route_priority( route ).priority @@ -223,7 +247,7 @@ def client_with_context( if len(self.node_routes) < 1: raise ValueError(f"No routes to peer: {self}") - # select the highest priority route (i.e. added or updated the latest) + # select the route with highest priority to connect to the peer final_route: NodeRoute = self.pick_highest_priority_route() connection: NodeConnection = route_to_connection(route=final_route) try: @@ -244,7 +268,7 @@ def client_with_context( def client_with_key(self, credentials: SyftSigningKey) -> SyftClient | SyftError: if len(self.node_routes) < 1: raise ValueError(f"No routes to peer: {self}") - # select the latest added route + final_route: NodeRoute = self.pick_highest_priority_route() connection = route_to_connection(route=final_route) @@ -262,13 +286,6 @@ def guest_client(self) -> SyftClient: def proxy_from(self, client: SyftClient) -> SyftClient: return client.proxy_to(self) - def pick_highest_priority_route(self) -> NodeRoute: - highest_priority_route: NodeRoute = self.node_routes[-1] - for route in self.node_routes: - if route.priority > highest_priority_route.priority: - highest_priority_route = route - return highest_priority_route - def delete_route( self, route: NodeRouteType | None = None, route_id: UID | None = None ) -> SyftError | None: diff --git a/tests/integration/network/gateway_test.py b/tests/integration/network/gateway_test.py index 695b0f98332..fa6d116e499 100644 --- a/tests/integration/network/gateway_test.py +++ b/tests/integration/network/gateway_test.py @@ -228,6 +228,7 @@ def test_dataset_search(set_env_var, gateway_port: int, domain_1_port: int) -> N assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) +@pytest.mark.skip(reason="Possible bug") def test_domain_gateway_user_code( set_env_var, domain_1_port: int, gateway_port: int ) -> None: @@ -317,6 +318,10 @@ def test_deleting_peers(set_env_var, domain_1_port: int, gateway_port: int) -> N port=domain_1_port, email="info@openmined.org", password="changethis" ) + # clean up before test + _remove_existing_peers(domain_client) + _remove_existing_peers(gateway_client) + # Enable automatic acceptance of association requests res = gateway_client.settings.allow_association_request_auto_approval(enable=True) assert isinstance(res, SyftSuccess) @@ -333,6 +338,7 @@ def test_deleting_peers(set_env_var, domain_1_port: int, gateway_port: int) -> N # check that removing peers work as expected assert len(domain_client.peers) == 0 assert len(gateway_client.peers) == 0 + # check that the online domains and gateways are updated time.sleep(PeerHealthCheckTask.repeat_time * 2 + 1) assert len(sy.gateways.all_networks) == 1 @@ -353,15 +359,12 @@ def test_deleting_peers(set_env_var, domain_1_port: int, gateway_port: int) -> N assert len(gateway_client.peers) == 0 -def test_add_update_route_priority( - set_env_var, gateway_port: int, domain_1_port: int -) -> None: +def test_add_route(set_env_var, gateway_port: int, domain_1_port: int) -> None: """ Test the network service's `add_route` functionalities to add routes directly for a self domain. Scenario: Connect a domain to a gateway. The gateway adds 2 new routes to the domain - and check their priorities. - Then update an existed route's priority and check if its priority gets updated. + and check their priorities get updated. Check for the gateway if the proxy client to connect to the domain uses the route with the highest priority. """ @@ -420,21 +423,8 @@ def test_add_update_route_priority( assert domain_peer.node_routes[0].priority == 1 # getting the proxy client using the current highest priority route should - # give back an error since it is a route with a random port (10001) - # proxy_domain_client = gateway_client.peers[0] - # assert isinstance(proxy_domain_client, SyftError) - # assert "Failed to establish a connection with" in proxy_domain_client.message - - # update the valid route to have the highest priority - res = gateway_client.api.services.network.update_route_priority( - peer_verify_key=domain_peer.verify_key, route=domain_peer.node_routes[0] - ) - assert isinstance(res, SyftSuccess) - domain_peer = gateway_client.api.services.network.get_all_peers()[0] - assert len(domain_peer.node_routes) == 3 - assert domain_peer.node_routes[0].priority == 4 - - # proxying should success now + # be successful since now we pick the oldest route (port 9082 with priority 1) + # to have the highest priority by default proxy_domain_client = gateway_client.peers[0] assert isinstance(proxy_domain_client, DomainClient) @@ -500,14 +490,11 @@ def test_delete_route(set_env_var, gateway_port: int, domain_1_port: int) -> Non assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) -def test_add_update_route_priority_on_peer( - set_env_var, gateway_port: int, domain_1_port: int -) -> None: +def test_add_route_on_peer(set_env_var, gateway_port: int, domain_1_port: int) -> None: """ Test the `add_route_on_peer` of network service. Connect a domain to a gateway. - The gateway adds 2 new routes for the domain and check their priorities. - The gateway updates the route priority for the domain remotely. + The gateway adds 2 new routes for itself remotely on the domain and check their priorities. Then the domain adds a route to itself for the gateway. """ # login to the domain and gateway @@ -558,13 +545,6 @@ def test_add_update_route_priority_on_peer( assert gateway_peer.node_routes[-1].port == new_route2.port assert gateway_peer.node_routes[-1].priority == 3 - # update the route priority remotely on the domain - first_route = gateway_peer.node_routes[0] - res = gateway_client.api.services.network.update_route_priority_on_peer( - peer=domain_peer, route=first_route - ) - assert isinstance(res, SyftSuccess) - # the domain calls `add_route_on_peer` to to add a route to itself for the gateway assert len(domain_peer.node_routes) == 1 res = domain_client.api.services.network.add_route_on_peer( @@ -707,6 +687,8 @@ def test_update_route_priority( } assert routes_port_priority[new_route.port] == 5 + # if we don't specify `priority`, the route will be automatically updated + # to have the biggest priority value among all routes res = gateway_client.api.services.network.update_route_priority( peer_verify_key=domain_peer.verify_key, route=new_route2 ) @@ -745,7 +727,7 @@ def test_update_route_priority_on_peer( result = domain_client.connect_to_gateway(gateway_client) assert isinstance(result, SyftSuccess) - # gateway adds 2 new routes for the domain to itself + # gateway adds 2 new routes to itself remotely on the domain node domain_peer: NodePeer = gateway_client.api.services.network.get_all_peers()[0] new_route = HTTPNodeRoute(host_or_ip="localhost", port=10000) res = gateway_client.api.services.network.add_route_on_peer( @@ -774,7 +756,7 @@ def test_update_route_priority_on_peer( ) assert isinstance(res, SyftSuccess) res = gateway_client.api.services.network.update_route_priority_on_peer( - peer=domain_peer, route=gateway_peer.node_routes[0] + peer=domain_peer, route=new_route2 ) assert isinstance(res, SyftSuccess) @@ -783,7 +765,7 @@ def test_update_route_priority_on_peer( route.port: route.priority for route in gateway_peer.node_routes } assert routes_port_priority[new_route.port] == 5 - assert routes_port_priority[gateway_port] == 6 + assert routes_port_priority[new_route2.port] == 6 # Remove existing peers assert isinstance(_remove_existing_peers(domain_client), SyftSuccess) diff --git a/tox.ini b/tox.ini index 41e6f168c95..d1f1b736806 100644 --- a/tox.ini +++ b/tox.ini @@ -737,7 +737,7 @@ commands = # Gateway tests are not run in kuberetes, as currently,it does not have a way to configure # high/low side warning flag. bash -c "source ./scripts/get_k8s_secret_ci.sh; \ - pytest tests/integration/network -k 'not test_domain_gateway_user_code' -p no:randomly -vvvv" + pytest tests/integration/network -p no:randomly -vvvv" # Shutting down the gateway cluster to free up space, as the # below code does not require gateway cluster From a6cd9c9ef6fbe8b9c7acd1ecaddb90f51d338a07 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Wed, 15 May 2024 13:54:12 +0700 Subject: [PATCH 16/32] [syft/network] `PeerHealthCheckTask.peer_route_heathcheck` now only updates ping status related fields (in backgroudn threads) instead of the whole peer object to avoid outdated node routes (and possibly also other fields) issues --- .../syft/service/network/network_service.py | 25 ++++++++++++++++--- .../syft/src/syft/service/network/utils.py | 2 +- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/packages/syft/src/syft/service/network/network_service.py b/packages/syft/src/syft/service/network/network_service.py index 10074353146..6d041219290 100644 --- a/packages/syft/src/syft/service/network/network_service.py +++ b/packages/syft/src/syft/service/network/network_service.py @@ -93,7 +93,7 @@ def update( valid = self.check_type(peer, NodePeer) if valid.is_err(): return Err(SyftError(message=valid.err())) - return super().update(credentials, peer) + return super().update(credentials, peer, has_permission=has_permission) def create_or_update_peer( self, credentials: SyftVerifyKey, peer: NodePeer @@ -113,9 +113,7 @@ def create_or_update_peer( valid = self.check_type(peer, NodePeer) if valid.is_err(): return SyftError(message=valid.err()) - existing: Result | NodePeer = self.get_by_uid( - credentials=credentials, uid=peer.id - ) + existing = self.get_by_uid(credentials=credentials, uid=peer.id) if existing.is_ok() and existing.ok(): existing = existing.ok() existing.update_routes(peer.node_routes) @@ -125,6 +123,25 @@ def create_or_update_peer( result = self.set(credentials, peer) return result + def update_peer_ping_status( + self, + credentials: SyftVerifyKey, + peer: NodePeer, + has_permission: bool = False, + ) -> SyftSuccess | SyftError: + """ + Get the existing peer from the store, then only update its ping status related fields + """ + # get the node peer for the given sender peer_id + result = self.get_by_uid(credentials=credentials, uid=peer.id) + if result.is_err(): + return Err(message=f"Failed to query peer from stash. Err: {result.err()}") + existing: NodePeer = result.ok() + existing.ping_status = peer.ping_status + existing.ping_status_message = peer.ping_status + existing.pinged_timestamp = peer.pinged_timestamp + return super().update(credentials, existing, has_permission=has_permission) + def get_by_verify_key( self, credentials: SyftVerifyKey, verify_key: SyftVerifyKey ) -> Result[NodePeer | None, SyftError]: diff --git a/packages/syft/src/syft/service/network/utils.py b/packages/syft/src/syft/service/network/utils.py index c9e98da6179..c42b35a5f6b 100644 --- a/packages/syft/src/syft/service/network/utils.py +++ b/packages/syft/src/syft/service/network/utils.py @@ -84,7 +84,7 @@ def peer_route_heathcheck(self, context: AuthedServiceContext) -> SyftError | No else: peer.ping_status_message = f"Peer '{peer.name}''s ping status: {peer.ping_status.value.lower()}" - result = network_stash.update( + result = network_stash.update_peer_ping_status( credentials=context.node.verify_key, peer=peer, has_permission=True, From 231bcbfd4e394a9a32fa032415eda724642efe0f Mon Sep 17 00:00:00 2001 From: khoaguin Date: Wed, 15 May 2024 14:04:18 +0700 Subject: [PATCH 17/32] fix linting --- packages/syft/src/syft/service/network/network_service.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/service/network/network_service.py b/packages/syft/src/syft/service/network/network_service.py index 6d041219290..ce2014fe706 100644 --- a/packages/syft/src/syft/service/network/network_service.py +++ b/packages/syft/src/syft/service/network/network_service.py @@ -136,9 +136,10 @@ def update_peer_ping_status( result = self.get_by_uid(credentials=credentials, uid=peer.id) if result.is_err(): return Err(message=f"Failed to query peer from stash. Err: {result.err()}") - existing: NodePeer = result.ok() + if existing := result.ok() is None: + return Err(message="Failed to query peer from stash: peer is None") existing.ping_status = peer.ping_status - existing.ping_status_message = peer.ping_status + existing.ping_status_message = peer.ping_status_message existing.pinged_timestamp = peer.pinged_timestamp return super().update(credentials, existing, has_permission=has_permission) From 342439f3dc66fd784991d66ccdea0e3fb20e62e2 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Wed, 15 May 2024 14:17:36 +0700 Subject: [PATCH 18/32] [syft/network] return Err if the returned node peer is None --- packages/syft/src/syft/service/network/network_service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/syft/src/syft/service/network/network_service.py b/packages/syft/src/syft/service/network/network_service.py index ce2014fe706..21e3cf45d25 100644 --- a/packages/syft/src/syft/service/network/network_service.py +++ b/packages/syft/src/syft/service/network/network_service.py @@ -136,7 +136,8 @@ def update_peer_ping_status( result = self.get_by_uid(credentials=credentials, uid=peer.id) if result.is_err(): return Err(message=f"Failed to query peer from stash. Err: {result.err()}") - if existing := result.ok() is None: + existing = result.ok() + if existing is None: return Err(message="Failed to query peer from stash: peer is None") existing.ping_status = peer.ping_status existing.ping_status_message = peer.ping_status_message From 1384e0e876c3a6a91db72546f000289d8602e5df Mon Sep 17 00:00:00 2001 From: khoaguin Date: Wed, 15 May 2024 14:40:11 +0700 Subject: [PATCH 19/32] [test/integration] allow running `gateway_local_test.py` in `syft.test.integration` --- packages/syft/src/syft/service/network/network_service.py | 8 ++++++-- tox.ini | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/syft/src/syft/service/network/network_service.py b/packages/syft/src/syft/service/network/network_service.py index 21e3cf45d25..ebd183f0fb2 100644 --- a/packages/syft/src/syft/service/network/network_service.py +++ b/packages/syft/src/syft/service/network/network_service.py @@ -135,10 +135,14 @@ def update_peer_ping_status( # get the node peer for the given sender peer_id result = self.get_by_uid(credentials=credentials, uid=peer.id) if result.is_err(): - return Err(message=f"Failed to query peer from stash. Err: {result.err()}") + return Err( + f"Failed to query peer peer {peer.id} with name '{peer.name}' from stash. Err: {result.err()}" + ) existing = result.ok() if existing is None: - return Err(message="Failed to query peer from stash: peer is None") + return Err( + f"Failed to query peer {peer.id} with name '{peer.name}' from stash: peer is None" + ) existing.ping_status = peer.ping_status existing.ping_status_message = peer.ping_status_message existing.pinged_timestamp = peer.pinged_timestamp diff --git a/tox.ini b/tox.ini index 375ea2a39b8..2f914ac4a80 100644 --- a/tox.ini +++ b/tox.ini @@ -649,7 +649,7 @@ allowlist_externals = setenv = PYTEST_MODULES = {env:PYTEST_MODULES:local_node} ASSOCIATION_REQUEST_AUTO_APPROVAL = {env:ASSOCIATION_REQUEST_AUTO_APPROVAL:true} - PYTEST_FLAGS = {env:PYTEST_FLAGS:--ignore=tests/integration/local/gateway_local_test.py --ignore=tests/integration/local/job_test.py} + PYTEST_FLAGS = {env:PYTEST_FLAGS:--ignore=tests/integration/local/job_test.py} commands = python -c 'import syft as sy; sy.stage_protocol_changes()' From 2dd887350b3bf7324e7bc7d69148903bab2c29d0 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Wed, 15 May 2024 15:22:42 +0700 Subject: [PATCH 20/32] [test/integration] add `@pytest.mark.network` for tests in `gateway_test.py` --- tests/integration/network/gateway_test.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/integration/network/gateway_test.py b/tests/integration/network/gateway_test.py index fa6d116e499..9c42a9e9687 100644 --- a/tests/integration/network/gateway_test.py +++ b/tests/integration/network/gateway_test.py @@ -84,6 +84,7 @@ def test_network_registry_from_url() -> None: assert len(sy.gateways.all_networks) == len(sy.gateways.online_networks) == 1 +@pytest.mark.network def test_network_registry_env_var(set_env_var) -> None: assert isinstance(sy.gateways, NetworkRegistry) assert len(sy.gateways.all_networks) == len(sy.gateways.online_networks) == 1 @@ -91,6 +92,7 @@ def test_network_registry_env_var(set_env_var) -> None: assert isinstance(sy.gateways[0].connection, HTTPConnection) +@pytest.mark.network def test_domain_connect_to_gateway( set_env_var, domain_1_port: int, gateway_port: int ) -> None: @@ -168,6 +170,7 @@ def test_domain_connect_to_gateway( assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) +@pytest.mark.network def test_dataset_search(set_env_var, gateway_port: int, domain_1_port: int) -> None: """ Scenario: Connecting a domain node to a gateway node. The domain @@ -229,6 +232,7 @@ def test_dataset_search(set_env_var, gateway_port: int, domain_1_port: int) -> N @pytest.mark.skip(reason="Possible bug") +@pytest.mark.network def test_domain_gateway_user_code( set_env_var, domain_1_port: int, gateway_port: int ) -> None: @@ -309,6 +313,7 @@ def mock_function(asset): assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) +@pytest.mark.network def test_deleting_peers(set_env_var, domain_1_port: int, gateway_port: int) -> None: # login to the domain and gateway gateway_client: GatewayClient = sy.login( @@ -359,6 +364,7 @@ def test_deleting_peers(set_env_var, domain_1_port: int, gateway_port: int) -> N assert len(gateway_client.peers) == 0 +@pytest.mark.network def test_add_route(set_env_var, gateway_port: int, domain_1_port: int) -> None: """ Test the network service's `add_route` functionalities to add routes directly @@ -437,6 +443,7 @@ def test_add_route(set_env_var, gateway_port: int, domain_1_port: int) -> None: assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) +@pytest.mark.network def test_delete_route(set_env_var, gateway_port: int, domain_1_port: int) -> None: """ Scenario: @@ -490,6 +497,7 @@ def test_delete_route(set_env_var, gateway_port: int, domain_1_port: int) -> Non assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) +@pytest.mark.network def test_add_route_on_peer(set_env_var, gateway_port: int, domain_1_port: int) -> None: """ Test the `add_route_on_peer` of network service. @@ -560,6 +568,7 @@ def test_add_route_on_peer(set_env_var, gateway_port: int, domain_1_port: int) - assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) +@pytest.mark.network def test_delete_route_on_peer( set_env_var, gateway_port: int, domain_1_port: int ) -> None: @@ -631,6 +640,7 @@ def test_delete_route_on_peer( assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) +@pytest.mark.network def test_update_route_priority( set_env_var, gateway_port: int, domain_1_port: int ) -> None: @@ -704,6 +714,7 @@ def test_update_route_priority( assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) +@pytest.mark.network def test_update_route_priority_on_peer( set_env_var, gateway_port: int, domain_1_port: int ) -> None: @@ -772,6 +783,7 @@ def test_update_route_priority_on_peer( assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) +@pytest.mark.network def test_dataset_stream(set_env_var, gateway_port: int, domain_1_port: int) -> None: """ Scenario: Connecting a domain node to a gateway node. The domain @@ -827,6 +839,7 @@ def test_dataset_stream(set_env_var, gateway_port: int, domain_1_port: int) -> N assert isinstance(_remove_existing_peers(gateway_client), SyftSuccess) +@pytest.mark.network def test_peer_health_check(set_env_var, gateway_port: int, domain_1_port: int) -> None: """ Scenario: Connecting a domain node to a gateway node. From 7507fffe7d857ccd81e17f96a94e84b223e3e622 Mon Sep 17 00:00:00 2001 From: rasswanth-s <43314053+rasswanth-s@users.noreply.github.com> Date: Thu, 16 May 2024 13:37:43 +0530 Subject: [PATCH 21/32] add a try catch block to catch invalid api registry entries --- .../syft/src/syft/service/policy/policy.py | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/packages/syft/src/syft/service/policy/policy.py b/packages/syft/src/syft/service/policy/policy.py index 04f49bac453..7301423902f 100644 --- a/packages/syft/src/syft/service/policy/policy.py +++ b/packages/syft/src/syft/service/policy/policy.py @@ -17,6 +17,7 @@ # third party from RestrictedPython import compile_restricted +import requests from result import Err from result import Ok from result import Result @@ -151,15 +152,20 @@ def partition_by_node(kwargs: dict[str, Any]) -> dict[NodeIdentity, dict[str, UI _obj_exists = False for api in api_list: - if api.services.action.exists(uid): - node_identity = NodeIdentity.from_api(api) - if node_identity not in output_kwargs: - output_kwargs[node_identity] = {k: uid} - else: - output_kwargs[node_identity].update({k: uid}) - - _obj_exists = True - break + try: + if api.services.action.exists(uid): + node_identity = NodeIdentity.from_api(api) + if node_identity not in output_kwargs: + output_kwargs[node_identity] = {k: uid} + else: + output_kwargs[node_identity].update({k: uid}) + + _obj_exists = True + break + except requests.exceptions.ConnectionError: + # To handle the cases , where there an old api objects in + # in APIRegistry + continue if not _obj_exists: raise Exception(f"Input data {k}:{uid} does not belong to any Domain") From 4619473dedbc0108d2cc24a72484a7fa79101f33 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 16 May 2024 20:07:18 +0700 Subject: [PATCH 22/32] [test/integration] trimming down `gateway_local_test.py` --- tests/integration/local/gateway_local_test.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/integration/local/gateway_local_test.py b/tests/integration/local/gateway_local_test.py index 8da1d535d2c..14a7b8e316b 100644 --- a/tests/integration/local/gateway_local_test.py +++ b/tests/integration/local/gateway_local_test.py @@ -165,9 +165,7 @@ def test_create_gateway( for peer in gateway_client.api.services.network.get_all_peers(): assert peer.ping_status == NodePeerConnectionStatus.ACTIVE - -@pytest.mark.local_node -def test_create_gateway_client(gateway): + # check the guest client client = gateway.client assert isinstance(client, GatewayClient) assert client.metadata.node_type == NodeType.GATEWAY.value @@ -248,7 +246,6 @@ def test_domain_connect_to_gateway(gateway_association_request_auto_approval, do def test_domain_connect_to_gateway_routes_priority(gateway, domain, domain_2) -> None: """ A test for routes' priority (PythonNodeRoute) - TODO: Add a similar test for HTTPNodeRoute """ gateway_client: GatewayClient = gateway.login( email="info@openmined.org", From 3e42aadbf5f40519f322a8b330262adaff782dd6 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 16 May 2024 20:16:01 +0700 Subject: [PATCH 23/32] [test/integration] small fix --- tests/integration/local/gateway_local_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/local/gateway_local_test.py b/tests/integration/local/gateway_local_test.py index 14a7b8e316b..ef71e3faf0e 100644 --- a/tests/integration/local/gateway_local_test.py +++ b/tests/integration/local/gateway_local_test.py @@ -166,7 +166,7 @@ def test_create_gateway( assert peer.ping_status == NodePeerConnectionStatus.ACTIVE # check the guest client - client = gateway.client + client = gateway_webserver.client assert isinstance(client, GatewayClient) assert client.metadata.node_type == NodeType.GATEWAY.value From ce5db3aeeea8059f3ae4be36225d4120f4feed09 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Wed, 22 May 2024 09:27:21 +0700 Subject: [PATCH 24/32] [syft/network] defining `NodePeerUpdate` --- packages/syft/src/syft/service/network/node_peer.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/packages/syft/src/syft/service/network/node_peer.py b/packages/syft/src/syft/service/network/node_peer.py index 011df08c447..6711e5480d9 100644 --- a/packages/syft/src/syft/service/network/node_peer.py +++ b/packages/syft/src/syft/service/network/node_peer.py @@ -17,6 +17,7 @@ from ...service.response import SyftError from ...types.datetime import DateTime from ...types.syft_migration import migrate +from ...types.syft_object import PartialSyftObject from ...types.syft_object import SYFT_OBJECT_VERSION_2 from ...types.syft_object import SYFT_OBJECT_VERSION_3 from ...types.syft_object import SyftObject @@ -319,6 +320,18 @@ def delete_route( return None +class NodePeerUpdate(PartialSyftObject): + id: UID + name: str + verify_key: SyftVerifyKey + node_routes: list[NodeRouteType] + node_type: NodeType + admin_email: str + ping_status: NodePeerConnectionStatus + ping_status_message: str + pinged_timestamp: DateTime + + def drop_veilid_route() -> Callable: def _drop_veilid_route(context: TransformContext) -> TransformContext: if context.output: From 3c54597305e5861ce62f17effa24e4aec3a942c7 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 23 May 2024 13:34:42 +0530 Subject: [PATCH 25/32] fix overwriting the whole object in mongo store instead overwrite only fields changed --- .../syft/src/syft/service/network/node_peer.py | 7 ++++--- .../syft/src/syft/store/mongo_document_store.py | 16 ++++++++-------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/packages/syft/src/syft/service/network/node_peer.py b/packages/syft/src/syft/service/network/node_peer.py index 6711e5480d9..d28c14d693b 100644 --- a/packages/syft/src/syft/service/network/node_peer.py +++ b/packages/syft/src/syft/service/network/node_peer.py @@ -18,6 +18,7 @@ from ...types.datetime import DateTime from ...types.syft_migration import migrate from ...types.syft_object import PartialSyftObject +from ...types.syft_object import SYFT_OBJECT_VERSION_1 from ...types.syft_object import SYFT_OBJECT_VERSION_2 from ...types.syft_object import SYFT_OBJECT_VERSION_3 from ...types.syft_object import SyftObject @@ -321,11 +322,11 @@ def delete_route( class NodePeerUpdate(PartialSyftObject): - id: UID + __canonical_name__ = "NodePeerUpdate" + __version__ = SYFT_OBJECT_VERSION_1 + name: str - verify_key: SyftVerifyKey node_routes: list[NodeRouteType] - node_type: NodeType admin_email: str ping_status: NodePeerConnectionStatus ping_status_message: str diff --git a/packages/syft/src/syft/store/mongo_document_store.py b/packages/syft/src/syft/store/mongo_document_store.py index fa38d6c1ba8..59d6799c2bb 100644 --- a/packages/syft/src/syft/store/mongo_document_store.py +++ b/packages/syft/src/syft/store/mongo_document_store.py @@ -357,17 +357,17 @@ def _update( if has_permission or self.has_permission( ActionObjectWRITE(uid=prev_obj.id, credentials=credentials) ): - # we don't want to overwrite Mongo's "id_" or Syft's "id" on update - obj_id = obj["id"] + for key, value in obj.to_dict(exclude_empty=True).items(): + # we don't want to overwrite Mongo's "id_" or Syft's "id" on update + if key == "id": + # protected field + continue - # Set ID to the updated object value - obj.id = prev_obj["id"] + # Overwrite the value if the key is already present + setattr(prev_obj, key, value) # Create the Mongo object - storage_obj = obj.to(self.storage_type) - - # revert the ID - obj.id = obj_id + storage_obj = prev_obj.to(self.storage_type) try: collection.update_one( From 479b948dfdfafbcc79016c4aae295f367c143b1a Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 23 May 2024 13:50:11 +0530 Subject: [PATCH 26/32] use UpdateNodePeer class to update peer ping status during healthchecks --- .../syft/service/network/network_service.py | 27 ------------------- .../src/syft/service/network/node_peer.py | 1 + .../syft/src/syft/service/network/utils.py | 22 +++++++++------ 3 files changed, 15 insertions(+), 35 deletions(-) diff --git a/packages/syft/src/syft/service/network/network_service.py b/packages/syft/src/syft/service/network/network_service.py index ebd183f0fb2..622316fa606 100644 --- a/packages/syft/src/syft/service/network/network_service.py +++ b/packages/syft/src/syft/service/network/network_service.py @@ -123,31 +123,6 @@ def create_or_update_peer( result = self.set(credentials, peer) return result - def update_peer_ping_status( - self, - credentials: SyftVerifyKey, - peer: NodePeer, - has_permission: bool = False, - ) -> SyftSuccess | SyftError: - """ - Get the existing peer from the store, then only update its ping status related fields - """ - # get the node peer for the given sender peer_id - result = self.get_by_uid(credentials=credentials, uid=peer.id) - if result.is_err(): - return Err( - f"Failed to query peer peer {peer.id} with name '{peer.name}' from stash. Err: {result.err()}" - ) - existing = result.ok() - if existing is None: - return Err( - f"Failed to query peer {peer.id} with name '{peer.name}' from stash: peer is None" - ) - existing.ping_status = peer.ping_status - existing.ping_status_message = peer.ping_status_message - existing.pinged_timestamp = peer.pinged_timestamp - return super().update(credentials, existing, has_permission=has_permission) - def get_by_verify_key( self, credentials: SyftVerifyKey, verify_key: SyftVerifyKey ) -> Result[NodePeer | None, SyftError]: @@ -173,8 +148,6 @@ def __init__(self, store: DocumentStore) -> None: self.store = store self.stash = NetworkStash(store=store) - # TODO: Check with MADHAVA, can we even allow guest user to introduce routes to - # domain nodes? @service_method( path="network.exchange_credentials_with", name="exchange_credentials_with", diff --git a/packages/syft/src/syft/service/network/node_peer.py b/packages/syft/src/syft/service/network/node_peer.py index d28c14d693b..db7b5366973 100644 --- a/packages/syft/src/syft/service/network/node_peer.py +++ b/packages/syft/src/syft/service/network/node_peer.py @@ -325,6 +325,7 @@ class NodePeerUpdate(PartialSyftObject): __canonical_name__ = "NodePeerUpdate" __version__ = SYFT_OBJECT_VERSION_1 + id: UID name: str node_routes: list[NodeRouteType] admin_email: str diff --git a/packages/syft/src/syft/service/network/utils.py b/packages/syft/src/syft/service/network/utils.py index c42b35a5f6b..4f658475dc3 100644 --- a/packages/syft/src/syft/service/network/utils.py +++ b/packages/syft/src/syft/service/network/utils.py @@ -15,6 +15,7 @@ from .network_service import NodePeerAssociationStatus from .node_peer import NodePeer from .node_peer import NodePeerConnectionStatus +from .node_peer import NodePeerUpdate @serializable(without=["thread"]) @@ -51,20 +52,22 @@ def peer_route_heathcheck(self, context: AuthedServiceContext) -> SyftError | No all_peers: list[NodePeer] = result.ok() for peer in all_peers: - peer.pinged_timestamp = DateTime.now() + peer_update = NodePeerUpdate(id=peer.id) + peer_update.pinged_timestamp = DateTime.now() try: peer_client = peer.client_with_context(context=context) if peer_client.is_err(): logger.error( f"Failed to create client for peer: {peer}: {peer_client.err()}" ) - peer.ping_status = NodePeerConnectionStatus.TIMEOUT + peer_update.ping_status = NodePeerConnectionStatus.TIMEOUT peer_client = None except Exception as e: logger.error( f"Failed to create client for peer: {peer} with exception {e}" ) - peer.ping_status = NodePeerConnectionStatus.TIMEOUT + + peer_update.ping_status = NodePeerConnectionStatus.TIMEOUT peer_client = None if peer_client is not None: @@ -72,21 +75,24 @@ def peer_route_heathcheck(self, context: AuthedServiceContext) -> SyftError | No peer_status = peer_client.api.services.network.check_peer_association( peer_id=context.node.id ) - peer.ping_status = ( + peer_update.ping_status = ( NodePeerConnectionStatus.ACTIVE if peer_status == NodePeerAssociationStatus.PEER_ASSOCIATED else NodePeerConnectionStatus.INACTIVE ) if isinstance(peer_status, SyftError): - peer.ping_status_message = ( + peer_update.ping_status_message = ( f"Error `{peer_status.message}` when pinging peer '{peer.name}'" ) else: - peer.ping_status_message = f"Peer '{peer.name}''s ping status: {peer.ping_status.value.lower()}" + peer_update.ping_status_message = ( + f"Peer '{peer.name}''s ping status: " + f"{peer_update.ping_status.value.lower()}" + ) - result = network_stash.update_peer_ping_status( + result = network_stash.update( credentials=context.node.verify_key, - peer=peer, + peer=peer_update, has_permission=True, ) From 0260756fd77f9781c25633a628841cfda22157ae Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 23 May 2024 17:32:30 +0700 Subject: [PATCH 27/32] [syft/network] add checking `NodePeerUpdate` type in `NetworkStash.update` --- .../src/syft/service/network/network_service.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/packages/syft/src/syft/service/network/network_service.py b/packages/syft/src/syft/service/network/network_service.py index 622316fa606..5a44afc69cd 100644 --- a/packages/syft/src/syft/service/network/network_service.py +++ b/packages/syft/src/syft/service/network/network_service.py @@ -50,6 +50,7 @@ from ..warnings import CRUDWarning from .association_request import AssociationRequestChange from .node_peer import NodePeer +from .node_peer import NodePeerUpdate from .routes import HTTPNodeRoute from .routes import NodeRoute from .routes import NodeRouteType @@ -87,12 +88,17 @@ def get_by_name( def update( self, credentials: SyftVerifyKey, - peer: NodePeer, + peer: NodePeer | NodePeerUpdate, has_permission: bool = False, ) -> Result[NodePeer, str]: - valid = self.check_type(peer, NodePeer) - if valid.is_err(): - return Err(SyftError(message=valid.err())) + valid_node_peer = self.check_type(peer, NodePeer) + valid_node_peer_update = self.check_type(peer, NodePeerUpdate) + if valid_node_peer.is_err() and valid_node_peer_update.is_err(): + return Err( + SyftError( + message=f"{type(peer)} does not match required type: NodePeer or NodePeerUpdate" + ) + ) return super().update(credentials, peer, has_permission=has_permission) def create_or_update_peer( From f779ba81607975274ffbfe4ef8287f3a00596b30 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Mon, 27 May 2024 17:27:50 +0700 Subject: [PATCH 28/32] [syft/network] change from `NodePeer` to `NodePeerUpdate` everywhere when using `NetworkStash.update` in the network service --- .../syft/service/network/network_service.py | 55 ++++++++++++------- .../syft/src/syft/service/network/utils.py | 2 +- 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/packages/syft/src/syft/service/network/network_service.py b/packages/syft/src/syft/service/network/network_service.py index 5a44afc69cd..0cdb848de6c 100644 --- a/packages/syft/src/syft/service/network/network_service.py +++ b/packages/syft/src/syft/service/network/network_service.py @@ -5,7 +5,6 @@ from typing import Any # third party -from result import Err from result import Result # relative @@ -88,18 +87,13 @@ def get_by_name( def update( self, credentials: SyftVerifyKey, - peer: NodePeer | NodePeerUpdate, + peer_update: NodePeerUpdate, has_permission: bool = False, ) -> Result[NodePeer, str]: - valid_node_peer = self.check_type(peer, NodePeer) - valid_node_peer_update = self.check_type(peer, NodePeerUpdate) - if valid_node_peer.is_err() and valid_node_peer_update.is_err(): - return Err( - SyftError( - message=f"{type(peer)} does not match required type: NodePeer or NodePeerUpdate" - ) - ) - return super().update(credentials, peer, has_permission=has_permission) + valid = self.check_type(peer_update, NodePeerUpdate) + if valid.is_err(): + return SyftError(message=valid.err()) + return super().update(credentials, peer_update, has_permission=has_permission) def create_or_update_peer( self, credentials: SyftVerifyKey, peer: NodePeer @@ -119,11 +113,15 @@ def create_or_update_peer( valid = self.check_type(peer, NodePeer) if valid.is_err(): return SyftError(message=valid.err()) + existing = self.get_by_uid(credentials=credentials, uid=peer.id) - if existing.is_ok() and existing.ok(): - existing = existing.ok() - existing.update_routes(peer.node_routes) - result = self.update(credentials, existing) + if existing.is_ok() and existing.ok() is not None: + existing_peer: NodePeer = existing.ok() + existing_peer.update_routes(peer.node_routes) + peer_update = NodePeerUpdate( + id=peer.id, node_routes=existing_peer.node_routes + ) + result = self.update(credentials, peer_update) return result else: result = self.set(credentials, peer) @@ -233,7 +231,7 @@ def exchange_credentials_with( f"{self_node_peer.node_type} peer '{self_node_peer.name}' information change detected." ) if isinstance(result, SyftError): - msg.apnpend( + msg.append( f"Attempt to remotely update {self_node_peer.node_type} peer " f"'{self_node_peer.name}' information remotely failed." ) @@ -474,9 +472,17 @@ def update_peer( context: AuthedServiceContext, peer: NodePeer, ) -> SyftSuccess | SyftError: + # try setting all fields of NodePeerUpdate according to NodePeer + peer_update = NodePeerUpdate() + for field_name, value in peer.to_dict().items(): + try: + setattr(peer_update, field_name, value) + except AttributeError: + print(f"Failed to set {field_name} to {value}") + pass result = self.stash.update( credentials=context.node.verify_key, - peer=peer, + peer_update=peer_update, ) if result.is_err(): return SyftError( @@ -591,9 +597,12 @@ def add_route( f"peer '{remote_node_peer.name}' with id '{existed_route.id}'." ) # update the peer in the store with the updated routes + peer_update = NodePeerUpdate( + id=remote_node_peer.id, node_routes=remote_node_peer.node_routes + ) result = self.stash.update( credentials=context.node.verify_key, - peer=remote_node_peer, + peer_update=peer_update, ) if result.is_err(): return SyftError(message=str(result.err())) @@ -749,8 +758,11 @@ def delete_route( ) else: # update the peer with the route removed + peer_update = NodePeerUpdate( + id=remote_node_peer.id, node_routes=remote_node_peer.node_routes + ) result = self.stash.update( - credentials=context.node.verify_key, peer=remote_node_peer + credentials=context.node.verify_key, peer_update=peer_update ) if result.is_err(): return SyftError(message=str(result.err())) @@ -848,7 +860,10 @@ def update_route_priority( return updated_node_route new_priority: int = updated_node_route.priority # update the peer in the store - result = self.stash.update(context.node.verify_key, remote_node_peer) + peer_update = NodePeerUpdate( + id=remote_node_peer.id, node_routes=remote_node_peer.node_routes + ) + result = self.stash.update(context.node.verify_key, peer_update) if result.is_err(): return SyftError(message=str(result.err())) diff --git a/packages/syft/src/syft/service/network/utils.py b/packages/syft/src/syft/service/network/utils.py index 4f658475dc3..b03bc589d15 100644 --- a/packages/syft/src/syft/service/network/utils.py +++ b/packages/syft/src/syft/service/network/utils.py @@ -92,7 +92,7 @@ def peer_route_heathcheck(self, context: AuthedServiceContext) -> SyftError | No result = network_stash.update( credentials=context.node.verify_key, - peer=peer_update, + peer_update=peer_update, has_permission=True, ) From a0ded94e6eb04dd2091fc5520da21f8c73de24d1 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Mon, 27 May 2024 17:59:03 +0700 Subject: [PATCH 29/32] [syft/network] fix bug in updating existing peer network service --- packages/syft/src/syft/service/network/network_service.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/syft/src/syft/service/network/network_service.py b/packages/syft/src/syft/service/network/network_service.py index 0cdb848de6c..7d6e3eba9b9 100644 --- a/packages/syft/src/syft/service/network/network_service.py +++ b/packages/syft/src/syft/service/network/network_service.py @@ -5,6 +5,7 @@ from typing import Any # third party +from loguru import logger from result import Result # relative @@ -233,7 +234,7 @@ def exchange_credentials_with( if isinstance(result, SyftError): msg.append( f"Attempt to remotely update {self_node_peer.node_type} peer " - f"'{self_node_peer.name}' information remotely failed." + f"'{self_node_peer.name}' information remotely failed. Error: {result.message}" ) return SyftError(message="\n".join(msg)) msg.append( @@ -477,8 +478,8 @@ def update_peer( for field_name, value in peer.to_dict().items(): try: setattr(peer_update, field_name, value) - except AttributeError: - print(f"Failed to set {field_name} to {value}") + except Exception as e: + logger.debug(f"Failed to set {field_name} to {value}. Exception: {e}") pass result = self.stash.update( credentials=context.node.verify_key, From cd440556eacd4c3c592bf4ea538884df9b638715 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Tue, 28 May 2024 09:57:44 +0700 Subject: [PATCH 30/32] [bugfix] fix `KeyError: 'context'` in `reconstruct_args_kwargs` --- packages/syft/src/syft/service/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/syft/src/syft/service/service.py b/packages/syft/src/syft/service/service.py index 55f2c1f4b5d..cda115cb8b4 100644 --- a/packages/syft/src/syft/service/service.py +++ b/packages/syft/src/syft/service/service.py @@ -289,7 +289,7 @@ def reconstruct_args_kwargs( else: raise Exception(f"Missing {param_key} not in kwargs.") - if "context": + if "context" in kwargs: final_kwargs["context"] = kwargs["context"] return (args, final_kwargs) From 362a3d66bcf0fad54be65ac94b183d3878bc0593 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Mon, 3 Jun 2024 15:00:50 +0530 Subject: [PATCH 31/32] clean message during route exchange - in node peer update only update the fields that have changed --- .../syft/service/network/network_service.py | 48 +++++++++++-------- .../src/syft/store/sqlite_document_store.py | 1 - 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/packages/syft/src/syft/service/network/network_service.py b/packages/syft/src/syft/service/network/network_service.py index 30df5991a31..ce78d35d9d7 100644 --- a/packages/syft/src/syft/service/network/network_service.py +++ b/packages/syft/src/syft/service/network/network_service.py @@ -192,26 +192,21 @@ def exchange_credentials_with( existing_peer_result.is_ok() and (existing_peer := existing_peer_result.ok()) is not None ): - msg = [ - ( - f"{existing_peer.node_type} peer '{existing_peer.name}' already exist for " - f"{self_node_peer.node_type} '{self_node_peer.name}'." - ) - ] + logger.info( + f"{remote_node_peer.node_type} '{remote_node_peer.name}' already exist as a peer for " + f"{self_node_peer.node_type} '{self_node_peer.name}'." + ) + if existing_peer != remote_node_peer: result = self.stash.create_or_update_peer( context.node.verify_key, remote_node_peer, ) - msg.append( - f"{existing_peer.node_type} peer '{existing_peer.name}' information change detected." - ) if result.is_err(): - msg.append( - f"Attempt to update peer '{existing_peer.name}' information failed." + return SyftError( + message=f"Failed to update peer: {remote_node_peer.name} information." ) - return SyftError(message="\n".join(msg)) - msg.append( + logger.info( f"{existing_peer.node_type} peer '{existing_peer.name}' information successfully updated." ) @@ -220,7 +215,7 @@ def exchange_credentials_with( name=self_node_peer.name ) if isinstance(remote_self_node_peer, NodePeer): - msg.append( + logger.info( f"{self_node_peer.node_type} '{self_node_peer.name}' already exist " f"as a peer for {remote_node_peer.node_type} '{remote_node_peer.name}'." ) @@ -228,20 +223,21 @@ def exchange_credentials_with( result = remote_client.api.services.network.update_peer( peer=self_node_peer, ) - msg.append( + logger.info( f"{self_node_peer.node_type} peer '{self_node_peer.name}' information change detected." ) if isinstance(result, SyftError): - msg.append( + logger.error( f"Attempt to remotely update {self_node_peer.node_type} peer " f"'{self_node_peer.name}' information remotely failed. Error: {result.message}" ) - return SyftError(message="\n".join(msg)) - msg.append( + return SyftError(message="Failed to update peer information.") + + logger.info( f"{self_node_peer.node_type} peer '{self_node_peer.name}' " f"information successfully updated." ) - msg.append( + msg = ( f"Routes between {remote_node_peer.node_type} '{remote_node_peer.name}' and " f"{self_node_peer.node_type} '{self_node_peer.name}' already exchanged." ) @@ -474,10 +470,20 @@ def update_peer( peer: NodePeer, ) -> SyftSuccess | SyftError: # try setting all fields of NodePeerUpdate according to NodePeer + + get_peer_result = self.stash.get_by_uid(context.credentials, peer.id) + if get_peer_result.is_err(): + return SyftError( + message=f"Failed to get peer '{peer.name}'. Error: {get_peer_result.err()}" + ) + existing_peer = get_peer_result.ok() peer_update = NodePeerUpdate() - for field_name, value in peer.to_dict().items(): + + # Only update the fields that have changed + for field_name, value in existing_peer.to_dict().items(): try: - setattr(peer_update, field_name, value) + if getattr(peer, field_name) != value: + setattr(peer_update, field_name, value) except Exception as e: logger.debug(f"Failed to set {field_name} to {value}. Exception: {e}") pass diff --git a/packages/syft/src/syft/store/sqlite_document_store.py b/packages/syft/src/syft/store/sqlite_document_store.py index 3db187c35cc..8ef1b2803a8 100644 --- a/packages/syft/src/syft/store/sqlite_document_store.py +++ b/packages/syft/src/syft/store/sqlite_document_store.py @@ -353,7 +353,6 @@ def __del__(self) -> None: self._close() except Exception as e: print(f"Could not close connection. Error: {e}") - raise e @serializable() From ba0445bff68d22a64b8885d946821577ebc6d6bf Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Mon, 3 Jun 2024 18:10:42 +0530 Subject: [PATCH 32/32] fix peer update service method - add serializable decorator to NodePeerUpdate --- .../src/syft/protocol/protocol_version.json | 7 +++++ .../syft/service/network/network_service.py | 29 ++++++------------- .../src/syft/service/network/node_peer.py | 1 + 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 233e7c3c355..305a4904798 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -246,6 +246,13 @@ "hash": "c1796e7b01c9eae0dbf59cfd5c2c2f0e7eba593e0cea615717246572b27aae4b", "action": "remove" } + }, + "NodePeerUpdate": { + "1": { + "version": 1, + "hash": "9e7cd39f6a9f90e8c595452865525e0989df1688236acfd1a665ed047ba47de9", + "action": "add" + } } } } diff --git a/packages/syft/src/syft/service/network/network_service.py b/packages/syft/src/syft/service/network/network_service.py index ce78d35d9d7..b38d822c7f4 100644 --- a/packages/syft/src/syft/service/network/network_service.py +++ b/packages/syft/src/syft/service/network/network_service.py @@ -220,8 +220,11 @@ def exchange_credentials_with( f"as a peer for {remote_node_peer.node_type} '{remote_node_peer.name}'." ) if remote_self_node_peer != self_node_peer: + updated_peer = NodePeerUpdate( + id=self_node_peer.id, node_routes=self_node_peer.node_routes + ) result = remote_client.api.services.network.update_peer( - peer=self_node_peer, + peer_update=updated_peer ) logger.info( f"{self_node_peer.node_type} peer '{self_node_peer.name}' information change detected." @@ -462,38 +465,24 @@ def get_peers_by_type( return result.ok() or [] @service_method( - path="network.update_peer", name="update_peer", roles=GUEST_ROLE_LEVEL + path="network.update_peer", + name="update_peer", + roles=GUEST_ROLE_LEVEL, ) def update_peer( self, context: AuthedServiceContext, - peer: NodePeer, + peer_update: NodePeerUpdate, ) -> SyftSuccess | SyftError: # try setting all fields of NodePeerUpdate according to NodePeer - get_peer_result = self.stash.get_by_uid(context.credentials, peer.id) - if get_peer_result.is_err(): - return SyftError( - message=f"Failed to get peer '{peer.name}'. Error: {get_peer_result.err()}" - ) - existing_peer = get_peer_result.ok() - peer_update = NodePeerUpdate() - - # Only update the fields that have changed - for field_name, value in existing_peer.to_dict().items(): - try: - if getattr(peer, field_name) != value: - setattr(peer_update, field_name, value) - except Exception as e: - logger.debug(f"Failed to set {field_name} to {value}. Exception: {e}") - pass result = self.stash.update( credentials=context.node.verify_key, peer_update=peer_update, ) if result.is_err(): return SyftError( - message=f"Failed to update peer '{peer.name}'. Error: {result.err()}" + message=f"Failed to update peer '{peer_update.name}'. Error: {result.err()}" ) return SyftSuccess( message=f"Peer '{result.ok().name}' information successfully updated." diff --git a/packages/syft/src/syft/service/network/node_peer.py b/packages/syft/src/syft/service/network/node_peer.py index a780de3a2c2..c2db506ba23 100644 --- a/packages/syft/src/syft/service/network/node_peer.py +++ b/packages/syft/src/syft/service/network/node_peer.py @@ -321,6 +321,7 @@ def delete_route( return None +@serializable() class NodePeerUpdate(PartialSyftObject): __canonical_name__ = "NodePeerUpdate" __version__ = SYFT_OBJECT_VERSION_1