Fix concurrency issue with NodePeer update #8851

Merged
merged 60 commits into from
Jun 3, 2024
Changes from 44 commits
Commits
355e513
[syft/network] getting online networks for python gateway node
khoaguin May 9, 2024
9316f69
[syft/network] when checking for online domains, if not able to creat…
khoaguin May 9, 2024
7fe68f9
[syft/network] on getting online domains by checking `domain.ping_sta…
khoaguin May 9, 2024
25eaaee
[syft/network] add total line to show number of online networks / all…
khoaguin May 10, 2024
eddc66f
[syft/network] - update ping url for networks without frontend
khoaguin May 10, 2024
8182a14
[syft/network] integrating `background_tasks` into `Orchestra.launch`
khoaguin May 10, 2024
8268c29
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 13, 2024
da596ff
[syft/network] pass `background_tasks=True` to `worker_class`
khoaguin May 13, 2024
71e4a34
[test/integration] add removing peers to the beginning of some tests
khoaguin May 13, 2024
0f89b8b
[syft/network] revert back to use `ping_status` instead of `ping_stat…
khoaguin May 14, 2024
744a289
[test/integration] add remove existing peers before testing for `test…
khoaguin May 14, 2024
5fd4c31
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 14, 2024
c3e0e62
[test/integration] trimming down `network/gateway_test`
khoaguin May 14, 2024
c89d671
[test/integration] add some waiting times for local gateway tests bef…
khoaguin May 14, 2024
243ca68
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 14, 2024
3782033
[test/integration] update gateway tests
khoaguin May 14, 2024
1545ec3
[test/integration] update gateway k8s tests
khoaguin May 14, 2024
b2b057f
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 14, 2024
2c45fba
[syft/network] - pick the highest priority route to be the oldest by …
khoaguin May 15, 2024
eaf59b5
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 15, 2024
a6cd9c9
[syft/network] `PeerHealthCheckTask.peer_route_heathcheck` now only
khoaguin May 15, 2024
e20ff1f
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 15, 2024
231bcbf
fix linting
khoaguin May 15, 2024
095a235
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 15, 2024
342439f
[syft/network] return Err if the returned node peer is None
khoaguin May 15, 2024
1384e0e
[test/integration] allow running `gateway_local_test.py` in `syft.tes…
khoaguin May 15, 2024
2dd8873
[test/integration] add `@pytest.mark.network` for tests in `gateway_t…
khoaguin May 15, 2024
821efef
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 15, 2024
abee20d
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 16, 2024
7507fff
add a try catch block to catch invalid api registry entries
rasswanth-s May 16, 2024
4619473
[test/integration] trimming down `gateway_local_test.py`
khoaguin May 16, 2024
3e69bbc
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 16, 2024
3e42aad
[test/integration] small fix
khoaguin May 16, 2024
208f7cc
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 19, 2024
a1d9dab
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 20, 2024
63e4648
Merge branch 'dev' into peer-health-check-online-domains
shubham3121 May 21, 2024
a138151
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 22, 2024
ce5db3a
[syft/network] defining `NodePeerUpdate`
khoaguin May 22, 2024
3c54597
fix overwriting the whole object in mongo store instead overwrite onl…
shubham3121 May 23, 2024
479b948
use UpdateNodePeer class to update peer ping status during healthchecks
shubham3121 May 23, 2024
0260756
[syft/network] add checking `NodePeerUpdate` type in `NetworkStash.up…
khoaguin May 23, 2024
752c8cf
Merge branch 'dev' into node-peer-partial-update
khoaguin May 23, 2024
a9b23c6
Merge branch 'dev' into node-peer-partial-update
khoaguin May 24, 2024
1cc75e7
Merge branch 'dev' into node-peer-partial-update
khoaguin May 24, 2024
5a703f8
Merge branch 'dev' into node-peer-partial-update
khoaguin May 27, 2024
f779ba8
[syft/network] change from `NodePeer` to `NodePeerUpdate` everywhere …
khoaguin May 27, 2024
eec708a
Merge branch 'dev' into node-peer-partial-update
khoaguin May 27, 2024
a0ded94
[syft/network] fix bug in updating existing peer network service
khoaguin May 27, 2024
4239e29
Merge branch 'dev' into node-peer-partial-update
khoaguin May 27, 2024
ef306f2
Merge branch 'dev' into node-peer-partial-update
khoaguin May 27, 2024
009123e
Merge branch 'dev' into node-peer-partial-update
khoaguin May 28, 2024
cd44055
[bugfix] fix `KeyError: 'context'` in `reconstruct_args_kwargs`
khoaguin May 28, 2024
a98ba6e
Merge branch 'dev' into node-peer-partial-update
khoaguin May 28, 2024
e3bd251
Merge branch 'dev' into node-peer-partial-update
khoaguin May 30, 2024
b9ddc4d
Merge branch 'dev' into node-peer-partial-update
khoaguin Jun 2, 2024
453ce6b
Merge branch 'dev' into node-peer-partial-update
khoaguin Jun 3, 2024
1127d9d
Merge branch 'dev' into node-peer-partial-update
shubham3121 Jun 3, 2024
362a3d6
clean message during route exchange
shubham3121 Jun 3, 2024
60b2284
Merge pull request #8885 from OpenMined/pr-review-changes
shubham3121 Jun 3, 2024
ba0445b
fix peer update service method
shubham3121 Jun 3, 2024
97 changes: 69 additions & 28 deletions packages/syft/src/syft/client/registry.py
@@ -13,7 +13,8 @@

# relative
from ..service.metadata.node_metadata import NodeMetadataJSON
from ..service.network.network_service import NodePeer
from ..service.network.node_peer import NodePeer
from ..service.network.node_peer import NodePeerConnectionStatus
from ..service.response import SyftException
from ..types.grid_url import GridURL
from ..util.constants import DEFAULT_TIMEOUT
@@ -120,13 +121,40 @@ def _repr_html_(self) -> str:
on = self.online_networks
if len(on) == 0:
return "(no gateways online - try syft.gateways.all_networks to see offline gateways)"
return pd.DataFrame(on)._repr_html_() # type: ignore
df = pd.DataFrame(on)
total_df = pd.DataFrame(
[
[
f"{len(on)} / {len(self.all_networks)} (online networks / all networks)"
]
+ [""] * (len(df.columns) - 1)
],
columns=df.columns,
index=["Total"],
)
df = pd.concat([df, total_df])
return df._repr_html_() # type: ignore

def __repr__(self) -> str:
on = self.online_networks
if len(on) == 0:
return "(no gateways online - try syft.gateways.all_networks to see offline gateways)"
return pd.DataFrame(on).to_string()
df = pd.DataFrame(on)
total_df = pd.DataFrame(
[
[
f"{len(on)} / {len(self.all_networks)} (online networks / all networks)"
]
+ [""] * (len(df.columns) - 1)
],
columns=df.columns,
index=["Total"],
)
df = pd.concat([df, total_df])
return df.to_string()

def __len__(self) -> int:
return len(self.all_networks)

@staticmethod
def create_client(network: dict[str, Any]) -> Client:
@@ -228,32 +256,25 @@ def check_network(network: dict) -> dict[Any, Any] | None:

@property
def online_domains(self) -> list[tuple[NodePeer, NodeMetadataJSON | None]]:
def check_domain(
peer: NodePeer,
) -> tuple[NodePeer, NodeMetadataJSON | None] | None:
try:
guest_client = peer.guest_client
metadata = guest_client.metadata
return peer, metadata
except Exception as e: # nosec
print(f"Error in checking domain with exception {e}")
return None

networks = self.online_networks

# We can use a with statement to ensure threads are cleaned up promptly
with futures.ThreadPoolExecutor(max_workers=20) as executor:
# map
_all_online_domains = []
for network in networks:
_all_online_domains = []
for network in networks:
try:
network_client = NetworkRegistry.create_client(network)
domains: list[NodePeer] = network_client.domains.retrieve_nodes()
for domain in domains:
self.all_domains[str(domain.id)] = domain
_online_domains = list(
executor.map(lambda domain: check_domain(domain), domains)
)
_all_online_domains += _online_domains
except Exception as e:
print(f"Error in creating network client with exception {e}")
continue

domains: list[NodePeer] = network_client.domains.retrieve_nodes()
for domain in domains:
self.all_domains[str(domain.id)] = domain

_all_online_domains += [
(domain, domain.guest_client.metadata)
for domain in domains
if domain.ping_status == NodePeerConnectionStatus.ACTIVE
]

return [domain for domain in _all_online_domains if domain is not None]

@@ -281,13 +302,33 @@ def _repr_html_(self) -> str:
on: list[dict[str, Any]] = self.__make_dict__()
if len(on) == 0:
return "(no domains online - try syft.domains.all_domains to see offline domains)"
return pd.DataFrame(on)._repr_html_() # type: ignore
df = pd.DataFrame(on)
total_df = pd.DataFrame(
[
[f"{len(on)} / {len(self.all_domains)} (online domains / all domains)"]
+ [""] * (len(df.columns) - 1)
],
columns=df.columns,
index=["Total"],
)
df = pd.concat([df, total_df])
return df._repr_html_() # type: ignore

def __repr__(self) -> str:
on: list[dict[str, Any]] = self.__make_dict__()
if len(on) == 0:
return "(no domains online - try syft.domains.all_domains to see offline domains)"
return pd.DataFrame(on).to_string()
df = pd.DataFrame(on)
total_df = pd.DataFrame(
[
[f"{len(on)} / {len(self.all_domains)} (online domains / all domains)"]
+ [""] * (len(df.columns) - 1)
],
columns=df.columns,
index=["Total"],
)
df = pd.concat([df, total_df])
return df.to_string()

def create_client(self, peer: NodePeer) -> Client:
try:
Expand Down
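The `Total` row added to `_repr_html_` and `__repr__` above is built the same way in each method. As a minimal sketch (the helper name `build_total_row` and its parameters are hypothetical and not part of this PR), the row contents could be computed in one place and then passed to `pd.DataFrame(..., columns=df.columns, index=["Total"])`:

```python
def build_total_row(n_online: int, n_all: int, n_columns: int, label: str) -> list[str]:
    """Return one row: a summary string in the first cell, blanks elsewhere.

    Hypothetical helper for illustration only; assumes n_columns >= 1,
    which holds in the diff because the table is only built when at
    least one online entry exists.
    """
    summary = f"{n_online} / {n_all} (online {label} / all {label})"
    return [summary] + [""] * (n_columns - 1)


row = build_total_row(n_online=2, n_all=5, n_columns=3, label="networks")
print(row)
```

The padding with empty strings keeps the row the same width as the data columns, so `pd.concat` can stack it under the table without introducing new columns.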
22 changes: 12 additions & 10 deletions packages/syft/src/syft/service/network/network_service.py
@@ -50,6 +50,7 @@
from ..warnings import CRUDWarning
from .association_request import AssociationRequestChange
from .node_peer import NodePeer
from .node_peer import NodePeerUpdate
from .routes import HTTPNodeRoute
from .routes import NodeRoute
from .routes import NodeRouteType
@@ -87,13 +88,18 @@ def get_by_name(
def update(
self,
credentials: SyftVerifyKey,
peer: NodePeer,
peer: NodePeer | NodePeerUpdate,
has_permission: bool = False,
) -> Result[NodePeer, str]:
valid = self.check_type(peer, NodePeer)
if valid.is_err():
return Err(SyftError(message=valid.err()))
return super().update(credentials, peer)
valid_node_peer = self.check_type(peer, NodePeer)
valid_node_peer_update = self.check_type(peer, NodePeerUpdate)
if valid_node_peer.is_err() and valid_node_peer_update.is_err():
return Err(
SyftError(
message=f"{type(peer)} does not match required type: NodePeer or NodePeerUpdate"
)
)
return super().update(credentials, peer, has_permission=has_permission)

def create_or_update_peer(
self, credentials: SyftVerifyKey, peer: NodePeer
@@ -113,9 +119,7 @@ def create_or_update_peer(
valid = self.check_type(peer, NodePeer)
if valid.is_err():
return SyftError(message=valid.err())
existing: Result | NodePeer = self.get_by_uid(
credentials=credentials, uid=peer.id
)
existing = self.get_by_uid(credentials=credentials, uid=peer.id)
if existing.is_ok() and existing.ok():
existing = existing.ok()
existing.update_routes(peer.node_routes)
@@ -150,8 +154,6 @@ def __init__(self, store: DocumentStore) -> None:
self.store = store
self.stash = NetworkStash(store=store)

# TODO: Check with MADHAVA, can we even allow guest user to introduce routes to
# domain nodes?
@service_method(
path="network.exchange_credentials_with",
name="exchange_credentials_with",
Expand Down
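`NetworkStash.update` above now accepts either a full `NodePeer` or a `NodePeerUpdate` and rejects anything else. A minimal stand-alone sketch of that check, using plain classes in place of the Syft types (`Peer`, `PeerUpdate`, and `type_error_message` are hypothetical names, and `isinstance` stands in for the stash's own `check_type`):

```python
from typing import Optional


class Peer:  # hypothetical stand-in for NodePeer
    pass


class PeerUpdate:  # hypothetical stand-in for NodePeerUpdate
    pass


def type_error_message(obj: object) -> Optional[str]:
    """Return an error message when obj is neither accepted type, else None."""
    if isinstance(obj, (Peer, PeerUpdate)):
        return None
    return f"{type(obj)} does not match required type: Peer or PeerUpdate"


print(type_error_message(PeerUpdate()))  # accepted, so no message
print(type_error_message("not a peer"))  # rejected, message returned
```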
66 changes: 49 additions & 17 deletions packages/syft/src/syft/service/network/node_peer.py
@@ -17,6 +17,8 @@
from ...service.response import SyftError
from ...types.datetime import DateTime
from ...types.syft_migration import migrate
from ...types.syft_object import PartialSyftObject
from ...types.syft_object import SYFT_OBJECT_VERSION_1
from ...types.syft_object import SYFT_OBJECT_VERSION_2
from ...types.syft_object import SYFT_OBJECT_VERSION_3
from ...types.syft_object import SyftObject
@@ -71,7 +73,7 @@ class NodePeer(SyftObject):
"name",
"node_type",
"admin_email",
"ping_status.value",
"ping_status",
"ping_status_message",
"pinged_timestamp",
]
@@ -117,9 +119,9 @@ def existed_route(

return (False, None)

def assign_highest_priority(self, route: NodeRoute) -> NodeRoute:
def update_route_priority(self, route: NodeRoute) -> NodeRoute:
"""
Assign the new_route's to have the highest priority
Assign the new_route's priority to be current max + 1

Args:
route (NodeRoute): The new route whose priority is to be updated.
@@ -131,15 +133,39 @@ def assign_highest_priority(self, route: NodeRoute) -> NodeRoute:
route.priority = current_max_priority + 1
return route

def pick_highest_priority_route(self, oldest: bool = True) -> NodeRoute:
"""
Picks the route with the highest priority from the list of node routes.

Args:
oldest (bool):
If True, picks the oldest route to have the highest priority,
meaning the route with min priority value.
If False, picks the most recent route with the highest priority,
meaning the route with max priority value.

Returns:
NodeRoute: The route with the highest priority.

"""
highest_priority_route: NodeRoute = self.node_routes[-1]
for route in self.node_routes[:-1]:
if oldest:
if route.priority < highest_priority_route.priority:
highest_priority_route = route
else:
if route.priority > highest_priority_route.priority:
highest_priority_route = route
return highest_priority_route

def update_route(self, route: NodeRoute) -> NodeRoute | None:
"""
Update the route for the node.
If the route already exists, return it.
If the route is new, assign it to have the highest priority
before appending it to the peer's list of node routes.
If the route is new, assign it to have the priority of (current_max + 1)

Args:
route (NodeRoute): The new route to be added to the peer.
route (NodeRoute): The new route to be added to the peer's node route list

Returns:
NodeRoute | None: if the route already exists, return it, else returns None
@@ -148,7 +174,7 @@ def update_route(self, route: NodeRoute) -> NodeRoute | None:
if existed:
return route
else:
new_route = self.assign_highest_priority(route)
new_route = self.update_route_priority(route)
self.node_routes.append(new_route)
return None

@@ -199,7 +225,7 @@ def update_existed_route_priority(
if priority is not None:
self.node_routes[index].priority = priority
else:
self.node_routes[index].priority = self.assign_highest_priority(
self.node_routes[index].priority = self.update_route_priority(
route
).priority

@@ -223,7 +249,7 @@ def client_with_context(

if len(self.node_routes) < 1:
raise ValueError(f"No routes to peer: {self}")
# select the highest priority route (i.e. added or updated the latest)
# select the route with highest priority to connect to the peer
final_route: NodeRoute = self.pick_highest_priority_route()
connection: NodeConnection = route_to_connection(route=final_route)
try:
@@ -244,7 +270,7 @@ def client_with_context(
def client_with_key(self, credentials: SyftSigningKey) -> SyftClient | SyftError:
if len(self.node_routes) < 1:
raise ValueError(f"No routes to peer: {self}")
# select the latest added route

final_route: NodeRoute = self.pick_highest_priority_route()

connection = route_to_connection(route=final_route)
@@ -262,13 +288,6 @@ def guest_client(self) -> SyftClient:
def proxy_from(self, client: SyftClient) -> SyftClient:
return client.proxy_to(self)

def pick_highest_priority_route(self) -> NodeRoute:
highest_priority_route: NodeRoute = self.node_routes[-1]
for route in self.node_routes:
if route.priority > highest_priority_route.priority:
highest_priority_route = route
return highest_priority_route

def delete_route(
self, route: NodeRouteType | None = None, route_id: UID | None = None
) -> SyftError | None:
@@ -302,6 +321,19 @@ def delete_route(
return None


class NodePeerUpdate(PartialSyftObject):
__canonical_name__ = "NodePeerUpdate"
__version__ = SYFT_OBJECT_VERSION_1

id: UID
name: str
node_routes: list[NodeRouteType]
admin_email: str
ping_status: NodePeerConnectionStatus
ping_status_message: str
pinged_timestamp: DateTime


def drop_veilid_route() -> Callable:
def _drop_veilid_route(context: TransformContext) -> TransformContext:
if context.output:
Expand Down
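The route-priority changes in `node_peer.py` above come down to two rules: a new route gets priority `current max + 1`, and with `oldest=True` the route with the smallest priority value is picked. A minimal sketch of both rules, assuming a simplified `Route` dataclass in place of `NodeRoute` (`Route`, `pick_route`, and `next_priority` are hypothetical names; starting from 0 for an empty list is also an assumption, and tie-breaking may differ from the loop in the diff):

```python
from dataclasses import dataclass


@dataclass
class Route:  # hypothetical stand-in for NodeRoute
    host: str
    priority: int


def pick_route(routes: list, oldest: bool = True) -> Route:
    """Pick the min-priority route if oldest is True, else the max-priority one."""
    if not routes:
        raise ValueError("No routes to pick from")
    chooser = min if oldest else max
    return chooser(routes, key=lambda r: r.priority)


def next_priority(routes: list) -> int:
    """Priority for a newly added route: current max + 1 (1 if there are none)."""
    return max((r.priority for r in routes), default=0) + 1


routes = [Route("a", 1), Route("b", 3), Route("c", 2)]
print(pick_route(routes).host)                # oldest route
print(pick_route(routes, oldest=False).host)  # most recently prioritised route
print(next_priority(routes))
```

Because new routes always receive a larger priority value than existing ones, the smallest value identifies the route that was added first.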
20 changes: 13 additions & 7 deletions packages/syft/src/syft/service/network/utils.py
@@ -15,6 +15,7 @@
from .network_service import NodePeerAssociationStatus
from .node_peer import NodePeer
from .node_peer import NodePeerConnectionStatus
from .node_peer import NodePeerUpdate


@serializable(without=["thread"])
@@ -51,42 +52,47 @@ def peer_route_heathcheck(self, context: AuthedServiceContext) -> SyftError | No
all_peers: list[NodePeer] = result.ok()

for peer in all_peers:
peer.pinged_timestamp = DateTime.now()
peer_update = NodePeerUpdate(id=peer.id)
peer_update.pinged_timestamp = DateTime.now()
try:
peer_client = peer.client_with_context(context=context)
if peer_client.is_err():
logger.error(
f"Failed to create client for peer: {peer}: {peer_client.err()}"
)
peer.ping_status = NodePeerConnectionStatus.TIMEOUT
peer_update.ping_status = NodePeerConnectionStatus.TIMEOUT
peer_client = None
except Exception as e:
logger.error(
f"Failed to create client for peer: {peer} with exception {e}"
)
peer.ping_status = NodePeerConnectionStatus.TIMEOUT

peer_update.ping_status = NodePeerConnectionStatus.TIMEOUT
peer_client = None

if peer_client is not None:
peer_client = peer_client.ok()
peer_status = peer_client.api.services.network.check_peer_association(
peer_id=context.node.id
)
peer.ping_status = (
peer_update.ping_status = (
NodePeerConnectionStatus.ACTIVE
if peer_status == NodePeerAssociationStatus.PEER_ASSOCIATED
else NodePeerConnectionStatus.INACTIVE
)
if isinstance(peer_status, SyftError):
peer.ping_status_message = (
peer_update.ping_status_message = (
f"Error `{peer_status.message}` when pinging peer '{peer.name}'"
)
else:
peer.ping_status_message = f"Peer '{peer.name}''s ping status: {peer.ping_status.value.lower()}"
peer_update.ping_status_message = (
f"Peer '{peer.name}''s ping status: "
f"{peer_update.ping_status.value.lower()}"
)

result = network_stash.update(
credentials=context.node.verify_key,
peer=peer,
peer=peer_update,
has_permission=True,
)

Expand Down
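The health check above now writes a `NodePeerUpdate` that carries only the ping fields, instead of saving the whole `NodePeer` it read earlier. The following sketch uses a plain dict as a stand-in for the document store (all names are hypothetical, and the real stash and Mongo store differ in detail). It illustrates why a whole-object write from a stale snapshot can drop a route added in the meantime, while a field-level update does not:

```python
from copy import deepcopy

# Hypothetical in-memory "store" holding one peer record.
store = {
    "peer-1": {"name": "domain-a", "node_routes": ["route-1"], "ping_status": "INACTIVE"}
}


def overwrite(db: dict, uid: str, obj: dict) -> None:
    """Whole-object update: every field is replaced by the caller's copy."""
    db[uid] = deepcopy(obj)


def partial_update(db: dict, uid: str, changes: dict) -> None:
    """Partial update: only the fields named in `changes` are touched."""
    db[uid].update(changes)


# 1. The health check reads a snapshot of the peer.
snapshot = deepcopy(store["peer-1"])

# 2. Meanwhile, another request adds a route to the stored peer.
store["peer-1"]["node_routes"].append("route-2")

# 3a. Writing the stale snapshot back loses the new route.
snapshot["ping_status"] = "ACTIVE"
lossy = deepcopy(store)
overwrite(lossy, "peer-1", snapshot)

# 3b. Writing only the ping field keeps it.
safe = deepcopy(store)
partial_update(safe, "peer-1", {"ping_status": "ACTIVE"})

print(lossy["peer-1"]["node_routes"])
print(safe["peer-1"]["node_routes"])
```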