Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

append only log #8401

Closed
wants to merge 11 commits into from
4,690 changes: 4,690 additions & 0 deletions notebooks/helm/append-only-event-log-prototype.ipynb

Large diffs are not rendered by default.

670 changes: 665 additions & 5 deletions notebooks/helm/direct_azure.ipynb

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions packages/grid/default.env
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ DOCKER_IMAGE_TRAEFIK=traefik
TRAEFIK_VERSION=v2.10
REDIS_VERSION=6.2
RABBITMQ_VERSION=3
SEAWEEDFS_VERSION=3.59
SEAWEEDFS_VERSION=3.62
DOCKER_IMAGE_SEAWEEDFS=openmined/grid-seaweedfs
VERSION=latest
VERSION_HASH=unknown
Expand Down Expand Up @@ -71,7 +71,7 @@ S3_ROOT_PWD="admin" # needs randomizing
S3_REGION="us-east-1"
#not-using
S3_PRESIGNED_TIMEOUT_SECS=1800
S3_VOLUME_SIZE_MB=1024
S3_VOLUME_SIZE_MB=40000

# Jax
JAX_ENABLE_X64=True
Expand Down
6 changes: 4 additions & 2 deletions packages/grid/seaweedfs/seaweedfs.dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
ARG SEAWEEDFS_VERSION

FROM chrislusf/seaweedfs:${SEAWEEDFS_VERSION}
# FROM chrislusf/seaweedfs:${SEAWEEDFS_VERSION}_large_disk
FROM chrislusf/seaweedfs:3.62_large_disk

WORKDIR /

RUN apk update && \
apk add --no-cache python3 py3-pip ca-certificates bash

COPY requirements.txt app.py /
RUN pip install --no-cache-dir -r requirements.txt

RUN pip install --no-cache-dir --break-system-packages -r requirements.txt

COPY --chmod=755 start.sh mount_command.sh /

Expand Down
2 changes: 1 addition & 1 deletion packages/grid/seaweedfs/start.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env bash

weed server -s3 -s3.port="$S3_PORT" -volume.max=500 -master.volumeSizeLimitMB="$S3_VOLUME_SIZE_MB" &
weed server -s3 -s3.port="$S3_PORT" -volume.max=10 -master.volumeSizeLimitMB="$S3_VOLUME_SIZE_MB" &
echo "s3.configure -access_key $S3_ROOT_USER -secret_key $S3_ROOT_PWD \
-user iam -actions Read,Write,List,Tagging,Admin -apply" | weed shell > /dev/null 2>&1

Expand Down
4 changes: 4 additions & 0 deletions packages/syft/src/syft/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from result import Result
from typing_extensions import Self

from ..service.event.event_service import EventService

# relative
from .. import __version__
from ..abstract_node import AbstractNode
Expand Down Expand Up @@ -319,6 +321,7 @@ def __init__(
SyftWorkerImageService,
SyftWorkerPoolService,
SyftImageRegistryService,
EventService,
]
if services is None
else services
Expand Down Expand Up @@ -877,6 +880,7 @@ def _construct_services(self):
SyftWorkerImageService,
SyftWorkerPoolService,
SyftImageRegistryService,
EventService,
]

