From dbda0967f4088cf67b0bdd8ddbca5bfc5344eff0 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 11 Oct 2023 19:25:39 +0530 Subject: [PATCH 1/6] implement methods to migrate exists data to new klass type in store partitions --- .../syft/src/syft/store/document_store.py | 16 +++++++++ .../syft/src/syft/store/kv_document_store.py | 26 ++++++++++++++ .../src/syft/store/mongo_document_store.py | 35 +++++++++++++++++++ 3 files changed, 77 insertions(+) diff --git a/packages/syft/src/syft/store/document_store.py b/packages/syft/src/syft/store/document_store.py index 98ca622c5bb..ef383d5783c 100644 --- a/packages/syft/src/syft/store/document_store.py +++ b/packages/syft/src/syft/store/document_store.py @@ -456,6 +456,14 @@ def all( ) -> Result[List[BaseStash.object_type], str]: return self._thread_safe_cbk(self._all, credentials, order_by, has_permission) + def migrate_data( + self, + to_klass: SyftObject, + credentials: SyftVerifyKey, + has_permission: Optional[bool] = False, + ) -> Result[bool, str]: + self._thread_safe_cbk(self._migrate_data, to_klass, credentials, has_permission) + # Potentially thread-unsafe methods. # CAUTION: # * Don't use self.lock here. @@ -497,6 +505,14 @@ def remove_permission(self, permission: ActionObjectPermission) -> None: def has_permission(self, permission: ActionObjectPermission) -> bool: raise NotImplementedError + def _migrate_data( + self, + to_klass: SyftObject, + credentials: SyftVerifyKey, + hash_permission: bool, + ) -> Result[bool, str]: + raise NotImplementedError + @instrument @serializable() diff --git a/packages/syft/src/syft/store/kv_document_store.py b/packages/syft/src/syft/store/kv_document_store.py index bc1c6d2ea37..8b99213d647 100644 --- a/packages/syft/src/syft/store/kv_document_store.py +++ b/packages/syft/src/syft/store/kv_document_store.py @@ -608,3 +608,29 @@ def _set_data_and_keys( self.searchable_keys[pk_key] = ck_col self.data[store_query_key.value] = obj + + def _migrate_data( + self, to_klass: SyftObject, credentials: SyftVerifyKey, hash_permission: bool + ) -> Result[bool, str]: + hash_permission = (credentials == self.root_verify_key) or hash_permission + + if hash_permission: + for key, value in self.data: + try: + migrated_value = value.migrate_to(to_klass) + except Exception: + return Err(f"Failed to migrate data to {to_klass} for qk: {key}") + qk = QueryKey.from_obj(key) + result = self._update( + credentials, + qk=qk, + obj=migrated_value, + has_permission=hash_permission, + ) + + if result.is_err(): + return result.err() + + return Ok(True) + + return Err("You don't have permissions to migrate data.") diff --git a/packages/syft/src/syft/store/mongo_document_store.py b/packages/syft/src/syft/store/mongo_document_store.py index 9a79aac1253..5c5ed89aa56 100644 --- a/packages/syft/src/syft/store/mongo_document_store.py +++ b/packages/syft/src/syft/store/mongo_document_store.py @@ -559,6 +559,41 @@ def __len__(self): collection: MongoCollection = collection_status.ok() return collection.count_documents(filter={}) + def _migrate_data( + self, to_klass: SyftObject, credentials: SyftVerifyKey, hash_permission: bool + ) -> Result[bool, str]: + hash_permission = (credentials == self.root_verify_key) or hash_permission + collection_status = self.collection + if collection_status.is_err(): + return collection_status + collection: MongoCollection = collection_status.ok() + + if hash_permission: + storage_objs = collection.find({}) + for storage_obj in storage_objs: + obj = self.storage_type(storage_obj) + transform_context = TransformContext(output={}, obj=obj) + value = obj.to(self.settings.object_type, transform_context) + key = obj.get("_id") + try: + migrated_value = value.migrate_to(to_klass) + except Exception: + return Err(f"Failed to migrate data to {to_klass} for qk: {key}") + qk = QueryKey.from_obj(key) + result = self._update( + credentials, + qk=qk, + obj=migrated_value, + has_permission=hash_permission, + ) + + if result.is_err(): + return result.err() + + return Ok(True) + + return Err("You don't have permissions to migrate data.") + @serializable() class MongoDocumentStore(DocumentStore): From 5ccb255d70b1f9071032a379500312199ce530a2 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 12 Oct 2023 18:14:57 +0530 Subject: [PATCH 2/6] add migrate data functionality to KeyValueActionStore - add method to find pending migrations - add method to find and migrate data for both document and action store --- packages/syft/src/syft/node/node.py | 83 ++++++++++++++----- .../src/syft/service/action/action_store.py | 24 ++++++ .../syft/src/syft/store/document_store.py | 2 +- .../syft/src/syft/store/kv_document_store.py | 8 +- .../src/syft/store/mongo_document_store.py | 8 +- 5 files changed, 97 insertions(+), 28 deletions(-) diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index eb0e9e7d7a3..b350fc78efc 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -337,6 +337,8 @@ def __init__( self.init_blob_storage(config=blob_storage_config) + self.find_and_migrate_data() + NodeRegistry.set_node_for(self.id, self) def init_blob_storage(self, config: Optional[BlobStorageConfig] = None) -> None: @@ -457,9 +459,9 @@ def root_client(self): root_client.api.refresh_api_callback() return root_client - def _find_pending_migrations(self): - klasses_to_be_migrated = [] - + def _find_klasses_pending_for_migration( + self, object_types: List[SyftObject] + ) -> List[SyftObject]: context = AuthedServiceContext( node=self, credentials=self.verify_key, @@ -467,39 +469,82 @@ def _find_pending_migrations(self): ) migration_state_service = self.get_service(MigrateStateService) - canonical_name_version_map = [] - - # Track all object types from document store - for partition in self.document_store.partitions.values(): - object_type = partition.settings.object_type - canonical_name = object_type.__canonical_name__ - object_version = object_type.__version__ - canonical_name_version_map.append((canonical_name, object_version)) + klasses_to_be_migrated = [] - # Track all object types from action store - action_object_types = [Action, ActionObject] - action_object_types.extend(ActionObject.__subclasses__()) - for object_type in action_object_types: + for object_type in object_types: canonical_name = object_type.__canonical_name__ object_version = object_type.__version__ - canonical_name_version_map.append((canonical_name, object_version)) - for canonical_name, current_version in canonical_name_version_map: migration_state = migration_state_service.get_state(context, canonical_name) if ( migration_state is not None and migration_state.current_version != migration_state.latest_version ): - klasses_to_be_migrated.append(canonical_name) + klasses_to_be_migrated.append(object_type) else: migration_state_service.register_migration_state( context, - current_version=current_version, + current_version=object_version, canonical_name=canonical_name, ) return klasses_to_be_migrated + def find_and_migrate_data(self): + # Track all object type that need migration for document store + document_store_object_types = [ + partition.settings.object_type + for partition in self.document_store.partitions.values() + ] + + object_pending_migration = self._find_klasses_pending_for_migration( + object_types=document_store_object_types + ) + + print( + "Object in Document Store that needs migration: ", + object_pending_migration, + ) + + # Migrate data for objects in document store + for object_type in object_pending_migration: + canonical_name = object_type.__canonical_name__ + object_partition = self.document_store.partitions.get(canonical_name) + if object_partition is None: + continue + + migration_status = object_partition.migrate_data( + to_klass=object_type, credentials=self.verify_key + ) + if migration_status.is_err(): + raise Exception( + f"Failed to migrate data for {canonical_name}. Error: {migration_status.err()}" + ) + + # Track all object types from action store + action_object_types = [Action, ActionObject] + action_object_types.extend(ActionObject.__subclasses__()) + action_object_pending_migration = self._find_klasses_pending_for_migration( + action_object_types + ) + + print( + "Object in Action Store that needs migration: ", + action_object_pending_migration, + ) + + # Migrate data for objects in action store + for object_type in action_object_pending_migration: + canonical_name = object_type.__canonical_name__ + + migration_status = self.action_store.migrate_data( + to_klass=object_type, credentials=self.verify_key + ) + if migration_status.is_err(): + raise Exception( + f"Failed to migrate data for {canonical_name}. Error: {migration_status.err()}" + ) + @property def guest_client(self): return self.get_guest_client() diff --git a/packages/syft/src/syft/service/action/action_store.py b/packages/syft/src/syft/service/action/action_store.py index b356bab83bf..f4568352568 100644 --- a/packages/syft/src/syft/service/action/action_store.py +++ b/packages/syft/src/syft/service/action/action_store.py @@ -247,6 +247,30 @@ def add_permissions(self, permissions: List[ActionObjectPermission]) -> None: for permission in permissions: self.add_permission(permission) + def migrate_data(self, to_klass: SyftObject, credentials: SyftVerifyKey): + has_root_permission = credentials == self.root_verify_key + + if has_root_permission: + for key, value in self.data: + try: + if value.__canonical_name__ != to_klass.__canonical_name__: + continue + migrated_value = value.migrate_to(to_klass) + except Exception: + return Err(f"Failed to migrate data to {to_klass} for qk: {key}") + result = self.set( + uid=key, + credentials=credentials, + syft_object=migrated_value, + ) + + if result.is_err(): + return result.err() + + return Ok(True) + + return Err("You don't have permissions to migrate data.") + @serializable() class DictActionStore(KeyValueActionStore): diff --git a/packages/syft/src/syft/store/document_store.py b/packages/syft/src/syft/store/document_store.py index ef383d5783c..049cc8b1766 100644 --- a/packages/syft/src/syft/store/document_store.py +++ b/packages/syft/src/syft/store/document_store.py @@ -509,7 +509,7 @@ def _migrate_data( self, to_klass: SyftObject, credentials: SyftVerifyKey, - hash_permission: bool, + has_permission: bool, ) -> Result[bool, str]: raise NotImplementedError diff --git a/packages/syft/src/syft/store/kv_document_store.py b/packages/syft/src/syft/store/kv_document_store.py index 8b99213d647..a750c0c10a8 100644 --- a/packages/syft/src/syft/store/kv_document_store.py +++ b/packages/syft/src/syft/store/kv_document_store.py @@ -610,11 +610,11 @@ def _set_data_and_keys( self.data[store_query_key.value] = obj def _migrate_data( - self, to_klass: SyftObject, credentials: SyftVerifyKey, hash_permission: bool + self, to_klass: SyftObject, credentials: SyftVerifyKey, has_permission: bool ) -> Result[bool, str]: - hash_permission = (credentials == self.root_verify_key) or hash_permission + has_permission = (credentials == self.root_verify_key) or has_permission - if hash_permission: + if has_permission: for key, value in self.data: try: migrated_value = value.migrate_to(to_klass) @@ -625,7 +625,7 @@ def _migrate_data( credentials, qk=qk, obj=migrated_value, - has_permission=hash_permission, + has_permission=has_permission, ) if result.is_err(): diff --git a/packages/syft/src/syft/store/mongo_document_store.py b/packages/syft/src/syft/store/mongo_document_store.py index 5c5ed89aa56..0bb0dba021c 100644 --- a/packages/syft/src/syft/store/mongo_document_store.py +++ b/packages/syft/src/syft/store/mongo_document_store.py @@ -560,15 +560,15 @@ def __len__(self): return collection.count_documents(filter={}) def _migrate_data( - self, to_klass: SyftObject, credentials: SyftVerifyKey, hash_permission: bool + self, to_klass: SyftObject, credentials: SyftVerifyKey, has_permission: bool ) -> Result[bool, str]: - hash_permission = (credentials == self.root_verify_key) or hash_permission + has_permission = (credentials == self.root_verify_key) or has_permission collection_status = self.collection if collection_status.is_err(): return collection_status collection: MongoCollection = collection_status.ok() - if hash_permission: + if has_permission: storage_objs = collection.find({}) for storage_obj in storage_objs: obj = self.storage_type(storage_obj) @@ -584,7 +584,7 @@ def _migrate_data( credentials, qk=qk, obj=migrated_value, - has_permission=hash_permission, + has_permission=has_permission, ) if result.is_err(): From e7b90e930978096932c3a051f9ede89d21a1f44d Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 19 Oct 2023 18:20:10 +0530 Subject: [PATCH 3/6] rename NodeMetadata to NodeMetadataV2 and NodeMetadataV1 to NodeMetadata - reset protocol version --- packages/syft/src/syft/client/client.py | 4 ++-- packages/syft/src/syft/node/node.py | 11 ++++++----- packages/syft/src/syft/service/metadata/migrations.py | 6 +++--- .../syft/src/syft/service/metadata/node_metadata.py | 8 ++++---- .../syft/src/syft/service/network/network_service.py | 4 ++-- packages/syft/src/syft/service/network/node_peer.py | 4 ++-- packages/syft/src/syft/service/project/project.py | 6 +++--- .../syft/tests/syft/settings/settings_service_test.py | 6 +++--- 8 files changed, 25 insertions(+), 24 deletions(-) diff --git a/packages/syft/src/syft/client/client.py b/packages/syft/src/syft/client/client.py index c6371f487a0..053565a5962 100644 --- a/packages/syft/src/syft/client/client.py +++ b/packages/syft/src/syft/client/client.py @@ -41,8 +41,8 @@ from ..serde.serializable import serializable from ..serde.serialize import _serialize from ..service.context import NodeServiceContext -from ..service.metadata.node_metadata import NodeMetadata from ..service.metadata.node_metadata import NodeMetadataJSON +from ..service.metadata.node_metadata import NodeMetadataV2 from ..service.response import SyftError from ..service.response import SyftSuccess from ..service.user.user import UserCreate @@ -597,7 +597,7 @@ def exchange_route(self, client: Self) -> Union[SyftSuccess, SyftError]: result = self.api.services.network.exchange_credentials_with( self_node_route=self_node_route, remote_node_route=remote_node_route, - remote_node_verify_key=client.metadata.to(NodeMetadata).verify_key, + remote_node_verify_key=client.metadata.to(NodeMetadataV2).verify_key, ) return result diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index b350fc78efc..87cfcbb0e84 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -64,7 +64,7 @@ from ..service.dataset.dataset_service import DatasetService from ..service.enclave.enclave_service import EnclaveService from ..service.metadata.metadata_service import MetadataService -from ..service.metadata.node_metadata import NodeMetadata +from ..service.metadata.node_metadata import NodeMetadataV2 from ..service.network.network_service import NetworkService from ..service.notification.notification_service import NotificationService from ..service.object_search.migration_state_service import MigrateStateService @@ -314,6 +314,9 @@ def __init__( self.services = services self._construct_services() + # Migrate data before any operation on db + self.find_and_migrate_data() + create_admin_new( # nosec B106 name="Jane Doe", email=root_email, @@ -337,8 +340,6 @@ def __init__( self.init_blob_storage(config=blob_storage_config) - self.find_and_migrate_data() - NodeRegistry.set_node_for(self.id, self) def init_blob_storage(self, config: Optional[BlobStorageConfig] = None) -> None: @@ -724,7 +725,7 @@ def _get_service_method_from_path(self, path: str) -> Callable: return getattr(service_obj, method_name) @property - def metadata(self) -> NodeMetadata: + def metadata(self) -> NodeMetadataV2: name = "" deployed_on = "" organization = "" @@ -747,7 +748,7 @@ def metadata(self) -> NodeMetadata: admin_email = settings_data.admin_email show_warnings = settings_data.show_warnings - return NodeMetadata( + return NodeMetadataV2( name=name, id=self.id, verify_key=self.verify_key, diff --git a/packages/syft/src/syft/service/metadata/migrations.py b/packages/syft/src/syft/service/metadata/migrations.py index dd6200b97a2..58d09021eb2 100644 --- a/packages/syft/src/syft/service/metadata/migrations.py +++ b/packages/syft/src/syft/service/metadata/migrations.py @@ -2,10 +2,10 @@ from ...types.syft_migration import migrate from ...types.transforms import rename from .node_metadata import NodeMetadata -from .node_metadata import NodeMetadataV1 +from .node_metadata import NodeMetadataV2 -@migrate(NodeMetadataV1, NodeMetadata) +@migrate(NodeMetadata, NodeMetadataV2) def upgrade_metadata_v1_to_v2(): return [ rename("highest_object_version", "highest_version"), @@ -13,7 +13,7 @@ def upgrade_metadata_v1_to_v2(): ] -@migrate(NodeMetadata, NodeMetadataV1) +@migrate(NodeMetadataV2, NodeMetadata) def downgrade_metadata_v2_to_v1(): return [ rename("highest_version", "highest_object_version"), diff --git a/packages/syft/src/syft/service/metadata/node_metadata.py b/packages/syft/src/syft/service/metadata/node_metadata.py index a3b3922b36e..05f61ba59e7 100644 --- a/packages/syft/src/syft/service/metadata/node_metadata.py +++ b/packages/syft/src/syft/service/metadata/node_metadata.py @@ -63,7 +63,7 @@ class NodeMetadataUpdate(SyftObject): @serializable() -class NodeMetadataV1(SyftObject): +class NodeMetadata(SyftObject): __canonical_name__ = "NodeMetadata" __version__ = SYFT_OBJECT_VERSION_1 @@ -92,7 +92,7 @@ def check_version(self, client_version: str) -> bool: @serializable() -class NodeMetadata(SyftObject): +class NodeMetadataV2(SyftObject): __canonical_name__ = "NodeMetadata" __version__ = SYFT_OBJECT_VERSION_2 @@ -155,7 +155,7 @@ def check_version(self, client_version: str) -> bool: ) -@transform(NodeMetadata, NodeMetadataJSON) +@transform(NodeMetadataV2, NodeMetadataJSON) def metadata_to_json() -> List[Callable]: return [ drop(["__canonical_name__"]), @@ -166,7 +166,7 @@ def metadata_to_json() -> List[Callable]: ] -@transform(NodeMetadataJSON, NodeMetadata) +@transform(NodeMetadataJSON, NodeMetadataV2) def json_to_metadata() -> List[Callable]: return [ drop(["metadata_version", "supported_protocols"]), diff --git a/packages/syft/src/syft/service/network/network_service.py b/packages/syft/src/syft/service/network/network_service.py index 7181027bc7a..d1fb2013eed 100644 --- a/packages/syft/src/syft/service/network/network_service.py +++ b/packages/syft/src/syft/service/network/network_service.py @@ -30,7 +30,7 @@ from ...util.telemetry import instrument from ..context import AuthedServiceContext from ..data_subject.data_subject import NamePartitionKey -from ..metadata.node_metadata import NodeMetadata +from ..metadata.node_metadata import NodeMetadataV2 from ..response import SyftError from ..response import SyftSuccess from ..service import AbstractService @@ -556,7 +556,7 @@ def node_route_to_http_connection( return HTTPConnection(url=url, proxy_target_uid=obj.proxy_target_uid) -@transform(NodeMetadata, NodePeer) +@transform(NodeMetadataV2, NodePeer) def metadata_to_peer() -> List[Callable]: return [ keep(["id", "name", "verify_key", "node_type", "admin_email"]), diff --git a/packages/syft/src/syft/service/network/node_peer.py b/packages/syft/src/syft/service/network/node_peer.py index 956f5b50160..4f6ec0466be 100644 --- a/packages/syft/src/syft/service/network/node_peer.py +++ b/packages/syft/src/syft/service/network/node_peer.py @@ -16,7 +16,7 @@ from ...types.syft_object import SyftObject from ...types.uid import UID from ..context import NodeServiceContext -from ..metadata.node_metadata import NodeMetadata +from ..metadata.node_metadata import NodeMetadataV2 from .routes import NodeRoute from .routes import NodeRouteType from .routes import connection_to_route @@ -55,7 +55,7 @@ def from_client(client: SyftClient) -> Self: if not client.metadata: raise Exception("Client has have metadata first") - peer = client.metadata.to(NodeMetadata).to(NodePeer) + peer = client.metadata.to(NodeMetadataV2).to(NodePeer) route = connection_to_route(client.connection) peer.node_routes.append(route) return peer diff --git a/packages/syft/src/syft/service/project/project.py b/packages/syft/src/syft/service/project/project.py index 6a2679694b2..19fc63eb51d 100644 --- a/packages/syft/src/syft/service/project/project.py +++ b/packages/syft/src/syft/service/project/project.py @@ -31,7 +31,7 @@ from ...node.credentials import SyftVerifyKey from ...serde.serializable import serializable from ...serde.serialize import _serialize -from ...service.metadata.node_metadata import NodeMetadata +from ...service.metadata.node_metadata import NodeMetadataV2 from ...store.linked_obj import LinkedObject from ...types.datetime import DateTime from ...types.identity import Identity @@ -64,7 +64,7 @@ class EventAlreadyAddedException(SyftException): pass -@transform(NodeMetadata, NodeIdentity) +@transform(NodeMetadataV2, NodeIdentity) def metadata_to_node_identity() -> List[Callable]: return [rename("id", "node_id"), rename("name", "node_name")] @@ -1230,7 +1230,7 @@ def to_node_identity(val: Union[SyftClient, NodeIdentity]): if isinstance(val, NodeIdentity): return val elif isinstance(val, SyftClient): - metadata = val.metadata.to(NodeMetadata) + metadata = val.metadata.to(NodeMetadataV2) return metadata.to(NodeIdentity) else: raise SyftException( diff --git a/packages/syft/tests/syft/settings/settings_service_test.py b/packages/syft/tests/syft/settings/settings_service_test.py index a8ba4d0da68..396c7dad0ff 100644 --- a/packages/syft/tests/syft/settings/settings_service_test.py +++ b/packages/syft/tests/syft/settings/settings_service_test.py @@ -14,7 +14,7 @@ from syft.node.credentials import SyftSigningKey from syft.node.credentials import SyftVerifyKey from syft.service.context import AuthedServiceContext -from syft.service.metadata.node_metadata import NodeMetadata +from syft.service.metadata.node_metadata import NodeMetadataV2 from syft.service.response import SyftError from syft.service.response import SyftSuccess from syft.service.settings.settings import NodeSettings @@ -227,7 +227,7 @@ def test_settings_allow_guest_registration( # Create a new worker verify_key = SyftSigningKey.generate().verify_key - mock_node_metadata = NodeMetadata( + mock_node_metadata = NodeMetadataV2( name=faker.name(), verify_key=verify_key, highest_version=1, @@ -309,7 +309,7 @@ def get_mock_client(faker, root_client, role): return guest_client verify_key = SyftSigningKey.generate().verify_key - mock_node_metadata = NodeMetadata( + mock_node_metadata = NodeMetadataV2( name=faker.name(), verify_key=verify_key, highest_version=1, From d06f0b5ed378e14345f96edfc366fa9e6fc8e455 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 19 Oct 2023 19:52:40 +0530 Subject: [PATCH 4/6] pass auth context to _migrate_data and migrate_to methods - pass klass version to migrate_to in _migrate_data - add a flag to overwrite existing object in _update method - move order of data migration to post setup --- packages/syft/src/syft/node/node.py | 33 ++++++++++++------- .../syft/src/syft/store/document_store.py | 9 +++-- .../syft/src/syft/store/kv_document_store.py | 27 +++++++++------ .../src/syft/store/mongo_document_store.py | 6 ++-- packages/syft/src/syft/types/syft_object.py | 2 +- 5 files changed, 49 insertions(+), 28 deletions(-) diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 87cfcbb0e84..1a2cf7a145d 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -314,9 +314,6 @@ def __init__( self.services = services self._construct_services() - # Migrate data before any operation on db - self.find_and_migrate_data() - create_admin_new( # nosec B106 name="Jane Doe", email=root_email, @@ -340,6 +337,9 @@ def __init__( self.init_blob_storage(config=blob_storage_config) + # Migrate data before any operation on db + self.find_and_migrate_data() + NodeRegistry.set_node_for(self.id, self) def init_blob_storage(self, config: Optional[BlobStorageConfig] = None) -> None: @@ -493,6 +493,11 @@ def _find_klasses_pending_for_migration( def find_and_migrate_data(self): # Track all object type that need migration for document store + context = AuthedServiceContext( + node=self, + credentials=self.verify_key, + role=ServiceRole.ADMIN, + ) document_store_object_types = [ partition.settings.object_type for partition in self.document_store.partitions.values() @@ -502,10 +507,11 @@ def find_and_migrate_data(self): object_types=document_store_object_types ) - print( - "Object in Document Store that needs migration: ", - object_pending_migration, - ) + if object_pending_migration: + print( + "Object in Document Store that needs migration: ", + object_pending_migration, + ) # Migrate data for objects in document store for object_type in object_pending_migration: @@ -514,8 +520,9 @@ def find_and_migrate_data(self): if object_partition is None: continue + print(f"Migrating data for: {canonical_name} table.") migration_status = object_partition.migrate_data( - to_klass=object_type, credentials=self.verify_key + to_klass=object_type, context=context ) if migration_status.is_err(): raise Exception( @@ -529,10 +536,11 @@ def find_and_migrate_data(self): action_object_types ) - print( - "Object in Action Store that needs migration: ", - action_object_pending_migration, - ) + if action_object_pending_migration: + print( + "Object in Action Store that needs migration: ", + action_object_pending_migration, + ) # Migrate data for objects in action store for object_type in action_object_pending_migration: @@ -545,6 +553,7 @@ def find_and_migrate_data(self): raise Exception( f"Failed to migrate data for {canonical_name}. Error: {migration_status.err()}" ) + print("Data Migrated to latest version !!!") @property def guest_client(self): diff --git a/packages/syft/src/syft/store/document_store.py b/packages/syft/src/syft/store/document_store.py index 049cc8b1766..474975a5511 100644 --- a/packages/syft/src/syft/store/document_store.py +++ b/packages/syft/src/syft/store/document_store.py @@ -26,6 +26,7 @@ from ..node.credentials import SyftVerifyKey from ..serde.serializable import serializable from ..service.action.action_permissions import ActionObjectPermission +from ..service.context import AuthedServiceContext from ..service.response import SyftSuccess from ..types.base import SyftBaseModel from ..types.syft_object import SYFT_OBJECT_VERSION_1 @@ -459,10 +460,12 @@ def all( def migrate_data( self, to_klass: SyftObject, - credentials: SyftVerifyKey, + context: AuthedServiceContext, has_permission: Optional[bool] = False, ) -> Result[bool, str]: - self._thread_safe_cbk(self._migrate_data, to_klass, credentials, has_permission) + return self._thread_safe_cbk( + self._migrate_data, to_klass, context, has_permission + ) # Potentially thread-unsafe methods. # CAUTION: @@ -508,7 +511,7 @@ def has_permission(self, permission: ActionObjectPermission) -> bool: def _migrate_data( self, to_klass: SyftObject, - credentials: SyftVerifyKey, + context: AuthedServiceContext, has_permission: bool, ) -> Result[bool, str]: raise NotImplementedError diff --git a/packages/syft/src/syft/store/kv_document_store.py b/packages/syft/src/syft/store/kv_document_store.py index a750c0c10a8..0b86685ea66 100644 --- a/packages/syft/src/syft/store/kv_document_store.py +++ b/packages/syft/src/syft/store/kv_document_store.py @@ -24,6 +24,7 @@ from ..service.action.action_permissions import ActionObjectREAD from ..service.action.action_permissions import ActionObjectWRITE from ..service.action.action_permissions import ActionPermission +from ..service.context import AuthedServiceContext from ..service.response import SyftSuccess from ..types.syft_object import SyftObject from ..types.uid import UID @@ -370,6 +371,7 @@ def _update( qk: QueryKey, obj: SyftObject, has_permission=False, + overwrite=False, ) -> Result[SyftObject, str]: try: if qk.value not in self.data: @@ -396,11 +398,15 @@ def _update( ) # update the object with new data - for key, value in obj.to_dict(exclude_empty=True).items(): - if key == "id": - # protected field - continue - setattr(_original_obj, key, value) + if overwrite: + # Overwrite existing object and their values + _original_obj = obj + else: + for key, value in obj.to_dict(exclude_empty=True).items(): + if key == "id": + # protected field + continue + setattr(_original_obj, key, value) # update data and keys self._set_data_and_keys( @@ -610,22 +616,23 @@ def _set_data_and_keys( self.data[store_query_key.value] = obj def _migrate_data( - self, to_klass: SyftObject, credentials: SyftVerifyKey, has_permission: bool + self, to_klass: SyftObject, context: AuthedServiceContext, has_permission: bool ) -> Result[bool, str]: + credentials = context.credentials has_permission = (credentials == self.root_verify_key) or has_permission - if has_permission: - for key, value in self.data: + for key, value in self.data.items(): try: - migrated_value = value.migrate_to(to_klass) + migrated_value = value.migrate_to(to_klass.__version__, context) except Exception: return Err(f"Failed to migrate data to {to_klass} for qk: {key}") - qk = QueryKey.from_obj(key) + qk = self.settings.store_key.with_obj(key) result = self._update( credentials, qk=qk, obj=migrated_value, has_permission=has_permission, + overwrite=True, ) if result.is_err(): diff --git a/packages/syft/src/syft/store/mongo_document_store.py b/packages/syft/src/syft/store/mongo_document_store.py index 0bb0dba021c..ac1dcf2ad2b 100644 --- a/packages/syft/src/syft/store/mongo_document_store.py +++ b/packages/syft/src/syft/store/mongo_document_store.py @@ -25,6 +25,7 @@ from ..service.action.action_permissions import ActionObjectREAD from ..service.action.action_permissions import ActionObjectWRITE from ..service.action.action_permissions import ActionPermission +from ..service.context import AuthedServiceContext from ..service.response import SyftSuccess from ..types.syft_object import SYFT_OBJECT_VERSION_1 from ..types.syft_object import StorableObjectType @@ -560,8 +561,9 @@ def __len__(self): return collection.count_documents(filter={}) def _migrate_data( - self, to_klass: SyftObject, credentials: SyftVerifyKey, has_permission: bool + self, to_klass: SyftObject, context: AuthedServiceContext, has_permission: bool ) -> Result[bool, str]: + credentials = context.credentials has_permission = (credentials == self.root_verify_key) or has_permission collection_status = self.collection if collection_status.is_err(): @@ -576,7 +578,7 @@ def _migrate_data( value = obj.to(self.settings.object_type, transform_context) key = obj.get("_id") try: - migrated_value = value.migrate_to(to_klass) + migrated_value = value.migrate_to(to_klass.__version__, context) except Exception: return Err(f"Failed to migrate data to {to_klass} for qk: {key}") qk = QueryKey.from_obj(key) diff --git a/packages/syft/src/syft/types/syft_object.py b/packages/syft/src/syft/types/syft_object.py index 49f89f4f99d..7b553faa1c6 100644 --- a/packages/syft/src/syft/types/syft_object.py +++ b/packages/syft/src/syft/types/syft_object.py @@ -315,7 +315,7 @@ def get_migration_for_version( raise Exception( f"No migration found for class type: {type_from} to " - "version: {version_to} in the migration registry." + f"version: {version_to} in the migration registry." ) From fce9374382ba9539d66c33c3ad1d6706bd7d5acd Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Fri, 20 Oct 2023 11:06:26 +0530 Subject: [PATCH 5/6] fix query key to derive from storage key in migrate data --- packages/syft/src/syft/store/mongo_document_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/syft/src/syft/store/mongo_document_store.py b/packages/syft/src/syft/store/mongo_document_store.py index ac1dcf2ad2b..9d7727ccda0 100644 --- a/packages/syft/src/syft/store/mongo_document_store.py +++ b/packages/syft/src/syft/store/mongo_document_store.py @@ -581,7 +581,7 @@ def _migrate_data( migrated_value = value.migrate_to(to_klass.__version__, context) except Exception: return Err(f"Failed to migrate data to {to_klass} for qk: {key}") - qk = QueryKey.from_obj(key) + qk = self.settings.store_key.with_obj(key) result = self._update( credentials, qk=qk, From f408f4c7c92ca10d09fd200a30489f9926f3694a Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Mon, 30 Oct 2023 09:12:15 +0530 Subject: [PATCH 6/6] increase timeout for tutorial notebooks tests --- .github/workflows/pr-tests-syft.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr-tests-syft.yml b/.github/workflows/pr-tests-syft.yml index 9923b293ec8..6d96d05e497 100644 --- a/.github/workflows/pr-tests-syft.yml +++ b/.github/workflows/pr-tests-syft.yml @@ -186,7 +186,7 @@ jobs: ORCHESTRA_DEPLOYMENT_TYPE: "${{ matrix.deployment-type }}" TEST_NOTEBOOK_PATHS: "${{ matrix.notebook-paths }}" with: - timeout_seconds: 1800 + timeout_seconds: 2400 max_attempts: 3 command: tox -e syft.test.notebook