diff --git a/notebooks/helm/append-only-event-log-prototype.ipynb b/notebooks/helm/append-only-event-log-prototype.ipynb new file mode 100644 index 00000000000..392b6b2ff21 --- /dev/null +++ b/notebooks/helm/append-only-event-log-prototype.ipynb @@ -0,0 +1,4690 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "fb5821ba", + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "from copy import deepcopy\n", + "from datetime import datetime\n", + "from typing import Any\n", + "from typing import ClassVar\n", + "from typing import Dict\n", + "from typing import List\n", + "from typing import Optional\n", + "from typing import Type\n", + "\n", + "# third party\n", + "from pydantic import Field\n", + "\n", + "# syft absolute\n", + "from syft import UID\n", + "from syft.types.base import SyftBaseModel" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "cbd66513", + "metadata": {}, + "outputs": [], + "source": [ + "class MyBaseModel(SyftBaseModel):\n", + " id: UID = Field(default_factory=lambda: UID())" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "77a9326f", + "metadata": {}, + "outputs": [], + "source": [ + "class Event(MyBaseModel):\n", + " creator: UID\n", + " creation_date: datetime = Field(default_factory=lambda: datetime.now())\n", + "\n", + " def handler(self, node):\n", + " method_name = event_handler_registry[self.__class__.__name__]\n", + " return getattr(node, method_name)" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "8147bc26", + "metadata": {}, + "outputs": [], + "source": [ + "class EventLog(MyBaseModel):\n", + " log: List[Event] = []" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "8e254ba6", + "metadata": {}, + "outputs": [], + "source": [ + "class LinkedObject(MyBaseModel):\n", + " node_id: UID\n", + " obj_id: UID" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "da7a28d3", + "metadata": {}, + "outputs": [], + "source": [ + "class Dataset(MyBaseModel):\n", + " real: LinkedObject\n", + " mock: str\n", + " description: str" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "50df5253", + "metadata": {}, + "outputs": [], + "source": [ + "class UserCode(MyBaseModel):\n", + " code: str\n", + " approved: bool = False" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "0de045c1", + "metadata": {}, + "outputs": [], + "source": [ + "def register_event_handler(event_type):\n", + " def inner(method):\n", + " event_handler_registry[event_type.__name__] = method.__name__\n", + " return method\n", + "\n", + " return inner" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "046f5d49", + "metadata": {}, + "outputs": [], + "source": [ + "event_handler_registry = {}" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "013b13e0", + "metadata": {}, + "outputs": [], + "source": [ + "# class CUDObjectEvent(Event):\n", + "# object_type: ClassVar[Type]\n", + "# # object_type: Optional[ClassVar[Type]" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "1241f9dc", + "metadata": {}, + "outputs": [], + "source": [ + "class CRUDEvent(Event):\n", + " object_type: ClassVar[Type] = Type\n", + " object_id: UID\n", + "\n", + " @property\n", + " def merge_updates_repr(self):\n", + " return f\"{self.updates} for object {self.object_id} by {self.creator}\"" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "7edf812f", + "metadata": {}, + "outputs": [], + "source": [ 
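+ "# CreateObjectEvent derives its updated properties from the target type's annotations (a create touches every field);\n", + "# UpdateObjectEvent instead carries an explicit dict of the properties that changed.\n", + "\n",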
+ "class CreateObjectEvent(CRUDEvent):\n", + " @property\n", + " def updated_properties(self):\n", + " return list(self.object_type.__annotations__.keys())\n", + "\n", + " @property\n", + " def updates(self):\n", + " return {p: getattr(self, p) for p in self.updated_properties}\n", + "\n", + " @property\n", + " def update_tuples(self):\n", + " return list(self.updates.items())\n", + "\n", + "\n", + "class UpdateObjectEvent(CRUDEvent):\n", + " updates: Dict[str, Any]\n", + "\n", + " @property\n", + " def updated_properties(self):\n", + " return list(self.updates.keys())\n", + "\n", + " @property\n", + " def update_tuples(self):\n", + " return list(self.updates.items())" + ] + }, + { + "cell_type": "markdown", + "id": "6b286e05", + "metadata": {}, + "source": [ + "# Events" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "1a5b494a", + "metadata": {}, + "outputs": [], + "source": [ + "class CreateDatasetEvent(CreateObjectEvent):\n", + " object_type: ClassVar[Type] = Dataset\n", + " mock: Any\n", + " real: LinkedObject\n", + " description: str\n", + " creator: UID\n", + "\n", + " def execute(self, node):\n", + " handler = self.handler(node)\n", + " handler(\n", + " object_id=self.real.obj_id,\n", + " mock=self.mock,\n", + " real=self.real,\n", + " description=self.description,\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "5860ff66", + "metadata": {}, + "outputs": [], + "source": [ + "class UpdateDatasetEvent(UpdateObjectEvent):\n", + " object_type: ClassVar[Type] = Dataset\n", + " object_id: UID\n", + "\n", + " def execute(self, node):\n", + " handler = self.handler(node)\n", + " handler(object_id=self.object_id, updates=self.updates)" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "45454917", + "metadata": {}, + "outputs": [], + "source": [ + "class CreateUserCodeEvent(CreateObjectEvent):\n", + " object_type: ClassVar[Type] = UserCode\n", + " code: UserCode\n", + "\n", + " def execute(self, node):\n", + " handler = self.handler(node)\n", + " handler(code=self.code)" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "74a14171", + "metadata": {}, + "outputs": [], + "source": [ + "class ApproveUserCodeEvent(Event):\n", + " object_type: ClassVar[Type] = UserCode\n", + " code_id: UID\n", + " value: bool\n", + "\n", + " def execute(self, node):\n", + " handler = self.handler(node)\n", + " handler(self.code_id, self.value)" + ] + }, + { + "cell_type": "markdown", + "id": "b9e21de5", + "metadata": {}, + "source": [ + "# Node" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "929c0793", + "metadata": {}, + "outputs": [], + "source": [ + "class Node(MyBaseModel):\n", + " event_log: EventLog = EventLog()\n", + " store: Dict[UID, Any] = {}\n", + " private_store: Dict[UID, Any] = {}\n", + "\n", + " def apply_log(self, log):\n", + " self.store = {}\n", + " self.event_log = deepcopy(log)\n", + "\n", + " for event in self.event_log.log:\n", + " event.execute(self)\n", + "\n", + " def create_usercode(self, usercode: str):\n", + " obj = UserCode(code=usercode)\n", + " event = CreateUserCodeEvent(code=obj, object_id=obj.id, creator=self.id)\n", + " self.event_log.log.append(event)\n", + "\n", + " self._create_usercode(obj)\n", + " return obj.id\n", + "\n", + " @register_event_handler(CreateUserCodeEvent)\n", + " def _create_usercode(self, code):\n", + " self.store[code.id] = code\n", + "\n", + " def approve_usercode(self, code_id: UID, to: bool):\n", + " event = 
ApproveUserCodeEvent(code_id=code_id, creator=self.id, value=to)\n", + " self.event_log.log.append(event)\n", + " self._approve_usercode(code_id, to)\n", + "\n", + " @register_event_handler(ApproveUserCodeEvent)\n", + " def _approve_usercode(self, code_id, to):\n", + " self.store[code_id].approved = to\n", + "\n", + " def create_dataset(self, mock: str, real: Optional[str], description: str):\n", + " object_id = UID()\n", + " real_id = UID()\n", + " real_obj = LinkedObject(node_id=self.id, obj_id=object_id)\n", + "\n", + " self.private_store[real_id] = real\n", + "\n", + " event = CreateDatasetEvent(\n", + " object_id=object_id,\n", + " mock=mock,\n", + " real=real_obj,\n", + " description=description,\n", + " creator=self.id,\n", + " )\n", + "\n", + " self.event_log.log.append(event)\n", + " self._create_dataset(object_id, mock, real_obj, description)\n", + "\n", + " @register_event_handler(CreateDatasetEvent)\n", + " def _create_dataset(self, object_id, mock, real, description):\n", + " dataset = Dataset(id=object_id, mock=mock, real=real, description=description)\n", + " self.store[dataset.id] = dataset\n", + "\n", + " def update_dataset(self, id, updates):\n", + " event = UpdateDatasetEvent(object_id=id, updates=updates, creator=self.id)\n", + " self.event_log.log.append(event)\n", + " self._update_dataset(id, updates)\n", + "\n", + " @register_event_handler(UpdateDatasetEvent)\n", + " def _update_dataset(self, object_id, updates):\n", + " dataset = self.store[object_id]\n", + "\n", + " for k, v in updates.items():\n", + " setattr(dataset, k, v)\n", + "\n", + " self.store[object_id] = dataset" + ] + }, + { + "cell_type": "markdown", + "id": "1749d266", + "metadata": {}, + "source": [ + "We want to check for 'mutations' of the same object, which we define as:\n", + "\n", + "- CUD (from CRUD) of objects with the same unique keys\n", + " - create changes all attributes\n", + " - delete changes all attributes\n", + " - update only changes the attributes that were updated\n", + "\n", + "\n", + "In the case of an update, if only non-overlapping sets of properties were updated, it is not a merge conflict, as long as those are not code-approval mutations.\n",
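+ "\n", + "A minimal sketch of that rule, mirroring the per-property check in MergeState.add_event (the property names and values below are hypothetical, and the code-approval caveat is ignored):\n", + "\n", + "```python\n", + "def conflicts(updates_a: dict, updates_b: dict) -> bool:\n", + "    # only a property written by both sides, to different values, is a real conflict\n", + "    shared = updates_a.keys() & updates_b.keys()\n", + "    return any(updates_a[p] != updates_b[p] for p in shared)\n", + "\n", + "assert not conflicts({'description': 'x'}, {'mock': 'y'})         # disjoint properties: mergeable\n", + "assert not conflicts({'description': 'x'}, {'description': 'x'})  # same value: mergeable\n", + "assert conflicts({'description': 'x'}, {'description': 'z'})      # same property, different value: conflict\n", + "```"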
+ ] + }, + { + "cell_type": "markdown", + "id": "aa23dd3e", + "metadata": {}, + "source": [ + "# MergeState" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "1df962ab", + "metadata": {}, + "outputs": [], + "source": [ + "class MergeState(SyftBaseModel):\n", + " proposed_merge: List[Event]\n", + " fork_idx: int\n", + " new_log: List[Event] = []\n", + "\n", + " @property\n", + " def current_merge_events(self):\n", + " return self.new_log[self.fork_idx :]\n", + "\n", + " # @property\n", + " # def updates_since_fork(self):\n", + " # updates_since_fork: Dict[UID, List[Event]] = defaultdict(list)\n", + " # # {node_id -> {obj_id -> Event}}\n", + " # for event in self.new_events:\n", + " # updates_since_fork[e.creator] += [event]\n", + " # return updates_since_fork\n", + "\n", + " def merge(self):\n", + " self.new_log = self.proposed_merge[: self.fork_idx]\n", + " for event in self.proposed_merge[self.fork_idx :]:\n", + " if self.add_event(event):\n", + " print(\"merge conflict\")\n", + "\n", + " def request_input(self, event, conflicting_event):\n", + " s = input(\n", + " f\"\"\"\n", + " {event.object_id} was changed by {event.creator} and {conflicting_event.creator}\n", + " Change 0: {event.merge_updates_repr}\n", + " Change 1: {conflicting_event.merge_updates_repr}\n", + " Type 0/1 to keep the corresponding change\n", + " \"\"\"\n", + " )\n", + " idx = int(s)\n", + " assert idx in [0, 1]\n", + " return idx == 1\n", + "\n", + " def object_updates(self, object_id, exclude_node: UID):\n", + " # other_node_ids = [node_id for node_id in self.updates_since_fork.keys()\n", + " # if node_id != event.creator]\n", + " # other_events_updating_object = [e for i in other_node_ids for e in self.updates_since_fork[i]\n", + " # if e.object_id == object_id]\n", + "\n", + " other_events_updating_object = [\n", + " e\n", + " for e in self.current_merge_events\n", + " if e.object_id == object_id and e.creator != exclude_node\n", + " ]\n", + "\n", + " object_updates = {}\n", + "\n", + " for e in other_events_updating_object:\n", + " for p in e.updated_properties:\n", + " val = e.updates[p]\n", + " object_updates[p] = (val, e)\n", + "\n", + " return object_updates\n", + "\n", + " def add_event(self, event):\n", + " merge_object_updates = self.object_updates(\n", + " event.object_id, exclude_node=event.creator\n", + " )\n", + " # we want to find all the events from other nodes that updated the same object\n", + " # then we want to find which properties they updated and to what value\n", + " # if they updated the same property to a different value => merge conflict\n", + " # (property, value) => event\n", + "\n", + " skip_current_event = False\n", + " for prop, val in event.updates.items():\n", + " if skip_current_event:\n", + " continue\n", + " # val -> event\n", + " if prop not in merge_object_updates:\n", + " continue\n", + "\n", + " other_val, other_event = merge_object_updates[prop]\n", + " if other_val != val:\n", + " conflicting_event = other_event\n", + " skip_current_event = self.request_input(event, conflicting_event)\n", + " skip_conflicting_event = not skip_current_event\n", + "\n", + " # merge strategies:\n", + " # accept entire event, reject other event entirely\n", + " # cherry pick per property\n", + "\n", + " if skip_conflicting_event:\n", + " print(\"skip conflicting event\")\n", + " # remove conflicting event from new_log\n", + " self.new_log = [\n", + " e for e in self.new_log if e.id != conflicting_event.id\n", + " ]\n", + "\n", + " if not skip_current_event:\n", + " 
self.new_log += [event]" + ] + }, + { + "cell_type": "markdown", + "id": "70fac57b", + "metadata": {}, + "source": [ + "# Sync" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "994cbe25", + "metadata": {}, + "outputs": [], + "source": [ + "def sync(node_high, node_low):\n", + " log1 = node_high.event_log.log\n", + " log2 = node_low.event_log.log\n", + "\n", + " # find idx of the fork\n", + " fork_idx = min(len(log1), len(log2))\n", + " for i, (e1, e2) in enumerate(list(zip(log1, log2))):\n", + " if e1.id != e2.id:\n", + " fork_idx = i\n", + " break\n", + "\n", + " branch1 = log1[fork_idx:]\n", + " branch2 = log2[fork_idx:]\n", + "\n", + " proposed_merge = log1[:fork_idx] + sorted(\n", + " branch1 + branch2, key=lambda e: e.creation_date\n", + " )\n", + " print(f\"proposed merge (before merging): {proposed_merge}\")\n", + " merge_state = MergeState(fork_idx=fork_idx, proposed_merge=proposed_merge)\n", + " merge_state.merge()\n", + "\n", + " new_log = EventLog(log=merge_state.new_log)\n", + "\n", + " node_low.apply_log(new_log)\n", + " node_high.apply_log(new_log)\n", + "\n", + " assert all(\n", + " [x == y for x, y in zip(node_low.event_log.log, node_high.event_log.log)]\n", + " ) and len(node_low.event_log.log) == len(node_high.event_log.log)" + ] + }, + { + "cell_type": "markdown", + "id": "90c96310", + "metadata": {}, + "source": [ + "# Sync 1: create dataset and sync" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "617b84e9", + "metadata": {}, + "outputs": [], + "source": [ + "node_high = Node()\n", + "node_low = Node()" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "92d5f2f1", + "metadata": {}, + "outputs": [], + "source": [ + "node_high.create_dataset(real=\"abc\", mock=\"def\", description=\"blabla\")" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "78f19bf5", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "
\n", + " \n", + " \n" + ], + "text/plain": [ + "[CreateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 140729), object_id=, mock='def', real=LinkedObject(id=, node_id=, obj_id=), description='blabla')]" + ] + }, + "execution_count": 22, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "node_high.event_log.log" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "085c71ab", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "proposed merge (before merging): [CreateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 140729), object_id=, mock='def', real=LinkedObject(id=, node_id=, obj_id=), description='blabla')]\n" + ] + } + ], + "source": [ + "sync(node_high, node_low)" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "1c8ead28", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "
\n", + " \n", + " \n" + ], + "text/plain": [ + "[CreateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 140729), object_id=, mock='def', real=LinkedObject(id=, node_id=, obj_id=), description='blabla')]" + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "node_high.event_log.log" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "c470bba0", + "metadata": {}, + "outputs": [], + "source": [ + "assert node_high.store.keys() == node_low.store.keys()" + ] + }, + { + "cell_type": "markdown", + "id": "1da3f904", + "metadata": {}, + "source": [ + "# Sync 2: both update same property to same value" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "7667a1b0", + "metadata": {}, + "outputs": [], + "source": [ + "dataset = list(node_high.store.values())[0]" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "id": "a1612222", + "metadata": {}, + "outputs": [], + "source": [ + "node_high.update_dataset(dataset.id, {\"description\": \"a\"})" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "id": "9a6cb619", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "
\n", + " \n", + " \n" + ], + "text/plain": [ + "[CreateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 140729), object_id=, mock='def', real=LinkedObject(id=, node_id=, obj_id=), description='blabla'),\n", + " UpdateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 172952), object_id=, updates={'description': 'a'})]" + ] + }, + "execution_count": 28, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "node_high.event_log.log" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "255afa6a", + "metadata": {}, + "outputs": [], + "source": [ + "node_low.update_dataset(dataset.id, {\"description\": \"a\"})" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "id": "ecdcc143", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "
\n", + " \n", + " \n" + ], + "text/plain": [ + "[CreateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 140729), object_id=, mock='def', real=LinkedObject(id=, node_id=, obj_id=), description='blabla'),\n", + " UpdateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 172952), object_id=, updates={'description': 'a'})]" + ] + }, + "execution_count": 30, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "node_high.event_log.log" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "id": "f0026d2b", + "metadata": {}, + "outputs": [], + "source": [ + "# node_low.event_log.log" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "id": "5ffaa011", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "proposed merge (before merging): [CreateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 140729), object_id=, mock='def', real=LinkedObject(id=, node_id=, obj_id=), description='blabla'), UpdateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 172952), object_id=, updates={'description': 'a'}), UpdateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 182961), object_id=, updates={'description': 'a'})]\n" + ] + } + ], + "source": [ + "sync(node_high, node_low)" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "id": "b89e3a76", + "metadata": {}, + "outputs": [], + "source": [ + "dataset_high = list(node_high.store.values())[0]\n", + "dataset_low = list(node_low.store.values())[0]" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "id": "9f5abaf8", + "metadata": {}, + "outputs": [], + "source": [ + "assert dataset_high.description == dataset_low.description" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "id": "96313e34", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "
\n", + " \n", + " \n" + ], + "text/plain": [ + "[CreateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 140729), object_id=, mock='def', real=LinkedObject(id=, node_id=, obj_id=), description='blabla'),\n", + " UpdateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 172952), object_id=, updates={'description': 'a'}),\n", + " UpdateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 182961), object_id=, updates={'description': 'a'})]" + ] + }, + "execution_count": 35, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "node_high.event_log.log" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "id": "1351f1fc", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "
\n", + " \n", + " \n" + ], + "text/plain": [ + "[CreateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 140729), object_id=, mock='def', real=LinkedObject(id=, node_id=, obj_id=), description='blabla'),\n", + " UpdateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 172952), object_id=, updates={'description': 'a'}),\n", + " UpdateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 182961), object_id=, updates={'description': 'a'})]" + ] + }, + "execution_count": 36, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "node_low.event_log.log" + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "id": "2b845ec8", + "metadata": {}, + "outputs": [], + "source": [ + "# we keep both events\n", + "assert len(node_high.event_log.log) == 3 and len(node_low.event_log.log) == 3" + ] + }, + { + "cell_type": "markdown", + "id": "c27219ec", + "metadata": {}, + "source": [ + "# Sync 3: both update same property to different value" + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "id": "286030cf", + "metadata": {}, + "outputs": [], + "source": [ + "dataset = list(node_high.store.values())[0]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7c3030a8", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 39, + "id": "92731b81", + "metadata": {}, + "outputs": [], + "source": [ + "# node_low.event_log.log" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "id": "97608e6b", + "metadata": {}, + "outputs": [], + "source": [ + "if False:\n", + " node_high.update_dataset(dataset.id, {\"description\": \"b\"})\n", + "\n", + " node_high.event_log.log\n", + "\n", + " node_low.update_dataset(dataset.id, {\"description\": \"c\"})\n", + "\n", + " node_high.event_log.log\n", + "\n", + " sync(node_high, node_low)\n", + "\n", + " dataset_high = list(node_high.store.values())[0]\n", + " dataset_low = list(node_low.store.values())[0]\n", + "\n", + " assert dataset_high.description == dataset_low.description\n", + "\n", + " node_high.event_log.log\n", + "\n", + " assert len(node_high.event_log.log) == 4 and len(node_low.event_log.log) == 4" + ] + }, + { + "cell_type": "markdown", + "id": "101aaf6f", + "metadata": {}, + "source": [ + "# Sync 4: UserCode" + ] + }, + { + "cell_type": "code", + "execution_count": 41, + "id": "14d872b5", + "metadata": {}, + "outputs": [], + "source": [ + "user_code_id = node_low.create_usercode(\"print('a')\")" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "id": "6298def6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "proposed merge (before merging): [CreateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 140729), object_id=, mock='def', real=LinkedObject(id=, node_id=, obj_id=), description='blabla'), UpdateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 172952), object_id=, updates={'description': 'a'}), UpdateDatasetEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 182961), object_id=, updates={'description': 'a'}), CreateUserCodeEvent(id=, creator=, creation_date=datetime.datetime(2024, 1, 23, 13, 0, 35, 246659), object_id=, code=UserCode(id=, code=\"print('a')\", approved=False))]\n" + ] + } + ], + "source": [ + "sync(node_low, node_high)" + ] + }, + { + "cell_type": "markdown", + "id": 
"0a80b1c4", + "metadata": {}, + "source": [ + "# Sync 4: Approve UserCode" + ] + }, + { + "cell_type": "code", + "execution_count": 43, + "id": "51899b83", + "metadata": {}, + "outputs": [ + { + "ename": "KeyError", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mKeyError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[43], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mnode_high\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mapprove_usercode\u001b[49m\u001b[43m(\u001b[49m\u001b[43muser_code_id\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mTrue\u001b[39;49;00m\u001b[43m)\u001b[49m\n", + "Cell \u001b[0;32mIn[17], line 39\u001b[0m, in \u001b[0;36mNode.approve_usercode\u001b[0;34m(self, code_id, to)\u001b[0m\n\u001b[1;32m 33\u001b[0m event \u001b[38;5;241m=\u001b[39m ApproveUserCodeEvent(\n\u001b[1;32m 34\u001b[0m code_id\u001b[38;5;241m=\u001b[39mcode_id,\n\u001b[1;32m 35\u001b[0m creator\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mid,\n\u001b[1;32m 36\u001b[0m value\u001b[38;5;241m=\u001b[39mto\n\u001b[1;32m 37\u001b[0m )\n\u001b[1;32m 38\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mevent_log\u001b[38;5;241m.\u001b[39mlog\u001b[38;5;241m.\u001b[39mappend(event)\n\u001b[0;32m---> 39\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_approve_usercode\u001b[49m\u001b[43m(\u001b[49m\u001b[43mcode_id\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mto\u001b[49m\u001b[43m)\u001b[49m\n", + "Cell \u001b[0;32mIn[17], line 43\u001b[0m, in \u001b[0;36mNode._approve_usercode\u001b[0;34m(self, code_id, to)\u001b[0m\n\u001b[1;32m 41\u001b[0m \u001b[38;5;129m@register_event_handler\u001b[39m(ApproveUserCodeEvent)\n\u001b[1;32m 42\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21m_approve_usercode\u001b[39m(\u001b[38;5;28mself\u001b[39m, code_id, to):\n\u001b[0;32m---> 43\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mstore\u001b[49m\u001b[43m[\u001b[49m\u001b[43mcode_id\u001b[49m\u001b[43m]\u001b[49m\u001b[38;5;241m.\u001b[39mapproved\u001b[38;5;241m=\u001b[39mto\n", + "\u001b[0;31mKeyError\u001b[0m: " + ] + } + ], + "source": [ + "node_high.approve_usercode(user_code_id, True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e12b19c6", + "metadata": {}, + "outputs": [], + "source": [ + "# TODO: is this result valid?\n", + "sync(node_low, node_high)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4ef53526", + "metadata": {}, + "outputs": [], + "source": [ + "node_low.event_log.log" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d625bb85", + "metadata": {}, + "outputs": [], + "source": [ + "node_high.event_log.log" + ] + }, + { + "cell_type": "markdown", + "id": "8fbdf5bf", + "metadata": {}, + "source": [ + "# Scenario list" + ] + }, + { + "cell_type": "markdown", + "id": "6dc3e422", + "metadata": {}, + "source": [ + "\n", + "- create a dataset and sync\n", + " - should create the dataset object on both sides\n", + "- both update the same property (conflict)\n", + "- both update a different property (no conflict)\n", + "- code approval should have same state\n", + "- code execution should be approved" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f95403d9", + "metadata": 
{}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.16" + }, + "toc": { + "base_numbering": 1, + "nav_menu": {}, + "number_sections": true, + "sideBar": true, + "skip_h1_title": false, + "title_cell": "Table of Contents", + "title_sidebar": "Contents", + "toc_cell": false, + "toc_position": { + "height": "calc(100% - 180px)", + "left": "10px", + "top": "150px", + "width": "358.398px" + }, + "toc_section_display": true, + "toc_window_display": true + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/helm/direct_azure.ipynb b/notebooks/helm/direct_azure.ipynb index ddb00875b61..44714925fb9 100644 --- a/notebooks/helm/direct_azure.ipynb +++ b/notebooks/helm/direct_azure.ipynb @@ -2,9 +2,17 @@ "cells": [ { "cell_type": "code", - "execution_count": 40, + "execution_count": 1, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "kj/filesystem-disk-unix.c++:1703: warning: PWD environment variable doesn't match current directory; pwd = /Users/koen/workspace/pysyft/notebooks\n" + ] + } + ], "source": [ "# stdlib\n", "import os\n", @@ -15,7 +23,14 @@ }, { "cell_type": "code", - "execution_count": 41, + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 3, "metadata": {}, "outputs": [ { @@ -55,7 +70,557 @@ }, { "cell_type": "code", - "execution_count": 34, + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "# [x.wait() for x in job.subjobs]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# import random\n", + "# import string\n", + "\n", + "# def generate_random_line(length):\n", + "# return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length))\n", + "\n", + "# def generate_large_file(file_path, num_lines, line_length):\n", + "# with open(file_path, 'w') as file:\n", + "# for _ in range(num_lines):\n", + "# line = generate_random_line(line_length)\n", + "# file.write(line + '\\n')\n", + "\n", + "# file_path = \"large_file.txt\"\n", + "# num_lines = 34359712 # Adjust the number of lines as needed\n", + "# line_length = 1000 # Adjust the line length as needed\n", + "\n", + "# generate_large_file(file_path, num_lines, line_length)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "# with open(\"somefile16gb.txt\", 'rb') as input_file, open(\"somefile32gb.txt\", 'ab') as output_file:\n", + "# while True:\n", + "# chunk = input_file.read(50 -random.randint(1, 10))\n", + "# if not chunk:\n", + "# break\n", + "# output_file.write(chunk)\n", + "# # while True:\n", + "# # chunk = input_file.read(50 -random.randint(1, 10))\n", + "# # if not chunk:\n", + "# # break\n", + "# # 
output_file.write(chunk)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "# lines_per_gb = int(2**30 / 1000)\n", + "# gbs=32\n", + "# with open(f'somefilerandom{gbs}gb.txt', 'wb') as f:\n", + "# for x in range(lines_per_gb*gbs):\n", + "# f.write(str.encode(''.join(random.choices(string.ascii_uppercase + string.digits, k=1000))) + b'\\n')" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "from pathlib import Path\n", + "\n", + "# syft absolute\n", + "from syft.client.client import SyftClient\n", + "from syft.store.blob_storage import BlobDeposit\n", + "from syft.types.blob_storage import CreateBlobStorageEntry" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "def allocate_file(client: SyftClient, path: Path) -> BlobDeposit:\n", + " create_blob_storage_entry = CreateBlobStorageEntry.from_path(path)\n", + " return client.api.services.blob_storage.allocate(create_blob_storage_entry)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "file = \"somefile32gb.txt\"" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "dep = allocate_file(client, Path(file))" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/markdown": [ + "```python\n", + "class SeaweedFSBlobDeposit:\n", + " id: str = 741c4e3f5cfd4705ba3d12fc076b19dd\n", + "\n", + "```" + ], + "text/plain": [ + "syft.store.blob_storage.seaweedfs.SeaweedFSBlobDeposit" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dep" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "# from syft.client.api import APIRegistry\n", + "\n", + "# api = APIRegistry.api_for(\n", + "# node_uid=dep.syft_node_location,\n", + "# user_verify_key=dep.syft_client_verify_key,\n", + "# )\n", + "\n", + "# url = dep.urls[0]\n", + "\n", + "# if api is not None:\n", + "# blob_url = api.connection.to_blob_route(\n", + "# url.url_path, host=url.host_or_ip\n", + "# )\n", + "# else:\n", + "# blob_url = url\n", + "\n", + "# url = str(blob_url)" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "# import requests\n", + "\n", + "# from syft import SyftError\n", + "\n", + "# from io import BytesIO\n", + "\n", + "# from typing import Generator\n", + "\n", + "# def _byte_chunks(bytes: BytesIO, size: int) -> Generator[bytes, None, None]:\n", + "# while True:\n", + "# try:\n", + "# yield bytes.read(size)\n", + "# except BlockingIOError:\n", + "# return\n", + "\n", + "# DEFAULT_CHUNK_SIZE = (1024**3 // 10) # 100MB\n", + "# DEFAULT_TIMEOUT = 5000 # in seconds\n", + "\n", + "# print(len(dep.urls))" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [], + "source": [ + "# third party" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [], + "source": [ + "# byte_chunk[16384: 16384+10000]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + 
"execution_count": 18, + "metadata": {}, + "outputs": [], + "source": [ + "# # relative\n", + "# with open(file, \"rb\") as data:\n", + "# from syft.client.api import APIRegistry\n", + "\n", + "# api = APIRegistry.api_for(\n", + "# node_uid=dep.syft_node_location,\n", + "# user_verify_key=dep.syft_client_verify_key,\n", + "# )\n", + "\n", + "# etags = []\n", + "\n", + "# try:\n", + "# no_lines = 0\n", + "# for part_no, (byte_chunk, url) in tqdm(enumerate(\n", + "# zip(_byte_chunks(data, DEFAULT_CHUNK_SIZE), dep.urls),\n", + "# start=1,\n", + "# )):\n", + "# no_lines += byte_chunk.count(b\"\\n\")\n", + "# if api is not None:\n", + "# blob_url = api.connection.to_blob_route(\n", + "# url.url_path, host=url.host_or_ip\n", + "# )\n", + "# else:\n", + "# blob_url = url\n", + "\n", + "# def data_generator(bytes_, chunk_size=819200):\n", + "# n=0\n", + "# while n*chunk_size <= len(bytes_):\n", + "# chunk = bytes_[n*chunk_size:(n+1)*chunk_size]\n", + "# n += 1\n", + "# yield chunk\n", + "\n", + "# response = requests.put(\n", + "# url=str(blob_url), data=data_generator(byte_chunk), timeout=DEFAULT_TIMEOUT, stream=True\n", + "# )\n", + "# response.raise_for_status()\n", + "# etag = response.headers[\"ETag\"]\n", + "# etags.append({\"ETag\": etag, \"PartNumber\": part_no})\n", + "# except requests.RequestException as e:\n", + "# print(e)" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [], + "source": [ + "# len([x for x in data_generator(byte_chunk)])" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [], + "source": [ + "# (1024 ** 3) // 10" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "# api.services.blob_storage.mark_write_complete(etags=etags, uid=dep.blob_storage_entry_id, no_lines=no_lines)" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [], + "source": [ + "# mark_write_complete_method = from_api_or_context(\n", + "# func_or_path=\"blob_storage.mark_write_complete\",\n", + "# syft_node_location=self.syft_node_location,\n", + "# syft_client_verify_key=self.syft_client_verify_key,\n", + "# )\n", + "# return mark_write_complete_method(\n", + "# etags=etags, uid=self.blob_storage_entry_id, no_lines=no_lines\n", + "# )" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [], + "source": [ + "# with open(\"somefile.txt\", \"rb\") as f:\n", + "# dep.write(f)" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "from syft.types.blob_storage import BlobFile" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Uploading\n", + "7\n", + "4913438816\n", + "34394071712\n", + "1\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Uploading file part 1/7: 100%|██████████████████████████████████████████████████████████████████████████| 5998/5998 [01:47<00:00, 55.76it/s]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Uploading file part 2/7: 100%|██████████████████████████████████████████████████████████████████████████| 5998/5998 [01:39<00:00, 60.00it/s]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "3\n" + ] + }, + { + 
"name": "stderr", + "output_type": "stream", + "text": [ + "Uploading file part 3/7: 100%|██████████████████████████████████████████████████████████████████████████| 5998/5998 [01:38<00:00, 60.69it/s]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "4\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Uploading file part 4/7: 100%|██████████████████████████████████████████████████████████████████████████| 5998/5998 [01:48<00:00, 55.42it/s]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "5\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Uploading file part 5/7: 100%|██████████████████████████████████████████████████████████████████████████| 5998/5998 [01:45<00:00, 56.86it/s]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "6\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Uploading file part 6/7: 100%|██████████████████████████████████████████████████████████████████████████| 5998/5998 [01:52<00:00, 53.14it/s]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "7\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Uploading file part 7/7: 100%|██████████████████████████████████████████████████████████████████████████| 5998/5998 [01:42<00:00, 58.46it/s]\n" + ] + } + ], + "source": [ + "x = BlobFile.upload_from_path(\"somefile32gb.txt\", client)" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "b'YEXFIA79OPO0ODMF7Z6SCGCYYHF8EIZ6NWP13WYVFPO7IC2W4CK6EDDCA7V1ITBU2F6IDLLAKX6RP39CDJF7THP30416ACJEGIXMFCGL25IRP6OCH6UEKGNYTL3RVSBI75T0DC2YIP98P0QBAYNEGBDFHENES15PBKOENVNBNTKW4MSGD3V74MDX0QIT5GM386O9QBPBD5NOFVR1GCSWT45A35O2ODJHFQ6HSLE0W5PNH2KWNGYP0OTOGG9DQ9JRHCDIORLLV1TJRDPXRITYKU7URHY0VVQ7G03VZ430A3CBEME6HBBAA0T6EYIM2M0ARIQYMAEAO3YLKOOSG868Z5BQXNY1FWMPQQY6NC942LKPZP9C8VK6WR6PLY926F5VV2G5E304T0RRB6D8I67G8D35SP60PJ3JUOIPW7NKMP99XMG2Z0UTT4SKWM1JW8H1DFOKOJD2OHE157V5HMFGU21VQRHN34M57M6EN7WGU27UKHENPC6DEV36ARCGGUX0A85JJTK0PADOOTLW1M5GDIX02O681LR2PY6ONJI71679VTRKVNCPLCSFQZV4XMR6NYFPDA3TAZCMUUH3HKJY8E4XY6D1FSKEJDC9P4UW05NJTAGAFQNG770F6IRYGKV29TLM1VJUNUYKY6PLYR4CSJ68HC8K5F1ZTDX9UEHRHZ2DAJ8Q8BJ5E2FNQIGZ8WJE4U2PG1BWDIXOXW8QWPQTAVQN8VVPWPUI9EQ8LR0YB0HQ2Q9WAYABIHHH3IEXF169MSGD2M37WFI4LWY2D136V7PY8ZPY9344JTW8L3DMS96IVCAPB3AXYCJK0BTHE01X2F0WNBGTQGLR0IPEOF637L8W9HLODUD6A0OLWSNFSJ6VCU6Y04FRUZQEYNT6IOS79DP6WXUTM6ZYMCEIUVAQCVIG1P94VABXJKW07YBW5HT5FT1B7E0SZ5EK3EX6HZUSD0PDUS49X5H8SDNSYANP98MHKNN7AGZ0546U3EAU'\n", + 
"b'A0JQM0OZW95T1VAH58JLN3BWP24C39Y4W17O5QVG9SCHZW5C0DE0D1J1XPSKUXC1MKO1832M07BQCL26D3SM25F1GLSEWRZ3MCFILKUIH1WMNWQFFC556H8YCERGVL48WL9W3B1LX7Z071090LBP5MYK27CQB554SLBMS3BDAOWFDYJOEOJH0OPRXKBCIHMIDBRJU70P1L1BH4QST81AIE1A52XOT1QUCRLVVDBF2IVDMZNKCHL0UXMSMQV0KU1LUAQ7KJ5DY74SEGACFBRYMA3JR6PV49YTB4JQOKQQYCJE6F0WWC5N20TKFJ2MO4YLO6443ERPH7TBXEGVUJSN3QNHX9387XMGXUHQ28M0T017HSXDN75TVFX56QESCIELD9SYCXW0HYGKDBBSQHSNVZOZDQQ4HLNEZVZK6GV87F5951KEVGIF32907PIGRWYHJ7LZZ0TM9GWIOYZIAV5QL1Q3V78D1UAE85ZJ6I1BKKHY3IPLMOBUFMB9IX3K4PGREF7E2PU1BGK5U4YLZIWZRXPWUG9PVVY7RCKOXRKPXG3AUX7OUC23E5WI4JUTKKHKW9NTVIWECH44UE2OOLVY1NZO54RWSHSUWOZM9RZ4RY36FMSOK40AZTQ1HTKNDY4MZ35IJMHVWPK4Z2I203U16PMKWVYDQUN98N6CI3WZPSB6Q154D759MUD4XIE84DF4HGTN48VJTYEOWXL2W2IC0FYFGF8G69MBWNBZTO77V6AMBF9ID52MCMEVXQUSXEL3YR6BFS5HXXR5CO7K9ZNEGG1ESMB3ZLK8BYRU8QPWP5NVOAWMI0E76522V6Q1DDDVBXXZEG7Z5Q98DBGQSWYJEUN6Y1J3ZVFKRVF92WSPEG40M5SP6MN3805BDWOYKF27DS8STAU35PUFFF7KQBIM8FIP7FVVO4DLTW8ZU38IER28RI4P87P1MON29LC59HU50FWB3GZHPXBHV17ZCXK588UR4OSC2I0PIC9G5RIV'\n", + "b'L749OOKP38NLWJVG18JU9ZKBG6IRII0TI8PA1MG04RNEIA1RU103293DDVINYAKXZJBI048OPYF2J6KUP98MUJBKWA81A363GNDIUZQADJ8LHFYH2191ADEOEML6ODK050JTZP1V9MUVA4S8A92EK84Z0AZLYCMNJFRBH7J6MMZ9SBI62SBH6A10HIVZ419Q7EVDWL6OLHOMC5LDCFJBRYZEZXED3A13HS6PHAP9594T71UIS3G1ABE92SXRJQDVARTRE2HNRWABRK4OQKQC8918RBCEFGE7CBWR3DT85BYSB9727UZ0JY4L069VD9GGYI226IJZX3I5CTDXK86G1K16HPED7R7U45T6WW5RLEVSGT6Z3RT4G0V1QC8HRLUHSI3M71S98EWYZ6L3Z86OFPZZD0G5EP6T4F8GIMU9Z8J2AEK7JFPBQR5PU2QIV1TIUYG3T8WQC2AKDUV31A3R928OKO2C700B32RXVDWA3YXI8ZQBJ5RFUKP9QYAKHGOKA4HX3MDPJRGFD5EKJQNU0V861MNBXRTQT5SVHDRV1AX20VSEOQ26ATXFIJXTLOP2GL9Z149Y0C1S9T53SEIQMZT1D57SKMTO300PCW9CM8QS6ENQDOROUEQKIP4C644AU68OHJFDFX6LIIW89X2MHYDO34GE5Z4TXEY78NNRY5GQFQ7F9HOLTBJ3SCERK1KHHFVCUBPPWWUC0SJEBSVFNN4LPEVQ7G6PDLBP529ZCBJFGTFY88RCT0XM4NYY4C7SKRTPV50Z1TTKOIJCC5X0L9OKRK9AU67951RWZ9KF5FA4H9RYU5XZTT0GSNJ928NOH14RD0IU30X6BDCR3ZX9DFHV5O2PFNT3VYX68Y4XVCAAD4R72S2LTLQ93RRRGJFDCJLPS9DDRX3H6BCGUXOTX26EFC5P4Q6XBMLSTA4SOSL4I2YMWNOLKLZYWW3JM12S3U4TNYC5FTAM382EZY2NZ3NZ93JH8JWVCC5SWMQ3'\n", + "b'TRN2J91743RKG9JOMWZ0O0RAY3WQMEH3WLKPQWIFWX6JYW5PT9L0KZH5CZXLFEC5LOYKYN3POHODX6OKG2187ITOAYTNJ1XC4WPIZH1X9T6M0IYO2X4WZJUUAQPGU7G6IM1BT9OAWWGNKBUH5VQM7MP7A35LGZD6QA0R7FD48US4TRUKWTYOMSHGC8PB88DZOK8RHJZRB5CTFSOFFGJG0F0FAZNCY6KMHCNBS07UVNEHLC5USGX5HXARTVFY6VHUSY16U3U5KAH0DR2TKA95XC2G6YTWY3TZUW3XHHQHSUTAQWIJ71B6VKGRY8XB1CZBP9UT8FVN2MGGSFLBB011G9F1RLDDU2O3IEIDOL87MSN8XU5Q7UMDKLNB1TSGMYD9LB09RYMZJV7GZLUCGDI38VFZEE3A579TA063W4Z6EFA817ZSYMF25FU62C3LY0Z40AP664DZZFTCWO1J68BFP7WDTWGVTFLE11QIYCO0ZQBHDDK3VB64AMMZEIK0PYNQ6BGRUJP7U332W6OQED1W0U9374E7L3ORC48PE94XZ66F38BPU0QKLSKDYNL7C1PLPHISSBWHRHPQ6MROI1QK6MIB65QBOHOSSHJM9FX8O2AEARTP13CP79ZBA0F7XCT63ZKWZIY3S289D7Z5AEV3FAT9U6E64CAKQCVJQALLQ7IQSS0RWJE9N4N4QIS3YYBGAFJWAK728KBDSYKLQSSNZ6DLHRR6VJ31JZD5TWZBA27M2FGGKCOE1Q7N162BH1SIMAF0M9N0I5EY3TCOMTHR49KTF7JW093CBRIJ00X49G5UAHIIUQOYDIF960K6A25GP07X4V97YKOYVHS7DQQBA7CPKF3HQYFWY3GCOHBSFHNE78XNG3A7ELRYF0JTNZA1BPO8DMRMF2IUKCR1P9XZWC2ESP8NG8EQIP5XE0I28WQ4KA4H0LLFVUZO01KGWDVVU8CXO6PJ2MCF2HNNEUI1GDMJXOPVNS5SGUY9K4ZR'\n" + ] + } + ], + "source": [ + "for i, y in enumerate(x.iter_lines()):\n", + " if i > 3:\n", + " break\n", + " print(y)" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": {}, + "outputs": [ + { + "data": { + "text/markdown": [ + "```python\n", + "class ActionFileData:\n", + " id: str = acfe2005d7004143960a75164b366c2d\n", + "\n", + "```" + ], + "text/plain": [ + "syft.service.action.action_data_empty.ActionFileData" + ] + }, + "execution_count": 28, + "metadata": {}, + "output_type": 
"execute_result" + } + ], + "source": [ + "x" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Uploading\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Uploading file part 1/1: 100%|██████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 4144.57it/s]\n" + ] + } + ], + "source": [ + "# x = BlobFile.upload_from_path(\"somefile.txt\", client)" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "metadata": {}, + "outputs": [], + "source": [ + "# %debug" + ] + }, + { + "cell_type": "code", + "execution_count": 13, "metadata": {}, "outputs": [], "source": [ @@ -80,6 +645,101 @@ "# node.python_node.init_blob_storage(blob_config)" ] }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "from syft.types.blob_storage import BlobFile" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "x = BlobFile.upload_from_path(\"somefile.txt\", client)" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "ename": "ConnectionError", + "evalue": "Failed to fetch metadata. Response returned with code 500", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mConnectionError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[15], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m x \u001b[38;5;241m=\u001b[39m \u001b[43mBlobFile\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mupload_from_path\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43msomefile4gb.txt\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mclient\u001b[49m\u001b[43m)\u001b[49m\n", + "File \u001b[0;32m~/workspace/PySyft/packages/syft/src/syft/types/blob_storage.py:82\u001b[0m, in \u001b[0;36mBlobFile.upload_from_path\u001b[0;34m(self, path, client)\u001b[0m\n\u001b[1;32m 77\u001b[0m \u001b[38;5;129m@classmethod\u001b[39m\n\u001b[1;32m 78\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mupload_from_path\u001b[39m(\u001b[38;5;28mself\u001b[39m, path, client):\n\u001b[1;32m 79\u001b[0m \u001b[38;5;66;03m# syft absolute\u001b[39;00m\n\u001b[1;32m 80\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01msyft\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m \u001b[38;5;21;01msy\u001b[39;00m\n\u001b[0;32m---> 82\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43msy\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mActionObject\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mfrom_path\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpath\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mpath\u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msend\u001b[49m\u001b[43m(\u001b[49m\u001b[43mclient\u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241m.\u001b[39msyft_action_data\n", + "File \u001b[0;32m~/workspace/PySyft/packages/syft/src/syft/service/action/action_object.py:1079\u001b[0m, in \u001b[0;36mActionObject.send\u001b[0;34m(self, client)\u001b[0m\n\u001b[1;32m 1077\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_set_obj_location_(client\u001b[38;5;241m.\u001b[39mid, client\u001b[38;5;241m.\u001b[39mverify_key)\n\u001b[1;32m 
1078\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_save_to_blob_storage()\n\u001b[0;32m-> 1079\u001b[0m res \u001b[38;5;241m=\u001b[39m \u001b[43mclient\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mapi\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mservices\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43maction\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mset\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1080\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(res, ActionObject):\n\u001b[1;32m 1081\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39msyft_created_at \u001b[38;5;241m=\u001b[39m res\u001b[38;5;241m.\u001b[39msyft_created_at\n", + "File \u001b[0;32m~/workspace/PySyft/packages/syft/src/syft/client/api.py:260\u001b[0m, in \u001b[0;36mgenerate_remote_function..wrapper\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m 258\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m allowed:\n\u001b[1;32m 259\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m\n\u001b[0;32m--> 260\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[43mmake_call\u001b[49m\u001b[43m(\u001b[49m\u001b[43mapi_call\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mapi_call\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 262\u001b[0m result, _ \u001b[38;5;241m=\u001b[39m migrate_args_and_kwargs(\n\u001b[1;32m 263\u001b[0m [result], kwargs\u001b[38;5;241m=\u001b[39m{}, to_latest_protocol\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mTrue\u001b[39;00m\n\u001b[1;32m 264\u001b[0m )\n\u001b[1;32m 265\u001b[0m result \u001b[38;5;241m=\u001b[39m result[\u001b[38;5;241m0\u001b[39m]\n", + "File \u001b[0;32m~/workspace/PySyft/packages/syft/src/syft/client/api.py:613\u001b[0m, in \u001b[0;36mSyftAPI.make_call\u001b[0;34m(self, api_call)\u001b[0m\n\u001b[1;32m 611\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mmake_call\u001b[39m(\u001b[38;5;28mself\u001b[39m, api_call: SyftAPICall) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m Result:\n\u001b[1;32m 612\u001b[0m signed_call \u001b[38;5;241m=\u001b[39m api_call\u001b[38;5;241m.\u001b[39msign(credentials\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39msigning_key)\n\u001b[0;32m--> 613\u001b[0m signed_result \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mconnection\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mmake_call\u001b[49m\u001b[43m(\u001b[49m\u001b[43msigned_call\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 615\u001b[0m result \u001b[38;5;241m=\u001b[39m debox_signed_syftapicall_response(signed_result\u001b[38;5;241m=\u001b[39msigned_result)\n\u001b[1;32m 617\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(result, OkErr):\n", + "File \u001b[0;32m~/workspace/PySyft/packages/syft/src/syft/client/client.py:290\u001b[0m, in \u001b[0;36mHTTPConnection.make_call\u001b[0;34m(self, signed_call)\u001b[0m\n\u001b[1;32m 284\u001b[0m response \u001b[38;5;241m=\u001b[39m requests\u001b[38;5;241m.\u001b[39mpost( \u001b[38;5;66;03m# nosec\u001b[39;00m\n\u001b[1;32m 285\u001b[0m url\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mstr\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mapi_url),\n\u001b[1;32m 286\u001b[0m data\u001b[38;5;241m=\u001b[39mmsg_bytes,\n\u001b[1;32m 287\u001b[0m )\n\u001b[1;32m 289\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m 
response\u001b[38;5;241m.\u001b[39mstatus_code \u001b[38;5;241m!=\u001b[39m \u001b[38;5;241m200\u001b[39m:\n\u001b[0;32m--> 290\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m requests\u001b[38;5;241m.\u001b[39mConnectionError(\n\u001b[1;32m 291\u001b[0m \u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mFailed to fetch metadata. Response returned with code \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mresponse\u001b[38;5;241m.\u001b[39mstatus_code\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 292\u001b[0m )\n\u001b[1;32m 294\u001b[0m result \u001b[38;5;241m=\u001b[39m _deserialize(response\u001b[38;5;241m.\u001b[39mcontent, from_bytes\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mTrue\u001b[39;00m)\n\u001b[1;32m 295\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", + "\u001b[0;31mConnectionError\u001b[0m: Failed to fetch metadata. Response returned with code 500" + ] + } + ], + "source": [ + "x = BlobFile.upload_from_path(\"somefile4gb.txt\", client)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "b'abcdef'" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "x.read()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"somefile.txt\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "code", "execution_count": 35, @@ -858,7 +1518,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.2" + "version": "3.9.16" }, "toc": { "base_numbering": 1, diff --git a/packages/grid/default.env b/packages/grid/default.env index 5e69aca2580..f162f5b4ff5 100644 --- a/packages/grid/default.env +++ b/packages/grid/default.env @@ -26,7 +26,7 @@ DOCKER_IMAGE_TRAEFIK=traefik TRAEFIK_VERSION=v2.10 REDIS_VERSION=6.2 RABBITMQ_VERSION=3 -SEAWEEDFS_VERSION=3.59 +SEAWEEDFS_VERSION=3.62 DOCKER_IMAGE_SEAWEEDFS=openmined/grid-seaweedfs VERSION=latest VERSION_HASH=unknown @@ -71,7 +71,7 @@ S3_ROOT_PWD="admin" # needs randomizing S3_REGION="us-east-1" #not-using S3_PRESIGNED_TIMEOUT_SECS=1800 -S3_VOLUME_SIZE_MB=1024 +S3_VOLUME_SIZE_MB=40000 # Jax JAX_ENABLE_X64=True diff --git a/packages/grid/seaweedfs/seaweedfs.dockerfile b/packages/grid/seaweedfs/seaweedfs.dockerfile index 3982e621c3b..2758dbac457 100644 --- a/packages/grid/seaweedfs/seaweedfs.dockerfile +++ b/packages/grid/seaweedfs/seaweedfs.dockerfile @@ -1,6 +1,7 @@ ARG SEAWEEDFS_VERSION -FROM chrislusf/seaweedfs:${SEAWEEDFS_VERSION} +# FROM chrislusf/seaweedfs:${SEAWEEDFS_VERSION}_large_disk +FROM chrislusf/seaweedfs:3.62_large_disk WORKDIR / @@ -8,7 +9,8 @@ RUN apk update && \ apk add --no-cache python3 py3-pip ca-certificates bash COPY requirements.txt app.py / -RUN pip install --no-cache-dir -r requirements.txt + +RUN pip install --no-cache-dir --break-system-packages -r requirements.txt COPY --chmod=755 start.sh mount_command.sh / diff --git a/packages/grid/seaweedfs/start.sh b/packages/grid/seaweedfs/start.sh index 7972664b44e..01038fc91ba 100644 --- a/packages/grid/seaweedfs/start.sh +++ b/packages/grid/seaweedfs/start.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -weed 
server -s3 -s3.port="$S3_PORT" -volume.max=500 -master.volumeSizeLimitMB="$S3_VOLUME_SIZE_MB" & +weed server -s3 -s3.port="$S3_PORT" -volume.max=10 -master.volumeSizeLimitMB="$S3_VOLUME_SIZE_MB" & echo "s3.configure -access_key $S3_ROOT_USER -secret_key $S3_ROOT_PWD \ -user iam -actions Read,Write,List,Tagging,Admin -apply" | weed shell > /dev/null 2>&1 diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 0972849f47c..34f5a0e457f 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -29,6 +29,8 @@ from result import Result from typing_extensions import Self +from ..service.event.event_service import EventService + # relative from .. import __version__ from ..abstract_node import AbstractNode @@ -319,6 +321,7 @@ def __init__( SyftWorkerImageService, SyftWorkerPoolService, SyftImageRegistryService, + EventService, ] if services is None else services @@ -877,6 +880,7 @@ def _construct_services(self): SyftWorkerImageService, SyftWorkerPoolService, SyftImageRegistryService, + EventService, ] if OBLV: diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 67f204a0e92..51a2dd048b0 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -1151,6 +1151,48 @@ "hash": "e410de583bb15bc5af57acef7be55ea5fc56b5b0fc169daa3869f4203c4d7473", "action": "add" } + }, + "SeaweedFSBlobDeposit": { + "2": { + "version": 2, + "hash": "07d84a95324d95d9c868cd7d1c33c908f77aa468671d76c144586aab672bcbb5", + "action": "add" + } + }, + "Event": { + "1": { + "version": 1, + "hash": "1f3a5a19594887c11d01385352ba0244e3a57f02019e0df4a0f9da9393a840b1", + "action": "add" + } + }, + "CRUDEvent": { + "1": { + "version": 1, + "hash": "5a58f86d52caaf2ae29c00a5809e5a17d91f480ea796d9107aa9a3a07881c4a1", + "action": "add" + } + }, + "CreateObjectEvent": { + "1": { + "version": 1, + "hash": "58e80bd2f193c55730438468f02459cfc8ce678cbeac347548e243340a8749b0", + "action": "add" + } + }, + "UpdateObjectEvent": { + "1": { + "version": 1, + "hash": "e7af4c8bcb974197235cdabea37d26a35f1066077010d1afaea33ccb4d92b8ce", + "action": "add" + } + }, + "CreateDatasetEvent": { + "1": { + "version": 1, + "hash": "f1bc0d382312d5e91f86098bf561a7e0f716d82560678e69242f8dddb6604746", + "action": "add" + } } } } diff --git a/packages/syft/src/syft/service/action/action_data_empty.py b/packages/syft/src/syft/service/action/action_data_empty.py index 1434949301d..00a33c29670 100644 --- a/packages/syft/src/syft/service/action/action_data_empty.py +++ b/packages/syft/src/syft/service/action/action_data_empty.py @@ -61,8 +61,4 @@ def __validate_file_path(cls, v: Union[str, Path]) -> Path: if isinstance(v, str): v = Path(v) - if v.exists() and v.is_file(): - return v - - # this breaks server side during deserialization - # raise ValueError(f"Not a valid path to file. 
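The storage changes above trade many small volumes for a few large ones: default.env raises the per-volume limit from 1 GB (1024 MB) to roughly 40 GB (40000 MB) while start.sh drops volume.max from 500 to 10, and the base image switches to a SeaweedFS `_large_disk` build, which appears to be needed once a single volume is allowed to exceed the ~30 GB cap of the standard build, presumably so the new 5 GB upload parts introduced in seaweedfs.py below fit inside one volume. A quick back-of-the-envelope check of the per-server capacity implied by these numbers; the helper below is illustrative only, not part of the repo:

def capacity_mb(max_volumes: int, volume_size_limit_mb: int) -> int:
    # SeaweedFS caps each volume at volumeSizeLimitMB and creates at most
    # `max_volumes` volumes, so total capacity is simply the product.
    return max_volumes * volume_size_limit_mb

print(capacity_mb(500, 1024))    # old settings: 512_000 MB in many 1 GB volumes
print(capacity_mb(10, 40_000))   # new settings: 400_000 MB in a few 40 GB volumes

The overall budget stays in the same ballpark; what changes is that each individual volume is now large enough to hold a multi-gigabyte blob part.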
{v}") + return v diff --git a/packages/syft/src/syft/service/dataset/dataset_service.py b/packages/syft/src/syft/service/dataset/dataset_service.py index 2558e1e560b..c1ea7182a8e 100644 --- a/packages/syft/src/syft/service/dataset/dataset_service.py +++ b/packages/syft/src/syft/service/dataset/dataset_service.py @@ -4,6 +4,8 @@ from typing import Optional from typing import Union +from ..event.event import CreateDatasetEvent + # relative from ...serde.serializable import serializable from ...store.document_store import DocumentStore @@ -98,6 +100,15 @@ def add( ) if result.is_err(): return SyftError(message=str(result.err())) + + event = CreateDatasetEvent( + object_id=dataset.id, + creator_user=UID(), + ) + res = context.node.get_service("EventService").add(context, event) + if isinstance(res, SyftError): + return res + return SyftSuccess( message=f"Dataset uploaded to '{context.node.name}'. " f"To see the datasets uploaded by a client on this node, use command `[your_client].datasets`" diff --git a/packages/syft/src/syft/service/event/event.py b/packages/syft/src/syft/service/event/event.py new file mode 100644 index 00000000000..6453044ce54 --- /dev/null +++ b/packages/syft/src/syft/service/event/event.py @@ -0,0 +1,86 @@ +from typing import Any, ClassVar, Dict, List, Type +from syft.serde.serializable import serializable + +from syft.service.dataset.dataset import Asset, Dataset +from syft.store.linked_obj import LinkedObject +from ...types.syft_object import SYFT_OBJECT_VERSION_1, SyftObject +from ...types.uid import UID +from datetime import datetime +from pydantic import Field + +event_handler_registry = {} + +def register_event_handler(event_type): + def inner(method): + event_handler_registry[event_type.__name__] = method.__name__ + return method + + return inner + +@serializable() +class Event(SyftObject): + __canonical_name__ = "Event" + __version__ = SYFT_OBJECT_VERSION_1 + creator_user: UID + creation_date: datetime = Field(default_factory=lambda: datetime.now()) + + def handler(self, node): + method_name = event_handler_registry[self.__class__.__name__] + return getattr(node, method_name) + + +@serializable() +class CRUDEvent(Event): + __canonical_name__ = "CRUDEvent" + __version__ = SYFT_OBJECT_VERSION_1 + object_type: ClassVar[Type] = Type + object_id: UID + + @property + def merge_updates_repr(self): + return f"{self.updates} for object {self.object_id} by {self.creator}" + + +@serializable() +class CreateObjectEvent(CRUDEvent): + __canonical_name__ = "CreateObjectEvent" + __version__ = SYFT_OBJECT_VERSION_1 + @property + def updated_properties(self): + return list(self.object_type.__annotations__.keys()) + + @property + def updates(self): + return {p: getattr(self, p) for p in self.updated_properties} + + @property + def update_tuples(self): + return list(self.updates.items()) + + +@serializable() +class UpdateObjectEvent(CRUDEvent): + __canonical_name__ = "UpdateObjectEvent" + __version__ = SYFT_OBJECT_VERSION_1 + updates: Dict[str, Any] + + @property + def updated_properties(self): + return list(self.updates.keys()) + + @property + def update_tuples(self): + return list(self.updates.items()) + + +@serializable() +class CreateDatasetEvent(CreateObjectEvent): + __canonical_name__ = "CreateDatasetEvent" + __version__ = SYFT_OBJECT_VERSION_1 + object_type: ClassVar[Type] = Dataset + + def execute(self, node): + handler = self.handler(node) + handler( + object_id=self.real.obj_id, + ) \ No newline at end of file diff --git 
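The new event.py module wires replay through a name-based registry: register_event_handler records which node method handles each event class, and Event.handler(node) resolves that method on a live node when the log is replayed. A minimal, self-contained sketch of the pattern; FakeNode, its handler, and the simplified event class are illustrative stand-ins, not the real Syft types:

registry = {}

def register_event_handler(event_type):
    # store the *name* of the handling method, keyed by the event class name
    def inner(method):
        registry[event_type.__name__] = method.__name__
        return method
    return inner

class CreateDatasetEvent:
    def __init__(self, object_id):
        self.object_id = object_id

class FakeNode:
    @register_event_handler(CreateDatasetEvent)
    def create_dataset_handler(self, object_id):
        print(f"replaying dataset creation for {object_id}")

node = FakeNode()
event = CreateDatasetEvent(object_id="some-uid")
# what Event.handler(node) does internally: look up by class name, bind to the node
handler = getattr(node, registry[type(event).__name__])
handler(object_id=event.object_id)

Keeping only method names in the registry, rather than bound methods, keeps events serializable and lets the same event replay against any node that registered a handler under that class name.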
a/packages/syft/src/syft/service/event/event_service.py b/packages/syft/src/syft/service/event/event_service.py new file mode 100644 index 00000000000..6819d389253 --- /dev/null +++ b/packages/syft/src/syft/service/event/event_service.py @@ -0,0 +1,65 @@ +from syft.serde.serializable import serializable +from syft.service.context import AuthedServiceContext +from syft.service.event.event_stash import EventStash +from syft.service.response import SyftError, SyftSuccess +from syft.service.service import AbstractService, service_method +from syft.service.user.user_roles import DATA_OWNER_ROLE_LEVEL +from syft.store.document_store import DocumentStore +from syft.types.uid import UID +from syft.util.trace_decorator import instrument +from .event import Event + +@instrument +@serializable() +class EventService(AbstractService): + store: DocumentStore + stash: EventStash + + def __init__(self, store: DocumentStore) -> None: + self.store = store + self.stash = EventStash(store=store) + + @service_method( + path="event.add", + name="add", + roles=DATA_OWNER_ROLE_LEVEL, + ) + def add( + self, context: AuthedServiceContext, event: Event, + ): + result = self.stash.set(context.credentials, event) + if result.is_err(): + return SyftError(message=str(result.err())) + + return SyftSuccess(message=f'Great Success!') + + + @service_method( + path="event.get_by_uid", + name="get_by_uid", + roles=DATA_OWNER_ROLE_LEVEL, + ) + def get_by_uid( + self, context: AuthedServiceContext, uid: UID, + ): + result = self.stash.get_by_uid(context.credentials, uid=uid) + if result.is_err(): + return SyftError(message=str(result.err())) + return result.ok() + + + @service_method( + path="event.get_all", + name="get_all", + roles=DATA_OWNER_ROLE_LEVEL, + ) + def get_all( + self, context: AuthedServiceContext + ): + result = self.stash.get_all(context.credentials) + if result.is_err(): + return SyftError(message=str(result.err())) + + return result.ok() + + \ No newline at end of file diff --git a/packages/syft/src/syft/service/event/event_stash.py b/packages/syft/src/syft/service/event/event_stash.py new file mode 100644 index 00000000000..25478c504b1 --- /dev/null +++ b/packages/syft/src/syft/service/event/event_stash.py @@ -0,0 +1,30 @@ +# stdlib +from typing import List +from typing import Optional + +# third party +from result import Result + +# relative +from ...node.credentials import SyftVerifyKey +from ...serde.serializable import serializable +from ...store.document_store import BaseUIDStoreStash +from ...store.document_store import DocumentStore +from ...store.document_store import PartitionKey +from ...store.document_store import PartitionSettings +from ...store.document_store import QueryKeys +from ...types.uid import UID +from ...util.telemetry import instrument +from .event import Event + + +@instrument +@serializable() +class EventStash(BaseUIDStoreStash): + object_type = Event + settings: PartitionSettings = PartitionSettings( + name=Event.__canonical_name__, object_type=Event + ) + + def __init__(self, store: DocumentStore) -> None: + super().__init__(store=store) \ No newline at end of file diff --git a/packages/syft/src/syft/store/blob_storage/seaweedfs.py b/packages/syft/src/syft/store/blob_storage/seaweedfs.py index 2a27fc2518a..f6736ee291a 100644 --- a/packages/syft/src/syft/store/blob_storage/seaweedfs.py +++ b/packages/syft/src/syft/store/blob_storage/seaweedfs.py @@ -14,6 +14,7 @@ from botocore.client import ClientError as BotoClientError from botocore.client import Config import requests +from tqdm 
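event_service.py and event_stash.py expose the append-only log through the usual stash/service pair. A rough usage sketch follows: the service-side call mirrors the dataset_service.py hunk above, while the client-side path is an assumption based on how service_method paths are normally surfaced under client.api.services, and it requires a logged-in client with at least the data-owner role against a node running this branch:

# server side, from another service (as in dataset_service.add above):
#   event = CreateDatasetEvent(object_id=dataset.id, creator_user=UID())
#   context.node.get_service("EventService").add(context, event)

def list_events(client):
    # assumed client-side routing: path "event.get_all" -> client.api.services.event.get_all
    events = client.api.services.event.get_all()
    for event in events:
        print(event.id, type(event).__name__, event.creation_date)
    return events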
import tqdm from typing_extensions import Self # relative @@ -33,28 +34,42 @@ from ...types.blob_storage import SeaweedSecureFilePathLocation from ...types.blob_storage import SecureFilePathLocation from ...types.grid_url import GridURL +from ...types.syft_migration import migrate from ...types.syft_object import SYFT_OBJECT_VERSION_1 +from ...types.syft_object import SYFT_OBJECT_VERSION_2 +from ...types.transforms import drop +from ...types.transforms import make_set_default from ...util.constants import DEFAULT_TIMEOUT WRITE_EXPIRATION_TIME = 900 # seconds -DEFAULT_CHUNK_SIZE = 1024**3 # 1 GB +DEFAULT_FILE_PART_SIZE = (1024**3) * 5 # 5GB +DEFAULT_UPLOAD_CHUNK_SIZE = 819200 -def _byte_chunks(bytes: BytesIO, size: int) -> Generator[bytes, None, None]: +def _byte_chunks(bytes: BytesIO, chunk_size: int) -> Generator[bytes, None, None]: while True: try: - yield bytes.read(size) + yield bytes.read(chunk_size) except BlockingIOError: return @serializable() -class SeaweedFSBlobDeposit(BlobDeposit): +class SeaweedFSBlobDepositV1(BlobDeposit): __canonical_name__ = "SeaweedFSBlobDeposit" __version__ = SYFT_OBJECT_VERSION_1 urls: List[GridURL] + +@serializable() +class SeaweedFSBlobDeposit(BlobDeposit): + __canonical_name__ = "SeaweedFSBlobDeposit" + __version__ = SYFT_OBJECT_VERSION_2 + + urls: List[GridURL] + size: int + def write(self, data: BytesIO) -> Union[SyftSuccess, SyftError]: # relative from ...client.api import APIRegistry @@ -68,8 +83,11 @@ def write(self, data: BytesIO) -> Union[SyftSuccess, SyftError]: try: no_lines = 0 + # this loops over the parts, we have multiple parts to allow for + # concurrent uploads of a single file. (We are currently not using that) + part_size = math.ceil(self.size / len(self.urls)) for part_no, (byte_chunk, url) in enumerate( - zip(_byte_chunks(data, DEFAULT_CHUNK_SIZE), self.urls), + zip(_byte_chunks(data, part_size), self.urls), start=1, ): no_lines += byte_chunk.count(b"\n") @@ -79,13 +97,33 @@ def write(self, data: BytesIO) -> Union[SyftSuccess, SyftError]: ) else: blob_url = url + + def data_generator(bytes_, chunk_size=DEFAULT_UPLOAD_CHUNK_SIZE): + """Creates a data generator for the part""" + n = 0 + total_iterations = math.ceil(len(bytes_) / chunk_size) + + with tqdm( + total=total_iterations, + desc=f"Uploading file part {part_no}/{len(self.urls)}", # noqa + ) as pbar: + while n * chunk_size <= len(bytes_): + chunk = bytes_[n * chunk_size : (n + 1) * chunk_size] + n += 1 + pbar.update(1) + yield chunk + response = requests.put( - url=str(blob_url), data=byte_chunk, timeout=DEFAULT_TIMEOUT + url=str(blob_url), + data=data_generator(byte_chunk), + timeout=DEFAULT_TIMEOUT, + stream=True, ) response.raise_for_status() etag = response.headers["ETag"] etags.append({"ETag": etag, "PartNumber": part_no}) except requests.RequestException as e: + print(e) return SyftError(message=str(e)) mark_write_complete_method = from_api_or_context( @@ -98,6 +136,20 @@ def write(self, data: BytesIO) -> Union[SyftSuccess, SyftError]: ) +@migrate(SeaweedFSBlobDeposit, SeaweedFSBlobDepositV1) +def downgrade_seaweedblobdeposit_v2_to_v1(): + return [ + drop(["size"]), + ] + + +@migrate(SeaweedFSBlobDepositV1, SeaweedFSBlobDeposit) +def upgrade_seaweedblobdeposit_v1_to_v2(): + return [ + make_set_default("size", 1), + ] + + @serializable() class SeaweedFSClientConfig(BlobStorageClientConfig): host: str @@ -188,7 +240,7 @@ def allocate( ) def write(self, obj: BlobStorageEntry) -> BlobDeposit: - total_parts = math.ceil(obj.file_size / DEFAULT_CHUNK_SIZE) + total_parts =
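The rewritten SeaweedFSBlobDeposit.write() above cuts a deposit into one part per presigned URL (part_size = ceil(size / len(urls))) and then streams each part to requests.put as a generator of small fixed-size chunks, so a multi-gigabyte part is never materialised twice in memory and tqdm can report progress. A standalone sketch of that split; only the two constants are taken from this diff, the other numbers and names are made up for illustration:

import math

DEFAULT_FILE_PART_SIZE = (1024**3) * 5  # 5 GB per part / presigned URL, as in the diff
DEFAULT_UPLOAD_CHUNK_SIZE = 819200      # 800 KiB streamed per chunk within a part

def iter_chunks(part: bytes, chunk_size: int = DEFAULT_UPLOAD_CHUNK_SIZE):
    # simplified equivalent of data_generator(), without the tqdm progress bar
    for start in range(0, len(part), chunk_size):
        yield part[start : start + chunk_size]

file_size = 4 * 1024**3                                  # e.g. the 4 GB file in the notebook
n_parts = math.ceil(file_size / DEFAULT_FILE_PART_SIZE)  # -> 1 part, so 1 presigned URL
part_size = math.ceil(file_size / n_parts)               # bytes sent to each URL

part = b"x" * (2 * DEFAULT_UPLOAD_CHUNK_SIZE + 123)      # toy stand-in for one part
assert b"".join(iter_chunks(part)) == part               # chunking is lossless
print(n_parts, part_size, sum(1 for _ in iter_chunks(part)))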
math.ceil(obj.file_size / DEFAULT_FILE_PART_SIZE) urls = [ GridURL.from_url( @@ -206,7 +258,9 @@ def write(self, obj: BlobStorageEntry) -> BlobDeposit: for i in range(total_parts) ] - return SeaweedFSBlobDeposit(blob_storage_entry_id=obj.id, urls=urls) + return SeaweedFSBlobDeposit( + blob_storage_entry_id=obj.id, urls=urls, size=obj.file_size + ) def complete_multipart_upload( self,
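Since the only schema difference between the two SeaweedFSBlobDeposit versions is the new size field, the migration pair registered above reduces to adding or dropping that single attribute. A plain-dict sketch of what the two transforms amount to; the real code goes through syft's migrate/transform machinery rather than dicts:

def downgrade_v2_to_v1(deposit: dict) -> dict:
    # drop(["size"]): V1 never had a size field
    return {key: value for key, value in deposit.items() if key != "size"}

def upgrade_v1_to_v2(deposit: dict) -> dict:
    # make_set_default("size", 1): old deposits get a placeholder size of 1
    return {**deposit, "size": deposit.get("size", 1)}

assert "size" not in downgrade_v2_to_v1({"urls": [], "size": 42})
assert upgrade_v1_to_v2({"urls": []})["size"] == 1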