if OBLV:
Expand Down
42 changes: 42 additions & 0 deletions packages/syft/src/syft/protocol/protocol_version.json
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,48 @@
"hash": "e410de583bb15bc5af57acef7be55ea5fc56b5b0fc169daa3869f4203c4d7473",
"action": "add"
}
},
"SeaweedFSBlobDeposit": {
"2": {
"version": 2,
"hash": "07d84a95324d95d9c868cd7d1c33c908f77aa468671d76c144586aab672bcbb5",
"action": "add"
}
},
"Event": {
"1": {
"version": 1,
"hash": "1f3a5a19594887c11d01385352ba0244e3a57f02019e0df4a0f9da9393a840b1",
"action": "add"
}
},
"CRUDEvent": {
"1": {
"version": 1,
"hash": "5a58f86d52caaf2ae29c00a5809e5a17d91f480ea796d9107aa9a3a07881c4a1",
"action": "add"
}
},
"CreateObjectEvent": {
"1": {
"version": 1,
"hash": "58e80bd2f193c55730438468f02459cfc8ce678cbeac347548e243340a8749b0",
"action": "add"
}
},
"UpdateObjectEvent": {
"1": {
"version": 1,
"hash": "e7af4c8bcb974197235cdabea37d26a35f1066077010d1afaea33ccb4d92b8ce",
"action": "add"
}
},
"CreateDatasetEvent": {
"1": {
"version": 1,
"hash": "f1bc0d382312d5e91f86098bf561a7e0f716d82560678e69242f8dddb6604746",
"action": "add"
}
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions packages/syft/src/syft/service/action/action_data_empty.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,4 @@ def __validate_file_path(cls, v: Union[str, Path]) -> Path:
if isinstance(v, str):
v = Path(v)

if v.exists() and v.is_file():
return v

# this breaks server side during deserialization
# raise ValueError(f"Not a valid path to file. {v}")
return v
11 changes: 11 additions & 0 deletions packages/syft/src/syft/service/dataset/dataset_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import Optional
from typing import Union

from ..event.event import CreateDatasetEvent

# relative
from ...serde.serializable import serializable
from ...store.document_store import DocumentStore
Expand Down Expand Up @@ -98,6 +100,15 @@ def add(
)
if result.is_err():
return SyftError(message=str(result.err()))

event = CreateDatasetEvent(
object_id=dataset.id,
creator_user=UID(),
)
res = context.node.get_service("EventService").add(context, event)
if isinstance(res, SyftError):
return res

return SyftSuccess(
message=f"Dataset uploaded to '{context.node.name}'. "
f"To see the datasets uploaded by a client on this node, use command `[your_client].datasets`"
Expand Down
86 changes: 86 additions & 0 deletions packages/syft/src/syft/service/event/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from typing import Any, ClassVar, Dict, List, Type
from syft.serde.serializable import serializable

from syft.service.dataset.dataset import Asset, Dataset
from syft.store.linked_obj import LinkedObject
from ...types.syft_object import SYFT_OBJECT_VERSION_1, SyftObject
from ...types.uid import UID
from datetime import datetime
from pydantic import Field

# Maps an event class name -> the name of the node method that handles it.
event_handler_registry = {}


def register_event_handler(event_type):
    """Decorator factory: register *method* as the handler for *event_type*.

    Only the names are stored (class name -> method name) so the actual
    callable is resolved later on a concrete node instance via getattr.
    """

    def _record(method):
        event_handler_registry[event_type.__name__] = method.__name__
        return method

    return _record

@serializable()
class Event(SyftObject):
    """Base class for entries in the append-only event log."""

    __canonical_name__ = "Event"
    __version__ = SYFT_OBJECT_VERSION_1

    # UID of the user that produced this event.
    creator_user: UID
    # Timestamp captured when the event object is constructed (local time).
    creation_date: datetime = Field(default_factory=datetime.now)

    def handler(self, node):
        """Resolve the node method registered for this event's class.

        Raises KeyError if no handler was registered via
        ``register_event_handler`` for this class name.
        """
        handler_name = event_handler_registry[type(self).__name__]
        return getattr(node, handler_name)


@serializable()
class CRUDEvent(Event):
    """Event describing a create/update style action on a stored object."""

    __canonical_name__ = "CRUDEvent"
    __version__ = SYFT_OBJECT_VERSION_1

    # Concrete subclasses override this with the type they operate on.
    object_type: ClassVar[Type] = Type
    # UID of the object this event refers to.
    object_id: UID

    @property
    def merge_updates_repr(self):
        """Human-readable summary of this event's updates.

        Relies on ``self.updates`` being supplied by a subclass
        (CreateObjectEvent / UpdateObjectEvent).
        """
        # BUG FIX: the field declared on Event is `creator_user`;
        # `self.creator` does not exist and raised AttributeError.
        return f"{self.updates} for object {self.object_id} by {self.creator_user}"


@serializable()
class CreateObjectEvent(CRUDEvent):
    """Event recording the creation of an object of ``object_type``."""

    __canonical_name__ = "CreateObjectEvent"
    __version__ = SYFT_OBJECT_VERSION_1

    @property
    def updated_properties(self):
        # On creation, every annotated field of the target type counts
        # as "updated".
        return list(self.object_type.__annotations__)

    @property
    def updates(self):
        return {name: getattr(self, name) for name in self.updated_properties}

    @property
    def update_tuples(self):
        return list(self.updates.items())


@serializable()
class UpdateObjectEvent(CRUDEvent):
    """Event recording a partial update of an existing object."""

    __canonical_name__ = "UpdateObjectEvent"
    __version__ = SYFT_OBJECT_VERSION_1

    # Mapping of property name -> new value carried by this update.
    updates: Dict[str, Any]

    @property
    def updated_properties(self):
        return list(self.updates)

    @property
    def update_tuples(self):
        return list(self.updates.items())


@serializable()
class CreateDatasetEvent(CreateObjectEvent):
    """Event recording that a Dataset was uploaded to a node."""

    __canonical_name__ = "CreateDatasetEvent"
    __version__ = SYFT_OBJECT_VERSION_1
    object_type: ClassVar[Type] = Dataset

    def execute(self, node):
        """Apply this event by invoking the handler registered on *node*."""
        handler = self.handler(node)
        # BUG FIX: `self.real.obj_id` referenced a non-existent attribute
        # (`real` is not defined anywhere on the Event hierarchy); the
        # target's id is stored in `object_id`, inherited from CRUDEvent.
        handler(
            object_id=self.object_id,
        )
65 changes: 65 additions & 0 deletions packages/syft/src/syft/service/event/event_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from syft.serde.serializable import serializable
from syft.service.context import AuthedServiceContext
from syft.service.event.event_stash import EventStash
from syft.service.response import SyftError, SyftSuccess
from syft.service.service import AbstractService, service_method
from syft.service.user.user_roles import DATA_OWNER_ROLE_LEVEL
from syft.store.document_store import DocumentStore
from syft.types.uid import UID
from syft.util.trace_decorator import instrument
from .event import Event

@instrument
@serializable()
class EventService(AbstractService):
    """Service exposing the append-only event log over the node API."""

    store: DocumentStore
    stash: EventStash

    def __init__(self, store: DocumentStore) -> None:
        self.store = store
        self.stash = EventStash(store=store)

    @service_method(
        path="event.add",
        name="add",
        roles=DATA_OWNER_ROLE_LEVEL,
    )
    def add(self, context: AuthedServiceContext, event: Event):
        """Append *event* to the log."""
        outcome = self.stash.set(context.credentials, event)
        if outcome.is_err():
            return SyftError(message=str(outcome.err()))
        return SyftSuccess(message="Great Success!")

    @service_method(
        path="event.get_by_uid",
        name="get_by_uid",
        roles=DATA_OWNER_ROLE_LEVEL,
    )
    def get_by_uid(self, context: AuthedServiceContext, uid: UID):
        """Fetch a single event by its UID."""
        outcome = self.stash.get_by_uid(context.credentials, uid=uid)
        if outcome.is_err():
            return SyftError(message=str(outcome.err()))
        return outcome.ok()

    @service_method(
        path="event.get_all",
        name="get_all",
        roles=DATA_OWNER_ROLE_LEVEL,
    )
    def get_all(self, context: AuthedServiceContext):
        """Return every event in the log."""
        outcome = self.stash.get_all(context.credentials)
        if outcome.is_err():
            return SyftError(message=str(outcome.err()))
        return outcome.ok()

30 changes: 30 additions & 0 deletions packages/syft/src/syft/service/event/event_stash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# stdlib
from typing import List
from typing import Optional

# third party
from result import Result

# relative
from ...node.credentials import SyftVerifyKey
from ...serde.serializable import serializable
from ...store.document_store import BaseUIDStoreStash
from ...store.document_store import DocumentStore
from ...store.document_store import PartitionKey
from ...store.document_store import PartitionSettings
from ...store.document_store import QueryKeys
from ...types.uid import UID
from ...util.telemetry import instrument
from .event import Event


@instrument
@serializable()
class EventStash(BaseUIDStoreStash):
    # Document-store stash persisting Event objects, keyed by UID.
    # All CRUD behavior (set/get_by_uid/get_all) comes from BaseUIDStoreStash.
    object_type = Event
    # Partition named after Event's canonical name so events get their own
    # storage partition in the document store.
    settings: PartitionSettings = PartitionSettings(
        name=Event.__canonical_name__, object_type=Event
    )

    def __init__(self, store: DocumentStore) -> None:
        super().__init__(store=store)
Loading
Loading